feat: Verify data in tests (row count and first row) (#215)
Implement a helper to verify loaded data in tests. It checks the total
number of rows in a table and the contents of the first row (or, when a
full sample is passed, of every row). I believe this provides good test
coverage while keeping things manageable.

Closes #16
sebastianswms authored Nov 27, 2023
1 parent 937cb8a commit 840181e
219 changes: 132 additions & 87 deletions target_postgres/tests/test_standard_target.py
@@ -4,6 +4,7 @@
import io
import uuid
from contextlib import redirect_stdout
from decimal import Decimal
from pathlib import Path

import jsonschema
@@ -116,6 +117,57 @@ def remove_metadata_columns(row: dict) -> dict:
    return new_row
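
The body of remove_metadata_columns is collapsed by this diff. As a minimal sketch of what it presumably does, assuming the Singer SDK metadata columns share the "_sdc" prefix (an assumption; this diff does not show it):

    def remove_metadata_columns(row: dict) -> dict:
        # Assumed behavior: drop metadata columns (prefix "_sdc") before comparison.
        new_row = {}
        for column, value in row.items():
            if not column.startswith("_sdc"):
                new_row[column] = value
        return new_row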


def verify_data(
    target: TargetPostgres,
    table_name: str,
    number_of_rows: int = 1,
    primary_key: str | None = None,
    check_data: dict | list[dict] | None = None,
):
    """Check whether the data in a table matches a provided data sample.

    Args:
        target: The target to obtain a database connection from.
        table_name: The name of the table to check; the schema is taken from the
            target's default_target_schema setting.
        number_of_rows: The expected number of rows in the table.
        primary_key: The primary key of the table.
        check_data: A dictionary representing the full contents of the first row in the
            table, as determined by the lowest primary_key value, or else a list of
            dictionaries representing every row in the table.
    """
    engine = create_engine(target)
    full_table_name = f"{target.config['default_target_schema']}.{table_name}"
    with engine.connect() as connection:
        if primary_key is not None and check_data is not None:
            if isinstance(check_data, dict):
                result = connection.execute(
                    sqlalchemy.text(
                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
                    )
                )
                assert result.rowcount == number_of_rows
                result_dict = remove_metadata_columns(result.first()._asdict())
                assert result_dict == check_data
            elif isinstance(check_data, list):
                result = connection.execute(
                    sqlalchemy.text(
                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
                    )
                )
                assert result.rowcount == number_of_rows
                result_dict = [
                    remove_metadata_columns(row._asdict()) for row in result.all()
                ]
                assert result_dict == check_data
            else:
                raise ValueError("Invalid check_data - not dict or list of dicts")
        else:
            result = connection.execute(
                sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
            )
            assert result.first()[0] == number_of_rows
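
Illustrative usage (not part of this diff): a test loads a Singer file and then asserts on the resulting table, either with a primary key and an expected first row, or with neither, which reduces the call to a bare row-count check:

    row = {"id": 1, "optional": "This is optional"}
    verify_data(postgres_target, "test_optional_attributes", 4, "id", row)

    # With no primary_key/check_data, only the row count is asserted.
    verify_data(postgres_target, "test_no_pk", 16)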


def test_sqlalchemy_url_config(postgres_config_no_ssl):
"""Be sure that passing a sqlalchemy_url works
@@ -230,10 +282,11 @@ def test_special_chars_in_attributes(postgres_target):
    singer_file_to_target(file_name, postgres_target)


def test_optional_attributes(postgres_target):
    file_name = "optional_attributes.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "optional": "This is optional"}
    verify_data(postgres_target, "test_optional_attributes", 4, "id", row)


def test_schema_no_properties(postgres_target):
@@ -256,12 +309,25 @@ def test_large_numeric_primary_key(postgres_target):
def test_schema_updates(postgres_target):
    file_name = "schema_updates.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {
        "id": 1,
        "a1": Decimal("101"),
        "a2": "string1",
        "a3": None,
        "a4": None,
        "a5": None,
        "a6": None,
    }
    verify_data(postgres_target, "test_schema_updates", 6, "id", row)


def test_multiple_state_messages(postgres_target):
    file_name = "multiple_state_messages.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "metric": 100}
    verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row)
    row = {"id": 1, "metric": 110}
    verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row)


# TODO test that data is correct
@@ -279,88 +345,65 @@ def test_multiple_schema_messages(postgres_target, caplog):


def test_relational_data(postgres_target):
    file_name = "user_location_data.singer"
    singer_file_to_target(file_name, postgres_target)

    file_name = "user_location_upsert_data.singer"
    singer_file_to_target(file_name, postgres_target)

    users = [
        {"id": 1, "name": "Johny"},
        {"id": 2, "name": "George"},
        {"id": 3, "name": "Jacob"},
        {"id": 4, "name": "Josh"},
        {"id": 5, "name": "Jim"},
        {"id": 8, "name": "Thomas"},
        {"id": 12, "name": "Paul"},
        {"id": 13, "name": "Mary"},
    ]
    locations = [
        {"id": 1, "name": "Philly"},
        {"id": 2, "name": "NY"},
        {"id": 3, "name": "San Francisco"},
        {"id": 6, "name": "Colorado"},
        {"id": 8, "name": "Boston"},
    ]
    user_in_location = [
        {
            "id": 1,
            "user_id": 1,
            "location_id": 4,
            "info": {"weather": "rainy", "mood": "sad"},
        },
        {
            "id": 2,
            "user_id": 2,
            "location_id": 3,
            "info": {"weather": "sunny", "mood": "satisfied"},
        },
        {
            "id": 3,
            "user_id": 1,
            "location_id": 3,
            "info": {"weather": "sunny", "mood": "happy"},
        },
        {
            "id": 6,
            "user_id": 3,
            "location_id": 2,
            "info": {"weather": "sunny", "mood": "happy"},
        },
        {
            "id": 14,
            "user_id": 4,
            "location_id": 1,
            "info": {"weather": "cloudy", "mood": "ok"},
        },
    ]

    verify_data(postgres_target, "test_users", 8, "id", users)
    verify_data(postgres_target, "test_locations", 5, "id", locations)
    verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location)


def test_no_primary_keys(postgres_target):
@@ -369,9 +412,7 @@ def test_no_primary_keys(postgres_target):
    table_name = "test_no_pk"
    full_table_name = postgres_target.config["default_target_schema"] + "." + table_name
    with engine.connect() as connection, connection.begin():
        connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}"))
    file_name = f"{table_name}.singer"
    singer_file_to_target(file_name, postgres_target)

@@ -384,30 +425,28 @@ def test_no_primary_keys(postgres_target):
    file_name = f"{table_name}_append.singer"
    singer_file_to_target(file_name, postgres_target)

    # Will populate us with 22 records; we run this twice.
    verify_data(postgres_target, table_name, 16)


def test_no_type(postgres_target):
    file_name = "test_no_type.singer"
    singer_file_to_target(file_name, postgres_target)


def test_duplicate_records(postgres_target):
    file_name = "duplicate_records.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "metric": 100}
    verify_data(postgres_target, "test_duplicate_records", 2, "id", row)


def test_array_data(postgres_target):
    file_name = "array_data.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "fruits": ["apple", "orange", "pear"]}
    verify_data(postgres_target, "test_carts", 4, "id", row)


def test_encoded_string_data(postgres_target):
    """
    We removed NUL characters from the original encoded_strings.singer as Postgres doesn't allow them.
@@ -420,6 +459,12 @@

    file_name = "encoded_strings.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "info": "simple string 2837"}
    verify_data(postgres_target, "test_strings", 11, "id", row)
    row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}}
    verify_data(postgres_target, "test_strings_in_objects", 11, "id", row)
    row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]}
    verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row)


def test_tap_appl(postgres_target):
