Skip to content

Commit

Permalink
pre-commit check
Browse files Browse the repository at this point in the history
  • Loading branch information
zqWu committed Mar 15, 2023
1 parent a69e843 commit c038f84
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/flink/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _read_session_handle(cls, credentials: FlinkCredentials) -> Optional[SqlGate
)

if (
datetime.now() - session_timestamp
datetime.now() - session_timestamp
).seconds > credentials.session_idle_timeout_s:
logger.info("Stored session has timeout.")
return None
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/flink/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_columns_in_relation(self, relation: BaseRelation) -> List[BaseColumn]:
def is_cancelable(cls) -> bool:
return False # TODO

def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[FlinkRelation]:
def list_relations_without_caching(self, schema_relation: BaseRelation):
database = schema_relation.path.database
if not database:
raise RuntimeError("database(flink catalog) should not be empty")
Expand Down
18 changes: 12 additions & 6 deletions flink/sqlgateway/schema_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class SchemaOperation:
"""

@staticmethod
def use_catalog_database(session: SqlGatewaySession, catalog: str = None, database: str = None) -> bool:
def use_catalog_database(
session: SqlGatewaySession, catalog: str = None, database: str = None
) -> bool:
if catalog:
if database:
sql = f"use {catalog}.{database}"
Expand Down Expand Up @@ -89,7 +91,7 @@ def show_current_databases(session: SqlGatewaySession) -> str:
@staticmethod
def show_databases(session: SqlGatewaySession, catalog: str) -> List[str]:
need_switch_catalog = False
current_catalog = ''
current_catalog = ""

if catalog:
current_catalog = SchemaOperation.show_current_catalogs(session)
Expand All @@ -100,7 +102,9 @@ def show_databases(session: SqlGatewaySession, catalog: str) -> List[str]:
try:
SchemaOperation.use_catalog_database(session, catalog)
except Exception as e:
raise RuntimeError(f"fail list database in {catalog}, please check {catalog} exists in this session")
raise RuntimeError(
f"fail list database in {catalog}, please check {catalog} exists in this session"
)
dbs = SchemaOperation.show_xxx(session, sql_show_databases)

# switch back database if necessary
Expand Down Expand Up @@ -131,12 +135,12 @@ def show_relations(session, catalog: str, database: str) -> Tuple[List[str], Lis
return real_tables, views

@staticmethod
def show_tables(session: SqlGatewaySession, catalog: str = None, database: str = None) -> List[str]:
def show_tables(session: SqlGatewaySession, catalog: str, database: str) -> List[str]:
real_tables, _ = SchemaOperation.show_relations(session, catalog, database)
return real_tables

@staticmethod
def show_views(session: SqlGatewaySession, catalog: str = None, database: str = None) -> List[str]:
def show_views(session: SqlGatewaySession, catalog: str, database: str) -> List[str]:
# NO such sql: show views in|from catalog.database
return SchemaOperation.show_xxx(session, sql_show_views)

Expand All @@ -146,7 +150,9 @@ def show_xxx(session: SqlGatewaySession, sql: str) -> List[str]:
return SchemaOperation.execute_sql_collect_all_result(session, sql, collector)

@staticmethod
def execute_sql_collect_all_result(session: SqlGatewaySession, sql: str, collector: ResultCollector):
def execute_sql_collect_all_result(
session: SqlGatewaySession, sql: str, collector: ResultCollector
):
operation = SchemaOperation.execute_sql_and_wait_end(session, sql)

try:
Expand Down

0 comments on commit c038f84

Please sign in to comment.