Edit on GitHub

sqlglot.executor.python

  1import collections
  2import itertools
  3import math
  4
  5from sqlglot import exp, planner, tokens
  6from sqlglot.dialects.dialect import Dialect
  7from sqlglot.errors import ExecuteError
  8from sqlglot.executor.context import Context
  9from sqlglot.executor.env import ENV
 10from sqlglot.executor.table import RowReader, Table
 11from sqlglot.generators.python import PythonGenerator
 12
 13
 14class PythonExecutor:
 15    def __init__(self, env=None, tables=None):
 16        self.generator = Python().generator(identify=True, comments=False)
 17        self.env = {**ENV, **(env or {})}
 18        self.tables = tables or {}
 19
 20    def execute(self, plan):
 21        finished = set()
 22        queue = set(plan.leaves)
 23        contexts = {}
 24
 25        while queue:
 26            node = queue.pop()
 27            try:
 28                context = self.context(
 29                    {
 30                        name: table
 31                        for dep in node.dependencies
 32                        for name, table in contexts[dep].tables.items()
 33                    }
 34                )
 35
 36                if isinstance(node, planner.Scan):
 37                    contexts[node] = self.scan(node, context)
 38                elif isinstance(node, planner.Aggregate):
 39                    contexts[node] = self.aggregate(node, context)
 40                elif isinstance(node, planner.Join):
 41                    contexts[node] = self.join(node, context)
 42                elif isinstance(node, planner.Sort):
 43                    contexts[node] = self.sort(node, context)
 44                elif isinstance(node, planner.SetOperation):
 45                    contexts[node] = self.set_operation(node, context)
 46                else:
 47                    raise NotImplementedError
 48
 49                finished.add(node)
 50
 51                for dep in node.dependents:
 52                    if all(d in contexts for d in dep.dependencies):
 53                        queue.add(dep)
 54
 55                for dep in node.dependencies:
 56                    if all(d in finished for d in dep.dependents):
 57                        contexts.pop(dep)
 58            except Exception as e:
 59                raise ExecuteError(f"Step '{node.id}' failed: {e}") from e
 60
 61        root = plan.root
 62        return contexts[root].tables[root.name]
 63
 64    def generate(self, expression):
 65        """Convert a SQL expression into literal Python code and compile it into bytecode."""
 66        if not expression:
 67            return None
 68
 69        sql = self.generator.generate(expression)
 70        return compile(sql, sql, "eval", optimize=2)
 71
 72    def generate_tuple(self, expressions):
 73        """Convert an array of SQL expressions into tuple of Python byte code."""
 74        if not expressions:
 75            return tuple()
 76        return tuple(self.generate(expression) for expression in expressions)
 77
 78    def context(self, tables):
 79        return Context(tables, env=self.env)
 80
 81    def table(self, expressions):
 82        return Table(
 83            expression.alias_or_name if isinstance(expression, exp.Expr) else expression
 84            for expression in expressions
 85        )
 86
 87    def scan(self, step, context):
 88        source = step.source
 89
 90        if source and isinstance(source, exp.Expr):
 91            source = source.name or source.alias
 92
 93        if source is None:
 94            context, table_iter = self.static()
 95        elif source in context:
 96            if not step.projections and not step.condition:
 97                return self.context({step.name: context.tables[source]})
 98            table_iter = context.table_iter(source)
 99        else:
100            context, table_iter = self.scan_table(step)
101
102        return self.context({step.name: self._project_and_filter(context, step, table_iter)})
103
104    def _project_and_filter(self, context, step, table_iter):
105        sink = self.table(step.projections if step.projections else context.columns)
106        condition = self.generate(step.condition)
107        projections = self.generate_tuple(step.projections)
108
109        for reader in table_iter:
110            if len(sink) >= step.limit:
111                break
112
113            if condition and not context.eval(condition):
114                continue
115
116            if projections:
117                sink.append(context.eval_tuple(projections))
118            else:
119                sink.append(reader.row)
120
121        return sink
122
123    def static(self):
124        return self.context({}), [RowReader(())]
125
126    def scan_table(self, step):
127        table = self.tables.find(step.source)
128        context = self.context({step.source.alias_or_name: table})
129        return context, iter(table)
130
131    def join(self, step, context):
132        source = step.source_name
133
134        source_table = context.tables[source]
135        source_context = self.context({source: source_table})
136        column_ranges = {source: range(0, len(source_table.columns))}
137
138        for name, join in step.joins.items():
139            table = context.tables[name]
140            start = max(r.stop for r in column_ranges.values())
141            column_ranges[name] = range(start, len(table.columns) + start)
142            join_context = self.context({name: table})
143
144            if join.get("source_key"):
145                table = self.hash_join(join, source_context, join_context)
146            else:
147                table = self.nested_loop_join(join, source_context, join_context)
148
149            source_context = self.context(
150                {
151                    name: Table(table.columns, table.rows, column_range)
152                    for name, column_range in column_ranges.items()
153                }
154            )
155            condition = self.generate(join["condition"])
156            if condition:
157                source_context.filter(condition)
158
159        if not step.condition and not step.projections:
160            return source_context
161
162        sink = self._project_and_filter(
163            source_context,
164            step,
165            (reader for reader, _ in iter(source_context)),
166        )
167
168        if step.projections:
169            return self.context({step.name: sink})
170        else:
171            return self.context(
172                {
173                    name: Table(table.columns, sink.rows, table.column_range)
174                    for name, table in source_context.tables.items()
175                }
176            )
177
178    def nested_loop_join(self, _join, source_context, join_context):
179        table = Table(source_context.columns + join_context.columns)
180
181        for reader_a, _ in source_context:
182            for reader_b, _ in join_context:
183                table.append(reader_a.row + reader_b.row)
184
185        return table
186
187    def hash_join(self, join, source_context, join_context):
188        source_key = self.generate_tuple(join["source_key"])
189        join_key = self.generate_tuple(join["join_key"])
190        left = join.get("side") == "LEFT"
191        right = join.get("side") == "RIGHT"
192
193        results = collections.defaultdict(lambda: ([], []))
194
195        for reader, ctx in source_context:
196            results[ctx.eval_tuple(source_key)][0].append(reader.row)
197        for reader, ctx in join_context:
198            results[ctx.eval_tuple(join_key)][1].append(reader.row)
199
200        table = Table(source_context.columns + join_context.columns)
201        nulls = [(None,) * len(join_context.columns if left else source_context.columns)]
202
203        for a_group, b_group in results.values():
204            if left:
205                b_group = b_group or nulls
206            elif right:
207                a_group = a_group or nulls
208
209            for a_row, b_row in itertools.product(a_group, b_group):
210                table.append(a_row + b_row)
211
212        return table
213
214    def aggregate(self, step, context):
215        group_by = self.generate_tuple(step.group.values())
216        aggregations = self.generate_tuple(step.aggregations)
217        operands = self.generate_tuple(step.operands)
218
219        if operands:
220            operand_table = Table(self.table(step.operands).columns)
221
222            for reader, ctx in context:
223                operand_table.append(ctx.eval_tuple(operands))
224
225            for i, (a, b) in enumerate(zip(context.table.rows, operand_table.rows)):
226                context.table.rows[i] = a + b
227
228            width = len(context.columns)
229            context.add_columns(*operand_table.columns)
230
231            operand_table = Table(
232                context.columns,
233                context.table.rows,
234                range(width, width + len(operand_table.columns)),
235            )
236
237            context = self.context(
238                {
239                    None: operand_table,
240                    **context.tables,
241                }
242            )
243
244        context.sort(group_by)
245
246        group = None
247        start = 0
248        end = 1
249        length = len(context.table)
250        table = self.table(list(step.group) + step.aggregations)
251
252        def add_row():
253            table.append(group + context.eval_tuple(aggregations))
254
255        if length:
256            for i in range(length):
257                context.set_index(i)
258                key = context.eval_tuple(group_by)
259                group = key if group is None else group
260                end += 1
261                if key != group:
262                    context.set_range(start, end - 2)
263                    add_row()
264                    group = key
265                    start = end - 2
266                if len(table.rows) >= step.limit:
267                    break
268                if i == length - 1:
269                    context.set_range(start, end - 1)
270                    add_row()
271        elif step.limit > 0 and not group_by:
272            context.set_range(0, 0)
273            table.append(context.eval_tuple(aggregations))
274
275        context = self.context({step.name: table, **{name: table for name in context.tables}})
276
277        if step.projections or step.condition:
278            return self.scan(step, context)
279        return context
280
281    def sort(self, step, context):
282        projections = self.generate_tuple(step.projections)
283        projection_columns = [p.alias_or_name for p in step.projections]
284        all_columns = list(context.columns) + projection_columns
285        sink = self.table(all_columns)
286        for reader, ctx in context:
287            sink.append(reader.row + ctx.eval_tuple(projections))
288
289        sort_ctx = self.context(
290            {
291                None: sink,
292                **{table: sink for table in context.tables},
293            }
294        )
295        sort_ctx.sort(self.generate_tuple(step.key))
296
297        if not math.isinf(step.limit):
298            sort_ctx.table.rows = sort_ctx.table.rows[0 : step.limit]
299
300        output = Table(
301            projection_columns,
302            rows=[r[len(context.columns) : len(all_columns)] for r in sort_ctx.table.rows],
303        )
304        return self.context({step.name: output})
305
306    def set_operation(self, step, context):
307        left = context.tables[step.left]
308        right = context.tables[step.right]
309
310        sink = self.table(left.columns)
311
312        if issubclass(step.op, exp.Intersect):
313            sink.rows = list(set(left.rows).intersection(set(right.rows)))
314        elif issubclass(step.op, exp.Except):
315            sink.rows = list(set(left.rows).difference(set(right.rows)))
316        elif issubclass(step.op, exp.Union) and step.distinct:
317            sink.rows = list(set(left.rows).union(set(right.rows)))
318        else:
319            sink.rows = left.rows + right.rows
320
321        if not math.isinf(step.limit):
322            sink.rows = sink.rows[0 : step.limit]
323
324        return self.context({step.name: sink})
325
326
class Python(Dialect):
    """Dialect whose generator emits evaluable Python source instead of SQL.

    Used by PythonExecutor to compile SQL expressions into Python bytecode.
    """

    class Tokenizer(tokens.Tokenizer):
        # Backslash is the only recognized string escape in generated code.
        STRING_ESCAPES = ["\\"]

    Generator = PythonGenerator
class PythonExecutor:
 15class PythonExecutor:
 16    def __init__(self, env=None, tables=None):
 17        self.generator = Python().generator(identify=True, comments=False)
 18        self.env = {**ENV, **(env or {})}
 19        self.tables = tables or {}
 20
 21    def execute(self, plan):
 22        finished = set()
 23        queue = set(plan.leaves)
 24        contexts = {}
 25
 26        while queue:
 27            node = queue.pop()
 28            try:
 29                context = self.context(
 30                    {
 31                        name: table
 32                        for dep in node.dependencies
 33                        for name, table in contexts[dep].tables.items()
 34                    }
 35                )
 36
 37                if isinstance(node, planner.Scan):
 38                    contexts[node] = self.scan(node, context)
 39                elif isinstance(node, planner.Aggregate):
 40                    contexts[node] = self.aggregate(node, context)
 41                elif isinstance(node, planner.Join):
 42                    contexts[node] = self.join(node, context)
 43                elif isinstance(node, planner.Sort):
 44                    contexts[node] = self.sort(node, context)
 45                elif isinstance(node, planner.SetOperation):
 46                    contexts[node] = self.set_operation(node, context)
 47                else:
 48                    raise NotImplementedError
 49
 50                finished.add(node)
 51
 52                for dep in node.dependents:
 53                    if all(d in contexts for d in dep.dependencies):
 54                        queue.add(dep)
 55
 56                for dep in node.dependencies:
 57                    if all(d in finished for d in dep.dependents):
 58                        contexts.pop(dep)
 59            except Exception as e:
 60                raise ExecuteError(f"Step '{node.id}' failed: {e}") from e
 61
 62        root = plan.root
 63        return contexts[root].tables[root.name]
 64
 65    def generate(self, expression):
 66        """Convert a SQL expression into literal Python code and compile it into bytecode."""
 67        if not expression:
 68            return None
 69
 70        sql = self.generator.generate(expression)
 71        return compile(sql, sql, "eval", optimize=2)
 72
 73    def generate_tuple(self, expressions):
 74        """Convert an array of SQL expressions into tuple of Python byte code."""
 75        if not expressions:
 76            return tuple()
 77        return tuple(self.generate(expression) for expression in expressions)
 78
 79    def context(self, tables):
 80        return Context(tables, env=self.env)
 81
 82    def table(self, expressions):
 83        return Table(
 84            expression.alias_or_name if isinstance(expression, exp.Expr) else expression
 85            for expression in expressions
 86        )
 87
 88    def scan(self, step, context):
 89        source = step.source
 90
 91        if source and isinstance(source, exp.Expr):
 92            source = source.name or source.alias
 93
 94        if source is None:
 95            context, table_iter = self.static()
 96        elif source in context:
 97            if not step.projections and not step.condition:
 98                return self.context({step.name: context.tables[source]})
 99            table_iter = context.table_iter(source)
100        else:
101            context, table_iter = self.scan_table(step)
102
103        return self.context({step.name: self._project_and_filter(context, step, table_iter)})
104
105    def _project_and_filter(self, context, step, table_iter):
106        sink = self.table(step.projections if step.projections else context.columns)
107        condition = self.generate(step.condition)
108        projections = self.generate_tuple(step.projections)
109
110        for reader in table_iter:
111            if len(sink) >= step.limit:
112                break
113
114            if condition and not context.eval(condition):
115                continue
116
117            if projections:
118                sink.append(context.eval_tuple(projections))
119            else:
120                sink.append(reader.row)
121
122        return sink
123
124    def static(self):
125        return self.context({}), [RowReader(())]
126
127    def scan_table(self, step):
128        table = self.tables.find(step.source)
129        context = self.context({step.source.alias_or_name: table})
130        return context, iter(table)
131
132    def join(self, step, context):
133        source = step.source_name
134
135        source_table = context.tables[source]
136        source_context = self.context({source: source_table})
137        column_ranges = {source: range(0, len(source_table.columns))}
138
139        for name, join in step.joins.items():
140            table = context.tables[name]
141            start = max(r.stop for r in column_ranges.values())
142            column_ranges[name] = range(start, len(table.columns) + start)
143            join_context = self.context({name: table})
144
145            if join.get("source_key"):
146                table = self.hash_join(join, source_context, join_context)
147            else:
148                table = self.nested_loop_join(join, source_context, join_context)
149
150            source_context = self.context(
151                {
152                    name: Table(table.columns, table.rows, column_range)
153                    for name, column_range in column_ranges.items()
154                }
155            )
156            condition = self.generate(join["condition"])
157            if condition:
158                source_context.filter(condition)
159
160        if not step.condition and not step.projections:
161            return source_context
162
163        sink = self._project_and_filter(
164            source_context,
165            step,
166            (reader for reader, _ in iter(source_context)),
167        )
168
169        if step.projections:
170            return self.context({step.name: sink})
171        else:
172            return self.context(
173                {
174                    name: Table(table.columns, sink.rows, table.column_range)
175                    for name, table in source_context.tables.items()
176                }
177            )
178
179    def nested_loop_join(self, _join, source_context, join_context):
180        table = Table(source_context.columns + join_context.columns)
181
182        for reader_a, _ in source_context:
183            for reader_b, _ in join_context:
184                table.append(reader_a.row + reader_b.row)
185
186        return table
187
188    def hash_join(self, join, source_context, join_context):
189        source_key = self.generate_tuple(join["source_key"])
190        join_key = self.generate_tuple(join["join_key"])
191        left = join.get("side") == "LEFT"
192        right = join.get("side") == "RIGHT"
193
194        results = collections.defaultdict(lambda: ([], []))
195
196        for reader, ctx in source_context:
197            results[ctx.eval_tuple(source_key)][0].append(reader.row)
198        for reader, ctx in join_context:
199            results[ctx.eval_tuple(join_key)][1].append(reader.row)
200
201        table = Table(source_context.columns + join_context.columns)
202        nulls = [(None,) * len(join_context.columns if left else source_context.columns)]
203
204        for a_group, b_group in results.values():
205            if left:
206                b_group = b_group or nulls
207            elif right:
208                a_group = a_group or nulls
209
210            for a_row, b_row in itertools.product(a_group, b_group):
211                table.append(a_row + b_row)
212
213        return table
214
215    def aggregate(self, step, context):
216        group_by = self.generate_tuple(step.group.values())
217        aggregations = self.generate_tuple(step.aggregations)
218        operands = self.generate_tuple(step.operands)
219
220        if operands:
221            operand_table = Table(self.table(step.operands).columns)
222
223            for reader, ctx in context:
224                operand_table.append(ctx.eval_tuple(operands))
225
226            for i, (a, b) in enumerate(zip(context.table.rows, operand_table.rows)):
227                context.table.rows[i] = a + b
228
229            width = len(context.columns)
230            context.add_columns(*operand_table.columns)
231
232            operand_table = Table(
233                context.columns,
234                context.table.rows,
235                range(width, width + len(operand_table.columns)),
236            )
237
238            context = self.context(
239                {
240                    None: operand_table,
241                    **context.tables,
242                }
243            )
244
245        context.sort(group_by)
246
247        group = None
248        start = 0
249        end = 1
250        length = len(context.table)
251        table = self.table(list(step.group) + step.aggregations)
252
253        def add_row():
254            table.append(group + context.eval_tuple(aggregations))
255
256        if length:
257            for i in range(length):
258                context.set_index(i)
259                key = context.eval_tuple(group_by)
260                group = key if group is None else group
261                end += 1
262                if key != group:
263                    context.set_range(start, end - 2)
264                    add_row()
265                    group = key
266                    start = end - 2
267                if len(table.rows) >= step.limit:
268                    break
269                if i == length - 1:
270                    context.set_range(start, end - 1)
271                    add_row()
272        elif step.limit > 0 and not group_by:
273            context.set_range(0, 0)
274            table.append(context.eval_tuple(aggregations))
275
276        context = self.context({step.name: table, **{name: table for name in context.tables}})
277
278        if step.projections or step.condition:
279            return self.scan(step, context)
280        return context
281
282    def sort(self, step, context):
283        projections = self.generate_tuple(step.projections)
284        projection_columns = [p.alias_or_name for p in step.projections]
285        all_columns = list(context.columns) + projection_columns
286        sink = self.table(all_columns)
287        for reader, ctx in context:
288            sink.append(reader.row + ctx.eval_tuple(projections))
289
290        sort_ctx = self.context(
291            {
292                None: sink,
293                **{table: sink for table in context.tables},
294            }
295        )
296        sort_ctx.sort(self.generate_tuple(step.key))
297
298        if not math.isinf(step.limit):
299            sort_ctx.table.rows = sort_ctx.table.rows[0 : step.limit]
300
301        output = Table(
302            projection_columns,
303            rows=[r[len(context.columns) : len(all_columns)] for r in sort_ctx.table.rows],
304        )
305        return self.context({step.name: output})
306
307    def set_operation(self, step, context):
308        left = context.tables[step.left]
309        right = context.tables[step.right]
310
311        sink = self.table(left.columns)
312
313        if issubclass(step.op, exp.Intersect):
314            sink.rows = list(set(left.rows).intersection(set(right.rows)))
315        elif issubclass(step.op, exp.Except):
316            sink.rows = list(set(left.rows).difference(set(right.rows)))
317        elif issubclass(step.op, exp.Union) and step.distinct:
318            sink.rows = list(set(left.rows).union(set(right.rows)))
319        else:
320            sink.rows = left.rows + right.rows
321
322        if not math.isinf(step.limit):
323            sink.rows = sink.rows[0 : step.limit]
324
325        return self.context({step.name: sink})
PythonExecutor(env=None, tables=None)
16    def __init__(self, env=None, tables=None):
17        self.generator = Python().generator(identify=True, comments=False)
18        self.env = {**ENV, **(env or {})}
19        self.tables = tables or {}
generator
env
tables
def execute(self, plan):
21    def execute(self, plan):
22        finished = set()
23        queue = set(plan.leaves)
24        contexts = {}
25
26        while queue:
27            node = queue.pop()
28            try:
29                context = self.context(
30                    {
31                        name: table
32                        for dep in node.dependencies
33                        for name, table in contexts[dep].tables.items()
34                    }
35                )
36
37                if isinstance(node, planner.Scan):
38                    contexts[node] = self.scan(node, context)
39                elif isinstance(node, planner.Aggregate):
40                    contexts[node] = self.aggregate(node, context)
41                elif isinstance(node, planner.Join):
42                    contexts[node] = self.join(node, context)
43                elif isinstance(node, planner.Sort):
44                    contexts[node] = self.sort(node, context)
45                elif isinstance(node, planner.SetOperation):
46                    contexts[node] = self.set_operation(node, context)
47                else:
48                    raise NotImplementedError
49
50                finished.add(node)
51
52                for dep in node.dependents:
53                    if all(d in contexts for d in dep.dependencies):
54                        queue.add(dep)
55
56                for dep in node.dependencies:
57                    if all(d in finished for d in dep.dependents):
58                        contexts.pop(dep)
59            except Exception as e:
60                raise ExecuteError(f"Step '{node.id}' failed: {e}") from e
61
62        root = plan.root
63        return contexts[root].tables[root.name]
def generate(self, expression):
65    def generate(self, expression):
66        """Convert a SQL expression into literal Python code and compile it into bytecode."""
67        if not expression:
68            return None
69
70        sql = self.generator.generate(expression)
71        return compile(sql, sql, "eval", optimize=2)

Convert a SQL expression into literal Python code and compile it into bytecode.

def generate_tuple(self, expressions):
73    def generate_tuple(self, expressions):
74        """Convert an array of SQL expressions into tuple of Python byte code."""
75        if not expressions:
76            return tuple()
77        return tuple(self.generate(expression) for expression in expressions)

Convert an array of SQL expressions into a tuple of Python bytecode.

def context(self, tables):
79    def context(self, tables):
80        return Context(tables, env=self.env)
def table(self, expressions):
82    def table(self, expressions):
83        return Table(
84            expression.alias_or_name if isinstance(expression, exp.Expr) else expression
85            for expression in expressions
86        )
def scan(self, step, context):
 88    def scan(self, step, context):
 89        source = step.source
 90
 91        if source and isinstance(source, exp.Expr):
 92            source = source.name or source.alias
 93
 94        if source is None:
 95            context, table_iter = self.static()
 96        elif source in context:
 97            if not step.projections and not step.condition:
 98                return self.context({step.name: context.tables[source]})
 99            table_iter = context.table_iter(source)
100        else:
101            context, table_iter = self.scan_table(step)
102
103        return self.context({step.name: self._project_and_filter(context, step, table_iter)})
def static(self):
124    def static(self):
125        return self.context({}), [RowReader(())]
def scan_table(self, step):
127    def scan_table(self, step):
128        table = self.tables.find(step.source)
129        context = self.context({step.source.alias_or_name: table})
130        return context, iter(table)
def join(self, step, context):
132    def join(self, step, context):
133        source = step.source_name
134
135        source_table = context.tables[source]
136        source_context = self.context({source: source_table})
137        column_ranges = {source: range(0, len(source_table.columns))}
138
139        for name, join in step.joins.items():
140            table = context.tables[name]
141            start = max(r.stop for r in column_ranges.values())
142            column_ranges[name] = range(start, len(table.columns) + start)
143            join_context = self.context({name: table})
144
145            if join.get("source_key"):
146                table = self.hash_join(join, source_context, join_context)
147            else:
148                table = self.nested_loop_join(join, source_context, join_context)
149
150            source_context = self.context(
151                {
152                    name: Table(table.columns, table.rows, column_range)
153                    for name, column_range in column_ranges.items()
154                }
155            )
156            condition = self.generate(join["condition"])
157            if condition:
158                source_context.filter(condition)
159
160        if not step.condition and not step.projections:
161            return source_context
162
163        sink = self._project_and_filter(
164            source_context,
165            step,
166            (reader for reader, _ in iter(source_context)),
167        )
168
169        if step.projections:
170            return self.context({step.name: sink})
171        else:
172            return self.context(
173                {
174                    name: Table(table.columns, sink.rows, table.column_range)
175                    for name, table in source_context.tables.items()
176                }
177            )
def nested_loop_join(self, _join, source_context, join_context):
179    def nested_loop_join(self, _join, source_context, join_context):
180        table = Table(source_context.columns + join_context.columns)
181
182        for reader_a, _ in source_context:
183            for reader_b, _ in join_context:
184                table.append(reader_a.row + reader_b.row)
185
186        return table
def hash_join(self, join, source_context, join_context):
188    def hash_join(self, join, source_context, join_context):
189        source_key = self.generate_tuple(join["source_key"])
190        join_key = self.generate_tuple(join["join_key"])
191        left = join.get("side") == "LEFT"
192        right = join.get("side") == "RIGHT"
193
194        results = collections.defaultdict(lambda: ([], []))
195
196        for reader, ctx in source_context:
197            results[ctx.eval_tuple(source_key)][0].append(reader.row)
198        for reader, ctx in join_context:
199            results[ctx.eval_tuple(join_key)][1].append(reader.row)
200
201        table = Table(source_context.columns + join_context.columns)
202        nulls = [(None,) * len(join_context.columns if left else source_context.columns)]
203
204        for a_group, b_group in results.values():
205            if left:
206                b_group = b_group or nulls
207            elif right:
208                a_group = a_group or nulls
209
210            for a_row, b_row in itertools.product(a_group, b_group):
211                table.append(a_row + b_row)
212
213        return table
    def aggregate(self, step, context):
        """Execute an Aggregate step.

        Sorts the input by the group-by key so each group occupies a
        contiguous row range, emits one output row per group, and finally
        applies the step's projections/condition (if any) via ``scan``.
        """
        # Compiled evaluators for the group-by key, the aggregate
        # expressions, and any scalar operands the aggregates consume.
        group_by = self.generate_tuple(step.group.values())
        aggregations = self.generate_tuple(step.aggregations)
        operands = self.generate_tuple(step.operands)

        if operands:
            # Materialize the operand values per input row, then widen each
            # context row with those extra columns so the aggregate
            # expressions can reference them by position.
            operand_table = Table(self.table(step.operands).columns)

            for reader, ctx in context:
                operand_table.append(ctx.eval_tuple(operands))

            for i, (a, b) in enumerate(zip(context.table.rows, operand_table.rows)):
                context.table.rows[i] = a + b

            width = len(context.columns)
            context.add_columns(*operand_table.columns)

            # Rebuild the table with a column_range restricting visibility
            # to just the appended operand columns.
            operand_table = Table(
                context.columns,
                context.table.rows,
                range(width, width + len(operand_table.columns)),
            )

            context = self.context(
                {
                    None: operand_table,
                    **context.tables,
                }
            )

        # Sorting clusters equal keys so each group is contiguous.
        context.sort(group_by)

        group = None
        start = 0
        end = 1
        length = len(context.table)
        table = self.table(list(step.group) + step.aggregations)

        def add_row():
            # Emit one result row for the group range currently selected
            # on the context (via set_range).
            table.append(group + context.eval_tuple(aggregations))

        if length:
            for i in range(length):
                context.set_index(i)
                key = context.eval_tuple(group_by)
                group = key if group is None else group
                end += 1
                if key != group:
                    # Key changed: flush the previous group's row range.
                    context.set_range(start, end - 2)
                    add_row()
                    group = key
                    start = end - 2
                if len(table.rows) >= step.limit:
                    break
                if i == length - 1:
                    # Last input row: flush the final, still-open group.
                    context.set_range(start, end - 1)
                    add_row()
        elif step.limit > 0 and not group_by:
            # No input rows and no GROUP BY: aggregates over an empty set
            # still yield a single row (e.g. COUNT(*) = 0).
            context.set_range(0, 0)
            table.append(context.eval_tuple(aggregations))

        context = self.context({step.name: table, **{name: table for name in context.tables}})

        if step.projections or step.condition:
            # Reuse scan to apply the HAVING condition and/or projections.
            return self.scan(step, context)
        return context
def sort(self, step, context):
282    def sort(self, step, context):
283        projections = self.generate_tuple(step.projections)
284        projection_columns = [p.alias_or_name for p in step.projections]
285        all_columns = list(context.columns) + projection_columns
286        sink = self.table(all_columns)
287        for reader, ctx in context:
288            sink.append(reader.row + ctx.eval_tuple(projections))
289
290        sort_ctx = self.context(
291            {
292                None: sink,
293                **{table: sink for table in context.tables},
294            }
295        )
296        sort_ctx.sort(self.generate_tuple(step.key))
297
298        if not math.isinf(step.limit):
299            sort_ctx.table.rows = sort_ctx.table.rows[0 : step.limit]
300
301        output = Table(
302            projection_columns,
303            rows=[r[len(context.columns) : len(all_columns)] for r in sort_ctx.table.rows],
304        )
305        return self.context({step.name: output})
def set_operation(self, step, context):
307    def set_operation(self, step, context):
308        left = context.tables[step.left]
309        right = context.tables[step.right]
310
311        sink = self.table(left.columns)
312
313        if issubclass(step.op, exp.Intersect):
314            sink.rows = list(set(left.rows).intersection(set(right.rows)))
315        elif issubclass(step.op, exp.Except):
316            sink.rows = list(set(left.rows).difference(set(right.rows)))
317        elif issubclass(step.op, exp.Union) and step.distinct:
318            sink.rows = list(set(left.rows).union(set(right.rows)))
319        else:
320            sink.rows = left.rows + right.rows
321
322        if not math.isinf(step.limit):
323            sink.rows = sink.rows[0 : step.limit]
324
325        return self.context({step.name: sink})
class Python(Dialect):
    """Pseudo-dialect used by the executor to render SQL expressions as
    executable Python source (via PythonGenerator)."""

    class Tokenizer(tokens.Tokenizer):
        # Backslash escapes inside string literals, mirroring Python strings.
        STRING_ESCAPES = ["\\"]

    Generator = PythonGenerator
SUPPORTS_COLUMN_JOIN_MARKS = False

Whether the old-style outer join (+) syntax is supported.

UNESCAPED_SEQUENCES: dict[str, str] = {'\\a': '\x07', '\\b': '\x08', '\\f': '\x0c', '\\n': '\n', '\\r': '\r', '\\t': '\t', '\\v': '\x0b', '\\\\': '\\'}

Mapping of an escaped sequence (e.g. `\\n`) to its unescaped version (e.g. an actual newline character).

STRINGS_SUPPORT_ESCAPED_SEQUENCES: bool = True

Whether string literals support escape sequences (e.g. \n). Set by the metaclass based on the tokenizer's STRING_ESCAPES.

BYTE_STRINGS_SUPPORT_ESCAPED_SEQUENCES: bool = True

Whether byte string literals support escape sequences. Set by the metaclass based on the tokenizer's BYTE_STRING_ESCAPES.

INITCAP_SUPPORTS_CUSTOM_DELIMITERS = False
tokenizer_class = <class 'Python.Tokenizer'>
jsonpath_tokenizer_class = <class 'sqlglot.dialects.dialect.JSONPathTokenizer'>
parser_class = <class 'sqlglot.parsers.base.BaseParser'>
generator_class = <class 'sqlglot.generators.python.PythonGenerator'>
TIME_TRIE: dict = {}
FORMAT_TRIE: dict = {}
INVERSE_TIME_MAPPING: dict[str, str] = {}
INVERSE_TIME_TRIE: dict = {}
INVERSE_FORMAT_MAPPING: dict[str, str] = {}
INVERSE_FORMAT_TRIE: dict = {}
INVERSE_CREATABLE_KIND_MAPPING: dict[str, str] = {}
ESCAPED_SEQUENCES: dict[str, str] = {'\x07': '\\a', '\x08': '\\b', '\x0c': '\\f', '\n': '\\n', '\r': '\\r', '\t': '\\t', '\x0b': '\\v', '\\': '\\\\'}
QUOTE_START = "'"
QUOTE_END = "'"
IDENTIFIER_START = '"'
IDENTIFIER_END = '"'
VALID_INTERVAL_UNITS: set[str] = {'WEEK_ISO', 'DAY OF YEAR', 'MSEC', 'MINUTES', 'QUARTER', 'NSECOND', 'SECS', 'DAYOFMONTH', 'NSECONDS', 'WEEKOFYEARISO', 'NANOSEC', 'MIL', 'EPOCH_MICROSECONDS', 'USECOND', 'MONTH', 'D', 'MSECONDS', 'C', 'NANOSECS', 'WY', 'TIMEZONE_HOUR', 'USECS', 'DAYOFYEAR', 'WK', 'MILLISEC', 'DOY', 'YRS', 'YY', 'DW', 'EPOCH_MILLISECONDS', 'H', 'S', 'WEEK', 'TZM', 'DAY', 'QTRS', 'MICROSECONDS', 'MI', 'TIMEZONE_MINUTE', 'DAYOFWEEK_ISO', 'DAYS', 'EPOCH_NANOSECOND', 'QTR', 'SECONDS', 'W', 'MILLISECS', 'DY', 'NSEC', 'YEARS', 'WEEKDAY', 'MSECS', 'Q', 'HRS', 'YEAR', 'DEC', 'DW_ISO', 'MIN', 'SEC', 'WEEKDAY_ISO', 'EPOCH', 'MICROSEC', 'QUARTERS', 'MICROSECS', 'DOW_ISO', 'MICROSECOND', 'HR', 'MS', 'YYYY', 'MILS', 'MILLISECONDS', 'WEEKOFYEAR_ISO', 'EPOCH_MILLISECOND', 'Y', 'SECOND', 'NANOSECOND', 'MILLENNIUM', 'MINS', 'MM', 'HOURS', 'WEEKISO', 'DAYOFWEEKISO', 'USEC', 'CENTURY', 'DD', 'YYY', 'EPOCH_NANOSECONDS', 'USECONDS', 'EPOCH_SECOND', 'DECADE', 'DECS', 'MINUTE', 'EPOCH_SECONDS', 'EPOCH_MICROSECOND', 'MSECOND', 'MONS', 'MILLISECOND', 'DAYOFWEEK', 'HOUR', 'M', 'CENTS', 'US', 'WEEKOFYEAR', 'NS', 'CENTURIES', 'MILLISECON', 'YR', 'MON', 'MILLENIA', 'WOY', 'TZH', 'CENT', 'DAY OF WEEK', 'DOW', 'HH', 'DECADES', 'MONTHS'}
BIT_START: str | None = None
BIT_END: str | None = None
HEX_START: str | None = None
HEX_END: str | None = None
BYTE_START: str | None = None
BYTE_END: str | None = None
UNICODE_START: str | None = None
UNICODE_END: str | None = None
class Python.Tokenizer(sqlglot.tokens.Tokenizer):
329    class Tokenizer(tokens.Tokenizer):
330        STRING_ESCAPES = ["\\"]
STRING_ESCAPES = ['\\']
BYTE_STRING_ESCAPES: ClassVar[list[str]] = ['\\']