Edit on GitHub

sqlglot.helper

  1from __future__ import annotations
  2
  3import datetime
  4import inspect
  5import logging
  6import re
  7import sys
  8import typing as t
  9from collections.abc import Collection, Set
 10from copy import copy
 11from difflib import get_close_matches
 12from enum import Enum
 13from itertools import count
 14
try:
    # Use the real decorators when the optional `mypy_extensions` package is installed
    from mypy_extensions import mypyc_attr, trait
except ImportError:
    # `mypy_extensions` is unavailable: fall back to stand-ins that leave their targets unchanged

    def mypyc_attr(*attrs: str, **kwattrs: object) -> t.Callable[[t.Any], t.Any]:  # type: ignore[misc]
        """Fallback decorator factory: ignores its arguments and returns an identity decorator."""
        return lambda f: f

    def trait(f: t.Any) -> t.Any:  # type: ignore[misc]
        """Fallback decorator: returns `f` unchanged."""
        return f
 24
 25
# Generic type variables used in this module's annotations
T = t.TypeVar("T")
E = t.TypeVar("E")

if t.TYPE_CHECKING:
    # Only imported while type checking; not executed at runtime
    from sqlglot.expression_core import ExpressionCore


# Zero-width pattern: matches the position right before every uppercase ASCII letter,
# except when that letter is the very first character of the string
CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])")
# (major, minor) version of the running interpreter
PYTHON_VERSION = sys.version_info[:2]
# Logger registered under the name "sqlglot"
logger = logging.getLogger("sqlglot")
 36
 37
class AutoName(Enum):
    """
    This is used for creating Enum classes where `auto()` is the string form
    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
    """

    # Hook that `Enum` consults to produce the value for `auto()`; returning the
    # member's own name is what makes `FOO.value == "FOO"`. The remaining
    # arguments are part of the hook's signature and are ignored here.
    def _generate_next_value_(name, _start, _count, _last_values):
        return name
 48
 49
 50def suggest_closest_match_and_fail(
 51    kind: str,
 52    word: str,
 53    possibilities: t.Iterable[str],
 54) -> None:
 55    close_matches = get_close_matches(word, possibilities, n=1)
 56
 57    similar = seq_get(close_matches, 0) or ""
 58    if similar:
 59        similar = f" Did you mean {similar}?"
 60
 61    raise ValueError(f"Unknown {kind} '{word}'.{similar}")
 62
 63
 64def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
 65    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
 66    try:
 67        return seq[index]
 68    except IndexError:
 69        return None
 70
 71
 72@t.overload
 73def ensure_list(value: t.Collection[T]) -> t.List[T]: ...
 74
 75
 76@t.overload
 77def ensure_list(value: None) -> t.List: ...
 78
 79
 80@t.overload
 81def ensure_list(value: T) -> t.List[T]: ...
 82
 83
 84def ensure_list(value):
 85    """
 86    Ensures that a value is a list, otherwise casts or wraps it into one.
 87
 88    Args:
 89        value: The value of interest.
 90
 91    Returns:
 92        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
 93    """
 94    if value is None:
 95        return []
 96    if isinstance(value, (list, tuple)):
 97        return list(value)
 98
 99    return [value]
100
101
@t.overload
def ensure_collection(value: t.Collection[T]) -> t.Collection[T]: ...


@t.overload
def ensure_collection(value: T) -> t.Collection[T]: ...


def ensure_collection(value):
    """
    Normalizes a value into a collection, treating `str` and `bytes` as scalars.

    Args:
        value: The value of interest.

    Returns:
        An empty list if `value` is `None`, `value` itself if it's a collection other
        than `str` or `bytes`, or else a single-element list wrapping `value`.
    """
    if value is None:
        return []

    # Strings and bytes are collections too, but they are wrapped like any scalar
    if isinstance(value, (str, bytes)) or not isinstance(value, Collection):
        return [value]

    return value
125
126
def csv(*args: str, sep: str = ", ") -> str:
    """
    Joins the non-empty string arguments into a single separated string.

    Args:
        args: The string arguments to format; falsy ones (e.g. empty strings) are skipped.
        sep: The argument separator.

    Returns:
        The remaining arguments joined by `sep`.
    """
    non_empty = filter(None, args)
    return sep.join(non_empty)
139
140
def subclasses(
    module_name: str,
    classes: t.Type | t.Tuple[t.Type, ...],
    exclude: t.Set[t.Type] = set(),
) -> t.List[t.Type]:
    """
    Collects the classes of an already-imported module that derive from the given class(es).

    Args:
        module_name: The name of the module to search for subclasses in; it is looked up
            in `sys.modules`.
        classes: Class(es) we want to find the subclasses of.
        exclude: Classes we want to exclude from the returned list.

    Returns:
        The target subclasses, in the order `inspect.getmembers` yields them.
    """

    def is_target(obj: t.Any) -> bool:
        return inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude

    module = sys.modules[module_name]
    return [member for _, member in inspect.getmembers(module, is_target)]
164
165
def camel_to_snake_case(name: str) -> str:
    """
    Converts `name` from camelCase to upper-case snake case and returns the result.

    An underscore is inserted at every position matched by `CAMEL_CASE_PATTERN` and
    the whole string is then upper-cased (e.g. "camelCase" becomes "CAMEL_CASE").
    """
    return CAMEL_CASE_PATTERN.sub("_", name).upper()
169
170
def while_changing(expression: E, func: t.Callable[[E], E]) -> E:
    """
    Repeatedly applies `func` until the hash of the expression stops changing.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied; it is always called at least once.

    Returns:
        The transformed expression, i.e. the first result whose hash equals the hash
        of the input it was produced from.
    """
    previous_hash = hash(expression)

    while True:
        expression = func(expression)
        current_hash = hash(expression)

        # Fix point: the last application left the hash untouched
        if current_hash == previous_hash:
            return expression

        previous_hash = current_hash
192
193
def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    The graph maps each node to the set of nodes it depends on. Nodes that only appear
    as dependencies are treated as having no dependencies of their own. The input
    mapping and its dependency sets are left untouched.

    Args:
        dag: The graph to be sorted.

    Returns:
        A list that contains all of the graph's nodes in topological order, dependencies
        first. Nodes that become available in the same round are emitted in sorted order.

    Raises:
        ValueError: If the graph contains a cycle.
    """
    # Work on a private copy: the algorithm below consumes the mapping and shrinks the
    # dependency sets, which must not leak into the caller's data
    remaining: t.Dict[T, t.Set[T]] = {node: set(deps) for node, deps in dag.items()}

    # Register nodes that are only ever referenced as dependencies
    for deps in tuple(remaining.values()):
        for dep in deps:
            remaining.setdefault(dep, set())

    result: t.List[T] = []

    while remaining:
        # Every node whose dependencies have all been emitted is ready
        current = {node for node, deps in remaining.items() if not deps}

        if not current:
            raise ValueError("Cycle error")

        for node in current:
            remaining.pop(node)

        for deps in remaining.values():
            deps -= current

        result.extend(sorted(current))  # type: ignore

    return result
226
227
def find_new_name(taken: t.Collection[str], base: str) -> str:
    """
    Finds a name derived from `base` that is not already taken.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        `base` itself if it's available, otherwise the first available name of the
        form `{base}_{i}`, with `i` counting up from 2.
    """
    if base not in taken:
        return base

    candidates = (f"{base}_{suffix}" for suffix in count(2))
    return next(candidate for candidate in candidates if candidate not in taken)
249
250
def is_int(text: str) -> bool:
    """Returns whether `text` is accepted by the `int` constructor."""
    return is_type(text, int)
253
254
def is_float(text: str) -> bool:
    """Returns whether `text` is accepted by the `float` constructor."""
    return is_type(text, float)
257
258
def is_type(text: str, target_type: t.Type) -> bool:
    """
    Checks whether `text` can be converted by calling `target_type` on it.

    Args:
        text: The text to convert.
        target_type: The type (or any callable) used to attempt the conversion.

    Returns:
        `False` if the conversion raised a `ValueError`, `True` otherwise.
    """
    try:
        target_type(text)
    except ValueError:
        return False
    return True
265
266
def name_sequence(prefix: str) -> t.Callable[[], str]:
    """
    Builds a generator of sequential names sharing a prefix.

    Args:
        prefix: The text every generated name starts with.

    Returns:
        A zero-argument callable; successive calls return `{prefix}0`, `{prefix}1`, ...
        (e.g. a0, a1, a2, ... if the prefix is "a").
    """
    counter = count()

    def next_name() -> str:
        return f"{prefix}{next(counter)}"

    return next_name
271
272
def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
    """
    Returns a dictionary created from an object's attributes.

    Each attribute value is copied: through its own `copy` attribute when it has one,
    otherwise through `copy.copy`. Entries in `kwargs` are added last and therefore
    override attributes of the same name.
    """
    attributes = {}

    for attr_name, attr_value in vars(obj).items():
        if hasattr(attr_value, "copy"):
            attributes[attr_name] = attr_value.copy()
        else:
            attributes[attr_name] = copy(attr_value)

    attributes.update(kwargs)
    return attributes
279
280
def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> t.List[t.Optional[str]]:
    """
    Splits a value on a separator and pads the result with `None` up to a minimum length.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Whether the `None` padding goes at the start (`True`) or at the
            end (`False`) of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)

    # A non-positive shortfall yields an empty padding list
    padding = [None] * (min_num_words - len(words))

    return padding + words if fill_from_start else words + padding
308
309
def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str`, `bytes` and `ExpressionCore`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        `True` if `value` exposes `__iter__` and is not an instance of an excluded type,
        `False` otherwise.
    """
    from sqlglot.expression_core import ExpressionCore

    if not hasattr(value, "__iter__"):
        return False

    return not isinstance(value, (str, bytes, ExpressionCore))
329
330
def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
    """
    Lazily walks an arbitrarily nested iterable and yields its leaves in order. What counts
    as a nested iterable is decided by `is_iterable`, so objects of type `str` and `bytes`
    are yielded as-is.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    # Stack of partially consumed iterators; the innermost level is on top
    pending = [iter(values)]

    while pending:
        for item in pending[-1]:
            if is_iterable(item):
                # Descend into the nested iterable; its parent resumes once it's exhausted
                pending.append(iter(item))
                break
            yield item
        else:
            pending.pop()
353
354
def dict_depth(d: t.Any) -> int:
    """
    Get the nesting depth of a dictionary, following the first value at each level.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    depth = 0

    while True:
        try:
            d = next(iter(d.values()))
        except AttributeError:
            # The current object doesn't have attribute "values": it isn't a level
            return depth
        except StopIteration:
            # The current level is empty, but it still counts as a level
            return depth + 1

        depth += 1
379
380
def first(it: t.Iterable[T]) -> T:
    """
    Returns the first element produced by iterating over `it` (useful for sets).

    Raises:
        StopIteration: If `it` yields no elements.
    """
    return next(iter(it))
384
385
def to_bool(value: t.Optional[str | bool]) -> t.Optional[str | bool]:
    """
    Coerces the usual textual spellings of a boolean into a `bool`.

    Args:
        value: The value to coerce.

    Returns:
        `value` unchanged if it's `None` or already a `bool`; `True` for "true"/"1" and
        `False` for "false"/"0" (case-insensitive); otherwise the original string.
    """
    if value is None or isinstance(value, bool):
        return value

    # Strings outside of this table are handed back untouched
    recognized = {"true": True, "1": True, "false": False, "0": False}
    return recognized.get(value.lower(), value)
398
399
def merge_ranges(ranges: t.List[t.Tuple[t.Any, t.Any]]) -> t.List[t.Tuple[t.Any, t.Any]]:
    """
    Merges overlapping or touching ranges, represented as tuples (low, high) whose
    values belong to some totally-ordered set.

    The input list is not modified; the result is sorted by (low, high).

    Example:
        >>> merge_ranges([(1, 3), (2, 6)])
        [(1, 6)]
    """
    if not ranges:
        return []

    pending = iter(sorted(ranges))
    merged = [next(pending)]

    for low, high in pending:
        prev_low, prev_high = merged[-1]

        if low <= prev_high:
            # Overlaps (or touches) the previous range: extend it in place
            merged[-1] = (prev_low, max(prev_high, high))
            continue

        merged.append((low, high))

    return merged
425
426
def is_iso_date(text: str) -> bool:
    """Returns whether `datetime.date.fromisoformat` accepts `text`."""
    try:
        datetime.date.fromisoformat(text)
    except ValueError:
        return False
    return True
433
434
def is_iso_datetime(text: str) -> bool:
    """Returns whether `datetime.datetime.fromisoformat` accepts `text`."""
    try:
        datetime.datetime.fromisoformat(text)
    except ValueError:
        return False
    return True
441
442
# Interval units that operate on date components
DATE_UNITS = {"day", "week", "month", "quarter", "year", "year_month"}


def is_date_unit(expression: t.Optional[ExpressionCore]) -> bool:
    """
    Checks whether an expression names one of the units in `DATE_UNITS`.

    Args:
        expression: The expression to inspect; its `name` is compared case-insensitively.

    Returns:
        `False` if `expression` is `None` or its lowercased name is not in `DATE_UNITS`,
        `True` otherwise.
    """
    if expression is None:
        return False

    return expression.name.lower() in DATE_UNITS
449
450
K = t.TypeVar("K")
V = t.TypeVar("V")


class SingleValuedMapping(t.Mapping[K, V]):
    """
    Read-only mapping in which every key is associated with one shared value.

    This rigamarole is meant to avoid copying keys, which was originally intended
    as an optimization while qualifying columns for tables with lots of columns.
    """

    def __init__(self, keys: t.Collection[K], value: V):
        # A set-like `keys` object is stored as-is (no copy); anything else is
        # converted into a set once
        if isinstance(keys, Set):
            self._keys = keys
        else:
            self._keys = set(keys)

        self._value = value

    def __getitem__(self, key: K) -> V:
        if key not in self._keys:
            raise KeyError(key)
        return self._value

    def __len__(self) -> int:
        return len(self._keys)

    def __iter__(self) -> t.Iterator[K]:
        return iter(self._keys)
CAMEL_CASE_PATTERN = re.compile('(?<!^)(?=[A-Z])')
PYTHON_VERSION = (3, 10)
logger = <Logger sqlglot (WARNING)>
class AutoName(enum.Enum):
39class AutoName(Enum):
40    """
41    This is used for creating Enum classes where `auto()` is the string form
42    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
43
44    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
45    """
46
47    def _generate_next_value_(name, _start, _count, _last_values):
48        return name

This is used for creating Enum classes where auto() is the string form of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values

def suggest_closest_match_and_fail(kind: str, word: str, possibilities: Iterable[str]) -> None:
51def suggest_closest_match_and_fail(
52    kind: str,
53    word: str,
54    possibilities: t.Iterable[str],
55) -> None:
56    close_matches = get_close_matches(word, possibilities, n=1)
57
58    similar = seq_get(close_matches, 0) or ""
59    if similar:
60        similar = f" Did you mean {similar}?"
61
62    raise ValueError(f"Unknown {kind} '{word}'.{similar}")
def seq_get(seq: Sequence[~T], index: int) -> Optional[~T]:
65def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
66    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
67    try:
68        return seq[index]
69    except IndexError:
70        return None

Returns the value in seq at position index, or None if index is out of bounds.

def ensure_list(value):
 85def ensure_list(value):
 86    """
 87    Ensures that a value is a list, otherwise casts or wraps it into one.
 88
 89    Args:
 90        value: The value of interest.
 91
 92    Returns:
 93        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
 94    """
 95    if value is None:
 96        return []
 97    if isinstance(value, (list, tuple)):
 98        return list(value)
 99
100    return [value]

Ensures that a value is a list, otherwise casts or wraps it into one.

Arguments:
  • value: The value of interest.
Returns:

The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.

def ensure_collection(value):
111def ensure_collection(value):
112    """
113    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.
114
115    Args:
116        value: The value of interest.
117
118    Returns:
119        The value if it's a collection, or else the value wrapped in a list.
120    """
121    if value is None:
122        return []
123    return (
124        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
125    )

Ensures that a value is a collection (excluding str and bytes), otherwise wraps it into a list.

Arguments:
  • value: The value of interest.
Returns:

The value if it's a collection, or else the value wrapped in a list.

def csv(*args: str, sep: str = ', ') -> str:
128def csv(*args: str, sep: str = ", ") -> str:
129    """
130    Formats any number of string arguments as CSV.
131
132    Args:
133        args: The string arguments to format.
134        sep: The argument separator.
135
136    Returns:
137        The arguments formatted as a CSV string.
138    """
139    return sep.join(arg for arg in args if arg)

Formats any number of string arguments as CSV.

Arguments:
  • args: The string arguments to format.
  • sep: The argument separator.
Returns:

The arguments formatted as a CSV string.

def subclasses( module_name: str, classes: Union[Type, Tuple[Type, ...]], exclude: Set[Type] = set()) -> List[Type]:
142def subclasses(
143    module_name: str,
144    classes: t.Type | t.Tuple[t.Type, ...],
145    exclude: t.Set[t.Type] = set(),
146) -> t.List[t.Type]:
147    """
148    Returns all subclasses for a collection of classes, possibly excluding some of them.
149
150    Args:
151        module_name: The name of the module to search for subclasses in.
152        classes: Class(es) we want to find the subclasses of.
153        exclude: Classes we want to exclude from the returned list.
154
155    Returns:
156        The target subclasses.
157    """
158    return [
159        obj
160        for _, obj in inspect.getmembers(
161            sys.modules[module_name],
162            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
163        )
164    ]

Returns all subclasses for a collection of classes, possibly excluding some of them.

Arguments:
  • module_name: The name of the module to search for subclasses in.
  • classes: Class(es) we want to find the subclasses of.
  • exclude: Classes we want to exclude from the returned list.
Returns:

The target subclasses.

def camel_to_snake_case(name: str) -> str:
167def camel_to_snake_case(name: str) -> str:
168    """Converts `name` from camelCase to snake_case and returns the result."""
169    return CAMEL_CASE_PATTERN.sub("_", name).upper()

Converts name from camelCase to upper-case snake case (e.g. "camelCase" becomes "CAMEL_CASE") and returns the result.

def while_changing(expression: ~E, func: Callable[[~E], ~E]) -> ~E:
172def while_changing(expression: E, func: t.Callable[[E], E]) -> E:
173    """
174    Applies a transformation to a given expression until a fix point is reached.
175
176    Args:
177        expression: The expression to be transformed.
178        func: The transformation to be applied.
179
180    Returns:
181        The transformed expression.
182    """
183
184    while True:
185        start_hash = hash(expression)
186        expression = func(expression)
187        end_hash = hash(expression)
188
189        if start_hash == end_hash:
190            break
191
192    return expression

Applies a transformation to a given expression until a fix point is reached.

Arguments:
  • expression: The expression to be transformed.
  • func: The transformation to be applied.
Returns:

The transformed expression.

def tsort(dag: Dict[~T, Set[~T]]) -> List[~T]:
195def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
196    """
197    Sorts a given directed acyclic graph in topological order.
198
199    Args:
200        dag: The graph to be sorted.
201
202    Returns:
203        A list that contains all of the graph's nodes in topological order.
204    """
205    result = []
206
207    for node, deps in tuple(dag.items()):
208        for dep in deps:
209            if dep not in dag:
210                dag[dep] = set()
211
212    while dag:
213        current = {node for node, deps in dag.items() if not deps}
214
215        if not current:
216            raise ValueError("Cycle error")
217
218        for node in current:
219            dag.pop(node)
220
221        for deps in dag.values():
222            deps -= current
223
224        result.extend(sorted(current))  # type: ignore
225
226    return result

Sorts a given directed acyclic graph in topological order.

Arguments:
  • dag: The graph to be sorted.
Returns:

A list that contains all of the graph's nodes in topological order.

def find_new_name(taken: Collection[str], base: str) -> str:
229def find_new_name(taken: t.Collection[str], base: str) -> str:
230    """
231    Searches for a new name.
232
233    Args:
234        taken: A collection of taken names.
235        base: Base name to alter.
236
237    Returns:
238        The new, available name.
239    """
240    if base not in taken:
241        return base
242
243    i = 2
244    new = f"{base}_{i}"
245    while new in taken:
246        i += 1
247        new = f"{base}_{i}"
248
249    return new

Searches for a new name.

Arguments:
  • taken: A collection of taken names.
  • base: Base name to alter.
Returns:

The new, available name.

def is_int(text: str) -> bool:
252def is_int(text: str) -> bool:
253    return is_type(text, int)
def is_float(text: str) -> bool:
256def is_float(text: str) -> bool:
257    return is_type(text, float)
def is_type(text: str, target_type: Type) -> bool:
260def is_type(text: str, target_type: t.Type) -> bool:
261    try:
262        target_type(text)
263        return True
264    except ValueError:
265        return False
def name_sequence(prefix: str) -> Callable[[], str]:
268def name_sequence(prefix: str) -> t.Callable[[], str]:
269    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
270    sequence = count()
271    return lambda: f"{prefix}{next(sequence)}"

Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").

def object_to_dict(obj: Any, **kwargs) -> Dict:
274def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
275    """Returns a dictionary created from an object's attributes."""
276    return {
277        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
278        **kwargs,
279    }

Returns a dictionary created from an object's attributes.

def split_num_words( value: str, sep: str, min_num_words: int, fill_from_start: bool = True) -> List[Optional[str]]:
282def split_num_words(
283    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
284) -> t.List[t.Optional[str]]:
285    """
286    Perform a split on a value and return N words as a result with `None` used for words that don't exist.
287
288    Args:
289        value: The value to be split.
290        sep: The value to use to split on.
291        min_num_words: The minimum number of words that are going to be in the result.
292        fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list.
293
294    Examples:
295        >>> split_num_words("db.table", ".", 3)
296        [None, 'db', 'table']
297        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
298        ['db', 'table', None]
299        >>> split_num_words("db.table", ".", 1)
300        ['db', 'table']
301
302    Returns:
303        The list of words returned by `split`, possibly augmented by a number of `None` values.
304    """
305    words = value.split(sep)
306    if fill_from_start:
307        return [None] * (min_num_words - len(words)) + words
308    return words + [None] * (min_num_words - len(words))

Perform a split on a value and return N words as a result with None used for words that don't exist.

Arguments:
  • value: The value to be split.
  • sep: The value to use to split on.
  • min_num_words: The minimum number of words that are going to be in the result.
  • fill_from_start: Indicates whether the None values should be inserted at the start (True) or at the end (False) of the list.
Examples:
>>> split_num_words("db.table", ".", 3)
[None, 'db', 'table']
>>> split_num_words("db.table", ".", 3, fill_from_start=False)
['db', 'table', None]
>>> split_num_words("db.table", ".", 1)
['db', 'table']
Returns:

The list of words returned by split, possibly augmented by a number of None values.

def is_iterable(value: Any) -> bool:
311def is_iterable(value: t.Any) -> bool:
312    """
313    Checks if the value is an iterable, excluding the types `str` and `bytes`.
314
315    Examples:
316        >>> is_iterable([1,2])
317        True
318        >>> is_iterable("test")
319        False
320
321    Args:
322        value: The value to check if it is an iterable.
323
324    Returns:
325        A `bool` value indicating if it is an iterable.
326    """
327    from sqlglot.expression_core import ExpressionCore
328
329    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, ExpressionCore))

Checks if the value is an iterable, excluding the types str and bytes.

Examples:
>>> is_iterable([1,2])
True
>>> is_iterable("test")
False
Arguments:
  • value: The value to check if it is an iterable.
Returns:

A bool value indicating if it is an iterable.

def flatten(values: Iterable[Union[Iterable[Any], Any]]) -> Iterator[Any]:
332def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
333    """
334    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
335    type `str` and `bytes` are not regarded as iterables.
336
337    Examples:
338        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
339        [1, 2, 3, 4, 5, 'bla']
340        >>> list(flatten([1, 2, 3]))
341        [1, 2, 3]
342
343    Args:
344        values: The value to be flattened.
345
346    Yields:
347        Non-iterable elements in `values`.
348    """
349    for value in values:
350        if is_iterable(value):
351            yield from flatten(value)
352        else:
353            yield value

Flattens an iterable that can contain both iterable and non-iterable elements. Objects of type str and bytes are not regarded as iterables.

Examples:
>>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
[1, 2, 3, 4, 5, 'bla']
>>> list(flatten([1, 2, 3]))
[1, 2, 3]
Arguments:
  • values: The value to be flattened.
Yields:

Non-iterable elements in values.

def dict_depth(d: Any) -> int:
356def dict_depth(d: t.Any) -> int:
357    """
358    Get the nesting depth of a dictionary.
359
360    Example:
361        >>> dict_depth(None)
362        0
363        >>> dict_depth({})
364        1
365        >>> dict_depth({"a": "b"})
366        1
367        >>> dict_depth({"a": {}})
368        2
369        >>> dict_depth({"a": {"b": {}}})
370        3
371    """
372    try:
373        return 1 + dict_depth(next(iter(d.values())))
374    except AttributeError:
375        # d doesn't have attribute "values"
376        return 0
377    except StopIteration:
378        # d.values() returns an empty sequence
379        return 1

Get the nesting depth of a dictionary.

Example:
>>> dict_depth(None)
0
>>> dict_depth({})
1
>>> dict_depth({"a": "b"})
1
>>> dict_depth({"a": {}})
2
>>> dict_depth({"a": {"b": {}}})
3
def first(it: Iterable[~T]) -> ~T:
382def first(it: t.Iterable[T]) -> T:
383    """Returns the first element from an iterable (useful for sets)."""
384    return next(i for i in it)

Returns the first element from an iterable (useful for sets).

def to_bool(value: Union[str, bool, NoneType]) -> Union[str, bool, NoneType]:
387def to_bool(value: t.Optional[str | bool]) -> t.Optional[str | bool]:
388    if isinstance(value, bool) or value is None:
389        return value
390
391    # Coerce the value to boolean if it matches to the truthy/falsy values below
392    value_lower = value.lower()
393    if value_lower in ("true", "1"):
394        return True
395    if value_lower in ("false", "0"):
396        return False
397
398    return value
def merge_ranges(ranges: List[Tuple[Any, Any]]) -> List[Tuple[Any, Any]]:
401def merge_ranges(ranges: t.List[t.Tuple[t.Any, t.Any]]) -> t.List[t.Tuple[t.Any, t.Any]]:
402    """
403    Merges a sequence of ranges, represented as tuples (low, high) whose values
404    belong to some totally-ordered set.
405
406    Example:
407        >>> merge_ranges([(1, 3), (2, 6)])
408        [(1, 6)]
409    """
410    if not ranges:
411        return []
412
413    ranges = sorted(ranges)
414
415    merged = [ranges[0]]
416
417    for start, end in ranges[1:]:
418        last_start, last_end = merged[-1]
419
420        if start <= last_end:
421            merged[-1] = (last_start, max(last_end, end))
422        else:
423            merged.append((start, end))
424
425    return merged

Merges a sequence of ranges, represented as tuples (low, high) whose values belong to some totally-ordered set.

Example:
>>> merge_ranges([(1, 3), (2, 6)])
[(1, 6)]
def is_iso_date(text: str) -> bool:
428def is_iso_date(text: str) -> bool:
429    try:
430        datetime.date.fromisoformat(text)
431        return True
432    except ValueError:
433        return False
def is_iso_datetime(text: str) -> bool:
436def is_iso_datetime(text: str) -> bool:
437    try:
438        datetime.datetime.fromisoformat(text)
439        return True
440    except ValueError:
441        return False
DATE_UNITS = {'year', 'quarter', 'day', 'year_month', 'week', 'month'}
def is_date_unit(expression: Optional[sqlglot.expression_core.ExpressionCore]) -> bool:
448def is_date_unit(expression: t.Optional[ExpressionCore]) -> bool:
449    return expression is not None and expression.name.lower() in DATE_UNITS
class SingleValuedMapping(typing.Mapping[~K, ~V]):
456class SingleValuedMapping(t.Mapping[K, V]):
457    """
458    Mapping where all keys return the same value.
459
460    This rigamarole is meant to avoid copying keys, which was originally intended
461    as an optimization while qualifying columns for tables with lots of columns.
462    """
463
464    def __init__(self, keys: t.Collection[K], value: V):
465        self._keys = keys if isinstance(keys, Set) else set(keys)
466        self._value = value
467
468    def __getitem__(self, key: K) -> V:
469        if key in self._keys:
470            return self._value
471        raise KeyError(key)
472
473    def __len__(self) -> int:
474        return len(self._keys)
475
476    def __iter__(self) -> t.Iterator[K]:
477        return iter(self._keys)

Mapping where all keys return the same value.

This rigamarole is meant to avoid copying keys, which was originally intended as an optimization while qualifying columns for tables with lots of columns.

SingleValuedMapping(keys: Collection[~K], value: ~V)
464    def __init__(self, keys: t.Collection[K], value: V):
465        self._keys = keys if isinstance(keys, Set) else set(keys)
466        self._value = value