From 03ac31d2f2c00c19ad274a4f27a47565ce248c33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:24:35 -0600 Subject: [PATCH 1/2] chore: Enable `TCH` Ruff rule (#444) --- pyproject.toml | 1 + target_postgres/connector.py | 4 ++-- target_postgres/sinks.py | 12 +++++------- target_postgres/target.py | 5 ++++- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d9d09009..986d791e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,7 @@ select = [ "ICN", # flake8-import-conventions "RET", # flake8-return "SIM", # flake8-simplify + "TCH", # flake8-type-checking "PERF", # Perflint "RUF", # ruff ] diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 9fa09b25..d1105398 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -375,7 +375,7 @@ def create_empty_table( # type: ignore[override] Args: table_name: the target table name. - meta: the SQLAchemy metadata object. + meta: the SQLAlchemy metadata object. schema: the JSON schema for the new table. connection: the database connection. primary_keys: list of key properties. 
@@ -574,7 +574,7 @@ def _adapt_column_type( # type: ignore[override] return # Not the same type, generic type or compatible types - # calling merge_sql_types for assistnace + # calling merge_sql_types for assistance compatible_sql_type = self.merge_sql_types([current_type, sql_type]) if str(compatible_sql_type) == str(current_type): diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 7f52995b..617b3089 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -8,13 +8,13 @@ import sqlalchemy as sa from singer_sdk.sinks import SQLSink -from sqlalchemy.sql import Executable from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector if t.TYPE_CHECKING: from singer_sdk.connectors.sql import FullyQualifiedName + from sqlalchemy.sql import Executable class PostgresSink(SQLSink): @@ -52,10 +52,8 @@ def setup(self) -> None: This method is called on Sink creation, and creates the required Schema and Table entities in the target database. """ - if self.key_properties is None or self.key_properties == []: - self.append_only = True - else: - self.append_only = False + self.append_only = self.key_properties is None or self.key_properties == [] + if self.schema_name: self.connector.prepare_schema(self.schema_name) with self.connector._connect() as connection, connection.begin(): @@ -165,7 +163,7 @@ def bulk_insert_records( # type: ignore[override] for column in columns: insert_record[column.name] = record.get(column.name) # No need to check for a KeyError here because the SDK already - # guaruntees that all key properties exist in the record. + # guarantees that all key properties exist in the record. primary_key_value = "".join([str(record[key]) for key in primary_keys]) insert_records[primary_key_value] = insert_record data_to_insert = list(insert_records.values()) @@ -296,7 +294,7 @@ def schema_name(self) -> str | None: Returns: The target schema name. 
""" - # Look for a default_target_scheme in the configuraion fle + # Look for a default_target_schema in the configuration file default_target_schema: str = self.config.get("default_target_schema", None) parts = self.stream_name.split("-") diff --git a/target_postgres/target.py b/target_postgres/target.py index 809dbd86..2cc0b714 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -2,13 +2,16 @@ from __future__ import annotations -from pathlib import PurePath +import typing as t from singer_sdk import typing as th from singer_sdk.target_base import SQLTarget from target_postgres.sinks import PostgresSink +if t.TYPE_CHECKING: + from pathlib import PurePath + class TargetPostgres(SQLTarget): """Target for Postgres.""" From d07b41583e8ff77ee770a0d40779ea9485772461 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:42:06 -0600 Subject: [PATCH 2/2] chore: Enable `PL` Ruff rules (#445) --- pyproject.toml | 1 + target_postgres/connector.py | 36 +++++++++++++++--------------------- target_postgres/sinks.py | 27 +++++++++++++-------------- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 986d791e..2f11781c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,7 @@ select = [ "RET", # flake8-return "SIM", # flake8-simplify "TCH", # flake8-type-checking + "PL", # Pylint "PERF", # Perflint "RUF", # ruff ] diff --git a/target_postgres/connector.py b/target_postgres/connector.py index d1105398..6ba09b9e 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -4,7 +4,9 @@ import atexit import io +import itertools import signal +import sys import typing as t from contextlib import contextmanager from functools import cached_property @@ -95,7 +97,7 @@ def interpret_content_encoding(self) -> bool: """ return self.config.get("interpret_content_encoding", False) - def 
prepare_table( # type: ignore[override] + def prepare_table( # type: ignore[override] # noqa: PLR0913 self, full_table_name: str | FullyQualifiedName, schema: dict, @@ -121,7 +123,7 @@ def prepare_table( # type: ignore[override] meta = sa.MetaData(schema=schema_name) table: sa.Table if not self.table_exists(full_table_name=full_table_name): - table = self.create_empty_table( + return self.create_empty_table( table_name=table_name, meta=meta, schema=schema, @@ -130,7 +132,6 @@ def prepare_table( # type: ignore[override] as_temp_table=as_temp_table, connection=connection, ) - return table meta.reflect(connection, only=[table_name]) table = meta.tables[ full_table_name @@ -269,7 +270,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array) - def pick_individual_type(self, jsonschema_type: dict): + def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911 """Select the correct sql type assuming jsonschema_type has only a single type. 
Args: @@ -307,11 +308,7 @@ def pick_individual_type(self, jsonschema_type: dict): return ARRAY(self.to_sql_type({"type": items_type})) # Case 3: tuples - if isinstance(items, list): - return ARRAY(JSONB()) - - # All other cases, return JSONB - return JSONB() + return ARRAY(JSONB()) if isinstance(items, list) else JSONB() # string formats if jsonschema_type.get("format") == "date-time": @@ -324,9 +321,7 @@ def pick_individual_type(self, jsonschema_type: dict): ): return HexByteString() individual_type = th.to_sql_type(jsonschema_type) - if isinstance(individual_type, VARCHAR): - return TEXT() - return individual_type + return TEXT() if isinstance(individual_type, VARCHAR) else individual_type @staticmethod def pick_best_sql_type(sql_type_array: list): @@ -355,13 +350,12 @@ def pick_best_sql_type(sql_type_array: list): NOTYPE, ] - for sql_type in precedence_order: - for obj in sql_type_array: - if isinstance(obj, sql_type): - return obj + for sql_type, obj in itertools.product(precedence_order, sql_type_array): + if isinstance(obj, sql_type): + return obj return TEXT() - def create_empty_table( # type: ignore[override] + def create_empty_table( # type: ignore[override] # noqa: PLR0913 self, table_name: str, meta: sa.MetaData, @@ -397,7 +391,7 @@ def create_empty_table( # type: ignore[override] raise RuntimeError( f"Schema for table_name: '{table_name}'" f"does not define properties: {schema}" - ) + ) from None for property_name, property_jsonschema in properties.items(): is_primary_key = property_name in primary_keys @@ -531,7 +525,7 @@ def get_column_add_ddl( # type: ignore[override] }, ) - def _adapt_column_type( # type: ignore[override] + def _adapt_column_type( # type: ignore[override] # noqa: PLR0913 self, schema_name: str, table_name: str, @@ -669,7 +663,7 @@ def get_sqlalchemy_query(self, config: dict) -> dict: # ssl_enable is for verifying the server's identity to the client. 
if config["ssl_enable"]: ssl_mode = config["ssl_mode"] - query.update({"sslmode": ssl_mode}) + query["sslmode"] = ssl_mode query["sslrootcert"] = self.filepath_or_certificate( value=config["ssl_certificate_authority"], alternative_name=config["ssl_storage_directory"] + "/root.crt", @@ -764,7 +758,7 @@ def catch_signal(self, signum, frame) -> None: signum: The signal number frame: The current stack frame """ - exit(1) # Calling this to be sure atexit is called, so clean_up gets called + sys.exit(1) # Calling this to be sure atexit is called, so clean_up gets called def _get_column_type( # type: ignore[override] self, diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 617b3089..f52d6c78 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -159,9 +159,9 @@ def bulk_insert_records( # type: ignore[override] if self.append_only is False: insert_records: dict[str, dict] = {} # pk : record for record in records: - insert_record = {} - for column in columns: - insert_record[column.name] = record.get(column.name) + insert_record = { + column.name: record.get(column.name) for column in columns + } # No need to check for a KeyError here because the SDK already # guarantees that all key properties exist in the record. 
primary_key_value = "".join([str(record[key]) for key in primary_keys]) @@ -169,9 +169,9 @@ def bulk_insert_records( # type: ignore[override] data_to_insert = list(insert_records.values()) else: for record in records: - insert_record = {} - for column in columns: - insert_record[column.name] = record.get(column.name) + insert_record = { + column.name: record.get(column.name) for column in columns + } data_to_insert.append(insert_record) connection.execute(insert, data_to_insert) return True @@ -252,14 +252,13 @@ def column_representation( schema: dict, ) -> list[sa.Column]: """Return a sqlalchemy table representation for the current schema.""" - columns: list[sa.Column] = [] - for property_name, property_jsonschema in schema["properties"].items(): - columns.append( - sa.Column( - property_name, - self.connector.to_sql_type(property_jsonschema), - ) + columns: list[sa.Column] = [ + sa.Column( + property_name, + self.connector.to_sql_type(property_jsonschema), ) + for property_name, property_jsonschema in schema["properties"].items() + ] return columns def generate_insert_statement( @@ -289,7 +288,7 @@ def schema_name(self) -> str | None: """Return the schema name or `None` if using names with no schema part. Note that after the next SDK release (after 0.14.0) we can remove this - as it's already upstreamed. + as it's already implemented upstream. Returns: The target schema name.