Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Snapshot behavior in new_record mode for records already marked as deleted #655

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250124-001152.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix Snapshot to avoid inserting redundant entries for records already marked as deleted
time: 2025-01-24T00:11:52.499074243+01:00
custom:
Author: lpillmann
Issue: "654"
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,13 @@
delete from {database}.{schema}.seed where id = 1
"""

# If the deletion worked correctly, this should return two rows, with one of them representing the deletion.
# SQL to insert a record back into the snapshot source data with a new updated_at value
_insert_sql = """
insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2016-01-01 12:19:28');
"""

# SQL to fetch the snapshotted entries of the record being used in deletion tests
_delete_check_sql = """
select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1
"""
Expand Down Expand Up @@ -216,6 +222,10 @@ def update_sql(self):
def delete_sql(self):
return _delete_sql

@pytest.fixture(scope="class")
def insert_sql(self):
return _insert_sql

def test_snapshot_new_record_mode(
self, project, seed_new_record_mode, invalidate_sql, update_sql
):
Expand All @@ -234,7 +244,7 @@ def test_snapshot_new_record_mode(
project.run_sql(_delete_sql)

results = run_dbt(["snapshot"])
assert len(results) == 1
assert len(results) == 2

check_result = project.run_sql(_delete_check_sql, fetch="all")
valid_to = 0
Expand Down Expand Up @@ -264,3 +274,29 @@ def test_snapshot_new_record_mode(
== 1
)
assert check_result[0][scd_id] != check_result[1][scd_id]

# run snapshot with the same source data; neither insert or update should happen
run_dbt(["snapshot"])
assert len(results) == 0
check_result = project.run_sql(_delete_check_sql, fetch="all")
assert len(check_result) == 2

# insert the record back and run the snapshot again; update and insert expected
project.run_sql(_insert_sql)
results = run_dbt(["snapshot"])
assert len(results) == 2
check_result = project.run_sql(_delete_check_sql, fetch="all")
assert len(check_result) == 3

# delete it once again and run the snapshot; update and insert expected
project.run_sql(_delete_sql)
results = run_dbt(["snapshot"])
assert len(results) == 2
check_result = project.run_sql(_delete_check_sql, fetch="all")
assert len(check_result) == 4

# run snapshot with the same source data; neither insert or update should happen
results = run_dbt(["snapshot"])
assert len(results) == 0
check_result = project.run_sql(_delete_check_sql, fetch="all")
assert len(check_result) == 4
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}

{%- if strategy.hard_deletes == 'new_record' %}
and not (
--avoid updating the record's valid_to if the latest entry is marked as deleted
snapshotted_data.{{ columns.dbt_is_deleted }}
and snapshotted_data.{{ columns.dbt_valid_to }} is null
)
{%- endif %}
)
{%- endif %}

Expand Down Expand Up @@ -177,6 +185,11 @@
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
and not (
--avoid inserting a new record if the latest one is marked as deleted
snapshotted_data.{{ columns.dbt_is_deleted }}
and snapshotted_data.{{ columns.dbt_valid_to }} is null
)

)
{%- endif %}
Expand Down
Loading