sqlglot.jsonpath
"""Tokenizing and parsing of JSON path strings into sqlglot ``JSONPath`` expressions."""

from __future__ import annotations

import typing as t

import sqlglot.expressions as exp
from sqlglot.errors import ParseError
from sqlglot.tokens import Token, Tokenizer, TokenType

if t.TYPE_CHECKING:
    from sqlglot._typing import Lit
    from sqlglot.dialects.dialect import DialectType


class JSONPathTokenizer(Tokenizer):
    """Tokenizer specialized for JSON path syntax such as ``$.foo[0].*``."""

    # Punctuation that forms JSON path syntax; quotes map to QUOTE so that
    # both '...' and "..." delimit string/identifier tokens.
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # The recursive-descent operator ".." is kept as a single DOT token; the
    # parser distinguishes it from "." by inspecting the token's text.
    KEYWORDS = {
        "..": TokenType.DOT,
    }

    # Backslash escapes inside quoted identifiers and strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]


def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression."""
    from sqlglot.dialects import Dialect

    # The tokenizer is dialect-specific so dialects can tweak quoting rules etc.
    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    # Cursor into `tokens`, shared by the nested helper closures below.
    i = 0

    def _curr() -> t.Optional[TokenType]:
        # Token type at the cursor, or None once the input is exhausted.
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        # The most recently consumed token.
        return tokens[i - 1]

    def _advance() -> Token:
        # Consume and return the current token.
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        # Build an error message that points at the current cursor position.
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        # Consume and return the current token when it matches `token_type`;
        # otherwise return None, or raise if `raise_unmatched` is set.
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_literal() -> t.Any:
        # Parse a single bracket operand: a (quoted) key, a wildcard, a filter
        # "?(...)" / script "(...)" expression, or a (possibly negative) index.
        # Returns False as a sentinel when nothing was matched — note that "",
        # 0 and -0 are valid parsed values, so callers must not treat every
        # falsy result as "no match".
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            # "(" starts a script expression, "?" starts a filter expression.
            script = _prev().text == "("
            start = i

            # Scan forward to the closing bracket of the enclosing segment; the
            # raw source text in between becomes the expression's payload.
            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        # Numeric subscript, optionally negated (e.g. "[-1]").
        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

    def _parse_slice() -> t.Any:
        # Parse "start[:end[:step]]"; collapses to the bare literal when no
        # colon follows it.
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        # Parse the contents of a "[...]" segment after the opening bracket has
        # already been consumed, up to and including the closing bracket.
        literal = _parse_slice()

        # The isinstance check keeps "" (a valid, falsy key) distinct from the
        # False sentinel that means "nothing parsed".
        if isinstance(literal, str) or literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                # Multiple comma-separated entries form a union segment.
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            # ".." (recursive descent) was tokenized as a single DOT token, so
            # inspect the consumed token's text to tell the two apart.
            recursive = _prev().text == ".."

            if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
                value: t.Optional[str | exp.JSONPathWildcard] = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
            # Bare keys without a leading "." (e.g. "field" or "a.b" suffixes).
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)


# Per-part SQL generation callbacks: each maps a JSONPath AST node type to a
# function (generator, node) -> str. Dialect generators override entries in
# this mapping to customize how individual parts are rendered.
# NOTE(review): JSONPathScript renders as f"({e.this}" with no closing paren —
# presumably `e.this` already ends with ")" since the parser captures the raw
# source slice up to the closing bracket; confirm before "fixing".
JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = {
    exp.JSONPathFilter: lambda _, e: f"?{e.this}",
    exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e),
    exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}",
    exp.JSONPathRoot: lambda *_: "$",
    exp.JSONPathScript: lambda _, e: f"({e.this}",
    exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]",
    exp.JSONPathSlice: lambda self, e: ":".join(
        "" if p is False else self.json_path_part(p)
        for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")]
        if p is not None
    ),
    exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e),
    exp.JSONPathUnion: lambda self,
    e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]",
    exp.JSONPathWildcard: lambda *_: "*",
}

# The set of all JSONPath part node types, used e.g. to declare which parts a
# dialect supports.
ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
class JSONPathTokenizer(Tokenizer):
    """Tokenizer specialized for JSON path syntax such as ``$.foo[0].*``."""

    # Punctuation that forms JSON path syntax; quotes map to QUOTE so that
    # both '...' and "..." delimit string/identifier tokens.
    SINGLE_TOKENS = {
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # The recursive-descent operator ".." is kept as a single DOT token; the
    # parser distinguishes it from "." by inspecting the token's text.
    KEYWORDS = {
        "..": TokenType.DOT,
    }

    # Backslash escapes inside quoted identifiers and strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]
SINGLE_TOKENS =
{'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
Inherited Members
- sqlglot.tokens.Tokenizer
- Tokenizer
- BIT_STRINGS
- BYTE_STRINGS
- HEX_STRINGS
- RAW_STRINGS
- HEREDOC_STRINGS
- UNICODE_STRINGS
- IDENTIFIERS
- QUOTES
- VAR_SINGLE_TOKENS
- HEREDOC_TAG_IS_IDENTIFIER
- HEREDOC_STRING_ALTERNATIVE
- STRING_ESCAPES_ALLOWED_IN_RAW_STRINGS
- NESTED_COMMENTS
- WHITE_SPACE
- COMMANDS
- COMMAND_PREFIX_TOKENS
- NUMERIC_LITERALS
- COMMENTS
- dialect
- reset
- tokenize
- tokenize_rs
- size
- sql
- tokens
def
parse( path: str, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.expressions.JSONPath:
def parse(path: str, dialect: DialectType = None) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression."""
    from sqlglot.dialects import Dialect

    # The tokenizer is dialect-specific so dialects can tweak quoting rules etc.
    jsonpath_tokenizer = Dialect.get_or_raise(dialect).jsonpath_tokenizer
    tokens = jsonpath_tokenizer.tokenize(path)
    size = len(tokens)

    # Cursor into `tokens`, shared by the nested helper closures below.
    i = 0

    def _curr() -> t.Optional[TokenType]:
        # Token type at the cursor, or None once the input is exhausted.
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        # The most recently consumed token.
        return tokens[i - 1]

    def _advance() -> Token:
        # Consume and return the current token.
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        # Build an error message that points at the current cursor position.
        return f"{msg} at index {i}: {path}"

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        # Consume and return the current token when it matches `token_type`;
        # otherwise return None, or raise if `raise_unmatched` is set.
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_literal() -> t.Any:
        # Parse a single bracket operand: a (quoted) key, a wildcard, a filter
        # "?(...)" / script "(...)" expression, or a (possibly negative) index.
        # Returns False as a sentinel when nothing was matched — note that "",
        # 0 and -0 are valid parsed values, so callers must not treat every
        # falsy result as "no match".
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            # "(" starts a script expression, "?" starts a filter expression.
            script = _prev().text == "("
            start = i

            # Scan forward to the closing bracket of the enclosing segment; the
            # raw source text in between becomes the expression's payload.
            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        # Numeric subscript, optionally negated (e.g. "[-1]").
        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            return int(number)

        return False

    def _parse_slice() -> t.Any:
        # Parse "start[:end[:step]]"; collapses to the bare literal when no
        # colon follows it.
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        # Parse the contents of a "[...]" segment after the opening bracket has
        # already been consumed, up to and including the closing bracket.
        literal = _parse_slice()

        # The isinstance check keeps "" (a valid, falsy key) distinct from the
        # False sentinel that means "nothing parsed".
        if isinstance(literal, str) or literal is not False:
            indexes = [literal]
            while _match(TokenType.COMMA):
                literal = _parse_slice()

                if literal:
                    indexes.append(literal)

            if len(indexes) == 1:
                if isinstance(literal, str):
                    node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0])
                elif isinstance(literal, exp.JSONPathPart) and isinstance(
                    literal, (exp.JSONPathScript, exp.JSONPathFilter)
                ):
                    node = exp.JSONPathSelector(this=indexes[0])
                else:
                    node = exp.JSONPathSubscript(this=indexes[0])
            else:
                # Multiple comma-separated entries form a union segment.
                node = exp.JSONPathUnion(expressions=indexes)
        else:
            raise ParseError(_error("Cannot have empty segment"))

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            # ".." (recursive descent) was tokenized as a single DOT token, so
            # inspect the consumed token's text to tell the two apart.
            recursive = _prev().text == ".."

            if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
                value: t.Optional[str | exp.JSONPathWildcard] = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
            # Bare keys without a leading "." (e.g. "field" or "a.b" suffixes).
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)
Takes in a JSON path string and parses it into a JSONPath expression.
JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]] =
{<class 'sqlglot.expressions.JSONPathFilter'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathKey'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRecursive'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRoot'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathScript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSelector'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSlice'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSubscript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathUnion'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathWildcard'>: <function <lambda>>}
ALL_JSON_PATH_PARTS =
{<class 'sqlglot.expressions.JSONPathKey'>, <class 'sqlglot.expressions.JSONPathWildcard'>, <class 'sqlglot.expressions.JSONPathFilter'>, <class 'sqlglot.expressions.JSONPathUnion'>, <class 'sqlglot.expressions.JSONPathSubscript'>, <class 'sqlglot.expressions.JSONPathSelector'>, <class 'sqlglot.expressions.JSONPathSlice'>, <class 'sqlglot.expressions.JSONPathScript'>, <class 'sqlglot.expressions.JSONPathRoot'>, <class 'sqlglot.expressions.JSONPathRecursive'>}