diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py
index 6a71d3d75e8ae..56d29e8371c33 100644
--- a/posthog/temporal/batch_exports/postgres_batch_export.py
+++ b/posthog/temporal/batch_exports/postgres_batch_export.py
@@ -625,6 +625,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
         exclude_events=inputs.exclude_events,
         include_events=inputs.include_events,
         extra_query_parameters=extra_query_parameters,
+        use_latest_schema=True,
     )
 
     record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
@@ -656,8 +657,6 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
         known_json_columns=["properties", "set", "set_once", "person_properties"],
     )
 
-    schema_columns = [field[0] for field in table_fields]
-
     requires_merge = (
         isinstance(inputs.batch_export_model, BatchExportModel) and inputs.batch_export_model.name == "persons"
    )
@@ -672,6 +671,15 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
         primary_key = None
 
     async with PostgreSQLClient.from_inputs(inputs).connect() as pg_client:
+        # handle the case where the final table doesn't contain all the fields present in the record batch schema
+        try:
+            columns = await pg_client.aget_table_columns(inputs.schema, inputs.table_name)
+            table_fields = [field for field in table_fields if field[0] in columns]
+        except psycopg.errors.UndefinedTable:
+            pass
+
+        schema_columns = [field[0] for field in table_fields]
+
         async with (
             pg_client.managed_table(
                 inputs.schema, inputs.table_name, table_fields, delete=False, primary_key=primary_key
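The guard added above is the heart of this change: before creating or copying into the destination, the activity asks PostgreSQL which columns the table already has and drops any record-batch fields the table doesn't know about, so a newly introduced persons field (such as `created_at`) doesn't break exports into tables created with the older schema. The implementation of `aget_table_columns` is not part of this diff; as a rough, hypothetical sketch of what such a lookup can boil down to with psycopg 3 (`fetch_destination_columns` is an illustrative stand-in, not the project's helper):

```python
import psycopg
from psycopg import sql


async def fetch_destination_columns(
    connection: psycopg.AsyncConnection, schema: str, table_name: str
) -> set[str]:
    """Return the column names of an existing destination table.

    Selecting zero rows is enough to populate the cursor's column metadata, and
    it lets the driver raise `psycopg.errors.UndefinedTable` when the table is
    missing, which the activity above treats as "create the table from scratch".
    """
    async with connection.cursor() as cursor:
        await cursor.execute(
            sql.SQL("SELECT * FROM {} LIMIT 0").format(sql.Identifier(schema, table_name))
        )
        return {column.name for column in cursor.description}
```

Intersecting `table_fields` with the returned set, as the activity does, means a dropped or never-created destination column is simply skipped instead of ending up in the generated INSERT/COPY statement.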
diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
index db2022b464e0b..ee29e739d67e3 100644
--- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
@@ -48,6 +48,17 @@
     pytest.mark.django_db,
 ]
 
+EXPECTED_PERSONS_BATCH_EXPORT_FIELDS = [
+    "team_id",
+    "distinct_id",
+    "person_id",
+    "properties",
+    "person_version",
+    "person_distinct_id_version",
+    "created_at",
+    "_inserted_at",
+]
+
 
 async def assert_clickhouse_records_in_postgres(
     postgres_connection,
@@ -62,6 +73,7 @@
     include_events: list[str] | None = None,
     sort_key: str = "event",
     is_backfill: bool = False,
+    expected_fields: list[str] | None = None,
 ):
     """Assert expected records are written to a given PostgreSQL table.
 
@@ -73,11 +85,6 @@
     5. Cast records returned by ClickHouse to a Python list of dicts.
     6. Compare each record returned by ClickHouse to each record read from PostgreSQL.
 
-    Caveats:
-    * Casting records to a Python list of dicts means losing some type precision.
-    * Reading records from ClickHouse could be hiding bugs in the `iter_records` function and related.
-    * `iter_records` has its own set of related unit tests to control for this.
-
     Arguments:
         postgres_connection: A PostgreSQL connection used to read inserted events.
         clickhouse_client: A ClickHouseClient used to read events that are expected to be inserted.
@@ -85,6 +92,7 @@
         table_name: PostgreSQL table name.
         team_id: The ID of the team that we are testing events for.
         batch_export_schema: Custom schema used in the batch export.
+        expected_fields: The expected fields to be exported.
     """
     inserted_records = []
 
@@ -96,8 +104,10 @@
             event = dict(zip(columns, row))
             inserted_records.append(event)
 
-    schema_column_names = [field["alias"] for field in postgres_default_fields()]
-    if batch_export_model is not None:
+    schema_column_names = (
+        expected_fields if expected_fields is not None else [field["alias"] for field in postgres_default_fields()]
+    )
+    if batch_export_model is not None and expected_fields is None:
         if isinstance(batch_export_model, BatchExportModel):
             batch_export_schema = batch_export_model.schema
         else:
@@ -106,15 +116,7 @@
         if batch_export_schema is not None:
             schema_column_names = [field["alias"] for field in batch_export_schema["fields"]]
         elif isinstance(batch_export_model, BatchExportModel) and batch_export_model.name == "persons":
-            schema_column_names = [
-                "team_id",
-                "distinct_id",
-                "person_id",
-                "properties",
-                "person_version",
-                "person_distinct_id_version",
-                "_inserted_at",
-            ]
+            schema_column_names = EXPECTED_PERSONS_BATCH_EXPORT_FIELDS
 
     expected_records = []
     async for record_batch in iter_model_records(
@@ -127,6 +129,7 @@
         include_events=include_events,
         destination_default_fields=postgres_default_fields(),
         is_backfill=is_backfill,
+        use_latest_schema=True,
     ):
         for record in record_batch.select(schema_column_names).to_pylist():
             expected_record = {}
@@ -191,6 +194,7 @@ async def postgres_connection(postgres_config, setup_postgres_test_db):
         dbname=postgres_config["database"],
         host=postgres_config["host"],
         port=postgres_config["port"],
+        autocommit=True,
     )
 
     yield connection
@@ -387,7 +391,18 @@ async def persons_table_without_primary_key(postgres_connection, postgres_config
     async with postgres_connection.cursor() as cursor:
         await cursor.execute(
             sql.SQL(
-                "CREATE TABLE {table} (team_id BIGINT, distinct_id TEXT, person_id TEXT, properties JSONB, person_distinct_id_version BIGINT, person_version BIGINT)"
+                """
+                CREATE TABLE {table} (
+                    team_id BIGINT,
+                    distinct_id TEXT,
+                    person_id TEXT,
+                    properties JSONB,
+                    person_distinct_id_version BIGINT,
+                    person_version BIGINT,
+                    created_at TIMESTAMP,
+                    updated_at TIMESTAMP
+                )
+                """
             ).format(table=sql.Identifier(postgres_config["schema"], self_managed_table_name))
         )
 
@@ -867,3 +882,100 @@ async def never_finish_activity(_: PostgresInsertInputs) -> str:
     assert run.status == "Cancelled"
     assert run.latest_error == "Cancelled"
     assert run.records_completed is None
+
+
+async def test_insert_into_postgres_activity_handles_person_schema_changes(
+    clickhouse_client,
+    activity_environment,
+    postgres_connection,
+    postgres_config,
+    generate_test_data,
+    data_interval_start,
+    data_interval_end,
+    ateam,
+):
+    """Test that `insert_into_postgres_activity` handles changes to the
+    persons schema.
+
+    If we update the schema of the persons model we export, we should still be
+    able to export the data without breaking existing exports. For example, any
+    new fields should not be added to the destination (in the future we may want
+    to allow this, but for now we don't).
+
+    To replicate this situation, we first export the data with the original
+    schema, then drop a column in the destination, and then rerun the export.
+ """ + model = BatchExportModel(name="persons", schema=None) + + insert_inputs = PostgresInsertInputs( + team_id=ateam.pk, + table_name=f"test_insert_activity_migration_table__{ateam.pk}", + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + batch_export_model=model, + **postgres_config, + ) + + await activity_environment.run(insert_into_postgres_activity, insert_inputs) + + await assert_clickhouse_records_in_postgres( + postgres_connection=postgres_connection, + clickhouse_client=clickhouse_client, + schema_name=postgres_config["schema"], + table_name=f"test_insert_activity_migration_table__{ateam.pk}", + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + batch_export_model=model, + sort_key="person_id", + ) + + # Drop the created_at column from the PostgreSQL table + async with postgres_connection.transaction(): + async with postgres_connection.cursor() as cursor: + await cursor.execute( + sql.SQL("ALTER TABLE {table} DROP COLUMN created_at").format( + table=sql.Identifier(postgres_config["schema"], f"test_insert_activity_migration_table__{ateam.pk}") + ) + ) + + _, persons_to_export_created = generate_test_data + + for old_person in persons_to_export_created[: len(persons_to_export_created) // 2]: + new_person_id = uuid.uuid4() + new_person, _ = await generate_test_persons_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + person_id=new_person_id, + count=1, + properties={"utm_medium": "referral", "$initial_os": "Linux", "new_property": "Something"}, + ) + + await generate_test_person_distinct_id2_in_clickhouse( + clickhouse_client, + ateam.pk, + person_id=uuid.UUID(new_person[0]["id"]), + distinct_id=old_person["distinct_id"], + version=old_person["version"] + 1, + timestamp=old_person["_timestamp"], + ) + + await activity_environment.run(insert_into_postgres_activity, insert_inputs) + + # This time we don't expect there to be a created_at column + expected_fields = [field for field in EXPECTED_PERSONS_BATCH_EXPORT_FIELDS if field != "created_at"] + + await assert_clickhouse_records_in_postgres( + postgres_connection=postgres_connection, + clickhouse_client=clickhouse_client, + schema_name=postgres_config["schema"], + table_name=f"test_insert_activity_migration_table__{ateam.pk}", + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + batch_export_model=model, + sort_key="person_id", + expected_fields=expected_fields, + )