Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use Psycopg3 COPY #451

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions README.md

Large diffs are not rendered by default.

672 changes: 383 additions & 289 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ packages = [

[tool.poetry.dependencies]
python = ">=3.8"
faker = {version = "~=30.0", optional = true}
psycopg2-binary = "2.9.9"
faker = {version = "~=29.0", optional = true}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to downgrade this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! My branch was slightly outdated 😅

sqlalchemy = "~=2.0"
sshtunnel = "0.4.0"
psycopg = "^3.2.3"
psycopg-binary = "^3.2.3"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we both the source and binary packages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgarrmondragon This can be changed to just psycopg. Although now I am wondering if a user wants to use psycopg[c] or psycopg[binary] what would be the suggestion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dbt-labs/dbt-postgres#96 is probably a good case study. Most users in the data space can't or don't want to build C extensions, so we'll probably prefer psycopg[binary].


[tool.poetry.dependencies.singer-sdk]
version = "~=0.40.0a1"
Expand Down Expand Up @@ -109,4 +110,4 @@ banned-from = ["sqlalchemy"]
sqlalchemy = "sa"

[tool.ruff.lint.pydocstyle]
convention = "google"
convention = "google"
SpaceCondor marked this conversation as resolved.
Show resolved Hide resolved
69 changes: 54 additions & 15 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ def generate_temp_table_name(self):
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"

def generate_copy_statement(
self,
full_table_name: str | FullyQualifiedName,
columns: list[sa.Column], # type: ignore[override]
) -> str:
"""Generate a copy statement for bulk copy.

Args:
full_table_name: the target table name.
columns: the target table columns.

Returns:
A copy statement.
"""
columns_list = ", ".join(f'"{column.name}"' for column in columns)
sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'

return sql

def bulk_insert_records( # type: ignore[override]
self,
table: sa.Table,
Expand All @@ -145,35 +164,55 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
# Only one record per PK, we want to take the last one
data_to_insert: list[dict[str, t.Any]] = []
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)

data_to_copy: list[dict[str, t.Any]] = []

# If append only is False, we only take the latest record one per primary key
if self.append_only is False:
insert_records: dict[tuple, dict] = {} # pk tuple: record
unique_copy_records: dict[tuple, dict] = {} # pk tuple: values
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
insert_records[primary_key_tuple] = insert_record
data_to_insert = list(insert_records.values())
unique_copy_records[primary_key_tuple] = insert_record
data_to_copy = list(unique_copy_records.values())
else:
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
data_to_copy.append(insert_record)

# Prepare to process the rows into csv. Use each column's bind_processor to do
# most of the work, then do the final construction of the csv rows ourselves
# to control exactly how values are converted and which ones are quoted.
column_bind_processors = {
column.name: column.type.bind_processor(connection.dialect)
for column in columns
}

# Use copy to run the copy statement.
# https://www.psycopg.org/psycopg3/docs/basic/copy.html
with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens at this point if someone sets postgresql+psycopg2 for dialect+driver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edgarrmondragon It would raise an exception. In the current main branch I don't think using anything aside from postgresql+psycopg2 would work anyway, so this being configurable doesn't add much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure, but I don't think we use driver-specific APIs and rely on SQLAlchemy DDL/DML in all places, so I would expect most drivers to work. Maybe I'm wrong.

for row in data_to_copy:
processed_row = []
for row_column_name in row:
if column_bind_processors[row_column_name] is not None:
processed_row.append(
column_bind_processors[row_column_name](
row[row_column_name]
)
)
else:
processed_row.append(row[row_column_name])

copy.write_row(processed_row)

return True

def upsert(
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(
th.Property(
"dialect+driver",
th.StringType,
default="postgresql+psycopg2",
default="postgresql+psycopg",
description=(
"Dialect+driver see "
+ "https://docs.sqlalchemy.org/en/20/core/engines.html. "
Expand Down
6 changes: 3 additions & 3 deletions target_postgres/tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def postgres_config():
return {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -29,7 +29,7 @@ def postgres_config():

def postgres_config_no_ssl():
return {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -43,7 +43,7 @@ def postgres_config_no_ssl():

def postgres_config_ssh_tunnel():
return {
"sqlalchemy_url": "postgresql://postgres:[email protected]:5432/main",
"sqlalchemy_url": "postgresql+psycopg://postgres:[email protected]:5432/main",
"ssh_tunnel": {
"enable": True,
"host": "127.0.0.1",
Expand Down
8 changes: 3 additions & 5 deletions target_postgres/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@
TargetCamelcaseTest,
TargetCliPrintsTest,
TargetDuplicateRecords,
TargetEncodedStringData,
TargetInvalidSchemaTest,
TargetMultipleStateMessages,
TargetNoPrimaryKeys,
TargetOptionalAttributes,
TargetRecordBeforeSchemaTest,
TargetRecordMissingKeyProperty,
TargetRecordMissingOptionalFields,
TargetRecordMissingRequiredProperty,
TargetSchemaNoProperties,
TargetSchemaUpdates,
TargetSpecialCharsInAttributes,
)

from target_postgres.target import TargetPostgres

from .core import create_engine, postgres_config

target_tests = TestSuite(
Expand Down Expand Up @@ -62,7 +58,9 @@ class BasePostgresSDKTests:
@pytest.fixture()
def connection(self, runner):
engine = create_engine(runner)
return engine.connect()
with engine.connect() as conn:
yield conn
engine.dispose()


SDKTests = get_target_test_class(
Expand Down
14 changes: 11 additions & 3 deletions target_postgres/tests/test_target_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def verify_data(
sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
)
assert result.first()[0] == number_of_rows
engine.dispose()


def test_sqlalchemy_url_config(postgres_config_no_ssl):
Expand All @@ -157,7 +158,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
port = postgres_config_no_ssl["port"]

config = {
"sqlalchemy_url": f"postgresql://{user}:{password}@{host}:{port}/{database}"
"sqlalchemy_url": f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}"
}
tap = SampleTapCountries(config={}, state=None)
target = TargetPostgres(config=config)
Expand All @@ -167,7 +168,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
def test_port_default_config():
"""Test that the default config is passed into the engine when the config doesn't provide it"""
config = {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -186,12 +187,13 @@ def test_port_default_config():
engine.url.render_as_string(hide_password=False)
== f"{dialect_driver}://{user}:{password}@{host}:5432/{database}"
)
engine.dispose()


def test_port_config():
"""Test that the port config works"""
config = {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -211,6 +213,7 @@ def test_port_config():
engine.url.render_as_string(hide_password=False)
== f"{dialect_driver}://{user}:{password}@{host}:5433/{database}"
)
engine.dispose()


# Test name would work well
Expand Down Expand Up @@ -402,6 +405,7 @@ def test_no_primary_keys(postgres_target):
singer_file_to_target(file_name, postgres_target)

verify_data(postgres_target, table_name, 16)
engine.dispose()


def test_no_type(postgres_target):
Expand Down Expand Up @@ -511,6 +515,7 @@ def test_anyof(postgres_target):
# {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
if column.name == "legacy_id":
assert isinstance(column.type, TEXT)
engine.dispose()


def test_new_array_column(postgres_target):
Expand Down Expand Up @@ -621,6 +626,7 @@ def test_activate_version_hard_delete(postgres_config_no_ssl):
assert result.rowcount == 9

singer_file_to_target(file_name, pg_hard_delete_true)
engine.dispose()

# Should remove the 2 records we added manually
with engine.connect() as connection:
Expand Down Expand Up @@ -692,6 +698,7 @@ def test_activate_version_soft_delete(postgres_config_no_ssl):
# South America row should not have been modified, but it would have been prior
# to the fix mentioned in #204 and implemented in #240.
assert south_america == result.first()._asdict()
engine.dispose()


def test_activate_version_no_metadata(postgres_config_no_ssl):
Expand Down Expand Up @@ -742,6 +749,7 @@ def test_activate_version_deletes_data_properly(postgres_target):
with engine.connect() as connection:
result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 0
engine.dispose()


def test_reserved_keywords(postgres_target):
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def connector():
"""Create a PostgresConnector instance."""
return PostgresConnector(
config={
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"port": "5432",
"user": "postgres",
Expand Down
Loading