feat: prepare_column() refactor and new activate_version config option. #240

Merged: 7 commits, Dec 28, 2023
README.md: 25 changes (13 additions, 12 deletions)
@@ -25,23 +25,24 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Targets
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect, and all ssl settings. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup |
| hard_delete | False | 0 | When activate version is sent from a tap this specifies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. |
| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work! This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details. |
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion host, this is the host we'll connect to via ssh |
| ssh_tunnel.username | True (if ssh_tunnel set) | False | Username to connect to bastion host |
| ssh_tunnel.port | True (if ssh_tunnel set) | 22 | Port to connect to bastion host |
| ssh_tunnel.private_key | True (if ssh_tunnel set) | None | Private Key for authentication to the bastion host |
| ssh_tunnel.private_key_password | False | None | Private Key Password, leave None if no password is set |
| ssl_enable | False | 0 | Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate_enable| False | 0 | Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup |
| activate_version | False | True | If set to false, the tap will ignore activate version messages. If set to true, add_record_metadata must be set to true as well. |
| hard_delete | False | False | When activate version is sent from a tap this specifies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false. |
| add_record_metadata | False | True | Note that this must be enabled for activate_version to work! This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. |
| ssl_enable | False | False | Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate_enable| False | False | Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_mode | False | verify-full | SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION) for more information. Must be one of disable, allow, prefer, require, verify-ca, or verify-full. Note if sqlalchemy_url is set this will be ignored. |
| ssl_certificate_authority | False | ~/.postgresql/root.crl | The certificate authority that should be used to verify the server's identity. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate | False | ~/.postgresql/postgresql.crt | The certificate that should be used to verify your identity to the server. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_private_key | False | ~/.postgresql/postgresql.key | The private key for the certificate you provided. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_storage_directory | False | .secrets | The folder in which to store SSL certificates provided as raw values. When a certificate/key is provided as a raw value instead of as a filepath, it must be written to a file before it can be used. This configuration option determines where that file is created. |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details. |
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion host, this is the host we'll connect to via ssh |
| ssh_tunnel.username | True (if ssh_tunnel set) | False | Username to connect to bastion host |
| ssh_tunnel.port | True (if ssh_tunnel set) | 22 | Port to connect to bastion host |
| ssh_tunnel.private_key | True (if ssh_tunnel set) | None | Private Key for authentication to the bastion host |
| ssh_tunnel.private_key_password | False | None | Private Key Password, leave None if no password is set |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
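As a quick illustration of how the options above combine once this PR lands, here is a minimal sketch of a config passed directly to the target class; the connection values are placeholders and the import path is assumed from this repository's layout.

```python
# Hypothetical example config: only the activate_version-related options matter
# here; host/port/user/password/database are placeholder credentials.
from target_postgres.target import TargetPostgres

config = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "postgres",
    "database": "postgres",
    "default_target_schema": "melty",  # new default documented above
    "activate_version": True,          # new option added in this PR
    "hard_delete": False,              # soft delete: stamp _sdc_deleted_at instead of DELETE
    "add_record_metadata": True,       # must stay true while activate_version is true
}

target = TargetPostgres(config=config)
```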
target_postgres/connector.py: 32 changes (18 additions, 14 deletions)
@@ -130,8 +130,7 @@ def prepare_table( # type: ignore[override]
if property_name in columns:
column_object = columns[property_name]
self.prepare_column(
schema_name=cast(str, schema_name),
table=table,
full_table_name=table.fullname,
column_name=property_name,
sql_type=self.to_sql_type(property_def),
connection=connection,
@@ -370,43 +369,48 @@ def create_empty_table( # type: ignore[override]
new_table.create(bind=connection)
return new_table

def prepare_column( # type: ignore[override]
def prepare_column(
self,
schema_name: str,
table: sqlalchemy.Table,
full_table_name: str,
column_name: str,
sql_type: sqlalchemy.types.TypeEngine,
connection: sqlalchemy.engine.Connection,
connection: sqlalchemy.engine.Connection | None = None,
column_object: sqlalchemy.Column | None = None,
) -> None:
"""Adapt target table to provided schema if possible.

Args:
schema_name: the schema name.
table: the target table.
full_table_name: the fully qualified table name.
column_name: the target column name.
sql_type: the SQLAlchemy type.
connection: the database connection.
connection: a database connection (optional).
column_object: a SQLAlchemy column (optional).
"""
if connection is None:
super().prepare_column(full_table_name, column_name, sql_type)
return

_, schema_name, table_name = self.parse_full_table_name(full_table_name)

column_exists = column_object is not None or self.column_exists(
table.fullname, column_name, connection=connection
full_table_name, column_name, connection=connection
)

if not column_exists:
self._create_empty_column(
# We should migrate every function to use sqlalchemy.Table
# instead of having to know what the function wants
table_name=table.name,
table_name=table_name,
column_name=column_name,
sql_type=sql_type,
schema_name=schema_name,
schema_name=cast(str, schema_name),
connection=connection,
)
return

self._adapt_column_type(
schema_name=schema_name,
table_name=table.name,
schema_name=cast(str, schema_name),
table_name=table_name,
column_name=column_name,
sql_type=sql_type,
connection=connection,
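A hedged usage sketch of the refactored method (not code from this PR): `connector` stands for an already-configured PostgresConnector instance, and the table and column names are made up. Passing a connection keeps the DDL inside the caller's transaction, mirroring how the sink calls it; omitting the connection falls back to the SDK's base implementation.

```python
# Hypothetical usage of the refactored prepare_column(); names are illustrative.
import sqlalchemy

# Preferred path in this PR: reuse one connection/transaction for all DDL.
with connector._connect() as connection, connection.begin():
    connector.prepare_column(
        full_table_name="melty.test_table",
        column_name="_sdc_deleted_at",
        sql_type=sqlalchemy.types.TIMESTAMP(),
        connection=connection,
    )

# Without a connection the method delegates to super().prepare_column(...),
# which manages its own connection.
connector.prepare_column(
    "melty.test_table",
    "_sdc_table_version",
    sqlalchemy.types.BIGINT(),
)
```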
target_postgres/sinks.py: 48 changes (24 additions, 24 deletions)
@@ -314,34 +314,45 @@ def activate_version(self, new_version: int) -> None:
Args:
new_version: The version number to activate.
"""
if self.config["activate_version"] is False:
self.logger.warning(
"An activate version message was received, but activate_version is set "
"to false so it was ignored."
)
return

# There's nothing to do if the table doesn't exist yet
# (which it won't the first time the stream is processed)
if not self.connector.table_exists(self.full_table_name):
return

deleted_at = now()
# Different from SingerSDK as we need to handle types the
# same as SCHEMA messages
datetime_type = self.connector.to_sql_type(
{"type": "string", "format": "date-time"}
)

# Different from SingerSDK as we need to handle types the
# same as SCHEMA messages
integer_type = self.connector.to_sql_type({"type": "integer"})

with self.connector._connect() as connection, connection.begin():
# Theoretically these errors should never appear because we always create
# the columns, but it's useful as a sanity check. If anything changes later,
# the error that would otherwise appear is not as intuitive.
if not self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.version_column_name,
connection=connection,
):
self.connector.prepare_column( # type: ignore[call-arg]
self.full_table_name,
self.version_column_name, # type: ignore[arg-type]
sql_type=integer_type,
raise RuntimeError(
Member: We do create this column somewhere right? I remember there being a bug here but I don't remember what it was exactly.

Contributor Author: Yeah, I dove in here. Theoretically this error should never be hit because we always create the column, but I like it as a sanity check. If anything changes down the line, the error that would appear otherwise is not as intuitive.

Member: Please add "Theoretically this error should never be hit because we always create the column, but I like it as a sanity check. If anything changes down the line, the error that would appear otherwise is not as intuitive." as a comment to the code here to explain this.

f"{self.version_column_name} is required for activate version "
"messages, but doesn't exist."
)
if not (
self.config["hard_delete"]
or self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.soft_delete_column_name,
connection=connection,
)
):
raise RuntimeError(
f"{self.version_column_name} is required for soft deletion with "
"activate version messages, but doesn't exist."
)

metadata = MetaData()
target_table = Table(
@@ -362,17 +373,6 @@ def activate_version(self, new_version: int) -> None:
connection.execute(delete_stmt)
return

if not self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.soft_delete_column_name,
connection=connection,
):
self.connector.prepare_column( # type: ignore[call-arg]
self.full_table_name,
self.soft_delete_column_name, # type: ignore[arg-type]
sql_type=datetime_type,
connection=connection,
)
# Need to deal with the case where data doesn't exist for the version column
update_stmt = (
update(target_table)
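Conceptually, the soft-delete branch above ends up issuing an UPDATE along these lines. This is a sketch, not the PR's exact code: the `_sdc_table_version` and `_sdc_deleted_at` column names are the SDK defaults, the NULL-version handling follows the code comment, and the connection details are placeholders.

```python
# Conceptual sketch of the soft-delete UPDATE; DSN, schema, and table name
# are placeholders.
from sqlalchemy import MetaData, Table, create_engine, or_, update
from sqlalchemy.sql import func

engine = create_engine("postgresql+psycopg2://user:password@localhost/postgres")
target_table = Table(
    "test_activate_version_soft",
    MetaData(),
    schema="melty",
    autoload_with=engine,
)

new_version = 1674486431564  # version from the ACTIVATE_VERSION message
with engine.connect() as connection, connection.begin():
    update_stmt = (
        update(target_table)
        .values({"_sdc_deleted_at": func.now()})
        .where(
            or_(
                target_table.c._sdc_table_version < new_version,
                # rows written without a version value are also retired
                target_table.c._sdc_table_version.is_(None),
            )
        )
        .where(target_table.c._sdc_deleted_at.is_(None))
    )
    connection.execute(update_stmt)
```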
target_postgres/target.py: 20 changes (19 additions, 1 deletion)
@@ -79,6 +79,14 @@ def __init__(
+ " ssl_client_certificate or ssl_client_private_key are unset."
)

assert self.config.get("add_record_metadata") or not self.config.get(
"activate_version"
), (
"Activate version messages can't be processed unless add_record_metadata "
"is set to true. To ignore Activate version messages instead, Set the "
"`activate_version` configuration to False."
)

name = "target-postgres"
config_jsonschema = th.PropertiesList(
th.Property(
@@ -150,14 +158,24 @@ def __init__(
description="Postgres schema to send data to, example: tap-clickup",
default="melty",
),
th.Property(
"activate_version",
th.BooleanType,
default=True,
description=(
"If set to false, the tap will ignore activate version messages. If "
+ "set to true, add_record_metadata must be set to true as well."
),
),
th.Property(
"hard_delete",
th.BooleanType,
default=False,
description=(
"When activate version is sent from a tap this specefies "
+ "if we should delete the records that don't match, or mark "
+ "them with a date in the `_sdc_deleted_at` column."
+ "them with a date in the `_sdc_deleted_at` column. This config "
+ "option is ignored if `activate_version` is set to false."
),
),
th.Property(
@@ -0,0 +1,9 @@
{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564}
target_postgres/tests/test_target_postgres.py: 49 changes (38 additions, 11 deletions)
@@ -1,6 +1,7 @@
""" Postgres target tests """
# flake8: noqa
import copy
import datetime
import io
from contextlib import redirect_stdout
from decimal import Decimal
@@ -536,24 +537,34 @@ def test_activate_version_hard_delete(postgres_config_no_ssl):
assert result.rowcount == 7


def test_activate_version_soft_delete(postgres_target):
def test_activate_version_soft_delete(postgres_config_no_ssl):
"""Activate Version Soft Delete Test"""
engine = create_engine(postgres_target)
table_name = "test_activate_version_soft"
file_name = f"{table_name}.singer"
full_table_name = postgres_target.config["default_target_schema"] + "." + table_name
with engine.connect() as connection, connection.begin():
result = connection.execute(
sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")
)
postgres_config_soft_delete = copy.deepcopy(postgres_target._config)
postgres_config_soft_delete["hard_delete"] = False
pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete)
full_table_name = postgres_config_no_ssl["default_target_schema"] + "." + table_name
postgres_config_hard_delete_true = copy.deepcopy(postgres_config_no_ssl)
postgres_config_hard_delete_true["hard_delete"] = False
pg_soft_delete = TargetPostgres(config=postgres_config_hard_delete_true)
engine = create_engine(pg_soft_delete)
singer_file_to_target(file_name, pg_soft_delete)
with engine.connect() as connection:
result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 7

# Same file as above, but with South America (code=SA) record missing.
file_name = f"{table_name}_with_delete.singer"
south_america = {}

singer_file_to_target(file_name, pg_soft_delete)
with engine.connect() as connection:
result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 7
result = connection.execute(
sqlalchemy.text(f"SELECT * FROM {full_table_name} WHERE code='SA'")
)
south_america = result.first()._asdict()

singer_file_to_target(file_name, pg_soft_delete)
with engine.connect() as connection, connection.begin():
# Add a record like someone would if they weren't using the tap target combo
result = connection.execute(
@@ -582,7 +593,23 @@ def test_activate_version_soft_delete(postgres_target):
f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL"
)
)
assert result.rowcount == 2
assert result.rowcount == 3 # 2 manual + 1 deleted (south america)

result = connection.execute(
sqlalchemy.text(f"SELECT * FROM {full_table_name} WHERE code='SA'")
)
# South America row should not have been modified, but it would have been prior
# to the fix mentioned in #204 and implemented in #240.
assert south_america == result.first()._asdict()


def test_activate_version_no_metadata(postgres_config_no_ssl):
"""Activate Version Test for if add_record_metadata is disabled"""
postgres_config_modified = copy.deepcopy(postgres_config_no_ssl)
postgres_config_modified["activate_version"] = True
postgres_config_modified["add_record_metadata"] = False
with pytest.raises(AssertionError):
TargetPostgres(config=postgres_config_modified)


def test_activate_version_deletes_data_properly(postgres_target):