Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Enable PL Ruff rules #445

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ select = [
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"PL", # Pylint
"PERF", # Perflint
"RUF", # ruff
]
Expand Down
36 changes: 15 additions & 21 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import atexit
import io
import itertools
import signal
import sys
import typing as t
from contextlib import contextmanager
from functools import cached_property
Expand Down Expand Up @@ -95,7 +97,7 @@ def interpret_content_encoding(self) -> bool:
"""
return self.config.get("interpret_content_encoding", False)

def prepare_table( # type: ignore[override]
def prepare_table( # type: ignore[override] # noqa: PLR0913
self,
full_table_name: str | FullyQualifiedName,
schema: dict,
Expand All @@ -121,7 +123,7 @@ def prepare_table( # type: ignore[override]
meta = sa.MetaData(schema=schema_name)
table: sa.Table
if not self.table_exists(full_table_name=full_table_name):
table = self.create_empty_table(
return self.create_empty_table(
table_name=table_name,
meta=meta,
schema=schema,
Expand All @@ -130,7 +132,6 @@ def prepare_table( # type: ignore[override]
as_temp_table=as_temp_table,
connection=connection,
)
return table
meta.reflect(connection, only=[table_name])
table = meta.tables[
full_table_name
Expand Down Expand Up @@ -269,7 +270,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

def pick_individual_type(self, jsonschema_type: dict):
def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
"""Select the correct sql type assuming jsonschema_type has only a single type.

Args:
Expand Down Expand Up @@ -307,11 +308,7 @@ def pick_individual_type(self, jsonschema_type: dict):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 3: tuples
if isinstance(items, list):
return ARRAY(JSONB())

# All other cases, return JSONB
return JSONB()
return ARRAY(JSONB()) if isinstance(items, list) else JSONB()

# string formats
if jsonschema_type.get("format") == "date-time":
Expand All @@ -324,9 +321,7 @@ def pick_individual_type(self, jsonschema_type: dict):
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)
if isinstance(individual_type, VARCHAR):
return TEXT()
return individual_type
return TEXT() if isinstance(individual_type, VARCHAR) else individual_type

@staticmethod
def pick_best_sql_type(sql_type_array: list):
Expand Down Expand Up @@ -355,13 +350,12 @@ def pick_best_sql_type(sql_type_array: list):
NOTYPE,
]

for sql_type in precedence_order:
for obj in sql_type_array:
if isinstance(obj, sql_type):
return obj
for sql_type, obj in itertools.product(precedence_order, sql_type_array):
if isinstance(obj, sql_type):
return obj
return TEXT()

def create_empty_table( # type: ignore[override]
def create_empty_table( # type: ignore[override] # noqa: PLR0913
self,
table_name: str,
meta: sa.MetaData,
Expand Down Expand Up @@ -397,7 +391,7 @@ def create_empty_table( # type: ignore[override]
raise RuntimeError(
f"Schema for table_name: '{table_name}'"
f"does not define properties: {schema}"
)
) from None

for property_name, property_jsonschema in properties.items():
is_primary_key = property_name in primary_keys
Expand Down Expand Up @@ -531,7 +525,7 @@ def get_column_add_ddl( # type: ignore[override]
},
)

def _adapt_column_type( # type: ignore[override]
def _adapt_column_type( # type: ignore[override] # noqa: PLR0913
self,
schema_name: str,
table_name: str,
Expand Down Expand Up @@ -669,7 +663,7 @@ def get_sqlalchemy_query(self, config: dict) -> dict:
# ssl_enable is for verifying the server's identity to the client.
if config["ssl_enable"]:
ssl_mode = config["ssl_mode"]
query.update({"sslmode": ssl_mode})
query["sslmode"] = ssl_mode
query["sslrootcert"] = self.filepath_or_certificate(
value=config["ssl_certificate_authority"],
alternative_name=config["ssl_storage_directory"] + "/root.crt",
Expand Down Expand Up @@ -764,7 +758,7 @@ def catch_signal(self, signum, frame) -> None:
signum: The signal number
frame: The current stack frame
"""
exit(1) # Calling this to be sure atexit is called, so clean_up gets called
sys.exit(1) # Calling this to be sure atexit is called, so clean_up gets called

def _get_column_type( # type: ignore[override]
self,
Expand Down
27 changes: 13 additions & 14 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,19 @@ def bulk_insert_records( # type: ignore[override]
if self.append_only is False:
insert_records: dict[str, dict] = {} # pk : record
for record in records:
insert_record = {}
for column in columns:
insert_record[column.name] = record.get(column.name)
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_value = "".join([str(record[key]) for key in primary_keys])
insert_records[primary_key_value] = insert_record
data_to_insert = list(insert_records.values())
else:
for record in records:
insert_record = {}
for column in columns:
insert_record[column.name] = record.get(column.name)
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
return True
Expand Down Expand Up @@ -252,14 +252,13 @@ def column_representation(
schema: dict,
) -> list[sa.Column]:
"""Return a sqlalchemy table representation for the current schema."""
columns: list[sa.Column] = []
for property_name, property_jsonschema in schema["properties"].items():
columns.append(
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
)
columns: list[sa.Column] = [
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
)
for property_name, property_jsonschema in schema["properties"].items()
]
return columns

def generate_insert_statement(
Expand Down Expand Up @@ -289,7 +288,7 @@ def schema_name(self) -> str | None:
"""Return the schema name or `None` if using names with no schema part.

Note that after the next SDK release (after 0.14.0) we can remove this
as it's already upstreamed.
as it's already implemented upstream.

Returns:
The target schema name.
Expand Down