Edit on GitHub

sqlglot.lineage

  1from __future__ import annotations
  2
  3import json
  4import logging
  5import typing as t
  6from dataclasses import dataclass, field
  7
  8from sqlglot import Schema, exp, maybe_parse
  9from sqlglot.errors import SqlglotError
 10from sqlglot.optimizer import Scope, build_scope, find_all_in_scope, normalize_identifiers, qualify
 11from sqlglot.optimizer.scope import ScopeType
 12
 13if t.TYPE_CHECKING:
 14    from sqlglot.dialects.dialect import DialectType
 15
 16logger = logging.getLogger("sqlglot")
 17
 18
 19@dataclass(frozen=True)
 20class Node:
 21    name: str
 22    expression: exp.Expression
 23    source: exp.Expression
 24    downstream: t.List[Node] = field(default_factory=list)
 25    source_name: str = ""
 26    reference_node_name: str = ""
 27
 28    def walk(self) -> t.Iterator[Node]:
 29        yield self
 30
 31        for d in self.downstream:
 32            yield from d.walk()
 33
 34    def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
 35        nodes = {}
 36        edges = []
 37
 38        for node in self.walk():
 39            if isinstance(node.expression, exp.Table):
 40                label = f"FROM {node.expression.this}"
 41                title = f"<pre>SELECT {node.name} FROM {node.expression.this}</pre>"
 42                group = 1
 43            else:
 44                label = node.expression.sql(pretty=True, dialect=dialect)
 45                source = node.source.transform(
 46                    lambda n: (
 47                        exp.Tag(this=n, prefix="<b>", postfix="</b>") if n is node.expression else n
 48                    ),
 49                    copy=False,
 50                ).sql(pretty=True, dialect=dialect)
 51                title = f"<pre>{source}</pre>"
 52                group = 0
 53
 54            node_id = id(node)
 55
 56            nodes[node_id] = {
 57                "id": node_id,
 58                "label": label,
 59                "title": title,
 60                "group": group,
 61            }
 62
 63            for d in node.downstream:
 64                edges.append({"from": node_id, "to": id(d)})
 65        return GraphHTML(nodes, edges, **opts)
 66
 67
 68def lineage(
 69    column: str | exp.Column,
 70    sql: str | exp.Expression,
 71    schema: t.Optional[t.Dict | Schema] = None,
 72    sources: t.Optional[t.Mapping[str, str | exp.Query]] = None,
 73    dialect: DialectType = None,
 74    scope: t.Optional[Scope] = None,
 75    trim_selects: bool = True,
 76    **kwargs,
 77) -> Node:
 78    """Build the lineage graph for a column of a SQL query.
 79
 80    Args:
 81        column: The column to build the lineage for.
 82        sql: The SQL string or expression.
 83        schema: The schema of tables.
 84        sources: A mapping of queries which will be used to continue building lineage.
 85        dialect: The dialect of input SQL.
 86        scope: A pre-created scope to use instead.
 87        trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
 88        **kwargs: Qualification optimizer kwargs.
 89
 90    Returns:
 91        A lineage node.
 92    """
 93
 94    expression = maybe_parse(sql, dialect=dialect)
 95    column = normalize_identifiers.normalize_identifiers(column, dialect=dialect).name
 96
 97    if sources:
 98        expression = exp.expand(
 99            expression,
100            {k: t.cast(exp.Query, maybe_parse(v, dialect=dialect)) for k, v in sources.items()},
101            dialect=dialect,
102        )
103
104    if not scope:
105        expression = qualify.qualify(
106            expression,
107            dialect=dialect,
108            schema=schema,
109            **{"validate_qualify_columns": False, "identify": False, **kwargs},  # type: ignore
110        )
111
112        scope = build_scope(expression)
113
114    if not scope:
115        raise SqlglotError("Cannot build lineage, sql must be SELECT")
116
117    if not any(select.alias_or_name == column for select in scope.expression.selects):
118        raise SqlglotError(f"Cannot find column '{column}' in query.")
119
120    return to_node(column, scope, dialect, trim_selects=trim_selects)
121
122
def to_node(
    column: str | int,
    scope: Scope,
    dialect: DialectType,
    scope_name: t.Optional[str] = None,
    upstream: t.Optional[Node] = None,
    source_name: t.Optional[str] = None,
    reference_node_name: t.Optional[str] = None,
    trim_selects: bool = True,
) -> Node:
    """Create the lineage node for one column of a scope and recurse into its inputs.

    Args:
        column: The select to resolve, either by alias/name or by position.
        scope: The scope whose expression is inspected.
        dialect: The dialect used when generating SQL for log messages.
        scope_name: If given, prefixes the node name as "<scope_name>.<column>".
        upstream: If given, the created node is appended to its `downstream`.
        source_name: Source name recorded on the node and inherited by nested
            nodes that have no name of their own.
        reference_node_name: Reference name recorded on the created node.
        trim_selects: Whether to reduce a SELECT source to the relevant column.

    Returns:
        The created node. For a union, the existing `upstream` or a new "UNION" node.

    Raises:
        ValueError: If `column` cannot be located among a union's selects.
    """
    query = scope.expression

    # Locate the select that produces the column: by position, by name, or
    # falling back to a `*` / the scope expression itself.
    if isinstance(column, int):
        select = query.selects[column]
    else:
        named_matches = (
            projection for projection in query.selects if projection.alias_or_name == column
        )
        fallback = exp.Star() if query.is_star else query
        select = next(named_matches, fallback)

    if isinstance(query, exp.Subquery):
        # Only the first nested scope is ever followed; with no nested scope
        # execution falls through to the generic handling below.
        for inner_scope in scope.subquery_scopes:
            return to_node(
                column,
                scope=inner_scope,
                dialect=dialect,
                upstream=upstream,
                source_name=source_name,
                reference_node_name=reference_node_name,
                trim_selects=trim_selects,
            )

    if isinstance(query, exp.Union):
        union_node = upstream or Node(name="UNION", source=query, expression=select)

        if isinstance(column, int):
            index = column
        else:
            index = next(
                (
                    position
                    for position, projection in enumerate(query.selects)
                    if projection.alias_or_name == column or projection.is_star
                ),
                -1,  # mypy will not allow a None here, but a negative index should never be returned
            )

        if index == -1:
            raise ValueError(f"Could not find {column} in {scope.expression}")

        # Every branch of the union contributes the select at the same position.
        for branch_scope in scope.union_scopes:
            to_node(
                index,
                scope=branch_scope,
                dialect=dialect,
                upstream=union_node,
                source_name=source_name,
                reference_node_name=reference_node_name,
                trim_selects=trim_selects,
            )

        return union_node

    # For better ergonomics in node labels, a SELECT is replaced by a version
    # containing only the column of interest:
    #   "x", SELECT x, y FROM foo  =>  "x", SELECT x FROM foo
    origin = query
    if trim_selects and isinstance(query, exp.Select):
        origin = t.cast(exp.Expression, query.select(select, append=False))

    # Create the node for this step of the chain and hook it onto the previous one.
    node_name = f"{scope_name}.{column}" if scope_name else str(column)
    node = Node(
        name=node_name,
        source=origin,
        expression=select,
        source_name=source_name or "",
        reference_node_name=reference_node_name or "",
    )

    if upstream:
        upstream.downstream.append(node)

    # Subqueries appearing inside the select are matched to their scopes by identity.
    scopes_by_expression_id = {
        id(nested.expression): nested for nested in scope.subquery_scopes
    }

    for subquery in find_all_in_scope(select, exp.UNWRAPPED_QUERIES):
        nested_scope = scopes_by_expression_id.get(id(subquery))

        if nested_scope:
            for name in subquery.named_selects:
                to_node(
                    name,
                    scope=nested_scope,
                    dialect=dialect,
                    upstream=node,
                    trim_selects=trim_selects,
                )
        else:
            logger.warning(f"Unknown subquery scope: {subquery.sql(dialect=dialect)}")

    # A star select gets every source of the scope as a downstream node.
    # NOTE: this loop rebinds `origin`, so after a star select the UDTF check
    # below observes the last scope source instead of the value assigned above.
    if select.is_star:
        for origin in scope.sources.values():
            if isinstance(origin, Scope):
                origin = origin.expression
            node.downstream.append(Node(name=select.sql(), source=origin, expression=origin))

    # All columns that went into creating this one; each gets its own lineage node.
    input_columns = set(find_all_in_scope(select, exp.Column))

    # For a UDTF, the columns used to generate the table count as inputs too.
    if isinstance(origin, exp.UDTF):
        input_columns |= set(origin.find_all(exp.Column))
        derived_tables = [
            candidate.expression.parent
            for candidate in scope.sources.values()
            if isinstance(candidate, Scope) and candidate.is_derived_table
        ]
    else:
        derived_tables = scope.derived_tables

    # Derived tables whose first comment starts with "source: " map their alias
    # to the word following that marker.
    source_names = {}
    for derived in derived_tables:
        comments = derived.comments
        if comments and comments[0].startswith("source: "):
            source_names[derived.alias] = comments[0].split()[1]

    for input_column in input_columns:
        table = input_column.table
        resolved = scope.sources.get(table)

        if not isinstance(resolved, Scope):
            # The source is not a scope - we've reached the end of the line. If no source was
            # found at all, this column's lineage is unknown, which can happen when the definition
            # of a source used in the query was not passed into the `sources` map.
            terminal = resolved or exp.Placeholder()
            node.downstream.append(
                Node(name=input_column.sql(), source=terminal, expression=terminal)
            )
            continue

        nested_reference = None
        if resolved.scope_type == ScopeType.DERIVED_TABLE and table not in source_names:
            nested_reference = table
        elif resolved.scope_type == ScopeType.CTE:
            selected_node, _ = scope.selected_sources.get(table, (None, None))
            nested_reference = selected_node.name if selected_node else None

        # The table came from a more specific scope: recurse into it using the
        # unaliased column name.
        to_node(
            input_column.name,
            scope=resolved,
            dialect=dialect,
            scope_name=table,
            upstream=node,
            source_name=source_names.get(table) or source_name,
            reference_node_name=nested_reference,
            trim_selects=trim_selects,
        )

    return node
284
285
class GraphHTML:
    """Node to HTML generator using vis.js.

    https://visjs.github.io/vis-network/docs/network/

    Args:
        nodes: Mapping of node id to the vis.js node dictionary.
        edges: List of vis.js edge dictionaries.
        imports: Whether to emit the `<script>`/`<link>` tags that load vis.js.
        options: Top-level vis.js options; each key given here replaces the
            default of the same name (the merge is shallow).
    """

    def __init__(
        self, nodes: t.Dict, edges: t.List, imports: bool = True, options: t.Optional[t.Dict] = None
    ):
        self.imports = imports

        self.options = {
            "height": "500px",
            "width": "100%",
            "layout": {
                "hierarchical": {
                    "enabled": True,
                    "nodeSpacing": 200,
                    "sortMethod": "directed",
                },
            },
            "interaction": {
                "dragNodes": False,
                "selectable": False,
            },
            "physics": {
                "enabled": False,
            },
            "edges": {
                "arrows": "to",
            },
            "nodes": {
                "font": "20px monaco",
                "shape": "box",
                "widthConstraint": {
                    "maximum": 300,
                },
            },
            **(options or {}),
        }

        self.nodes = nodes
        self.edges = edges

    @staticmethod
    def _script_json(value: t.Any) -> str:
        """Serialize `value` to JSON that is safe to inline in a `<script>` element.

        A literal `</script>` (or `<!--`) inside a string would otherwise end or
        corrupt the surrounding script block. In `json.dumps` output `<` can only
        occur inside string literals, so replacing it with the equivalent
        `\\u003c` escape leaves the decoded value unchanged.
        """
        return json.dumps(value).replace("<", "\\u003c")

    def __str__(self) -> str:
        """Return the HTML snippet (container, optional imports, bootstrap script)."""
        nodes = self._script_json(list(self.nodes.values()))
        edges = self._script_json(self.edges)
        options = self._script_json(self.options)
        imports = (
            """<script type="text/javascript" src="https://unpkg.com/vis-data@latest/peer/umd/vis-data.min.js"></script>
  <script type="text/javascript" src="https://unpkg.com/vis-network@latest/peer/umd/vis-network.min.js"></script>
  <link rel="stylesheet" type="text/css" href="https://unpkg.com/vis-network/styles/vis-network.min.css" />"""
            if self.imports
            else ""
        )

        return f"""<div>
  <div id="sqlglot-lineage"></div>
  {imports}
  <script type="text/javascript">
    var nodes = new vis.DataSet({nodes})
    nodes.forEach(row => row["title"] = new DOMParser().parseFromString(row["title"], "text/html").body.childNodes[0])

    new vis.Network(
        document.getElementById("sqlglot-lineage"),
        {{
            nodes: nodes,
            edges: new vis.DataSet({edges})
        }},
        {options},
    )
  </script>
</div>"""

    def _repr_html_(self) -> str:
        """Return the same HTML as `__str__`."""
        return self.__str__()
logger = <Logger sqlglot (WARNING)>
@dataclass(frozen=True)
class Node:
20@dataclass(frozen=True)
21class Node:
22    name: str
23    expression: exp.Expression
24    source: exp.Expression
25    downstream: t.List[Node] = field(default_factory=list)
26    source_name: str = ""
27    reference_node_name: str = ""
28
29    def walk(self) -> t.Iterator[Node]:
30        yield self
31
32        for d in self.downstream:
33            yield from d.walk()
34
35    def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
36        nodes = {}
37        edges = []
38
39        for node in self.walk():
40            if isinstance(node.expression, exp.Table):
41                label = f"FROM {node.expression.this}"
42                title = f"<pre>SELECT {node.name} FROM {node.expression.this}</pre>"
43                group = 1
44            else:
45                label = node.expression.sql(pretty=True, dialect=dialect)
46                source = node.source.transform(
47                    lambda n: (
48                        exp.Tag(this=n, prefix="<b>", postfix="</b>") if n is node.expression else n
49                    ),
50                    copy=False,
51                ).sql(pretty=True, dialect=dialect)
52                title = f"<pre>{source}</pre>"
53                group = 0
54
55            node_id = id(node)
56
57            nodes[node_id] = {
58                "id": node_id,
59                "label": label,
60                "title": title,
61                "group": group,
62            }
63
64            for d in node.downstream:
65                edges.append({"from": node_id, "to": id(d)})
66        return GraphHTML(nodes, edges, **opts)
Node( name: str, expression: sqlglot.expressions.Expression, source: sqlglot.expressions.Expression, downstream: List[Node] = <factory>, source_name: str = '', reference_node_name: str = '')
name: str
downstream: List[Node]
source_name: str = ''
reference_node_name: str = ''
def walk(self) -> Iterator[Node]:
29    def walk(self) -> t.Iterator[Node]:
30        yield self
31
32        for d in self.downstream:
33            yield from d.walk()
def to_html( self, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, **opts) -> GraphHTML:
35    def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
36        nodes = {}
37        edges = []
38
39        for node in self.walk():
40            if isinstance(node.expression, exp.Table):
41                label = f"FROM {node.expression.this}"
42                title = f"<pre>SELECT {node.name} FROM {node.expression.this}</pre>"
43                group = 1
44            else:
45                label = node.expression.sql(pretty=True, dialect=dialect)
46                source = node.source.transform(
47                    lambda n: (
48                        exp.Tag(this=n, prefix="<b>", postfix="</b>") if n is node.expression else n
49                    ),
50                    copy=False,
51                ).sql(pretty=True, dialect=dialect)
52                title = f"<pre>{source}</pre>"
53                group = 0
54
55            node_id = id(node)
56
57            nodes[node_id] = {
58                "id": node_id,
59                "label": label,
60                "title": title,
61                "group": group,
62            }
63
64            for d in node.downstream:
65                edges.append({"from": node_id, "to": id(d)})
66        return GraphHTML(nodes, edges, **opts)
def lineage( column: str | sqlglot.expressions.Column, sql: str | sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema, NoneType] = None, sources: Optional[Mapping[str, str | sqlglot.expressions.Query]] = None, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, scope: Optional[sqlglot.optimizer.scope.Scope] = None, trim_selects: bool = True, **kwargs) -> Node:
 69def lineage(
 70    column: str | exp.Column,
 71    sql: str | exp.Expression,
 72    schema: t.Optional[t.Dict | Schema] = None,
 73    sources: t.Optional[t.Mapping[str, str | exp.Query]] = None,
 74    dialect: DialectType = None,
 75    scope: t.Optional[Scope] = None,
 76    trim_selects: bool = True,
 77    **kwargs,
 78) -> Node:
 79    """Build the lineage graph for a column of a SQL query.
 80
 81    Args:
 82        column: The column to build the lineage for.
 83        sql: The SQL string or expression.
 84        schema: The schema of tables.
 85        sources: A mapping of queries which will be used to continue building lineage.
 86        dialect: The dialect of input SQL.
 87        scope: A pre-created scope to use instead.
 88        trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
 89        **kwargs: Qualification optimizer kwargs.
 90
 91    Returns:
 92        A lineage node.
 93    """
 94
 95    expression = maybe_parse(sql, dialect=dialect)
 96    column = normalize_identifiers.normalize_identifiers(column, dialect=dialect).name
 97
 98    if sources:
 99        expression = exp.expand(
100            expression,
101            {k: t.cast(exp.Query, maybe_parse(v, dialect=dialect)) for k, v in sources.items()},
102            dialect=dialect,
103        )
104
105    if not scope:
106        expression = qualify.qualify(
107            expression,
108            dialect=dialect,
109            schema=schema,
110            **{"validate_qualify_columns": False, "identify": False, **kwargs},  # type: ignore
111        )
112
113        scope = build_scope(expression)
114
115    if not scope:
116        raise SqlglotError("Cannot build lineage, sql must be SELECT")
117
118    if not any(select.alias_or_name == column for select in scope.expression.selects):
119        raise SqlglotError(f"Cannot find column '{column}' in query.")
120
121    return to_node(column, scope, dialect, trim_selects=trim_selects)

Build the lineage graph for a column of a SQL query.

Arguments:
  • column: The column to build the lineage for.
  • sql: The SQL string or expression.
  • schema: The schema of tables.
  • sources: A mapping of queries which will be used to continue building lineage.
  • dialect: The dialect of input SQL.
  • scope: A pre-created scope to use instead.
  • trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
  • **kwargs: Qualification optimizer kwargs.
Returns:

A lineage node.

def to_node( column: str | int, scope: sqlglot.optimizer.scope.Scope, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], scope_name: Optional[str] = None, upstream: Optional[Node] = None, source_name: Optional[str] = None, reference_node_name: Optional[str] = None, trim_selects: bool = True) -> Node:
124def to_node(
125    column: str | int,
126    scope: Scope,
127    dialect: DialectType,
128    scope_name: t.Optional[str] = None,
129    upstream: t.Optional[Node] = None,
130    source_name: t.Optional[str] = None,
131    reference_node_name: t.Optional[str] = None,
132    trim_selects: bool = True,
133) -> Node:
134    # Find the specific select clause that is the source of the column we want.
135    # This can either be a specific, named select or a generic `*` clause.
136    select = (
137        scope.expression.selects[column]
138        if isinstance(column, int)
139        else next(
140            (select for select in scope.expression.selects if select.alias_or_name == column),
141            exp.Star() if scope.expression.is_star else scope.expression,
142        )
143    )
144
145    if isinstance(scope.expression, exp.Subquery):
146        for source in scope.subquery_scopes:
147            return to_node(
148                column,
149                scope=source,
150                dialect=dialect,
151                upstream=upstream,
152                source_name=source_name,
153                reference_node_name=reference_node_name,
154                trim_selects=trim_selects,
155            )
156    if isinstance(scope.expression, exp.Union):
157        upstream = upstream or Node(name="UNION", source=scope.expression, expression=select)
158
159        index = (
160            column
161            if isinstance(column, int)
162            else next(
163                (
164                    i
165                    for i, select in enumerate(scope.expression.selects)
166                    if select.alias_or_name == column or select.is_star
167                ),
168                -1,  # mypy will not allow a None here, but a negative index should never be returned
169            )
170        )
171
172        if index == -1:
173            raise ValueError(f"Could not find {column} in {scope.expression}")
174
175        for s in scope.union_scopes:
176            to_node(
177                index,
178                scope=s,
179                dialect=dialect,
180                upstream=upstream,
181                source_name=source_name,
182                reference_node_name=reference_node_name,
183                trim_selects=trim_selects,
184            )
185
186        return upstream
187
188    if trim_selects and isinstance(scope.expression, exp.Select):
189        # For better ergonomics in our node labels, replace the full select with
190        # a version that has only the column we care about.
191        #   "x", SELECT x, y FROM foo
192        #     => "x", SELECT x FROM foo
193        source = t.cast(exp.Expression, scope.expression.select(select, append=False))
194    else:
195        source = scope.expression
196
197    # Create the node for this step in the lineage chain, and attach it to the previous one.
198    node = Node(
199        name=f"{scope_name}.{column}" if scope_name else str(column),
200        source=source,
201        expression=select,
202        source_name=source_name or "",
203        reference_node_name=reference_node_name or "",
204    )
205
206    if upstream:
207        upstream.downstream.append(node)
208
209    subquery_scopes = {
210        id(subquery_scope.expression): subquery_scope for subquery_scope in scope.subquery_scopes
211    }
212
213    for subquery in find_all_in_scope(select, exp.UNWRAPPED_QUERIES):
214        subquery_scope = subquery_scopes.get(id(subquery))
215        if not subquery_scope:
216            logger.warning(f"Unknown subquery scope: {subquery.sql(dialect=dialect)}")
217            continue
218
219        for name in subquery.named_selects:
220            to_node(
221                name,
222                scope=subquery_scope,
223                dialect=dialect,
224                upstream=node,
225                trim_selects=trim_selects,
226            )
227
228    # if the select is a star add all scope sources as downstreams
229    if select.is_star:
230        for source in scope.sources.values():
231            if isinstance(source, Scope):
232                source = source.expression
233            node.downstream.append(Node(name=select.sql(), source=source, expression=source))
234
235    # Find all columns that went into creating this one to list their lineage nodes.
236    source_columns = set(find_all_in_scope(select, exp.Column))
237
238    # If the source is a UDTF find columns used in the UDTF to generate the table
239    if isinstance(source, exp.UDTF):
240        source_columns |= set(source.find_all(exp.Column))
241        derived_tables = [
242            source.expression.parent
243            for source in scope.sources.values()
244            if isinstance(source, Scope) and source.is_derived_table
245        ]
246    else:
247        derived_tables = scope.derived_tables
248
249    source_names = {
250        dt.alias: dt.comments[0].split()[1]
251        for dt in derived_tables
252        if dt.comments and dt.comments[0].startswith("source: ")
253    }
254
255    for c in source_columns:
256        table = c.table
257        source = scope.sources.get(table)
258
259        if isinstance(source, Scope):
260            reference_node_name = None
261            if source.scope_type == ScopeType.DERIVED_TABLE and table not in source_names:
262                reference_node_name = table
263            elif source.scope_type == ScopeType.CTE:
264                selected_node, _ = scope.selected_sources.get(table, (None, None))
265                reference_node_name = selected_node.name if selected_node else None
266            # The table itself came from a more specific scope. Recurse into that one using the unaliased column name.
267            to_node(
268                c.name,
269                scope=source,
270                dialect=dialect,
271                scope_name=table,
272                upstream=node,
273                source_name=source_names.get(table) or source_name,
274                reference_node_name=reference_node_name,
275                trim_selects=trim_selects,
276            )
277        else:
278            # The source is not a scope - we've reached the end of the line. At this point, if a source is not found
279            # it means this column's lineage is unknown. This can happen if the definition of a source used in a query
280            # is not passed into the `sources` map.
281            source = source or exp.Placeholder()
282            node.downstream.append(Node(name=c.sql(), source=source, expression=source))
283
284    return node
class GraphHTML:
287class GraphHTML:
288    """Node to HTML generator using vis.js.
289
290    https://visjs.github.io/vis-network/docs/network/
291    """
292
293    def __init__(
294        self, nodes: t.Dict, edges: t.List, imports: bool = True, options: t.Optional[t.Dict] = None
295    ):
296        self.imports = imports
297
298        self.options = {
299            "height": "500px",
300            "width": "100%",
301            "layout": {
302                "hierarchical": {
303                    "enabled": True,
304                    "nodeSpacing": 200,
305                    "sortMethod": "directed",
306                },
307            },
308            "interaction": {
309                "dragNodes": False,
310                "selectable": False,
311            },
312            "physics": {
313                "enabled": False,
314            },
315            "edges": {
316                "arrows": "to",
317            },
318            "nodes": {
319                "font": "20px monaco",
320                "shape": "box",
321                "widthConstraint": {
322                    "maximum": 300,
323                },
324            },
325            **(options or {}),
326        }
327
328        self.nodes = nodes
329        self.edges = edges
330
331    def __str__(self):
332        nodes = json.dumps(list(self.nodes.values()))
333        edges = json.dumps(self.edges)
334        options = json.dumps(self.options)
335        imports = (
336            """<script type="text/javascript" src="https://unpkg.com/vis-data@latest/peer/umd/vis-data.min.js"></script>
337  <script type="text/javascript" src="https://unpkg.com/vis-network@latest/peer/umd/vis-network.min.js"></script>
338  <link rel="stylesheet" type="text/css" href="https://unpkg.com/vis-network/styles/vis-network.min.css" />"""
339            if self.imports
340            else ""
341        )
342
343        return f"""<div>
344  <div id="sqlglot-lineage"></div>
345  {imports}
346  <script type="text/javascript">
347    var nodes = new vis.DataSet({nodes})
348    nodes.forEach(row => row["title"] = new DOMParser().parseFromString(row["title"], "text/html").body.childNodes[0])
349
350    new vis.Network(
351        document.getElementById("sqlglot-lineage"),
352        {{
353            nodes: nodes,
354            edges: new vis.DataSet({edges})
355        }},
356        {options},
357    )
358  </script>
359</div>"""
360
361    def _repr_html_(self) -> str:
362        return self.__str__()

Node to HTML generator using vis.js.

https://visjs.github.io/vis-network/docs/network/

GraphHTML( nodes: Dict, edges: List, imports: bool = True, options: Optional[Dict] = None)
293    def __init__(
294        self, nodes: t.Dict, edges: t.List, imports: bool = True, options: t.Optional[t.Dict] = None
295    ):
296        self.imports = imports
297
298        self.options = {
299            "height": "500px",
300            "width": "100%",
301            "layout": {
302                "hierarchical": {
303                    "enabled": True,
304                    "nodeSpacing": 200,
305                    "sortMethod": "directed",
306                },
307            },
308            "interaction": {
309                "dragNodes": False,
310                "selectable": False,
311            },
312            "physics": {
313                "enabled": False,
314            },
315            "edges": {
316                "arrows": "to",
317            },
318            "nodes": {
319                "font": "20px monaco",
320                "shape": "box",
321                "widthConstraint": {
322                    "maximum": 300,
323                },
324            },
325            **(options or {}),
326        }
327
328        self.nodes = nodes
329        self.edges = edges
imports
options
nodes
edges