Edit on GitHub

sqlglot.jsonpath

  1from __future__ import annotations
  2
  3import typing as t
  4
  5import sqlglot.expressions as exp
  6from sqlglot.errors import ParseError
  7from sqlglot.tokens import Token, Tokenizer, TokenType
  8
  9if t.TYPE_CHECKING:
 10    from sqlglot._typing import Lit
 11
 12
 13class JSONPathTokenizer(Tokenizer):
 14    SINGLE_TOKENS = {
 15        "(": TokenType.L_PAREN,
 16        ")": TokenType.R_PAREN,
 17        "[": TokenType.L_BRACKET,
 18        "]": TokenType.R_BRACKET,
 19        ":": TokenType.COLON,
 20        ",": TokenType.COMMA,
 21        "-": TokenType.DASH,
 22        ".": TokenType.DOT,
 23        "?": TokenType.PLACEHOLDER,
 24        "@": TokenType.PARAMETER,
 25        "'": TokenType.QUOTE,
 26        '"': TokenType.QUOTE,
 27        "$": TokenType.DOLLAR,
 28        "*": TokenType.STAR,
 29    }
 30
 31    KEYWORDS = {
 32        "..": TokenType.DOT,
 33    }
 34
 35    IDENTIFIER_ESCAPES = ["\\"]
 36    STRING_ESCAPES = ["\\"]
 37
 38
 39def parse(path: str) -> exp.JSONPath:
 40    """Takes in a JSON path string and parses it into a JSONPath expression."""
 41    tokens = JSONPathTokenizer().tokenize(path)
 42    size = len(tokens)
 43
 44    i = 0
 45
 46    def _curr() -> t.Optional[TokenType]:
 47        return tokens[i].token_type if i < size else None
 48
 49    def _prev() -> Token:
 50        return tokens[i - 1]
 51
 52    def _advance() -> Token:
 53        nonlocal i
 54        i += 1
 55        return _prev()
 56
 57    def _error(msg: str) -> str:
 58        return f"{msg} at index {i}: {path}"
 59
 60    @t.overload
 61    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 62        pass
 63
 64    @t.overload
 65    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 66        pass
 67
 68    def _match(token_type, raise_unmatched=False):
 69        if _curr() == token_type:
 70            return _advance()
 71        if raise_unmatched:
 72            raise ParseError(_error(f"Expected {token_type}"))
 73        return None
 74
 75    def _parse_literal() -> t.Any:
 76        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 77        if token:
 78            return token.text
 79        if _match(TokenType.STAR):
 80            return exp.JSONPathWildcard()
 81        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 82            script = _prev().text == "("
 83            start = i
 84
 85            while True:
 86                if _match(TokenType.L_BRACKET):
 87                    _parse_bracket()  # nested call which we can throw away
 88                if _curr() in (TokenType.R_BRACKET, None):
 89                    break
 90                _advance()
 91
 92            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
 93            return expr_type(this=path[tokens[start].start : tokens[i].end])
 94
 95        number = "-" if _match(TokenType.DASH) else ""
 96
 97        token = _match(TokenType.NUMBER)
 98        if token:
 99            number += token.text
100
101        if number:
102            return int(number)
103
104        return False
105
106    def _parse_slice() -> t.Any:
107        start = _parse_literal()
108        end = _parse_literal() if _match(TokenType.COLON) else None
109        step = _parse_literal() if _match(TokenType.COLON) else None
110
111        if end is None and step is None:
112            return start
113
114        return exp.JSONPathSlice(start=start, end=end, step=step)
115
116    def _parse_bracket() -> exp.JSONPathPart:
117        literal = _parse_slice()
118
119        if isinstance(literal, str) or literal is not False:
120            indexes = [literal]
121            while _match(TokenType.COMMA):
122                literal = _parse_slice()
123
124                if literal:
125                    indexes.append(literal)
126
127            if len(indexes) == 1:
128                if isinstance(literal, str):
129                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
130                elif isinstance(literal, exp.JSONPathPart) and isinstance(
131                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
132                ):
133                    node = exp.JSONPathSelector(this=indexes[0])
134                else:
135                    node = exp.JSONPathSubscript(this=indexes[0])
136            else:
137                node = exp.JSONPathUnion(expressions=indexes)
138        else:
139            raise ParseError(_error("Cannot have empty segment"))
140
141        _match(TokenType.R_BRACKET, raise_unmatched=True)
142
143        return node
144
145    # We canonicalize the JSON path AST so that it always starts with a
146    # "root" element, so paths like "field" will be generated as "$.field"
147    _match(TokenType.DOLLAR)
148    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]
149
150    while _curr():
151        if _match(TokenType.DOT) or _match(TokenType.COLON):
152            recursive = _prev().text == ".."
153
154            if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
155                value: t.Optional[str | exp.JSONPathWildcard] = _prev().text
156            elif _match(TokenType.STAR):
157                value = exp.JSONPathWildcard()
158            else:
159                value = None
160
161            if recursive:
162                expressions.append(exp.JSONPathRecursive(this=value))
163            elif value:
164                expressions.append(exp.JSONPathKey(this=value))
165            else:
166                raise ParseError(_error("Expected key name or * after DOT"))
167        elif _match(TokenType.L_BRACKET):
168            expressions.append(_parse_bracket())
169        elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
170            expressions.append(exp.JSONPathKey(this=_prev().text))
171        elif _match(TokenType.STAR):
172            expressions.append(exp.JSONPathWildcard())
173        else:
174            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
175
176    return exp.JSONPath(expressions=expressions)
177
178
def _slice_part_sql(generator: t.Any, node: exp.Expression) -> str:
    """Render a slice as its given bounds joined by ":".

    Bounds that are None are skipped entirely; a bound of False renders as an
    empty string.
    """
    rendered = []
    for key in ("start", "end", "step"):
        bound = node.args.get(key)
        if bound is None:
            continue
        rendered.append("" if bound is False else generator.json_path_part(bound))
    return ":".join(rendered)


def _union_part_sql(generator: t.Any, node: exp.Expression) -> str:
    """Render a union as its comma-separated members wrapped in brackets."""
    members = [generator.json_path_part(member) for member in node.expressions]
    return f"[{','.join(members)}]"


# Maps each JSON path node type to a callable that renders it as text. Every
# callable is invoked with two positional arguments: an object providing
# `json_path_part` and the `_jsonpath*_sql` methods, and the node to render.
JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _gen, node: f"?{node.this}",
    exp.JSONPathKey: lambda gen, node: gen._jsonpathkey_sql(node),
    exp.JSONPathRecursive: lambda _gen, node: f"..{node.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    # NOTE(review): only the opening paren is emitted here; presumably the
    # script text captured by `parse` already ends with ")" - confirm.
    exp.JSONPathScript: lambda _gen, node: f"({node.this}",
    exp.JSONPathSelector: lambda gen, node: f"[{gen.json_path_part(node.this)}]",
    exp.JSONPathSlice: _slice_part_sql,
    exp.JSONPathSubscript: lambda gen, node: gen._jsonpathsubscript_sql(node),
    exp.JSONPathUnion: _union_part_sql,
    exp.JSONPathWildcard: lambda *_: "*",
}

# The set of node types that have a transform above.
ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
class JSONPathTokenizer(sqlglot.tokens.Tokenizer):
class JSONPathTokenizer(Tokenizer):
    """Tokenizer configuration used to split a JSON path string into tokens."""

    # Punctuation that is emitted as a stand-alone, single-character token.
    SINGLE_TOKENS = {
        # Grouping and bracket selectors.
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        # Separators and sign.
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        # Filter marker and current-node marker.
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        # Both quote characters map to the same token type.
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        # Root marker and wildcard.
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # ".." shares the DOT token type with "."; `parse` tells the two apart by
    # looking at the token text.
    KEYWORDS = {"..": TokenType.DOT}

    # A backslash escapes characters inside identifiers and strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]
SINGLE_TOKENS = {'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
KEYWORDS = {'..': <TokenType.DOT: 'DOT'>}
IDENTIFIER_ESCAPES = ['\\']
STRING_ESCAPES = ['\\']
def parse(path: str) -> sqlglot.expressions.JSONPath:
 40def parse(path: str) -> exp.JSONPath:
 41    """Takes in a JSON path string and parses it into a JSONPath expression."""
 42    tokens = JSONPathTokenizer().tokenize(path)
 43    size = len(tokens)
 44
 45    i = 0
 46
 47    def _curr() -> t.Optional[TokenType]:
 48        return tokens[i].token_type if i < size else None
 49
 50    def _prev() -> Token:
 51        return tokens[i - 1]
 52
 53    def _advance() -> Token:
 54        nonlocal i
 55        i += 1
 56        return _prev()
 57
 58    def _error(msg: str) -> str:
 59        return f"{msg} at index {i}: {path}"
 60
 61    @t.overload
 62    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 63        pass
 64
 65    @t.overload
 66    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 67        pass
 68
 69    def _match(token_type, raise_unmatched=False):
 70        if _curr() == token_type:
 71            return _advance()
 72        if raise_unmatched:
 73            raise ParseError(_error(f"Expected {token_type}"))
 74        return None
 75
 76    def _parse_literal() -> t.Any:
 77        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 78        if token:
 79            return token.text
 80        if _match(TokenType.STAR):
 81            return exp.JSONPathWildcard()
 82        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 83            script = _prev().text == "("
 84            start = i
 85
 86            while True:
 87                if _match(TokenType.L_BRACKET):
 88                    _parse_bracket()  # nested call which we can throw away
 89                if _curr() in (TokenType.R_BRACKET, None):
 90                    break
 91                _advance()
 92
 93            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
 94            return expr_type(this=path[tokens[start].start : tokens[i].end])
 95
 96        number = "-" if _match(TokenType.DASH) else ""
 97
 98        token = _match(TokenType.NUMBER)
 99        if token:
100            number += token.text
101
102        if number:
103            return int(number)
104
105        return False
106
107    def _parse_slice() -> t.Any:
108        start = _parse_literal()
109        end = _parse_literal() if _match(TokenType.COLON) else None
110        step = _parse_literal() if _match(TokenType.COLON) else None
111
112        if end is None and step is None:
113            return start
114
115        return exp.JSONPathSlice(start=start, end=end, step=step)
116
117    def _parse_bracket() -> exp.JSONPathPart:
118        literal = _parse_slice()
119
120        if isinstance(literal, str) or literal is not False:
121            indexes = [literal]
122            while _match(TokenType.COMMA):
123                literal = _parse_slice()
124
125                if literal:
126                    indexes.append(literal)
127
128            if len(indexes) == 1:
129                if isinstance(literal, str):
130                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
131                elif isinstance(literal, exp.JSONPathPart) and isinstance(
132                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
133                ):
134                    node = exp.JSONPathSelector(this=indexes[0])
135                else:
136                    node = exp.JSONPathSubscript(this=indexes[0])
137            else:
138                node = exp.JSONPathUnion(expressions=indexes)
139        else:
140            raise ParseError(_error("Cannot have empty segment"))
141
142        _match(TokenType.R_BRACKET, raise_unmatched=True)
143
144        return node
145
146    # We canonicalize the JSON path AST so that it always starts with a
147    # "root" element, so paths like "field" will be generated as "$.field"
148    _match(TokenType.DOLLAR)
149    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]
150
151    while _curr():
152        if _match(TokenType.DOT) or _match(TokenType.COLON):
153            recursive = _prev().text == ".."
154
155            if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
156                value: t.Optional[str | exp.JSONPathWildcard] = _prev().text
157            elif _match(TokenType.STAR):
158                value = exp.JSONPathWildcard()
159            else:
160                value = None
161
162            if recursive:
163                expressions.append(exp.JSONPathRecursive(this=value))
164            elif value:
165                expressions.append(exp.JSONPathKey(this=value))
166            else:
167                raise ParseError(_error("Expected key name or * after DOT"))
168        elif _match(TokenType.L_BRACKET):
169            expressions.append(_parse_bracket())
170        elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
171            expressions.append(exp.JSONPathKey(this=_prev().text))
172        elif _match(TokenType.STAR):
173            expressions.append(exp.JSONPathWildcard())
174        else:
175            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
176
177    return exp.JSONPath(expressions=expressions)

Takes in a JSON path string and parses it into a JSONPath expression.

JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]] = {<class 'sqlglot.expressions.JSONPathFilter'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathKey'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRecursive'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRoot'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathScript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSelector'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSlice'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSubscript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathUnion'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathWildcard'>: <function <lambda>>}