diff --git a/README.md b/README.md index a1b48c33..32733350 100644 --- a/README.md +++ b/README.md @@ -25,23 +25,24 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target | database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | | sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect, and all ssl settings. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | | dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | -| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup | -| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. | -| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work!This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. | -| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object | -| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details. 
-| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion host, this is the host we'll connect to via ssh -| ssh_tunnel.username | True (if ssh_tunnel set) | False |Username to connect to bastion host -| ssh_tunnel.port | True (if ssh_tunnel set) | 22 | Port to connect to bastion host -| ssh_tunnel.private_key | True (if ssh_tunnel set) | None | Private Key for authentication to the bastion host -| ssh_tunnel.private_key_password | False | None | Private Key Password, leave None if no password is set -| ssl_enable | False | 0 | Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored. | -| ssl_client_certificate_enable| False | 0 | Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored. | +| default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup | +| activate_version | False | True | If set to false, the tap will ignore activate version messages. If set to true, add_record_metadata must be set to true as well. | +| hard_delete | False | False | When activate version is sent from a tap this specifies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false. | +| add_record_metadata | False | True | Note that this must be enabled for activate_version to work! This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. 
| +| ssl_enable | False | False | Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored. | +| ssl_client_certificate_enable| False | False | Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored. | | ssl_mode | False | verify-full | SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION) for more information. Must be one of disable, allow, prefer, require, verify-ca, or verify-full. Note if sqlalchemy_url is set this will be ignored. | | ssl_certificate_authority | False | ~/.postgresql/root.crl | The certificate authority that should be used to verify the server's identity. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. | | ssl_client_certificate | False | ~/.postgresql/postgresql.crt | The certificate that should be used to verify your identity to the server. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. | | ssl_client_private_key | False | ~/.postgresql/postgresql.key | The private key for the certificate you provided. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. | | ssl_storage_directory | False | .secrets | The folder in which to store SSL certificates provided as raw values. 
When a certificate/key is provided as a raw value instead of as a filepath, it must be written to a file before it can be used. This configuration option determines where that file is created. | +| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object | +| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details. +| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion host, this is the host we'll connect to via ssh +| ssh_tunnel.username | True (if ssh_tunnel set) | False |Username to connect to bastion host +| ssh_tunnel.port | True (if ssh_tunnel set) | 22 | Port to connect to bastion host +| ssh_tunnel.private_key | True (if ssh_tunnel set) | None | Private Key for authentication to the bastion host +| ssh_tunnel.private_key_password | False | None | Private Key Password, leave None if no password is set | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | | flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. 
| diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 09554d59..8b83e5df 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -130,8 +130,7 @@ def prepare_table( # type: ignore[override] if property_name in columns: column_object = columns[property_name] self.prepare_column( - schema_name=cast(str, schema_name), - table=table, + full_table_name=table.fullname, column_name=property_name, sql_type=self.to_sql_type(property_def), connection=connection, @@ -370,43 +369,48 @@ def create_empty_table( # type: ignore[override] new_table.create(bind=connection) return new_table - def prepare_column( # type: ignore[override] + def prepare_column( self, - schema_name: str, - table: sqlalchemy.Table, + full_table_name: str, column_name: str, sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, + connection: sqlalchemy.engine.Connection | None = None, column_object: sqlalchemy.Column | None = None, ) -> None: """Adapt target table to provided schema if possible. Args: - schema_name: the schema name. - table: the target table. + full_table_name: the fully qualified table name. column_name: the target column name. sql_type: the SQLAlchemy type. - connection: the database connection. + connection: a database connection. optional. + column_object: a SQLAlchemy column. optional. 
""" + if connection is None: + super().prepare_column(full_table_name, column_name, sql_type) + return + + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + column_exists = column_object is not None or self.column_exists( - table.fullname, column_name, connection=connection + full_table_name, column_name, connection=connection ) if not column_exists: self._create_empty_column( # We should migrate every function to use sqlalchemy.Table # instead of having to know what the function wants - table_name=table.name, + table_name=table_name, column_name=column_name, sql_type=sql_type, - schema_name=schema_name, + schema_name=cast(str, schema_name), connection=connection, ) return self._adapt_column_type( - schema_name=schema_name, - table_name=table.name, + schema_name=cast(str, schema_name), + table_name=table_name, column_name=column_name, sql_type=sql_type, connection=connection, diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index c8bb175a..8c32c261 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -314,34 +314,45 @@ def activate_version(self, new_version: int) -> None: Args: new_version: The version number to activate. """ + if self.config["activate_version"] is False: + self.logger.warning( + "An activate version message was received, but activate_version is set " + "to false so it was ignored." 
+ ) + return + # There's nothing to do if the table doesn't exist yet # (which it won't the first time the stream is processed) if not self.connector.table_exists(self.full_table_name): return deleted_at = now() - # Different from SingerSDK as we need to handle types the - # same as SCHEMA messsages - datetime_type = self.connector.to_sql_type( - {"type": "string", "format": "date-time"} - ) - - # Different from SingerSDK as we need to handle types the - # same as SCHEMA messsages - integer_type = self.connector.to_sql_type({"type": "integer"}) with self.connector._connect() as connection, connection.begin(): + # Theoretically these errors should never appear because we always create + # the columns, but it's useful as a sanity check. If anything changes later, + # the error that would otherwise appear is not as intuitive. if not self.connector.column_exists( full_table_name=self.full_table_name, column_name=self.version_column_name, connection=connection, ): - self.connector.prepare_column( # type: ignore[call-arg] - self.full_table_name, - self.version_column_name, # type: ignore[arg-type] - sql_type=integer_type, + raise RuntimeError( + f"{self.version_column_name} is required for activate version " + "messages, but doesn't exist." + ) + if not ( + self.config["hard_delete"] + or self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.soft_delete_column_name, connection=connection, ) + ): + raise RuntimeError( + f"{self.soft_delete_column_name} is required for soft deletion with " + "activate version messages, but doesn't exist." 
+ ) metadata = MetaData() target_table = Table( @@ -362,17 +373,6 @@ def activate_version(self, new_version: int) -> None: connection.execute(delete_stmt) return - if not self.connector.column_exists( - full_table_name=self.full_table_name, - column_name=self.soft_delete_column_name, - connection=connection, - ): - self.connector.prepare_column( # type: ignore[call-arg] - self.full_table_name, - self.soft_delete_column_name, # type: ignore[arg-type] - sql_type=datetime_type, - connection=connection, - ) # Need to deal with the case where data doesn't exist for the version column update_stmt = ( update(target_table) diff --git a/target_postgres/target.py b/target_postgres/target.py index 12ca714d..0d5272d9 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -79,6 +79,14 @@ def __init__( + " ssl_client_certificate or ssl_client_private_key are unset." ) + assert self.config.get("add_record_metadata") or not self.config.get( + "activate_version" + ), ( + "Activate version messages can't be processed unless add_record_metadata " + "is set to true. To ignore Activate version messages instead, Set the " + "`activate_version` configuration to False." + ) + name = "target-postgres" config_jsonschema = th.PropertiesList( th.Property( @@ -150,6 +158,15 @@ def __init__( description="Postgres schema to send data to, example: tap-clickup", default="melty", ), + th.Property( + "activate_version", + th.BooleanType, + default=True, + description=( + "If set to false, the tap will ignore activate version messages. If " + + "set to true, add_record_metadata must be set to true as well." + ), + ), th.Property( "hard_delete", th.BooleanType, @@ -157,7 +174,8 @@ def __init__( description=( "When activate version is sent from a tap this specefies " + "if we should delete the records that don't match, or mark " - + "them with a date in the `_sdc_deleted_at` column." + + "them with a date in the `_sdc_deleted_at` column. 
This config " + + "option is ignored if `activate_version` is set to false." ), ), th.Property( diff --git a/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer b/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer new file mode 100644 index 00000000..6e536bec --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer @@ -0,0 +1,9 @@ +{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564} diff --git a/target_postgres/tests/test_target_postgres.py 
b/target_postgres/tests/test_target_postgres.py index 1eaa9978..93f56c30 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -1,6 +1,7 @@ """ Postgres target tests """ # flake8: noqa import copy +import datetime import io from contextlib import redirect_stdout from decimal import Decimal @@ -536,24 +537,34 @@ def test_activate_version_hard_delete(postgres_config_no_ssl): assert result.rowcount == 7 -def test_activate_version_soft_delete(postgres_target): +def test_activate_version_soft_delete(postgres_config_no_ssl): """Activate Version Soft Delete Test""" - engine = create_engine(postgres_target) table_name = "test_activate_version_soft" file_name = f"{table_name}.singer" - full_table_name = postgres_target.config["default_target_schema"] + "." + table_name - with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) - postgres_config_soft_delete = copy.deepcopy(postgres_target._config) - postgres_config_soft_delete["hard_delete"] = False - pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) + full_table_name = postgres_config_no_ssl["default_target_schema"] + "." + table_name + postgres_config_hard_delete_true = copy.deepcopy(postgres_config_no_ssl) + postgres_config_hard_delete_true["hard_delete"] = False + pg_soft_delete = TargetPostgres(config=postgres_config_hard_delete_true) + engine = create_engine(pg_soft_delete) singer_file_to_target(file_name, pg_soft_delete) + with engine.connect() as connection: + result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + assert result.rowcount == 7 + + # Same file as above, but with South America (code=SA) record missing. 
+ file_name = f"{table_name}_with_delete.singer" + south_america = {} + singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 + result = connection.execute( + sqlalchemy.text(f"SELECT * FROM {full_table_name} WHERE code='SA'") + ) + south_america = result.first()._asdict() + + singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( @@ -582,7 +593,23 @@ def test_activate_version_soft_delete(postgres_target): f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" ) ) - assert result.rowcount == 2 + assert result.rowcount == 3 # 2 manual + 1 deleted (south america) + + result = connection.execute( + sqlalchemy.text(f"SELECT * FROM {full_table_name} WHERE code='SA'") + ) + # South America row should not have been modified, but it would have been prior + # to the fix mentioned in #204 and implemented in #240. + assert south_america == result.first()._asdict() + + +def test_activate_version_no_metadata(postgres_config_no_ssl): + """Activate Version Test for if add_record_metadata is disabled""" + postgres_config_modified = copy.deepcopy(postgres_config_no_ssl) + postgres_config_modified["activate_version"] = True + postgres_config_modified["add_record_metadata"] = False + with pytest.raises(AssertionError): + TargetPostgres(config=postgres_config_modified) def test_activate_version_deletes_data_properly(postgres_target):