sqlglot.executor.python
import collections
import itertools
import math

from sqlglot import exp, planner, tokens
from sqlglot.dialects.dialect import Dialect
from sqlglot.errors import ExecuteError
from sqlglot.executor.context import Context
from sqlglot.executor.env import ENV
from sqlglot.executor.table import RowReader, Table
from sqlglot.generators.python import PythonGenerator


class PythonExecutor:
    """Interpret a sqlglot ``planner`` plan using pure-Python ``Table`` objects.

    SQL expressions are converted to Python source by the ``Python`` dialect's
    generator, compiled to bytecode, and evaluated row-by-row via ``Context``.
    """

    def __init__(self, env=None, tables=None):
        # identify=True quotes all identifiers; comments are dropped because
        # the generated source is executed, never read by a human.
        self.generator = Python().generator(identify=True, comments=False)
        # Caller-supplied env entries override the defaults from ENV.
        self.env = {**ENV, **(env or {})}
        self.tables = tables or {}

    def execute(self, plan):
        """Run every step of *plan* in dependency order; return the root table.

        Raises:
            ExecuteError: wrapping any exception raised while running a step.
        """
        finished = set()
        queue = set(plan.leaves)
        contexts = {}

        while queue:
            node = queue.pop()
            try:
                # Merge the output tables of all dependencies into one context.
                context = self.context(
                    {
                        name: table
                        for dep in node.dependencies
                        for name, table in contexts[dep].tables.items()
                    }
                )

                if isinstance(node, planner.Scan):
                    contexts[node] = self.scan(node, context)
                elif isinstance(node, planner.Aggregate):
                    contexts[node] = self.aggregate(node, context)
                elif isinstance(node, planner.Join):
                    contexts[node] = self.join(node, context)
                elif isinstance(node, planner.Sort):
                    contexts[node] = self.sort(node, context)
                elif isinstance(node, planner.SetOperation):
                    contexts[node] = self.set_operation(node, context)
                else:
                    raise NotImplementedError

                finished.add(node)

                # A dependent becomes runnable once all of its inputs exist.
                for dep in node.dependents:
                    if all(d in contexts for d in dep.dependencies):
                        queue.add(dep)

                # Drop the context of any dependency whose consumers all ran,
                # so intermediate tables can be garbage collected.
                for dep in node.dependencies:
                    if all(d in finished for d in dep.dependents):
                        contexts.pop(dep)
            except Exception as e:
                raise ExecuteError(f"Step '{node.id}' failed: {e}") from e

        root = plan.root
        return contexts[root].tables[root.name]

    def generate(self, expression):
        """Convert a SQL expression into literal Python code and compile it into bytecode."""
        if not expression:
            return None

        sql = self.generator.generate(expression)
        # The generated source doubles as the "filename" so errors show it.
        return compile(sql, sql, "eval", optimize=2)

    def generate_tuple(self, expressions):
        """Convert an array of SQL expressions into tuple of Python byte code."""
        if not expressions:
            return tuple()
        return tuple(self.generate(expression) for expression in expressions)

    def context(self, tables):
        # Build a Context that shares this executor's evaluation environment.
        return Context(tables, env=self.env)

    def table(self, expressions):
        # Create an empty Table whose column names come from *expressions*
        # (alias-or-name for expression nodes, the value itself otherwise).
        return Table(
            expression.alias_or_name if isinstance(expression, exp.Expr) else expression
            for expression in expressions
        )

    def scan(self, step, context):
        """Execute a Scan step, returning a context holding the scanned table."""
        source = step.source

        if source and isinstance(source, exp.Expr):
            source = source.name or source.alias

        if source is None:
            # No source: a single empty row (e.g. SELECT 1).
            context, table_iter = self.static()
        elif source in context:
            # Pass-through when there is nothing to project or filter.
            if not step.projections and not step.condition:
                return self.context({step.name: context.tables[source]})
            table_iter = context.table_iter(source)
        else:
            context, table_iter = self.scan_table(step)

        return self.context({step.name: self._project_and_filter(context, step, table_iter)})

    def _project_and_filter(self, context, step, table_iter):
        """Apply the step's condition, projections, and limit over *table_iter*."""
        sink = self.table(step.projections if step.projections else context.columns)
        condition = self.generate(step.condition)
        projections = self.generate_tuple(step.projections)

        for reader in table_iter:
            if len(sink) >= step.limit:
                break

            if condition and not context.eval(condition):
                continue

            if projections:
                sink.append(context.eval_tuple(projections))
            else:
                sink.append(reader.row)

        return sink

    def static(self):
        # Empty context plus one empty row, for source-less SELECTs.
        return self.context({}), [RowReader(())]

    def scan_table(self, step):
        # Resolve the step's source against the registered tables.
        table = self.tables.find(step.source)
        context = self.context({step.source.alias_or_name: table})
        return context, iter(table)

    def join(self, step, context):
        """Execute a Join step, folding each join in ``step.joins`` into one row set.

        Every joined table's columns are tracked as a range into the combined
        rows so per-table views can be rebuilt after each join.
        """
        source = step.source_name

        source_table = context.tables[source]
        source_context = self.context({source: source_table})
        # Column offsets of each table within the concatenated rows.
        column_ranges = {source: range(0, len(source_table.columns))}

        for name, join in step.joins.items():
            table = context.tables[name]
            # The new table's columns start right after the widest range so far.
            start = max(r.stop for r in column_ranges.values())
            column_ranges[name] = range(start, len(table.columns) + start)
            join_context = self.context({name: table})

            if join.get("source_key"):
                # Equi-join keys available: hash join.
                table = self.hash_join(join, source_context, join_context)
            else:
                table = self.nested_loop_join(join, source_context, join_context)

            # Re-expose every table as a ranged view over the combined rows.
            source_context = self.context(
                {
                    name: Table(table.columns, table.rows, column_range)
                    for name, column_range in column_ranges.items()
                }
            )
            # Apply any residual join condition as a post-filter.
            condition = self.generate(join["condition"])
            if condition:
                source_context.filter(condition)

        if not step.condition and not step.projections:
            return source_context

        sink = self._project_and_filter(
            source_context,
            step,
            (reader for reader, _ in iter(source_context)),
        )

        if step.projections:
            return self.context({step.name: sink})
        else:
            # No projections: keep per-table ranged views of the filtered rows.
            return self.context(
                {
                    name: Table(table.columns, sink.rows, table.column_range)
                    for name, table in source_context.tables.items()
                }
            )

    def nested_loop_join(self, _join, source_context, join_context):
        """Cartesian product of the two contexts (condition applied later)."""
        table = Table(source_context.columns + join_context.columns)

        for reader_a, _ in source_context:
            for reader_b, _ in join_context:
                table.append(reader_a.row + reader_b.row)

        return table

    def hash_join(self, join, source_context, join_context):
        """Hash join on equi-join keys, padding with NULL rows for outer sides."""
        source_key = self.generate_tuple(join["source_key"])
        join_key = self.generate_tuple(join["join_key"])
        left = join.get("side") == "LEFT"
        right = join.get("side") == "RIGHT"

        # key -> (rows from the source side, rows from the joined side)
        results = collections.defaultdict(lambda: ([], []))

        for reader, ctx in source_context:
            results[ctx.eval_tuple(source_key)][0].append(reader.row)
        for reader, ctx in join_context:
            results[ctx.eval_tuple(join_key)][1].append(reader.row)

        table = Table(source_context.columns + join_context.columns)
        nulls = [(None,) * len(join_context.columns if left else source_context.columns)]

        for a_group, b_group in results.values():
            if left:
                b_group = b_group or nulls
            elif right:
                a_group = a_group or nulls

            for a_row, b_row in itertools.product(a_group, b_group):
                table.append(a_row + b_row)

        return table

    def aggregate(self, step, context):
        """Execute an Aggregate step: sort by the group key, then fold each run
        of equal keys into one output row of group values + aggregate values.
        """
        group_by = self.generate_tuple(step.group.values())
        aggregations = self.generate_tuple(step.aggregations)
        operands = self.generate_tuple(step.operands)

        if operands:
            # Aggregation operands (e.g. the x in SUM(x)) are evaluated per
            # row and appended as extra trailing columns.
            operand_table = Table(self.table(step.operands).columns)

            for reader, ctx in context:
                operand_table.append(ctx.eval_tuple(operands))

            for i, (a, b) in enumerate(zip(context.table.rows, operand_table.rows)):
                context.table.rows[i] = a + b

            width = len(context.columns)
            context.add_columns(*operand_table.columns)

            # View exposing only the operand columns (offset past originals).
            operand_table = Table(
                context.columns,
                context.table.rows,
                range(width, width + len(operand_table.columns)),
            )

            context = self.context(
                {
                    None: operand_table,
                    **context.tables,
                }
            )

        # Sorting on the group key makes equal keys adjacent.
        context.sort(group_by)

        group = None
        start = 0
        end = 1
        length = len(context.table)
        table = self.table(list(step.group) + step.aggregations)

        def add_row():
            # One output row: group-key values + aggregations over the range.
            table.append(group + context.eval_tuple(aggregations))

        if length:
            for i in range(length):
                context.set_index(i)
                key = context.eval_tuple(group_by)
                group = key if group is None else group
                end += 1
                if key != group:
                    # Key changed at row i: close out the previous group.
                    context.set_range(start, end - 2)
                    add_row()
                    group = key
                    start = end - 2
                    if len(table.rows) >= step.limit:
                        break
                if i == length - 1:
                    # Last row: flush the final (possibly only) group.
                    context.set_range(start, end - 1)
                    add_row()
        elif step.limit > 0 and not group_by:
            # Empty input with a global aggregate still yields one row,
            # e.g. SELECT COUNT(*) FROM empty -> 0.
            context.set_range(0, 0)
            table.append(context.eval_tuple(aggregations))

        context = self.context({step.name: table, **{name: table for name in context.tables}})

        # Remaining projections/filters (e.g. HAVING) run as a scan on top.
        if step.projections or step.condition:
            return self.scan(step, context)
        return context

    def sort(self, step, context):
        """Execute a Sort step: evaluate projections, sort, limit, project."""
        projections = self.generate_tuple(step.projections)
        projection_columns = [p.alias_or_name for p in step.projections]
        all_columns = list(context.columns) + projection_columns
        sink = self.table(all_columns)
        for reader, ctx in context:
            sink.append(reader.row + ctx.eval_tuple(projections))

        sort_ctx = self.context(
            {
                None: sink,
                **{table: sink for table in context.tables},
            }
        )
        sort_ctx.sort(self.generate_tuple(step.key))

        if not math.isinf(step.limit):
            sort_ctx.table.rows = sort_ctx.table.rows[0 : step.limit]

        # Keep only the projected (trailing) columns in the output.
        output = Table(
            projection_columns,
            rows=[r[len(context.columns) : len(all_columns)] for r in sort_ctx.table.rows],
        )
        return self.context({step.name: output})

    def set_operation(self, step, context):
        """Execute a SetOperation step (UNION/INTERSECT/EXCEPT) over two tables.

        Note: the set-based branches collapse duplicates and do not preserve
        row order; UNION ALL concatenates.
        """
        left = context.tables[step.left]
        right = context.tables[step.right]

        sink = self.table(left.columns)

        if issubclass(step.op, exp.Intersect):
            sink.rows = list(set(left.rows).intersection(set(right.rows)))
        elif issubclass(step.op, exp.Except):
            sink.rows = list(set(left.rows).difference(set(right.rows)))
        elif issubclass(step.op, exp.Union) and step.distinct:
            sink.rows = list(set(left.rows).union(set(right.rows)))
        else:
            sink.rows = left.rows + right.rows

        if not math.isinf(step.limit):
            sink.rows = sink.rows[0 : step.limit]

        return self.context({step.name: sink})


class Python(Dialect):
    """Pseudo-dialect whose generator emits executable Python source."""

    class Tokenizer(tokens.Tokenizer):
        # Backslash escapes inside string literals, matching Python strings.
        STRING_ESCAPES = ["\\"]

    Generator = PythonGenerator
class
PythonExecutor:
class PythonExecutor:
    """Interpret a sqlglot ``planner`` plan using pure-Python ``Table`` objects.

    SQL expressions are converted to Python source by the ``Python`` dialect's
    generator, compiled to bytecode, and evaluated row-by-row via ``Context``.
    """

    def __init__(self, env=None, tables=None):
        # identify=True quotes all identifiers; comments are dropped because
        # the generated source is executed, never read by a human.
        self.generator = Python().generator(identify=True, comments=False)
        # Caller-supplied env entries override the defaults from ENV.
        self.env = {**ENV, **(env or {})}
        self.tables = tables or {}

    def execute(self, plan):
        """Run every step of *plan* in dependency order; return the root table.

        Raises:
            ExecuteError: wrapping any exception raised while running a step.
        """
        finished = set()
        queue = set(plan.leaves)
        contexts = {}

        while queue:
            node = queue.pop()
            try:
                # Merge the output tables of all dependencies into one context.
                context = self.context(
                    {
                        name: table
                        for dep in node.dependencies
                        for name, table in contexts[dep].tables.items()
                    }
                )

                if isinstance(node, planner.Scan):
                    contexts[node] = self.scan(node, context)
                elif isinstance(node, planner.Aggregate):
                    contexts[node] = self.aggregate(node, context)
                elif isinstance(node, planner.Join):
                    contexts[node] = self.join(node, context)
                elif isinstance(node, planner.Sort):
                    contexts[node] = self.sort(node, context)
                elif isinstance(node, planner.SetOperation):
                    contexts[node] = self.set_operation(node, context)
                else:
                    raise NotImplementedError

                finished.add(node)

                # A dependent becomes runnable once all of its inputs exist.
                for dep in node.dependents:
                    if all(d in contexts for d in dep.dependencies):
                        queue.add(dep)

                # Drop the context of any dependency whose consumers all ran,
                # so intermediate tables can be garbage collected.
                for dep in node.dependencies:
                    if all(d in finished for d in dep.dependents):
                        contexts.pop(dep)
            except Exception as e:
                raise ExecuteError(f"Step '{node.id}' failed: {e}") from e

        root = plan.root
        return contexts[root].tables[root.name]

    def generate(self, expression):
        """Convert a SQL expression into literal Python code and compile it into bytecode."""
        if not expression:
            return None

        sql = self.generator.generate(expression)
        # The generated source doubles as the "filename" so errors show it.
        return compile(sql, sql, "eval", optimize=2)

    def generate_tuple(self, expressions):
        """Convert an array of SQL expressions into tuple of Python byte code."""
        if not expressions:
            return tuple()
        return tuple(self.generate(expression) for expression in expressions)

    def context(self, tables):
        # Build a Context that shares this executor's evaluation environment.
        return Context(tables, env=self.env)

    def table(self, expressions):
        # Create an empty Table whose column names come from *expressions*
        # (alias-or-name for expression nodes, the value itself otherwise).
        return Table(
            expression.alias_or_name if isinstance(expression, exp.Expr) else expression
            for expression in expressions
        )

    def scan(self, step, context):
        """Execute a Scan step, returning a context holding the scanned table."""
        source = step.source

        if source and isinstance(source, exp.Expr):
            source = source.name or source.alias

        if source is None:
            # No source: a single empty row (e.g. SELECT 1).
            context, table_iter = self.static()
        elif source in context:
            # Pass-through when there is nothing to project or filter.
            if not step.projections and not step.condition:
                return self.context({step.name: context.tables[source]})
            table_iter = context.table_iter(source)
        else:
            context, table_iter = self.scan_table(step)

        return self.context({step.name: self._project_and_filter(context, step, table_iter)})

    def _project_and_filter(self, context, step, table_iter):
        """Apply the step's condition, projections, and limit over *table_iter*."""
        sink = self.table(step.projections if step.projections else context.columns)
        condition = self.generate(step.condition)
        projections = self.generate_tuple(step.projections)

        for reader in table_iter:
            if len(sink) >= step.limit:
                break

            if condition and not context.eval(condition):
                continue

            if projections:
                sink.append(context.eval_tuple(projections))
            else:
                sink.append(reader.row)

        return sink

    def static(self):
        # Empty context plus one empty row, for source-less SELECTs.
        return self.context({}), [RowReader(())]

    def scan_table(self, step):
        # Resolve the step's source against the registered tables.
        table = self.tables.find(step.source)
        context = self.context({step.source.alias_or_name: table})
        return context, iter(table)

    def join(self, step, context):
        """Execute a Join step, folding each join in ``step.joins`` into one row set.

        Every joined table's columns are tracked as a range into the combined
        rows so per-table views can be rebuilt after each join.
        """
        source = step.source_name

        source_table = context.tables[source]
        source_context = self.context({source: source_table})
        # Column offsets of each table within the concatenated rows.
        column_ranges = {source: range(0, len(source_table.columns))}

        for name, join in step.joins.items():
            table = context.tables[name]
            # The new table's columns start right after the widest range so far.
            start = max(r.stop for r in column_ranges.values())
            column_ranges[name] = range(start, len(table.columns) + start)
            join_context = self.context({name: table})

            if join.get("source_key"):
                # Equi-join keys available: hash join.
                table = self.hash_join(join, source_context, join_context)
            else:
                table = self.nested_loop_join(join, source_context, join_context)

            # Re-expose every table as a ranged view over the combined rows.
            source_context = self.context(
                {
                    name: Table(table.columns, table.rows, column_range)
                    for name, column_range in column_ranges.items()
                }
            )
            # Apply any residual join condition as a post-filter.
            condition = self.generate(join["condition"])
            if condition:
                source_context.filter(condition)

        if not step.condition and not step.projections:
            return source_context

        sink = self._project_and_filter(
            source_context,
            step,
            (reader for reader, _ in iter(source_context)),
        )

        if step.projections:
            return self.context({step.name: sink})
        else:
            # No projections: keep per-table ranged views of the filtered rows.
            return self.context(
                {
                    name: Table(table.columns, sink.rows, table.column_range)
                    for name, table in source_context.tables.items()
                }
            )

    def nested_loop_join(self, _join, source_context, join_context):
        """Cartesian product of the two contexts (condition applied later)."""
        table = Table(source_context.columns + join_context.columns)

        for reader_a, _ in source_context:
            for reader_b, _ in join_context:
                table.append(reader_a.row + reader_b.row)

        return table

    def hash_join(self, join, source_context, join_context):
        """Hash join on equi-join keys, padding with NULL rows for outer sides."""
        source_key = self.generate_tuple(join["source_key"])
        join_key = self.generate_tuple(join["join_key"])
        left = join.get("side") == "LEFT"
        right = join.get("side") == "RIGHT"

        # key -> (rows from the source side, rows from the joined side)
        results = collections.defaultdict(lambda: ([], []))

        for reader, ctx in source_context:
            results[ctx.eval_tuple(source_key)][0].append(reader.row)
        for reader, ctx in join_context:
            results[ctx.eval_tuple(join_key)][1].append(reader.row)

        table = Table(source_context.columns + join_context.columns)
        nulls = [(None,) * len(join_context.columns if left else source_context.columns)]

        for a_group, b_group in results.values():
            if left:
                b_group = b_group or nulls
            elif right:
                a_group = a_group or nulls

            for a_row, b_row in itertools.product(a_group, b_group):
                table.append(a_row + b_row)

        return table

    def aggregate(self, step, context):
        """Execute an Aggregate step: sort by the group key, then fold each run
        of equal keys into one output row of group values + aggregate values.
        """
        group_by = self.generate_tuple(step.group.values())
        aggregations = self.generate_tuple(step.aggregations)
        operands = self.generate_tuple(step.operands)

        if operands:
            # Aggregation operands (e.g. the x in SUM(x)) are evaluated per
            # row and appended as extra trailing columns.
            operand_table = Table(self.table(step.operands).columns)

            for reader, ctx in context:
                operand_table.append(ctx.eval_tuple(operands))

            for i, (a, b) in enumerate(zip(context.table.rows, operand_table.rows)):
                context.table.rows[i] = a + b

            width = len(context.columns)
            context.add_columns(*operand_table.columns)

            # View exposing only the operand columns (offset past originals).
            operand_table = Table(
                context.columns,
                context.table.rows,
                range(width, width + len(operand_table.columns)),
            )

            context = self.context(
                {
                    None: operand_table,
                    **context.tables,
                }
            )

        # Sorting on the group key makes equal keys adjacent.
        context.sort(group_by)

        group = None
        start = 0
        end = 1
        length = len(context.table)
        table = self.table(list(step.group) + step.aggregations)

        def add_row():
            # One output row: group-key values + aggregations over the range.
            table.append(group + context.eval_tuple(aggregations))

        if length:
            for i in range(length):
                context.set_index(i)
                key = context.eval_tuple(group_by)
                group = key if group is None else group
                end += 1
                if key != group:
                    # Key changed at row i: close out the previous group.
                    context.set_range(start, end - 2)
                    add_row()
                    group = key
                    start = end - 2
                    if len(table.rows) >= step.limit:
                        break
                if i == length - 1:
                    # Last row: flush the final (possibly only) group.
                    context.set_range(start, end - 1)
                    add_row()
        elif step.limit > 0 and not group_by:
            # Empty input with a global aggregate still yields one row,
            # e.g. SELECT COUNT(*) FROM empty -> 0.
            context.set_range(0, 0)
            table.append(context.eval_tuple(aggregations))

        context = self.context({step.name: table, **{name: table for name in context.tables}})

        # Remaining projections/filters (e.g. HAVING) run as a scan on top.
        if step.projections or step.condition:
            return self.scan(step, context)
        return context

    def sort(self, step, context):
        """Execute a Sort step: evaluate projections, sort, limit, project."""
        projections = self.generate_tuple(step.projections)
        projection_columns = [p.alias_or_name for p in step.projections]
        all_columns = list(context.columns) + projection_columns
        sink = self.table(all_columns)
        for reader, ctx in context:
            sink.append(reader.row + ctx.eval_tuple(projections))

        sort_ctx = self.context(
            {
                None: sink,
                **{table: sink for table in context.tables},
            }
        )
        sort_ctx.sort(self.generate_tuple(step.key))

        if not math.isinf(step.limit):
            sort_ctx.table.rows = sort_ctx.table.rows[0 : step.limit]

        # Keep only the projected (trailing) columns in the output.
        output = Table(
            projection_columns,
            rows=[r[len(context.columns) : len(all_columns)] for r in sort_ctx.table.rows],
        )
        return self.context({step.name: output})

    def set_operation(self, step, context):
        """Execute a SetOperation step (UNION/INTERSECT/EXCEPT) over two tables.

        Note: the set-based branches collapse duplicates and do not preserve
        row order; UNION ALL concatenates.
        """
        left = context.tables[step.left]
        right = context.tables[step.right]

        sink = self.table(left.columns)

        if issubclass(step.op, exp.Intersect):
            sink.rows = list(set(left.rows).intersection(set(right.rows)))
        elif issubclass(step.op, exp.Except):
            sink.rows = list(set(left.rows).difference(set(right.rows)))
        elif issubclass(step.op, exp.Union) and step.distinct:
            sink.rows = list(set(left.rows).union(set(right.rows)))
        else:
            sink.rows = left.rows + right.rows

        if not math.isinf(step.limit):
            sink.rows = sink.rows[0 : step.limit]

        return self.context({step.name: sink})
def
execute(self, plan):
def execute(self, plan):
    """Run every step of *plan* in dependency order; return the root table.

    Starts from the plan's leaves and repeatedly pops a runnable step,
    dispatching on its planner type. Intermediate step results are kept in
    ``contexts`` and released as soon as all of a step's dependents finish.

    Raises:
        ExecuteError: wrapping any exception raised while running a step.
    """
    finished = set()
    queue = set(plan.leaves)
    contexts = {}

    while queue:
        node = queue.pop()
        try:
            # Merge the output tables of all dependencies into one context.
            context = self.context(
                {
                    name: table
                    for dep in node.dependencies
                    for name, table in contexts[dep].tables.items()
                }
            )

            if isinstance(node, planner.Scan):
                contexts[node] = self.scan(node, context)
            elif isinstance(node, planner.Aggregate):
                contexts[node] = self.aggregate(node, context)
            elif isinstance(node, planner.Join):
                contexts[node] = self.join(node, context)
            elif isinstance(node, planner.Sort):
                contexts[node] = self.sort(node, context)
            elif isinstance(node, planner.SetOperation):
                contexts[node] = self.set_operation(node, context)
            else:
                raise NotImplementedError

            finished.add(node)

            # A dependent becomes runnable once all of its inputs exist.
            for dep in node.dependents:
                if all(d in contexts for d in dep.dependencies):
                    queue.add(dep)

            # Drop contexts whose consumers have all finished, freeing memory.
            for dep in node.dependencies:
                if all(d in finished for d in dep.dependents):
                    contexts.pop(dep)
        except Exception as e:
            raise ExecuteError(f"Step '{node.id}' failed: {e}") from e

    root = plan.root
    return contexts[root].tables[root.name]
def
generate(self, expression):
def generate(self, expression):
    """Compile *expression* into evaluable Python bytecode, or None if empty."""
    if expression:
        source = self.generator.generate(expression)
        # The source text doubles as the "filename" so errors display it.
        return compile(source, source, "eval", optimize=2)
    return None
Convert a SQL expression into literal Python code and compile it into bytecode.
def
generate_tuple(self, expressions):
def generate_tuple(self, expressions):
    """Convert a sequence of SQL expressions into a tuple of compiled Python bytecode.

    Returns:
        A tuple of code objects (one per expression, via ``self.generate``),
        or the empty tuple when *expressions* is falsy.
    """
    if not expressions:
        # () is the idiomatic empty-tuple literal (was tuple()).
        return ()
    return tuple(self.generate(expression) for expression in expressions)
Convert a sequence of SQL expressions into a tuple of compiled Python bytecode.
def
scan(self, step, context):
def scan(self, step, context):
    """Execute a Scan step, returning a context holding the scanned table."""
    source = step.source
    if source and isinstance(source, exp.Expr):
        source = source.name or source.alias

    if source is None:
        # Source-less scan (e.g. SELECT 1): one empty static row.
        context, rows = self.static()
    elif source in context:
        # Already materialized; pass through untouched when there is
        # nothing to project or filter.
        if not (step.projections or step.condition):
            return self.context({step.name: context.tables[source]})
        rows = context.table_iter(source)
    else:
        context, rows = self.scan_table(step)

    sink = self._project_and_filter(context, step, rows)
    return self.context({step.name: sink})
def
join(self, step, context):
def join(self, step, context):
    """Execute a Join step, folding each join in ``step.joins`` into one row set.

    Every joined table's columns are tracked as a ``range`` into the combined
    (concatenated) rows, so per-table views (``Table`` with a column range)
    can be rebuilt after each individual join.
    """
    source = step.source_name

    source_table = context.tables[source]
    source_context = self.context({source: source_table})
    # Column offsets of each table within the concatenated rows.
    column_ranges = {source: range(0, len(source_table.columns))}

    for name, join in step.joins.items():
        table = context.tables[name]
        # The new table's columns start right after the widest range so far.
        start = max(r.stop for r in column_ranges.values())
        column_ranges[name] = range(start, len(table.columns) + start)
        join_context = self.context({name: table})

        if join.get("source_key"):
            # Equi-join keys available: use the hash join.
            table = self.hash_join(join, source_context, join_context)
        else:
            table = self.nested_loop_join(join, source_context, join_context)

        # Re-expose every joined table as a ranged view over the combined rows.
        source_context = self.context(
            {
                name: Table(table.columns, table.rows, column_range)
                for name, column_range in column_ranges.items()
            }
        )
        # Apply any residual (non-equi) join condition as a post-filter.
        condition = self.generate(join["condition"])
        if condition:
            source_context.filter(condition)

    if not step.condition and not step.projections:
        return source_context

    sink = self._project_and_filter(
        source_context,
        step,
        (reader for reader, _ in iter(source_context)),
    )

    if step.projections:
        return self.context({step.name: sink})
    else:
        # No projections: keep per-table ranged views over the filtered rows.
        return self.context(
            {
                name: Table(table.columns, sink.rows, table.column_range)
                for name, table in source_context.tables.items()
            }
        )
def
hash_join(self, join, source_context, join_context):
def hash_join(self, join, source_context, join_context):
    """Hash-join the two contexts on their equi-join keys.

    LEFT/RIGHT outer sides are honored by padding the missing side of a
    bucket with a single all-NULL row.
    """
    source_key = self.generate_tuple(join["source_key"])
    join_key = self.generate_tuple(join["join_key"])
    side = join.get("side")
    is_left = side == "LEFT"
    is_right = side == "RIGHT"

    # key -> (rows from the source side, rows from the joined side)
    buckets = collections.defaultdict(lambda: ([], []))
    for reader, ctx in source_context:
        buckets[ctx.eval_tuple(source_key)][0].append(reader.row)
    for reader, ctx in join_context:
        buckets[ctx.eval_tuple(join_key)][1].append(reader.row)

    result = Table(source_context.columns + join_context.columns)
    pad_width = len(join_context.columns if is_left else source_context.columns)
    null_rows = [(None,) * pad_width]

    for src_rows, joined_rows in buckets.values():
        if is_left and not joined_rows:
            joined_rows = null_rows
        elif is_right and not src_rows:
            src_rows = null_rows

        for src_row, joined_row in itertools.product(src_rows, joined_rows):
            result.append(src_row + joined_row)

    return result
def
aggregate(self, step, context):
def aggregate(self, step, context):
    """Execute an Aggregate step: sort rows by the group key, then fold each
    run of equal keys into one output row of group values + aggregate values.
    """
    group_by = self.generate_tuple(step.group.values())
    aggregations = self.generate_tuple(step.aggregations)
    operands = self.generate_tuple(step.operands)

    if operands:
        # Aggregation operands (e.g. the x in SUM(x)) are evaluated per row
        # and appended to each source row as extra trailing columns.
        operand_table = Table(self.table(step.operands).columns)

        for reader, ctx in context:
            operand_table.append(ctx.eval_tuple(operands))

        for i, (a, b) in enumerate(zip(context.table.rows, operand_table.rows)):
            context.table.rows[i] = a + b

        width = len(context.columns)
        context.add_columns(*operand_table.columns)

        # View exposing only the operand columns (offset past the originals).
        operand_table = Table(
            context.columns,
            context.table.rows,
            range(width, width + len(operand_table.columns)),
        )

        context = self.context(
            {
                None: operand_table,
                **context.tables,
            }
        )

    # Sorting on the group key makes rows with equal keys adjacent.
    context.sort(group_by)

    group = None
    start = 0
    end = 1
    length = len(context.table)
    table = self.table(list(step.group) + step.aggregations)

    def add_row():
        # One output row: group-key values + aggregations over the set range.
        table.append(group + context.eval_tuple(aggregations))

    if length:
        for i in range(length):
            context.set_index(i)
            key = context.eval_tuple(group_by)
            group = key if group is None else group
            # end tracks i + 2, so end - 2 == i throughout the loop.
            end += 1
            if key != group:
                # Key changed at row i: close out the previous group's range.
                context.set_range(start, end - 2)
                add_row()
                group = key
                start = end - 2
                if len(table.rows) >= step.limit:
                    break
            if i == length - 1:
                # Last row: flush the final (possibly only) group.
                context.set_range(start, end - 1)
                add_row()
    elif step.limit > 0 and not group_by:
        # Empty input with a global aggregate (no GROUP BY) still yields one
        # row, e.g. SELECT COUNT(*) FROM empty -> 0.
        context.set_range(0, 0)
        table.append(context.eval_tuple(aggregations))

    context = self.context({step.name: table, **{name: table for name in context.tables}})

    # Any remaining projections/filters (e.g. HAVING) run as a scan on top.
    if step.projections or step.condition:
        return self.scan(step, context)
    return context
def
sort(self, step, context):
def sort(self, step, context):
    """Execute a Sort step: evaluate projections, order by the key, limit,
    then keep only the projected columns.
    """
    compiled = self.generate_tuple(step.projections)
    out_columns = [p.alias_or_name for p in step.projections]
    combined = list(context.columns) + out_columns

    # Stage each row as original columns + projection values, so the sort
    # key can reference either.
    staging = self.table(combined)
    for reader, ctx in context:
        staging.append(reader.row + ctx.eval_tuple(compiled))

    sort_ctx = self.context(
        {
            None: staging,
            **{name: staging for name in context.tables},
        }
    )
    sort_ctx.sort(self.generate_tuple(step.key))

    if not math.isinf(step.limit):
        sort_ctx.table.rows = sort_ctx.table.rows[: step.limit]

    # Strip the original columns, keeping only the trailing projections.
    base = len(context.columns)
    trimmed = [row[base : len(combined)] for row in sort_ctx.table.rows]
    return self.context({step.name: Table(out_columns, rows=trimmed)})
def
set_operation(self, step, context):
def set_operation(self, step, context):
    """Execute a SetOperation step (UNION/INTERSECT/EXCEPT) over two tables.

    INTERSECT, EXCEPT, and UNION DISTINCT use Python set semantics, so
    duplicate rows collapse and row order is not preserved; UNION ALL simply
    concatenates. A finite ``step.limit`` truncates the result.
    """
    left = context.tables[step.left]
    right = context.tables[step.right]

    sink = self.table(left.columns)

    if issubclass(step.op, exp.Intersect):
        # set.intersection/difference/union accept any iterable — no need to
        # materialize a second throwaway set from the right-hand rows.
        sink.rows = list(set(left.rows).intersection(right.rows))
    elif issubclass(step.op, exp.Except):
        sink.rows = list(set(left.rows).difference(right.rows))
    elif issubclass(step.op, exp.Union) and step.distinct:
        sink.rows = list(set(left.rows).union(right.rows))
    else:
        sink.rows = left.rows + right.rows

    if not math.isinf(step.limit):
        sink.rows = sink.rows[: step.limit]

    return self.context({step.name: sink})
class
Python(sqlglot.dialects.dialect.Dialect):
class Python(Dialect):
    """Pseudo-dialect whose generator emits executable Python source for SQL expressions."""

    class Tokenizer(tokens.Tokenizer):
        # Backslash escapes inside string literals, matching Python strings.
        STRING_ESCAPES = ["\\"]

    Generator = PythonGenerator
Generator =
<class 'sqlglot.generators.python.PythonGenerator'>
UNESCAPED_SEQUENCES: dict[str, str] =
{'\\a': '\x07', '\\b': '\x08', '\\f': '\x0c', '\\n': '\n', '\\r': '\r', '\\t': '\t', '\\v': '\x0b', '\\\\': '\\'}
Mapping of an escaped sequence (e.g. `\n`) to its unescaped version (a literal newline character).
STRINGS_SUPPORT_ESCAPED_SEQUENCES: bool =
True
Whether string literals support escape sequences (e.g. \n). Set by the metaclass based on the tokenizer's STRING_ESCAPES.
BYTE_STRINGS_SUPPORT_ESCAPED_SEQUENCES: bool =
True
Whether byte string literals support escape sequences. Set by the metaclass based on the tokenizer's BYTE_STRING_ESCAPES.
tokenizer_class =
<class 'Python.Tokenizer'>
parser_class =
<class 'sqlglot.parsers.base.BaseParser'>
generator_class =
<class 'sqlglot.generators.python.PythonGenerator'>
ESCAPED_SEQUENCES: dict[str, str] =
{'\x07': '\\a', '\x08': '\\b', '\x0c': '\\f', '\n': '\\n', '\r': '\\r', '\t': '\\t', '\x0b': '\\v', '\\': '\\\\'}
VALID_INTERVAL_UNITS: set[str] =
{'WEEK_ISO', 'DAY OF YEAR', 'MSEC', 'MINUTES', 'QUARTER', 'NSECOND', 'SECS', 'DAYOFMONTH', 'NSECONDS', 'WEEKOFYEARISO', 'NANOSEC', 'MIL', 'EPOCH_MICROSECONDS', 'USECOND', 'MONTH', 'D', 'MSECONDS', 'C', 'NANOSECS', 'WY', 'TIMEZONE_HOUR', 'USECS', 'DAYOFYEAR', 'WK', 'MILLISEC', 'DOY', 'YRS', 'YY', 'DW', 'EPOCH_MILLISECONDS', 'H', 'S', 'WEEK', 'TZM', 'DAY', 'QTRS', 'MICROSECONDS', 'MI', 'TIMEZONE_MINUTE', 'DAYOFWEEK_ISO', 'DAYS', 'EPOCH_NANOSECOND', 'QTR', 'SECONDS', 'W', 'MILLISECS', 'DY', 'NSEC', 'YEARS', 'WEEKDAY', 'MSECS', 'Q', 'HRS', 'YEAR', 'DEC', 'DW_ISO', 'MIN', 'SEC', 'WEEKDAY_ISO', 'EPOCH', 'MICROSEC', 'QUARTERS', 'MICROSECS', 'DOW_ISO', 'MICROSECOND', 'HR', 'MS', 'YYYY', 'MILS', 'MILLISECONDS', 'WEEKOFYEAR_ISO', 'EPOCH_MILLISECOND', 'Y', 'SECOND', 'NANOSECOND', 'MILLENNIUM', 'MINS', 'MM', 'HOURS', 'WEEKISO', 'DAYOFWEEKISO', 'USEC', 'CENTURY', 'DD', 'YYY', 'EPOCH_NANOSECONDS', 'USECONDS', 'EPOCH_SECOND', 'DECADE', 'DECS', 'MINUTE', 'EPOCH_SECONDS', 'EPOCH_MICROSECOND', 'MSECOND', 'MONS', 'MILLISECOND', 'DAYOFWEEK', 'HOUR', 'M', 'CENTS', 'US', 'WEEKOFYEAR', 'NS', 'CENTURIES', 'MILLISECON', 'YR', 'MON', 'MILLENIA', 'WOY', 'TZH', 'CENT', 'DAY OF WEEK', 'DOW', 'HH', 'DECADES', 'MONTHS'}
Inherited Members
- sqlglot.tokens.Tokenizer
- Tokenizer
- SINGLE_TOKENS
- BIT_STRINGS
- BYTE_STRINGS
- HEX_STRINGS
- RAW_STRINGS
- HEREDOC_STRINGS
- UNICODE_STRINGS
- IDENTIFIERS
- QUOTES
- VAR_SINGLE_TOKENS
- ESCAPE_FOLLOW_CHARS
- IDENTIFIER_ESCAPES
- HEREDOC_TAG_IS_IDENTIFIER
- HEREDOC_STRING_ALTERNATIVE
- STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS
- NESTED_COMMENTS
- HINT_START
- TOKENS_PRECEDING_HINT
- KEYWORDS
- COMMANDS
- COMMAND_PREFIX_TOKENS
- NUMERIC_LITERALS
- NUMBERS_CAN_HAVE_DECIMALS
- COMMENTS
- dialect
- tokenize
- sql
- size
- tokens