From 305e001f4cdda7eb486d7db8f875b0242b72e740 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Fri, 19 Jul 2024 15:21:44 -0400 Subject: [PATCH 1/2] get tests to fail --- .../data_files/test_activate_version_hard.singer | 1 + .../test_activate_version_soft_with_delete.singer | 12 ++++++------ target_postgres/tests/test_target_postgres.py | 6 +++--- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/target_postgres/tests/data_files/test_activate_version_hard.singer b/target_postgres/tests/data_files/test_activate_version_hard.singer index 1d05c3f8..e8b6dfc9 100644 --- a/target_postgres/tests/data_files/test_activate_version_hard.singer +++ b/target_postgres/tests/data_files/test_activate_version_hard.singer @@ -7,4 +7,5 @@ {"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} {"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} {"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "XX", "name": "This record should be deleted"}, "version": 1674486431562, "time_extracted": "2023-01-23T15:07:11.563063Z"} {"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer b/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer index 6e536bec..2a741c64 100644 --- a/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer +++ b/target_postgres/tests/data_files/test_activate_version_soft_with_delete.singer @@ -1,9 +1,9 @@ {"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} {"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564} -{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"} {"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564} diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 9bc0acc1..67d4a92f 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -623,9 +623,9 @@ def test_activate_version_soft_delete(postgres_config_no_ssl): table_name = "test_activate_version_soft" file_name = f"{table_name}.singer" full_table_name = postgres_config_no_ssl["default_target_schema"] + "." + table_name - postgres_config_hard_delete_true = copy.deepcopy(postgres_config_no_ssl) - postgres_config_hard_delete_true["hard_delete"] = False - pg_soft_delete = TargetPostgres(config=postgres_config_hard_delete_true) + postgres_config_hard_delete_false = copy.deepcopy(postgres_config_no_ssl) + postgres_config_hard_delete_false["hard_delete"] = False + pg_soft_delete = TargetPostgres(config=postgres_config_hard_delete_false) engine = create_engine(pg_soft_delete) singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: From 4021c4483173e77618716e2914ff4a369ec409f2 Mon Sep 17 00:00:00 2001 From: Sebastian Smiley Date: Fri, 19 Jul 2024 15:23:59 -0400 Subject: [PATCH 2/2] get tests to pass --- target_postgres/sinks.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index ea8b8df3..52673375 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -324,6 +324,15 @@ def activate_version(self, new_version: int) -> None: ) return + if self._pending_batch: + self.logger.info( + "An activate version message for '%s' was received. Draining...", + self.stream_name, + ) + draining_status = self.start_drain() + self.process_batch(draining_status) + self.mark_drained() + # There's nothing to do if the table doesn't exist yet # (which it won't the first time the stream is processed) if not self.connector.table_exists(self.full_table_name): @@ -370,7 +379,7 @@ def activate_version(self, new_version: int) -> None: delete_stmt = sa.delete(target_table).where( sa.or_( target_table.c[self.version_column_name].is_(None), - target_table.c[self.version_column_name] <= new_version, + target_table.c[self.version_column_name] < new_version, ) ) connection.execute(delete_stmt)