Edit on GitHub

sqlglot.optimizer.qualify_columns

  1from __future__ import annotations
  2
  3import itertools
  4import typing as t
  5
  6from sqlglot import alias, exp
  7from sqlglot.dialects.dialect import Dialect, DialectType
  8from sqlglot.errors import OptimizeError
  9from sqlglot.helper import seq_get, SingleValuedMapping
 10from sqlglot.optimizer.annotate_types import TypeAnnotator
 11from sqlglot.optimizer.scope import Scope, build_scope, traverse_scope, walk_in_scope
 12from sqlglot.optimizer.simplify import simplify_parens
 13from sqlglot.schema import Schema, ensure_schema
 14
 15if t.TYPE_CHECKING:
 16    from sqlglot._typing import E
 17
 18
def qualify_columns(
    expression: exp.Expression,
    schema: t.Dict | Schema,
    expand_alias_refs: bool = True,
    expand_stars: bool = True,
    infer_schema: t.Optional[bool] = None,
) -> exp.Expression:
    """
    Rewrite sqlglot AST to have fully qualified columns.

    Example:
        >>> import sqlglot
        >>> schema = {"tbl": {"col": "INT"}}
        >>> expression = sqlglot.parse_one("SELECT col FROM tbl")
        >>> qualify_columns(expression, schema).sql()
        'SELECT tbl.col AS col FROM tbl'

    Args:
        expression: Expression to qualify.
        schema: Database schema.
        expand_alias_refs: Whether to expand references to aliases.
        expand_stars: Whether to expand star queries. This is a necessary step
            for most of the optimizer's rules to work; do not set to False unless you
            know what you're doing!
        infer_schema: Whether to infer the schema if missing.

    Returns:
        The qualified expression.

    Notes:
        - Currently only handles a single PIVOT or UNPIVOT operator
    """
    schema = ensure_schema(schema)
    annotator = TypeAnnotator(schema)
    # Default: only infer source columns when no schema was provided at all
    infer_schema = schema.empty if infer_schema is None else infer_schema
    dialect = Dialect.get_or_raise(schema.dialect)
    pseudocolumns = dialect.PSEUDOCOLUMNS

    # Each scope (subquery, CTE, UDTF, ...) is qualified independently, innermost first
    for scope in traverse_scope(expression):
        resolver = Resolver(scope, schema, infer_schema=infer_schema)
        # Drop table column aliases (e.g. `AS foo(a, b)`) so they don't interfere
        # with qualification; recursive CTEs keep theirs (see _pop_table_column_aliases)
        _pop_table_column_aliases(scope.ctes)
        _pop_table_column_aliases(scope.derived_tables)
        using_column_tables = _expand_using(scope, resolver)

        # Some dialects resolve alias references before columns are qualified,
        # hence the "early" expansion here and the post-qualification one below
        if (schema.empty or dialect.FORCE_EARLY_ALIAS_REF_EXPANSION) and expand_alias_refs:
            _expand_alias_refs(
                scope,
                resolver,
                expand_only_groupby=dialect.EXPAND_ALIAS_REFS_EARLY_ONLY_IN_GROUP_BY,
            )

        _convert_columns_to_dots(scope, resolver)
        _qualify_columns(scope, resolver)

        if not schema.empty and expand_alias_refs:
            _expand_alias_refs(scope, resolver)

        if not isinstance(scope.expression, exp.UDTF):
            if expand_stars:
                _expand_stars(
                    scope,
                    resolver,
                    using_column_tables,
                    pseudocolumns,
                    annotator,
                )
            qualify_outputs(scope)

        _expand_group_by(scope, dialect)
        _expand_order_by(scope, resolver)

        # BigQuery struct handling elsewhere relies on type annotations
        if dialect == "bigquery":
            annotator.annotate_scope(scope)

    return expression
 94
 95
 96def validate_qualify_columns(expression: E) -> E:
 97    """Raise an `OptimizeError` if any columns aren't qualified"""
 98    all_unqualified_columns = []
 99    for scope in traverse_scope(expression):
100        if isinstance(scope.expression, exp.Select):
101            unqualified_columns = scope.unqualified_columns
102
103            if scope.external_columns and not scope.is_correlated_subquery and not scope.pivots:
104                column = scope.external_columns[0]
105                for_table = f" for table: '{column.table}'" if column.table else ""
106                raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
107
108            if unqualified_columns and scope.pivots and scope.pivots[0].unpivot:
109                # New columns produced by the UNPIVOT can't be qualified, but there may be columns
110                # under the UNPIVOT's IN clause that can and should be qualified. We recompute
111                # this list here to ensure those in the former category will be excluded.
112                unpivot_columns = set(_unpivot_columns(scope.pivots[0]))
113                unqualified_columns = [c for c in unqualified_columns if c not in unpivot_columns]
114
115            all_unqualified_columns.extend(unqualified_columns)
116
117    if all_unqualified_columns:
118        raise OptimizeError(f"Ambiguous columns: {all_unqualified_columns}")
119
120    return expression
121
122
def _unpivot_columns(unpivot: exp.Pivot) -> t.Iterator[exp.Column]:
    """Iterate the UNPIVOT's name column (if any), followed by its value columns."""
    field = unpivot.args.get("field")
    name_columns = (
        [field.this] if isinstance(field, exp.In) and isinstance(field.this, exp.Column) else []
    )
    value_columns = itertools.chain.from_iterable(
        e.find_all(exp.Column) for e in unpivot.expressions
    )
    return itertools.chain(name_columns, value_columns)
131
132
def _pop_table_column_aliases(derived_tables: t.List[exp.CTE | exp.Subquery]) -> None:
    """
    Remove table column aliases.

    For example, `col1` and `col2` will be dropped in SELECT ... FROM (SELECT ...) AS foo(col1, col2)
    """
    for derived_table in derived_tables:
        parent = derived_table.parent
        # Recursive CTEs depend on their declared column aliases, so keep them intact
        if isinstance(parent, exp.With) and parent.recursive:
            continue

        alias_node = derived_table.args.get("alias")
        if alias_node:
            alias_node.args.pop("columns", None)
145
146
def _expand_using(scope: Scope, resolver: Resolver) -> t.Dict[str, t.Any]:
    """
    Replace JOIN ... USING (...) clauses with equivalent ON conditions.

    Unqualified references to USING columns elsewhere in the scope are rewritten
    as COALESCE over every source table that provides the column.

    Args:
        scope: The scope to rewrite in place.
        resolver: Used to look up each source's columns.

    Returns:
        A mapping of each USING column name to an ordered "set" of the source
        names it comes from (a dict whose keys carry the ordering, values are None).
    """
    joins = list(scope.find_all(exp.Join))
    names = {join.alias_or_name for join in joins}
    # Sources that are not introduced via a join, in scope order
    ordered = [key for key in scope.selected_sources if key not in names]

    # Mapping of automatically joined column names to an ordered set of source names (dict).
    column_tables: t.Dict[str, t.Dict[str, t.Any]] = {}

    for i, join in enumerate(joins):
        using = join.args.get("using")

        if not using:
            continue

        join_table = join.alias_or_name

        # First source (in scope order) providing each column name seen so far
        columns = {}

        for source_name in scope.selected_sources:
            if source_name in ordered:
                for column_name in resolver.get_source_columns(source_name):
                    if column_name not in columns:
                        columns[column_name] = source_name

        source_table = ordered[-1]
        ordered.append(join_table)
        join_columns = resolver.get_source_columns(join_table)
        conditions = []
        using_identifier_count = len(using)

        for identifier in using:
            identifier = identifier.name
            table = columns.get(identifier)

            if not table or identifier not in join_columns:
                # Only raise when both sides' columns are known; a "*" (or empty)
                # source means unknown columns, so the join may still be valid
                if (columns and "*" not in columns) and join_columns:
                    raise OptimizeError(f"Cannot automatically join: {identifier}")

            table = table or source_table

            if i == 0 or using_identifier_count == 1:
                lhs: exp.Expression = exp.column(identifier, table=table)
            else:
                # Chained USING joins: the LHS column may exist in several earlier
                # sources, so coalesce it across all of them
                coalesce_columns = [
                    exp.column(identifier, table=t)
                    for t in ordered[:-1]
                    if identifier in resolver.get_source_columns(t)
                ]
                if len(coalesce_columns) > 1:
                    lhs = exp.func("coalesce", *coalesce_columns)
                else:
                    lhs = exp.column(identifier, table=table)

            conditions.append(lhs.eq(exp.column(identifier, table=join_table)))

            # Set all values in the dict to None, because we only care about the key ordering
            tables = column_tables.setdefault(identifier, {})
            if table not in tables:
                tables[table] = None
            if join_table not in tables:
                tables[join_table] = None

        # Swap the USING clause for the equivalent ON condition
        join.args.pop("using")
        join.set("on", exp.and_(*conditions, copy=False))

    if column_tables:
        # Rewrite unqualified references to USING columns as COALESCE expressions
        for column in scope.columns:
            if not column.table and column.name in column_tables:
                tables = column_tables[column.name]
                coalesce_args = [exp.column(column.name, table=table) for table in tables]
                replacement = exp.func("coalesce", *coalesce_args)

                # Ensure selects keep their output name
                if isinstance(column.parent, exp.Select):
                    replacement = alias(replacement, alias=column.name, copy=False)

                scope.replace(column, replacement)

    return column_tables
226
227
def _expand_alias_refs(scope: Scope, resolver: Resolver, expand_only_groupby: bool = False) -> None:
    """
    Replace references to projection aliases (in WHERE/GROUP BY/HAVING/QUALIFY and
    later projections) with the aliased expressions themselves, or qualify them
    with a source table where expansion isn't appropriate.

    Args:
        scope: The scope to rewrite in place.
        resolver: Used to look up the source table of an unqualified column.
        expand_only_groupby: If True, only expand alias references inside GROUP BY.
    """
    expression = scope.expression

    if not isinstance(expression, exp.Select):
        return

    # Alias name -> (aliased expression, 1-based projection position)
    alias_to_expression: t.Dict[str, t.Tuple[exp.Expression, int]] = {}

    def replace_columns(
        node: t.Optional[exp.Expression], resolve_table: bool = False, literal_index: bool = False
    ) -> None:
        if not node or (expand_only_groupby and not isinstance(node, exp.Group)):
            return

        for column in walk_in_scope(node, prune=lambda node: node.is_star):
            if not isinstance(column, exp.Column):
                continue

            table = resolver.get_table(column.name) if resolve_table and not column.table else None
            alias_expr, i = alias_to_expression.get(column.name, (None, 1))
            # "Double aggregation": the aliased expression contains an aggregate AND the
            # reference sits inside another (non-windowed) aggregate. Expanding it would
            # nest aggregates illegally, so such references are qualified instead.
            double_agg = (
                (
                    alias_expr.find(exp.AggFunc)
                    and (
                        column.find_ancestor(exp.AggFunc)
                        and not isinstance(column.find_ancestor(exp.Window, exp.Select), exp.Window)
                    )
                )
                if alias_expr
                else False
            )

            if table and (not alias_expr or double_agg):
                column.set("table", table)
            elif not column.table and alias_expr and not double_agg:
                if isinstance(alias_expr, exp.Literal) and (literal_index or resolve_table):
                    # Don't inline literal aliases; in GROUP BY, refer to the
                    # projection by its ordinal position instead
                    if literal_index:
                        column.replace(exp.Literal.number(i))
                else:
                    # Parenthesize to preserve precedence, then drop the parens if safe
                    column = column.replace(exp.paren(alias_expr))
                    simplified = simplify_parens(column)
                    if simplified is not column:
                        column.replace(simplified)

    # Projections are processed in order, so an alias is only visible to the
    # projections (and clauses) that come after its definition
    for i, projection in enumerate(scope.expression.selects):
        replace_columns(projection)

        if isinstance(projection, exp.Alias):
            alias_to_expression[projection.alias] = (projection.this, i + 1)

    replace_columns(expression.args.get("where"))
    replace_columns(expression.args.get("group"), literal_index=True)
    replace_columns(expression.args.get("having"), resolve_table=True)
    replace_columns(expression.args.get("qualify"), resolve_table=True)

    # The tree was mutated in place, so cached scope state is stale
    scope.clear_cache()
284
285
def _expand_group_by(scope: Scope, dialect: DialectType) -> None:
    """Replace ordinal references in GROUP BY (e.g. `GROUP BY 1`) with the projections they point to."""
    group = scope.expression.args.get("group")
    if not group:
        return

    expanded = _expand_positional_references(scope, group.expressions, dialect)
    group.set("expressions", expanded)
    scope.expression.set("group", group)
294
295
def _expand_order_by(scope: Scope, resolver: Resolver) -> None:
    """
    Expand ordinal references in ORDER BY and qualify columns inside its aggregates.

    When the query has a GROUP BY, ordering expressions that match a projection are
    additionally replaced by that projection's output name.
    """
    order = scope.expression.args.get("order")
    if not order:
        return

    ordereds = order.expressions
    for ordered, new_expression in zip(
        ordereds,
        _expand_positional_references(
            scope, (o.this for o in ordereds), resolver.schema.dialect, alias=True
        ),
    ):
        # Qualify any unqualified columns that appear inside ORDER BY aggregates
        for agg in ordered.find_all(exp.AggFunc):
            for col in agg.find_all(exp.Column):
                if not col.table:
                    col.set("table", resolver.get_table(col.name))

        ordered.set("this", new_expression)

    if scope.expression.args.get("group"):
        # Map each projected expression to a column referencing its output name
        selects = {s.this: exp.column(s.alias_or_name) for s in scope.expression.selects}

        for ordered in ordereds:
            ordered = ordered.this

            # Ordinals become the corresponding projection's alias; expressions that
            # match a projection become its output column; anything else is kept
            ordered.replace(
                exp.to_identifier(_select_by_pos(scope, ordered).alias)
                if ordered.is_int
                else selects.get(ordered, ordered)
            )
326
327
def _expand_positional_references(
    scope: Scope, expressions: t.Iterable[exp.Expression], dialect: DialectType, alias: bool = False
) -> t.List[exp.Expression]:
    """
    Replace integer literals that reference projections by position (e.g. `GROUP BY 1`).

    Args:
        scope: The scope the expressions belong to.
        expressions: The GROUP BY / ORDER BY expressions to expand.
        dialect: Query dialect, used for dialect-specific edge cases (BigQuery).
        alias: If True, replace an ordinal with the projection's alias instead of
            a copy of the projected expression itself.

    Returns:
        The expanded expressions, in the original order.
    """
    new_nodes: t.List[exp.Expression] = []
    ambiguous_projections = None

    for node in expressions:
        if node.is_int:
            select = _select_by_pos(scope, t.cast(exp.Literal, node))

            if alias:
                new_nodes.append(exp.column(select.args["alias"].copy()))
            else:
                select = select.this

                if dialect == "bigquery":
                    if ambiguous_projections is None:
                        # When a projection name is also a source name and it is referenced in the
                        # GROUP BY clause, BQ can't understand what the identifier corresponds to
                        ambiguous_projections = {
                            s.alias_or_name
                            for s in scope.expression.selects
                            if s.alias_or_name in scope.selected_sources
                        }

                    ambiguous = any(
                        column.parts[0].name in ambiguous_projections
                        for column in select.find_all(exp.Column)
                    )
                else:
                    ambiguous = False

                # Keep the ordinal untouched when inlining the projection could change
                # semantics: constants, explosive projections, or ambiguous names
                if (
                    isinstance(select, exp.CONSTANTS)
                    or select.find(exp.Explode, exp.Unnest)
                    or ambiguous
                ):
                    new_nodes.append(node)
                else:
                    new_nodes.append(select.copy())
        else:
            new_nodes.append(node)

    return new_nodes
372
373
def _select_by_pos(scope: Scope, node: exp.Literal) -> exp.Alias:
    """
    Return the aliased projection referenced by the 1-based ordinal `node`.

    Args:
        scope: The scope whose SELECT list is indexed.
        node: An integer literal, e.g. the `2` in `ORDER BY 2`.

    Returns:
        The projection at that position, asserted to be an `exp.Alias`.

    Raises:
        OptimizeError: If the ordinal is out of range for the SELECT list.
    """
    # NOTE(review): an ordinal of 0 yields index -1 (the last projection) instead of
    # raising — presumably positions are validated upstream; confirm before relying on it
    try:
        return scope.expression.selects[int(node.this) - 1].assert_is(exp.Alias)
    except IndexError as e:
        # Chain the cause so the original IndexError is preserved in tracebacks
        raise OptimizeError(f"Unknown output column: {node.name}") from e
379
380
def _convert_columns_to_dots(scope: Scope, resolver: Resolver) -> None:
    """
    Converts `Column` instances that represent struct field lookup into chained `Dots`.

    Struct field lookups look like columns (e.g. "struct"."field"), but they need to be
    qualified separately and represented as Dot(Dot(...(<table>.<column>, field1), field2, ...)).
    """
    converted = False
    for column in itertools.chain(scope.columns, scope.stars):
        if isinstance(column, exp.Dot):
            continue

        column_table: t.Optional[str | exp.Identifier] = column.table
        # A "table" qualifier that isn't a known source (here, or in the parent scope of
        # a correlated subquery) is actually a struct column accessed with dot notation
        if (
            column_table
            and column_table not in scope.sources
            and (
                not scope.parent
                or column_table not in scope.parent.sources
                or not scope.is_correlated_subquery
            )
        ):
            root, *parts = column.parts

            if root.name in scope.sources:
                # The struct is already qualified, but we still need to change the AST
                column_table = root
                root, *parts = parts
            else:
                column_table = resolver.get_table(root.name)

            if column_table:
                converted = True
                column.replace(exp.Dot.build([exp.column(root, table=column_table), *parts]))

    if converted:
        # We want to re-aggregate the converted columns, otherwise they'd be skipped in
        # a `for column in scope.columns` iteration, even though they shouldn't be
        scope.clear_cache()
420
421
def _qualify_columns(scope: Scope, resolver: Resolver) -> None:
    """Disambiguate columns, ensuring each column specifies a source"""
    for column in scope.columns:
        table = column.table
        name = column.name

        if table:
            # Already qualified: just validate the column against the source's schema
            if table in scope.sources:
                known_columns = resolver.get_source_columns(table)
                if known_columns and name not in known_columns and "*" not in known_columns:
                    raise OptimizeError(f"Unknown column: {name}")
            continue

        if scope.pivots and not column.find_ancestor(exp.Pivot):
            # Columns outside the Pivot expression are qualified with the pivot's alias
            column.set("table", exp.to_identifier(scope.pivots[0].alias))
            continue

        # table can be '' here, because e.g. bigquery unnest has no table alias
        inferred_table = resolver.get_table(name)
        if inferred_table:
            column.set("table", inferred_table)

    # Also qualify the columns referenced inside the pivot expressions themselves
    for pivot in scope.pivots:
        for pivot_column in pivot.find_all(exp.Column):
            if pivot_column.table or pivot_column.name not in resolver.all_columns:
                continue

            inferred_table = resolver.get_table(pivot_column.name)
            if inferred_table:
                pivot_column.set("table", inferred_table)
451
452
def _expand_struct_stars(
    expression: exp.Dot,
) -> t.List[exp.Alias]:
    """
    [BigQuery] Expand/Flatten foo.bar.* where bar is a struct column

    Args:
        expression: The star-terminated `Dot`, e.g. `t.c.f1.*`.

    Returns:
        One aliased column per struct field, or an empty list when the star can't
        be safely expanded (non-struct column, unnamed/ambiguous fields, no match).
    """

    dot_column = t.cast(exp.Column, expression.find(exp.Column))
    if not dot_column.is_type(exp.DataType.Type.STRUCT):
        return []

    # All nested struct values are ColumnDefs, so normalize the first exp.Column in one
    dot_column = dot_column.copy()
    starting_struct = exp.ColumnDef(this=dot_column.this, kind=dot_column.type)

    # First part is the table name and last part is the star so they can be dropped
    dot_parts = expression.parts[1:-1]

    # If we're expanding a nested struct eg. t.c.f1.f2.* find the last struct (f2 in this case)
    for part in dot_parts[1:]:
        for field in t.cast(exp.DataType, starting_struct.kind).expressions:
            # Unable to expand star unless all fields are named
            if not isinstance(field.this, exp.Identifier):
                return []

            if field.name == part.name and field.kind.is_type(exp.DataType.Type.STRUCT):
                starting_struct = field
                break
        else:
            # There is no matching field in the struct
            return []

    taken_names = set()
    new_selections = []

    for field in t.cast(exp.DataType, starting_struct.kind).expressions:
        name = field.name

        # Ambiguous or anonymous fields can't be expanded
        if name in taken_names or not isinstance(field.this, exp.Identifier):
            return []

        taken_names.add(name)

        # Rebuild the full path table.column.f1...fN as a single field-qualified column
        this = field.this.copy()
        root, *parts = [part.copy() for part in itertools.chain(dot_parts, [this])]
        new_column = exp.column(
            t.cast(exp.Identifier, root), table=dot_column.args.get("table"), fields=parts
        )
        new_selections.append(alias(new_column, this, copy=False))

    return new_selections
503
504
def _expand_stars(
    scope: Scope,
    resolver: Resolver,
    using_column_tables: t.Dict[str, t.Any],
    pseudocolumns: t.Set[str],
    annotator: TypeAnnotator,
) -> None:
    """
    Expand stars to lists of column selections.

    Args:
        scope: The scope whose SELECT list is rewritten in place.
        resolver: Used to resolve each source's visible columns.
        using_column_tables: USING columns (from `_expand_using`) that must be
            selected once as a COALESCE over their source tables.
        pseudocolumns: Dialect pseudocolumn names to exclude from expansion.
        annotator: Used to type-annotate the scope for BigQuery struct star expansion.
    """

    new_selections = []
    # STAR modifiers (EXCEPT / REPLACE / RENAME), keyed by id() of the table they apply to
    except_columns: t.Dict[int, t.Set[str]] = {}
    replace_columns: t.Dict[int, t.Dict[str, exp.Alias]] = {}
    rename_columns: t.Dict[int, t.Dict[str, str]] = {}

    coalesced_columns = set()
    dialect = resolver.schema.dialect

    pivot_output_columns = None
    pivot_exclude_columns = None

    # Note: per the module docstring, only a single PIVOT/UNPIVOT is handled
    pivot = t.cast(t.Optional[exp.Pivot], seq_get(scope.pivots, 0))
    if isinstance(pivot, exp.Pivot) and not pivot.alias_column_names:
        if pivot.unpivot:
            # UNPIVOT outputs its name/value columns and consumes the IN-clause columns
            pivot_output_columns = [c.output_name for c in _unpivot_columns(pivot)]

            field = pivot.args.get("field")
            if isinstance(field, exp.In):
                pivot_exclude_columns = {
                    c.output_name for e in field.expressions for c in e.find_all(exp.Column)
                }
        else:
            # PIVOT consumes every column it references and outputs aliased aggregations
            pivot_exclude_columns = set(c.output_name for c in pivot.find_all(exp.Column))

            pivot_output_columns = [c.output_name for c in pivot.args.get("columns", [])]
            if not pivot_output_columns:
                pivot_output_columns = [c.alias_or_name for c in pivot.expressions]

    is_bigquery = dialect == "bigquery"
    if is_bigquery and any(isinstance(col, exp.Dot) for col in scope.stars):
        # Found struct expansion, annotate scope ahead of time
        annotator.annotate_scope(scope)

    for expression in scope.expression.selects:
        tables = []
        if isinstance(expression, exp.Star):
            # Bare `*`: expand over every selected source
            tables.extend(scope.selected_sources)
            _add_except_columns(expression, tables, except_columns)
            _add_replace_columns(expression, tables, replace_columns)
            _add_rename_columns(expression, tables, rename_columns)
        elif expression.is_star:
            if not isinstance(expression, exp.Dot):
                # Qualified star, e.g. `tbl.*`
                tables.append(expression.table)
                _add_except_columns(expression.this, tables, except_columns)
                _add_replace_columns(expression.this, tables, replace_columns)
                _add_rename_columns(expression.this, tables, rename_columns)
            elif is_bigquery:
                # Struct star, e.g. `tbl.struct_col.*`
                struct_fields = _expand_struct_stars(expression)
                if struct_fields:
                    new_selections.extend(struct_fields)
                    continue

        if not tables:
            # Not a star (or an unexpandable one): keep the projection as-is
            new_selections.append(expression)
            continue

        for table in tables:
            if table not in scope.sources:
                raise OptimizeError(f"Unknown table: {table}")

            columns = resolver.get_source_columns(table, only_visible=True)
            columns = columns or scope.outer_columns

            if pseudocolumns:
                columns = [name for name in columns if name.upper() not in pseudocolumns]

            if not columns or "*" in columns:
                # This source's columns are unknown, so no star here can be expanded
                return

            table_id = id(table)
            columns_to_exclude = except_columns.get(table_id) or set()
            renamed_columns = rename_columns.get(table_id, {})
            replaced_columns = replace_columns.get(table_id, {})

            if pivot:
                if pivot_output_columns and pivot_exclude_columns:
                    pivot_columns = [c for c in columns if c not in pivot_exclude_columns]
                    pivot_columns.extend(pivot_output_columns)
                else:
                    pivot_columns = pivot.alias_column_names

                if pivot_columns:
                    new_selections.extend(
                        alias(exp.column(name, table=pivot.alias), name, copy=False)
                        for name in pivot_columns
                        if name not in columns_to_exclude
                    )
                    continue

            for name in columns:
                if name in columns_to_exclude or name in coalesced_columns:
                    continue
                if name in using_column_tables and table in using_column_tables[name]:
                    # USING column: emit a single COALESCE over all its tables, only once
                    coalesced_columns.add(name)
                    tables = using_column_tables[name]
                    coalesce_args = [exp.column(name, table=table) for table in tables]

                    new_selections.append(
                        alias(exp.func("coalesce", *coalesce_args), alias=name, copy=False)
                    )
                else:
                    # Apply RENAME/REPLACE modifiers; only alias when the name changed
                    alias_ = renamed_columns.get(name, name)
                    selection_expr = replaced_columns.get(name) or exp.column(name, table=table)
                    new_selections.append(
                        alias(selection_expr, alias_, copy=False)
                        if alias_ != name
                        else selection_expr
                    )

    # Ensures we don't overwrite the initial selections with an empty list
    if new_selections and isinstance(scope.expression, exp.Select):
        scope.expression.set("expressions", new_selections)
626
627
def _add_except_columns(
    expression: exp.Expression, tables, except_columns: t.Dict[int, t.Set[str]]
) -> None:
    """Record the star's EXCEPT column names under each target table's id()."""
    except_ = expression.args.get("except")
    if not except_:
        return

    names = {e.name for e in except_}

    for table in tables:
        except_columns[id(table)] = names
640
641
def _add_rename_columns(
    expression: exp.Expression, tables, rename_columns: t.Dict[int, t.Dict[str, str]]
) -> None:
    """Record the star's RENAME mapping (old name -> new name) under each target table's id()."""
    rename = expression.args.get("rename")
    if not rename:
        return

    mapping = {e.this.name: e.alias for e in rename}

    for table in tables:
        rename_columns[id(table)] = mapping
654
655
def _add_replace_columns(
    expression: exp.Expression, tables, replace_columns: t.Dict[int, t.Dict[str, exp.Alias]]
) -> None:
    """Record the star's REPLACE expressions, keyed by output alias, under each target table's id()."""
    replace = expression.args.get("replace")
    if not replace:
        return

    mapping = {e.alias: e for e in replace}

    for table in tables:
        replace_columns[id(table)] = mapping
668
669
def qualify_outputs(scope_or_expression: Scope | exp.Expression) -> None:
    """
    Ensure all output columns are aliased.

    Unnamed subquery projections get a generated `_col_{i}` alias; other unaliased,
    non-star projections are wrapped in an Alias. Column aliases assigned by the
    outer query (scope.outer_columns) override any existing alias.
    """
    if isinstance(scope_or_expression, exp.Expression):
        scope = build_scope(scope_or_expression)
        if not isinstance(scope, Scope):
            return
    else:
        scope = scope_or_expression

    new_selections = []
    for i, (selection, aliased_column) in enumerate(
        itertools.zip_longest(scope.expression.selects, scope.outer_columns)
    ):
        if selection is None:
            # More outer column aliases than projections; the extras are dropped
            break

        if isinstance(selection, exp.Subquery):
            if not selection.output_name:
                selection.set("alias", exp.TableAlias(this=exp.to_identifier(f"_col_{i}")))
        elif not isinstance(selection, exp.Alias) and not selection.is_star:
            selection = alias(
                selection,
                alias=selection.output_name or f"_col_{i}",
                copy=False,
            )
        if aliased_column:
            # The outer query's column alias takes precedence
            selection.set("alias", exp.to_identifier(aliased_column))

        new_selections.append(selection)

    if isinstance(scope.expression, exp.Select):
        scope.expression.set("expressions", new_selections)
702
703
def quote_identifiers(expression: E, dialect: DialectType = None, identify: bool = True) -> E:
    """Makes sure all identifiers that need to be quoted are quoted."""
    quote = Dialect.get_or_raise(dialect).quote_identifier
    return expression.transform(quote, identify=identify, copy=False)  # type: ignore
709
710
def pushdown_cte_alias_columns(expression: exp.Expression) -> exp.Expression:
    """
    Pushes down the CTE alias columns into the projection,

    This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.

    Example:
        >>> import sqlglot
        >>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
        >>> pushdown_cte_alias_columns(expression).sql()
        'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'

    Args:
        expression: Expression to pushdown.

    Returns:
        The expression with the CTE aliases pushed down into the projection.
    """
    for cte in expression.find_all(exp.CTE):
        if not cte.alias_column_names:
            continue

        projections = []
        # Pair each declared alias column with the corresponding projection, in order
        for name, projection in zip(cte.alias_column_names, cte.this.expressions):
            if isinstance(projection, exp.Alias):
                projection.set("alias", name)
            else:
                projection = alias(projection, alias=name)
            projections.append(projection)

        cte.this.set("expressions", projections)

    return expression
741
742
743class Resolver:
744    """
745    Helper for resolving columns.
746
747    This is a class so we can lazily load some things and easily share them across functions.
748    """
749
    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
        self.scope = scope
        self.schema = schema
        # Lazily-populated caches, filled on first use by the corresponding accessors
        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
        self._all_columns: t.Optional[t.Set[str]] = None
        # Whether get_table may fall back to the single schemaless source
        self._infer_schema = infer_schema
        # Memoizes get_source_columns, keyed by (source name, only_visible)
        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
758
    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
        """
        Get the table for a column name.

        Args:
            column_name: The column name to find the table for.
        Returns:
            The table name if it can be found/inferred.
        """
        if self._unambiguous_columns is None:
            self._unambiguous_columns = self._get_unambiguous_columns(
                self._get_all_source_columns()
            )

        table_name = self._unambiguous_columns.get(column_name)

        if not table_name and self._infer_schema:
            # If exactly one source has unknown columns, assume the column belongs to it
            sources_without_schema = tuple(
                source
                for source, columns in self._get_all_source_columns().items()
                if not columns or "*" in columns
            )
            if len(sources_without_schema) == 1:
                table_name = sources_without_schema[0]

        if table_name not in self.scope.selected_sources:
            return exp.to_identifier(table_name)

        node, _ = self.scope.selected_sources.get(table_name)

        if isinstance(node, exp.Query):
            # Walk up to the ancestor that actually carries this alias (e.g. the CTE node)
            while node and node.alias != table_name:
                node = node.parent

        node_alias = node.args.get("alias")
        if node_alias:
            # Prefer the source's alias identifier (preserves e.g. its quoting)
            return exp.to_identifier(node_alias.this)

        return exp.to_identifier(table_name)
798
799    @property
800    def all_columns(self) -> t.Set[str]:
801        """All available columns of all sources in this scope"""
802        if self._all_columns is None:
803            self._all_columns = {
804                column for columns in self._get_all_source_columns().values() for column in columns
805            }
806        return self._all_columns
807
808    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
809        """Resolve the source columns for a given source `name`."""
810        cache_key = (name, only_visible)
811        if cache_key not in self._get_source_columns_cache:
812            if name not in self.scope.sources:
813                raise OptimizeError(f"Unknown table: {name}")
814
815            source = self.scope.sources[name]
816
817            if isinstance(source, exp.Table):
818                columns = self.schema.column_names(source, only_visible)
819            elif isinstance(source, Scope) and isinstance(
820                source.expression, (exp.Values, exp.Unnest)
821            ):
822                columns = source.expression.named_selects
823
824                # in bigquery, unnest structs are automatically scoped as tables, so you can
825                # directly select a struct field in a query.
826                # this handles the case where the unnest is statically defined.
827                if self.schema.dialect == "bigquery":
828                    if source.expression.is_type(exp.DataType.Type.STRUCT):
829                        for k in source.expression.type.expressions:  # type: ignore
830                            columns.append(k.name)
831            else:
832                columns = source.expression.named_selects
833
834            node, _ = self.scope.selected_sources.get(name) or (None, None)
835            if isinstance(node, Scope):
836                column_aliases = node.expression.alias_column_names
837            elif isinstance(node, exp.Expression):
838                column_aliases = node.alias_column_names
839            else:
840                column_aliases = []
841
842            if column_aliases:
843                # If the source's columns are aliased, their aliases shadow the corresponding column names.
844                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
845                columns = [
846                    alias or name
847                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
848                ]
849
850            self._get_source_columns_cache[cache_key] = columns
851
852        return self._get_source_columns_cache[cache_key]
853
854    def _get_all_source_columns(self) -> t.Dict[str, t.Sequence[str]]:
855        if self._source_columns is None:
856            self._source_columns = {
857                source_name: self.get_source_columns(source_name)
858                for source_name, source in itertools.chain(
859                    self.scope.selected_sources.items(), self.scope.lateral_sources.items()
860                )
861            }
862        return self._source_columns
863
864    def _get_unambiguous_columns(
865        self, source_columns: t.Dict[str, t.Sequence[str]]
866    ) -> t.Mapping[str, str]:
867        """
868        Find all the unambiguous columns in sources.
869
870        Args:
871            source_columns: Mapping of names to source columns.
872
873        Returns:
874            Mapping of column name to source name.
875        """
876        if not source_columns:
877            return {}
878
879        source_columns_pairs = list(source_columns.items())
880
881        first_table, first_columns = source_columns_pairs[0]
882
883        if len(source_columns_pairs) == 1:
884            # Performance optimization - avoid copying first_columns if there is only one table.
885            return SingleValuedMapping(first_columns, first_table)
886
887        unambiguous_columns = {col: first_table for col in first_columns}
888        all_columns = set(unambiguous_columns)
889
890        for table, columns in source_columns_pairs[1:]:
891            unique = set(columns)
892            ambiguous = all_columns.intersection(unique)
893            all_columns.update(columns)
894
895            for column in ambiguous:
896                unambiguous_columns.pop(column, None)
897            for column in unique.difference(ambiguous):
898                unambiguous_columns[column] = table
899
900        return unambiguous_columns
def qualify_columns( expression: sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema], expand_alias_refs: bool = True, expand_stars: bool = True, infer_schema: Optional[bool] = None) -> sqlglot.expressions.Expression:
20def qualify_columns(
21    expression: exp.Expression,
22    schema: t.Dict | Schema,
23    expand_alias_refs: bool = True,
24    expand_stars: bool = True,
25    infer_schema: t.Optional[bool] = None,
26) -> exp.Expression:
27    """
28    Rewrite sqlglot AST to have fully qualified columns.
29
30    Example:
31        >>> import sqlglot
32        >>> schema = {"tbl": {"col": "INT"}}
33        >>> expression = sqlglot.parse_one("SELECT col FROM tbl")
34        >>> qualify_columns(expression, schema).sql()
35        'SELECT tbl.col AS col FROM tbl'
36
37    Args:
38        expression: Expression to qualify.
39        schema: Database schema.
40        expand_alias_refs: Whether to expand references to aliases.
41        expand_stars: Whether to expand star queries. This is a necessary step
42            for most of the optimizer's rules to work; do not set to False unless you
43            know what you're doing!
44        infer_schema: Whether to infer the schema if missing.
45
46    Returns:
47        The qualified expression.
48
49    Notes:
50        - Currently only handles a single PIVOT or UNPIVOT operator
51    """
52    schema = ensure_schema(schema)
53    annotator = TypeAnnotator(schema)
54    infer_schema = schema.empty if infer_schema is None else infer_schema
55    dialect = Dialect.get_or_raise(schema.dialect)
56    pseudocolumns = dialect.PSEUDOCOLUMNS
57
58    for scope in traverse_scope(expression):
59        resolver = Resolver(scope, schema, infer_schema=infer_schema)
60        _pop_table_column_aliases(scope.ctes)
61        _pop_table_column_aliases(scope.derived_tables)
62        using_column_tables = _expand_using(scope, resolver)
63
64        if (schema.empty or dialect.FORCE_EARLY_ALIAS_REF_EXPANSION) and expand_alias_refs:
65            _expand_alias_refs(
66                scope,
67                resolver,
68                expand_only_groupby=dialect.EXPAND_ALIAS_REFS_EARLY_ONLY_IN_GROUP_BY,
69            )
70
71        _convert_columns_to_dots(scope, resolver)
72        _qualify_columns(scope, resolver)
73
74        if not schema.empty and expand_alias_refs:
75            _expand_alias_refs(scope, resolver)
76
77        if not isinstance(scope.expression, exp.UDTF):
78            if expand_stars:
79                _expand_stars(
80                    scope,
81                    resolver,
82                    using_column_tables,
83                    pseudocolumns,
84                    annotator,
85                )
86            qualify_outputs(scope)
87
88        _expand_group_by(scope, dialect)
89        _expand_order_by(scope, resolver)
90
91        if dialect == "bigquery":
92            annotator.annotate_scope(scope)
93
94    return expression

Rewrite sqlglot AST to have fully qualified columns.

Example:
>>> import sqlglot
>>> schema = {"tbl": {"col": "INT"}}
>>> expression = sqlglot.parse_one("SELECT col FROM tbl")
>>> qualify_columns(expression, schema).sql()
'SELECT tbl.col AS col FROM tbl'
Arguments:
  • expression: Expression to qualify.
  • schema: Database schema.
  • expand_alias_refs: Whether to expand references to aliases.
  • expand_stars: Whether to expand star queries. This is a necessary step for most of the optimizer's rules to work; do not set to False unless you know what you're doing!
  • infer_schema: Whether to infer the schema if missing.
Returns:

The qualified expression.

Notes:
  • Currently only handles a single PIVOT or UNPIVOT operator
def validate_qualify_columns(expression: ~E) -> ~E:
 97def validate_qualify_columns(expression: E) -> E:
 98    """Raise an `OptimizeError` if any columns aren't qualified"""
 99    all_unqualified_columns = []
100    for scope in traverse_scope(expression):
101        if isinstance(scope.expression, exp.Select):
102            unqualified_columns = scope.unqualified_columns
103
104            if scope.external_columns and not scope.is_correlated_subquery and not scope.pivots:
105                column = scope.external_columns[0]
106                for_table = f" for table: '{column.table}'" if column.table else ""
107                raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
108
109            if unqualified_columns and scope.pivots and scope.pivots[0].unpivot:
110                # New columns produced by the UNPIVOT can't be qualified, but there may be columns
111                # under the UNPIVOT's IN clause that can and should be qualified. We recompute
112                # this list here to ensure those in the former category will be excluded.
113                unpivot_columns = set(_unpivot_columns(scope.pivots[0]))
114                unqualified_columns = [c for c in unqualified_columns if c not in unpivot_columns]
115
116            all_unqualified_columns.extend(unqualified_columns)
117
118    if all_unqualified_columns:
119        raise OptimizeError(f"Ambiguous columns: {all_unqualified_columns}")
120
121    return expression

Raise an OptimizeError if any columns aren't qualified

def qualify_outputs( scope_or_expression: sqlglot.optimizer.scope.Scope | sqlglot.expressions.Expression) -> None:
671def qualify_outputs(scope_or_expression: Scope | exp.Expression) -> None:
672    """Ensure all output columns are aliased"""
673    if isinstance(scope_or_expression, exp.Expression):
674        scope = build_scope(scope_or_expression)
675        if not isinstance(scope, Scope):
676            return
677    else:
678        scope = scope_or_expression
679
680    new_selections = []
681    for i, (selection, aliased_column) in enumerate(
682        itertools.zip_longest(scope.expression.selects, scope.outer_columns)
683    ):
684        if selection is None:
685            break
686
687        if isinstance(selection, exp.Subquery):
688            if not selection.output_name:
689                selection.set("alias", exp.TableAlias(this=exp.to_identifier(f"_col_{i}")))
690        elif not isinstance(selection, exp.Alias) and not selection.is_star:
691            selection = alias(
692                selection,
693                alias=selection.output_name or f"_col_{i}",
694                copy=False,
695            )
696        if aliased_column:
697            selection.set("alias", exp.to_identifier(aliased_column))
698
699        new_selections.append(selection)
700
701    if isinstance(scope.expression, exp.Select):
702        scope.expression.set("expressions", new_selections)

Ensure all output columns are aliased

def quote_identifiers( expression: ~E, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, identify: bool = True) -> ~E:
705def quote_identifiers(expression: E, dialect: DialectType = None, identify: bool = True) -> E:
706    """Makes sure all identifiers that need to be quoted are quoted."""
707    return expression.transform(
708        Dialect.get_or_raise(dialect).quote_identifier, identify=identify, copy=False
709    )  # type: ignore

Makes sure all identifiers that need to be quoted are quoted.

def pushdown_cte_alias_columns( expression: sqlglot.expressions.Expression) -> sqlglot.expressions.Expression:
712def pushdown_cte_alias_columns(expression: exp.Expression) -> exp.Expression:
713    """
714    Pushes down the CTE alias columns into the projection,
715
716    This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.
717
718    Example:
719        >>> import sqlglot
720        >>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
721        >>> pushdown_cte_alias_columns(expression).sql()
722        'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'
723
724    Args:
725        expression: Expression to pushdown.
726
727    Returns:
728        The expression with the CTE aliases pushed down into the projection.
729    """
730    for cte in expression.find_all(exp.CTE):
731        if cte.alias_column_names:
732            new_expressions = []
733            for _alias, projection in zip(cte.alias_column_names, cte.this.expressions):
734                if isinstance(projection, exp.Alias):
735                    projection.set("alias", _alias)
736                else:
737                    projection = alias(projection, alias=_alias)
738                new_expressions.append(projection)
739            cte.this.set("expressions", new_expressions)
740
741    return expression

Pushes down the CTE alias columns into the projection.

This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.

Example:
>>> import sqlglot
>>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
>>> pushdown_cte_alias_columns(expression).sql()
'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'
Arguments:
  • expression: Expression to pushdown.
Returns:

The expression with the CTE aliases pushed down into the projection.

class Resolver:
744class Resolver:
745    """
746    Helper for resolving columns.
747
748    This is a class so we can lazily load some things and easily share them across functions.
749    """
750
751    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
752        self.scope = scope
753        self.schema = schema
754        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
755        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
756        self._all_columns: t.Optional[t.Set[str]] = None
757        self._infer_schema = infer_schema
758        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
759
760    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
761        """
762        Get the table for a column name.
763
764        Args:
765            column_name: The column name to find the table for.
766        Returns:
767            The table name if it can be found/inferred.
768        """
769        if self._unambiguous_columns is None:
770            self._unambiguous_columns = self._get_unambiguous_columns(
771                self._get_all_source_columns()
772            )
773
774        table_name = self._unambiguous_columns.get(column_name)
775
776        if not table_name and self._infer_schema:
777            sources_without_schema = tuple(
778                source
779                for source, columns in self._get_all_source_columns().items()
780                if not columns or "*" in columns
781            )
782            if len(sources_without_schema) == 1:
783                table_name = sources_without_schema[0]
784
785        if table_name not in self.scope.selected_sources:
786            return exp.to_identifier(table_name)
787
788        node, _ = self.scope.selected_sources.get(table_name)
789
790        if isinstance(node, exp.Query):
791            while node and node.alias != table_name:
792                node = node.parent
793
794        node_alias = node.args.get("alias")
795        if node_alias:
796            return exp.to_identifier(node_alias.this)
797
798        return exp.to_identifier(table_name)
799
800    @property
801    def all_columns(self) -> t.Set[str]:
802        """All available columns of all sources in this scope"""
803        if self._all_columns is None:
804            self._all_columns = {
805                column for columns in self._get_all_source_columns().values() for column in columns
806            }
807        return self._all_columns
808
809    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
810        """Resolve the source columns for a given source `name`."""
811        cache_key = (name, only_visible)
812        if cache_key not in self._get_source_columns_cache:
813            if name not in self.scope.sources:
814                raise OptimizeError(f"Unknown table: {name}")
815
816            source = self.scope.sources[name]
817
818            if isinstance(source, exp.Table):
819                columns = self.schema.column_names(source, only_visible)
820            elif isinstance(source, Scope) and isinstance(
821                source.expression, (exp.Values, exp.Unnest)
822            ):
823                columns = source.expression.named_selects
824
825                # in bigquery, unnest structs are automatically scoped as tables, so you can
826                # directly select a struct field in a query.
827                # this handles the case where the unnest is statically defined.
828                if self.schema.dialect == "bigquery":
829                    if source.expression.is_type(exp.DataType.Type.STRUCT):
830                        for k in source.expression.type.expressions:  # type: ignore
831                            columns.append(k.name)
832            else:
833                columns = source.expression.named_selects
834
835            node, _ = self.scope.selected_sources.get(name) or (None, None)
836            if isinstance(node, Scope):
837                column_aliases = node.expression.alias_column_names
838            elif isinstance(node, exp.Expression):
839                column_aliases = node.alias_column_names
840            else:
841                column_aliases = []
842
843            if column_aliases:
844                # If the source's columns are aliased, their aliases shadow the corresponding column names.
845                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
846                columns = [
847                    alias or name
848                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
849                ]
850
851            self._get_source_columns_cache[cache_key] = columns
852
853        return self._get_source_columns_cache[cache_key]
854
855    def _get_all_source_columns(self) -> t.Dict[str, t.Sequence[str]]:
856        if self._source_columns is None:
857            self._source_columns = {
858                source_name: self.get_source_columns(source_name)
859                for source_name, source in itertools.chain(
860                    self.scope.selected_sources.items(), self.scope.lateral_sources.items()
861                )
862            }
863        return self._source_columns
864
865    def _get_unambiguous_columns(
866        self, source_columns: t.Dict[str, t.Sequence[str]]
867    ) -> t.Mapping[str, str]:
868        """
869        Find all the unambiguous columns in sources.
870
871        Args:
872            source_columns: Mapping of names to source columns.
873
874        Returns:
875            Mapping of column name to source name.
876        """
877        if not source_columns:
878            return {}
879
880        source_columns_pairs = list(source_columns.items())
881
882        first_table, first_columns = source_columns_pairs[0]
883
884        if len(source_columns_pairs) == 1:
885            # Performance optimization - avoid copying first_columns if there is only one table.
886            return SingleValuedMapping(first_columns, first_table)
887
888        unambiguous_columns = {col: first_table for col in first_columns}
889        all_columns = set(unambiguous_columns)
890
891        for table, columns in source_columns_pairs[1:]:
892            unique = set(columns)
893            ambiguous = all_columns.intersection(unique)
894            all_columns.update(columns)
895
896            for column in ambiguous:
897                unambiguous_columns.pop(column, None)
898            for column in unique.difference(ambiguous):
899                unambiguous_columns[column] = table
900
901        return unambiguous_columns

Helper for resolving columns.

This is a class so we can lazily load some things and easily share them across functions.

Resolver( scope: sqlglot.optimizer.scope.Scope, schema: sqlglot.schema.Schema, infer_schema: bool = True)
751    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
752        self.scope = scope
753        self.schema = schema
754        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
755        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
756        self._all_columns: t.Optional[t.Set[str]] = None
757        self._infer_schema = infer_schema
758        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
scope
schema
def get_table(self, column_name: str) -> Optional[sqlglot.expressions.Identifier]:
760    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
761        """
762        Get the table for a column name.
763
764        Args:
765            column_name: The column name to find the table for.
766        Returns:
767            The table name if it can be found/inferred.
768        """
769        if self._unambiguous_columns is None:
770            self._unambiguous_columns = self._get_unambiguous_columns(
771                self._get_all_source_columns()
772            )
773
774        table_name = self._unambiguous_columns.get(column_name)
775
776        if not table_name and self._infer_schema:
777            sources_without_schema = tuple(
778                source
779                for source, columns in self._get_all_source_columns().items()
780                if not columns or "*" in columns
781            )
782            if len(sources_without_schema) == 1:
783                table_name = sources_without_schema[0]
784
785        if table_name not in self.scope.selected_sources:
786            return exp.to_identifier(table_name)
787
788        node, _ = self.scope.selected_sources.get(table_name)
789
790        if isinstance(node, exp.Query):
791            while node and node.alias != table_name:
792                node = node.parent
793
794        node_alias = node.args.get("alias")
795        if node_alias:
796            return exp.to_identifier(node_alias.this)
797
798        return exp.to_identifier(table_name)

Get the table for a column name.

Arguments:
  • column_name: The column name to find the table for.
Returns:

The table name if it can be found/inferred.

all_columns: Set[str]
800    @property
801    def all_columns(self) -> t.Set[str]:
802        """All available columns of all sources in this scope"""
803        if self._all_columns is None:
804            self._all_columns = {
805                column for columns in self._get_all_source_columns().values() for column in columns
806            }
807        return self._all_columns

All available columns of all sources in this scope

def get_source_columns(self, name: str, only_visible: bool = False) -> Sequence[str]:
809    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
810        """Resolve the source columns for a given source `name`."""
811        cache_key = (name, only_visible)
812        if cache_key not in self._get_source_columns_cache:
813            if name not in self.scope.sources:
814                raise OptimizeError(f"Unknown table: {name}")
815
816            source = self.scope.sources[name]
817
818            if isinstance(source, exp.Table):
819                columns = self.schema.column_names(source, only_visible)
820            elif isinstance(source, Scope) and isinstance(
821                source.expression, (exp.Values, exp.Unnest)
822            ):
823                columns = source.expression.named_selects
824
825                # in bigquery, unnest structs are automatically scoped as tables, so you can
826                # directly select a struct field in a query.
827                # this handles the case where the unnest is statically defined.
828                if self.schema.dialect == "bigquery":
829                    if source.expression.is_type(exp.DataType.Type.STRUCT):
830                        for k in source.expression.type.expressions:  # type: ignore
831                            columns.append(k.name)
832            else:
833                columns = source.expression.named_selects
834
835            node, _ = self.scope.selected_sources.get(name) or (None, None)
836            if isinstance(node, Scope):
837                column_aliases = node.expression.alias_column_names
838            elif isinstance(node, exp.Expression):
839                column_aliases = node.alias_column_names
840            else:
841                column_aliases = []
842
843            if column_aliases:
844                # If the source's columns are aliased, their aliases shadow the corresponding column names.
845                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
846                columns = [
847                    alias or name
848                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
849                ]
850
851            self._get_source_columns_cache[cache_key] = columns
852
853        return self._get_source_columns_cache[cache_key]

Resolve the source columns for a given source name.