Skip to content

Commit

Permalink
Merge branch 'main' into sanitize-nulls
Browse files · Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 23, 2024
2 parents 975667b + d07b415 commit 330501b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 57 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ select = [
"ICN", # flake8-import-conventions
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"PL", # Pylint
"PERF", # Perflint
"RUF", # ruff
]
Expand Down
40 changes: 17 additions & 23 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import atexit
import io
import itertools
import signal
import sys
import typing as t
from contextlib import contextmanager
from functools import cached_property
Expand Down Expand Up @@ -104,7 +106,7 @@ def sanitize_null_text_characters(self) -> bool:
"""
return self.config.get("sanitize_null_text_characters", False)

def prepare_table( # type: ignore[override]
def prepare_table( # type: ignore[override] # noqa: PLR0913
self,
full_table_name: str | FullyQualifiedName,
schema: dict,
Expand All @@ -130,7 +132,7 @@ def prepare_table( # type: ignore[override]
meta = sa.MetaData(schema=schema_name)
table: sa.Table
if not self.table_exists(full_table_name=full_table_name):
table = self.create_empty_table(
return self.create_empty_table(
table_name=table_name,
meta=meta,
schema=schema,
Expand All @@ -139,7 +141,6 @@ def prepare_table( # type: ignore[override]
as_temp_table=as_temp_table,
connection=connection,
)
return table
meta.reflect(connection, only=[table_name])
table = meta.tables[
full_table_name
Expand Down Expand Up @@ -278,7 +279,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

def pick_individual_type(self, jsonschema_type: dict):
def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
"""Select the correct sql type assuming jsonschema_type has only a single type.
Args:
Expand Down Expand Up @@ -316,11 +317,7 @@ def pick_individual_type(self, jsonschema_type: dict):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 3: tuples
if isinstance(items, list):
return ARRAY(JSONB())

# All other cases, return JSONB
return JSONB()
return ARRAY(JSONB()) if isinstance(items, list) else JSONB()

# string formats
if jsonschema_type.get("format") == "date-time":
Expand All @@ -333,9 +330,7 @@ def pick_individual_type(self, jsonschema_type: dict):
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)
if isinstance(individual_type, VARCHAR):
return TEXT()
return individual_type
return TEXT() if isinstance(individual_type, VARCHAR) else individual_type

@staticmethod
def pick_best_sql_type(sql_type_array: list):
Expand Down Expand Up @@ -364,13 +359,12 @@ def pick_best_sql_type(sql_type_array: list):
NOTYPE,
]

for sql_type in precedence_order:
for obj in sql_type_array:
if isinstance(obj, sql_type):
return obj
for sql_type, obj in itertools.product(precedence_order, sql_type_array):
if isinstance(obj, sql_type):
return obj
return TEXT()

def create_empty_table( # type: ignore[override]
def create_empty_table( # type: ignore[override] # noqa: PLR0913
self,
table_name: str,
meta: sa.MetaData,
Expand All @@ -384,7 +378,7 @@ def create_empty_table( # type: ignore[override]
Args:
table_name: the target table name.
meta: the SQLAchemy metadata object.
meta: the SQLAlchemy metadata object.
schema: the JSON schema for the new table.
connection: the database connection.
primary_keys: list of key properties.
Expand All @@ -406,7 +400,7 @@ def create_empty_table( # type: ignore[override]
raise RuntimeError(
f"Schema for table_name: '{table_name}'"
f"does not define properties: {schema}"
)
) from None

for property_name, property_jsonschema in properties.items():
is_primary_key = property_name in primary_keys
Expand Down Expand Up @@ -540,7 +534,7 @@ def get_column_add_ddl( # type: ignore[override]
},
)

def _adapt_column_type( # type: ignore[override]
def _adapt_column_type( # type: ignore[override] # noqa: PLR0913
self,
schema_name: str,
table_name: str,
Expand Down Expand Up @@ -583,7 +577,7 @@ def _adapt_column_type( # type: ignore[override]
return

# Not the same type, generic type or compatible types
# calling merge_sql_types for assistnace
# calling merge_sql_types for assistance
compatible_sql_type = self.merge_sql_types([current_type, sql_type])

if str(compatible_sql_type) == str(current_type):
Expand Down Expand Up @@ -678,7 +672,7 @@ def get_sqlalchemy_query(self, config: dict) -> dict:
# ssl_enable is for verifying the server's identity to the client.
if config["ssl_enable"]:
ssl_mode = config["ssl_mode"]
query.update({"sslmode": ssl_mode})
query["sslmode"] = ssl_mode
query["sslrootcert"] = self.filepath_or_certificate(
value=config["ssl_certificate_authority"],
alternative_name=config["ssl_storage_directory"] + "/root.crt",
Expand Down Expand Up @@ -773,7 +767,7 @@ def catch_signal(self, signum, frame) -> None:
signum: The signal number
frame: The current stack frame
"""
exit(1) # Calling this to be sure atexit is called, so clean_up gets called
sys.exit(1) # Calling this to be sure atexit is called, so clean_up gets called

def _get_column_type( # type: ignore[override]
self,
Expand Down
59 changes: 28 additions & 31 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

import sqlalchemy as sa
from singer_sdk.sinks import SQLSink
from sqlalchemy.sql import Executable
from sqlalchemy.sql.expression import bindparam

from target_postgres.connector import PostgresConnector

if t.TYPE_CHECKING:
from singer_sdk.connectors.sql import FullyQualifiedName
from sqlalchemy.sql import Executable


class PostgresSink(SQLSink):
Expand Down Expand Up @@ -52,10 +52,8 @@ def setup(self) -> None:
This method is called on Sink creation, and creates the required Schema and
Table entities in the target database.
"""
if self.key_properties is None or self.key_properties == []:
self.append_only = True
else:
self.append_only = False
self.append_only = self.key_properties is None or self.key_properties == []

if self.schema_name:
self.connector.prepare_schema(self.schema_name)
with self.connector._connect() as connection, connection.begin():
Expand Down Expand Up @@ -122,7 +120,7 @@ def generate_temp_table_name(self):
return f"{str(uuid.uuid4()).replace('-', '_')}"

def sanitize_null_text_characters(self, data):
"""Sanitizes null characters by replacing \u0000 with \ufffd"""
"""Sanitizes null characters by replacing \u0000 with \ufffd."""

def replace_null_character(d):
return d.replace("\u0000", "\ufffd")
Expand Down Expand Up @@ -182,29 +180,29 @@ def bulk_insert_records( # type: ignore[override]
if self.append_only is False:
insert_records: dict[str, dict] = {} # pk : record
for record in records:
insert_record = {}
for column in columns:
if self.connector.sanitize_null_text_characters:
insert_record[column.name] = self.sanitize_null_text_characters(
record.get(column.name)
)
else:
insert_record[column.name] = record.get(column.name)
insert_record = {
column.name: (
self.sanitize_null_text_characters(record.get(column.name))
if self.connector.sanitize_null_text_characters
else record.get(column.name)
)
for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_value = "".join([str(record[key]) for key in primary_keys])
insert_records[primary_key_value] = insert_record
data_to_insert = list(insert_records.values())
else:
for record in records:
insert_record = {}
for column in columns:
if self.connector.sanitize_null_text_characters:
insert_record[column.name] = self.sanitize_null_text_characters(
record.get(column.name)
)
else:
insert_record[column.name] = record.get(column.name)
insert_record = {
column.name: (
self.sanitize_null_text_characters(record.get(column.name))
if self.connector.sanitize_null_text_characters
else record.get(column.name)
)
for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
return True
Expand Down Expand Up @@ -285,14 +283,13 @@ def column_representation(
schema: dict,
) -> list[sa.Column]:
"""Return a sqlalchemy table representation for the current schema."""
columns: list[sa.Column] = []
for property_name, property_jsonschema in schema["properties"].items():
columns.append(
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
)
columns: list[sa.Column] = [
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
)
for property_name, property_jsonschema in schema["properties"].items()
]
return columns

def generate_insert_statement(
Expand Down Expand Up @@ -322,12 +319,12 @@ def schema_name(self) -> str | None:
"""Return the schema name or `None` if using names with no schema part.
Note that after the next SDK release (after 0.14.0) we can remove this
as it's already upstreamed.
as it's already implemented upstream.
Returns:
The target schema name.
"""
# Look for a default_target_scheme in the configuraion fle
# Look for a default_target_scheme in the configuration file
default_target_schema: str = self.config.get("default_target_schema", None)
parts = self.stream_name.split("-")

Expand Down
10 changes: 7 additions & 3 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

from __future__ import annotations

from pathlib import PurePath
import typing as t

from singer_sdk import typing as th
from singer_sdk.target_base import SQLTarget

from target_postgres.sinks import PostgresSink

if t.TYPE_CHECKING:
from pathlib import PurePath


class TargetPostgres(SQLTarget):
"""Target for Postgres."""
Expand Down Expand Up @@ -208,8 +211,9 @@ def __init__(
th.BooleanType,
default=False,
description=(
"If set to true, the target will sanitize null characters in char/text/varchar fields, as they "
"are not supported by Postgres. See [postgres documentation](https://www.postgresql.org/docs/current/functions-string.html) "
"If set to true, the target will sanitize null characters in "
"char/text/varchar fields, as they are not supported by Postgres. "
"See [postgres documentation](https://www.postgresql.org/docs/current/functions-string.html) " # noqa: E501
"for more information about chr(0) not being supported."
),
),
Expand Down

0 comments on commit 330501b

Please sign in to comment.