Skip to content

Commit

Permalink
fix: Raise exception on missing primary key (#27126)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Dec 23, 2024
1 parent d45d9ff commit e7d0a4c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
13 changes: 12 additions & 1 deletion posthog/temporal/batch_exports/postgres_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class PostgreSQLConnectionError(Exception):
pass


class MissingPrimaryKeyError(Exception):
    """Raised when a merge cannot proceed because the target table has no primary key.

    PostgreSQL rejects an `ON CONFLICT` merge when the conflict fields are not
    backed by a primary key (or unique constraint); we surface that condition
    as this error so it can be reported as non-retryable.
    """

    def __init__(self, table: "sql.Identifier", primary_key: "sql.Composed"):
        message = f"An operation could not be completed as '{table}' is missing a primary key on {primary_key}"
        super().__init__(message)


@dataclasses.dataclass
class PostgresInsertInputs:
"""Inputs for Postgres insert activity."""
Expand Down Expand Up @@ -346,7 +351,10 @@ async def amerge_person_tables(
await cursor.execute(sql.SQL("SET search_path TO {schema}").format(schema=sql.Identifier(schema)))
await cursor.execute("SET TRANSACTION READ WRITE")

await cursor.execute(merge_query)
try:
await cursor.execute(merge_query)
except psycopg.errors.InvalidColumnReference:
raise MissingPrimaryKeyError(final_table_identifier, conflict_fields)

async def copy_tsv_to_postgres(
self,
Expand Down Expand Up @@ -598,6 +606,7 @@ async def flush_to_postgres(
("team_id", "INT"),
("distinct_id", "TEXT"),
)

await pg_client.amerge_person_tables(
final_table_name=pg_table,
stage_table_name=pg_stage_table,
Expand Down Expand Up @@ -704,6 +713,8 @@ async def run(self, inputs: PostgresBatchExportInputs):
"DiskFull",
# Raised by our PostgreSQL client when failing to connect after several attempts.
"PostgreSQLConnectionError",
# Raised when merging without a primary key.
"MissingPrimaryKeyError",
],
finish_inputs=finish_inputs,
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
start_batch_export_run,
)
from posthog.temporal.batch_exports.postgres_batch_export import (
MissingPrimaryKeyError,
PostgresBatchExportInputs,
PostgresBatchExportWorkflow,
PostgresInsertInputs,
Expand Down Expand Up @@ -362,7 +363,73 @@ async def test_insert_into_postgres_activity_merges_data_in_follow_up_runs(

@pytest.fixture
def table_name(ateam, interval):
    """Return a destination table name unique per team and export interval.

    Keeping the name short ("test_table_...") avoids hitting PostgreSQL's
    63-character identifier limit when suffixes are appended by other fixtures.
    """
    # NOTE: the scraped diff showed both the old ("test_workflow_table_...")
    # and new return values; only the post-change value is kept here, as the
    # second return would be unreachable dead code.
    return f"test_table_{ateam.pk}_{interval}"


@pytest_asyncio.fixture
async def persons_table_without_primary_key(postgres_connection, postgres_config, table_name):
    """Manage a table for a persons batch export without a primary key.

    Creates a self-managed destination table (columns matching the persons
    model, but deliberately lacking a primary key) before the test, yields its
    name, and drops it afterwards. Used to exercise the merge failure path
    when the conflict fields have no backing primary key.
    """
    # Random suffix keeps the table name unique across concurrent test runs.
    self_managed_table_name = table_name + f"_self_managed_{uuid.uuid4().hex}"

    async with postgres_connection.transaction():
        async with postgres_connection.cursor() as cursor:
            # Same columns a persons export writes, but no PRIMARY KEY clause.
            await cursor.execute(
                sql.SQL(
                    "CREATE TABLE {table} (team_id BIGINT, distinct_id TEXT, person_id TEXT, properties JSONB, person_distinct_id_version BIGINT, person_version BIGINT)"
                ).format(table=sql.Identifier(postgres_config["schema"], self_managed_table_name))
            )

    yield self_managed_table_name

    # Teardown: best-effort drop (IF EXISTS) so a failed test can't leak the table.
    async with postgres_connection.transaction():
        async with postgres_connection.cursor() as cursor:
            await cursor.execute(
                sql.SQL("DROP TABLE IF EXISTS {table}").format(
                    table=sql.Identifier(postgres_config["schema"], self_managed_table_name)
                )
            )


@pytest.mark.parametrize("model", [BatchExportModel(name="persons", schema=None)])
async def test_insert_into_postgres_activity_inserts_fails_on_missing_primary_key(
    activity_environment,
    postgres_config,
    exclude_events,
    model: BatchExportModel | BatchExportSchema | None,
    data_interval_start,
    data_interval_end,
    ateam,
    generate_test_data,
    persons_table_without_primary_key,
):
    """Test that the insert activity fails when the destination lacks a primary key.

    The destination is a self-managed table created by the
    `persons_table_without_primary_key` fixture. Since merging persons data
    needs a primary key for conflict resolution, the activity should raise
    `MissingPrimaryKeyError`. This can only occur when the table was created
    outside the batch export.
    """
    # Route the parametrized model into the two mutually exclusive input fields.
    # When `model` is None both fields stay None, matching the default behavior.
    batch_export_model: BatchExportModel | None = None
    batch_export_schema: BatchExportSchema | None = None
    if isinstance(model, BatchExportModel):
        batch_export_model = model
    else:
        batch_export_schema = model

    insert_inputs = PostgresInsertInputs(
        team_id=ateam.pk,
        table_name=persons_table_without_primary_key,
        data_interval_start=data_interval_start.isoformat(),
        data_interval_end=data_interval_end.isoformat(),
        exclude_events=exclude_events,
        batch_export_schema=batch_export_schema,
        batch_export_model=batch_export_model,
        **postgres_config,
    )

    with pytest.raises(MissingPrimaryKeyError), override_settings(BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2):
        await activity_environment.run(insert_into_postgres_activity, insert_inputs)


@pytest_asyncio.fixture
Expand Down

0 comments on commit e7d0a4c

Please sign in to comment.