sqlglot.planner
1from __future__ import annotations 2 3import math 4import typing as t 5 6from sqlglot import alias, exp 7from sqlglot.helper import name_sequence 8from sqlglot.optimizer.eliminate_joins import join_condition 9from collections.abc import Iterator, Sequence, Iterable 10 11 12class Plan: 13 def __init__(self, expression: exp.Expr) -> None: 14 self.expression: exp.Expr = expression.copy() 15 self.root: Step = Step.from_expression(self.expression) 16 self._dag: dict[Step, set[Step]] = {} 17 18 @property 19 def dag(self) -> dict[Step, set[Step]]: 20 if not self._dag: 21 dag: dict[Step, set[Step]] = {} 22 nodes = {self.root} 23 24 while nodes: 25 node = nodes.pop() 26 dag[node] = set() 27 28 for dep in node.dependencies: 29 dag[node].add(dep) 30 nodes.add(dep) 31 32 self._dag = dag 33 34 return self._dag 35 36 @property 37 def leaves(self) -> Iterator[Step]: 38 return (node for node, deps in self.dag.items() if not deps) 39 40 def __repr__(self) -> str: 41 return f"Plan\n----\n{repr(self.root)}" 42 43 44class Step: 45 @classmethod 46 def from_expression(cls, expression: exp.Expr, ctes: dict[str, Step] | None = None) -> Step: 47 """ 48 Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. 49 Note: the expression's tables and subqueries must be aliased for this method to work. 
For 50 example, given the following expression: 51 52 SELECT 53 x.a, 54 SUM(x.b) 55 FROM x AS x 56 JOIN y AS y 57 ON x.a = y.a 58 GROUP BY x.a 59 60 the following DAG is produced (the expression IDs might differ per execution): 61 62 - Aggregate: x (4347984624) 63 Context: 64 Aggregations: 65 - SUM(x.b) 66 Group: 67 - x.a 68 Projections: 69 - x.a 70 - "x"."" 71 Dependencies: 72 - Join: x (4347985296) 73 Context: 74 y: 75 On: x.a = y.a 76 Projections: 77 Dependencies: 78 - Scan: x (4347983136) 79 Context: 80 Source: x AS x 81 Projections: 82 - Scan: y (4343416624) 83 Context: 84 Source: y AS y 85 Projections: 86 87 Args: 88 expression: the expression to build the DAG from. 89 ctes: a dictionary that maps CTEs to their corresponding Step DAG by name. 90 91 Returns: 92 A Step DAG corresponding to `expression`. 93 """ 94 ctes = ctes or {} 95 expression = expression.unnest() 96 with_: exp.With | None = expression.args.get("with_") 97 98 # CTEs break the mold of scope and introduce themselves to all in the context. 
99 if with_ is not None: 100 ctes = ctes.copy() 101 for cte in with_.expressions: 102 step = Step.from_expression(cte.this, ctes) 103 step.name = cte.alias 104 ctes[step.name] = step # type: ignore 105 106 from_ = expression.args.get("from_") 107 108 if isinstance(expression, exp.Select) and from_: 109 step = Scan.from_expression(from_.this, ctes) 110 elif isinstance(expression, exp.SetOperation): 111 step = SetOperation.from_expression(expression, ctes) 112 else: 113 step = Scan() 114 115 joins: list[exp.Join] | None = expression.args.get("joins") 116 117 if joins is not None: 118 join = Join.from_joins(joins, ctes) 119 join.name = step.name 120 join.source_name = step.name 121 join.add_dependency(step) 122 step = join 123 # final selects in this chain of steps representing a select 124 projections: list[exp.Expr] = [] 125 # intermediate computations of agg funcs eg x + 1 in SUM(x + 1) 126 operands: dict[exp.Expr, str] = {} 127 aggregations: dict[exp.Expr, None] = {} 128 next_operand_name = name_sequence("_a_") 129 130 def extract_agg_operands(expression: exp.Expr) -> bool: 131 agg_funcs = tuple(expression.find_all(exp.AggFunc)) 132 if agg_funcs: 133 aggregations[expression] = None 134 135 for agg in agg_funcs: 136 for operand in agg.unnest_operands(): 137 if isinstance(operand, exp.Column): 138 continue 139 if operand not in operands: 140 operands[operand] = next_operand_name() 141 142 operand.replace(exp.column(operands[operand], quoted=True)) 143 144 return bool(agg_funcs) 145 146 def set_ops_and_aggs(step) -> None: 147 step.operands = tuple(alias(operand, alias_) for operand, alias_ in operands.items()) 148 step.aggregations = list(aggregations) 149 150 for e in expression.expressions: 151 if e.find(exp.AggFunc): 152 projections.append(exp.column(e.alias_or_name, step.name, quoted=True)) 153 extract_agg_operands(e) 154 else: 155 projections.append(e) 156 157 where: exp.Where | None = expression.args.get("where") 158 159 if where is not None: 160 step.condition 
= where.this 161 162 group: exp.Group | None = expression.args.get("group") 163 164 if group is not None or aggregations: 165 aggregate = Aggregate() 166 aggregate.source = step.name 167 aggregate.name = step.name 168 169 having: exp.Having | None = expression.args.get("having") 170 171 if having is not None: 172 if extract_agg_operands(exp.alias_(having.this, "_h", quoted=True)): 173 aggregate.condition = exp.column("_h", step.name, quoted=True) 174 else: 175 aggregate.condition = having.this 176 177 set_ops_and_aggs(aggregate) 178 179 # give aggregates names and replace projections with references to them 180 aggregate.group = { 181 f"_g{i}": e for i, e in enumerate(group.expressions if group else []) 182 } 183 184 intermediate: dict[str | exp.Expr, str] = {} 185 for k, v in aggregate.group.items(): 186 intermediate[v] = k 187 if isinstance(v, exp.Column): 188 intermediate[v.name] = k 189 190 for projection in projections: 191 for node in projection.walk(): 192 name = intermediate.get(node) 193 if name: 194 node.replace(exp.column(name, step.name)) 195 196 if aggregate.condition: 197 for node in aggregate.condition.walk(): 198 name = intermediate.get(node) or intermediate.get(node.name) 199 if name: 200 node.replace(exp.column(name, step.name)) 201 202 aggregate.add_dependency(step) 203 step = aggregate 204 else: 205 aggregate = None 206 207 order: exp.Order | None = expression.args.get("order") 208 209 if order is not None: 210 if aggregate is not None and isinstance(step, Aggregate): 211 for i, ordered in enumerate(order.expressions): 212 if extract_agg_operands(exp.alias_(ordered.this, f"_o_{i}", quoted=True)): 213 ordered.this.replace(exp.column(f"_o_{i}", step.name, quoted=True)) 214 215 set_ops_and_aggs(aggregate) 216 217 sort = Sort() 218 sort.name = step.name 219 sort.key = order.expressions 220 sort.add_dependency(step) 221 step = sort 222 223 step.projections = projections 224 225 if isinstance(expression, exp.Select) and 
expression.args.get("distinct"): 226 distinct = Aggregate() 227 distinct.source = step.name 228 distinct.name = step.name 229 distinct.group = { 230 e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name) 231 for e in projections or expression.expressions 232 } 233 distinct.add_dependency(step) 234 step = distinct 235 236 limit: exp.Limit | None = expression.args.get("limit") 237 238 if limit is not None: 239 step.limit = int(limit.text("expression")) 240 241 return step 242 243 def __init__(self) -> None: 244 self.name: str | None = None 245 self.dependencies: set[Step] = set() 246 self.dependents: set[Step] = set() 247 self.projections: Sequence[exp.Expr] = [] 248 self.limit: float = math.inf 249 self.condition: exp.Expr | None = None 250 251 def add_dependency(self, dependency: Step) -> None: 252 self.dependencies.add(dependency) 253 dependency.dependents.add(self) 254 255 def __repr__(self) -> str: 256 return self.to_s() 257 258 def to_s(self, level: int = 0) -> str: 259 indent = " " * level 260 nested = f"{indent} " 261 262 context = self._to_s(f"{nested} ") 263 264 if context: 265 context = [f"{nested}Context:"] + context 266 267 lines = [ 268 f"{indent}- {self.id}", 269 *context, 270 f"{nested}Projections:", 271 ] 272 273 for expression in self.projections: 274 lines.append(f"{nested} - {expression.sql()}") 275 276 if self.condition: 277 lines.append(f"{nested}Condition: {self.condition.sql()}") 278 279 if self.limit is not math.inf: 280 lines.append(f"{nested}Limit: {self.limit}") 281 282 if self.dependencies: 283 lines.append(f"{nested}Dependencies:") 284 for dependency in self.dependencies: 285 lines.append(" " + dependency.to_s(level + 1)) 286 287 return "\n".join(lines) 288 289 @property 290 def type_name(self) -> str: 291 return self.__class__.__name__ 292 293 @property 294 def id(self) -> str: 295 name = self.name 296 name = f" {name}" if name else "" 297 return f"{self.type_name}:{name} ({id(self)})" 298 299 def _to_s(self, _indent: str) -> 
list[str]: 300 return [] 301 302 303class Scan(Step): 304 @classmethod 305 def from_expression(cls, expression: exp.Expr, ctes: dict[str, Step] | None = None) -> Step: 306 table: exp.Expr = expression 307 alias_ = expression.alias_or_name 308 309 if isinstance(expression, exp.Subquery): 310 table = expression.this 311 step = Step.from_expression(table, ctes) 312 step.name = alias_ 313 return step 314 315 step = Scan() 316 step.name = alias_ 317 step.source = expression 318 if ctes and table.name in ctes: 319 step.add_dependency(ctes[table.name]) 320 321 return step 322 323 def __init__(self) -> None: 324 super().__init__() 325 self.source: exp.Expr | None = None 326 327 def _to_s(self, indent: str) -> list[str]: 328 return [f"{indent}Source: {self.source.sql() if self.source else '-static-'}"] # type: ignore 329 330 331class Join(Step): 332 @classmethod 333 def from_joins(cls, joins: Iterable[exp.Join], ctes: dict[str, Step] | None = None) -> Join: 334 step = Join() 335 336 for join in joins: 337 source_key, join_key, condition = join_condition(join) 338 step.joins[join.alias_or_name] = { 339 "side": join.side, # type: ignore 340 "join_key": join_key, 341 "source_key": source_key, 342 "condition": condition, 343 } 344 345 step.add_dependency(Scan.from_expression(join.this, ctes)) 346 347 return step 348 349 def __init__(self) -> None: 350 super().__init__() 351 self.source_name: str | None = None 352 self.joins: dict[str, dict[str, list[str] | exp.Expr | list[exp.Expr]]] = {} 353 354 def _to_s(self, indent: str) -> list[str]: 355 lines = [f"{indent}Source: {self.source_name or self.name}"] 356 for name, join in self.joins.items(): 357 lines.append(f"{indent}{name}: {join['side'] or 'INNER'}") 358 join_key = ", ".join(str(key) for key in t.cast(list[str], join.get("join_key") or [])) 359 if join_key: 360 lines.append(f"{indent}Key: {join_key}") 361 if join.get("condition"): 362 lines.append(f"{indent}On: {join['condition'].sql()}") # type: ignore 363 return lines 
364 365 366class Aggregate(Step): 367 def __init__(self) -> None: 368 super().__init__() 369 self.aggregations: list[exp.Expr] = [] 370 self.operands: tuple[exp.Expr, ...] = () 371 self.group: dict[str, exp.Expr] = {} 372 self.source: str | None = None 373 374 def _to_s(self, indent: str) -> list[str]: 375 lines = [f"{indent}Aggregations:"] 376 377 for expression in self.aggregations: 378 lines.append(f"{indent} - {expression.sql()}") 379 380 if self.group: 381 lines.append(f"{indent}Group:") 382 for expression in self.group.values(): 383 lines.append(f"{indent} - {expression.sql()}") 384 if self.condition: 385 lines.append(f"{indent}Having:") 386 lines.append(f"{indent} - {self.condition.sql()}") 387 if self.operands: 388 lines.append(f"{indent}Operands:") 389 for expression in self.operands: 390 lines.append(f"{indent} - {expression.sql()}") 391 392 return lines 393 394 395class Sort(Step): 396 def __init__(self) -> None: 397 super().__init__() 398 self.key: list[exp.Expr] | None = None 399 400 def _to_s(self, indent: str) -> list[str]: 401 lines = [f"{indent}Key:"] 402 403 for expression in self.key: # type: ignore 404 lines.append(f"{indent} - {expression.sql()}") 405 406 return lines 407 408 409class SetOperation(Step): 410 def __init__(self, op: type[exp.Expr], left: str, right: str, distinct: bool = False) -> None: 411 super().__init__() 412 self.op: type[exp.Expr] = op 413 self.left: str = left 414 self.right: str = right 415 self.distinct: bool = distinct 416 417 @classmethod 418 def from_expression( 419 cls, expression: exp.Expr, ctes: dict[str, Step] | None = None 420 ) -> SetOperation: 421 assert isinstance(expression, exp.SetOperation) 422 423 left = Step.from_expression(expression.left, ctes) 424 # SELECT 1 UNION SELECT 2 <-- these subqueries don't have names 425 left.name = left.name or "left" 426 right = Step.from_expression(expression.right, ctes) 427 right.name = right.name or "right" 428 step = cls( 429 op=expression.__class__, 430 
left=left.name, 431 right=right.name, 432 distinct=bool(expression.args.get("distinct")), 433 ) 434 435 step.add_dependency(left) 436 step.add_dependency(right) 437 438 limit: exp.Limit | None = expression.args.get("limit") 439 440 if limit is not None: 441 step.limit = int(limit.text("expression")) 442 443 return step 444 445 def _to_s(self, indent: str) -> list[str]: 446 lines: list[str] = [] 447 if self.distinct: 448 lines.append(f"{indent}Distinct: {self.distinct}") 449 return lines 450 451 @property 452 def type_name(self) -> str: 453 return self.op.__name__
class Plan:
    """Query plan for a SQL expression, exposed as a DAG of `Step` objects.

    The plan is built from a copy of the expression given to the constructor, so
    the object handed to `Step.from_expression` is never the caller's own.
    """

    def __init__(self, expression: exp.Expr) -> None:
        """Copy `expression` and build the root step from that copy.

        Args:
            expression: the expression to plan.
        """
        # Private copy of the planned expression.
        self.expression: exp.Expr = expression.copy()
        # Entry point of the plan; `dag` walks `dependencies` starting here.
        self.root: Step = Step.from_expression(self.expression)
        # Cache for the `dag` property; an empty dict means "not computed yet".
        self._dag: dict[Step, set[Step]] = {}

    @property
    def dag(self) -> dict[Step, set[Step]]:
        """Map every step reachable from `root` to the set of steps it depends on.

        The mapping is computed on first access and cached in `_dag`; later
        accesses return the same dict object.
        """
        if not self._dag:
            dag: dict[Step, set[Step]] = {}
            pending = [self.root]

            while pending:
                node = pending.pop()
                # A step shared by several dependents can be queued more than
                # once; expand it only the first time it is seen.
                if node in dag:
                    continue

                deps = set(node.dependencies)
                dag[node] = deps
                pending.extend(dep for dep in deps if dep not in dag)

            self._dag = dag

        return self._dag

    @property
    def leaves(self) -> Iterator[Step]:
        """Lazily yield the steps of `dag` that have no dependencies."""
        return (node for node, deps in self.dag.items() if not deps)

    def __repr__(self) -> str:
        return f"Plan\n----\n{repr(self.root)}"
@property
def dag(self) -> dict[Step, set[Step]]:
    """Map every step reachable from `self.root` to the set of steps it depends on.

    The mapping is computed on first access and cached in `self._dag`; later
    accesses return the same dict object.
    """
    if not self._dag:
        dag: dict[Step, set[Step]] = {}
        pending = [self.root]

        while pending:
            node = pending.pop()
            # A step shared by several dependents can be queued more than once;
            # expand it only the first time it is seen.
            if node in dag:
                continue

            deps = set(node.dependencies)
            dag[node] = deps
            pending.extend(dep for dep in deps if dep not in dag)

        self._dag = dag

    return self._dag
class Step:
    """Base node of the plan DAG.

    A step has an optional name, links to the steps it depends on (and to the
    steps depending on it), a list of projections, an optional condition and a
    row limit. Subclasses add their own context through `_to_s`.
    """

    @classmethod
    def from_expression(cls, expression: exp.Expr, ctes: dict[str, Step] | None = None) -> Step:
        """
        Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
        Note: the expression's tables and subqueries must be aliased for this method to work. For
        example, given the following expression:

        SELECT
          x.a,
          SUM(x.b)
        FROM x AS x
        JOIN y AS y
          ON x.a = y.a
        GROUP BY x.a

        the following DAG is produced (the expression IDs might differ per execution):

        - Aggregate: x (4347984624)
            Context:
              Aggregations:
                - SUM(x.b)
              Group:
                - x.a
            Projections:
              - x.a
              - "x".""
            Dependencies:
            - Join: x (4347985296)
              Context:
                y:
                On: x.a = y.a
              Projections:
              Dependencies:
              - Scan: x (4347983136)
                Context:
                  Source: x AS x
                Projections:
              - Scan: y (4343416624)
                Context:
                  Source: y AS y
                Projections:

        Args:
            expression: the expression to build the DAG from.
            ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

        Returns:
            A Step DAG corresponding to `expression`.
        """
        ctes = ctes or {}
        expression = expression.unnest()
        with_: exp.With | None = expression.args.get("with_")

        # CTEs break the mold of scope and introduce themselves to all in the context.
        if with_ is not None:
            # Register into a copy so the mapping passed by the caller is not modified.
            ctes = ctes.copy()
            for cte in with_.expressions:
                step = Step.from_expression(cte.this, ctes)
                step.name = cte.alias
                ctes[step.name] = step  # type: ignore

        from_ = expression.args.get("from_")

        if isinstance(expression, exp.Select) and from_:
            step = Scan.from_expression(from_.this, ctes)
        elif isinstance(expression, exp.SetOperation):
            step = SetOperation.from_expression(expression, ctes)
        else:
            step = Scan()

        joins: list[exp.Join] | None = expression.args.get("joins")

        if joins is not None:
            # The join step takes over the source step's name and depends on it.
            join = Join.from_joins(joins, ctes)
            join.name = step.name
            join.source_name = step.name
            join.add_dependency(step)
            step = join
        # final selects in this chain of steps representing a select
        projections: list[exp.Expr] = []
        # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
        operands: dict[exp.Expr, str] = {}
        # dict used as an insertion-ordered set of expressions containing aggregates
        aggregations: dict[exp.Expr, None] = {}
        next_operand_name = name_sequence("_a_")

        def extract_agg_operands(expression: exp.Expr) -> bool:
            # Records `expression` when it contains aggregate functions and replaces,
            # in place, every non-column operand of those functions with a quoted
            # column named after its entry in `operands`. Returns whether any
            # aggregate function was found.
            agg_funcs = tuple(expression.find_all(exp.AggFunc))
            if agg_funcs:
                aggregations[expression] = None

            for agg in agg_funcs:
                for operand in agg.unnest_operands():
                    if isinstance(operand, exp.Column):
                        continue
                    if operand not in operands:
                        operands[operand] = next_operand_name()

                    operand.replace(exp.column(operands[operand], quoted=True))

            return bool(agg_funcs)

        def set_ops_and_aggs(step) -> None:
            # Snapshot the operands/aggregations collected so far onto `step`.
            step.operands = tuple(alias(operand, alias_) for operand, alias_ in operands.items())
            step.aggregations = list(aggregations)

        for e in expression.expressions:
            if e.find(exp.AggFunc):
                # Aggregated projections become column references on the step's name.
                projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
                extract_agg_operands(e)
            else:
                projections.append(e)

        where: exp.Where | None = expression.args.get("where")

        if where is not None:
            step.condition = where.this

        group: exp.Group | None = expression.args.get("group")

        if group is not None or aggregations:
            aggregate = Aggregate()
            aggregate.source = step.name
            aggregate.name = step.name

            having: exp.Having | None = expression.args.get("having")

            if having is not None:
                if extract_agg_operands(exp.alias_(having.this, "_h", quoted=True)):
                    aggregate.condition = exp.column("_h", step.name, quoted=True)
                else:
                    aggregate.condition = having.this

            set_ops_and_aggs(aggregate)

            # give aggregates names and replace projections with references to them
            aggregate.group = {
                f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
            }

            # Lookup from a group expression (and, for columns, its bare name)
            # to the generated group key.
            intermediate: dict[str | exp.Expr, str] = {}
            for k, v in aggregate.group.items():
                intermediate[v] = k
                if isinstance(v, exp.Column):
                    intermediate[v.name] = k

            for projection in projections:
                for node in projection.walk():
                    name = intermediate.get(node)
                    if name:
                        node.replace(exp.column(name, step.name))

            if aggregate.condition:
                for node in aggregate.condition.walk():
                    name = intermediate.get(node) or intermediate.get(node.name)
                    if name:
                        node.replace(exp.column(name, step.name))

            aggregate.add_dependency(step)
            step = aggregate
        else:
            aggregate = None

        order: exp.Order | None = expression.args.get("order")

        if order is not None:
            if aggregate is not None and isinstance(step, Aggregate):
                for i, ordered in enumerate(order.expressions):
                    if extract_agg_operands(exp.alias_(ordered.this, f"_o_{i}", quoted=True)):
                        ordered.this.replace(exp.column(f"_o_{i}", step.name, quoted=True))

                # Refresh the aggregate with anything the ORDER BY keys added.
                set_ops_and_aggs(aggregate)

            sort = Sort()
            sort.name = step.name
            sort.key = order.expressions
            sort.add_dependency(step)
            step = sort

        step.projections = projections

        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
            # DISTINCT is planned as an extra Aggregate grouping on every projection.
            distinct = Aggregate()
            distinct.source = step.name
            distinct.name = step.name
            distinct.group = {
                e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
                for e in projections or expression.expressions
            }
            distinct.add_dependency(step)
            step = distinct

        limit: exp.Limit | None = expression.args.get("limit")

        if limit is not None:
            step.limit = int(limit.text("expression"))

        return step

    def __init__(self) -> None:
        # Display name; also used when building column references to this step.
        self.name: str | None = None
        # Steps this step depends on.
        self.dependencies: set[Step] = set()
        # Steps that depend on this step (maintained by `add_dependency`).
        self.dependents: set[Step] = set()
        self.projections: Sequence[exp.Expr] = []
        # `math.inf` stands for "no limit".
        self.limit: float = math.inf
        self.condition: exp.Expr | None = None

    def add_dependency(self, dependency: Step) -> None:
        """Record `dependency` as a dependency of this step and this step as its dependent."""
        self.dependencies.add(dependency)
        dependency.dependents.add(self)

    def __repr__(self) -> str:
        return self.to_s()

    def to_s(self, level: int = 0) -> str:
        """Render this step and, recursively, its dependencies as text.

        Args:
            level: nesting depth; it scales the indentation prefix.

        Returns:
            The newline-joined description.
        """
        # NOTE(review): the whitespace inside the literals below is reproduced as
        # found; it may have been collapsed in this copy - confirm against upstream.
        indent = " " * level
        nested = f"{indent} "

        context = self._to_s(f"{nested} ")

        if context:
            context = [f"{nested}Context:"] + context

        lines = [
            f"{indent}- {self.id}",
            *context,
            f"{nested}Projections:",
        ]

        for expression in self.projections:
            lines.append(f"{nested} - {expression.sql()}")

        if self.condition:
            lines.append(f"{nested}Condition: {self.condition.sql()}")

        # Compare by value: an identity test only recognises the `math.inf` object
        # itself and would print any other infinite float as a real limit.
        if self.limit != math.inf:
            lines.append(f"{nested}Limit: {self.limit}")

        if self.dependencies:
            lines.append(f"{nested}Dependencies:")
            for dependency in self.dependencies:
                lines.append(" " + dependency.to_s(level + 1))

        return "\n".join(lines)

    @property
    def type_name(self) -> str:
        """Name of the step's class."""
        return self.__class__.__name__

    @property
    def id(self) -> str:
        """Label made of the type name, the step name (if any) and `id(self)`."""
        name = self.name
        name = f" {name}" if name else ""
        return f"{self.type_name}:{name} ({id(self)})"

    def _to_s(self, _indent: str) -> list[str]:
        """Return the step-specific context lines; the base step has none."""
        return []
@classmethod
def from_expression(cls, expression: exp.Expr, ctes: dict[str, Step] | None = None) -> Step:
    """
    Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
    Note: the expression's tables and subqueries must be aliased for this method to work. For
    example, given the following expression:

    SELECT
      x.a,
      SUM(x.b)
    FROM x AS x
    JOIN y AS y
      ON x.a = y.a
    GROUP BY x.a

    the following DAG is produced (the expression IDs might differ per execution):

    - Aggregate: x (4347984624)
        Context:
          Aggregations:
            - SUM(x.b)
          Group:
            - x.a
        Projections:
          - x.a
          - "x".""
        Dependencies:
        - Join: x (4347985296)
          Context:
            y:
            On: x.a = y.a
          Projections:
          Dependencies:
          - Scan: x (4347983136)
            Context:
              Source: x AS x
            Projections:
          - Scan: y (4343416624)
            Context:
              Source: y AS y
            Projections:

    The steps are chained in this order, each new step depending on the previous
    one: source (Scan / SetOperation), Join, Aggregate, Sort, and a second
    Aggregate for DISTINCT. WHERE is stored as the condition of whichever step
    is current before aggregation, and LIMIT on the final step. The expression
    passed in is modified in place (aggregate operands, group references and
    ORDER BY keys are replaced by column references).

    Args:
        expression: the expression to build the DAG from.
        ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

    Returns:
        A Step DAG corresponding to `expression`.
    """
    ctes = ctes or {}
    expression = expression.unnest()
    with_: exp.With | None = expression.args.get("with_")

    # CTEs break the mold of scope and introduce themselves to all in the context.
    if with_ is not None:
        # Register into a copy so the mapping passed by the caller is not modified.
        ctes = ctes.copy()
        for cte in with_.expressions:
            # Each CTE is planned with the CTEs registered before it in scope.
            step = Step.from_expression(cte.this, ctes)
            step.name = cte.alias
            ctes[step.name] = step  # type: ignore

    from_ = expression.args.get("from_")

    # Pick the source step: a scan of the FROM target, a set operation, or an
    # empty Scan when there is nothing to read from.
    if isinstance(expression, exp.Select) and from_:
        step = Scan.from_expression(from_.this, ctes)
    elif isinstance(expression, exp.SetOperation):
        step = SetOperation.from_expression(expression, ctes)
    else:
        step = Scan()

    joins: list[exp.Join] | None = expression.args.get("joins")

    if joins is not None:
        # The join step takes over the source step's name and depends on it.
        join = Join.from_joins(joins, ctes)
        join.name = step.name
        join.source_name = step.name
        join.add_dependency(step)
        step = join
    # final selects in this chain of steps representing a select
    projections: list[exp.Expr] = []
    # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
    operands: dict[exp.Expr, str] = {}
    # dict used as an insertion-ordered set of expressions containing aggregates
    aggregations: dict[exp.Expr, None] = {}
    # callable producing the next generated operand name (seeded with "_a_")
    next_operand_name = name_sequence("_a_")

    def extract_agg_operands(expression: exp.Expr) -> bool:
        # Records `expression` in `aggregations` when it contains aggregate
        # functions and replaces, in place, every non-column operand of those
        # functions with a quoted column named after its entry in `operands`
        # (an operand equal to one already seen reuses its name).
        # Returns whether any aggregate function was found.
        agg_funcs = tuple(expression.find_all(exp.AggFunc))
        if agg_funcs:
            aggregations[expression] = None

        for agg in agg_funcs:
            for operand in agg.unnest_operands():
                # Plain columns are left where they are.
                if isinstance(operand, exp.Column):
                    continue
                if operand not in operands:
                    operands[operand] = next_operand_name()

                operand.replace(exp.column(operands[operand], quoted=True))

        return bool(agg_funcs)

    def set_ops_and_aggs(step) -> None:
        # Snapshot the operands/aggregations collected so far onto `step`.
        step.operands = tuple(alias(operand, alias_) for operand, alias_ in operands.items())
        step.aggregations = list(aggregations)

    for e in expression.expressions:
        if e.find(exp.AggFunc):
            # Aggregated projections become column references on the step's name;
            # the aggregate itself is recorded for the Aggregate step.
            projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
            extract_agg_operands(e)
        else:
            projections.append(e)

    where: exp.Where | None = expression.args.get("where")

    if where is not None:
        step.condition = where.this

    group: exp.Group | None = expression.args.get("group")

    # An Aggregate step is needed for GROUP BY or whenever a projection aggregates.
    if group is not None or aggregations:
        aggregate = Aggregate()
        aggregate.source = step.name
        aggregate.name = step.name

        having: exp.Having | None = expression.args.get("having")

        if having is not None:
            # A HAVING clause with aggregates is registered under the alias "_h"
            # and the step condition refers to that column; otherwise the clause
            # is used as the condition directly.
            if extract_agg_operands(exp.alias_(having.this, "_h", quoted=True)):
                aggregate.condition = exp.column("_h", step.name, quoted=True)
            else:
                aggregate.condition = having.this

        set_ops_and_aggs(aggregate)

        # give aggregates names and replace projections with references to them
        aggregate.group = {
            f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
        }

        # Lookup from a group expression (and, for columns, its bare name) to the
        # generated group key.
        intermediate: dict[str | exp.Expr, str] = {}
        for k, v in aggregate.group.items():
            intermediate[v] = k
            if isinstance(v, exp.Column):
                intermediate[v.name] = k

        for projection in projections:
            for node in projection.walk():
                name = intermediate.get(node)
                if name:
                    node.replace(exp.column(name, step.name))

        if aggregate.condition:
            # Unlike projections, the condition is also matched by bare node name.
            for node in aggregate.condition.walk():
                name = intermediate.get(node) or intermediate.get(node.name)
                if name:
                    node.replace(exp.column(name, step.name))

        aggregate.add_dependency(step)
        step = aggregate
    else:
        aggregate = None

    order: exp.Order | None = expression.args.get("order")

    if order is not None:
        if aggregate is not None and isinstance(step, Aggregate):
            # ORDER BY keys containing aggregates are registered under "_o_<i>"
            # and replaced by a reference to that column.
            for i, ordered in enumerate(order.expressions):
                if extract_agg_operands(exp.alias_(ordered.this, f"_o_{i}", quoted=True)):
                    ordered.this.replace(exp.column(f"_o_{i}", step.name, quoted=True))

            # Refresh the aggregate with anything the ORDER BY keys added.
            set_ops_and_aggs(aggregate)

        sort = Sort()
        sort.name = step.name
        sort.key = order.expressions
        sort.add_dependency(step)
        step = sort

    # Projections are attached to the last step built so far.
    step.projections = projections

    if isinstance(expression, exp.Select) and expression.args.get("distinct"):
        # DISTINCT is planned as an extra Aggregate grouping on every projection.
        distinct = Aggregate()
        distinct.source = step.name
        distinct.name = step.name
        distinct.group = {
            e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
            for e in projections or expression.expressions
        }
        distinct.add_dependency(step)
        step = distinct

    limit: exp.Limit | None = expression.args.get("limit")

    if limit is not None:
        step.limit = int(limit.text("expression"))

    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
    Context:
      Aggregations:
        - SUM(x.b)
      Group:
        - x.a
    Projections:
      - x.a
      - "x".""
    Dependencies:
    - Join: x (4347985296)
      Context:
        y:
        On: x.a = y.a
      Projections:
      Dependencies:
      - Scan: x (4347983136)
        Context:
          Source: x AS x
        Projections:
      - Scan: y (4343416624)
        Context:
          Source: y AS y
        Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
def to_s(self, level: int = 0) -> str:
    """Render this step and, recursively, its dependencies as text.

    The output lists the step id, the subclass context from `_to_s` (when it
    returns any lines), the projections, then the condition, limit and
    dependencies when present.

    Args:
        level: nesting depth; it scales the indentation prefix.

    Returns:
        The newline-joined description.
    """
    # NOTE(review): the whitespace inside the literals below is reproduced as
    # found; it may have been collapsed in this copy - confirm against upstream.
    indent = " " * level
    nested = f"{indent} "

    context = self._to_s(f"{nested} ")

    if context:
        context = [f"{nested}Context:"] + context

    lines = [
        f"{indent}- {self.id}",
        *context,
        f"{nested}Projections:",
    ]

    for expression in self.projections:
        lines.append(f"{nested} - {expression.sql()}")

    if self.condition:
        lines.append(f"{nested}Condition: {self.condition.sql()}")

    # Compare by value: an identity test only recognises the `math.inf` object
    # itself and would print any other infinite float as a real limit.
    if self.limit != math.inf:
        lines.append(f"{nested}Limit: {self.limit}")

    if self.dependencies:
        lines.append(f"{nested}Dependencies:")
        for dependency in self.dependencies:
            lines.append(" " + dependency.to_s(level + 1))

    return "\n".join(lines)
class Scan(Step):
    """Step that reads from a single source expression."""

    @classmethod
    def from_expression(cls, expression: exp.Expr, ctes: dict[str, Step] | None = None) -> Step:
        """Build the step for a FROM/JOIN target.

        A subquery is planned through `Step.from_expression` and the resulting
        step is renamed to the subquery's alias. Anything else becomes a `Scan`
        of `expression`, which depends on the matching CTE step when the
        expression's name is a key of `ctes`.

        Args:
            expression: the source expression.
            ctes: CTE steps by name, if any.

        Returns:
            The step producing the rows of `expression`.
        """
        label = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            planned = Step.from_expression(expression.this, ctes)
            planned.name = label
            return planned

        scan = Scan()
        scan.name = label
        scan.source = expression

        if ctes:
            cte_name = expression.name
            if cte_name in ctes:
                scan.add_dependency(ctes[cte_name])

        return scan

    def __init__(self) -> None:
        super().__init__()
        # Expression being scanned; None for a scan that has no source.
        self.source: exp.Expr | None = None

    def _to_s(self, indent: str) -> list[str]:
        """Describe the scanned source, or '-static-' when there is none."""
        described = self.source.sql() if self.source else "-static-"
        return [f"{indent}Source: {described}"]
@classmethod
def from_expression(cls, expression: exp.Expr, ctes: dict[str, Step] | None = None) -> Step:
    """Build the step for a FROM/JOIN target.

    A subquery is planned through `Step.from_expression` and the resulting step
    is renamed to the subquery's alias. Anything else becomes a `Scan` of
    `expression`, which depends on the matching CTE step when the expression's
    name is a key of `ctes`.

    Args:
        expression: the source expression.
        ctes: CTE steps by name, if any.

    Returns:
        The step producing the rows of `expression`.
    """
    label = expression.alias_or_name

    if isinstance(expression, exp.Subquery):
        planned = Step.from_expression(expression.this, ctes)
        planned.name = label
        return planned

    scan = Scan()
    scan.name = label
    scan.source = expression

    if ctes:
        cte_name = expression.name
        if cte_name in ctes:
            scan.add_dependency(ctes[cte_name])

    return scan
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
    Context:
      Aggregations:
        - SUM(x.b)
      Group:
        - x.a
    Projections:
      - x.a
      - "x".""
    Dependencies:
    - Join: x (4347985296)
      Context:
        y:
        On: x.a = y.a
      Projections:
      Dependencies:
      - Scan: x (4347983136)
        Context:
          Source: x AS x
        Projections:
      - Scan: y (4343416624)
        Context:
          Source: y AS y
        Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
Inherited Members
class Join(Step):
    """Step that combines a source step with one or more joined sources."""

    @classmethod
    def from_joins(cls, joins: Iterable[exp.Join], ctes: dict[str, Step] | None = None) -> Join:
        """Build one `Join` step covering every join in `joins`.

        For each join, the triple returned by `join_condition` is stored in
        `joins` under the join's alias (or name), and the step scanning the
        joined source is added as a dependency.

        Args:
            joins: the join expressions to plan.
            ctes: CTE steps by name, forwarded to `Scan.from_expression`.

        Returns:
            The populated `Join` step.
        """
        planned = Join()

        for node in joins:
            source_key, join_key, condition = join_condition(node)
            spec = dict(
                side=node.side,  # type: ignore
                join_key=join_key,
                source_key=source_key,
                condition=condition,
            )
            planned.joins[node.alias_or_name] = spec
            planned.add_dependency(Scan.from_expression(node.this, ctes))

        return planned

    def __init__(self) -> None:
        super().__init__()
        # Name of the step on the left-hand side of the joins.
        self.source_name: str | None = None
        # Per joined source: its "side", "join_key", "source_key" and "condition".
        self.joins: dict[str, dict[str, list[str] | exp.Expr | list[exp.Expr]]] = {}

    def _to_s(self, indent: str) -> list[str]:
        """List the source name, then side, key and ON condition of each join."""
        out = [f"{indent}Source: {self.source_name or self.name}"]

        for joined, spec in self.joins.items():
            out.append(f"{indent}{joined}: {spec['side'] or 'INNER'}")

            keys = t.cast(list[str], spec.get("join_key") or [])
            rendered_keys = ", ".join(map(str, keys))
            if rendered_keys:
                out.append(f"{indent}Key: {rendered_keys}")

            condition = spec.get("condition")
            if condition:
                out.append(f"{indent}On: {condition.sql()}")  # type: ignore

        return out
@classmethod
def from_joins(cls, joins: Iterable[exp.Join], ctes: dict[str, Step] | None = None) -> Join:
    """Build a join step from an iterable of join expressions.

    For every join, the result of ``join_condition(join)`` is unpacked into a
    source key, a join key and a condition, and stored in ``step.joins`` under
    the join's ``alias_or_name``. A scan of the joined expression is added as a
    dependency of the step.

    Args:
        joins: the join expressions to register on the step.
        ctes: forwarded unchanged to ``Scan.from_expression``.

    Returns:
        The populated step. It is an instance of ``cls``, so subclasses that
        call this constructor get an instance of their own type.
    """
    # Use ``cls`` rather than a hard-coded ``Join`` so the alternate
    # constructor respects subclassing.
    step = cls()

    for join in joins:
        source_key, join_key, condition = join_condition(join)
        step.joins[join.alias_or_name] = {
            "side": join.side,  # type: ignore
            "join_key": join_key,
            "source_key": source_key,
            "condition": condition,
        }

        step.add_dependency(Scan.from_expression(join.this, ctes))

    return step
Inherited Members
class Aggregate(Step):
    """Step that holds aggregation expressions, their operands and grouping keys."""

    def __init__(self) -> None:
        super().__init__()
        # Aggregation expressions rendered under "Aggregations:".
        self.aggregations: list[exp.Expr] = []
        # Expressions rendered under "Operands:" when non-empty.
        self.operands: tuple[exp.Expr, ...] = ()
        # Grouping expressions keyed by name; only the values are rendered.
        self.group: dict[str, exp.Expr] = {}
        self.source: str | None = None

    def _to_s(self, indent: str) -> list[str]:
        """Return the description lines for this step, each prefixed by `indent`."""
        out = [f"{indent}Aggregations:"]
        out.extend(f"{indent} - {agg.sql()}" for agg in self.aggregations)

        if self.group:
            out.append(f"{indent}Group:")
            out.extend(f"{indent} - {grouped.sql()}" for grouped in self.group.values())

        # `condition` is not set in this class; presumably provided by Step.
        if self.condition:
            out.extend((f"{indent}Having:", f"{indent} - {self.condition.sql()}"))

        if self.operands:
            out.append(f"{indent}Operands:")
            out.extend(f"{indent} - {operand.sql()}" for operand in self.operands)

        return out
Inherited Members
class Sort(Step):
    """Step that holds the expressions making up a sort key."""

    def __init__(self) -> None:
        super().__init__()
        # Sort key expressions; None until assigned by whoever builds the step.
        self.key: list[exp.Expr] | None = None

    def _to_s(self, indent: str) -> list[str]:
        """Return the description lines for this step, each prefixed by `indent`.

        A step whose `key` has not been assigned yet (still None) renders only
        the "Key:" header instead of raising a TypeError.
        """
        lines = [f"{indent}Key:"]

        # `key` defaults to None, so fall back to an empty sequence.
        for expression in self.key or ():
            lines.append(f"{indent} - {expression.sql()}")

        return lines
Inherited Members
class SetOperation(Step):
    """Step that combines the results of a left and a right step."""

    def __init__(self, op: type[exp.Expr], left: str, right: str, distinct: bool = False) -> None:
        super().__init__()
        # Expression class of the set operation; its name is the step's type name.
        self.op: type[exp.Expr] = op
        # Names of the two dependency steps.
        self.left: str = left
        self.right: str = right
        self.distinct: bool = distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expr, ctes: dict[str, Step] | None = None
    ) -> SetOperation:
        """Build a set-operation step from a set-operation expression.

        Args:
            expression: must be an instance of `exp.SetOperation`.
            ctes: forwarded unchanged to `Step.from_expression` for both sides.

        Returns:
            A step depending on the steps built from both sides of `expression`.
        """
        assert isinstance(expression, exp.SetOperation)

        # The left side is built before the right side, as in the original.
        left_step = Step.from_expression(expression.left, ctes)
        # SELECT 1 UNION SELECT 2 <-- these subqueries don't have names
        if not left_step.name:
            left_step.name = "left"

        right_step = Step.from_expression(expression.right, ctes)
        if not right_step.name:
            right_step.name = "right"

        is_distinct = bool(expression.args.get("distinct"))
        step = cls(
            op=expression.__class__,
            left=left_step.name,
            right=right_step.name,
            distinct=is_distinct,
        )

        for dependency in (left_step, right_step):
            step.add_dependency(dependency)

        limit: exp.Limit | None = expression.args.get("limit")
        if limit is None:
            return step

        step.limit = int(limit.text("expression"))
        return step

    def _to_s(self, indent: str) -> list[str]:
        """Return the description lines for this step; empty unless `distinct` is set."""
        return [f"{indent}Distinct: {self.distinct}"] if self.distinct else []

    @property
    def type_name(self) -> str:
        """Name of the expression class stored in `op`."""
        return self.op.__name__
@classmethod
def from_expression(
    cls, expression: exp.Expr, ctes: dict[str, Step] | None = None
) -> SetOperation:
    """Build a set-operation step from a set-operation expression.

    Args:
        expression: must be an instance of `exp.SetOperation`.
        ctes: forwarded unchanged to `Step.from_expression` for both sides.

    Returns:
        A step depending on the steps built from both sides of `expression`.
    """
    assert isinstance(expression, exp.SetOperation)

    # The left side is built before the right side, as in the original.
    left_step = Step.from_expression(expression.left, ctes)
    # SELECT 1 UNION SELECT 2 <-- these subqueries don't have names
    if not left_step.name:
        left_step.name = "left"

    right_step = Step.from_expression(expression.right, ctes)
    if not right_step.name:
        right_step.name = "right"

    is_distinct = bool(expression.args.get("distinct"))
    step = cls(
        op=expression.__class__,
        left=left_step.name,
        right=right_step.name,
        distinct=is_distinct,
    )

    for dependency in (left_step, right_step):
        step.add_dependency(dependency)

    limit: exp.Limit | None = expression.args.get("limit")
    if limit is None:
        return step

    step.limit = int(limit.text("expression"))
    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
    Context:
      Aggregations:
        - SUM(x.b)
      Group:
        - x.a
    Projections:
      - x.a
      - "x".""
    Dependencies:
    - Join: x (4347985296)
      Context:
        y:
        On: x.a = y.a
      Projections:
      Dependencies:
      - Scan: x (4347983136)
        Context:
          Source: x AS x
        Projections:
      - Scan: y (4343416624)
        Context:
          Source: y AS y
        Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.