sqlglot.jsonpath
from __future__ import annotations

import typing as t

import sqlglot.expressions as exp
from sqlglot.errors import ParseError
from sqlglot.tokens import Token, Tokenizer, TokenType

if t.TYPE_CHECKING:
    from sqlglot._typing import Lit
    from sqlglot.dialects.dialect import DialectType


class JSONPathTokenizer(Tokenizer):
    """Tokenizer specialized for JSON path strings (e.g. ``$.a[0].*``)."""

    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    KEYWORDS = {
        # The recursive-descent operator ".." tokenizes as a single DOT token;
        # parse() distinguishes it from "." by inspecting the token's text.
        "..": TokenType.DOT,
    }

    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]

    # Token types that can make up a bare (unquoted) key; dialects may extend this.
    VAR_TOKENS = {
        TokenType.VAR,
    }


def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression.

    Args:
        path: the JSON path string, e.g. ``"$.a[0].*"``.
        dialect: optional dialect whose ``jsonpath_tokenizer`` should be used.

    Returns:
        The parsed `exp.JSONPath` expression; it always starts with `exp.JSONPathRoot`.

    Raises:
        ParseError: if the path contains an unexpected or missing token.
    """
    from sqlglot.dialects import Dialect

    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer()
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    # Cursor into `tokens`, shared by all the helper closures below.
    i = 0

    def _curr() -> t.Optional[TokenType]:
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance() -> Token:
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        # Consume and return the current token if it has the given type.
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _match_set(types: t.Collection[TokenType]) -> t.Optional[Token]:
        return _advance() if _curr() in types else None

    def _parse_literal() -> t.Any:
        # Returns a str key, an int index, a wildcard/script/filter node,
        # or False when nothing was consumed (the "empty" sentinel).
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            script = _prev().text == "("
            start = i

            # Scan forward to the closing bracket of the enclosing segment;
            # the raw source text of the expression is kept verbatim.
            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

    def _parse_slice() -> t.Any:
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        # No colon seen: it was a plain literal, not a slice.
        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        literal = _parse_slice()

        # Simplified from `isinstance(literal, str) or literal is not False`:
        # any str already satisfies `is not False`, so the isinstance check
        # was redundant. False is the "nothing parsed" sentinel.
        if literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    def _parse_var_text() -> str:
        """
        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
        in it, e.g JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc)
        or the path string is exhausted.
        """
        # i - 2 is the token before the first var token (the separator), since
        # the caller has already consumed the first var token of the run.
        prev_index = i - 2

        while _match_set(jsonpath_tokenizer.VAR_TOKENS):
            pass

        start = 0 if prev_index < 0 else tokens[prev_index].end + 1

        if i >= len(tokens):
            # This key is the last token for the path, so its text is the remaining path
            text = path[start:]
        else:
            text = path[start : tokens[i].start]

        return text

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            # ".." tokenizes as a single DOT whose text distinguishes recursion.
            recursive = _prev().text == ".."

            if _match_set(jsonpath_tokenizer.VAR_TOKENS):
                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
            elif _match(TokenType.IDENTIFIER):
                value = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match_set(jsonpath_tokenizer.VAR_TOKENS):
            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
        elif _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)


# Maps each JSONPath AST node type to a generator callback producing its SQL text.
JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    # NOTE(review): no closing paren emitted here — presumably the captured
    # script text already contains it; confirm against _parse_literal's slicing.
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self,
    e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]",
    exp.JSONPathWildcard: lambda *_: "*",
}

ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
class JSONPathTokenizer(Tokenizer):
    """Tokenizer for JSON path strings such as ``$.foo[0].*``."""

    # Every structural character of the path grammar maps to its own token.
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # ".." (recursive descent) is folded into a single DOT token; consumers
    # can still tell it apart from "." via the token's text.
    KEYWORDS = {"..": TokenType.DOT}

    # Backslash escapes inside quoted identifiers and strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]

    # Token types that may form a bare (unquoted) key segment.
    VAR_TOKENS = {TokenType.VAR}
SINGLE_TOKENS =
{'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
Inherited Members
- sqlglot.tokens.Tokenizer
- Tokenizer
- BIT_STRINGS
- BYTE_STRINGS
- HEX_STRINGS
- RAW_STRINGS
- HEREDOC_STRINGS
- UNICODE_STRINGS
- IDENTIFIERS
- QUOTES
- VAR_SINGLE_TOKENS
- HEREDOC_TAG_IS_IDENTIFIER
- HEREDOC_STRING_ALTERNATIVE
- STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS
- NESTED_COMMENTS
- HINT_START
- TOKENS_PRECEDING_HINT
- WHITE_SPACE
- COMMANDS
- COMMAND_PREFIX_TOKENS
- NUMERIC_LITERALS
- COMMENTS
- dialect
- use_rs_tokenizer
- reset
- tokenize
- tokenize_rs
- size
- sql
- tokens
def
parse( path: str, dialect: Union[str, sqlglot.dialects.Dialect, Type[sqlglot.dialects.Dialect], NoneType] = None) -> sqlglot.expressions.JSONPath:
def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression.

    Args:
        path: the JSON path string, e.g. ``"$.a[0].*"``.
        dialect: optional dialect whose ``jsonpath_tokenizer`` should be used.

    Returns:
        The parsed `exp.JSONPath` expression; it always starts with `exp.JSONPathRoot`.

    Raises:
        ParseError: if the path contains an unexpected or missing token.
    """
    from sqlglot.dialects import Dialect

    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer()
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    # Cursor into `tokens`, shared by all the helper closures below.
    i = 0

    def _curr() -> t.Optional[TokenType]:
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance() -> Token:
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        # Consume and return the current token if it has the given type.
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _match_set(types: t.Collection[TokenType]) -> t.Optional[Token]:
        return _advance() if _curr() in types else None

    def _parse_literal() -> t.Any:
        # Returns a str key, an int index, a wildcard/script/filter node,
        # or False when nothing was consumed (the "empty" sentinel).
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            script = _prev().text == "("
            start = i

            # Scan forward to the closing bracket of the enclosing segment;
            # the raw source text of the expression is kept verbatim.
            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

    def _parse_slice() -> t.Any:
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        # No colon seen: it was a plain literal, not a slice.
        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        literal = _parse_slice()

        # Simplified from `isinstance(literal, str) or literal is not False`:
        # any str already satisfies `is not False`, so the isinstance check
        # was redundant. False is the "nothing parsed" sentinel.
        if literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    def _parse_var_text() -> str:
        """
        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
        in it, e.g JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
        This is done by merging "consecutive" vars until a key separator is found (dot, colon etc)
        or the path string is exhausted.
        """
        # i - 2 is the token before the first var token (the separator), since
        # the caller has already consumed the first var token of the run.
        prev_index = i - 2

        while _match_set(jsonpath_tokenizer.VAR_TOKENS):
            pass

        start = 0 if prev_index < 0 else tokens[prev_index].end + 1

        if i >= len(tokens):
            # This key is the last token for the path, so its text is the remaining path
            text = path[start:]
        else:
            text = path[start : tokens[i].start]

        return text

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            # ".." tokenizes as a single DOT whose text distinguishes recursion.
            recursive = _prev().text == ".."

            if _match_set(jsonpath_tokenizer.VAR_TOKENS):
                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
            elif _match(TokenType.IDENTIFIER):
                value = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match_set(jsonpath_tokenizer.VAR_TOKENS):
            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
        elif _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)
Takes in a JSON path string and parses it into a JSONPath expression.
JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]] =
{<class 'sqlglot.expressions.JSONPathFilter'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathKey'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRecursive'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRoot'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathScript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSelector'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSlice'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSubscript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathUnion'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathWildcard'>: <function <lambda>>}
ALL_JSON_PATH_PARTS =
{<class 'sqlglot.expressions.JSONPathFilter'>, <class 'sqlglot.expressions.JSONPathUnion'>, <class 'sqlglot.expressions.JSONPathSubscript'>, <class 'sqlglot.expressions.JSONPathSelector'>, <class 'sqlglot.expressions.JSONPathWildcard'>, <class 'sqlglot.expressions.JSONPathSlice'>, <class 'sqlglot.expressions.JSONPathScript'>, <class 'sqlglot.expressions.JSONPathRoot'>, <class 'sqlglot.expressions.JSONPathRecursive'>, <class 'sqlglot.expressions.JSONPathKey'>}