diff --git a/dags/deletes.py b/dags/deletes.py index 04ee38706aef1..2f46fcfe7e0b6 100644 --- a/dags/deletes.py +++ b/dags/deletes.py @@ -24,8 +24,9 @@ NodeRole, get_cluster, ) -from posthog.models.event.sql import EVENTS_DATA_TABLE from posthog.models.async_deletion import AsyncDeletion, DeletionType +from posthog.models.event.sql import EVENTS_DATA_TABLE +from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE class ClickhouseClusterResource(ConfigurableResource): @@ -266,14 +267,33 @@ def delete_mutation_runner(self) -> MutationRunner: ) +@op +def get_oldest_person_override_timestamp( + cluster: ResourceParam[ClickhouseCluster], +) -> datetime: + """Get the oldest person override timestamp from the person_distinct_id_overrides table.""" + + query = f""" + SELECT min(_timestamp) FROM {PERSON_DISTINCT_ID_OVERRIDES_TABLE} + """ + [[result]] = cluster.any_host_by_role(lambda client: client.execute(query), NodeRole.WORKER).result() + return result + + @op def create_pending_person_deletions_table( config: DeleteConfig, cluster: ResourceParam[ClickhouseCluster], + oldest_person_override_timestamp: datetime, ) -> PendingPersonEventDeletesTable: - """Create a merge tree table in ClickHouse to store pending deletes.""" + """ + Create a merge tree table in ClickHouse to store pending deletes. + + Important to note: we only get pending deletions for requests that happened before the oldest person override timestamp. + """ + table = PendingPersonEventDeletesTable( - timestamp=config.parsed_timestamp, + timestamp=oldest_person_override_timestamp, team_id=config.team_id, cluster=settings.CLICKHOUSE_CLUSTER, ) @@ -448,6 +468,7 @@ def count_pending_deletes(client: Client) -> int: .items() ) } + return (load_and_verify_deletes_dictionary, shard_mutations) @@ -504,8 +525,9 @@ def cleanup_delete_assets( @job def deletes_job(): """Job that handles deletion of person events.""" + oldest_override_timestamp = get_oldest_person_override_timestamp() report_person_table = create_reporting_pending_person_deletions_table() - person_table = create_pending_person_deletions_table() + person_table = create_pending_person_deletions_table(oldest_override_timestamp) loaded_person_table = load_pending_person_deletions(person_table) create_deletes_dict_op = create_deletes_dict(loaded_person_table) load_dict = load_and_verify_deletes_dictionary(create_deletes_dict_op) diff --git a/dags/person_overrides.py b/dags/person_overrides.py index 60684b0e549de..955ff478414a4 100644 --- a/dags/person_overrides.py +++ b/dags/person_overrides.py @@ -404,5 +404,4 @@ def squash_person_overrides(): dictionary_after_override_delete_mutations = wait_for_overrides_delete_mutations( start_overrides_delete_mutations(dictionary_after_person_id_update_mutations) ) - drop_snapshot_table(drop_snapshot_dictionary(dictionary_after_override_delete_mutations)) diff --git a/dags/tests/test_deletes.py b/dags/tests/test_deletes.py index 5f857c0783c28..72d66d54ea11b 100644 --- a/dags/tests/test_deletes.py +++ b/dags/tests/test_deletes.py @@ -5,15 +5,16 @@ import pytest from clickhouse_driver import Client - from django.conf import settings from dags.deletes import ( deletes_job, PendingPersonEventDeletesTable, PendingDeletesDictionary, ) + from posthog.clickhouse.cluster import ClickhouseCluster, get_cluster from posthog.models.async_deletion import AsyncDeletion, DeletionType +from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE @pytest.fixture @@ -23,8 +24,10 @@ def cluster(django_db_setup) -> 
Iterator[ClickhouseCluster]: @pytest.mark.django_db def test_full_job(cluster: ClickhouseCluster): - timestamp = datetime.now() + timedelta(days=31) - hour_delay = 31 * 24 + timestamp = (datetime.now() + timedelta(days=31)).replace( + microsecond=0 + ) # we don't freeze time because we are namespaced by time + hour_delay = 745 # 31 * 24 event_count = 10000 delete_count = 1000 @@ -45,6 +48,21 @@ def insert_events(client: Client) -> None: cluster.any_host(insert_events).result() + def get_oldest_override_timestamp(client: Client) -> datetime: + result = client.execute(f"SELECT min(_timestamp) FROM {PERSON_DISTINCT_ID_OVERRIDES_TABLE}") + if not result or result[0][0] is None: + return datetime.max + return result[0][0] + + # Insert some person overrides - we need this to establish high watermark for pending deletes + def insert_overrides(client: Client) -> None: + client.execute( + "INSERT INTO person_distinct_id_overrides (distinct_id, person_id, _timestamp, version) VALUES", + [(f"{i}", UUID(int=i), timestamp - timedelta(hours=i), 1) for i in range(hour_delay)], + ) + + cluster.any_host(insert_overrides).result() + def get_events_by_person_team(client: Client) -> dict[tuple[int, UUID], int]: result = client.execute("SELECT team_id, person_id, count(1) FROM writable_events GROUP BY team_id, person_id") if not isinstance(result, list): @@ -56,13 +74,15 @@ def insert_pending_deletes() -> None: deletes = [(events[i][0], DeletionType.Person, events[i][2], None) for i in range(delete_count)] # insert the deletes into django - for delete in deletes: - AsyncDeletion.objects.create( + for i, delete in enumerate(deletes): + d = AsyncDeletion.objects.create( team_id=delete[0], deletion_type=delete[1], key=delete[2], delete_verified_at=delete[3], - ).save() + ) + d.created_at = events[i][3] + timedelta(hours=1) + d.save() insert_pending_deletes() @@ -76,6 +96,10 @@ def get_pending_deletes() -> list[AsyncDeletion]: pending_deletes = get_pending_deletes() assert len(pending_deletes) == delete_count + # Check overrides is correct + oldest_override_timestamp = cluster.any_host(get_oldest_override_timestamp).result() + assert oldest_override_timestamp == timestamp - timedelta(hours=hour_delay - 1) + # Run the deletion job deletes_job.execute_in_process( run_config={"ops": {"create_pending_person_deletions_table": {"config": {"timestamp": timestamp.isoformat()}}}}, @@ -84,22 +108,29 @@ def get_pending_deletes() -> list[AsyncDeletion]: # Check postconditions final_events = cluster.any_host(get_events_by_person_team).result() - assert len(final_events) == event_count - 256 # Only events for non-deleted persons remain + assert len(final_events) == event_count - (delete_count - hour_delay) # Only events for non-deleted persons remain # Check that events after the deletion window remain - target_uuid = UUID(int=hour_delay - 1) + target_uuid = UUID(int=hour_delay - 2) assert any( target_uuid == uuid for _, uuid in final_events.keys() ), f"Expected to find UUID {target_uuid} in remaining events" # Check that early events were deleted - deleted_uuid = UUID(int=hour_delay + 1) + deleted_uuid = UUID(int=hour_delay + 2) assert not any( deleted_uuid == uuid for _, uuid in final_events.keys() ), f"Expected UUID {deleted_uuid} to be deleted" - # Verify that the deletions have been marked verified - assert all(deletion.delete_verified_at is not None for deletion in AsyncDeletion.objects.all()) + # Verify that the deletions before oldest override timestamp have been marked verified + pre_override_deletions = 
AsyncDeletion.objects.filter(created_at__lte=oldest_override_timestamp) + assert len(pre_override_deletions) == delete_count - hour_delay + assert all(deletion.delete_verified_at is not None for deletion in pre_override_deletions) + + # Verify that the deletions after oldest override timestamp have not been marked verified + post_override_deletions = AsyncDeletion.objects.filter(created_at__gt=oldest_override_timestamp) + assert len(post_override_deletions) == hour_delay + assert not all(deletion.delete_verified_at is not None for deletion in post_override_deletions) # Verify the temporary tables were cleaned up table = PendingPersonEventDeletesTable(timestamp=timestamp)
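
A minimal sketch of the gating rule this change encodes, for reference: pending person deletions are only picked up when they were requested at or before the oldest _timestamp still present in person_distinct_id_overrides, and anything newer than that watermark is left unverified until a later run, once the squash job has caught up. The PendingDeletion dataclass and split_by_override_watermark helper below are illustrative stand-ins, not names from the codebase.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class PendingDeletion:
    team_id: int
    key: str  # person UUID as a string
    created_at: datetime


def split_by_override_watermark(
    deletions: list[PendingDeletion],
    oldest_override_timestamp: datetime,
) -> tuple[list[PendingDeletion], list[PendingDeletion]]:
    """Partition pending deletions around the override high-watermark.

    Deletions requested at or before the oldest unsquashed override are safe
    to run and verify now; newer ones are deferred, since their events may
    still be re-attributed to a different person_id by a pending override.
    """
    ready = [d for d in deletions if d.created_at <= oldest_override_timestamp]
    deferred = [d for d in deletions if d.created_at > oldest_override_timestamp]
    return ready, deferred


if __name__ == "__main__":
    now = datetime(2025, 1, 31)
    watermark = now - timedelta(hours=12)  # pretend the oldest override is 12h old
    pending = [
        PendingDeletion(team_id=1, key="person-a", created_at=now - timedelta(days=2)),
        PendingDeletion(team_id=1, key="person-b", created_at=now - timedelta(hours=1)),
    ]
    ready, deferred = split_by_override_watermark(pending, watermark)
    assert [d.key for d in ready] == ["person-a"]      # old enough: processed and verified
    assert [d.key for d in deferred] == ["person-b"]   # newer than the watermark: waits
```

This mirrors the test's postconditions above: AsyncDeletion rows with created_at at or before the oldest override timestamp end up with delete_verified_at set, while the ones created after it are left for a subsequent run.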