diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index f5c0f03a2..0c6797cf3 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -1,6 +1,18 @@ import abc import time -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Callable, Type, Union +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + TYPE_CHECKING, + Callable, + Type, + Union, +) from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event @@ -14,12 +26,12 @@ Connection, ConnectionState, ) -from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import ( ConnectionUsed, SQLCommit, SQLQuery, - SQLQueryStatus, AdapterEventDebug, + SQLQueryStatus, + AdapterEventDebug, ) if TYPE_CHECKING: @@ -58,14 +70,14 @@ def cancel_open(self) -> List[str]: return names def add_query( - self, - sql: str, - auto_begin: bool = True, - bindings: Optional[Any] = None, - abridge_sql_log: bool = False, - retryable_exceptions: Iterable[Type[Exception]] = [], - retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, + self, + sql: str, + auto_begin: bool = True, + bindings: Optional[Any] = None, + abridge_sql_log: bool = False, + retryable_exceptions: Iterable[Type[Exception]] = [], + retry_limit: int = 1, + retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, ) -> Tuple[Connection, Any]: connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: @@ -96,12 +108,12 @@ def add_query( cursor = connection.handle.cursor() self._retryable_cursor_execute( - execute_fn=cursor.execute, - sql=sql, - bindings=bindings, - retryable_exceptions=retryable_exceptions, - retry_limit=retry_limit, - retry_timeout=retry_timeout + execute_fn=cursor.execute, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + retry_timeout=retry_timeout, ) result = self.get_response(cursor) @@ -125,7 +137,7 @@ def get_response(cls, cursor: Any) -> AdapterResponse: @classmethod def process_results( - cls, column_names: Iterable[str], rows: Iterable[Any] + cls, column_names: Iterable[str], rows: Iterable[Any] ) -> Iterator[Dict[str, Any]]: unique_col_names = dict() # type: ignore[var-annotated] for idx in range(len(column_names)): # type: ignore[arg-type] @@ -157,11 +169,11 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab return table_from_data_flat(data, column_names) def execute( - self, - sql: str, - auto_begin: bool = False, - fetch: bool = False, - limit: Optional[int] = None, + self, + sql: str, + auto_begin: bool = False, + fetch: bool = False, + limit: Optional[int] = None, ) -> Tuple[AdapterResponse, "agate.Table"]: from dbt_common.clients.agate_helper import empty_table @@ -212,20 +224,19 @@ def commit(self): return connection - def _retryable_cursor_execute(self, - execute_fn: Callable, - sql: str, - bindings: Optional[Any] = None, - retryable_exceptions: Iterable[Type[Exception]] = [], - retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, - _attempts: int = 0, - ) -> None: + def _retryable_cursor_execute( + self, + execute_fn: Callable, + sql: str, + bindings: Optional[Any] = None, + retryable_exceptions: Iterable[Type[Exception]] = [], + retry_limit: int = 1, + retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, + _attempts: int = 0, + ) -> None: timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout if timeout < 0: - raise DbtRuntimeError( - "retry_timeout cannot be negative or return a negative time." - ) + raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.") try: execute_fn(sql, bindings) @@ -237,8 +248,8 @@ def _retryable_cursor_execute(self, fire_event( AdapterEventDebug( message=f"Got a retryable error {type(e)} when attempting to execute a query.\n" - f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" - f"Error:\n{e}" + f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" + f"Error:\n{e}" ) )