sqlglot.jsonpath
from __future__ import annotations

import typing as t

import sqlglot.expressions as exp
from sqlglot.errors import ParseError
from sqlglot.tokens import Token, Tokenizer, TokenType

if t.TYPE_CHECKING:
    from sqlglot.dialects.dialect import DialectType
    from collections.abc import Collection


class JSONPathTokenizer(Tokenizer):
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    KEYWORDS = {
        "..": TokenType.DOT,
    }

    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]
    NUMBERS_CAN_HAVE_DECIMALS = False

    VAR_TOKENS = {
        TokenType.VAR,
    }


def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression."""
    from sqlglot.dialects import Dialect

    dialect_inst = Dialect.get_or_raise(dialect)
    jsonpath_tokenizer = dialect_inst.jsonpath_tokenizer()
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    i = 0

    def _curr() -> TokenType | None:
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance() -> Token:
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: t.Literal[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: t.Literal[False] = False) -> Token | None:
        pass

    def _match(token_type, raise_unmatched=False):
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _match_set(types: Collection[TokenType]) -> Token | None:
        return _advance() if _curr() in types else None

    def _parse_literal() -> t.Any:
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            script = _prev().text == "("
            start = i

            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        # `False` (rather than None) signals "no literal matched", since 0 and "" are valid literals
        return False

    def _parse_slice() -> t.Any:
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        literal = _parse_slice()

        if isinstance(literal, str) or literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    def _parse_var_text() -> str:
        """
        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
        in it, e.g. JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
        This is done by merging "consecutive" vars until a key separator is found (dot, colon, etc.)
        or the path string is exhausted.
        """
        prev_index = i - 2

        while _match_set(jsonpath_tokenizer.VAR_TOKENS):
            pass

        start = 0 if prev_index < 0 else tokens[prev_index].end + 1

        if i >= len(tokens):
            # This key is the last token of the path, so its text is the remaining path
            text = path[start:]
        else:
            text = path[start : tokens[i].start]

        return text

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: list[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            recursive = _prev().text == ".."

            if _match_set(jsonpath_tokenizer.VAR_TOKENS):
                value: str | exp.JSONPathWildcard | None = _parse_var_text()
            elif _match(TokenType.IDENTIFIER):
                value = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            elif not dialect_inst.JSON_PATH_SINGLE_DOT_IS_WILDCARD:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match_set(jsonpath_tokenizer.VAR_TOKENS):
            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
        elif _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)


JSON_PATH_PART_TRANSFORMS: dict[type[exp.Expr], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    # the parsed script text keeps its closing ")", so only the opening paren is re-added
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self, e: (
        f"[{','.join(self.json_path_part(p) for p in e.expressions)}]"
    ),
    exp.JSONPathWildcard: lambda *_: "*",
}

ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
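The var-merging behavior documented in _parse_var_text is easiest to see end to end. Below is a minimal sketch; the expected key text comes straight from the docstring's JSON_QUERY example, and the repr formatting shown in the comment is indicative rather than exact:

from sqlglot import jsonpath

# BigQuery allows keys containing spaces, so consecutive VAR tokens are merged
# into a single JSONPathKey, per the _parse_var_text docstring above.
path = jsonpath.parse("$. a b c ", dialect="bigquery")
key = path.expressions[1]  # expressions[0] is the canonical JSONPathRoot
print(type(key).__name__, repr(key.this))  # expected: JSONPathKey ' a b c '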
class JSONPathTokenizer(sqlglot.tokens.Tokenizer)
SINGLE_TOKENS =
{'(': TokenType.L_PAREN, ')': TokenType.R_PAREN, '[': TokenType.L_BRACKET, ']': TokenType.R_BRACKET, ':': TokenType.COLON, ',': TokenType.COMMA, '-': TokenType.DASH, '.': TokenType.DOT, '?': TokenType.PLACEHOLDER, '@': TokenType.PARAMETER, "'": TokenType.QUOTE, '"': TokenType.QUOTE, '$': TokenType.DOLLAR, '*': TokenType.STAR}
Inherited Members
- sqlglot.tokens.Tokenizer
- Tokenizer
- BIT_STRINGS
- BYTE_STRINGS
- HEX_STRINGS
- RAW_STRINGS
- HEREDOC_STRINGS
- UNICODE_STRINGS
- IDENTIFIERS
- QUOTES
- VAR_SINGLE_TOKENS
- ESCAPE_FOLLOW_CHARS
- HEREDOC_TAG_IS_IDENTIFIER
- HEREDOC_STRING_ALTERNATIVE
- STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS
- NESTED_COMMENTS
- HINT_START
- TOKENS_PRECEDING_HINT
- COMMANDS
- COMMAND_PREFIX_TOKENS
- NUMERIC_LITERALS
- COMMENTS
- dialect
- tokenize
- sql
- size
- tokens
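To see the tokenizer on its own, the sketch below runs a path through the inherited tokenize method. This is a minimal sketch: the exact token stream depends on the base Tokenizer internals, so the output shown in the trailing comment is approximate.

from sqlglot.jsonpath import JSONPathTokenizer

# Tokenize a path using the SINGLE_TOKENS table above; 'store' comes out as a VAR.
tokens = JSONPathTokenizer().tokenize("$.store[0]")
print([(token.token_type.name, token.text) for token in tokens])
# roughly: DOLLAR '$', DOT '.', VAR 'store', L_BRACKET '[', NUMBER '0', R_BRACKET ']'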
def parse(path: str, dialect: Union[str, sqlglot.dialects.Dialect, type[sqlglot.dialects.Dialect], NoneType] = None) -> sqlglot.expressions.query.JSONPath:
Takes in a JSON path string and parses it into a JSONPath expression.
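A short usage sketch. Note how a bare field name is canonicalized under a root element, as the comment in the source explains; the AST shape listed in the comment is indicative:

import sqlglot.jsonpath as jsonpath

node = jsonpath.parse("dimensions[0].width")
# The AST always starts with JSONPathRoot, followed by key/subscript parts, roughly:
# JSONPathRoot(), JSONPathKey(this=dimensions), JSONPathSubscript(this=0), JSONPathKey(this=width)
for part in node.expressions:
    print(type(part).__name__)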
JSON_PATH_PART_TRANSFORMS: dict[type[sqlglot.expressions.core.Expr], typing.Callable[..., str]] =
{JSONPathFilter: <lambda>, JSONPathKey: <lambda>, JSONPathRecursive: <lambda>, JSONPathRoot: <lambda>, JSONPathScript: <lambda>, JSONPathSelector: <lambda>, JSONPathSlice: <lambda>, JSONPathSubscript: <lambda>, JSONPathUnion: <lambda>, JSONPathWildcard: <lambda>}
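Most of these lambdas expect a sqlglot Generator as their first argument (e.g. the JSONPathKey entry calls self._jsonpathkey_sql), so they are normally invoked through the generator. The root and wildcard entries ignore their arguments, though, so a tiny sketch can call them directly:

import sqlglot.expressions as exp
from sqlglot.jsonpath import JSON_PATH_PART_TRANSFORMS

# These two entries are `lambda *_`, so any stand-in works in place of a generator.
root_sql = JSON_PATH_PART_TRANSFORMS[exp.JSONPathRoot](None, exp.JSONPathRoot())
star_sql = JSON_PATH_PART_TRANSFORMS[exp.JSONPathWildcard](None, exp.JSONPathWildcard())
print(root_sql, star_sql)  # $ *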
ALL_JSON_PATH_PARTS =
{JSONPathFilter, JSONPathKey, JSONPathRecursive, JSONPathRoot, JSONPathScript, JSONPathSelector, JSONPathSlice, JSONPathSubscript, JSONPathUnion, JSONPathWildcard}
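ALL_JSON_PATH_PARTS is simply the key set of the transforms table above. In sqlglot, generators advertise the subset of parts they can render through a SUPPORTED_JSON_PATH_PARTS attribute; the sketch below assumes that attribute and the generator_class accessor exist in your sqlglot version:

from sqlglot.dialects import Dialect
from sqlglot.jsonpath import ALL_JSON_PATH_PARTS

# Sketch: which JSON path part types a dialect's generator does not support.
generator = Dialect.get_or_raise("mysql").generator_class
unsupported = ALL_JSON_PATH_PARTS - generator.SUPPORTED_JSON_PATH_PARTS
print(sorted(part.__name__ for part in unsupported))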