
feat: make delete dag dependent on current most recent squash job results #28243

Open · wants to merge 8 commits into master
30 changes: 26 additions & 4 deletions dags/deletes.py
@@ -24,8 +24,9 @@
NodeRole,
get_cluster,
)
-from posthog.models.event.sql import EVENTS_DATA_TABLE
 from posthog.models.async_deletion import AsyncDeletion, DeletionType
+from posthog.models.event.sql import EVENTS_DATA_TABLE
+from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE


class ClickhouseClusterResource(ConfigurableResource):
@@ -266,14 +267,33 @@ def delete_mutation_runner(self) -> MutationRunner:
)


@op
def get_oldest_person_override_timestamp(
cluster: ResourceParam[ClickhouseCluster],
) -> datetime:
"""Get the oldest person override timestamp from the person_distinct_id_overrides table."""

query = f"""
SELECT min(_timestamp) FROM {PERSON_DISTINCT_ID_OVERRIDES_TABLE}
"""
Review comment (Contributor) on lines +276 to +278:
> logic: the query could return NULL if the table is empty, which would cause a runtime error when unpacking the result

[[result]] = cluster.any_host_by_role(lambda client: client.execute(query), NodeRole.WORKER).result()
return result
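One way to address this review comment would be to guard the result before returning it, mirroring the NULL check that this PR's own test helper (`get_oldest_override_timestamp` in `dags/tests/test_deletes.py`) already uses. A minimal sketch, with the `datetime.max` fallback as an assumed choice rather than anything this PR specifies:

```python
# Sketch of a NULL-safe variant per the review comment: guard against an
# empty or NULL result instead of unpacking it directly.
rows = cluster.any_host_by_role(lambda client: client.execute(query), NodeRole.WORKER).result()
if not rows or rows[0][0] is None:
    return datetime.max  # assumed fallback, matching the test helper's guard
return rows[0][0]
```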


@op
def create_pending_person_deletions_table(
config: DeleteConfig,
cluster: ResourceParam[ClickhouseCluster],
+    oldest_person_override_timestamp: datetime,
) -> PendingPersonEventDeletesTable:
"""Create a merge tree table in ClickHouse to store pending deletes."""
"""
Create a merge tree table in ClickHouse to store pending deletes.

Important to note: we only get pending deletions for requests that happened before the oldest person override timestamp.
"""

table = PendingPersonEventDeletesTable(
-        timestamp=config.parsed_timestamp,
+        timestamp=oldest_person_override_timestamp,
team_id=config.team_id,
cluster=settings.CLICKHOUSE_CLUSTER,
)
@@ -448,6 +468,7 @@ def count_pending_deletes(client: Client) -> int:
.items()
)
}

return (load_and_verify_deletes_dictionary, shard_mutations)


@@ -504,8 +525,9 @@ def cleanup_delete_assets(
@job
def deletes_job():
"""Job that handles deletion of person events."""
+    oldest_override_timestamp = get_oldest_person_override_timestamp()
     report_person_table = create_reporting_pending_person_deletions_table()
-    person_table = create_pending_person_deletions_table()
+    person_table = create_pending_person_deletions_table(oldest_override_timestamp)
loaded_person_table = load_pending_person_deletions(person_table)
create_deletes_dict_op = create_deletes_dict(loaded_person_table)
load_dict = load_and_verify_deletes_dictionary(create_deletes_dict_op)
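Aside: the reason this small wiring change gates the whole job is Dagster's dependency model; passing one op's output as another op's input is what creates the execution edge, so `create_pending_person_deletions_table` cannot run until the override watermark has been read. A self-contained sketch of the pattern, using hypothetical op names (not from this repo):

```python
from datetime import datetime

from dagster import job, op


@op
def read_watermark() -> datetime:
    # stand-in for get_oldest_person_override_timestamp
    return datetime(2025, 1, 1)


@op
def build_table(watermark: datetime) -> str:
    # stand-in for create_pending_person_deletions_table: Dagster only
    # schedules this op after read_watermark() has produced its output
    return f"pending_deletes_{watermark:%Y%m%d}"


@job
def example_job():
    build_table(read_watermark())
```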
1 change: 0 additions & 1 deletion dags/person_overrides.py
@@ -404,5 +404,4 @@ def squash_person_overrides():
dictionary_after_override_delete_mutations = wait_for_overrides_delete_mutations(
start_overrides_delete_mutations(dictionary_after_person_id_update_mutations)
)

drop_snapshot_table(drop_snapshot_dictionary(dictionary_after_override_delete_mutations))
53 changes: 42 additions & 11 deletions dags/tests/test_deletes.py
@@ -5,15 +5,16 @@

import pytest
from clickhouse_driver import Client

from django.conf import settings
from dags.deletes import (
deletes_job,
PendingPersonEventDeletesTable,
PendingDeletesDictionary,
)

from posthog.clickhouse.cluster import ClickhouseCluster, get_cluster
from posthog.models.async_deletion import AsyncDeletion, DeletionType
from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE


@pytest.fixture
@@ -23,8 +24,10 @@ def cluster(django_db_setup) -> Iterator[ClickhouseCluster]:

@pytest.mark.django_db
def test_full_job(cluster: ClickhouseCluster):
-    timestamp = datetime.now() + timedelta(days=31)
-    hour_delay = 31 * 24
+    timestamp = (datetime.now() + timedelta(days=31)).replace(
+        microsecond=0
+    )  # we don't freeze time because test runs are namespaced by timestamp
+    hour_delay = 745  # 31 * 24 + 1
event_count = 10000
delete_count = 1000

@@ -45,6 +48,21 @@ def insert_events(client: Client) -> None:

cluster.any_host(insert_events).result()

def get_oldest_override_timestamp(client: Client) -> datetime:
result = client.execute(f"SELECT min(_timestamp) FROM {PERSON_DISTINCT_ID_OVERRIDES_TABLE}")
if not result or result[0][0] is None:
return datetime.max
return result[0][0]

    # Insert some person overrides - we need these to establish the high watermark for pending deletes
def insert_overrides(client: Client) -> None:
client.execute(
"INSERT INTO person_distinct_id_overrides (distinct_id, person_id, _timestamp, version) VALUES",
[(f"{i}", UUID(int=i), timestamp - timedelta(hours=i), 1) for i in range(hour_delay)],
)

cluster.any_host(insert_overrides).result()

def get_events_by_person_team(client: Client) -> dict[tuple[int, UUID], int]:
result = client.execute("SELECT team_id, person_id, count(1) FROM writable_events GROUP BY team_id, person_id")
if not isinstance(result, list):
@@ -56,13 +74,15 @@ def insert_pending_deletes() -> None:
deletes = [(events[i][0], DeletionType.Person, events[i][2], None) for i in range(delete_count)]

# insert the deletes into django
-        for delete in deletes:
-            AsyncDeletion.objects.create(
+        for i, delete in enumerate(deletes):
+            d = AsyncDeletion.objects.create(
                 team_id=delete[0],
                 deletion_type=delete[1],
                 key=delete[2],
                 delete_verified_at=delete[3],
-            ).save()
+            )
+            d.created_at = events[i][3] + timedelta(hours=1)
+            d.save()
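Note the backdated `created_at` here: each deletion is stamped one hour after its person's event timestamp, so the deletions for the most recent `hour_delay` persons land after the override watermark. Those are exactly the rows the reworked assertions at the end of this test expect to remain unverified.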

insert_pending_deletes()

@@ -76,6 +96,10 @@ def get_pending_deletes() -> list[AsyncDeletion]:
pending_deletes = get_pending_deletes()
assert len(pending_deletes) == delete_count

    # Check the override watermark: overrides were inserted at timestamp - i hours for i in range(hour_delay), so the oldest is timestamp - (hour_delay - 1) hours
oldest_override_timestamp = cluster.any_host(get_oldest_override_timestamp).result()
assert oldest_override_timestamp == timestamp - timedelta(hours=hour_delay - 1)

# Run the deletion job
deletes_job.execute_in_process(
run_config={"ops": {"create_pending_person_deletions_table": {"config": {"timestamp": timestamp.isoformat()}}}},
@@ -84,22 +108,29 @@ def get_pending_deletes() -> list[AsyncDeletion]:

# Check postconditions
final_events = cluster.any_host(get_events_by_person_team).result()
-    assert len(final_events) == event_count - 256  # Only events for non-deleted persons remain
+    assert len(final_events) == event_count - (delete_count - hour_delay)  # only deletions requested before the override watermark are applied

# Check that events after the deletion window remain
-    target_uuid = UUID(int=hour_delay - 1)
+    target_uuid = UUID(int=hour_delay - 2)
assert any(
target_uuid == uuid for _, uuid in final_events.keys()
), f"Expected to find UUID {target_uuid} in remaining events"

# Check that early events were deleted
-    deleted_uuid = UUID(int=hour_delay + 1)
+    deleted_uuid = UUID(int=hour_delay + 2)
assert not any(
deleted_uuid == uuid for _, uuid in final_events.keys()
), f"Expected UUID {deleted_uuid} to be deleted"

-    # Verify that the deletions have been marked verified
-    assert all(deletion.delete_verified_at is not None for deletion in AsyncDeletion.objects.all())
+    # Verify that the deletions before the oldest override timestamp have been marked verified
+    pre_override_deletions = AsyncDeletion.objects.filter(created_at__lte=oldest_override_timestamp)
+    assert len(pre_override_deletions) == delete_count - hour_delay
+    assert all(deletion.delete_verified_at is not None for deletion in pre_override_deletions)
+
+    # Verify that the deletions after the oldest override timestamp have not been marked verified
+    post_override_deletions = AsyncDeletion.objects.filter(created_at__gt=oldest_override_timestamp)
+    assert len(post_override_deletions) == hour_delay
+    assert not all(deletion.delete_verified_at is not None for deletion in post_override_deletions)

# Verify the temporary tables were cleaned up
table = PendingPersonEventDeletesTable(timestamp=timestamp)