
sqlglot.jsonpath

  1from __future__ import annotations
  2
  3import typing as t
  4
  5import sqlglot.expressions as exp
  6from sqlglot.errors import ParseError
  7from sqlglot.tokens import Token, Tokenizer, TokenType
  8
  9if t.TYPE_CHECKING:
 10    from sqlglot._typing import Lit
 11    from sqlglot.dialects.dialect import DialectType
 12
 13
 14class JSONPathTokenizer(Tokenizer):
 15    SINGLE_TOKENS = {
 16        "(": TokenType.L_PAREN,
 17        ")": TokenType.R_PAREN,
 18        "[": TokenType.L_BRACKET,
 19        "]": TokenType.R_BRACKET,
 20        ":": TokenType.COLON,
 21        ",": TokenType.COMMA,
 22        "-": TokenType.DASH,
 23        ".": TokenType.DOT,
 24        "?": TokenType.PLACEHOLDER,
 25        "@": TokenType.PARAMETER,
 26        "'": TokenType.QUOTE,
 27        '"': TokenType.QUOTE,
 28        "$": TokenType.DOLLAR,
 29        "*": TokenType.STAR,
 30    }
 31
 32    KEYWORDS = {
 33        "..": TokenType.DOT,
 34    }
 35
 36    IDENTIFIER_ESCAPES = ["\\"]
 37    STRING_ESCAPES = ["\\"]
 38
 39
 40def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
 41    """Takes in a JSON path string and parses it into a JSONPath expression."""
 42    from sqlglot.dialects import Dialect
 43
 44    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer
 45    tokens = jsonpath_tokenizer.tokenize(path)
 46    size = len(tokens)
 47
 48    i = 0
 49
 50    def _curr() -> t.Optional[TokenType]:
 51        return tokens[i].token_type if i < size else None
 52
 53    def _prev() -> Token:
 54        return tokens[i - 1]
 55
 56    def _advance() -> Token:
 57        nonlocal i
 58        i += 1
 59        return _prev()
 60
 61    def _error(msg: str) -> str:
 62        return f"{msg} at index {i}: {path}"
 63
 64    @t.overload
 65    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 66        pass
 67
 68    @t.overload
 69    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 70        pass
 71
 72    def _match(token_type, raise_unmatched=False):
 73        if _curr() == token_type:
 74            return _advance()
 75        if raise_unmatched:
 76            raise ParseError(_error(f"Expected {token_type}"))
 77        return None
 78
 79    def _parse_literal() -> t.Any:
 80        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 81        if token:
 82            return token.text
 83        if _match(TokenType.STAR):
 84            return exp.JSONPathWildcard()
 85        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 86            script = _prev().text == "("
 87            start = i
 88
 89            while True:
 90                if _match(TokenType.L_BRACKET):
 91                    _parse_bracket()  # nested call which we can throw away
 92                if _curr() in (TokenType.R_BRACKET, None):
 93                    break
 94                _advance()
 95
 96            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
 97            return expr_type(this=path[tokens[start].start : tokens[i].end])
 98
 99        number = "-" if _match(TokenType.DASH) else ""
100
101        token = _match(TokenType.NUMBER)
102        if token:
103            number += token.text
104
105        if number:
106            return int(number)
107
108        return False
109
110    def _parse_slice() -> t.Any:
111        start = _parse_literal()
112        end = _parse_literal() if _match(TokenType.COLON) else None
113        step = _parse_literal() if _match(TokenType.COLON) else None
114
115        if end is None and step is None:
116            return start
117
118        return exp.JSONPathSlice(start=start, end=end, step=step)
119
120    def _parse_bracket() -> exp.JSONPathPart:
121        literal = _parse_slice()
122
123        if isinstance(literal, str) or literal is not False:
124            indexes = [literal]
125            while _match(TokenType.COMMA):
126                literal = _parse_slice()
127
128                if literal:
129                    indexes.append(literal)
130
131            if len(indexes) == 1:
132                if isinstance(literal, str):
133                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
134                elif isinstance(literal, exp.JSONPathPart) and isinstance(
135                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
136                ):
137                    node = exp.JSONPathSelector(this=indexes[0])
138                else:
139                    node = exp.JSONPathSubscript(this=indexes[0])
140            else:
141                node = exp.JSONPathUnion(expressions=indexes)
142        else:
143            raise ParseError(_error("Cannot have empty segment"))
144
145        _match(TokenType.R_BRACKET, raise_unmatched=True)
146
147        return node
148
149    # We canonicalize the JSON path AST so that it always starts with a
150    # "root" element, so paths like "field" will be generated as "$.field"
151    _match(TokenType.DOLLAR)
152    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]
153
154    while _curr():
155        if _match(TokenType.DOT) or _match(TokenType.COLON):
156            recursive = _prev().text == ".."
157
158            if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
159                value: t.Optional[str | exp.JSONPathWildcard] = _prev().text
160            elif _match(TokenType.STAR):
161                value = exp.JSONPathWildcard()
162            else:
163                value = None
164
165            if recursive:
166                expressions.append(exp.JSONPathRecursive(this=value))
167            elif value:
168                expressions.append(exp.JSONPathKey(this=value))
169            else:
170                raise ParseError(_error("Expected key name or * after DOT"))
171        elif _match(TokenType.L_BRACKET):
172            expressions.append(_parse_bracket())
173        elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
174            expressions.append(exp.JSONPathKey(this=_prev().text))
175        elif _match(TokenType.STAR):
176            expressions.append(exp.JSONPathWildcard())
177        else:
178            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
179
180    return exp.JSONPath(expressions=expressions)
181
182
183JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
184    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
185    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
186    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
187    exp.JSONPathRoot: lambda *_: "$",
188    exp.JSONPathScript: lambda _, e: f"({e.this}",
189    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
190    exp.JSONPathSlice: lambda self, e: ":".join(
191        "" if p is False else self.json_path_part(p)
192        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
193        if p is not None
194    ),
195    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
196    exp.JSONPathUnion: lambda self,
197    e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]",
198    exp.JSONPathWildcard: lambda *_: "*",
199}
200
201ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
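
As the comment inside parse notes, the AST is canonicalized to always start with a root element. A minimal sketch of what that means in practice (the example path is arbitrary; repr is used only to show the resulting tree):

from sqlglot.jsonpath import parse

# Both inputs produce the same AST: a JSONPathRoot followed by
# JSONPathKey(this="field"), because the parser prepends the root itself.
print(repr(parse("field")))
print(repr(parse("$.field")))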
class JSONPathTokenizer(sqlglot.tokens.Tokenizer):
SINGLE_TOKENS = {'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
KEYWORDS = {'..': <TokenType.DOT: 'DOT'>}
IDENTIFIER_ESCAPES = ['\\']
STRING_ESCAPES = ['\\']
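
A small sketch of driving this tokenizer the same way parse does above, going through a dialect's jsonpath_tokenizer; the "duckdb" dialect name is only an illustrative choice:

from sqlglot.dialects import Dialect

tokenizer = Dialect.get_or_raise("duckdb").jsonpath_tokenizer
for token in tokenizer.tokenize("$.store.book[0]"):
    # Each token exposes token_type and text, as used by parse above.
    print(token.token_type, token.text)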
def parse( path: str, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.JSONPath:

Takes in a JSON path string and parses it into a JSONPath expression.
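
A brief usage sketch based on the source above; the example path is arbitrary, and the printed class names follow from the parsing rules shown in the code:

from sqlglot.jsonpath import parse

node = parse("store.book[0]")

# The parser prepends a JSONPathRoot even though the input omits "$",
# so the parts are: JSONPathRoot, JSONPathKey, JSONPathKey, JSONPathSubscript.
print([type(part).__name__ for part in node.expressions])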

JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]], a mapping from each JSONPath part type (JSONPathFilter, JSONPathKey, JSONPathRecursive, JSONPathRoot, JSONPathScript, JSONPathSelector, JSONPathSlice, JSONPathSubscript, JSONPathUnion, JSONPathWildcard) to the callable that renders it as a path fragment.
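
Most of these transforms expect a Generator as their first argument, but the ones that ignore it can be called directly. A small sketch, purely for illustration:

from sqlglot import expressions as exp
from sqlglot.jsonpath import ALL_JSON_PATH_PARTS, JSON_PATH_PART_TRANSFORMS

# JSONPathRoot and JSONPathFilter ignore the generator argument, so None suffices here.
print(JSON_PATH_PART_TRANSFORMS[exp.JSONPathRoot](None, exp.JSONPathRoot()))  # $
print(JSON_PATH_PART_TRANSFORMS[exp.JSONPathFilter](None, exp.JSONPathFilter(this="@.price < 10")))  # ?@.price < 10

# ALL_JSON_PATH_PARTS is simply the set of node types covered by the table above.
print(exp.JSONPathKey in ALL_JSON_PATH_PARTS)  # True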