sqlglot.helper
from __future__ import annotations

import datetime
import inspect
import logging
import re
import sys
import typing as t
from collections.abc import Collection, Set
from copy import copy
from difflib import get_close_matches
from enum import Enum
from itertools import count

if t.TYPE_CHECKING:
    from sqlglot import exp
    from sqlglot._typing import A, E, T
    from sqlglot.dialects.dialect import DialectType
    from sqlglot.expressions import Expression


CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])")
PYTHON_VERSION = sys.version_info[:2]
logger = logging.getLogger("sqlglot")


class AutoName(Enum):
    """
    This is used for creating Enum classes where `auto()` is the string form
    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
    """

    def _generate_next_value_(name, _start, _count, _last_values):
        return name


class classproperty(property):
    """
    Similar to a normal property but works for class methods
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        return classmethod(self.fget).__get__(None, owner)()  # type: ignore


def suggest_closest_match_and_fail(
    kind: str,
    word: str,
    possibilities: t.Iterable[str],
) -> None:
    close_matches = get_close_matches(word, possibilities, n=1)

    similar = seq_get(close_matches, 0) or ""
    if similar:
        similar = f" Did you mean {similar}?"

    raise ValueError(f"Unknown {kind} '{word}'.{similar}")


def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
    try:
        return seq[index]
    except IndexError:
        return None


@t.overload
def ensure_list(value: t.Collection[T]) -> t.List[T]: ...


@t.overload
def ensure_list(value: None) -> t.List: ...


@t.overload
def ensure_list(value: T) -> t.List[T]: ...


def ensure_list(value):
    """
    Ensures that a value is a list, otherwise casts or wraps it into one.

    Args:
        value: The value of interest.

    Returns:
        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)

    return [value]


@t.overload
def ensure_collection(value: t.Collection[T]) -> t.Collection[T]: ...


@t.overload
def ensure_collection(value: T) -> t.Collection[T]: ...


def ensure_collection(value):
    """
    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.

    Args:
        value: The value of interest.

    Returns:
        The value if it's a collection, or else the value wrapped in a list.
    """
    if value is None:
        return []
    return (
        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
    )


def csv(*args: str, sep: str = ", ") -> str:
    """
    Formats any number of string arguments as CSV.

    Args:
        args: The string arguments to format.
        sep: The argument separator.

    Returns:
        The arguments formatted as a CSV string.
    """
    return sep.join(arg for arg in args if arg)


def subclasses(
    module_name: str,
    classes: t.Type | t.Tuple[t.Type, ...],
    exclude: t.Set[t.Type] = set(),
) -> t.List[t.Type]:
    """
    Returns all subclasses for a collection of classes, possibly excluding some of them.

    Args:
        module_name: The name of the module to search for subclasses in.
        classes: Class(es) we want to find the subclasses of.
        exclude: Classes we want to exclude from the returned list.

    Returns:
        The target subclasses.
    """
    return [
        obj
        for _, obj in inspect.getmembers(
            sys.modules[module_name],
            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
        )
    ]


def apply_index_offset(
    this: exp.Expression,
    expressions: t.List[E],
    offset: int,
    dialect: DialectType = None,
) -> t.List[E]:
    """
    Applies an offset to a given integer literal expression.

    Args:
        this: The target of the index.
        expressions: The expression the offset will be applied to, wrapped in a list.
        offset: The offset that will be applied.
        dialect: the dialect of interest.

    Returns:
        The original expression with the offset applied to it, wrapped in a list. If the provided
        `expressions` argument contains more than one expression, it's returned unaffected.
    """
    if not offset or len(expressions) != 1:
        return expressions

    expression = expressions[0]

    from sqlglot import exp
    from sqlglot.optimizer.annotate_types import annotate_types
    from sqlglot.optimizer.simplify import simplify

    if not this.type:
        annotate_types(this, dialect=dialect)

    if t.cast(exp.DataType, this.type).this not in (
        exp.DataType.Type.UNKNOWN,
        exp.DataType.Type.ARRAY,
    ):
        return expressions

    if not expression.type:
        annotate_types(expression, dialect=dialect)

    if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES:
        logger.info("Applying array index offset (%s)", offset)
        expression = simplify(expression + offset)
        return [expression]

    return expressions


def camel_to_snake_case(name: str) -> str:
    """Converts `name` from camelCase to snake_case and returns the result."""
    return CAMEL_CASE_PATTERN.sub("_", name).upper()


def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
    """
    Applies a transformation to a given expression until a fix point is reached.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied.

    Returns:
        The transformed expression.
    """

    while True:
        start_hash = hash(expression)
        expression = func(expression)
        end_hash = hash(expression)

        if start_hash == end_hash:
            break

    return expression


def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    Args:
        dag: The graph to be sorted.

    Returns:
        A list that contains all of the graph's nodes in topological order.
    """
    result = []

    for node, deps in tuple(dag.items()):
        for dep in deps:
            if dep not in dag:
                dag[dep] = set()

    while dag:
        current = {node for node, deps in dag.items() if not deps}

        if not current:
            raise ValueError("Cycle error")

        for node in current:
            dag.pop(node)

        for deps in dag.values():
            deps -= current

        result.extend(sorted(current))  # type: ignore

    return result


def find_new_name(taken: t.Collection[str], base: str) -> str:
    """
    Searches for a new name.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        The new, available name.
    """
    if base not in taken:
        return base

    i = 2
    new = f"{base}_{i}"
    while new in taken:
        i += 1
        new = f"{base}_{i}"

    return new


def is_int(text: str) -> bool:
    return is_type(text, int)


def is_float(text: str) -> bool:
    return is_type(text, float)


def is_type(text: str, target_type: t.Type) -> bool:
    try:
        target_type(text)
        return True
    except ValueError:
        return False


def name_sequence(prefix: str) -> t.Callable[[], str]:
    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
    sequence = count()
    return lambda: f"{prefix}{next(sequence)}"


def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
    """Returns a dictionary created from an object's attributes."""
    return {
        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
        **kwargs,
    }


def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> t.List[t.Optional[str]]:
    """
    Perform a split on a value and return N words as a result with `None` used for words that don't exist.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)
    if fill_from_start:
        return [None] * (min_num_words - len(words)) + words
    return words + [None] * (min_num_words - len(words))


def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str` and `bytes`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        A `bool` value indicating if it is an iterable.
    """
    from sqlglot import Expression

    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, Expression))


def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
    """
    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
    type `str` and `bytes` are not regarded as iterables.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    for value in values:
        if is_iterable(value):
            yield from flatten(value)
        else:
            yield value


def dict_depth(d: t.Dict) -> int:
    """
    Get the nesting depth of a dictionary.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    try:
        return 1 + dict_depth(next(iter(d.values())))
    except AttributeError:
        # d doesn't have attribute "values"
        return 0
    except StopIteration:
        # d.values() returns an empty sequence
        return 1


def first(it: t.Iterable[T]) -> T:
    """Returns the first element from an iterable (useful for sets)."""
    return next(i for i in it)


def to_bool(value: t.Optional[str | bool]) -> t.Optional[str | bool]:
    if isinstance(value, bool) or value is None:
        return value

    # Coerce the value to boolean if it matches to the truthy/falsy values below
    value_lower = value.lower()
    if value_lower in ("true", "1"):
        return True
    if value_lower in ("false", "0"):
        return False

    return value


def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
    """
    Merges a sequence of ranges, represented as tuples (low, high) whose values
    belong to some totally-ordered set.

    Example:
        >>> merge_ranges([(1, 3), (2, 6)])
        [(1, 6)]
    """
    if not ranges:
        return []

    ranges = sorted(ranges)

    merged = [ranges[0]]

    for start, end in ranges[1:]:
        last_start, last_end = merged[-1]

        if start <= last_end:
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))

    return merged


def is_iso_date(text: str) -> bool:
    try:
        datetime.date.fromisoformat(text)
        return True
    except ValueError:
        return False


def is_iso_datetime(text: str) -> bool:
    try:
        datetime.datetime.fromisoformat(text)
        return True
    except ValueError:
        return False


# Interval units that operate on date components
DATE_UNITS = {"day", "week", "month", "quarter", "year", "year_month"}


def is_date_unit(expression: t.Optional[exp.Expression]) -> bool:
    return expression is not None and expression.name.lower() in DATE_UNITS


K = t.TypeVar("K")
V = t.TypeVar("V")


class SingleValuedMapping(t.Mapping[K, V]):
    """
    Mapping where all keys return the same value.

    This rigamarole is meant to avoid copying keys, which was originally intended
    as an optimization while qualifying columns for tables with lots of columns.
    """

    def __init__(self, keys: t.Collection[K], value: V):
        self._keys = keys if isinstance(keys, Set) else set(keys)
        self._value = value

    def __getitem__(self, key: K) -> V:
        if key in self._keys:
            return self._value
        raise KeyError(key)

    def __len__(self) -> int:
        return len(self._keys)

    def __iter__(self) -> t.Iterator[K]:
        return iter(self._keys)
class AutoName(Enum):
    """
    This is used for creating Enum classes where `auto()` is the string form
    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
    """

    def _generate_next_value_(name, _start, _count, _last_values):
        return name
This is used for creating Enum classes where auto() is the string form
of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
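Usage sketch (the Color enum is a hypothetical example):
>>> from enum import auto
>>> class Color(AutoName):
...     RED = auto()
...     GREEN = auto()
>>> Color.GREEN.value
'GREEN'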
class classproperty(property):
    """
    Similar to a normal property but works for class methods
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        return classmethod(self.fget).__get__(None, owner)()  # type: ignore
Similar to a normal property but works for class methods
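Usage sketch (the Foo class is a hypothetical example):
>>> class Foo:
...     @classproperty
...     def table_name(cls):
...         return cls.__name__.lower()
>>> Foo.table_name
'foo'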
def suggest_closest_match_and_fail(
    kind: str,
    word: str,
    possibilities: t.Iterable[str],
) -> None:
    close_matches = get_close_matches(word, possibilities, n=1)

    similar = seq_get(close_matches, 0) or ""
    if similar:
        similar = f" Did you mean {similar}?"

    raise ValueError(f"Unknown {kind} '{word}'.{similar}")
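Raises a ValueError that names the unknown word and, when a close enough candidate exists, suggests the nearest valid option. Illustrative call (the inputs are arbitrary):
>>> suggest_closest_match_and_fail("dialect", "postgress", ["postgres", "mysql"])
Traceback (most recent call last):
    ...
ValueError: Unknown dialect 'postgress'. Did you mean postgres?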
def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
    try:
        return seq[index]
    except IndexError:
        return None
Returns the value in seq at position index, or None if index is out of bounds.
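Example (arbitrary inputs):
>>> seq_get(["a", "b", "c"], 1)
'b'
>>> seq_get(["a", "b", "c"], 5) is None
True
Note that negative indices are resolved by normal Python indexing, so they are not treated as out of bounds.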
def ensure_list(value):
    """
    Ensures that a value is a list, otherwise casts or wraps it into one.

    Args:
        value: The value of interest.

    Returns:
        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)

    return [value]
Ensures that a value is a list, otherwise casts or wraps it into one.
Arguments:
- value: The value of interest.
Returns:
The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
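Example (arbitrary inputs):
>>> ensure_list(None)
[]
>>> ensure_list((1, 2))
[1, 2]
>>> ensure_list("x")
['x']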
def ensure_collection(value):
    """
    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.

    Args:
        value: The value of interest.

    Returns:
        The value if it's a collection, or else the value wrapped in a list.
    """
    if value is None:
        return []
    return (
        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
    )
Ensures that a value is a collection (excluding str and bytes), otherwise wraps it into a list.
Arguments:
- value: The value of interest.
Returns:
The value if it's a collection, or else the value wrapped in a list.
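Example (arbitrary inputs):
>>> ensure_collection([1, 2])
[1, 2]
>>> ensure_collection("abc")
['abc']
>>> ensure_collection(None)
[]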
def csv(*args: str, sep: str = ", ") -> str:
    """
    Formats any number of string arguments as CSV.

    Args:
        args: The string arguments to format.
        sep: The argument separator.

    Returns:
        The arguments formatted as a CSV string.
    """
    return sep.join(arg for arg in args if arg)
Formats any number of string arguments as CSV.
Arguments:
- args: The string arguments to format.
- sep: The argument separator.
Returns:
The arguments formatted as a CSV string.
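Example (arbitrary inputs); note that empty strings are skipped:
>>> csv("a", "", "b")
'a, b'
>>> csv("a", "b", sep=".")
'a.b'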
def subclasses(
    module_name: str,
    classes: t.Type | t.Tuple[t.Type, ...],
    exclude: t.Set[t.Type] = set(),
) -> t.List[t.Type]:
    """
    Returns all subclasses for a collection of classes, possibly excluding some of them.

    Args:
        module_name: The name of the module to search for subclasses in.
        classes: Class(es) we want to find the subclasses of.
        exclude: Classes we want to exclude from the returned list.

    Returns:
        The target subclasses.
    """
    return [
        obj
        for _, obj in inspect.getmembers(
            sys.modules[module_name],
            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
        )
    ]
Returns all subclasses for a collection of classes, possibly excluding some of them.
Arguments:
- module_name: The name of the module to search for subclasses in.
- classes: Class(es) we want to find the subclasses of.
- exclude: Classes we want to exclude from the returned list.
Returns:
The target subclasses.
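Illustrative sketch, assuming sqlglot.expressions has been imported and defines Column as a subclass of Expression:
>>> from sqlglot import expressions as exp
>>> exp.Column in subclasses("sqlglot.expressions", exp.Expression, exclude={exp.Expression})
True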
def apply_index_offset(
    this: exp.Expression,
    expressions: t.List[E],
    offset: int,
    dialect: DialectType = None,
) -> t.List[E]:
    """
    Applies an offset to a given integer literal expression.

    Args:
        this: The target of the index.
        expressions: The expression the offset will be applied to, wrapped in a list.
        offset: The offset that will be applied.
        dialect: the dialect of interest.

    Returns:
        The original expression with the offset applied to it, wrapped in a list. If the provided
        `expressions` argument contains more than one expression, it's returned unaffected.
    """
    if not offset or len(expressions) != 1:
        return expressions

    expression = expressions[0]

    from sqlglot import exp
    from sqlglot.optimizer.annotate_types import annotate_types
    from sqlglot.optimizer.simplify import simplify

    if not this.type:
        annotate_types(this, dialect=dialect)

    if t.cast(exp.DataType, this.type).this not in (
        exp.DataType.Type.UNKNOWN,
        exp.DataType.Type.ARRAY,
    ):
        return expressions

    if not expression.type:
        annotate_types(expression, dialect=dialect)

    if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES:
        logger.info("Applying array index offset (%s)", offset)
        expression = simplify(expression + offset)
        return [expression]

    return expressions
Applies an offset to a given integer literal expression.
Arguments:
- this: The target of the index.
- expressions: The expression the offset will be applied to, wrapped in a list.
- offset: The offset that will be applied.
- dialect: The dialect of interest.
Returns:
The original expression with the offset applied to it, wrapped in a list. If the provided expressions argument contains more than one expression, it's returned unaffected.
def camel_to_snake_case(name: str) -> str:
    """Converts `name` from camelCase to snake_case and returns the result."""
    return CAMEL_CASE_PATTERN.sub("_", name).upper()
Converts name from camelCase to snake_case and returns the result.
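Note that the result is also upper-cased, for example:
>>> camel_to_snake_case("fooBarBaz")
'FOO_BAR_BAZ'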
def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
    """
    Applies a transformation to a given expression until a fix point is reached.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied.

    Returns:
        The transformed expression.
    """

    while True:
        start_hash = hash(expression)
        expression = func(expression)
        end_hash = hash(expression)

        if start_hash == end_hash:
            break

    return expression
Applies a transformation to a given expression until a fixed point is reached.
Arguments:
- expression: The expression to be transformed.
- func: The transformation to be applied.
Returns:
The transformed expression.
def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    Args:
        dag: The graph to be sorted.

    Returns:
        A list that contains all of the graph's nodes in topological order.
    """
    result = []

    for node, deps in tuple(dag.items()):
        for dep in deps:
            if dep not in dag:
                dag[dep] = set()

    while dag:
        current = {node for node, deps in dag.items() if not deps}

        if not current:
            raise ValueError("Cycle error")

        for node in current:
            dag.pop(node)

        for deps in dag.values():
            deps -= current

        result.extend(sorted(current))  # type: ignore

    return result
Sorts a given directed acyclic graph in topological order.
Arguments:
- dag: The graph to be sorted.
Returns:
A list that contains all of the graph's nodes in topological order.
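Here edges point from a node to the nodes it depends on, dependencies appear before their dependents in the result, and the input mapping is consumed in the process. Example (arbitrary graph):
>>> tsort({1: {2, 3}, 2: {3}, 3: set()})
[3, 2, 1]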
def find_new_name(taken: t.Collection[str], base: str) -> str:
    """
    Searches for a new name.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        The new, available name.
    """
    if base not in taken:
        return base

    i = 2
    new = f"{base}_{i}"
    while new in taken:
        i += 1
        new = f"{base}_{i}"

    return new
Searches for a new name.
Arguments:
- taken: A collection of taken names.
- base: Base name to alter.
Returns:
The new, available name.
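Example (arbitrary inputs):
>>> find_new_name({"a", "b"}, "c")
'c'
>>> find_new_name({"a", "a_2"}, "a")
'a_3'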
def name_sequence(prefix: str) -> t.Callable[[], str]:
    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
    sequence = count()
    return lambda: f"{prefix}{next(sequence)}"
Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").
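Example:
>>> next_name = name_sequence("a")
>>> next_name(), next_name(), next_name()
('a0', 'a1', 'a2')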
def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
    """Returns a dictionary created from an object's attributes."""
    return {
        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
        **kwargs,
    }
Returns a dictionary created from an object's attributes.
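Attribute values that expose a copy method are copied with it, everything else goes through copy.copy, and keyword arguments override or extend the result. Example (the Point class is a hypothetical stand-in):
>>> class Point:
...     def __init__(self, x, y):
...         self.x = x
...         self.y = y
>>> object_to_dict(Point(1, 2), z=3)
{'x': 1, 'y': 2, 'z': 3}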
def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> t.List[t.Optional[str]]:
    """
    Perform a split on a value and return N words as a result with `None` used for words that don't exist.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)
    if fill_from_start:
        return [None] * (min_num_words - len(words)) + words
    return words + [None] * (min_num_words - len(words))
Perform a split on a value and return N words as a result with None used for words that don't exist.
Arguments:
- value: The value to be split.
- sep: The value to use to split on.
- min_num_words: The minimum number of words that are going to be in the result.
- fill_from_start: Indicates whether None values should be inserted at the start or end of the list.
Examples:
>>> split_num_words("db.table", ".", 3)
[None, 'db', 'table']
>>> split_num_words("db.table", ".", 3, fill_from_start=False)
['db', 'table', None]
>>> split_num_words("db.table", ".", 1)
['db', 'table']
Returns:
The list of words returned by split, possibly augmented by a number of None values.
def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str` and `bytes`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        A `bool` value indicating if it is an iterable.
    """
    from sqlglot import Expression

    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, Expression))
Checks if the value is an iterable, excluding the types str and bytes.
Examples:
>>> is_iterable([1,2])
True
>>> is_iterable("test")
False
Arguments:
- value: The value to check if it is an iterable.
Returns:
A bool value indicating if it is an iterable.
def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
    """
    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
    type `str` and `bytes` are not regarded as iterables.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    for value in values:
        if is_iterable(value):
            yield from flatten(value)
        else:
            yield value
Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
type str and bytes are not regarded as iterables.
Examples:
>>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
[1, 2, 3, 4, 5, 'bla']
>>> list(flatten([1, 2, 3]))
[1, 2, 3]
Arguments:
- values: The value to be flattened.
Yields:
Non-iterable elements in values.
def dict_depth(d: t.Dict) -> int:
    """
    Get the nesting depth of a dictionary.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    try:
        return 1 + dict_depth(next(iter(d.values())))
    except AttributeError:
        # d doesn't have attribute "values"
        return 0
    except StopIteration:
        # d.values() returns an empty sequence
        return 1
Get the nesting depth of a dictionary.
Example:
>>> dict_depth(None)
0
>>> dict_depth({})
1
>>> dict_depth({"a": "b"})
1
>>> dict_depth({"a": {}})
2
>>> dict_depth({"a": {"b": {}}})
3
def first(it: t.Iterable[T]) -> T:
    """Returns the first element from an iterable (useful for sets)."""
    return next(i for i in it)
Returns the first element from an iterable (useful for sets).
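Example:
>>> first({"only"})
'only'
The iterable is assumed to be non-empty; an empty one raises StopIteration.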
def to_bool(value: t.Optional[str | bool]) -> t.Optional[str | bool]:
    if isinstance(value, bool) or value is None:
        return value

    # Coerce the value to boolean if it matches to the truthy/falsy values below
    value_lower = value.lower()
    if value_lower in ("true", "1"):
        return True
    if value_lower in ("false", "0"):
        return False

    return value
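Coerces the strings "true"/"1" and "false"/"0" (case-insensitively) to booleans and returns every other value unchanged. Example (arbitrary inputs):
>>> to_bool("TRUE")
True
>>> to_bool("0")
False
>>> to_bool("yes")
'yes'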
def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
    """
    Merges a sequence of ranges, represented as tuples (low, high) whose values
    belong to some totally-ordered set.

    Example:
        >>> merge_ranges([(1, 3), (2, 6)])
        [(1, 6)]
    """
    if not ranges:
        return []

    ranges = sorted(ranges)

    merged = [ranges[0]]

    for start, end in ranges[1:]:
        last_start, last_end = merged[-1]

        if start <= last_end:
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))

    return merged
Merges a sequence of ranges, represented as tuples (low, high) whose values belong to some totally-ordered set.
Example:
>>> merge_ranges([(1, 3), (2, 6)])
[(1, 6)]
class SingleValuedMapping(t.Mapping[K, V]):
    """
    Mapping where all keys return the same value.

    This rigamarole is meant to avoid copying keys, which was originally intended
    as an optimization while qualifying columns for tables with lots of columns.
    """

    def __init__(self, keys: t.Collection[K], value: V):
        self._keys = keys if isinstance(keys, Set) else set(keys)
        self._value = value

    def __getitem__(self, key: K) -> V:
        if key in self._keys:
            return self._value
        raise KeyError(key)

    def __len__(self) -> int:
        return len(self._keys)

    def __iter__(self) -> t.Iterator[K]:
        return iter(self._keys)
Mapping where all keys return the same value.
This rigamarole is meant to avoid copying keys, which was originally intended as an optimization while qualifying columns for tables with lots of columns.
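Usage sketch (the keys and value are arbitrary):
>>> columns = SingleValuedMapping(["a", "b", "c"], "UNKNOWN")
>>> columns["b"]
'UNKNOWN'
>>> len(columns)
3
>>> "z" in columns
False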