diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index 5d9bc88f7bd29..458b4cb1a6f1b 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -55,6 +55,11 @@ class PostgreSQLConnectionError(Exception): pass +class MissingPrimaryKeyError(Exception): + def __init__(self, table: sql.Identifier, primary_key: sql.Composed): + super().__init__(f"An operation could not be completed as '{table}' is missing a primary key on {primary_key}") + + @dataclasses.dataclass class PostgresInsertInputs: """Inputs for Postgres insert activity.""" @@ -346,7 +351,10 @@ async def amerge_person_tables( await cursor.execute(sql.SQL("SET search_path TO {schema}").format(schema=sql.Identifier(schema))) await cursor.execute("SET TRANSACTION READ WRITE") - await cursor.execute(merge_query) + try: + await cursor.execute(merge_query) + except psycopg.errors.InvalidColumnReference: + raise MissingPrimaryKeyError(final_table_identifier, conflict_fields) async def copy_tsv_to_postgres( self, @@ -598,6 +606,7 @@ async def flush_to_postgres( ("team_id", "INT"), ("distinct_id", "TEXT"), ) + await pg_client.amerge_person_tables( final_table_name=pg_table, stage_table_name=pg_stage_table, @@ -704,6 +713,8 @@ async def run(self, inputs: PostgresBatchExportInputs): "DiskFull", # Raised by our PostgreSQL client when failing to connect after several attempts. "PostgreSQLConnectionError", + # Raised when merging without a primary key. + "MissingPrimaryKeyError", ], finish_inputs=finish_inputs, ) diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index 19831d2d6d636..56e4c91464032 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -23,6 +23,7 @@ start_batch_export_run, ) from posthog.temporal.batch_exports.postgres_batch_export import ( + MissingPrimaryKeyError, PostgresBatchExportInputs, PostgresBatchExportWorkflow, PostgresInsertInputs, @@ -362,7 +363,73 @@ async def test_insert_into_postgres_activity_merges_data_in_follow_up_runs( @pytest.fixture def table_name(ateam, interval): - return f"test_workflow_table_{ateam.pk}_{interval}" + return f"test_table_{ateam.pk}_{interval}" + + +@pytest_asyncio.fixture +async def persons_table_without_primary_key(postgres_connection, postgres_config, table_name): + """Managed a table for a persons batch export without a primary key.""" + self_managed_table_name = table_name + f"_self_managed_{uuid.uuid4().hex}" + + async with postgres_connection.transaction(): + async with postgres_connection.cursor() as cursor: + await cursor.execute( + sql.SQL( + "CREATE TABLE {table} (team_id BIGINT, distinct_id TEXT, person_id TEXT, properties JSONB, person_distinct_id_version BIGINT, person_version BIGINT)" + ).format(table=sql.Identifier(postgres_config["schema"], self_managed_table_name)) + ) + + yield self_managed_table_name + + async with postgres_connection.transaction(): + async with postgres_connection.cursor() as cursor: + await cursor.execute( + sql.SQL("DROP TABLE IF EXISTS {table}").format( + table=sql.Identifier(postgres_config["schema"], self_managed_table_name) + ) + ) + + +@pytest.mark.parametrize("model", [BatchExportModel(name="persons", schema=None)]) +async def test_insert_into_postgres_activity_inserts_fails_on_missing_primary_key( + activity_environment, + postgres_config, + exclude_events, + model: BatchExportModel | BatchExportSchema | None, + data_interval_start, + data_interval_end, + ateam, + generate_test_data, + persons_table_without_primary_key, +): + """Test the insert_into_postgres_activity function fails when missing a primary key. + + We use a self-managed, previously created postgresql table to export persons data to. + Since this table does not have a primary key, the merge query should fail. + + This error should only occur if the table is created outside the batch export. + """ + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model + + insert_inputs = PostgresInsertInputs( + team_id=ateam.pk, + table_name=persons_table_without_primary_key, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + batch_export_schema=batch_export_schema, + batch_export_model=batch_export_model, + **postgres_config, + ) + + with pytest.raises(MissingPrimaryKeyError): + with override_settings(BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2): + await activity_environment.run(insert_into_postgres_activity, insert_inputs) @pytest_asyncio.fixture