
sqlglot.jsonpath

from __future__ import annotations

import typing as t

import sqlglot.expressions as exp
from sqlglot.errors import ParseError
from sqlglot.tokens import Token, Tokenizer, TokenType

if t.TYPE_CHECKING:
    from sqlglot.dialects.dialect import DialectType
    from collections.abc import Collection


class JSONPathTokenizer(Tokenizer):
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    KEYWORDS = {
        "..": TokenType.DOT,
    }

    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]
    NUMBERS_CAN_HAVE_DECIMALS = False

    VAR_TOKENS = {
        TokenType.VAR,
    }

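# Illustrative note (not part of the module): for a path such as "$.store.book[0]",
# this tokenizer emits roughly DOLLAR, DOT, VAR("store"), DOT, VAR("book"),
# L_BRACKET, NUMBER("0"), R_BRACKET. Bare words become VAR tokens, which is why
# VAR_TOKENS above drives key parsing in parse() below.
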
def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression."""
    from sqlglot.dialects import Dialect

    dialect_inst = Dialect.get_or_raise(dialect)
    jsonpath_tokenizer = dialect_inst.jsonpath_tokenizer()
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    i = 0

    def _curr() -> TokenType | None:
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance() -> Token:
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: t.Literal[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: t.Literal[False] = False) -> Token | None:
        pass

    def _match(token_type, raise_unmatched=False):
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _match_set(types: Collection[TokenType]) -> Token | None:
        return _advance() if _curr() in types else None

    def _parse_literal() -> t.Any:
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            script = _prev().text == "("
            start = i

            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

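    # Illustrative note (not part of the module): _parse_literal returns a str for
    # quoted keys ('"a"' -> 'a'), an int for indexes ('-2' -> -2), a JSONPathWildcard
    # for '*', a JSONPathFilter/JSONPathScript node for '?...' / '(...)', and False
    # when nothing matched, so callers can detect an empty segment.
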
    def _parse_slice() -> t.Any:
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

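    # Illustrative note (not part of the module): for a segment like '[1:10:2]' this
    # yields JSONPathSlice(start=1, end=10, step=2); a bare '[1]' falls through to
    # the literal 1 because no COLON is matched.
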
    def _parse_bracket() -> exp.JSONPathPart:
        literal = _parse_slice()

        if isinstance(literal, str) or literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

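    # Illustrative note (not part of the module): '[0]' becomes JSONPathSubscript(this=0),
    # '["a"]' becomes JSONPathKey(this='a'), and '[0, 1]' becomes a JSONPathUnion over
    # both indexes.
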
    def _parse_var_text() -> str:
        """
        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
        in it, e.g. JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc.)
        or the path string is exhausted.
        """
        prev_index = i - 2

        while _match_set(jsonpath_tokenizer.VAR_TOKENS):
            pass

        start = 0 if prev_index < 0 else tokens[prev_index].end + 1

        if i >= len(tokens):
            # This key is the last token for the path, so its text is the remaining path
            text = path[start:]
        else:
            text = path[start : tokens[i].start]

        return text

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: list[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            recursive = _prev().text == ".."

            if _match_set(jsonpath_tokenizer.VAR_TOKENS):
                value: str | exp.JSONPathWildcard | None = _parse_var_text()
            elif _match(TokenType.IDENTIFIER):
                value = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            elif not dialect_inst.JSON_PATH_SINGLE_DOT_IS_WILDCARD:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match_set(jsonpath_tokenizer.VAR_TOKENS):
            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
        elif _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)

JSON_PATH_PART_TRANSFORMS: dict[type[exp.Expr], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self, e: (
        f"[{','.join(self.json_path_part(p) for p in e.expressions)}]"
    ),
    exp.JSONPathWildcard: lambda *_: "*",
}

ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
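
A quick usage sketch (the output shapes shown are illustrative and may differ slightly
across sqlglot versions). Note how a bare leading key is canonicalized under the root
element, as described in the comment in parse() above:

>>> from sqlglot.jsonpath import parse
>>> path = parse("field.key[0]")
>>> [type(part).__name__ for part in path.expressions]
['JSONPathRoot', 'JSONPathKey', 'JSONPathKey', 'JSONPathSubscript']

JSON_PATH_PART_TRANSFORMS is used on the generation side to render each part back to
text (the exact wiring varies by dialect). The entries that ignore their first
argument can be exercised directly:

>>> from sqlglot import expressions as exp
>>> from sqlglot.jsonpath import JSON_PATH_PART_TRANSFORMS
>>> JSON_PATH_PART_TRANSFORMS[exp.JSONPathRoot](None, None)
'$'
>>> JSON_PATH_PART_TRANSFORMS[exp.JSONPathFilter](None, exp.JSONPathFilter(this="@.price < 10"))
'?@.price < 10'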