
Writing a Python SQL engine from scratch

Toby Mao

Introduction

When I first started writing SQLGlot in early 2021, my goal was just to translate SQL queries from SparkSQL to Presto and vice versa. However, over the last year and a half, I've ended up with a full-fledged SQL engine. SQLGlot can now parse and transpile between 18 SQL dialects and can execute all 24 TPC-H SQL queries. The parser and engine are all written from scratch using Python.

This post will cover why I went through the effort of creating a Python SQL engine and how a simple query goes from a string to actually transforming data. It briefly summarizes each step along the way: tokenizing, parsing, optimizing, planning, and executing.

Why?

I started working on SQLGlot because of my work on the experimentation and metrics platform at Netflix, where I built tools that allowed data scientists to define and compute SQL-based metrics. Netflix relied on multiple engines to query data (Spark, Presto, and Druid), so my team built the metrics platform around PyPika, a Python SQL query builder. This way, definitions could be reused across multiple engines. However, it quickly became apparent that writing Python code to programmatically generate SQL was challenging for data scientists, especially those with academic backgrounds, since they were mostly familiar with R and SQL. At the time, the only Python SQL parser was sqlparse, which is not actually a parser but a tokenizer, so having users write raw SQL into the platform wasn't really an option. Some time later, I randomly stumbled across Crafting Interpreters and realized that I could use it as a guide towards creating my own SQL parser/transpiler.

Why did I do this? Isn't a Python SQL engine going to be extremely slow?

The main reason why I ended up building a SQL engine was...just for entertainment. It's been fun learning about all the things required to actually run a SQL query, and seeing it actually work is extremely rewarding. Before SQLGlot, I had zero experience with lexers, parsers, or compilers.

In terms of practical use cases, I planned to use the Python SQL engine for unit testing SQL pipelines. Big data pipelines are tough to test because many of the engines are not open source and cannot be run locally. With SQLGlot, you can take a SQL query targeting a warehouse such as Snowflake and seamlessly run it in CI on mock Python data. It's easy to mock data and create arbitrary UDFs because everything is just Python. Although the implementation is slow and unsuitable for large amounts of data (> 1 million rows), there's very little overhead/startup and you can run queries on test data in a couple of milliseconds.

Finally, the components that have been built to support execution can be used as a foundation for a faster engine. I'm inspired by what Apache Calcite has done for the JVM world. Even though Python is commonly used for data, there hasn't been a Calcite for Python. So, you could say that SQLGlot aims to be that framework. For example, it wouldn't take much work to replace the Python execution engine with numpy/pandas/arrow to become a respectably-performing query engine. The implementation would be able to leverage the parser, optimizer, and logical planner, only needing to implement physical execution. There is a lot of work in the Python ecosystem around high performance vectorized computation, which I think could benefit from a pure Python-based AST/plan. Parsing and planning don't have to be fast when the bottleneck of running queries is processing terabytes of data. So, having a Python-based ecosystem around SQL is beneficial given the ease of development in Python, despite not having bare metal performance.

Parts of SQLGlot's toolkit are being used today by the following:

  • Ibis: A Python library that provides a lightweight, universal interface for data wrangling.
    • Uses the Python SQL expression builder and leverages the optimizer/planner to convert SQL into dataframe operations.
  • mysql-mimic: Pure-Python implementation of the MySQL server wire protocol
    • Parses / transforms SQL and executes INFORMATION_SCHEMA queries.
  • Quokka: Push-based vectorized query engine
    • Parses and optimizes SQL.
  • Splink: Fast, accurate and scalable probabilistic data linkage using your choice of SQL backend.
    • Transpiles queries.

How?

There are many steps involved with actually running a simple query like:

SELECT
  bar.a,
  b + 1 AS b
FROM bar
JOIN baz
  ON bar.a = baz.a
WHERE bar.a > 1

In this post, I'll walk through all the steps SQLGlot takes to run this query over Python objects.

Tokenizing

The first step is to convert the SQL string into a list of tokens. SQLGlot's tokenizer is quite simple and lives in sqlglot/tokens.py. In a while loop, it checks each character and either appends the character to the current token, or makes a new token.

Running the SQLGlot tokenizer shows the output.

Tokenizer Output

Each keyword has been converted to a SQLGlot Token object. Each token has some metadata associated with it, like line/column information for error messages. Comments are also a part of the token, so that comments can be preserved.
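
As a rough illustration of the character-by-character loop described above, here is a minimal sketch. It is not SQLGlot's tokenizer: the `Token` fields, the `KEYWORDS` set, and the `SINGLE_CHAR` table are assumptions made for this example, and it ignores strings, comments, and multi-character operators.

```python
from dataclasses import dataclass
from typing import List

# Hypothetical, heavily reduced keyword and punctuation tables.
KEYWORDS = {"SELECT", "FROM", "JOIN", "ON", "WHERE", "AS"}
SINGLE_CHAR = {",": "COMMA", ".": "DOT", "+": "PLUS", "=": "EQ", ">": "GT"}


@dataclass
class Token:
    token_type: str
    text: str
    line: int  # 1-based line of the token's first character
    col: int   # 1-based column of the token's first character


def tokenize(sql: str) -> List[Token]:
    tokens: List[Token] = []
    current = ""  # characters of the token currently being built
    start_col = 0
    line, col = 1, 0

    def flush() -> None:
        """Emit the token accumulated so far, if any."""
        nonlocal current
        if not current:
            return
        if current.upper() in KEYWORDS:
            token_type = current.upper()
        elif current.isdigit():
            token_type = "NUMBER"
        else:
            token_type = "VAR"
        tokens.append(Token(token_type, current, line, start_col))
        current = ""

    i = 0
    while i < len(sql):
        char = sql[i]
        col += 1
        if char.isalnum() or char == "_":
            # Append the character to the current token.
            if not current:
                start_col = col
            current += char
        else:
            # Any other character ends the current token.
            flush()
            if char in SINGLE_CHAR:
                tokens.append(Token(SINGLE_CHAR[char], char, line, col))
            elif char == "\n":
                line += 1
                col = 0
            # Remaining characters (spaces, tabs) are simply skipped.
        i += 1

    flush()
    return tokens


for token in tokenize("SELECT bar.a,\n  b + 1 AS b"):
    print(token)
```

Keeping the line and column on every token is what makes precise error messages possible later in the pipeline.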

Parsing

Once a SQL statement is tokenized, we don't need to worry about white space and other formatting, so it's easier to work with. We can now convert the list of tokens into an AST. The SQLGlot parser is a handwritten recursive descent parser.

Similar to the tokenizer, it consumes the tokens sequentially, but it instead uses a recursive algorithm. The tokens are converted into a single AST node that represents the SQL query. The SQLGlot parser was designed to support various dialects, so it contains many options for overriding parsing functionality.
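
To make "recursive descent" concrete, the sketch below parses a small predicate such as the one in the WHERE clause into a nested-tuple AST. It is only an illustration under simplifying assumptions: tokens are plain strings, the grammar covers just AND, comparisons, and +/-, and the `Parser` class and its method names are invented for this example rather than taken from SQLGlot.

```python
from typing import List, Optional, Tuple, Union

Node = Union[str, Tuple]  # a leaf (identifier/number) or (operator, left, right)

OPERATORS = {"AND", "=", ">", "<", "+", "-", "(", ")"}


class Parser:
    def __init__(self, tokens: List[str]) -> None:
        self.tokens = tokens
        self.index = 0

    def _peek(self) -> Optional[str]:
        return self.tokens[self.index] if self.index < len(self.tokens) else None

    def _advance(self) -> Optional[str]:
        token = self._peek()
        self.index += 1
        return token

    def _match(self, *texts: str) -> Optional[str]:
        """Consume and return the next token if it is one of `texts`."""
        if self._peek() in texts:
            return self._advance()
        return None

    def parse(self) -> Node:
        node = self._parse_conjunction()
        if self._peek() is not None:
            raise ValueError(f"Unexpected token: {self._peek()!r}")
        return node

    # Each grammar rule is one method; lower-precedence rules call
    # higher-precedence ones, which is where the recursion comes from.
    def _parse_conjunction(self) -> Node:
        left = self._parse_comparison()
        while self._match("AND"):
            left = ("AND", left, self._parse_comparison())
        return left

    def _parse_comparison(self) -> Node:
        left = self._parse_term()
        op = self._match("=", ">", "<")
        if op:
            return (op, left, self._parse_term())
        return left

    def _parse_term(self) -> Node:
        left = self._parse_primary()
        while True:
            op = self._match("+", "-")
            if not op:
                return left
            left = (op, left, self._parse_primary())

    def _parse_primary(self) -> Node:
        if self._match("("):
            node = self._parse_conjunction()
            if not self._match(")"):
                raise ValueError("Expected ')'")
            return node
        token = self._advance()
        if token is None or token in OPERATORS:
            raise ValueError(f"Expected an identifier or number, got {token!r}")
        return token


ast = Parser(["a", ">", "1", "AND", "b", "+", "1", "=", "2"]).parse()
print(ast)
```

Operator precedence falls out of the call order: `_parse_term` binds tighter than `_parse_comparison`, which binds tighter than `_parse_conjunction`.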

Parser Output

The AST is a generic representation of a given SQL query. Each dialect can override or implement its own generator, which can convert an AST object into syntactically correct SQL.
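
The idea of a per-dialect generator can be sketched as a base class that renders each node type through its own method, with a dialect subclass overriding only what differs. The classes, node names, and tuple-based AST below are hypothetical and exist only for this example; the one dialect fact relied on is that MySQL concatenates strings with CONCAT(...) rather than `||` by default.

```python
class Generator:
    """Renders a tuple-based AST, e.g. ("concat", left, right), as SQL text."""

    def sql(self, node) -> str:
        if isinstance(node, str):
            return node  # leaves are already SQL fragments
        kind = node[0]
        # Dispatch on the node type: ("concat", ...) is rendered by concat_sql.
        return getattr(self, f"{kind}_sql")(node)

    def add_sql(self, node) -> str:
        return f"{self.sql(node[1])} + {self.sql(node[2])}"

    def concat_sql(self, node) -> str:
        return f"{self.sql(node[1])} || {self.sql(node[2])}"


class MySQLLikeGenerator(Generator):
    """Hypothetical dialect that overrides a single node type."""

    def concat_sql(self, node) -> str:
        return f"CONCAT({self.sql(node[1])}, {self.sql(node[2])})"


ast = ("concat", "first_name", ("concat", "' '", "last_name"))
print(Generator().sql(ast))
print(MySQLLikeGenerator().sql(ast))
```

Because the AST itself is dialect-agnostic, transpiling amounts to parsing with one dialect's rules and generating with another's.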

Optimizing

Once we have our AST, we can transform it into an equivalent query that produces the same results more efficiently. When optimizing queries, most engines first convert the AST into a logical plan and then optimize the plan. However, I chose to optimize the AST directly for the following reasons:

  1. It's easier to debug and validate the optimizations when the input and output are both SQL.

  2. Rules can be applied a la carte to transform SQL into a more desirable form.

  3. I wanted a way to generate "canonical SQL". Having a canonical representation of SQL is useful for understanding if two queries are semantically equivalent (e.g. SELECT 1 + 1 and SELECT 2).

I've yet to find another engine that takes this approach, but I'm quite happy with this decision. The optimizer currently does not perform any "physical optimizations" such as join reordering. Those are left to the execution layer, as additional statistics and information could become relevant.

Optimizer Output

The optimizer currently has 17 rules. Each of these rules is applied, transforming the AST in place. The combination of these rules creates "canonical" SQL that can then be more easily converted into a logical plan and executed.

Some example rules are:

qualify_tables and qualify_columns

  • Adds all db/catalog qualifiers to tables and forces an alias.
  • Ensures each column is unambiguous and expands stars.

SELECT * FROM x;

SELECT "db"."x" AS "x";

simplify

Boolean and math simplification. Check out all the test cases.

((NOT FALSE) AND (x = x)) AND (TRUE OR 1 <> 3);
x = x;

1 + 1;
2;
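
A simplification rule of this kind can be sketched as a bottom-up rewrite over the tree. The version below is a simplified stand-in, not SQLGlot's rule: it assumes a tuple-based AST with Python booleans and numbers as literals and strings as column names, and the operator names are invented for the example. Note that `x = x` is deliberately left alone, because in SQL it is not TRUE when x is NULL.

```python
import operator

ARITHMETIC = {"add": operator.add, "sub": operator.sub, "mul": operator.mul}
COMPARISON = {"eq": operator.eq, "neq": operator.ne, "gt": operator.gt, "lt": operator.lt}


def is_number(value) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def simplify(node):
    """Fold constants and boolean identities, children first."""
    if not isinstance(node, tuple):
        return node
    op, *args = node
    args = [simplify(arg) for arg in args]

    if op == "not" and isinstance(args[0], bool):
        return not args[0]
    if op == "and":
        left, right = args
        if left is False or right is False:
            return False
        if left is True:
            return right
        if right is True:
            return left
    if op == "or":
        left, right = args
        if left is True or right is True:
            return True
        if left is False:
            return right
        if right is False:
            return left
    if op in ARITHMETIC and all(is_number(arg) for arg in args):
        return ARITHMETIC[op](*args)
    if op in COMPARISON and all(is_number(arg) for arg in args):
        return COMPARISON[op](*args)
    return (op, *args)


# ((NOT FALSE) AND (x = x)) AND (TRUE OR 1 <> 3)
expression = (
    "and",
    ("and", ("not", False), ("eq", "x", "x")),
    ("or", True, ("neq", 1, 3)),
)
print(simplify(expression))
print(simplify(("add", 1, 1)))
```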

normalize

Attempts to convert all predicates into conjunctive normal form.

-- DNF
(A AND B) OR (B AND C AND D);

-- CNF
(A OR C) AND (A OR D) AND B;
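
One way to see how the DNF above turns into that CNF is to distribute OR over AND and then drop any clause that is implied by a smaller one. The sketch below does this on sets of literals. It is an illustration only: the tuple AST, `to_cnf`, and `format_cnf` are made up for the example, NOT is not handled, and SQLGlot's own rule works on its AST rather than on sets.

```python
from itertools import product


def to_cnf(node):
    """Return CNF as a set of clauses; each clause is a frozenset of OR-ed literals."""
    if isinstance(node, str):
        return {frozenset([node])}

    op, left, right = node
    left_clauses, right_clauses = to_cnf(left), to_cnf(right)

    if op == "and":
        clauses = left_clauses | right_clauses
    elif op == "or":
        # Distribute OR over AND: (a AND b) OR c == (a OR c) AND (b OR c)
        clauses = {l | r for l, r in product(left_clauses, right_clauses)}
    else:
        raise ValueError(f"Unsupported operator: {op!r}")

    # Absorption: a clause that strictly contains another clause is redundant,
    # e.g. (A OR B) AND B == B.
    return {c for c in clauses if not any(other < c for other in clauses)}


def format_cnf(clauses) -> str:
    parts = []
    for clause in sorted(sorted(c) for c in clauses):
        text = " OR ".join(clause)
        parts.append(f"({text})" if len(clause) > 1 else text)
    return " AND ".join(parts)


# (A AND B) OR (B AND C AND D)
dnf = ("or", ("and", "A", "B"), ("and", "B", ("and", "C", "D")))
print(format_cnf(to_cnf(dnf)))
```

Distribution can blow up exponentially on large predicates, which is one reason a rule like this only "attempts" the conversion.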

unnest_subqueries

Converts subqueries in predicates into joins.

-- The subquery can be converted into a left join
SELECT *
FROM x AS x
WHERE (
  SELECT y.a AS a
  FROM y AS y
  WHERE x.a = y.a
) = 1;

SELECT *
FROM x AS x
LEFT JOIN (
  SELECT y.a AS a
  FROM y AS y
  WHERE TRUE
  GROUP BY y.a
) AS "_u_0"
  ON x.a = "_u_0".a
WHERE ("_u_0".a = 1 AND NOT "_u_0".a IS NULL)

pushdown_predicates

Push down filters into the innermost query.

SELECT *
FROM (
  SELECT *
  FROM x AS x
) AS y
WHERE y.a = 1;

SELECT *
FROM (
  SELECT *
  FROM x AS x
  WHERE x.a = 1
) AS y WHERE TRUE

annotate_types

Infer all types throughout the AST given schema information and function type definitions.

Planning

After the SQL AST has been "optimized", it's much easier to convert into a logical plan. The AST is traversed and converted into a DAG of steps, each of which is one of five types:

Scan

Selects columns from a table, applies projections, and finally filters the table.

Sort

Sorts a table for order by expressions.

Set

Applies the operators union/union all/except/intersect.

Aggregate

Applies an aggregation/group by.

Join

Joins multiple tables together.

Planner Output

The logical plan is quite simple and contains the information required to convert it into a physical plan (execution).

Executing

Finally, we can actually execute the SQL query. The Python engine is not fast, but it's very small (~400 LOC)! It iterates the DAG with a queue and runs each step, passing each intermediary table to the next step.
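
The queue-driven walk over the DAG can be sketched as follows. This is a toy model, not the real executor: the `Step` dataclass, `execute_plan`, and the list-of-dicts tables are assumptions for the example, and the sample plan hard-codes the query from earlier in the post, assuming that the unqualified column b belongs to bar.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Step:
    name: str
    run: Callable[..., List[dict]]  # receives one input table per dependency
    dependencies: List["Step"] = field(default_factory=list)


def execute_plan(root: Step) -> List[dict]:
    # Discover every step reachable from the root.
    steps: Dict[str, Step] = {}
    stack = [root]
    while stack:
        step = stack.pop()
        if step.name not in steps:
            steps[step.name] = step
            stack.extend(step.dependencies)

    # Track which steps wait on which, and how many inputs are still missing.
    dependents: Dict[str, List[Step]] = {name: [] for name in steps}
    remaining: Dict[str, int] = {}
    for step in steps.values():
        remaining[step.name] = len(step.dependencies)
        for dependency in step.dependencies:
            dependents[dependency.name].append(step)

    # Start with the leaves (scans) and pass each result on to its dependents.
    queue = deque(step for step in steps.values() if not step.dependencies)
    results: Dict[str, List[dict]] = {}
    while queue:
        step = queue.popleft()
        inputs = [results[dependency.name] for dependency in step.dependencies]
        results[step.name] = step.run(*inputs)
        for dependent in dependents[step.name]:
            remaining[dependent.name] -= 1
            if remaining[dependent.name] == 0:
                queue.append(dependent)

    return results[root.name]


bar = [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
baz = [{"a": 2}, {"a": 3}, {"a": 4}]

# SELECT bar.a, b + 1 AS b FROM bar JOIN baz ON bar.a = baz.a WHERE bar.a > 1
scan_bar = Step("scan_bar", lambda: [row for row in bar if row["a"] > 1])
scan_baz = Step("scan_baz", lambda: list(baz))
join = Step(
    "join",
    lambda left, right: [
        {"a": l["a"], "b": l["b"] + 1} for l in left for r in right if l["a"] == r["a"]
    ],
    [scan_bar, scan_baz],
)

print(execute_plan(join))
```

A step only enters the queue once all of its inputs exist, so each intermediate table is computed exactly once.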

In order to keep things simple, it evaluates expressions with eval. Because SQLGlot was built primarily to be a transpiler, it was simple to create a "Python SQL" dialect. So a SQL expression x + 1 can just be converted into scope['x'] + 1.
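
The rewrite from x + 1 to scope['x'] + 1 can be imitated with a few lines of Python. The sketch below is a shortcut for illustration: it rewrites identifiers with a regular expression instead of generating Python from the AST, it only works for operators that are spelled the same in SQL and Python, and `to_python` and `evaluate` are hypothetical helper names.

```python
import re

IDENTIFIER = re.compile(r"\b[A-Za-z_]\w*\b")


def to_python(sql_expression: str) -> str:
    """Rewrite each identifier into a lookup in a `scope` mapping."""
    return IDENTIFIER.sub(lambda match: f"scope[{match.group(0)!r}]", sql_expression)


def evaluate(sql_expression: str, row: dict):
    code = compile(to_python(sql_expression), "<sql>", "eval")
    # Expose only `scope` to the expression; builtins are disabled.
    return eval(code, {"__builtins__": {}}, {"scope": row})


print(to_python("x + 1"))
print(evaluate("x + 1", {"x": 41}))
```

Compiling once and evaluating per row keeps the per-row cost down to a single `eval` call on a code object.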

Executor Output

What's next

SQLGlot's main focus will always be on parsing/transpiling, but I plan to continue development on the execution engine. I'd like to pass TPC-DS. If someone doesn't beat me to it, I may even take a stab at writing a Pandas/Arrow execution engine.

I'm hoping that over time, SQLGlot will spark the Python SQL ecosystem just like Calcite has for Java.

Special thanks

SQLGlot would not be what it is without its core contributors. In particular, the execution engine would not exist without Barak Alon and George Sittas.

Get in touch

If you'd like to chat more about SQLGlot, please join my Slack Channel!


"""
.. include:: ../../posts/python_sql_engine.md

----
"""

from __future__ import annotations

import logging
import time
import typing as t

from sqlglot import exp
from sqlglot.errors import ExecuteError
from sqlglot.executor.python import PythonExecutor
from sqlglot.executor.table import Table, ensure_tables
from sqlglot.helper import dict_depth
from sqlglot.optimizer import optimize
from sqlglot.optimizer.annotate_types import annotate_types
from sqlglot.planner import Plan
from sqlglot.schema import ensure_schema, flatten_schema, nested_get, nested_set

logger = logging.getLogger("sqlglot")

if t.TYPE_CHECKING:
    from sqlglot.dialects.dialect import DialectType
    from sqlglot.expressions import Expression
    from sqlglot.schema import Schema


def execute(
    sql: str | Expression,
    schema: t.Optional[t.Dict | Schema] = None,
    read: DialectType = None,
    dialect: DialectType = None,
    tables: t.Optional[t.Dict] = None,
) -> Table:
    """
    Run a sql query against data.

    Args:
        sql: a sql statement.
        schema: database schema.
            This can either be an instance of `Schema` or a mapping in one of the following forms:
            1. {table: {col: type}}
            2. {db: {table: {col: type}}}
            3. {catalog: {db: {table: {col: type}}}}
        read: the SQL dialect to apply during parsing (eg. "spark", "hive", "presto", "mysql").
        dialect: the SQL dialect (alias for read).
        tables: additional tables to register.

    Returns:
        Simple columnar data structure.
    """
    read = read or dialect
    tables_ = ensure_tables(tables, dialect=read)

    if not schema:
        schema = {}
        flattened_tables = flatten_schema(tables_.mapping, depth=dict_depth(tables_.mapping))

        for keys in flattened_tables:
            table = nested_get(tables_.mapping, *zip(keys, keys))
            assert table is not None

            for column in table.columns:
                value = table[0][column]
                column_type = annotate_types(exp.convert(value)).type or type(value).__name__
                nested_set(schema, [*keys, column], column_type)

    schema = ensure_schema(schema, dialect=read)

    if tables_.supported_table_args and tables_.supported_table_args != schema.supported_table_args:
        raise ExecuteError("Tables must support the same table args as schema")

    now = time.time()
    expression = optimize(
        sql, schema, leave_tables_isolated=True, infer_csv_schemas=True, dialect=read
    )

    logger.debug("Optimization finished: %f", time.time() - now)
    logger.debug("Optimized SQL: %s", expression.sql(pretty=True))

    plan = Plan(expression)

    logger.debug("Logical Plan: %s", plan)

    now = time.time()
    result = PythonExecutor(tables=tables_).execute(plan)

    logger.debug("Query finished: %f", time.time() - now)

    return result

def execute( sql: str | sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema, NoneType] = None, read: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, tables: Optional[Dict] = None) -> sqlglot.executor.table.Table:

Run a sql query against data.

Arguments:
  • sql: a sql statement.
  • schema: database schema. This can either be an instance of Schema or a mapping in one of the following forms:
    1. {table: {col: type}}
    2. {db: {table: {col: type}}}
    3. {catalog: {db: {table: {col: type}}}}
  • read: the SQL dialect to apply during parsing (eg. "spark", "hive", "presto", "mysql").
  • dialect: the SQL dialect (alias for read).
  • tables: additional tables to register.

Returns:
  • Simple columnar data structure.