diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index 5bf3c497..e6ae8e2a 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -4,6 +4,7 @@ import io import uuid from contextlib import redirect_stdout +from decimal import Decimal from pathlib import Path import jsonschema @@ -116,6 +117,42 @@ def remove_metadata_columns(row: dict) -> dict: return new_row +def verify_data( + target: TargetPostgres, + table_name: str, + number_of_rows: int = 1, + primary_key: str | None = None, + first_row: dict | None = None, +): + """Checks whether the data in a table matches a provided data sample. + + Args: + target: The target to obtain a database connection from. + full_table_name: The schema and table name of the table to check data for. + primary_key: The primary key of the table table. + number_of_rows: The expected number of rows that should be in the table. + first_row: A dictionary representing the full contents of the first row in the + table, as determined by lowest primary_key value. + """ + engine = create_engine(target) + full_table_name = f"{target.config['default_target_schema']}.{table_name}" + with engine.connect() as connection: + if primary_key is not None and first_row is not None: + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) + ) + assert result.rowcount == number_of_rows + result_dict = remove_metadata_columns(result.first()._asdict()) + assert result_dict == first_row + else: + result = connection.execute( + sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + + def test_sqlalchemy_url_config(postgres_config_no_ssl): """Be sure that passing a sqlalchemy_url works @@ -230,10 +267,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -# TODO test that data is correctly set def test_optional_attributes(postgres_target): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "optional": "This is optional"} + verify_data(postgres_target, "test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -242,101 +280,49 @@ def test_schema_no_properties(postgres_target): singer_file_to_target(file_name, postgres_target) -# TODO test that data is correct def test_schema_updates(postgres_target): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) + row = { + "id": 1, + "a1": Decimal("101"), + "a2": "string1", + "a3": None, + "a4": None, + "a5": None, + "a6": None, + } + verify_data(postgres_target, "test_schema_updates", 6, "id", row) -# TODO test that data is correct def test_multiple_state_messages(postgres_target): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "metric": 100} + verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + row = {"id": 1, "metric": 110} + verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) def test_relational_data(postgres_target): - engine = create_engine(postgres_target) file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) file_name = "user_location_upsert_data.singer" singer_file_to_target(file_name, postgres_target) - schema_name = postgres_target.config["default_target_schema"] + user = {"id": 1, "name": "Johny"} + location = {"id": 1, "name": "Philly"} + user_in_location = { + "id": 1, + "user_id": 1, + "location_id": 4, + "info": {"weather": "rainy", "mood": "sad"}, + } - with engine.connect() as connection: - expected_test_users = [ - {"id": 1, "name": "Johny"}, - {"id": 2, "name": "George"}, - {"id": 3, "name": "Jacob"}, - {"id": 4, "name": "Josh"}, - {"id": 5, "name": "Jim"}, - {"id": 8, "name": "Thomas"}, - {"id": 12, "name": "Paul"}, - {"id": 13, "name": "Mary"}, - ] - - full_table_name = f"{schema_name}.test_users" - result = connection.execute( - sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id") - ) - result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] - assert result_dict == expected_test_users - - expected_test_locations = [ - {"id": 1, "name": "Philly"}, - {"id": 2, "name": "NY"}, - {"id": 3, "name": "San Francisco"}, - {"id": 6, "name": "Colorado"}, - {"id": 8, "name": "Boston"}, - ] - - full_table_name = f"{schema_name}.test_locations" - result = connection.execute( - sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id") - ) - result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] - assert result_dict == expected_test_locations - - expected_test_user_in_location = [ - { - "id": 1, - "user_id": 1, - "location_id": 4, - "info": {"weather": "rainy", "mood": "sad"}, - }, - { - "id": 2, - "user_id": 2, - "location_id": 3, - "info": {"weather": "sunny", "mood": "satisfied"}, - }, - { - "id": 3, - "user_id": 1, - "location_id": 3, - "info": {"weather": "sunny", "mood": "happy"}, - }, - { - "id": 6, - "user_id": 3, - "location_id": 2, - "info": {"weather": "sunny", "mood": "happy"}, - }, - { - "id": 14, - "user_id": 4, - "location_id": 1, - "info": {"weather": "cloudy", "mood": "ok"}, - }, - ] - - full_table_name = f"{schema_name}.test_user_in_location" - result = connection.execute( - sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id") - ) - result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] - assert result_dict == expected_test_user_in_location + verify_data(postgres_target, "test_users", 8, "id", user) + verify_data(postgres_target, "test_locations", 5, "id", location) + verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) def test_no_primary_keys(postgres_target): @@ -345,9 +331,7 @@ def test_no_primary_keys(postgres_target): table_name = "test_no_pk" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) @@ -360,10 +344,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - # Will populate us with 22 records, we run this twice - with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) - assert result.rowcount == 16 + verify_data(postgres_target, table_name, 16) def test_no_type(postgres_target): @@ -371,19 +352,20 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -# TODO test that data is correct def test_duplicate_records(postgres_target): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "metric": 100} + verify_data(postgres_target, "test_duplicate_records", 2, "id", row) -# TODO test that data is correct def test_array_data(postgres_target): file_name = "array_data.singer" singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "fruits": ["apple", "orange", "pear"]} + verify_data(postgres_target, "test_carts", 4, "id", row) -# TODO test that data is correct def test_encoded_string_data(postgres_target): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. @@ -396,6 +378,12 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "info": "simple string 2837"} + verify_data(postgres_target, "test_strings", 11, "id", row) + row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} + verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} + verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target):