Edit on GitHub

sqlglot.optimizer.qualify_columns

  1from __future__ import annotations
  2
  3import itertools
  4import typing as t
  5
  6from sqlglot import alias, exp
  7from sqlglot.dialects.dialect import Dialect, DialectType
  8from sqlglot.errors import OptimizeError
  9from sqlglot.helper import seq_get, SingleValuedMapping
 10from sqlglot.optimizer.annotate_types import TypeAnnotator
 11from sqlglot.optimizer.scope import Scope, build_scope, traverse_scope, walk_in_scope
 12from sqlglot.optimizer.simplify import simplify_parens
 13from sqlglot.schema import Schema, ensure_schema
 14
 15if t.TYPE_CHECKING:
 16    from sqlglot._typing import E
 17
 18
 19def qualify_columns(
 20    expression: exp.Expression,
 21    schema: t.Dict | Schema,
 22    expand_alias_refs: bool = True,
 23    expand_stars: bool = True,
 24    infer_schema: t.Optional[bool] = None,
 25) -> exp.Expression:
 26    """
 27    Rewrite sqlglot AST to have fully qualified columns.
 28
 29    Example:
 30        >>> import sqlglot
 31        >>> schema = {"tbl": {"col": "INT"}}
 32        >>> expression = sqlglot.parse_one("SELECT col FROM tbl")
 33        >>> qualify_columns(expression, schema).sql()
 34        'SELECT tbl.col AS col FROM tbl'
 35
 36    Args:
 37        expression: Expression to qualify.
 38        schema: Database schema.
 39        expand_alias_refs: Whether to expand references to aliases.
 40        expand_stars: Whether to expand star queries. This is a necessary step
 41            for most of the optimizer's rules to work; do not set to False unless you
 42            know what you're doing!
 43        infer_schema: Whether to infer the schema if missing.
 44
 45    Returns:
 46        The qualified expression.
 47
 48    Notes:
 49        - Currently only handles a single PIVOT or UNPIVOT operator
 50    """
 51    schema = ensure_schema(schema)
 52    annotator = TypeAnnotator(schema)
 53    infer_schema = schema.empty if infer_schema is None else infer_schema
 54    dialect = Dialect.get_or_raise(schema.dialect)
 55    pseudocolumns = dialect.PSEUDOCOLUMNS
 56
 57    for scope in traverse_scope(expression):
 58        resolver = Resolver(scope, schema, infer_schema=infer_schema)
 59        _pop_table_column_aliases(scope.ctes)
 60        _pop_table_column_aliases(scope.derived_tables)
 61        using_column_tables = _expand_using(scope, resolver)
 62
 63        if (schema.empty or dialect.FORCE_EARLY_ALIAS_REF_EXPANSION) and expand_alias_refs:
 64            _expand_alias_refs(
 65                scope,
 66                resolver,
 67                expand_only_groupby=dialect.EXPAND_ALIAS_REFS_EARLY_ONLY_IN_GROUP_BY,
 68            )
 69
 70        _convert_columns_to_dots(scope, resolver)
 71        _qualify_columns(scope, resolver)
 72
 73        if not schema.empty and expand_alias_refs:
 74            _expand_alias_refs(scope, resolver)
 75
 76        if not isinstance(scope.expression, exp.UDTF):
 77            if expand_stars:
 78                _expand_stars(
 79                    scope,
 80                    resolver,
 81                    using_column_tables,
 82                    pseudocolumns,
 83                    annotator,
 84                )
 85            qualify_outputs(scope)
 86
 87        _expand_group_by(scope, dialect)
 88        _expand_order_by(scope, resolver)
 89
 90        if dialect == "bigquery":
 91            annotator.annotate_scope(scope)
 92
 93    return expression
 94
 95
 96def validate_qualify_columns(expression: E) -> E:
 97    """Raise an `OptimizeError` if any columns aren't qualified"""
 98    all_unqualified_columns = []
 99    for scope in traverse_scope(expression):
100        if isinstance(scope.expression, exp.Select):
101            unqualified_columns = scope.unqualified_columns
102
103            if scope.external_columns and not scope.is_correlated_subquery and not scope.pivots:
104                column = scope.external_columns[0]
105                for_table = f" for table: '{column.table}'" if column.table else ""
106                raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
107
108            if unqualified_columns and scope.pivots and scope.pivots[0].unpivot:
109                # New columns produced by the UNPIVOT can't be qualified, but there may be columns
110                # under the UNPIVOT's IN clause that can and should be qualified. We recompute
111                # this list here to ensure those in the former category will be excluded.
112                unpivot_columns = set(_unpivot_columns(scope.pivots[0]))
113                unqualified_columns = [c for c in unqualified_columns if c not in unpivot_columns]
114
115            all_unqualified_columns.extend(unqualified_columns)
116
117    if all_unqualified_columns:
118        raise OptimizeError(f"Ambiguous columns: {all_unqualified_columns}")
119
120    return expression
121
122
def _unpivot_columns(unpivot: exp.Pivot) -> t.Iterator[exp.Column]:
    """Yield the UNPIVOT's name column (if any), then every value column."""
    field = unpivot.args.get("field")
    if isinstance(field, exp.In) and isinstance(field.this, exp.Column):
        yield field.this

    for expression in unpivot.expressions:
        yield from expression.find_all(exp.Column)
131
132
def _pop_table_column_aliases(derived_tables: t.List[exp.CTE | exp.Subquery]) -> None:
    """
    Remove table column aliases.

    For example, `col1` and `col2` will be dropped in SELECT ... FROM (SELECT ...) AS foo(col1, col2)
    """
    for derived_table in derived_tables:
        parent = derived_table.parent
        if isinstance(parent, exp.With) and parent.recursive:
            # Keep the column list of recursive CTEs intact
            continue

        alias_node = derived_table.args.get("alias")
        if alias_node:
            alias_node.args.pop("columns", None)
145
146
def _expand_using(scope: Scope, resolver: Resolver) -> t.Dict[str, t.Any]:
    """
    Rewrite `JOIN ... USING (...)` clauses into equivalent `ON` conditions.

    Mutates `scope` in place: each join's `using` arg is popped and replaced
    with an explicit `ON` equality condition, and unqualified references to
    USING columns elsewhere in the scope are replaced with COALESCE over every
    source that provides them.

    Returns:
        Mapping of automatically joined column names to an ordered "set"
        (a dict whose values are all None, used only for key order) of the
        source names that provide each column.
    """
    # First-seen source for each column name, in source processing order
    columns: t.Dict[str, str] = {}

    def _update_source_columns(source_name: str) -> None:
        # Register `source_name` as the provider of any column not seen yet
        for column_name in resolver.get_source_columns(source_name):
            if column_name not in columns:
                columns[column_name] = source_name

    joins = list(scope.find_all(exp.Join))
    names = {join.alias_or_name for join in joins}
    # Non-join sources (e.g. the FROM clause tables), in selection order
    ordered = [key for key in scope.selected_sources if key not in names]

    # Mapping of automatically joined column names to an ordered set of source names (dict).
    column_tables: t.Dict[str, t.Dict[str, t.Any]] = {}

    for source_name in ordered:
        _update_source_columns(source_name)

    for i, join in enumerate(joins):
        # The most recently appended source is the left-hand side of this join
        source_table = ordered[-1]
        if source_table:
            _update_source_columns(source_table)

        join_table = join.alias_or_name
        ordered.append(join_table)

        using = join.args.get("using")
        if not using:
            continue

        join_columns = resolver.get_source_columns(join_table)
        conditions = []
        using_identifier_count = len(using)

        for identifier in using:
            identifier = identifier.name
            table = columns.get(identifier)

            if not table or identifier not in join_columns:
                # Only raise when we actually know the columns of both sides;
                # unknown schemas ("*" or empty) are given the benefit of the doubt
                if (columns and "*" not in columns) and join_columns:
                    raise OptimizeError(f"Cannot automatically join: {identifier}")

            table = table or source_table

            if i == 0 or using_identifier_count == 1:
                # First join (or a single USING column): reference `table` directly
                lhs: exp.Expression = exp.column(identifier, table=table)
            else:
                # Later joins may see the column in several earlier sources,
                # so COALESCE over every earlier source that provides it
                coalesce_columns = [
                    exp.column(identifier, table=t)
                    for t in ordered[:-1]
                    if identifier in resolver.get_source_columns(t)
                ]
                if len(coalesce_columns) > 1:
                    lhs = exp.func("coalesce", *coalesce_columns)
                else:
                    lhs = exp.column(identifier, table=table)

            conditions.append(lhs.eq(exp.column(identifier, table=join_table)))

            # Set all values in the dict to None, because we only care about the key ordering
            tables = column_tables.setdefault(identifier, {})
            if table not in tables:
                tables[table] = None
            if join_table not in tables:
                tables[join_table] = None

        # Replace USING with the equivalent ON condition
        join.args.pop("using")
        join.set("on", exp.and_(*conditions, copy=False))

    if column_tables:
        # Rewrite unqualified references to USING columns as COALESCE over
        # all sources that provide them
        for column in scope.columns:
            if not column.table and column.name in column_tables:
                tables = column_tables[column.name]
                coalesce_args = [exp.column(column.name, table=table) for table in tables]
                replacement = exp.func("coalesce", *coalesce_args)

                # Ensure selects keep their output name
                if isinstance(column.parent, exp.Select):
                    replacement = alias(replacement, alias=column.name, copy=False)

                scope.replace(column, replacement)

    return column_tables
230
231
def _expand_alias_refs(scope: Scope, resolver: Resolver, expand_only_groupby: bool = False) -> None:
    """
    Expand references to SELECT aliases in WHERE / GROUP BY / HAVING / QUALIFY.

    E.g. for `SELECT a + 1 AS b ... WHERE b > 0`, the `b` in WHERE is replaced
    with `(a + 1)`. Literal projections referenced from GROUP BY are replaced
    with their 1-based position instead of the literal itself.

    Args:
        scope: The scope to process; its expression is mutated in place.
        resolver: Used to qualify columns when expanding HAVING/QUALIFY.
        expand_only_groupby: When True, only expand references inside the
            GROUP BY clause (dialect-specific early-expansion mode).
    """
    expression = scope.expression

    if not isinstance(expression, exp.Select):
        return

    # Maps a projection alias to (the aliased expression, its 1-based position)
    alias_to_expression: t.Dict[str, t.Tuple[exp.Expression, int]] = {}

    def replace_columns(
        node: t.Optional[exp.Expression], resolve_table: bool = False, literal_index: bool = False
    ) -> None:
        if not node or (expand_only_groupby and not isinstance(node, exp.Group)):
            return

        for column in walk_in_scope(node, prune=lambda node: node.is_star):
            if not isinstance(column, exp.Column):
                continue

            table = resolver.get_table(column.name) if resolve_table and not column.table else None
            alias_expr, i = alias_to_expression.get(column.name, (None, 1))
            # True when the aliased expression contains an aggregate AND this
            # reference sits inside another aggregate that isn't windowed —
            # inlining would nest aggregates, so we qualify instead
            double_agg = (
                (
                    alias_expr.find(exp.AggFunc)
                    and (
                        column.find_ancestor(exp.AggFunc)
                        and not isinstance(column.find_ancestor(exp.Window, exp.Select), exp.Window)
                    )
                )
                if alias_expr
                else False
            )

            if table and (not alias_expr or double_agg):
                column.set("table", table)
            elif not column.table and alias_expr and not double_agg:
                if isinstance(alias_expr, exp.Literal) and (literal_index or resolve_table):
                    if literal_index:
                        # Refer to literal projections by position (e.g. GROUP BY 2)
                        column.replace(exp.Literal.number(i))
                else:
                    # Inline the aliased expression wrapped in parens to preserve
                    # precedence, then drop the parens again if they're redundant
                    column = column.replace(exp.paren(alias_expr))
                    simplified = simplify_parens(column)
                    if simplified is not column:
                        column.replace(simplified)

    # Collect alias -> expression mappings while also expanding refs between projections
    for i, projection in enumerate(scope.expression.selects):
        replace_columns(projection)

        if isinstance(projection, exp.Alias):
            alias_to_expression[projection.alias] = (projection.this, i + 1)

    parent_scope = scope
    while parent_scope.is_union:
        parent_scope = parent_scope.parent

    # We shouldn't expand aliases if they match the recursive CTE's columns
    if parent_scope.is_cte:
        cte = parent_scope.expression.parent
        if cte.find_ancestor(exp.With).recursive:
            for recursive_cte_column in cte.args["alias"].columns or cte.this.selects:
                alias_to_expression.pop(recursive_cte_column.output_name, None)

    replace_columns(expression.args.get("where"))
    replace_columns(expression.args.get("group"), literal_index=True)
    replace_columns(expression.args.get("having"), resolve_table=True)
    replace_columns(expression.args.get("qualify"), resolve_table=True)

    # Columns may have been replaced, so cached scope state is stale
    scope.clear_cache()
299
300
def _expand_group_by(scope: Scope, dialect: DialectType) -> None:
    """Replace positional references in GROUP BY with the projections they index."""
    select = scope.expression
    group = select.args.get("group")
    if not group:
        return

    expanded = _expand_positional_references(scope, group.expressions, dialect)
    group.set("expressions", expanded)
    select.set("group", group)
309
310
def _expand_order_by(scope: Scope, resolver: Resolver) -> None:
    """
    Expand positional and alias references in the ORDER BY clause.

    Positional references (e.g. `ORDER BY 2`) are replaced with a column for
    the corresponding projection's alias, columns inside aggregates are
    qualified, and when a GROUP BY is present, ordering expressions matching a
    projection are replaced with that projection's output name.
    """
    order = scope.expression.args.get("order")
    if not order:
        return

    ordereds = order.expressions
    for ordered, new_expression in zip(
        ordereds,
        _expand_positional_references(
            scope, (o.this for o in ordereds), resolver.schema.dialect, alias=True
        ),
    ):
        # Qualify unqualified columns appearing inside aggregate calls
        for agg in ordered.find_all(exp.AggFunc):
            for col in agg.find_all(exp.Column):
                if not col.table:
                    col.set("table", resolver.get_table(col.name))

        ordered.set("this", new_expression)

    if scope.expression.args.get("group"):
        # With a GROUP BY, order by the projection's output name rather than
        # repeating the underlying expression
        selects = {s.this: exp.column(s.alias_or_name) for s in scope.expression.selects}

        for ordered in ordereds:
            ordered = ordered.this

            ordered.replace(
                exp.to_identifier(_select_by_pos(scope, ordered).alias)
                if ordered.is_int
                else selects.get(ordered, ordered)
            )
341
342
def _expand_positional_references(
    scope: Scope, expressions: t.Iterable[exp.Expression], dialect: DialectType, alias: bool = False
) -> t.List[exp.Expression]:
    """
    Replace integer literals that index into the SELECT list by position.

    Args:
        scope: The scope whose SELECT list the positions index into.
        expressions: The GROUP BY / ORDER BY expressions to expand.
        dialect: Target dialect; BigQuery gets extra ambiguity handling.
        alias: If True, replace a position with a column referencing the
            projection's alias instead of the projection expression itself.

    Returns:
        The expanded list; non-positional nodes are passed through unchanged.
    """
    new_nodes: t.List[exp.Expression] = []
    # Lazily-computed set of projection names that clash with source names (BigQuery only)
    ambiguous_projections = None

    for node in expressions:
        if node.is_int:
            select = _select_by_pos(scope, t.cast(exp.Literal, node))

            if alias:
                new_nodes.append(exp.column(select.args["alias"].copy()))
            else:
                select = select.this

                if dialect == "bigquery":
                    if ambiguous_projections is None:
                        # When a projection name is also a source name and it is referenced in the
                        # GROUP BY clause, BQ can't understand what the identifier corresponds to
                        ambiguous_projections = {
                            s.alias_or_name
                            for s in scope.expression.selects
                            if s.alias_or_name in scope.selected_sources
                        }

                    ambiguous = any(
                        column.parts[0].name in ambiguous_projections
                        for column in select.find_all(exp.Column)
                    )
                else:
                    ambiguous = False

                # Keep the positional reference when inlining the projection could
                # change semantics: constants, EXPLODE/UNNEST, or BQ ambiguity
                if (
                    isinstance(select, exp.CONSTANTS)
                    or select.find(exp.Explode, exp.Unnest)
                    or ambiguous
                ):
                    new_nodes.append(node)
                else:
                    new_nodes.append(select.copy())
        else:
            new_nodes.append(node)

    return new_nodes
387
388
def _select_by_pos(scope: Scope, node: exp.Literal) -> exp.Alias:
    """
    Return the projection that the 1-based positional reference `node` points at.

    Args:
        scope: The scope whose SELECT list is being indexed.
        node: An integer literal, e.g. the `2` in `ORDER BY 2`.

    Returns:
        The matching projection, asserted to be an `exp.Alias`.

    Raises:
        OptimizeError: If the position is out of range for the SELECT list.
    """
    try:
        return scope.expression.selects[int(node.this) - 1].assert_is(exp.Alias)
    except IndexError as e:
        # Chain the cause so the original traceback isn't lost
        raise OptimizeError(f"Unknown output column: {node.name}") from e
394
395
def _convert_columns_to_dots(scope: Scope, resolver: Resolver) -> None:
    """
    Converts `Column` instances that represent struct field lookup into chained `Dots`.

    Struct field lookups look like columns (e.g. "struct"."field"), but they need to be
    qualified separately and represented as Dot(Dot(...(<table>.<column>, field1), field2, ...)).
    """
    converted = False
    for column in itertools.chain(scope.columns, scope.stars):
        if isinstance(column, exp.Dot):
            continue

        column_table: t.Optional[str | exp.Identifier] = column.table
        # A "table" that doesn't resolve to any known source (in this scope or,
        # for correlated subqueries, the parent scope) is treated as a struct column
        if (
            column_table
            and column_table not in scope.sources
            and (
                not scope.parent
                or column_table not in scope.parent.sources
                or not scope.is_correlated_subquery
            )
        ):
            root, *parts = column.parts

            if root.name in scope.sources:
                # The struct is already qualified, but we still need to change the AST
                column_table = root
                root, *parts = parts
            else:
                column_table = resolver.get_table(root.name)

            if column_table:
                converted = True
                column.replace(exp.Dot.build([exp.column(root, table=column_table), *parts]))

    if converted:
        # We want to re-aggregate the converted columns, otherwise they'd be skipped in
        # a `for column in scope.columns` iteration, even though they shouldn't be
        scope.clear_cache()
435
436
def _qualify_columns(scope: Scope, resolver: Resolver) -> None:
    """Disambiguate columns, ensuring each column specifies a source"""
    for column in scope.columns:
        table_name = column.table
        name = column.name

        if table_name and table_name in scope.sources:
            # Validate the column against the known columns of its source
            known = resolver.get_source_columns(table_name)
            if known and name not in known and "*" not in known:
                raise OptimizeError(f"Unknown column: {name}")

        # table_name can be '' because e.g. bigquery unnest has no table alias
        if table_name:
            continue

        if scope.pivots and not column.find_ancestor(exp.Pivot):
            # Unqualified columns outside the PIVOT expression are qualified
            # with the pivot's alias
            column.set("table", exp.to_identifier(scope.pivots[0].alias))
            continue

        inferred_table = resolver.get_table(name)
        if inferred_table:
            column.set("table", inferred_table)

    # Also qualify columns referenced inside the PIVOT/UNPIVOT clauses themselves
    for pivot in scope.pivots:
        for column in pivot.find_all(exp.Column):
            if column.table or column.name not in resolver.all_columns:
                continue

            inferred_table = resolver.get_table(column.name)
            if inferred_table:
                column.set("table", inferred_table)
466
467
def _expand_struct_stars(
    expression: exp.Dot,
) -> t.List[exp.Alias]:
    """[BigQuery] Expand/Flatten foo.bar.* where bar is a struct column

    Returns:
        One aliased column per struct field, or an empty list when the star
        can't be expanded (non-struct column, unnamed/ambiguous fields, or no
        matching nested field).
    """

    dot_column = t.cast(exp.Column, expression.find(exp.Column))
    if not dot_column.is_type(exp.DataType.Type.STRUCT):
        return []

    # All nested struct values are ColumnDefs, so normalize the first exp.Column in one
    dot_column = dot_column.copy()
    starting_struct = exp.ColumnDef(this=dot_column.this, kind=dot_column.type)

    # First part is the table name and last part is the star so they can be dropped
    dot_parts = expression.parts[1:-1]

    # If we're expanding a nested struct eg. t.c.f1.f2.* find the last struct (f2 in this case)
    for part in dot_parts[1:]:
        for field in t.cast(exp.DataType, starting_struct.kind).expressions:
            # Unable to expand star unless all fields are named
            if not isinstance(field.this, exp.Identifier):
                return []

            if field.name == part.name and field.kind.is_type(exp.DataType.Type.STRUCT):
                starting_struct = field
                break
        else:
            # There is no matching field in the struct
            return []

    taken_names = set()
    new_selections = []

    for field in t.cast(exp.DataType, starting_struct.kind).expressions:
        name = field.name

        # Ambiguous or anonymous fields can't be expanded
        if name in taken_names or not isinstance(field.this, exp.Identifier):
            return []

        taken_names.add(name)

        # Build <table>.<root>.<...parts>.<field> and alias it to the field name
        this = field.this.copy()
        root, *parts = [part.copy() for part in itertools.chain(dot_parts, [this])]
        new_column = exp.column(
            t.cast(exp.Identifier, root), table=dot_column.args.get("table"), fields=parts
        )
        new_selections.append(alias(new_column, this, copy=False))

    return new_selections
518
519
def _expand_stars(
    scope: Scope,
    resolver: Resolver,
    using_column_tables: t.Dict[str, t.Any],
    pseudocolumns: t.Set[str],
    annotator: TypeAnnotator,
) -> None:
    """Expand stars to lists of column selections"""

    new_selections: t.List[exp.Expression] = []
    # All three dicts below are keyed by id(table) for the table the star expands
    except_columns: t.Dict[int, t.Set[str]] = {}
    replace_columns: t.Dict[int, t.Dict[str, exp.Alias]] = {}
    rename_columns: t.Dict[int, t.Dict[str, str]] = {}

    # USING columns that have already been emitted as a single COALESCE selection
    coalesced_columns = set()
    dialect = resolver.schema.dialect

    pivot_output_columns = None
    pivot_exclude_columns = None

    # NOTE: only the first PIVOT/UNPIVOT is handled (see qualify_columns notes)
    pivot = t.cast(t.Optional[exp.Pivot], seq_get(scope.pivots, 0))
    if isinstance(pivot, exp.Pivot) and not pivot.alias_column_names:
        if pivot.unpivot:
            # UNPIVOT: the name/value columns it produces become outputs ...
            pivot_output_columns = [c.output_name for c in _unpivot_columns(pivot)]

            field = pivot.args.get("field")
            if isinstance(field, exp.In):
                # ... while the columns under its IN clause are excluded
                pivot_exclude_columns = {
                    c.output_name for e in field.expressions for c in e.find_all(exp.Column)
                }
        else:
            # PIVOT: every column it references is consumed by the pivot
            pivot_exclude_columns = set(c.output_name for c in pivot.find_all(exp.Column))

            pivot_output_columns = [c.output_name for c in pivot.args.get("columns", [])]
            if not pivot_output_columns:
                pivot_output_columns = [c.alias_or_name for c in pivot.expressions]

    is_bigquery = dialect == "bigquery"
    if is_bigquery and any(isinstance(col, exp.Dot) for col in scope.stars):
        # Found struct expansion, annotate scope ahead of time
        annotator.annotate_scope(scope)

    for expression in scope.expression.selects:
        tables = []
        if isinstance(expression, exp.Star):
            # Bare `SELECT *`: expand across every selected source
            tables.extend(scope.selected_sources)
            _add_except_columns(expression, tables, except_columns)
            _add_replace_columns(expression, tables, replace_columns)
            _add_rename_columns(expression, tables, rename_columns)
        elif expression.is_star:
            if not isinstance(expression, exp.Dot):
                # Qualified star, e.g. `t.*`
                tables.append(expression.table)
                _add_except_columns(expression.this, tables, except_columns)
                _add_replace_columns(expression.this, tables, replace_columns)
                _add_rename_columns(expression.this, tables, rename_columns)
            elif is_bigquery:
                # `t.struct_col.*`: flatten the struct's fields (BigQuery)
                struct_fields = _expand_struct_stars(expression)
                if struct_fields:
                    new_selections.extend(struct_fields)
                    continue

        if not tables:
            # Not a star selection — keep it as-is
            new_selections.append(expression)
            continue

        for table in tables:
            if table not in scope.sources:
                raise OptimizeError(f"Unknown table: {table}")

            columns = resolver.get_source_columns(table, only_visible=True)
            columns = columns or scope.outer_columns

            if pseudocolumns:
                columns = [name for name in columns if name.upper() not in pseudocolumns]

            if not columns or "*" in columns:
                # Column set unknown — can't expand anything, so bail out entirely
                return

            table_id = id(table)
            columns_to_exclude = except_columns.get(table_id) or set()
            renamed_columns = rename_columns.get(table_id, {})
            replaced_columns = replace_columns.get(table_id, {})

            if pivot:
                if pivot_output_columns and pivot_exclude_columns:
                    pivot_columns = [c for c in columns if c not in pivot_exclude_columns]
                    pivot_columns.extend(pivot_output_columns)
                else:
                    pivot_columns = pivot.alias_column_names

                if pivot_columns:
                    # Qualify pivot outputs with the pivot's alias
                    new_selections.extend(
                        alias(exp.column(name, table=pivot.alias), name, copy=False)
                        for name in pivot_columns
                        if name not in columns_to_exclude
                    )
                    continue

            for name in columns:
                if name in columns_to_exclude or name in coalesced_columns:
                    continue
                if name in using_column_tables and table in using_column_tables[name]:
                    # USING column: select a single COALESCE over all providers
                    coalesced_columns.add(name)
                    tables = using_column_tables[name]
                    coalesce_args = [exp.column(name, table=table) for table in tables]

                    new_selections.append(
                        alias(exp.func("coalesce", *coalesce_args), alias=name, copy=False)
                    )
                else:
                    # Apply RENAME/REPLACE modifiers, aliasing only when the name changed
                    alias_ = renamed_columns.get(name, name)
                    selection_expr = replaced_columns.get(name) or exp.column(name, table=table)
                    new_selections.append(
                        alias(selection_expr, alias_, copy=False)
                        if alias_ != name
                        else selection_expr
                    )

    # Ensures we don't overwrite the initial selections with an empty list
    if new_selections and isinstance(scope.expression, exp.Select):
        scope.expression.set("expressions", new_selections)
641
642
def _add_except_columns(
    expression: exp.Expression, tables, except_columns: t.Dict[int, t.Set[str]]
) -> None:
    """Record a star's EXCEPT column names, keyed by each source table's id."""
    excluded = expression.args.get("except")
    if not excluded:
        return

    names = {e.name for e in excluded}
    for table in tables:
        except_columns[id(table)] = names
655
656
def _add_rename_columns(
    expression: exp.Expression, tables, rename_columns: t.Dict[int, t.Dict[str, str]]
) -> None:
    """Record a star's RENAME (old name -> new alias) mapping, keyed by table id."""
    renamed = expression.args.get("rename")
    if not renamed:
        return

    mapping = {e.this.name: e.alias for e in renamed}
    for table in tables:
        rename_columns[id(table)] = mapping
669
670
def _add_replace_columns(
    expression: exp.Expression, tables, replace_columns: t.Dict[int, t.Dict[str, exp.Alias]]
) -> None:
    """Record a star's REPLACE (alias -> replacement expression) mapping, keyed by table id."""
    replaced = expression.args.get("replace")
    if not replaced:
        return

    mapping = {e.alias: e for e in replaced}
    for table in tables:
        replace_columns[id(table)] = mapping
683
684
def qualify_outputs(scope_or_expression: Scope | exp.Expression) -> None:
    """Ensure all output columns are aliased"""
    if isinstance(scope_or_expression, exp.Expression):
        maybe_scope = build_scope(scope_or_expression)
        if not isinstance(maybe_scope, Scope):
            return
        scope = maybe_scope
    else:
        scope = scope_or_expression

    selections = []
    pairs = itertools.zip_longest(scope.expression.selects, scope.outer_columns)

    for i, (selection, outer_name) in enumerate(pairs):
        if selection is None:
            break

        if isinstance(selection, exp.Subquery):
            if not selection.output_name:
                # Subqueries get a TableAlias rather than being wrapped
                generated = exp.to_identifier(f"_col_{i}")
                selection.set("alias", exp.TableAlias(this=generated))
        elif not (isinstance(selection, exp.Alias) or selection.is_star):
            selection = alias(
                selection,
                alias=selection.output_name or f"_col_{i}",
                copy=False,
            )

        if outer_name:
            # An outer column name (e.g. from a table alias list) wins
            selection.set("alias", exp.to_identifier(outer_name))

        selections.append(selection)

    if isinstance(scope.expression, exp.Select):
        scope.expression.set("expressions", selections)
717
718
def quote_identifiers(expression: E, dialect: DialectType = None, identify: bool = True) -> E:
    """Makes sure all identifiers that need to be quoted are quoted."""
    quote = Dialect.get_or_raise(dialect).quote_identifier
    return expression.transform(quote, identify=identify, copy=False)  # type: ignore
724
725
def pushdown_cte_alias_columns(expression: exp.Expression) -> exp.Expression:
    """
    Pushes down the CTE alias columns into the projection,

    This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.

    Example:
        >>> import sqlglot
        >>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
        >>> pushdown_cte_alias_columns(expression).sql()
        'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'

    Args:
        expression: Expression to pushdown.

    Returns:
        The expression with the CTE aliases pushed down into the projection.
    """
    for cte in expression.find_all(exp.CTE):
        alias_names = cte.alias_column_names
        if not alias_names:
            continue

        projections = []
        for name, projection in zip(alias_names, cte.this.expressions):
            if isinstance(projection, exp.Alias):
                projection.set("alias", name)
            else:
                projection = alias(projection, alias=name)
            projections.append(projection)

        cte.this.set("expressions", projections)

    return expression
756
757
758class Resolver:
759    """
760    Helper for resolving columns.
761
762    This is a class so we can lazily load some things and easily share them across functions.
763    """
764
    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
        """
        Args:
            scope: The scope whose columns are being resolved.
            schema: Database schema used to look up source columns.
            infer_schema: Whether `get_table` may fall back to the single
                schemaless source when a column can't be resolved otherwise.
        """
        self.scope = scope
        self.schema = schema
        # Lazily-populated caches, filled on first use by the accessors below
        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
        self._all_columns: t.Optional[t.Set[str]] = None
        self._infer_schema = infer_schema
        # Memoizes get_source_columns results, keyed by (source name, only_visible)
        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
773
    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
        """
        Get the table for a column name.

        Args:
            column_name: The column name to find the table for.
        Returns:
            The table name if it can be found/inferred.
        """
        if self._unambiguous_columns is None:
            self._unambiguous_columns = self._get_unambiguous_columns(
                self._get_all_source_columns()
            )

        table_name = self._unambiguous_columns.get(column_name)

        if not table_name and self._infer_schema:
            # If exactly one source has unknown columns (empty or "*"), assume
            # the column comes from it
            sources_without_schema = tuple(
                source
                for source, columns in self._get_all_source_columns().items()
                if not columns or "*" in columns
            )
            if len(sources_without_schema) == 1:
                table_name = sources_without_schema[0]

        if table_name not in self.scope.selected_sources:
            # table_name may be None here, in which case this returns None
            return exp.to_identifier(table_name)

        node, _ = self.scope.selected_sources.get(table_name)

        if isinstance(node, exp.Query):
            # Climb up to the ancestor node that actually carries this alias
            while node and node.alias != table_name:
                node = node.parent

        node_alias = node.args.get("alias")
        if node_alias:
            # Prefer the alias identifier attached to the source node itself
            return exp.to_identifier(node_alias.this)

        return exp.to_identifier(table_name)
813
814    @property
815    def all_columns(self) -> t.Set[str]:
816        """All available columns of all sources in this scope"""
817        if self._all_columns is None:
818            self._all_columns = {
819                column for columns in self._get_all_source_columns().values() for column in columns
820            }
821        return self._all_columns
822
823    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
824        """Resolve the source columns for a given source `name`."""
825        cache_key = (name, only_visible)
826        if cache_key not in self._get_source_columns_cache:
827            if name not in self.scope.sources:
828                raise OptimizeError(f"Unknown table: {name}")
829
830            source = self.scope.sources[name]
831
832            if isinstance(source, exp.Table):
833                columns = self.schema.column_names(source, only_visible)
834            elif isinstance(source, Scope) and isinstance(
835                source.expression, (exp.Values, exp.Unnest)
836            ):
837                columns = source.expression.named_selects
838
839                # in bigquery, unnest structs are automatically scoped as tables, so you can
840                # directly select a struct field in a query.
841                # this handles the case where the unnest is statically defined.
842                if self.schema.dialect == "bigquery":
843                    if source.expression.is_type(exp.DataType.Type.STRUCT):
844                        for k in source.expression.type.expressions:  # type: ignore
845                            columns.append(k.name)
846            else:
847                columns = source.expression.named_selects
848
849            node, _ = self.scope.selected_sources.get(name) or (None, None)
850            if isinstance(node, Scope):
851                column_aliases = node.expression.alias_column_names
852            elif isinstance(node, exp.Expression):
853                column_aliases = node.alias_column_names
854            else:
855                column_aliases = []
856
857            if column_aliases:
858                # If the source's columns are aliased, their aliases shadow the corresponding column names.
859                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
860                columns = [
861                    alias or name
862                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
863                ]
864
865            self._get_source_columns_cache[cache_key] = columns
866
867        return self._get_source_columns_cache[cache_key]
868
869    def _get_all_source_columns(self) -> t.Dict[str, t.Sequence[str]]:
870        if self._source_columns is None:
871            self._source_columns = {
872                source_name: self.get_source_columns(source_name)
873                for source_name, source in itertools.chain(
874                    self.scope.selected_sources.items(), self.scope.lateral_sources.items()
875                )
876            }
877        return self._source_columns
878
879    def _get_unambiguous_columns(
880        self, source_columns: t.Dict[str, t.Sequence[str]]
881    ) -> t.Mapping[str, str]:
882        """
883        Find all the unambiguous columns in sources.
884
885        Args:
886            source_columns: Mapping of names to source columns.
887
888        Returns:
889            Mapping of column name to source name.
890        """
891        if not source_columns:
892            return {}
893
894        source_columns_pairs = list(source_columns.items())
895
896        first_table, first_columns = source_columns_pairs[0]
897
898        if len(source_columns_pairs) == 1:
899            # Performance optimization - avoid copying first_columns if there is only one table.
900            return SingleValuedMapping(first_columns, first_table)
901
902        unambiguous_columns = {col: first_table for col in first_columns}
903        all_columns = set(unambiguous_columns)
904
905        for table, columns in source_columns_pairs[1:]:
906            unique = set(columns)
907            ambiguous = all_columns.intersection(unique)
908            all_columns.update(columns)
909
910            for column in ambiguous:
911                unambiguous_columns.pop(column, None)
912            for column in unique.difference(ambiguous):
913                unambiguous_columns[column] = table
914
915        return unambiguous_columns
def qualify_columns( expression: sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema], expand_alias_refs: bool = True, expand_stars: bool = True, infer_schema: Optional[bool] = None) -> sqlglot.expressions.Expression:
20def qualify_columns(
21    expression: exp.Expression,
22    schema: t.Dict | Schema,
23    expand_alias_refs: bool = True,
24    expand_stars: bool = True,
25    infer_schema: t.Optional[bool] = None,
26) -> exp.Expression:
27    """
28    Rewrite sqlglot AST to have fully qualified columns.
29
30    Example:
31        >>> import sqlglot
32        >>> schema = {"tbl": {"col": "INT"}}
33        >>> expression = sqlglot.parse_one("SELECT col FROM tbl")
34        >>> qualify_columns(expression, schema).sql()
35        'SELECT tbl.col AS col FROM tbl'
36
37    Args:
38        expression: Expression to qualify.
39        schema: Database schema.
40        expand_alias_refs: Whether to expand references to aliases.
41        expand_stars: Whether to expand star queries. This is a necessary step
42            for most of the optimizer's rules to work; do not set to False unless you
43            know what you're doing!
44        infer_schema: Whether to infer the schema if missing.
45
46    Returns:
47        The qualified expression.
48
49    Notes:
50        - Currently only handles a single PIVOT or UNPIVOT operator
51    """
52    schema = ensure_schema(schema)
53    annotator = TypeAnnotator(schema)
54    infer_schema = schema.empty if infer_schema is None else infer_schema
55    dialect = Dialect.get_or_raise(schema.dialect)
56    pseudocolumns = dialect.PSEUDOCOLUMNS
57
58    for scope in traverse_scope(expression):
59        resolver = Resolver(scope, schema, infer_schema=infer_schema)
60        _pop_table_column_aliases(scope.ctes)
61        _pop_table_column_aliases(scope.derived_tables)
62        using_column_tables = _expand_using(scope, resolver)
63
64        if (schema.empty or dialect.FORCE_EARLY_ALIAS_REF_EXPANSION) and expand_alias_refs:
65            _expand_alias_refs(
66                scope,
67                resolver,
68                expand_only_groupby=dialect.EXPAND_ALIAS_REFS_EARLY_ONLY_IN_GROUP_BY,
69            )
70
71        _convert_columns_to_dots(scope, resolver)
72        _qualify_columns(scope, resolver)
73
74        if not schema.empty and expand_alias_refs:
75            _expand_alias_refs(scope, resolver)
76
77        if not isinstance(scope.expression, exp.UDTF):
78            if expand_stars:
79                _expand_stars(
80                    scope,
81                    resolver,
82                    using_column_tables,
83                    pseudocolumns,
84                    annotator,
85                )
86            qualify_outputs(scope)
87
88        _expand_group_by(scope, dialect)
89        _expand_order_by(scope, resolver)
90
91        if dialect == "bigquery":
92            annotator.annotate_scope(scope)
93
94    return expression

Rewrite sqlglot AST to have fully qualified columns.

Example:
>>> import sqlglot
>>> schema = {"tbl": {"col": "INT"}}
>>> expression = sqlglot.parse_one("SELECT col FROM tbl")
>>> qualify_columns(expression, schema).sql()
'SELECT tbl.col AS col FROM tbl'
Arguments:
  • expression: Expression to qualify.
  • schema: Database schema.
  • expand_alias_refs: Whether to expand references to aliases.
  • expand_stars: Whether to expand star queries. This is a necessary step for most of the optimizer's rules to work; do not set to False unless you know what you're doing!
  • infer_schema: Whether to infer the schema if missing.
Returns:

The qualified expression.

Notes:
  • Currently only handles a single PIVOT or UNPIVOT operator
def validate_qualify_columns(expression: ~E) -> ~E:
 97def validate_qualify_columns(expression: E) -> E:
 98    """Raise an `OptimizeError` if any columns aren't qualified"""
 99    all_unqualified_columns = []
100    for scope in traverse_scope(expression):
101        if isinstance(scope.expression, exp.Select):
102            unqualified_columns = scope.unqualified_columns
103
104            if scope.external_columns and not scope.is_correlated_subquery and not scope.pivots:
105                column = scope.external_columns[0]
106                for_table = f" for table: '{column.table}'" if column.table else ""
107                raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
108
109            if unqualified_columns and scope.pivots and scope.pivots[0].unpivot:
110                # New columns produced by the UNPIVOT can't be qualified, but there may be columns
111                # under the UNPIVOT's IN clause that can and should be qualified. We recompute
112                # this list here to ensure those in the former category will be excluded.
113                unpivot_columns = set(_unpivot_columns(scope.pivots[0]))
114                unqualified_columns = [c for c in unqualified_columns if c not in unpivot_columns]
115
116            all_unqualified_columns.extend(unqualified_columns)
117
118    if all_unqualified_columns:
119        raise OptimizeError(f"Ambiguous columns: {all_unqualified_columns}")
120
121    return expression

Raise an OptimizeError if any columns aren't qualified

def qualify_outputs( scope_or_expression: sqlglot.optimizer.scope.Scope | sqlglot.expressions.Expression) -> None:
686def qualify_outputs(scope_or_expression: Scope | exp.Expression) -> None:
687    """Ensure all output columns are aliased"""
688    if isinstance(scope_or_expression, exp.Expression):
689        scope = build_scope(scope_or_expression)
690        if not isinstance(scope, Scope):
691            return
692    else:
693        scope = scope_or_expression
694
695    new_selections = []
696    for i, (selection, aliased_column) in enumerate(
697        itertools.zip_longest(scope.expression.selects, scope.outer_columns)
698    ):
699        if selection is None:
700            break
701
702        if isinstance(selection, exp.Subquery):
703            if not selection.output_name:
704                selection.set("alias", exp.TableAlias(this=exp.to_identifier(f"_col_{i}")))
705        elif not isinstance(selection, exp.Alias) and not selection.is_star:
706            selection = alias(
707                selection,
708                alias=selection.output_name or f"_col_{i}",
709                copy=False,
710            )
711        if aliased_column:
712            selection.set("alias", exp.to_identifier(aliased_column))
713
714        new_selections.append(selection)
715
716    if isinstance(scope.expression, exp.Select):
717        scope.expression.set("expressions", new_selections)

Ensure all output columns are aliased

def quote_identifiers( expression: ~E, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, identify: bool = True) -> ~E:
720def quote_identifiers(expression: E, dialect: DialectType = None, identify: bool = True) -> E:
721    """Makes sure all identifiers that need to be quoted are quoted."""
722    return expression.transform(
723        Dialect.get_or_raise(dialect).quote_identifier, identify=identify, copy=False
724    )  # type: ignore

Makes sure all identifiers that need to be quoted are quoted.

def pushdown_cte_alias_columns( expression: sqlglot.expressions.Expression) -> sqlglot.expressions.Expression:
727def pushdown_cte_alias_columns(expression: exp.Expression) -> exp.Expression:
728    """
729    Pushes down the CTE alias columns into the projection,
730
731    This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.
732
733    Example:
734        >>> import sqlglot
735        >>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
736        >>> pushdown_cte_alias_columns(expression).sql()
737        'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'
738
739    Args:
740        expression: Expression to pushdown.
741
742    Returns:
743        The expression with the CTE aliases pushed down into the projection.
744    """
745    for cte in expression.find_all(exp.CTE):
746        if cte.alias_column_names:
747            new_expressions = []
748            for _alias, projection in zip(cte.alias_column_names, cte.this.expressions):
749                if isinstance(projection, exp.Alias):
750                    projection.set("alias", _alias)
751                else:
752                    projection = alias(projection, alias=_alias)
753                new_expressions.append(projection)
754            cte.this.set("expressions", new_expressions)
755
756    return expression

Pushes down the CTE alias columns into the projection.

This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.

Example:
>>> import sqlglot
>>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
>>> pushdown_cte_alias_columns(expression).sql()
'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'
Arguments:
  • expression: Expression to pushdown.
Returns:

The expression with the CTE aliases pushed down into the projection.

class Resolver:
759class Resolver:
760    """
761    Helper for resolving columns.
762
763    This is a class so we can lazily load some things and easily share them across functions.
764    """
765
766    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
767        self.scope = scope
768        self.schema = schema
769        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
770        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
771        self._all_columns: t.Optional[t.Set[str]] = None
772        self._infer_schema = infer_schema
773        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
774
775    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
776        """
777        Get the table for a column name.
778
779        Args:
780            column_name: The column name to find the table for.
781        Returns:
782            The table name if it can be found/inferred.
783        """
784        if self._unambiguous_columns is None:
785            self._unambiguous_columns = self._get_unambiguous_columns(
786                self._get_all_source_columns()
787            )
788
789        table_name = self._unambiguous_columns.get(column_name)
790
791        if not table_name and self._infer_schema:
792            sources_without_schema = tuple(
793                source
794                for source, columns in self._get_all_source_columns().items()
795                if not columns or "*" in columns
796            )
797            if len(sources_without_schema) == 1:
798                table_name = sources_without_schema[0]
799
800        if table_name not in self.scope.selected_sources:
801            return exp.to_identifier(table_name)
802
803        node, _ = self.scope.selected_sources.get(table_name)
804
805        if isinstance(node, exp.Query):
806            while node and node.alias != table_name:
807                node = node.parent
808
809        node_alias = node.args.get("alias")
810        if node_alias:
811            return exp.to_identifier(node_alias.this)
812
813        return exp.to_identifier(table_name)
814
815    @property
816    def all_columns(self) -> t.Set[str]:
817        """All available columns of all sources in this scope"""
818        if self._all_columns is None:
819            self._all_columns = {
820                column for columns in self._get_all_source_columns().values() for column in columns
821            }
822        return self._all_columns
823
824    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
825        """Resolve the source columns for a given source `name`."""
826        cache_key = (name, only_visible)
827        if cache_key not in self._get_source_columns_cache:
828            if name not in self.scope.sources:
829                raise OptimizeError(f"Unknown table: {name}")
830
831            source = self.scope.sources[name]
832
833            if isinstance(source, exp.Table):
834                columns = self.schema.column_names(source, only_visible)
835            elif isinstance(source, Scope) and isinstance(
836                source.expression, (exp.Values, exp.Unnest)
837            ):
838                columns = source.expression.named_selects
839
840                # in bigquery, unnest structs are automatically scoped as tables, so you can
841                # directly select a struct field in a query.
842                # this handles the case where the unnest is statically defined.
843                if self.schema.dialect == "bigquery":
844                    if source.expression.is_type(exp.DataType.Type.STRUCT):
845                        for k in source.expression.type.expressions:  # type: ignore
846                            columns.append(k.name)
847            else:
848                columns = source.expression.named_selects
849
850            node, _ = self.scope.selected_sources.get(name) or (None, None)
851            if isinstance(node, Scope):
852                column_aliases = node.expression.alias_column_names
853            elif isinstance(node, exp.Expression):
854                column_aliases = node.alias_column_names
855            else:
856                column_aliases = []
857
858            if column_aliases:
859                # If the source's columns are aliased, their aliases shadow the corresponding column names.
860                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
861                columns = [
862                    alias or name
863                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
864                ]
865
866            self._get_source_columns_cache[cache_key] = columns
867
868        return self._get_source_columns_cache[cache_key]
869
870    def _get_all_source_columns(self) -> t.Dict[str, t.Sequence[str]]:
871        if self._source_columns is None:
872            self._source_columns = {
873                source_name: self.get_source_columns(source_name)
874                for source_name, source in itertools.chain(
875                    self.scope.selected_sources.items(), self.scope.lateral_sources.items()
876                )
877            }
878        return self._source_columns
879
880    def _get_unambiguous_columns(
881        self, source_columns: t.Dict[str, t.Sequence[str]]
882    ) -> t.Mapping[str, str]:
883        """
884        Find all the unambiguous columns in sources.
885
886        Args:
887            source_columns: Mapping of names to source columns.
888
889        Returns:
890            Mapping of column name to source name.
891        """
892        if not source_columns:
893            return {}
894
895        source_columns_pairs = list(source_columns.items())
896
897        first_table, first_columns = source_columns_pairs[0]
898
899        if len(source_columns_pairs) == 1:
900            # Performance optimization - avoid copying first_columns if there is only one table.
901            return SingleValuedMapping(first_columns, first_table)
902
903        unambiguous_columns = {col: first_table for col in first_columns}
904        all_columns = set(unambiguous_columns)
905
906        for table, columns in source_columns_pairs[1:]:
907            unique = set(columns)
908            ambiguous = all_columns.intersection(unique)
909            all_columns.update(columns)
910
911            for column in ambiguous:
912                unambiguous_columns.pop(column, None)
913            for column in unique.difference(ambiguous):
914                unambiguous_columns[column] = table
915
916        return unambiguous_columns

Helper for resolving columns.

This is a class so we can lazily load some things and easily share them across functions.

Resolver( scope: sqlglot.optimizer.scope.Scope, schema: sqlglot.schema.Schema, infer_schema: bool = True)
766    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
767        self.scope = scope
768        self.schema = schema
769        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
770        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
771        self._all_columns: t.Optional[t.Set[str]] = None
772        self._infer_schema = infer_schema
773        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
scope
schema
def get_table(self, column_name: str) -> Optional[sqlglot.expressions.Identifier]:
775    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
776        """
777        Get the table for a column name.
778
779        Args:
780            column_name: The column name to find the table for.
781        Returns:
782            The table name if it can be found/inferred.
783        """
784        if self._unambiguous_columns is None:
785            self._unambiguous_columns = self._get_unambiguous_columns(
786                self._get_all_source_columns()
787            )
788
789        table_name = self._unambiguous_columns.get(column_name)
790
791        if not table_name and self._infer_schema:
792            sources_without_schema = tuple(
793                source
794                for source, columns in self._get_all_source_columns().items()
795                if not columns or "*" in columns
796            )
797            if len(sources_without_schema) == 1:
798                table_name = sources_without_schema[0]
799
800        if table_name not in self.scope.selected_sources:
801            return exp.to_identifier(table_name)
802
803        node, _ = self.scope.selected_sources.get(table_name)
804
805        if isinstance(node, exp.Query):
806            while node and node.alias != table_name:
807                node = node.parent
808
809        node_alias = node.args.get("alias")
810        if node_alias:
811            return exp.to_identifier(node_alias.this)
812
813        return exp.to_identifier(table_name)

Get the table for a column name.

Arguments:
  • column_name: The column name to find the table for.
Returns:

The table name if it can be found/inferred.

all_columns: Set[str]
815    @property
816    def all_columns(self) -> t.Set[str]:
817        """All available columns of all sources in this scope"""
818        if self._all_columns is None:
819            self._all_columns = {
820                column for columns in self._get_all_source_columns().values() for column in columns
821            }
822        return self._all_columns

All available columns of all sources in this scope

def get_source_columns(self, name: str, only_visible: bool = False) -> Sequence[str]:
824    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
825        """Resolve the source columns for a given source `name`."""
826        cache_key = (name, only_visible)
827        if cache_key not in self._get_source_columns_cache:
828            if name not in self.scope.sources:
829                raise OptimizeError(f"Unknown table: {name}")
830
831            source = self.scope.sources[name]
832
833            if isinstance(source, exp.Table):
834                columns = self.schema.column_names(source, only_visible)
835            elif isinstance(source, Scope) and isinstance(
836                source.expression, (exp.Values, exp.Unnest)
837            ):
838                columns = source.expression.named_selects
839
840                # in bigquery, unnest structs are automatically scoped as tables, so you can
841                # directly select a struct field in a query.
842                # this handles the case where the unnest is statically defined.
843                if self.schema.dialect == "bigquery":
844                    if source.expression.is_type(exp.DataType.Type.STRUCT):
845                        for k in source.expression.type.expressions:  # type: ignore
846                            columns.append(k.name)
847            else:
848                columns = source.expression.named_selects
849
850            node, _ = self.scope.selected_sources.get(name) or (None, None)
851            if isinstance(node, Scope):
852                column_aliases = node.expression.alias_column_names
853            elif isinstance(node, exp.Expression):
854                column_aliases = node.alias_column_names
855            else:
856                column_aliases = []
857
858            if column_aliases:
859                # If the source's columns are aliased, their aliases shadow the corresponding column names.
860                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
861                columns = [
862                    alias or name
863                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
864                ]
865
866            self._get_source_columns_cache[cache_key] = columns
867
868        return self._get_source_columns_cache[cache_key]

Resolve the source columns for a given source name.