
sqlglot.helper

from __future__ import annotations

import datetime
import inspect
import logging
import re
import sys
import typing as t
from collections.abc import Collection, Set
from copy import copy
from difflib import get_close_matches
from enum import Enum
from itertools import count

if t.TYPE_CHECKING:
    from sqlglot import exp
    from sqlglot._typing import A, E, T
    from sqlglot.dialects.dialect import DialectType
    from sqlglot.expressions import Expression


CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])")
PYTHON_VERSION = sys.version_info[:2]
logger = logging.getLogger("sqlglot")


class AutoName(Enum):
    """
    This is used for creating Enum classes where `auto()` is the string form
    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
    """

    def _generate_next_value_(name, _start, _count, _last_values):
        return name

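# Illustrative usage (not part of the module source; `TokenKind` is a made-up
# enum), mirroring the doctest style used elsewhere in this module:
#
#     >>> from enum import auto
#     >>> class TokenKind(AutoName):
#     ...     SELECT = auto()
#     ...
#     >>> TokenKind.SELECT.value
#     'SELECT'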

class classproperty(property):
    """
    Similar to a normal property but works for class methods
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        return classmethod(self.fget).__get__(None, owner)()  # type: ignore

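# Illustrative usage (not part of the module source; `Settings` is a made-up class):
#
#     >>> class Settings:
#     ...     @classproperty
#     ...     def label(cls):
#     ...         return cls.__name__.lower()
#     ...
#     >>> Settings.label
#     'settings'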

def suggest_closest_match_and_fail(
    kind: str,
    word: str,
    possibilities: t.Iterable[str],
) -> None:
    close_matches = get_close_matches(word, possibilities, n=1)

    similar = seq_get(close_matches, 0) or ""
    if similar:
        similar = f" Did you mean {similar}?"

    raise ValueError(f"Unknown {kind} '{word}'.{similar}")

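# Illustrative usage (not part of the module source): this helper always raises,
# optionally suggesting the closest known spelling.
#
#     >>> suggest_closest_match_and_fail("dialect", "postgress", ["postgres", "mysql"])
#     Traceback (most recent call last):
#       ...
#     ValueError: Unknown dialect 'postgress'. Did you mean postgres?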

def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
    try:
        return seq[index]
    except IndexError:
        return None

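# Illustrative usage (not part of the module source):
#
#     >>> seq_get([1, 2, 3], 1)
#     2
#     >>> seq_get([1, 2, 3], 10) is None
#     True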

@t.overload
def ensure_list(value: t.Collection[T]) -> t.List[T]: ...


@t.overload
def ensure_list(value: None) -> t.List: ...


@t.overload
def ensure_list(value: T) -> t.List[T]: ...


def ensure_list(value):
    """
    Ensures that a value is a list, otherwise casts or wraps it into one.

    Args:
        value: The value of interest.

    Returns:
        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)

    return [value]

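# Illustrative usage (not part of the module source):
#
#     >>> ensure_list(None)
#     []
#     >>> ensure_list((1, 2))
#     [1, 2]
#     >>> ensure_list("ab")
#     ['ab']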

@t.overload
def ensure_collection(value: t.Collection[T]) -> t.Collection[T]: ...


@t.overload
def ensure_collection(value: T) -> t.Collection[T]: ...


def ensure_collection(value):
    """
    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.

    Args:
        value: The value of interest.

    Returns:
        The value if it's a collection, or else the value wrapped in a list.
    """
    if value is None:
        return []
    return (
        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
    )

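# Illustrative usage (not part of the module source): collections such as sets
# pass through unchanged, while scalars and strings are wrapped in a list.
#
#     >>> ensure_collection({1, 2})
#     {1, 2}
#     >>> ensure_collection("ab")
#     ['ab']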

def csv(*args: str, sep: str = ", ") -> str:
    """
    Formats any number of string arguments as CSV.

    Args:
        args: The string arguments to format.
        sep: The argument separator.

    Returns:
        The arguments formatted as a CSV string.
    """
    return sep.join(arg for arg in args if arg)

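# Illustrative usage (not part of the module source): empty arguments are
# skipped, so no dangling separators are produced.
#
#     >>> csv("a", "", "b")
#     'a, b'
#     >>> csv("a", "b", sep=".")
#     'a.b'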

def subclasses(
    module_name: str,
    classes: t.Type | t.Tuple[t.Type, ...],
    exclude: t.Set[t.Type] = set(),
) -> t.List[t.Type]:
    """
    Returns all subclasses for a collection of classes, possibly excluding some of them.

    Args:
        module_name: The name of the module to search for subclasses in.
        classes: Class(es) we want to find the subclasses of.
        exclude: Classes we want to exclude from the returned list.

    Returns:
        The target subclasses.
    """
    return [
        obj
        for _, obj in inspect.getmembers(
            sys.modules[module_name],
            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
        )
    ]

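# Illustrative usage sketch (not part of the module source). Assuming sqlglot's
# expressions module is imported as `exp`, this would collect every Expression
# subclass defined there, excluding the base class itself:
#
#     >>> from sqlglot import exp
#     >>> nodes = subclasses(exp.__name__, exp.Expression, exclude={exp.Expression})
#     >>> exp.Column in nodes
#     True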

def apply_index_offset(
    this: exp.Expression,
    expressions: t.List[E],
    offset: int,
    dialect: DialectType = None,
) -> t.List[E]:
    """
    Applies an offset to a given integer literal expression.

    Args:
        this: The target of the index.
        expressions: The expression the offset will be applied to, wrapped in a list.
        offset: The offset that will be applied.
        dialect: The dialect of interest.

    Returns:
        The original expression with the offset applied to it, wrapped in a list. If the provided
        `expressions` argument contains more than one expression, it's returned unaffected.
    """
    if not offset or len(expressions) != 1:
        return expressions

    expression = expressions[0]

    from sqlglot import exp
    from sqlglot.optimizer.annotate_types import annotate_types
    from sqlglot.optimizer.simplify import simplify

    if not this.type:
        annotate_types(this, dialect=dialect)

    if t.cast(exp.DataType, this.type).this not in (
        exp.DataType.Type.UNKNOWN,
        exp.DataType.Type.ARRAY,
    ):
        return expressions

    if not expression.type:
        annotate_types(expression, dialect=dialect)

    if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES:
        logger.info("Applying array index offset (%s)", offset)
        expression = simplify(expression + offset)
        return [expression]

    return expressions


def camel_to_snake_case(name: str) -> str:
    """Converts `name` from camelCase to upper-cased snake_case and returns the result."""
    return CAMEL_CASE_PATTERN.sub("_", name).upper()

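# Illustrative usage (not part of the module source): note that the result is
# upper-cased because the implementation calls `.upper()` on the snake_cased name.
#
#     >>> camel_to_snake_case("indexOffset")
#     'INDEX_OFFSET'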

def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
    """
    Applies a transformation to a given expression until a fix point is reached.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied.

    Returns:
        The transformed expression.
    """

    while True:
        start_hash = hash(expression)
        expression = func(expression)
        end_hash = hash(expression)

        if start_hash == end_hash:
            break

    return expression


def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    Args:
        dag: The graph to be sorted.

    Returns:
        A list that contains all of the graph's nodes in topological order.
    """
    result = []

    for node, deps in tuple(dag.items()):
        for dep in deps:
            if dep not in dag:
                dag[dep] = set()

    while dag:
        current = {node for node, deps in dag.items() if not deps}

        if not current:
            raise ValueError("Cycle error")

        for node in current:
            dag.pop(node)

        for deps in dag.values():
            deps -= current

        result.extend(sorted(current))  # type: ignore

    return result

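# Illustrative usage (not part of the module source): dependencies appear before
# the nodes that depend on them, and missing nodes are added implicitly.
#
#     >>> tsort({"b": {"a"}, "c": {"b"}})
#     ['a', 'b', 'c']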

def find_new_name(taken: t.Collection[str], base: str) -> str:
    """
    Searches for a new name.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        The new, available name.
    """
    if base not in taken:
        return base

    i = 2
    new = f"{base}_{i}"
    while new in taken:
        i += 1
        new = f"{base}_{i}"

    return new

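# Illustrative usage (not part of the module source):
#
#     >>> find_new_name({"a", "a_2"}, "a")
#     'a_3'
#     >>> find_new_name({"a"}, "b")
#     'b'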

def is_int(text: str) -> bool:
    return is_type(text, int)


def is_float(text: str) -> bool:
    return is_type(text, float)


def is_type(text: str, target_type: t.Type) -> bool:
    try:
        target_type(text)
        return True
    except ValueError:
        return False

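# Illustrative usage (not part of the module source): a string counts as the
# target type if that type's constructor accepts it.
#
#     >>> is_int("42"), is_int("4.2")
#     (True, False)
#     >>> is_float("4.2"), is_float("abc")
#     (True, False)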

def name_sequence(prefix: str) -> t.Callable[[], str]:
    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
    sequence = count()
    return lambda: f"{prefix}{next(sequence)}"

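# Illustrative usage (not part of the module source): each generator keeps its
# own counter, starting at 0.
#
#     >>> next_name = name_sequence("_t")
#     >>> next_name(), next_name(), next_name()
#     ('_t0', '_t1', '_t2')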

def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
    """Returns a dictionary created from an object's attributes."""
    return {
        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
        **kwargs,
    }


def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> t.List[t.Optional[str]]:
    """
    Perform a split on a value and return N words as a result with `None` used for words that don't exist.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Indicates whether `None` values should be inserted at the start or end of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)
    if fill_from_start:
        return [None] * (min_num_words - len(words)) + words
    return words + [None] * (min_num_words - len(words))


def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str` and `bytes`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        A `bool` value indicating if it is an iterable.
    """
    from sqlglot import Expression

    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, Expression))


def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
    """
    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
    type `str` and `bytes` are not regarded as iterables.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    for value in values:
        if is_iterable(value):
            yield from flatten(value)
        else:
            yield value


def dict_depth(d: t.Dict) -> int:
    """
    Get the nesting depth of a dictionary.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    try:
        return 1 + dict_depth(next(iter(d.values())))
    except AttributeError:
        # d doesn't have attribute "values"
        return 0
    except StopIteration:
        # d.values() returns an empty sequence
        return 1


def first(it: t.Iterable[T]) -> T:
    """Returns the first element from an iterable (useful for sets)."""
    return next(i for i in it)


def to_bool(value: t.Optional[str | bool]) -> t.Optional[str | bool]:
    if isinstance(value, bool) or value is None:
        return value

    # Coerce the value to boolean if it matches the truthy/falsy values below
    value_lower = value.lower()
    if value_lower in ("true", "1"):
        return True
    if value_lower in ("false", "0"):
        return False

    return value

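# Illustrative usage (not part of the module source): strings that don't match a
# known truthy/falsy spelling are returned unchanged.
#
#     >>> to_bool("TRUE"), to_bool("0"), to_bool("yes")
#     (True, False, 'yes')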

def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
    """
    Merges a sequence of ranges, represented as tuples (low, high) whose values
    belong to some totally-ordered set.

    Example:
        >>> merge_ranges([(1, 3), (2, 6)])
        [(1, 6)]
    """
    if not ranges:
        return []

    ranges = sorted(ranges)

    merged = [ranges[0]]

    for start, end in ranges[1:]:
        last_start, last_end = merged[-1]

        if start <= last_end:
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))

    return merged


def is_iso_date(text: str) -> bool:
    try:
        datetime.date.fromisoformat(text)
        return True
    except ValueError:
        return False


def is_iso_datetime(text: str) -> bool:
    try:
        datetime.datetime.fromisoformat(text)
        return True
    except ValueError:
        return False

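# Illustrative usage (not part of the module source): both helpers delegate to
# the standard library's `fromisoformat` parsers.
#
#     >>> is_iso_date("2024-01-31"), is_iso_date("31/01/2024")
#     (True, False)
#     >>> is_iso_datetime("2024-01-31 12:30:00")
#     True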

# Interval units that operate on date components
DATE_UNITS = {"day", "week", "month", "quarter", "year", "year_month"}


def is_date_unit(expression: t.Optional[exp.Expression]) -> bool:
    return expression is not None and expression.name.lower() in DATE_UNITS

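# Illustrative usage sketch (not part of the module source). Assuming `exp.var`
# builds a Var node whose `name` is the given text:
#
#     >>> from sqlglot import exp
#     >>> is_date_unit(exp.var("WEEK")), is_date_unit(exp.var("hour")), is_date_unit(None)
#     (True, False, False)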

K = t.TypeVar("K")
V = t.TypeVar("V")


class SingleValuedMapping(t.Mapping[K, V]):
    """
    Mapping where all keys return the same value.

    This rigamarole is meant to avoid copying keys, which was originally intended
    as an optimization while qualifying columns for tables with lots of columns.
    """

    def __init__(self, keys: t.Collection[K], value: V):
        self._keys = keys if isinstance(keys, Set) else set(keys)
        self._value = value

    def __getitem__(self, key: K) -> V:
        if key in self._keys:
            return self._value
        raise KeyError(key)

    def __len__(self) -> int:
        return len(self._keys)

    def __iter__(self) -> t.Iterator[K]:
        return iter(self._keys)
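# Illustrative usage (not part of the module source): every key maps to the same
# shared value, without materializing a per-key copy.
#
#     >>> columns = SingleValuedMapping(["id", "name"], "t1")
#     >>> columns["id"], columns["name"], len(columns)
#     ('t1', 't1', 2)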