From 840181e9e0a1eb33604eddc7fab248a406b28524 Mon Sep 17 00:00:00 2001
From: Sebastian Smiley <46581584+sebastianswms@users.noreply.github.com>
Date: Mon, 27 Nov 2023 17:01:44 -0500
Subject: [PATCH] feat: Verify data in tests (row count and first row) (#215)

Implement a method to verify data in tests. It checks the total number of
rows in a table and the data in the first row or, when given a list of
rows, the data in every row. I believe this provides good test coverage
while keeping things manageable.

Closes #16
---
 target_postgres/tests/test_standard_target.py | 219 +++++++++++-------
 1 file changed, 132 insertions(+), 87 deletions(-)

diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py
index 06dd7c56..086b0242 100644
--- a/target_postgres/tests/test_standard_target.py
+++ b/target_postgres/tests/test_standard_target.py
@@ -4,6 +4,7 @@
 import io
 import uuid
 from contextlib import redirect_stdout
+from decimal import Decimal
 from pathlib import Path
 
 import jsonschema
@@ -116,6 +117,57 @@ def remove_metadata_columns(row: dict) -> dict:
     return new_row
 
 
+def verify_data(
+    target: TargetPostgres,
+    table_name: str,
+    number_of_rows: int = 1,
+    primary_key: str | None = None,
+    check_data: dict | list[dict] | None = None,
+):
+    """Checks whether the data in a table matches a provided data sample.
+
+    Args:
+        target: The target to obtain a database connection from.
+        table_name: The name of the table to check data for.
+        number_of_rows: The expected number of rows that should be in the table.
+        primary_key: The primary key of the table.
+        check_data: A dictionary representing the full contents of the first row in
+            the table, as determined by the lowest primary_key value, or else a list
+            of dictionaries representing every row in the table.
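+
+    Example (illustrative; these mirror calls made in the tests below):
+
+        # Check the row count only:
+        verify_data(postgres_target, "test_no_pk", 16)
+
+        # Check the row count and the contents of the first row:
+        row = {"id": 1, "optional": "This is optional"}
+        verify_data(postgres_target, "test_optional_attributes", 4, "id", row)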
+    """
+    engine = create_engine(target)
+    full_table_name = f"{target.config['default_target_schema']}.{table_name}"
+    with engine.connect() as connection:
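+        # Three modes (see docstring): a dict in check_data compares the first
+        # row, a list compares every row, and with no check_data only the row
+        # count is checked.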
+        if primary_key is not None and check_data is not None:
+            if isinstance(check_data, dict):
+                result = connection.execute(
+                    sqlalchemy.text(
+                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
+                    )
+                )
+                assert result.rowcount == number_of_rows
+                result_dict = remove_metadata_columns(result.first()._asdict())
+                assert result_dict == check_data
+            elif isinstance(check_data, list):
+                result = connection.execute(
+                    sqlalchemy.text(
+                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
+                    )
+                )
+                assert result.rowcount == number_of_rows
+                result_dict = [
+                    remove_metadata_columns(row._asdict()) for row in result.all()
+                ]
+                assert result_dict == check_data
+            else:
+                raise ValueError("Invalid check_data - not dict or list of dicts")
+        else:
+            result = connection.execute(
+                sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
+            )
+            assert result.first()[0] == number_of_rows
+
+
 def test_sqlalchemy_url_config(postgres_config_no_ssl):
     """Be sure that passing a sqlalchemy_url works
@@ -230,10 +282,11 @@ def test_special_chars_in_attributes(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
-# TODO test that data is correctly set
 def test_optional_attributes(postgres_target):
     file_name = "optional_attributes.singer"
     singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "optional": "This is optional"}
+    verify_data(postgres_target, "test_optional_attributes", 4, "id", row)
 
 
 def test_schema_no_properties(postgres_target):
@@ -256,12 +309,25 @@ def test_large_numeric_primary_key(postgres_target):
 def test_schema_updates(postgres_target):
     file_name = "schema_updates.singer"
     singer_file_to_target(file_name, postgres_target)
+    row = {
+        "id": 1,
+        "a1": Decimal("101"),
+        "a2": "string1",
+        "a3": None,
+        "a4": None,
+        "a5": None,
+        "a6": None,
+    }
+    verify_data(postgres_target, "test_schema_updates", 6, "id", row)
 
 
-# TODO test that data is correct
 def test_multiple_state_messages(postgres_target):
     file_name = "multiple_state_messages.singer"
     singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "metric": 100}
+    verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row)
+    row = {"id": 1, "metric": 110}
+    verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row)
 
 
 # TODO test that data is correct
@@ -279,88 +345,65 @@ def test_multiple_schema_messages(postgres_target, caplog):
 
 
 def test_relational_data(postgres_target):
-    engine = create_engine(postgres_target)
     file_name = "user_location_data.singer"
     singer_file_to_target(file_name, postgres_target)
 
     file_name = "user_location_upsert_data.singer"
     singer_file_to_target(file_name, postgres_target)
 
-    schema_name = postgres_target.config["default_target_schema"]
+    users = [
+        {"id": 1, "name": "Johny"},
+        {"id": 2, "name": "George"},
+        {"id": 3, "name": "Jacob"},
+        {"id": 4, "name": "Josh"},
+        {"id": 5, "name": "Jim"},
+        {"id": 8, "name": "Thomas"},
+        {"id": 12, "name": "Paul"},
+        {"id": 13, "name": "Mary"},
+    ]
+    locations = [
+        {"id": 1, "name": "Philly"},
+        {"id": 2, "name": "NY"},
+        {"id": 3, "name": "San Francisco"},
+        {"id": 6, "name": "Colorado"},
+        {"id": 8, "name": "Boston"},
+    ]
+    user_in_location = [
+        {
+            "id": 1,
+            "user_id": 1,
+            "location_id": 4,
+            "info": {"weather": "rainy", "mood": "sad"},
+        },
+        {
+            "id": 2,
+            "user_id": 2,
+            "location_id": 3,
+            "info": {"weather": "sunny", "mood": "satisfied"},
+        },
+        {
+            "id": 3,
+            "user_id": 1,
+            "location_id": 3,
+            "info": {"weather": "sunny", "mood": "happy"},
+        },
+        {
+            "id": 6,
+            "user_id": 3,
+            "location_id": 2,
+            "info": {"weather": "sunny", "mood": "happy"},
+        },
+        {
+            "id": 14,
+            "user_id": 4,
+            "location_id": 1,
+            "info": {"weather": "cloudy", "mood": "ok"},
+        },
+    ]
 
-    with engine.connect() as connection:
-        expected_test_users = [
-            {"id": 1, "name": "Johny"},
-            {"id": 2, "name": "George"},
-            {"id": 3, "name": "Jacob"},
-            {"id": 4, "name": "Josh"},
-            {"id": 5, "name": "Jim"},
-            {"id": 8, "name": "Thomas"},
-            {"id": 12, "name": "Paul"},
-            {"id": 13, "name": "Mary"},
-        ]
-
-        full_table_name = f"{schema_name}.test_users"
-        result = connection.execute(
-            sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")
-        )
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
-        assert result_dict == expected_test_users
-
-        expected_test_locations = [
-            {"id": 1, "name": "Philly"},
-            {"id": 2, "name": "NY"},
-            {"id": 3, "name": "San Francisco"},
-            {"id": 6, "name": "Colorado"},
-            {"id": 8, "name": "Boston"},
-        ]
-
-        full_table_name = f"{schema_name}.test_locations"
-        result = connection.execute(
-            sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")
-        )
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
-        assert result_dict == expected_test_locations
-
-        expected_test_user_in_location = [
-            {
-                "id": 1,
-                "user_id": 1,
-                "location_id": 4,
-                "info": {"weather": "rainy", "mood": "sad"},
-            },
-            {
-                "id": 2,
-                "user_id": 2,
-                "location_id": 3,
-                "info": {"weather": "sunny", "mood": "satisfied"},
-            },
-            {
-                "id": 3,
-                "user_id": 1,
-                "location_id": 3,
-                "info": {"weather": "sunny", "mood": "happy"},
-            },
-            {
-                "id": 6,
-                "user_id": 3,
-                "location_id": 2,
-                "info": {"weather": "sunny", "mood": "happy"},
-            },
-            {
-                "id": 14,
-                "user_id": 4,
-                "location_id": 1,
-                "info": {"weather": "cloudy", "mood": "ok"},
-            },
-        ]
-
-        full_table_name = f"{schema_name}.test_user_in_location"
-        result = connection.execute(
-            sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")
-        )
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
-        assert result_dict == expected_test_user_in_location
+    verify_data(postgres_target, "test_users", 8, "id", users)
+    verify_data(postgres_target, "test_locations", 5, "id", locations)
+    verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location)
 
 
 def test_no_primary_keys(postgres_target):
@@ -369,9 +412,7 @@
     table_name = "test_no_pk"
     full_table_name = postgres_target.config["default_target_schema"] + "." + table_name
     with engine.connect() as connection, connection.begin():
-        result = connection.execute(
-            sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")
-        )
+        connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}"))
 
     file_name = f"{table_name}.singer"
     singer_file_to_target(file_name, postgres_target)
@@ -384,10 +425,7 @@ def test_no_primary_keys(postgres_target):
     file_name = f"{table_name}_append.singer"
     singer_file_to_target(file_name, postgres_target)
 
-    # Will populate us with 22 records, we run this twice
-    with engine.connect() as connection:
-        result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}"))
-        assert result.rowcount == 16
+    verify_data(postgres_target, table_name, 16)
 
 
 def test_no_type(postgres_target):
@@ -395,19 +433,20 @@
     file_name = "no_type.singer"
     singer_file_to_target(file_name, postgres_target)
 
 
-# TODO test that data is correct
 def test_duplicate_records(postgres_target):
     file_name = "duplicate_records.singer"
     singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "metric": 100}
+    verify_data(postgres_target, "test_duplicate_records", 2, "id", row)
 
 
-# TODO test that data is correct
 def test_array_data(postgres_target):
     file_name = "array_data.singer"
     singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "fruits": ["apple", "orange", "pear"]}
+    verify_data(postgres_target, "test_carts", 4, "id", row)
 
 
-# TODO test that data is correct
 def test_encoded_string_data(postgres_target):
     """
     We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them.
@@ -420,6 +459,12 @@
     file_name = "encoded_strings.singer"
     singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "info": "simple string 2837"}
+    verify_data(postgres_target, "test_strings", 11, "id", row)
+    row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}}
+    verify_data(postgres_target, "test_strings_in_objects", 11, "id", row)
+    row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]}
+    verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row)
 
 
 def test_tap_appl(postgres_target):