Edit on GitHub

sqlglot.jsonpath

  1from __future__ import annotations
  2
  3import typing as t
  4
  5import sqlglot.expressions as exp
  6from sqlglot.errors import ParseError
  7from sqlglot.tokens import Token, Tokenizer, TokenType
  8
  9if t.TYPE_CHECKING:
 10    from sqlglot._typing import Lit
 11    from sqlglot.dialects.dialect import DialectType
 12
 13
class JSONPathTokenizer(Tokenizer):
    """Tokenizer for JSON path strings, e.g. "$.foo[0].*"."""

    # Characters that always tokenize as a single-character token.
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # The recursive-descent operator ".." is emitted as a single DOT token;
    # the parser distinguishes it from "." by inspecting the token's text.
    KEYWORDS = {
        "..": TokenType.DOT,
    }

    # Backslash escapes are honored inside quoted identifiers and strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]
 38
 39
 40def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
 41    """Takes in a JSON path string and parses it into a JSONPath expression."""
 42    from sqlglot.dialects import Dialect
 43
 44    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer
 45    tokens = jsonpath_tokenizer.tokenize(path)
 46    size = len(tokens)
 47
 48    i = 0
 49
 50    def _curr() -> t.Optional[TokenType]:
 51        return tokens[i].token_type if i < size else None
 52
 53    def _prev() -> Token:
 54        return tokens[i - 1]
 55
 56    def _advance() -> Token:
 57        nonlocal i
 58        i += 1
 59        return _prev()
 60
 61    def _error(msg: str) -> str:
 62        return f"{msg} at index {i}: {path}"
 63
 64    @t.overload
 65    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 66        pass
 67
 68    @t.overload
 69    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 70        pass
 71
 72    def _match(token_type, raise_unmatched=False):
 73        if _curr() == token_type:
 74            return _advance()
 75        if raise_unmatched:
 76            raise ParseError(_error(f"Expected {token_type}"))
 77        return None
 78
 79    def _parse_literal() -> t.Any:
 80        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 81        if token:
 82            return token.text
 83        if _match(TokenType.STAR):
 84            return exp.JSONPathWildcard()
 85        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 86            script = _prev().text == "("
 87            start = i
 88
 89            while True:
 90                if _match(TokenType.L_BRACKET):
 91                    _parse_bracket()  # nested call which we can throw away
 92                if _curr() in (TokenType.R_BRACKET, None):
 93                    break
 94                _advance()
 95
 96            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
 97            return expr_type(this=path[tokens[start].start : tokens[i].end])
 98
 99        number = "-" if _match(TokenType.DASH) else ""
100
101        token = _match(TokenType.NUMBER)
102        if token:
103            number += token.text
104
105        if number:
106            return int(number)
107
108        return False
109
110    def _parse_slice() -> t.Any:
111        start = _parse_literal()
112        end = _parse_literal() if _match(TokenType.COLON) else None
113        step = _parse_literal() if _match(TokenType.COLON) else None
114
115        if end is None and step is None:
116            return start
117
118        return exp.JSONPathSlice(start=start, end=end, step=step)
119
120    def _parse_bracket() -> exp.JSONPathPart:
121        literal = _parse_slice()
122
123        if isinstance(literal, str) or literal is not False:
124            indexes = [literal]
125            while _match(TokenType.COMMA):
126                literal = _parse_slice()
127
128                if literal:
129                    indexes.append(literal)
130
131            if len(indexes) == 1:
132                if isinstance(literal, str):
133                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
134                elif isinstance(literal, exp.JSONPathPart) and isinstance(
135                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
136                ):
137                    node = exp.JSONPathSelector(this=indexes[0])
138                else:
139                    node = exp.JSONPathSubscript(this=indexes[0])
140            else:
141                node = exp.JSONPathUnion(expressions=indexes)
142        else:
143            raise ParseError(_error("Cannot have empty segment"))
144
145        _match(TokenType.R_BRACKET, raise_unmatched=True)
146
147        return node
148
149    def _parse_var_text() -> str:
150        """
151        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
152        in it, e.g JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
153        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc)
154        or the path string is exhausted.
155        """
156        prev_index = i - 2
157
158        while _match(TokenType.VAR):
159            pass
160
161        start = 0 if prev_index < 0 else tokens[prev_index].end + 1
162
163        if i >= len(tokens):
164            # This key is the last token for the path, so it's text is the remaining path
165            text = path[start:]
166        else:
167            text = path[start : tokens[i].start]
168
169        return text
170
171    # We canonicalize the JSON path AST so that it always starts with a
172    # "root" element, so paths like "field" will be generated as "$.field"
173    _match(TokenType.DOLLAR)
174    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]
175
176    while _curr():
177        if _match(TokenType.DOT) or _match(TokenType.COLON):
178            recursive = _prev().text == ".."
179
180            if _match(TokenType.VAR):
181                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
182            elif _match(TokenType.IDENTIFIER):
183                value = _prev().text
184            elif _match(TokenType.STAR):
185                value = exp.JSONPathWildcard()
186            else:
187                value = None
188
189            if recursive:
190                expressions.append(exp.JSONPathRecursive(this=value))
191            elif value:
192                expressions.append(exp.JSONPathKey(this=value))
193            else:
194                raise ParseError(_error("Expected key name or * after DOT"))
195        elif _match(TokenType.L_BRACKET):
196            expressions.append(_parse_bracket())
197        elif _match(TokenType.VAR):
198            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
199        elif _match(TokenType.IDENTIFIER):
200            expressions.append(exp.JSONPathKey(this=_prev().text))
201        elif _match(TokenType.STAR):
202            expressions.append(exp.JSONPathWildcard())
203        else:
204            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
205
206    return exp.JSONPath(expressions=expressions)
207
208
# Maps each JSONPath AST node type to a callback that renders the node back
# to its textual JSON path form. Callbacks receive (generator, expression);
# entries that ignore the generator use `_` / `*_` placeholders.
JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    # NOTE(review): the "(" appears deliberately unbalanced — the parser seems
    # to capture the script's raw text including its trailing ")", so only the
    # opening paren is re-added here. Confirm against Token.end semantics
    # before "fixing" it.
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    # Slice parts render as start:end:step. A False component is an
    # explicitly-empty part (e.g. "$[:2]") and renders as the empty string;
    # None means the component was never written, so it is dropped entirely.
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self,
    e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]",
    exp.JSONPathWildcard: lambda *_: "*",
}

# All JSONPath node types that have a rendering transform above.
ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
class JSONPathTokenizer(sqlglot.tokens.Tokenizer):
15class JSONPathTokenizer(Tokenizer):
16    SINGLE_TOKENS = {
17        "(": TokenType.L_PAREN,
18        ")": TokenType.R_PAREN,
19        "[": TokenType.L_BRACKET,
20        "]": TokenType.R_BRACKET,
21        ":": TokenType.COLON,
22        ",": TokenType.COMMA,
23        "-": TokenType.DASH,
24        ".": TokenType.DOT,
25        "?": TokenType.PLACEHOLDER,
26        "@": TokenType.PARAMETER,
27        "'": TokenType.QUOTE,
28        '"': TokenType.QUOTE,
29        "$": TokenType.DOLLAR,
30        "*": TokenType.STAR,
31    }
32
33    KEYWORDS = {
34        "..": TokenType.DOT,
35    }
36
37    IDENTIFIER_ESCAPES = ["\\"]
38    STRING_ESCAPES = ["\\"]
SINGLE_TOKENS = {'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
KEYWORDS = {'..': <TokenType.DOT: 'DOT'>}
IDENTIFIER_ESCAPES = ['\\']
STRING_ESCAPES = ['\\']
def parse( path: str, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.JSONPath:
 41def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
 42    """Takes in a JSON path string and parses it into a JSONPath expression."""
 43    from sqlglot.dialects import Dialect
 44
 45    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer
 46    tokens = jsonpath_tokenizer.tokenize(path)
 47    size = len(tokens)
 48
 49    i = 0
 50
 51    def _curr() -> t.Optional[TokenType]:
 52        return tokens[i].token_type if i < size else None
 53
 54    def _prev() -> Token:
 55        return tokens[i - 1]
 56
 57    def _advance() -> Token:
 58        nonlocal i
 59        i += 1
 60        return _prev()
 61
 62    def _error(msg: str) -> str:
 63        return f"{msg} at index {i}: {path}"
 64
 65    @t.overload
 66    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 67        pass
 68
 69    @t.overload
 70    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 71        pass
 72
 73    def _match(token_type, raise_unmatched=False):
 74        if _curr() == token_type:
 75            return _advance()
 76        if raise_unmatched:
 77            raise ParseError(_error(f"Expected {token_type}"))
 78        return None
 79
 80    def _parse_literal() -> t.Any:
 81        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 82        if token:
 83            return token.text
 84        if _match(TokenType.STAR):
 85            return exp.JSONPathWildcard()
 86        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 87            script = _prev().text == "("
 88            start = i
 89
 90            while True:
 91                if _match(TokenType.L_BRACKET):
 92                    _parse_bracket()  # nested call which we can throw away
 93                if _curr() in (TokenType.R_BRACKET, None):
 94                    break
 95                _advance()
 96
 97            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
 98            return expr_type(this=path[tokens[start].start : tokens[i].end])
 99
100        number = "-" if _match(TokenType.DASH) else ""
101
102        token = _match(TokenType.NUMBER)
103        if token:
104            number += token.text
105
106        if number:
107            return int(number)
108
109        return False
110
111    def _parse_slice() -> t.Any:
112        start = _parse_literal()
113        end = _parse_literal() if _match(TokenType.COLON) else None
114        step = _parse_literal() if _match(TokenType.COLON) else None
115
116        if end is None and step is None:
117            return start
118
119        return exp.JSONPathSlice(start=start, end=end, step=step)
120
121    def _parse_bracket() -> exp.JSONPathPart:
122        literal = _parse_slice()
123
124        if isinstance(literal, str) or literal is not False:
125            indexes = [literal]
126            while _match(TokenType.COMMA):
127                literal = _parse_slice()
128
129                if literal:
130                    indexes.append(literal)
131
132            if len(indexes) == 1:
133                if isinstance(literal, str):
134                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
135                elif isinstance(literal, exp.JSONPathPart) and isinstance(
136                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
137                ):
138                    node = exp.JSONPathSelector(this=indexes[0])
139                else:
140                    node = exp.JSONPathSubscript(this=indexes[0])
141            else:
142                node = exp.JSONPathUnion(expressions=indexes)
143        else:
144            raise ParseError(_error("Cannot have empty segment"))
145
146        _match(TokenType.R_BRACKET, raise_unmatched=True)
147
148        return node
149
150    def _parse_var_text() -> str:
151        """
152        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
153        in it, e.g JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
154        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc)
155        or the path string is exhausted.
156        """
157        prev_index = i - 2
158
159        while _match(TokenType.VAR):
160            pass
161
162        start = 0 if prev_index < 0 else tokens[prev_index].end + 1
163
164        if i >= len(tokens):
165            # This key is the last token for the path, so its text is the remaining path
166            text = path[start:]
167        else:
168            text = path[start : tokens[i].start]
169
170        return text
171
172    # We canonicalize the JSON path AST so that it always starts with a
173    # "root" element, so paths like "field" will be generated as "$.field"
174    _match(TokenType.DOLLAR)
175    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]
176
177    while _curr():
178        if _match(TokenType.DOT) or _match(TokenType.COLON):
179            recursive = _prev().text == ".."
180
181            if _match(TokenType.VAR):
182                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
183            elif _match(TokenType.IDENTIFIER):
184                value = _prev().text
185            elif _match(TokenType.STAR):
186                value = exp.JSONPathWildcard()
187            else:
188                value = None
189
190            if recursive:
191                expressions.append(exp.JSONPathRecursive(this=value))
192            elif value:
193                expressions.append(exp.JSONPathKey(this=value))
194            else:
195                raise ParseError(_error("Expected key name or * after DOT"))
196        elif _match(TokenType.L_BRACKET):
197            expressions.append(_parse_bracket())
198        elif _match(TokenType.VAR):
199            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
200        elif _match(TokenType.IDENTIFIER):
201            expressions.append(exp.JSONPathKey(this=_prev().text))
202        elif _match(TokenType.STAR):
203            expressions.append(exp.JSONPathWildcard())
204        else:
205            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
206
207    return exp.JSONPath(expressions=expressions)

Takes in a JSON path string and parses it into a JSONPath expression.

JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]] = {<class 'sqlglot.expressions.JSONPathFilter'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathKey'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRecursive'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRoot'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathScript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSelector'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSlice'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSubscript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathUnion'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathWildcard'>: <function <lambda>>}