sqlglot.jsonpath
from __future__ import annotations

import typing as t

import sqlglot.expressions as exp
from sqlglot.errors import ParseError
from sqlglot.tokens import Token, Tokenizer, TokenType

if t.TYPE_CHECKING:
    from sqlglot._typing import Lit
    from sqlglot.dialects.dialect import DialectType


class JSONPathTokenizer(Tokenizer):
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    KEYWORDS = {
        "..": TokenType.DOT,
    }

    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]


def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression."""
    from sqlglot.dialects import Dialect

    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    i = 0

    def _curr() -> t.Optional[TokenType]:
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        return tokens[i - 1]

    def _advance() -> Token:
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_literal() -> t.Any:
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            script = _prev().text == "("
            start = i

            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

    def _parse_slice() -> t.Any:
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        literal = _parse_slice()

        if isinstance(literal, str) or literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    def _parse_var_text() -> str:
        """
        Consumes & returns the text for a var. In BigQuery it's valid to have a key with spaces
        in it, e.g. JSON_QUERY(..., '$. a b c ') should produce a single JSONPathKey(' a b c ').
        This is done by merging "consecutive" vars until a key separator is found (dot, colon, etc.)
        or the path string is exhausted.
        """
        prev_index = i - 2

        while _match(TokenType.VAR):
            pass

        start = 0 if prev_index < 0 else tokens[prev_index].end + 1

        if i >= len(tokens):
            # This key is the last token for the path, so its text is the remaining path
            text = path[start:]
        else:
            text = path[start : tokens[i].start]

        return text

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            recursive = _prev().text == ".."

            if _match(TokenType.VAR):
                value: t.Optional[str | exp.JSONPathWildcard] = _parse_var_text()
            elif _match(TokenType.IDENTIFIER):
                value = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match(TokenType.VAR):
            expressions.append(exp.JSONPathKey(this=_parse_var_text()))
        elif _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)


JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self,
    e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]",
    exp.JSONPathWildcard: lambda *_: "*",
}

ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
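As the _parse_var_text docstring above notes, dialects such as BigQuery accept unquoted keys containing spaces, so parsing is dialect-aware. A minimal sketch of that behavior (the printed representations are illustrative, not verified output):

from sqlglot import jsonpath

# Default dialect: an ordinary dotted path.
print(jsonpath.parse("$.store.book"))

# BigQuery merges consecutive VAR tokens, so '$. a b c ' becomes a single
# JSONPathKey(' a b c '), per the _parse_var_text docstring above.
print(jsonpath.parse("$. a b c ", dialect="bigquery"))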
class JSONPathTokenizer(sqlglot.tokens.Tokenizer)
Inherited Members
- sqlglot.tokens.Tokenizer: Tokenizer (constructor), BIT_STRINGS, BYTE_STRINGS, HEX_STRINGS, RAW_STRINGS, HEREDOC_STRINGS, UNICODE_STRINGS, IDENTIFIERS, QUOTES, VAR_SINGLE_TOKENS, HEREDOC_TAG_IS_IDENTIFIER, HEREDOC_STRING_ALTERNATIVE, STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS, NESTED_COMMENTS, HINT_START, TOKENS_PRECEDING_HINT, WHITE_SPACE, COMMANDS, COMMAND_PREFIX_TOKENS, NUMERIC_LITERALS, COMMENTS, dialect, reset, tokenize, tokenize_rs, size, sql, tokens
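Because tokenize is inherited from sqlglot.tokens.Tokenizer, the tokenizer can be exercised on its own. A quick sketch using the standard Token fields token_type and text:

from sqlglot.jsonpath import JSONPathTokenizer

# Show how SINGLE_TOKENS maps each character of a small path.
for token in JSONPathTokenizer().tokenize("$.store[0]"):
    print(token.token_type, repr(token.text))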
def parse(path: str, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.JSONPath
Takes in a JSON path string and parses it into a JSONPath expression.
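A minimal usage sketch. As the source above shows, the AST is canonicalized to begin with a JSONPathRoot even when the input path has no leading $:

from sqlglot import jsonpath

path = jsonpath.parse("store.book[0].title")
# Expected parts, per the parsing logic above:
# JSONPathRoot, JSONPathKey('store'), JSONPathKey('book'),
# JSONPathSubscript(0), JSONPathKey('title')
print([type(part).__name__ for part in path.expressions])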
JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]]

Maps each JSONPath part type (JSONPathFilter, JSONPathKey, JSONPathRecursive, JSONPathRoot, JSONPathScript, JSONPathSelector, JSONPathSlice, JSONPathSubscript, JSONPathUnion, JSONPathWildcard) to the function that renders it back to path text; the individual transforms are defined in the module source above.
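Each transform receives a generator and the expression; the simpler ones ignore the generator, so they can be sanity-checked in isolation. Passing None for the generator below is purely for illustration:

import sqlglot.expressions as exp
from sqlglot.jsonpath import JSON_PATH_PART_TRANSFORMS

# lambda *_: "$" ignores both arguments.
print(JSON_PATH_PART_TRANSFORMS[exp.JSONPathRoot](None, exp.JSONPathRoot()))  # $

# lambda _, e: f"?{e.this}" only reads the expression.
filter_part = exp.JSONPathFilter(this="@.price < 10")
print(JSON_PATH_PART_TRANSFORMS[exp.JSONPathFilter](None, filter_part))  # ?@.price < 10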
ALL_JSON_PATH_PARTS =
{JSONPathFilter, JSONPathKey, JSONPathRecursive, JSONPathRoot, JSONPathScript, JSONPathSelector, JSONPathSlice, JSONPathSubscript, JSONPathUnion, JSONPathWildcard}
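ALL_JSON_PATH_PARTS is a convenient baseline for dialects that support only a subset of path syntax; for example, a generator could (as a sketch, not sqlglot's prescribed pattern) define its supported set by subtraction:

import sqlglot.expressions as exp
from sqlglot.jsonpath import ALL_JSON_PATH_PARTS

# Hypothetical: a dialect that cannot generate filters or scripts.
supported = ALL_JSON_PATH_PARTS - {exp.JSONPathFilter, exp.JSONPathScript}
print(exp.JSONPathSubscript in supported)  # True
print(exp.JSONPathFilter in supported)     # False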