Edit on GitHub

sqlglot.helper

  1from __future__ import annotations
  2
  3import datetime
  4import inspect
  5import logging
  6import re
  7import sys
  8import typing as t
  9from collections.abc import Collection, Set, Iterable, Sequence, Iterator, Mapping
 10from copy import copy
 11from difflib import get_close_matches
 12from enum import Enum
 13from itertools import count
 14from builtins import type as Type
 15
try:
    # Prefer the real helpers when the optional `mypy_extensions` package is installed.
    from mypy_extensions import mypyc_attr, trait, i64
except ImportError:
    # Fallbacks: define the same three names so the rest of the module can use them
    # unconditionally when `mypy_extensions` is not available.

    def mypyc_attr(*attrs: str, **kwattrs: object) -> t.Callable[[t.Any], t.Any]:  # type: ignore[misc]
        # No-op decorator factory: ignores its arguments and returns the decorated object as-is.
        return lambda f: f

    def trait(f: t.Any) -> t.Any:  # type: ignore[misc]
        # No-op decorator: returns its argument unchanged.
        return f

    # Plain `int` stands in for `i64` in the fallback path.
    i64 = int  # type: ignore[misc,assignment]
 27
 28
# Generic type variables used in the annotations of the helpers below.
T = t.TypeVar("T")
E = t.TypeVar("E")

if t.TYPE_CHECKING:
    # Only imported for type checking; `is_iterable` imports `Expr` locally at call time.
    from sqlglot.expressions import Expr


# Zero-width match in front of every uppercase ASCII letter, except one at the very start.
CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])")
# (major, minor) version of the running interpreter.
PYTHON_VERSION = sys.version_info[:2]
# Logger named "sqlglot".
logger = logging.getLogger("sqlglot")
 39
 40
 41class AutoName(Enum):
 42    """
 43    This is used for creating Enum classes where `auto()` is the string form
 44    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
 45
 46    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
 47    """
 48
 49    def _generate_next_value_(name, _start, _count, _last_values):
 50        return name
 51
 52
 53def suggest_closest_match_and_fail(
 54    kind: str,
 55    word: str,
 56    possibilities: Iterable[str],
 57) -> None:
 58    close_matches = get_close_matches(word, possibilities, n=1)
 59
 60    similar = seq_get(close_matches, 0) or ""
 61    if similar:
 62        similar = f" Did you mean {similar}?"
 63
 64    raise ValueError(f"Unknown {kind} '{word}'.{similar}")
 65
 66
 67def seq_get(seq: Sequence[T], index: int) -> T | None:
 68    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
 69    try:
 70        return seq[index]
 71    except IndexError:
 72        return None
 73
 74
 75def ensure_list(value: T | list[T] | tuple[T, ...] | None) -> list[T]:
 76    """
 77    Ensures that a value is a list, otherwise casts or wraps it into one.
 78
 79    Args:
 80        value: The value of interest.
 81
 82    Returns:
 83        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
 84    """
 85    if value is None:
 86        return []
 87    if isinstance(value, list):
 88        return value
 89    if isinstance(value, tuple):
 90        return list(value)
 91
 92    return [value]
 93
 94
 95@t.overload
 96def ensure_collection(value: Collection[T]) -> Collection[T]: ...
 97
 98
 99@t.overload
100def ensure_collection(value: T) -> Collection[T]: ...
101
102
103def ensure_collection(value):
104    """
105    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.
106
107    Args:
108        value: The value of interest.
109
110    Returns:
111        The value if it's a collection, or else the value wrapped in a list.
112    """
113    if value is None:
114        return []
115    return (
116        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
117    )
118
119
def csv(*args: str, sep: str = ", ") -> str:
    """
    Joins the given strings with a separator, skipping the empty ones.

    Args:
        args: The string arguments to format.
        sep: The argument separator.

    Returns:
        The truthy arguments joined by `sep`.
    """
    # `filter(None, ...)` drops every falsy argument (e.g. empty strings).
    return sep.join(filter(None, args))
132
133
def subclasses(
    module_name: str,
    classes: Type[T] | tuple[Type[T], ...],
    exclude: set[Type[T]] = set(),
) -> list[Type[T]]:
    """
    Collects the classes of a loaded module that derive from the given class(es).

    Args:
        module_name: The name of the module to search for subclasses in; it is looked up
            in `sys.modules`, so it must already be imported.
        classes: Class(es) we want to find the subclasses of.
        exclude: Classes we want to exclude from the returned list. The default set is
            only read here, never modified.

    Returns:
        The target subclasses, in the order `inspect.getmembers` yields them.
    """

    def is_target(member: t.Any) -> bool:
        # Non-class members are rejected first, so `issubclass` only ever sees classes.
        return inspect.isclass(member) and issubclass(member, classes) and member not in exclude

    module = sys.modules[module_name]
    return [member for _, member in inspect.getmembers(module, is_target)]
157
158
def camel_to_snake_case(name: str) -> str:
    """
    Converts `name` from camelCase to upper-case snake case and returns the result.

    An underscore is inserted wherever `CAMEL_CASE_PATTERN` matches, and the whole
    string is then upper-cased.
    """
    underscored = CAMEL_CASE_PATTERN.sub("_", name)
    return underscored.upper()
162
163
def while_changing(expression: E, func: t.Callable[[E], E]) -> E:
    """
    Repeatedly applies `func` to an expression until its hash stops changing.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied.

    Returns:
        The first result of `func` whose hash equals the hash of the value it was computed from.
    """
    while True:
        # The hash is taken before calling `func`, since `func` may modify its argument in place.
        previous_hash = hash(expression)
        expression = func(expression)

        if hash(expression) == previous_hash:
            return expression
185
186
def tsort(dag: dict[T, set[T]]) -> list[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    The graph maps each node to the set of nodes it depends on. Dependencies that
    don't appear as keys are treated as nodes without dependencies of their own.
    The input mapping and its dependency sets are left untouched.

    Args:
        dag: The graph to be sorted.

    Returns:
        A list that contains all of the graph's nodes in topological order, i.e. every
        node comes after all of its dependencies. Nodes that become available in the
        same round are emitted in sorted order.

    Raises:
        ValueError: If the graph contains a cycle.
    """
    # Work on a private copy: the algorithm below consumes the graph it operates on,
    # which would otherwise empty the caller's dictionary and strip its dependency sets.
    graph = {node: set(deps) for node, deps in dag.items()}

    # Register dependencies that were only mentioned as values.
    for deps in tuple(graph.values()):
        for dep in deps:
            graph.setdefault(dep, set())

    result: list[T] = []

    while graph:
        # Nodes whose dependencies have all been emitted already.
        ready = {node for node, deps in graph.items() if not deps}

        if not ready:
            raise ValueError("Cycle error")

        for node in ready:
            del graph[node]

        for deps in graph.values():
            deps -= ready

        result.extend(sorted(ready))  # type: ignore

    return result
219
220
def find_new_name(taken: Collection[str], base: str) -> str:
    """
    Finds a name derived from `base` that isn't taken yet.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        `base` itself if it is available, otherwise the first available name of the
        form `{base}_{i}` with `i` counting up from 2.
    """
    if base not in taken:
        return base

    suffix = 2
    while f"{base}_{suffix}" in taken:
        suffix += 1

    return f"{base}_{suffix}"
242
243
def is_int(text: str) -> bool:
    """Returns whether `int(text)` succeeds."""
    return is_type(text, int)


def is_float(text: str) -> bool:
    """Returns whether `float(text)` succeeds."""
    return is_type(text, float)


def is_type(text: str, target_type: Type) -> bool:
    """
    Checks whether `text` can be converted by calling `target_type` on it.

    Args:
        text: The text to convert.
        target_type: The callable type used for the conversion attempt.

    Returns:
        `False` if the conversion raises `ValueError`, `True` otherwise. Any other
        exception propagates to the caller.
    """
    try:
        target_type(text)
    except ValueError:
        return False

    return True
258
259
def name_sequence(prefix: str) -> t.Callable[[], str]:
    """
    Creates a generator of sequential names sharing a prefix.

    Args:
        prefix: The text every generated name starts with.

    Returns:
        A zero-argument callable; each call returns `prefix` followed by the next
        integer of a counter starting at 0 (e.g. a0, a1, a2, ... if the prefix is "a").
    """
    counter = count()

    def next_name() -> str:
        return f"{prefix}{next(counter)}"

    return next_name
264
265
def object_to_dict(obj: t.Any, **kwargs) -> dict:
    """
    Builds a dictionary out of an object's attributes.

    Args:
        obj: The object whose `vars()` are read.
        kwargs: Extra entries for the result; they take precedence over attributes
            with the same name.

    Returns:
        A new dictionary mapping attribute names to copies of their values.
    """
    attributes = {}

    for key, value in vars(obj).items():
        # Prefer the value's own `copy` method when there is one, else fall back to `copy.copy`.
        attributes[key] = value.copy() if hasattr(value, "copy") else copy(value)

    attributes.update(kwargs)
    return attributes
272
273
def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> list[str | None]:
    """
    Splits a value and pads the result with `None` up to a minimum number of words.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Whether the `None` padding goes at the start (`True`) or at
            the end (`False`) of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)

    # A non-positive count yields no padding, so extra words are never dropped.
    padding = [None] * (min_num_words - len(words))

    return padding + words if fill_from_start else words + padding
301
302
def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str`, `bytes` and `Expr`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        `True` if `value` has an `__iter__` attribute and is not a `str`, `bytes`
        or `Expr` instance, `False` otherwise.
    """
    from sqlglot.expressions import Expr

    if not hasattr(value, "__iter__"):
        return False

    return not isinstance(value, (str, bytes, Expr))
322
323
def flatten(values: Iterable[Iterable[t.Any] | t.Any]) -> Iterator[t.Any]:
    """
    Lazily flattens arbitrarily nested iterables into a single stream of elements.

    Whether an element gets expanded is decided by `is_iterable`, so objects of
    type `str` and `bytes` are not regarded as iterables.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    for item in values:
        if not is_iterable(item):
            yield item
            continue

        # Nested iterable: recurse and forward its leaves.
        yield from flatten(item)
346
347
def dict_depth(d: t.Any) -> int:
    """
    Get the nesting depth of a dictionary, following the first value at each level.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    try:
        first_value = next(iter(d.values()))
    except AttributeError:
        # d doesn't have attribute "values", i.e. it isn't dict-like
        return 0
    except StopIteration:
        # d.values() is empty, so this level is the innermost one
        return 1

    return 1 + dict_depth(first_value)
372
373
def first(it: Iterable[T]) -> T:
    """
    Returns the first element from an iterable (useful for sets).

    Raises:
        StopIteration: If `it` yields no elements.
    """
    return next(iter(it))
377
378
@t.overload
def to_bool(value: None) -> None: ...


@t.overload
def to_bool(value: bool) -> bool: ...


@t.overload
def to_bool(value: str) -> str | bool: ...


def to_bool(value: str | bool | None) -> str | bool | None:
    """
    Coerces well-known textual booleans into `bool`, leaving everything else untouched.

    Args:
        value: The value to coerce.

    Returns:
        `value` itself if it is a `bool` or `None`. For strings, `True` for "true"/"1"
        and `False` for "false"/"0" (compared case-insensitively); any other string
        is returned unchanged.
    """
    if isinstance(value, bool) or value is None:
        return value

    # Strings that don't match any of the known spellings fall through unchanged.
    known_spellings = {"true": True, "1": True, "false": False, "0": False}
    return known_spellings.get(value.lower(), value)
403
404
def merge_ranges(ranges: list[tuple[t.Any, t.Any]]) -> list[tuple[t.Any, t.Any]]:
    """
    Merges overlapping or touching ranges, represented as tuples (low, high) whose
    values belong to some totally-ordered set.

    The input list is not modified; the result is ordered by the ranges' lower bounds.

    Example:
        >>> merge_ranges([(1, 3), (2, 6)])
        [(1, 6)]
    """
    if not ranges:
        return []

    ordered = iter(sorted(ranges))

    # The smallest range seeds the result; every later one either extends the
    # most recent merged range or opens a new one.
    merged = [next(ordered)]

    for start, end in ordered:
        last_start, last_end = merged[-1]

        if start <= last_end:
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))

    return merged
430
431
def is_iso_date(text: str) -> bool:
    """Returns whether `datetime.date.fromisoformat` accepts `text`, i.e. doesn't raise `ValueError`."""
    try:
        datetime.date.fromisoformat(text)
    except ValueError:
        return False

    return True
438
439
def is_iso_datetime(text: str) -> bool:
    """Returns whether `datetime.datetime.fromisoformat` accepts `text`, i.e. doesn't raise `ValueError`."""
    try:
        datetime.datetime.fromisoformat(text)
    except ValueError:
        return False

    return True
446
447
# Interval units that operate on date components
DATE_UNITS = {"day", "week", "month", "quarter", "year", "year_month"}


def is_date_unit(expression: Expr | None) -> bool:
    """
    Checks whether an expression names a date unit.

    Args:
        expression: The expression to inspect, or `None`.

    Returns:
        `True` if `expression` is not `None` and its lower-cased `name` is in `DATE_UNITS`,
        `False` otherwise.
    """
    if expression is None:
        return False

    return expression.name.lower() in DATE_UNITS
454
455
K = t.TypeVar("K")
V = t.TypeVar("V")


class SingleValuedMapping(Mapping[K, V]):
    """
    Read-only mapping in which every key is associated with one and the same value.

    Only the set of keys and the single value are stored. This rigamarole is meant
    to avoid copying keys, which was originally intended as an optimization while
    qualifying columns for tables with lots of columns.
    """

    def __init__(self, keys: Collection[K], value: V):
        # A set passed in is kept by reference (no copy); any other collection is
        # turned into a set once.
        if not isinstance(keys, Set):
            keys = set(keys)

        self._keys = keys
        self._value = value

    def __getitem__(self, key: K) -> V:
        if key not in self._keys:
            raise KeyError(key)
        return self._value

    def __len__(self) -> int:
        return len(self._keys)

    def __iter__(self) -> Iterator[K]:
        return iter(self._keys)
CAMEL_CASE_PATTERN = re.compile('(?<!^)(?=[A-Z])')
PYTHON_VERSION = (3, 10)
logger = <Logger sqlglot (WARNING)>
class AutoName(enum.Enum):
42class AutoName(Enum):
43    """
44    This is used for creating Enum classes where `auto()` is the string form
45    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
46
47    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
48    """
49
50    def _generate_next_value_(name, _start, _count, _last_values):
51        return name

This is used for creating Enum classes where auto() is the string form of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values

def suggest_closest_match_and_fail(kind: str, word: str, possibilities: Iterable[str]) -> None:
54def suggest_closest_match_and_fail(
55    kind: str,
56    word: str,
57    possibilities: Iterable[str],
58) -> None:
59    close_matches = get_close_matches(word, possibilities, n=1)
60
61    similar = seq_get(close_matches, 0) or ""
62    if similar:
63        similar = f" Did you mean {similar}?"
64
65    raise ValueError(f"Unknown {kind} '{word}'.{similar}")
def seq_get(seq: Sequence[~T], index: int) -> Optional[~T]:
68def seq_get(seq: Sequence[T], index: int) -> T | None:
69    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
70    try:
71        return seq[index]
72    except IndexError:
73        return None

Returns the value in seq at position index, or None if index is out of bounds.

def ensure_list(value: Union[~T, list[~T], tuple[~T, ...], NoneType]) -> list[~T]:
76def ensure_list(value: T | list[T] | tuple[T, ...] | None) -> list[T]:
77    """
78    Ensures that a value is a list, otherwise casts or wraps it into one.
79
80    Args:
81        value: The value of interest.
82
83    Returns:
84        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
85    """
86    if value is None:
87        return []
88    if isinstance(value, list):
89        return value
90    if isinstance(value, tuple):
91        return list(value)
92
93    return [value]

Ensures that a value is a list, otherwise casts or wraps it into one.

Arguments:
  • value: The value of interest.
Returns:

The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.

def ensure_collection(value):
104def ensure_collection(value):
105    """
106    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.
107
108    Args:
109        value: The value of interest.
110
111    Returns:
112        The value if it's a collection, or else the value wrapped in a list.
113    """
114    if value is None:
115        return []
116    return (
117        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
118    )

Ensures that a value is a collection (excluding str and bytes), otherwise wraps it into a list.

Arguments:
  • value: The value of interest.
Returns:

The value if it's a collection, or else the value wrapped in a list.

def csv(*args: str, sep: str = ', ') -> str:
121def csv(*args: str, sep: str = ", ") -> str:
122    """
123    Formats any number of string arguments as CSV.
124
125    Args:
126        args: The string arguments to format.
127        sep: The argument separator.
128
129    Returns:
130        The arguments formatted as a CSV string.
131    """
132    return sep.join(arg for arg in args if arg)

Formats any number of string arguments as CSV.

Arguments:
  • args: The string arguments to format.
  • sep: The argument separator.
Returns:

The arguments formatted as a CSV string.

def subclasses( module_name: str, classes: type[~T] | tuple[type[~T], ...], exclude: set[type[~T]] = set()) -> list[type[~T]]:
135def subclasses(
136    module_name: str,
137    classes: Type[T] | tuple[Type[T], ...],
138    exclude: set[Type[T]] = set(),
139) -> list[Type[T]]:
140    """
141    Returns all subclasses for a collection of classes, possibly excluding some of them.
142
143    Args:
144        module_name: The name of the module to search for subclasses in.
145        classes: Class(es) we want to find the subclasses of.
146        exclude: Classes we want to exclude from the returned list.
147
148    Returns:
149        The target subclasses.
150    """
151    return [
152        obj
153        for _, obj in inspect.getmembers(
154            sys.modules[module_name],
155            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
156        )
157    ]

Returns all subclasses for a collection of classes, possibly excluding some of them.

Arguments:
  • module_name: The name of the module to search for subclasses in.
  • classes: Class(es) we want to find the subclasses of.
  • exclude: Classes we want to exclude from the returned list.
Returns:

The target subclasses.

def camel_to_snake_case(name: str) -> str:
160def camel_to_snake_case(name: str) -> str:
161    """Converts `name` from camelCase to snake_case and returns the result."""
162    return CAMEL_CASE_PATTERN.sub("_", name).upper()

Converts name from camelCase to upper-case snake case (e.g. "camelCase" becomes "CAMEL_CASE") and returns the result.

def while_changing(expression: ~E, func: Callable[[~E], ~E]) -> ~E:
165def while_changing(expression: E, func: t.Callable[[E], E]) -> E:
166    """
167    Applies a transformation to a given expression until a fix point is reached.
168
169    Args:
170        expression: The expression to be transformed.
171        func: The transformation to be applied.
172
173    Returns:
174        The transformed expression.
175    """
176
177    while True:
178        start_hash = hash(expression)
179        expression = func(expression)
180        end_hash = hash(expression)
181
182        if start_hash == end_hash:
183            break
184
185    return expression

Applies a transformation to a given expression until a fix point is reached.

Arguments:
  • expression: The expression to be transformed.
  • func: The transformation to be applied.
Returns:

The transformed expression.

def tsort(dag: dict[~T, set[~T]]) -> list[~T]:
188def tsort(dag: dict[T, set[T]]) -> list[T]:
189    """
190    Sorts a given directed acyclic graph in topological order.
191
192    Args:
193        dag: The graph to be sorted.
194
195    Returns:
196        A list that contains all of the graph's nodes in topological order.
197    """
198    result = []
199
200    for node, deps in tuple(dag.items()):
201        for dep in deps:
202            if dep not in dag:
203                dag[dep] = set()
204
205    while dag:
206        current = {node for node, deps in dag.items() if not deps}
207
208        if not current:
209            raise ValueError("Cycle error")
210
211        for node in current:
212            dag.pop(node)
213
214        for deps in dag.values():
215            deps -= current
216
217        result.extend(sorted(current))  # type: ignore
218
219    return result

Sorts a given directed acyclic graph in topological order.

Arguments:
  • dag: The graph to be sorted.
Returns:

A list that contains all of the graph's nodes in topological order.

def find_new_name(taken: Collection[str], base: str) -> str:
222def find_new_name(taken: Collection[str], base: str) -> str:
223    """
224    Searches for a new name.
225
226    Args:
227        taken: A collection of taken names.
228        base: Base name to alter.
229
230    Returns:
231        The new, available name.
232    """
233    if base not in taken:
234        return base
235
236    i = 2
237    new = f"{base}_{i}"
238    while new in taken:
239        i += 1
240        new = f"{base}_{i}"
241
242    return new

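Searches for a name that is not already taken: base itself if it is available, otherwise base followed by a numeric suffix (_2, _3, ...).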

Arguments:
  • taken: A collection of taken names.
  • base: Base name to alter.
Returns:

The new, available name.

def is_int(text: str) -> bool:
245def is_int(text: str) -> bool:
246    return is_type(text, int)
def is_float(text: str) -> bool:
249def is_float(text: str) -> bool:
250    return is_type(text, float)
def is_type(text: str, target_type: type) -> bool:
253def is_type(text: str, target_type: Type) -> bool:
254    try:
255        target_type(text)
256        return True
257    except ValueError:
258        return False
def name_sequence(prefix: str) -> Callable[[], str]:
261def name_sequence(prefix: str) -> t.Callable[[], str]:
262    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
263    sequence = count()
264    return lambda: f"{prefix}{next(sequence)}"

Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").

def object_to_dict(obj: Any, **kwargs) -> dict:
267def object_to_dict(obj: t.Any, **kwargs) -> dict:
268    """Returns a dictionary created from an object's attributes."""
269    return {
270        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
271        **kwargs,
272    }

Returns a dictionary created from an object's attributes.

def split_num_words( value: str, sep: str, min_num_words: int, fill_from_start: bool = True) -> list[str | None]:
275def split_num_words(
276    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
277) -> list[str | None]:
278    """
279    Perform a split on a value and return N words as a result with `None` used for words that don't exist.
280
281    Args:
282        value: The value to be split.
283        sep: The value to use to split on.
284        min_num_words: The minimum number of words that are going to be in the result.
285        fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list.
286
287    Examples:
288        >>> split_num_words("db.table", ".", 3)
289        [None, 'db', 'table']
290        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
291        ['db', 'table', None]
292        >>> split_num_words("db.table", ".", 1)
293        ['db', 'table']
294
295    Returns:
296        The list of words returned by `split`, possibly augmented by a number of `None` values.
297    """
298    words = value.split(sep)
299    if fill_from_start:
300        return [None] * (min_num_words - len(words)) + words
301    return words + [None] * (min_num_words - len(words))

Perform a split on a value and return N words as a result with None used for words that don't exist.

Arguments:
  • value: The value to be split.
  • sep: The value to use to split on.
  • min_num_words: The minimum number of words that are going to be in the result.
  • fill_from_start: Indicates whether None values should be inserted at the start (True) or at the end (False) of the list.
Examples:
>>> split_num_words("db.table", ".", 3)
[None, 'db', 'table']
>>> split_num_words("db.table", ".", 3, fill_from_start=False)
['db', 'table', None]
>>> split_num_words("db.table", ".", 1)
['db', 'table']
Returns:

The list of words returned by split, possibly augmented by a number of None values.

def is_iterable(value: Any) -> bool:
304def is_iterable(value: t.Any) -> bool:
305    """
306    Checks if the value is an iterable, excluding the types `str` and `bytes`.
307
308    Examples:
309        >>> is_iterable([1,2])
310        True
311        >>> is_iterable("test")
312        False
313
314    Args:
315        value: The value to check if it is an iterable.
316
317    Returns:
318        A `bool` value indicating if it is an iterable.
319    """
320    from sqlglot.expressions import Expr
321
322    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, Expr))

Checks if the value is an iterable, excluding the types str, bytes and Expr.

Examples:
>>> is_iterable([1,2])
True
>>> is_iterable("test")
False
Arguments:
  • value: The value to check if it is an iterable.
Returns:

A bool value indicating if it is an iterable.

def flatten( values: Iterable[typing.Union[Iterable[typing.Any], typing.Any]]) -> Iterator[typing.Any]:
325def flatten(values: Iterable[Iterable[t.Any] | t.Any]) -> Iterator[t.Any]:
326    """
327    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
328    type `str` and `bytes` are not regarded as iterables.
329
330    Examples:
331        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
332        [1, 2, 3, 4, 5, 'bla']
333        >>> list(flatten([1, 2, 3]))
334        [1, 2, 3]
335
336    Args:
337        values: The value to be flattened.
338
339    Yields:
340        Non-iterable elements in `values`.
341    """
342    for value in values:
343        if is_iterable(value):
344            yield from flatten(value)
345        else:
346            yield value

Flattens an iterable that can contain both iterable and non-iterable elements. Objects of type str and bytes are not regarded as iterables.

Examples:
>>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
[1, 2, 3, 4, 5, 'bla']
>>> list(flatten([1, 2, 3]))
[1, 2, 3]
Arguments:
  • values: The value to be flattened.
Yields:

Non-iterable elements in values.

def dict_depth(d: Any) -> int:
349def dict_depth(d: t.Any) -> int:
350    """
351    Get the nesting depth of a dictionary.
352
353    Example:
354        >>> dict_depth(None)
355        0
356        >>> dict_depth({})
357        1
358        >>> dict_depth({"a": "b"})
359        1
360        >>> dict_depth({"a": {}})
361        2
362        >>> dict_depth({"a": {"b": {}}})
363        3
364    """
365    try:
366        return 1 + dict_depth(next(iter(d.values())))
367    except AttributeError:
368        # d doesn't have attribute "values"
369        return 0
370    except StopIteration:
371        # d.values() returns an empty sequence
372        return 1

Get the nesting depth of a dictionary.

Example:
>>> dict_depth(None)
0
>>> dict_depth({})
1
>>> dict_depth({"a": "b"})
1
>>> dict_depth({"a": {}})
2
>>> dict_depth({"a": {"b": {}}})
3
def first(it: Iterable[~T]) -> ~T:
375def first(it: Iterable[T]) -> T:
376    """Returns the first element from an iterable (useful for sets)."""
377    return next(i for i in it)

Returns the first element from an iterable (useful for sets).

def to_bool(value: str | bool | None) -> str | bool | None:
392def to_bool(value: str | bool | None) -> str | bool | None:
393    if isinstance(value, bool) or value is None:
394        return value
395
396    # Coerce the value to boolean if it matches to the truthy/falsy values below
397    value_lower = value.lower()
398    if value_lower in ("true", "1"):
399        return True
400    if value_lower in ("false", "0"):
401        return False
402
403    return value
def merge_ranges( ranges: list[tuple[typing.Any, typing.Any]]) -> list[tuple[typing.Any, typing.Any]]:
406def merge_ranges(ranges: list[tuple[t.Any, t.Any]]) -> list[tuple[t.Any, t.Any]]:
407    """
408    Merges a sequence of ranges, represented as tuples (low, high) whose values
409    belong to some totally-ordered set.
410
411    Example:
412        >>> merge_ranges([(1, 3), (2, 6)])
413        [(1, 6)]
414    """
415    if not ranges:
416        return []
417
418    ranges = sorted(ranges)
419
420    merged = [ranges[0]]
421
422    for start, end in ranges[1:]:
423        last_start, last_end = merged[-1]
424
425        if start <= last_end:
426            merged[-1] = (last_start, max(last_end, end))
427        else:
428            merged.append((start, end))
429
430    return merged

Merges a sequence of ranges, represented as tuples (low, high) whose values belong to some totally-ordered set.

Example:
>>> merge_ranges([(1, 3), (2, 6)])
[(1, 6)]
def is_iso_date(text: str) -> bool:
433def is_iso_date(text: str) -> bool:
434    try:
435        datetime.date.fromisoformat(text)
436        return True
437    except ValueError:
438        return False
def is_iso_datetime(text: str) -> bool:
441def is_iso_datetime(text: str) -> bool:
442    try:
443        datetime.datetime.fromisoformat(text)
444        return True
445    except ValueError:
446        return False
DATE_UNITS = {'year', 'day', 'month', 'year_month', 'quarter', 'week'}
def is_date_unit(expression: sqlglot.expressions.core.Expr | None) -> bool:
453def is_date_unit(expression: Expr | None) -> bool:
454    return expression is not None and expression.name.lower() in DATE_UNITS
class SingleValuedMapping(collections.abc.Mapping[~K, ~V]):
461class SingleValuedMapping(Mapping[K, V]):
462    """
463    Mapping where all keys return the same value.
464
465    This rigamarole is meant to avoid copying keys, which was originally intended
466    as an optimization while qualifying columns for tables with lots of columns.
467    """
468
469    def __init__(self, keys: Collection[K], value: V):
470        self._keys = keys if isinstance(keys, Set) else set(keys)
471        self._value = value
472
473    def __getitem__(self, key: K) -> V:
474        if key in self._keys:
475            return self._value
476        raise KeyError(key)
477
478    def __len__(self) -> int:
479        return len(self._keys)
480
481    def __iter__(self) -> Iterator[K]:
482        return iter(self._keys)

Mapping where all keys return the same value.

This rigamarole is meant to avoid copying keys, which was originally intended as an optimization while qualifying columns for tables with lots of columns.

SingleValuedMapping(keys: Collection[~K], value: ~V)
469    def __init__(self, keys: Collection[K], value: V):
470        self._keys = keys if isinstance(keys, Set) else set(keys)
471        self._value = value