Edit on GitHub

sqlglot.parser_core

  1from __future__ import annotations
  2
  3import typing as t
  4
  5from sqlglot.errors import ErrorLevel, ParseError, highlight_sql
  6from sqlglot.tokenizer_core import Token, TokenType
  7
  8
  9class ParserCore:
 10    __slots__ = (
 11        "error_level",
 12        "error_message_context",
 13        "max_errors",
 14        "dialect",
 15        "sql",
 16        "errors",
 17        "_tokens",
 18        "_index",
 19        "_curr",
 20        "_next",
 21        "_prev",
 22        "_prev_comments",
 23        "_pipe_cte_counter",
 24        "_chunks",
 25        "_chunk_index",
 26    )
 27
 28    def __init__(
 29        self,
 30        error_level: ErrorLevel,
 31        error_message_context: int,
 32        max_errors: int,
 33        dialect: t.Any,
 34    ) -> None:
 35        self.error_level: ErrorLevel = error_level
 36        self.error_message_context = error_message_context
 37        self.max_errors = max_errors
 38        self.dialect: t.Any = dialect
 39        self.reset()
 40
 41    def reset(self) -> None:
 42        self.sql: str = ""
 43        self.errors: t.List[ParseError] = []
 44        self._tokens: t.List[Token] = []
 45        self._index: int = 0
 46        self._curr: t.Optional[Token] = None
 47        self._next: t.Optional[Token] = None
 48        self._prev: t.Optional[Token] = None
 49        self._prev_comments: t.Optional[t.List[str]] = None
 50        self._pipe_cte_counter: int = 0
 51        self._chunks: t.List[t.List[Token]] = []
 52        self._chunk_index: int = 0
 53
 54    def _advance(self, times: int = 1) -> None:
 55        index = self._index + times
 56        self._index = index
 57        tokens = self._tokens
 58        size = len(tokens)
 59        self._curr = tokens[index] if index < size else None
 60        self._next = tokens[index + 1] if index + 1 < size else None
 61
 62        if index > 0:
 63            prev = tokens[index - 1]
 64            self._prev = prev
 65            self._prev_comments = prev.comments
 66        else:
 67            self._prev = None
 68            self._prev_comments = None
 69
 70    def _advance_chunk(self) -> None:
 71        self._index = -1
 72        self._tokens = self._chunks[self._chunk_index]
 73        self._chunk_index += 1
 74        self._advance()
 75
 76    def _retreat(self, index: int) -> None:
 77        if index != self._index:
 78            self._advance(index - self._index)
 79
 80    def _add_comments(self, expression: t.Any) -> None:
 81        if expression and self._prev_comments:
 82            expression.add_comments(self._prev_comments)
 83            self._prev_comments = None
 84
 85    def _match(self, token_type: TokenType, advance: bool = True, expression: t.Any = None) -> bool:
 86        curr = self._curr
 87        if curr and curr.token_type == token_type:
 88            if advance:
 89                self._advance()
 90            self._add_comments(expression)
 91            return True
 92        return False
 93
 94    def _match_set(self, types: t.Any, advance: bool = True) -> bool:
 95        curr = self._curr
 96        if curr and curr.token_type in types:
 97            if advance:
 98                self._advance()
 99            return True
100        return False
101
102    def _match_pair(
103        self, token_type_a: TokenType, token_type_b: TokenType, advance: bool = True
104    ) -> bool:
105        curr = self._curr
106        next_ = self._next
107        if curr and next_ and curr.token_type == token_type_a and next_.token_type == token_type_b:
108            if advance:
109                self._advance(2)
110            return True
111        return False
112
113    def _match_texts(self, texts: t.Any, advance: bool = True) -> bool:
114        curr = self._curr
115        if curr and curr.token_type != TokenType.STRING and curr.text.upper() in texts:
116            if advance:
117                self._advance()
118            return True
119        return False
120
121    def _match_text_seq(self, *texts: str, advance: bool = True) -> bool:
122        index = self._index
123        string_type = TokenType.STRING
124        for text in texts:
125            curr = self._curr
126            if curr and curr.token_type != string_type and curr.text.upper() == text:
127                self._advance()
128            else:
129                self._retreat(index)
130                return False
131
132        if not advance:
133            self._retreat(index)
134
135        return True
136
137    def _is_connected(self) -> bool:
138        prev = self._prev
139        curr = self._curr
140        return bool(prev and curr and prev.end + 1 == curr.start)
141
142    def _find_sql(self, start: Token, end: Token) -> str:
143        return self.sql[start.start : end.end + 1]
144
145    def raise_error(self, message: str, token: t.Optional[Token] = None) -> None:
146        token = token or self._curr or self._prev or Token.string("")
147        formatted_sql, start_context, highlight, end_context = highlight_sql(
148            sql=self.sql,
149            positions=[(token.start, token.end)],
150            context_length=self.error_message_context,
151        )
152        formatted_message = f"{message}. Line {token.line}, Col: {token.col}.\n  {formatted_sql}"
153
154        error = ParseError.new(
155            formatted_message,
156            description=message,
157            line=token.line,
158            col=token.col,
159            start_context=start_context,
160            highlight=highlight,
161            end_context=end_context,
162        )
163
164        if self.error_level == ErrorLevel.IMMEDIATE:
165            raise error
166
167        self.errors.append(error)
168
169    def validate_expression(self, expression: t.Any, args: t.Optional[t.List] = None) -> t.Any:
170        if self.error_level != ErrorLevel.IGNORE:
171            for error_message in expression.error_messages(args):
172                self.raise_error(error_message)
173        return expression
174
175    def _try_parse(self, parse_method: t.Callable, retreat: bool = False) -> t.Optional[t.Any]:
176        index = self._index
177        error_level = self.error_level
178        this: t.Optional[t.Any] = None
179
180        self.error_level = ErrorLevel.IMMEDIATE
181        try:
182            this = parse_method()
183        except ParseError:
184            this = None
185        finally:
186            if not this or retreat:
187                self._retreat(index)
188            self.error_level = error_level
189
190        return this
class ParserCore:
 10class ParserCore:
 11    __slots__ = (
 12        "error_level",
 13        "error_message_context",
 14        "max_errors",
 15        "dialect",
 16        "sql",
 17        "errors",
 18        "_tokens",
 19        "_index",
 20        "_curr",
 21        "_next",
 22        "_prev",
 23        "_prev_comments",
 24        "_pipe_cte_counter",
 25        "_chunks",
 26        "_chunk_index",
 27    )
 28
 29    def __init__(
 30        self,
 31        error_level: ErrorLevel,
 32        error_message_context: int,
 33        max_errors: int,
 34        dialect: t.Any,
 35    ) -> None:
 36        self.error_level: ErrorLevel = error_level
 37        self.error_message_context = error_message_context
 38        self.max_errors = max_errors
 39        self.dialect: t.Any = dialect
 40        self.reset()
 41
 42    def reset(self) -> None:
 43        self.sql: str = ""
 44        self.errors: t.List[ParseError] = []
 45        self._tokens: t.List[Token] = []
 46        self._index: int = 0
 47        self._curr: t.Optional[Token] = None
 48        self._next: t.Optional[Token] = None
 49        self._prev: t.Optional[Token] = None
 50        self._prev_comments: t.Optional[t.List[str]] = None
 51        self._pipe_cte_counter: int = 0
 52        self._chunks: t.List[t.List[Token]] = []
 53        self._chunk_index: int = 0
 54
 55    def _advance(self, times: int = 1) -> None:
 56        index = self._index + times
 57        self._index = index
 58        tokens = self._tokens
 59        size = len(tokens)
 60        self._curr = tokens[index] if index < size else None
 61        self._next = tokens[index + 1] if index + 1 < size else None
 62
 63        if index > 0:
 64            prev = tokens[index - 1]
 65            self._prev = prev
 66            self._prev_comments = prev.comments
 67        else:
 68            self._prev = None
 69            self._prev_comments = None
 70
 71    def _advance_chunk(self) -> None:
 72        self._index = -1
 73        self._tokens = self._chunks[self._chunk_index]
 74        self._chunk_index += 1
 75        self._advance()
 76
 77    def _retreat(self, index: int) -> None:
 78        if index != self._index:
 79            self._advance(index - self._index)
 80
 81    def _add_comments(self, expression: t.Any) -> None:
 82        if expression and self._prev_comments:
 83            expression.add_comments(self._prev_comments)
 84            self._prev_comments = None
 85
 86    def _match(self, token_type: TokenType, advance: bool = True, expression: t.Any = None) -> bool:
 87        curr = self._curr
 88        if curr and curr.token_type == token_type:
 89            if advance:
 90                self._advance()
 91            self._add_comments(expression)
 92            return True
 93        return False
 94
 95    def _match_set(self, types: t.Any, advance: bool = True) -> bool:
 96        curr = self._curr
 97        if curr and curr.token_type in types:
 98            if advance:
 99                self._advance()
100            return True
101        return False
102
103    def _match_pair(
104        self, token_type_a: TokenType, token_type_b: TokenType, advance: bool = True
105    ) -> bool:
106        curr = self._curr
107        next_ = self._next
108        if curr and next_ and curr.token_type == token_type_a and next_.token_type == token_type_b:
109            if advance:
110                self._advance(2)
111            return True
112        return False
113
114    def _match_texts(self, texts: t.Any, advance: bool = True) -> bool:
115        curr = self._curr
116        if curr and curr.token_type != TokenType.STRING and curr.text.upper() in texts:
117            if advance:
118                self._advance()
119            return True
120        return False
121
122    def _match_text_seq(self, *texts: str, advance: bool = True) -> bool:
123        index = self._index
124        string_type = TokenType.STRING
125        for text in texts:
126            curr = self._curr
127            if curr and curr.token_type != string_type and curr.text.upper() == text:
128                self._advance()
129            else:
130                self._retreat(index)
131                return False
132
133        if not advance:
134            self._retreat(index)
135
136        return True
137
138    def _is_connected(self) -> bool:
139        prev = self._prev
140        curr = self._curr
141        return bool(prev and curr and prev.end + 1 == curr.start)
142
143    def _find_sql(self, start: Token, end: Token) -> str:
144        return self.sql[start.start : end.end + 1]
145
146    def raise_error(self, message: str, token: t.Optional[Token] = None) -> None:
147        token = token or self._curr or self._prev or Token.string("")
148        formatted_sql, start_context, highlight, end_context = highlight_sql(
149            sql=self.sql,
150            positions=[(token.start, token.end)],
151            context_length=self.error_message_context,
152        )
153        formatted_message = f"{message}. Line {token.line}, Col: {token.col}.\n  {formatted_sql}"
154
155        error = ParseError.new(
156            formatted_message,
157            description=message,
158            line=token.line,
159            col=token.col,
160            start_context=start_context,
161            highlight=highlight,
162            end_context=end_context,
163        )
164
165        if self.error_level == ErrorLevel.IMMEDIATE:
166            raise error
167
168        self.errors.append(error)
169
170    def validate_expression(self, expression: t.Any, args: t.Optional[t.List] = None) -> t.Any:
171        if self.error_level != ErrorLevel.IGNORE:
172            for error_message in expression.error_messages(args):
173                self.raise_error(error_message)
174        return expression
175
176    def _try_parse(self, parse_method: t.Callable, retreat: bool = False) -> t.Optional[t.Any]:
177        index = self._index
178        error_level = self.error_level
179        this: t.Optional[t.Any] = None
180
181        self.error_level = ErrorLevel.IMMEDIATE
182        try:
183            this = parse_method()
184        except ParseError:
185            this = None
186        finally:
187            if not this or retreat:
188                self._retreat(index)
189            self.error_level = error_level
190
191        return this
ParserCore( error_level: sqlglot.errors.ErrorLevel, error_message_context: int, max_errors: int, dialect: Any)
29    def __init__(
30        self,
31        error_level: ErrorLevel,
32        error_message_context: int,
33        max_errors: int,
34        dialect: t.Any,
35    ) -> None:
36        self.error_level: ErrorLevel = error_level
37        self.error_message_context = error_message_context
38        self.max_errors = max_errors
39        self.dialect: t.Any = dialect
40        self.reset()
error_message_context
max_errors
dialect: Any
def reset(self) -> None:
42    def reset(self) -> None:
43        self.sql: str = ""
44        self.errors: t.List[ParseError] = []
45        self._tokens: t.List[Token] = []
46        self._index: int = 0
47        self._curr: t.Optional[Token] = None
48        self._next: t.Optional[Token] = None
49        self._prev: t.Optional[Token] = None
50        self._prev_comments: t.Optional[t.List[str]] = None
51        self._pipe_cte_counter: int = 0
52        self._chunks: t.List[t.List[Token]] = []
53        self._chunk_index: int = 0
def raise_error( self, message: str, token: Optional[sqlglot.tokenizer_core.Token] = None) -> None:
146    def raise_error(self, message: str, token: t.Optional[Token] = None) -> None:
147        token = token or self._curr or self._prev or Token.string("")
148        formatted_sql, start_context, highlight, end_context = highlight_sql(
149            sql=self.sql,
150            positions=[(token.start, token.end)],
151            context_length=self.error_message_context,
152        )
153        formatted_message = f"{message}. Line {token.line}, Col: {token.col}.\n  {formatted_sql}"
154
155        error = ParseError.new(
156            formatted_message,
157            description=message,
158            line=token.line,
159            col=token.col,
160            start_context=start_context,
161            highlight=highlight,
162            end_context=end_context,
163        )
164
165        if self.error_level == ErrorLevel.IMMEDIATE:
166            raise error
167
168        self.errors.append(error)
def validate_expression(self, expression: Any, args: Optional[List] = None) -> Any:
170    def validate_expression(self, expression: t.Any, args: t.Optional[t.List] = None) -> t.Any:
171        if self.error_level != ErrorLevel.IGNORE:
172            for error_message in expression.error_messages(args):
173                self.raise_error(error_message)
174        return expression
errors
sql