Edit on GitHub

sqlglot.jsonpath

  1from __future__ import annotations
  2
  3import typing as t
  4
  5import sqlglot.expressions as exp
  6from sqlglot.errors import ParseError
  7from sqlglot.tokens import Token, Tokenizer, TokenType
  8
  9if t.TYPE_CHECKING:
 10    from sqlglot._typing import Lit
 11    from sqlglot.dialects.dialect import DialectType
 12
 13
class JSONPathTokenizer(Tokenizer):
    """Tokenizer specialized for JSON path strings such as ``$.key[0].*``."""

    # Single characters that map directly to one token type each.
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # The recursive-descent operator ".." is lexed as a DOT token; parse()
    # tells it apart from a plain "." by inspecting the token's text.
    KEYWORDS = {
        "..": TokenType.DOT,
    }

    # Backslash escapes inside quoted identifiers and quoted strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]

    # Token types that parse() merges into "var" key text (see
    # _parse_var_text); presumably dialect subclasses may extend this set.
    VAR_TOKENS = {
        TokenType.VAR,
    }
 42
 43
def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """
    Takes in a JSON path string and parses it into a JSONPath expression.

    Args:
        path: the JSON path string, e.g. ``"$.key[0].*"``.
        dialect: determines which JSON path tokenizer to use (e.g. BigQuery
            tokenizes unquoted keys with spaces differently -- see
            ``_parse_var_text`` below).

    Returns:
        The corresponding ``exp.JSONPath`` AST; its expression list always
        starts with a ``JSONPathRoot`` node.

    Raises:
        ParseError: if the path is malformed.
    """
    from sqlglot.dialects import Dialect

    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer()
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    # Parser state: `i` is the index of the next unconsumed token; the nested
    # closures below all read/advance it.
    i = 0

    def _curr() -> t.Optional[TokenType]:
        # Type of the current token, or None once the input is exhausted.
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        # The most recently consumed token.
        return tokens[i - 1]

    def _advance() -> Token:
        # Consume the current token and return it.
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        # Consume and return the current token if it has the given type;
        # otherwise return None, or raise when `raise_unmatched` is set.
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _match_set(types: t.Collection[TokenType]) -> t.Optional[Token]:
        # Like _match, but accepts any token type in `types`; never raises.
        return _advance() if _curr() in types else None

    def _parse_literal() -> t.Any:
        # Parses one bracket operand. Possible return values:
        #   str              -- a quoted string/identifier key
        #   JSONPathWildcard -- "*"
        #   JSONPathFilter   -- "?..." filter (raw source text)
        #   JSONPathScript   -- "(...)" script (raw source text)
        #   int              -- a (possibly negative) array index
        #   False            -- nothing matched; False rather than None so
        #                       _parse_slice can use None for omitted bounds
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            # "?" starts a filter; "(" starts a script expression.
            script = _prev().text == "("
            start = i

            # Skip tokens (resolving nested brackets) until the closing "]"
            # of the enclosing segment, or end of input.
            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            # Capture the expression's raw source text from `path` via token
            # offsets. NOTE(review): `tokens[i]` here is the closing "]", and
            # slicing to tokens[i].end appears to exclude it (Token.end looks
            # inclusive -- confirm in sqlglot.tokens); also, if the path ends
            # without "]", i == len(tokens) and tokens[i] would raise
            # IndexError rather than ParseError -- TODO confirm.
            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        # A bare, optionally negative, integer index.
        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

    def _parse_slice() -> t.Any:
        # Parses "start", "start:end" or "start:end:step". When no ":" is
        # present the bare literal is returned unchanged (key or index).
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        # Parses the contents of "[...]" (the "[" was consumed by the caller):
        # a key, subscript, slice, selector, or comma-separated union of them.
        literal = _parse_slice()

        # NOTE(review): the isinstance(str) check is redundant -- any str also
        # satisfies `literal is not False`.
        if isinstance(literal, str) or literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    # NOTE(review): the first isinstance is subsumed by the
                    # second (Script/Filter are JSONPathParts).
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    # Numbers, wildcards and slices become subscripts.
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    def _parse_var_text() -> str:
        """
        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
        in it, e.g JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc)
        or the path string is exhausted.
        """
        # The caller already consumed the first var token, so the token
        # preceding the var run sits at i - 2.
        prev_index = i - 2

        while _match_set(jsonpath_tokenizer.VAR_TOKENS):
            pass

        # Recover the raw key text (including whitespace between var tokens)
        # straight from `path`, using the surrounding tokens' offsets.
        start = 0 if prev_index < 0 else tokens[prev_index].end + 1

        if i >= len(tokens):
            # This key is the last token for the path, so it's text is the remaining path
            text = path[start:]
        else:
            text = path[start : tokens[i].start]

        return text

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            # ".." also lexes as DOT (see JSONPathTokenizer.KEYWORDS); its
            # text identifies the recursive-descent operator.
            recursive = _prev().text == ".."

            if _match_set(jsonpath_tokenizer.VAR_TOKENS):
                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
            elif _match(TokenType.IDENTIFIER):
                value = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                # A bare ".." (value None) is accepted here, unlike "." below.
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match_set(jsonpath_tokenizer.VAR_TOKENS):
            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
        elif _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)
214
215
# Maps each JSONPath AST node type to a callback that renders it back to its
# string form. Each callback is invoked with (generator, expression); the
# `_` variants ignore the generator.
JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    # No closing ")" is emitted here -- the script text captured by parse()
    # appears to retain its own trailing ")" (everything between "(" and "]"
    # is stored verbatim in `this`); confirm against parse._parse_literal.
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    # Slice bounds that are None (omitted) are dropped entirely; a False
    # bound renders as the empty string (e.g. "[:2]").
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self,
    e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]",
    exp.JSONPathWildcard: lambda *_: "*",
}

# The set of all JSONPath part node types (the keys of the transforms map).
ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
class JSONPathTokenizer(sqlglot.tokens.Tokenizer):
15class JSONPathTokenizer(Tokenizer):
16    SINGLE_TOKENS = {
17        "(": TokenType.L_PAREN,
18        ")": TokenType.R_PAREN,
19        "[": TokenType.L_BRACKET,
20        "]": TokenType.R_BRACKET,
21        ":": TokenType.COLON,
22        ",": TokenType.COMMA,
23        "-": TokenType.DASH,
24        ".": TokenType.DOT,
25        "?": TokenType.PLACEHOLDER,
26        "@": TokenType.PARAMETER,
27        "'": TokenType.QUOTE,
28        '"': TokenType.QUOTE,
29        "$": TokenType.DOLLAR,
30        "*": TokenType.STAR,
31    }
32
33    KEYWORDS = {
34        "..": TokenType.DOT,
35    }
36
37    IDENTIFIER_ESCAPES = ["\\"]
38    STRING_ESCAPES = ["\\"]
39
40    VAR_TOKENS = {
41        TokenType.VAR,
42    }
SINGLE_TOKENS = {'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
KEYWORDS = {'..': <TokenType.DOT: 'DOT'>}
IDENTIFIER_ESCAPES = ['\\']
STRING_ESCAPES = ['\\']
VAR_TOKENS = {<TokenType.VAR: 'VAR'>}
def parse( path: str, dialect: Union[str, sqlglot.dialects.Dialect, Type[sqlglot.dialects.Dialect], NoneType] = None) -> sqlglot.expressions.JSONPath:
 45def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
 46    """Takes in a JSON path string and parses it into a JSONPath expression."""
 47    from sqlglot.dialects import Dialect
 48
 49    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer()
 50    tokens = jsonpath_tokenizer.tokenize(path)
 51    size = len(tokens)
 52
 53    i = 0
 54
 55    def _curr() -> t.Optional[TokenType]:
 56        return tokens[i].token_type if i < size else None
 57
 58    def _prev() -> Token:
 59        return tokens[i - 1]
 60
 61    def _advance() -> Token:
 62        nonlocal i
 63        i += 1
 64        return _prev()
 65
 66    def _error(msg: str) -> str:
 67        return f"{msg} at index {i}: {path}"
 68
 69    @t.overload
 70    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 71        pass
 72
 73    @t.overload
 74    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 75        pass
 76
 77    def _match(token_type, raise_unmatched=False):
 78        if _curr() == token_type:
 79            return _advance()
 80        if raise_unmatched:
 81            raise ParseError(_error(f"Expected {token_type}"))
 82        return None
 83
 84    def _match_set(types: t.Collection[TokenType]) -> t.Optional[Token]:
 85        return _advance() if _curr() in types else None
 86
 87    def _parse_literal() -> t.Any:
 88        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 89        if token:
 90            return token.text
 91        if _match(TokenType.STAR):
 92            return exp.JSONPathWildcard()
 93        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 94            script = _prev().text == "("
 95            start = i
 96
 97            while True:
 98                if _match(TokenType.L_BRACKET):
 99                    _parse_bracket()  # nested call which we can throw away
100                if _curr() in (TokenType.R_BRACKET, None):
101                    break
102                _advance()
103
104            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
105            return expr_type(this=path[tokens[start].start : tokens[i].end])
106
107        number = "-" if _match(TokenType.DASH) else ""
108
109        token = _match(TokenType.NUMBER)
110        if token:
111            number += token.text
112
113        if number:
114            return int(number)
115
116        return False
117
118    def _parse_slice() -> t.Any:
119        start = _parse_literal()
120        end = _parse_literal() if _match(TokenType.COLON) else None
121        step = _parse_literal() if _match(TokenType.COLON) else None
122
123        if end is None and step is None:
124            return start
125
126        return exp.JSONPathSlice(start=start, end=end, step=step)
127
128    def _parse_bracket() -> exp.JSONPathPart:
129        literal = _parse_slice()
130
131        if isinstance(literal, str) or literal is not False:
132            indexes = [literal]
133            while _match(TokenType.COMMA):
134                literal = _parse_slice()
135
136                if literal:
137                    indexes.append(literal)
138
139            if len(indexes) == 1:
140                if isinstance(literal, str):
141                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
142                elif isinstance(literal, exp.JSONPathPart) and isinstance(
143                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
144                ):
145                    node = exp.JSONPathSelector(this=indexes[0])
146                else:
147                    node = exp.JSONPathSubscript(this=indexes[0])
148            else:
149                node = exp.JSONPathUnion(expressions=indexes)
150        else:
151            raise ParseError(_error("Cannot have empty segment"))
152
153        _match(TokenType.R_BRACKET, raise_unmatched=True)
154
155        return node
156
157    def _parse_var_text() -> str:
158        """
159        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
160        in it, e.g JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
161        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc)
162        or the path string is exhausted.
163        """
164        prev_index = i - 2
165
166        while _match_set(jsonpath_tokenizer.VAR_TOKENS):
167            pass
168
169        start = 0 if prev_index < 0 else tokens[prev_index].end + 1
170
171        if i >= len(tokens):
172            # This key is the last token for the path, so it's text is the remaining path
173            text = path[start:]
174        else:
175            text = path[start : tokens[i].start]
176
177        return text
178
179    # We canonicalize the JSON path AST so that it always starts with a
180    # "root" element, so paths like "field" will be generated as "$.field"
181    _match(TokenType.DOLLAR)
182    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]
183
184    while _curr():
185        if _match(TokenType.DOT) or _match(TokenType.COLON):
186            recursive = _prev().text == ".."
187
188            if _match_set(jsonpath_tokenizer.VAR_TOKENS):
189                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
190            elif _match(TokenType.IDENTIFIER):
191                value = _prev().text
192            elif _match(TokenType.STAR):
193                value = exp.JSONPathWildcard()
194            else:
195                value = None
196
197            if recursive:
198                expressions.append(exp.JSONPathRecursive(this=value))
199            elif value:
200                expressions.append(exp.JSONPathKey(this=value))
201            else:
202                raise ParseError(_error("Expected key name or * after DOT"))
203        elif _match(TokenType.L_BRACKET):
204            expressions.append(_parse_bracket())
205        elif _match_set(jsonpath_tokenizer.VAR_TOKENS):
206            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
207        elif _match(TokenType.IDENTIFIER):
208            expressions.append(exp.JSONPathKey(this=_prev().text))
209        elif _match(TokenType.STAR):
210            expressions.append(exp.JSONPathWildcard())
211        else:
212            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
213
214    return exp.JSONPath(expressions=expressions)

Takes in a JSON path string and parses it into a JSONPath expression.

JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]] = {<class 'sqlglot.expressions.JSONPathFilter'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathKey'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRecursive'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRoot'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathScript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSelector'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSlice'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSubscript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathUnion'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathWildcard'>: <function <lambda>>}