From 2a822bd6214694d5e6bb1fc22498f4e823bc3ecd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?=
Date: Mon, 23 Sep 2024 12:35:07 -0600
Subject: [PATCH] chore: Enable `PL` Ruff rules

---
 pyproject.toml               |  1 +
 target_postgres/connector.py | 36 +++++++++++++++---------------------
 target_postgres/sinks.py     | 27 +++++++++++++--------------
 3 files changed, 29 insertions(+), 35 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 986d791e..2f11781c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -97,6 +97,7 @@ select = [
     "RET",  # flake8-return
     "SIM",  # flake8-simplify
     "TCH",  # flake8-type-checking
+    "PL",   # Pylint
     "PERF", # Perflint
     "RUF",  # ruff
 ]
diff --git a/target_postgres/connector.py b/target_postgres/connector.py
index d1105398..6ba09b9e 100644
--- a/target_postgres/connector.py
+++ b/target_postgres/connector.py
@@ -4,7 +4,9 @@
 
 import atexit
 import io
+import itertools
 import signal
+import sys
 import typing as t
 from contextlib import contextmanager
 from functools import cached_property
@@ -95,7 +97,7 @@ def interpret_content_encoding(self) -> bool:
         """
         return self.config.get("interpret_content_encoding", False)
 
-    def prepare_table(  # type: ignore[override]
+    def prepare_table(  # type: ignore[override]  # noqa: PLR0913
         self,
         full_table_name: str | FullyQualifiedName,
         schema: dict,
@@ -121,7 +123,7 @@ def prepare_table(  # type: ignore[override]
         meta = sa.MetaData(schema=schema_name)
         table: sa.Table
         if not self.table_exists(full_table_name=full_table_name):
-            table = self.create_empty_table(
+            return self.create_empty_table(
                 table_name=table_name,
                 meta=meta,
                 schema=schema,
@@ -130,7 +132,6 @@ def prepare_table(  # type: ignore[override]
                 as_temp_table=as_temp_table,
                 connection=connection,
             )
-            return table
         meta.reflect(connection, only=[table_name])
         table = meta.tables[
             full_table_name
@@ -269,7 +270,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:  # type: ig
 
         return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)
 
-    def pick_individual_type(self, jsonschema_type: dict):
+    def pick_individual_type(self, jsonschema_type: dict):  # noqa: PLR0911
         """Select the correct sql type assuming jsonschema_type has only a single type.
 
         Args:
@@ -307,11 +308,7 @@ def pick_individual_type(self, jsonschema_type: dict):
                     return ARRAY(self.to_sql_type({"type": items_type}))
 
             # Case 3: tuples
-            if isinstance(items, list):
-                return ARRAY(JSONB())
-
-            # All other cases, return JSONB
-            return JSONB()
+            return ARRAY(JSONB()) if isinstance(items, list) else JSONB()
 
         # string formats
         if jsonschema_type.get("format") == "date-time":
@@ -324,9 +321,7 @@ def pick_individual_type(self, jsonschema_type: dict):
         ):
             return HexByteString()
         individual_type = th.to_sql_type(jsonschema_type)
-        if isinstance(individual_type, VARCHAR):
-            return TEXT()
-        return individual_type
+        return TEXT() if isinstance(individual_type, VARCHAR) else individual_type
 
     @staticmethod
     def pick_best_sql_type(sql_type_array: list):
@@ -355,13 +350,12 @@ def pick_best_sql_type(sql_type_array: list):
             NOTYPE,
         ]
 
-        for sql_type in precedence_order:
-            for obj in sql_type_array:
-                if isinstance(obj, sql_type):
-                    return obj
+        for sql_type, obj in itertools.product(precedence_order, sql_type_array):
+            if isinstance(obj, sql_type):
+                return obj
         return TEXT()
 
-    def create_empty_table(  # type: ignore[override]
+    def create_empty_table(  # type: ignore[override]  # noqa: PLR0913
         self,
         table_name: str,
         meta: sa.MetaData,
@@ -397,7 +391,7 @@ def create_empty_table(  # type: ignore[override]
             raise RuntimeError(
                 f"Schema for table_name: '{table_name}'"
                 f"does not define properties: {schema}"
-            )
+            ) from None
 
         for property_name, property_jsonschema in properties.items():
             is_primary_key = property_name in primary_keys
@@ -531,7 +525,7 @@ def get_column_add_ddl(  # type: ignore[override]
             },
         )
 
-    def _adapt_column_type(  # type: ignore[override]
+    def _adapt_column_type(  # type: ignore[override]  # noqa: PLR0913
        self,
         schema_name: str,
         table_name: str,
@@ -669,7 +663,7 @@ def get_sqlalchemy_query(self, config: dict) -> dict:
         # ssl_enable is for verifying the server's identity to the client.
         if config["ssl_enable"]:
             ssl_mode = config["ssl_mode"]
-            query.update({"sslmode": ssl_mode})
+            query["sslmode"] = ssl_mode
             query["sslrootcert"] = self.filepath_or_certificate(
                 value=config["ssl_certificate_authority"],
                 alternative_name=config["ssl_storage_directory"] + "/root.crt",
@@ -764,7 +758,7 @@ def catch_signal(self, signum, frame) -> None:
             signum: The signal number
             frame: The current stack frame
         """
-        exit(1)  # Calling this to be sure atexit is called, so clean_up gets called
+        sys.exit(1)  # Calling this to be sure atexit is called, so clean_up gets called
 
     def _get_column_type(  # type: ignore[override]
         self,
diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py
index 617b3089..f52d6c78 100644
--- a/target_postgres/sinks.py
+++ b/target_postgres/sinks.py
@@ -159,9 +159,9 @@ def bulk_insert_records(  # type: ignore[override]
         if self.append_only is False:
             insert_records: dict[str, dict] = {}  # pk : record
             for record in records:
-                insert_record = {}
-                for column in columns:
-                    insert_record[column.name] = record.get(column.name)
+                insert_record = {
+                    column.name: record.get(column.name) for column in columns
+                }
                 # No need to check for a KeyError here because the SDK already
                 # guarantees that all key properties exist in the record.
                 primary_key_value = "".join([str(record[key]) for key in primary_keys])
@@ -169,9 +169,9 @@ def bulk_insert_records(  # type: ignore[override]
             data_to_insert = list(insert_records.values())
         else:
             for record in records:
-                insert_record = {}
-                for column in columns:
-                    insert_record[column.name] = record.get(column.name)
+                insert_record = {
+                    column.name: record.get(column.name) for column in columns
+                }
                 data_to_insert.append(insert_record)
         connection.execute(insert, data_to_insert)
         return True
@@ -252,14 +252,13 @@ def column_representation(
         schema: dict,
     ) -> list[sa.Column]:
         """Return a sqlalchemy table representation for the current schema."""
-        columns: list[sa.Column] = []
-        for property_name, property_jsonschema in schema["properties"].items():
-            columns.append(
-                sa.Column(
-                    property_name,
-                    self.connector.to_sql_type(property_jsonschema),
-                )
+        columns: list[sa.Column] = [
+            sa.Column(
+                property_name,
+                self.connector.to_sql_type(property_jsonschema),
             )
+            for property_name, property_jsonschema in schema["properties"].items()
+        ]
         return columns
 
     def generate_insert_statement(
@@ -289,7 +288,7 @@ def schema_name(self) -> str | None:
         """Return the schema name or `None` if using names with no schema part.
 
         Note that after the next SDK release (after 0.14.0) we can remove this
-        as it's already upstreamed.
+        as it's already implemented upstream.
 
         Returns:
             The target schema name.
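
A note for reviewers on the rule codes suppressed with `# noqa` above: `PLR0913` fires on functions that take too many arguments (Ruff's default limit is 5), `PLR0911` fires on functions with too many return statements (default limit 6), and `PLR1722` is the rule behind the change from `exit(1)` to `sys.exit(1)`. A minimal sketch of code that trips each check under the default thresholds; the function names here are invented for illustration and are not part of this patch:

    # Illustrative only; run `ruff check` with "PL" selected to see the diagnostics.
    def build_url(scheme, host, port, user, password, database):  # PLR0913: 6 args > 5
        return f"{scheme}://{user}:{password}@{host}:{port}/{database}"

    def describe(n: int) -> str:  # PLR0911: 7 returns > 6
        if n < 0:
            return "negative"
        if n == 0:
            return "zero"
        if n == 1:
            return "one"
        if n == 2:
            return "two"
        if n == 3:
            return "three"
        if n == 4:
            return "four"
        return "many"

    def bail() -> None:
        exit(1)  # PLR1722: use sys.exit(); the `exit` builtin comes from the site module

The `# noqa: PLR0913` and `# noqa: PLR0911` comments in the diff acknowledge the wide signatures and early returns as intentional rather than refactoring them, while the `exit(1)` call was fixed outright since `sys.exit(1)` behaves the same and is always available.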