feat(batch-exports): Add created_at to Postgres persons batch export (
rossgray authored Jan 10, 2025
1 parent 4c3cb69 commit 45ded54
Showing 2 changed files with 139 additions and 19 deletions.
12 changes: 10 additions & 2 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -625,6 +625,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
extra_query_parameters=extra_query_parameters,
use_latest_schema=True,
)

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
@@ -656,8 +657,6 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
known_json_columns=["properties", "set", "set_once", "person_properties"],
)

schema_columns = [field[0] for field in table_fields]

requires_merge = (
isinstance(inputs.batch_export_model, BatchExportModel) and inputs.batch_export_model.name == "persons"
)
@@ -672,6 +671,15 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
primary_key = None

async with PostgreSQLClient.from_inputs(inputs).connect() as pg_client:
# handle the case where the final table doesn't contain all the fields present in the record batch schema
try:
columns = await pg_client.aget_table_columns(inputs.schema, inputs.table_name)
table_fields = [field for field in table_fields if field[0] in columns]
except psycopg.errors.UndefinedTable:
pass

schema_columns = [field[0] for field in table_fields]

async with (
pg_client.managed_table(
inputs.schema, inputs.table_name, table_fields, delete=False, primary_key=primary_key
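The hunk above is the core of the change: before creating or inserting into the destination table, the activity looks up the table's existing columns and keeps only the record-batch fields that are present there, so the newly added created_at field is only written when the destination actually has that column. Below is a minimal, self-contained sketch of that filtering idea; the helper name and the plain-tuple field representation are illustrative, while the real code queries the columns via pg_client.aget_table_columns and keeps all fields when the table does not exist yet.

# Sketch of the column-filtering idea (illustrative helper, not PostHog code).
def filter_fields_to_destination(
    table_fields: list[tuple[str, str]],
    destination_columns: set[str] | None,
) -> list[tuple[str, str]]:
    """Keep only the fields that exist in the destination table.

    If the destination table does not exist yet (destination_columns is None),
    keep every field: the table will be created with the full schema.
    """
    if destination_columns is None:
        return table_fields
    return [field for field in table_fields if field[0] in destination_columns]


table_fields = [
    ("team_id", "BIGINT"),
    ("distinct_id", "TEXT"),
    ("created_at", "TIMESTAMP"),
]
# A destination table created before created_at was added to the persons model.
existing_columns = {"team_id", "distinct_id"}

assert filter_fields_to_destination(table_fields, existing_columns) == table_fields[:2]
assert filter_fields_to_destination(table_fields, None) == table_fields

The try/except psycopg.errors.UndefinedTable in the real change plays the role of the None branch here: if the table is missing, nothing needs to be filtered out.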
146 changes: 129 additions & 17 deletions posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
@@ -48,6 +48,17 @@
pytest.mark.django_db,
]

EXPECTED_PERSONS_BATCH_EXPORT_FIELDS = [
"team_id",
"distinct_id",
"person_id",
"properties",
"person_version",
"person_distinct_id_version",
"created_at",
"_inserted_at",
]


async def assert_clickhouse_records_in_postgres(
postgres_connection,
@@ -62,6 +73,7 @@ async def assert_clickhouse_records_in_postgres(
include_events: list[str] | None = None,
sort_key: str = "event",
is_backfill: bool = False,
expected_fields: list[str] | None = None,
):
"""Assert expected records are written to a given PostgreSQL table.
@@ -73,18 +85,14 @@ async def assert_clickhouse_records_in_postgres(
5. Cast records returned by ClickHouse to a Python list of dicts.
6. Compare each record returned by ClickHouse to each record read from PostgreSQL.
Caveats:
* Casting records to a Python list of dicts means losing some type precision.
* Reading records from ClickHouse could be hiding bugs in the `iter_records` function and related.
* `iter_records` has its own set of related unit tests to control for this.
Arguments:
postgres_connection: A PostgreSQL connection used to read inserted events.
clickhouse_client: A ClickHouseClient used to read events that are expected to be inserted.
schema_name: PostgreSQL schema name.
table_name: PostgreSQL table name.
team_id: The ID of the team that we are testing events for.
batch_export_schema: Custom schema used in the batch export.
expected_fields: The expected fields to be exported.
"""
inserted_records = []

@@ -96,8 +104,10 @@ async def assert_clickhouse_records_in_postgres(
event = dict(zip(columns, row))
inserted_records.append(event)

schema_column_names = [field["alias"] for field in postgres_default_fields()]
if batch_export_model is not None:
schema_column_names = (
expected_fields if expected_fields is not None else [field["alias"] for field in postgres_default_fields()]
)
if batch_export_model is not None and expected_fields is None:
if isinstance(batch_export_model, BatchExportModel):
batch_export_schema = batch_export_model.schema
else:
@@ -106,15 +116,7 @@ async def assert_clickhouse_records_in_postgres(
if batch_export_schema is not None:
schema_column_names = [field["alias"] for field in batch_export_schema["fields"]]
elif isinstance(batch_export_model, BatchExportModel) and batch_export_model.name == "persons":
schema_column_names = [
"team_id",
"distinct_id",
"person_id",
"properties",
"person_version",
"person_distinct_id_version",
"_inserted_at",
]
schema_column_names = EXPECTED_PERSONS_BATCH_EXPORT_FIELDS

expected_records = []
async for record_batch in iter_model_records(
@@ -127,6 +129,7 @@ async def assert_clickhouse_records_in_postgres(
include_events=include_events,
destination_default_fields=postgres_default_fields(),
is_backfill=is_backfill,
use_latest_schema=True,
):
for record in record_batch.select(schema_column_names).to_pylist():
expected_record = {}
@@ -191,6 +194,7 @@ async def postgres_connection(postgres_config, setup_postgres_test_db):
dbname=postgres_config["database"],
host=postgres_config["host"],
port=postgres_config["port"],
autocommit=True,
)

yield connection
@@ -387,7 +391,18 @@ async def persons_table_without_primary_key(postgres_connection, postgres_config
async with postgres_connection.cursor() as cursor:
await cursor.execute(
sql.SQL(
"CREATE TABLE {table} (team_id BIGINT, distinct_id TEXT, person_id TEXT, properties JSONB, person_distinct_id_version BIGINT, person_version BIGINT)"
"""
CREATE TABLE {table} (
team_id BIGINT,
distinct_id TEXT,
person_id TEXT,
properties JSONB,
person_distinct_id_version BIGINT,
person_version BIGINT,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
"""
).format(table=sql.Identifier(postgres_config["schema"], self_managed_table_name))
)

@@ -867,3 +882,100 @@ async def never_finish_activity(_: PostgresInsertInputs) -> str:
assert run.status == "Cancelled"
assert run.latest_error == "Cancelled"
assert run.records_completed is None


async def test_insert_into_postgres_activity_handles_person_schema_changes(
clickhouse_client,
activity_environment,
postgres_connection,
postgres_config,
generate_test_data,
data_interval_start,
data_interval_end,
ateam,
):
"""Test that the `insert_into_postgres_activity` handles changes to the
person schema.
If we update the schema of the persons model we export, we should still be
able to export the data without breaking existing exports. For example, any
new fields should not be added to the destination (in future we may want to
allow this but for now we don't).
To replicate this situation, we first export the data with the original
schema, then drop a column from the destination table and rerun the export.
"""
model = BatchExportModel(name="persons", schema=None)

insert_inputs = PostgresInsertInputs(
team_id=ateam.pk,
table_name=f"test_insert_activity_migration_table__{ateam.pk}",
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
batch_export_model=model,
**postgres_config,
)

await activity_environment.run(insert_into_postgres_activity, insert_inputs)

await assert_clickhouse_records_in_postgres(
postgres_connection=postgres_connection,
clickhouse_client=clickhouse_client,
schema_name=postgres_config["schema"],
table_name=f"test_insert_activity_migration_table__{ateam.pk}",
team_id=ateam.pk,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
batch_export_model=model,
sort_key="person_id",
)

# Drop the created_at column from the PostgreSQL table
async with postgres_connection.transaction():
async with postgres_connection.cursor() as cursor:
await cursor.execute(
sql.SQL("ALTER TABLE {table} DROP COLUMN created_at").format(
table=sql.Identifier(postgres_config["schema"], f"test_insert_activity_migration_table__{ateam.pk}")
)
)

_, persons_to_export_created = generate_test_data

for old_person in persons_to_export_created[: len(persons_to_export_created) // 2]:
new_person_id = uuid.uuid4()
new_person, _ = await generate_test_persons_in_clickhouse(
client=clickhouse_client,
team_id=ateam.pk,
start_time=data_interval_start,
end_time=data_interval_end,
person_id=new_person_id,
count=1,
properties={"utm_medium": "referral", "$initial_os": "Linux", "new_property": "Something"},
)

await generate_test_person_distinct_id2_in_clickhouse(
clickhouse_client,
ateam.pk,
person_id=uuid.UUID(new_person[0]["id"]),
distinct_id=old_person["distinct_id"],
version=old_person["version"] + 1,
timestamp=old_person["_timestamp"],
)

await activity_environment.run(insert_into_postgres_activity, insert_inputs)

# This time we don't expect there to be a created_at column
expected_fields = [field for field in EXPECTED_PERSONS_BATCH_EXPORT_FIELDS if field != "created_at"]

await assert_clickhouse_records_in_postgres(
postgres_connection=postgres_connection,
clickhouse_client=clickhouse_client,
schema_name=postgres_config["schema"],
table_name=f"test_insert_activity_migration_table__{ateam.pk}",
team_id=ateam.pk,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
batch_export_model=model,
sort_key="person_id",
expected_fields=expected_fields,
)
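The docstring above describes the scenario this test covers: export once with the full schema, drop created_at from the destination, and rerun the export, which must now skip the missing column. A small follow-up check one could bolt onto this test (a sketch only, not part of the commit; it assumes the same postgres_connection, postgres_config and ateam fixtures) is to confirm that the rerun did not silently re-create the dropped column:

# Sketch (not part of the commit): confirm the rerun left the destination
# schema untouched, i.e. created_at was not re-added by the second export.
async def assert_created_at_not_recreated(postgres_connection, postgres_config, ateam):
    async with postgres_connection.cursor() as cursor:
        await cursor.execute(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            """,
            (postgres_config["schema"], f"test_insert_activity_migration_table__{ateam.pk}"),
        )
        columns = {row[0] for row in await cursor.fetchall()}

    assert "created_at" not in columns
    assert {"team_id", "distinct_id", "person_id"} <= columns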
