From d29e3b8afbec73337b119381e8a37ff50580e70d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 9 Jan 2025 15:09:08 +0100 Subject: [PATCH] fix: Track records completed in heartbeat (#26686) --- .../batch_exports/bigquery_batch_export.py | 7 +++--- posthog/temporal/batch_exports/heartbeat.py | 25 ++++++++++++++----- .../batch_exports/postgres_batch_export.py | 8 +++--- .../batch_exports/redshift_batch_export.py | 8 +++--- .../temporal/batch_exports/s3_batch_export.py | 8 +++--- .../batch_exports/snowflake_batch_export.py | 1 + .../test_snowflake_batch_export_workflow.py | 7 ++++-- 7 files changed, 42 insertions(+), 22 deletions(-) diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py index 5aa3965b5a8bd..91b0885c932da 100644 --- a/posthog/temporal/batch_exports/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -563,6 +563,7 @@ async def flush( self.rows_exported_counter.add(records_since_last_flush) self.bytes_exported_counter.add(bytes_since_last_flush) + self.heartbeat_details.records_completed += records_since_last_flush self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start) @@ -639,7 +640,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records record_batch_schema = await wait_for_schema_or_producer(queue, producer_task) if record_batch_schema is None: - return 0 + return details.records_completed record_batch_schema = pa.schema( # NOTE: For some reason, some batches set non-nullable fields as non-nullable, whereas other @@ -716,7 +717,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records bigquery_table=bigquery_stage_table if can_perform_merge else bigquery_table, table_schema=stage_schema if can_perform_merge else schema, ) - records_completed = await run_consumer( + await run_consumer( consumer=consumer, queue=queue, producer_task=producer_task, @@ -740,7 +741,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records stage_fields_cast_to_json=json_columns, ) - return records_completed + return details.records_completed @workflow.defn(name="bigquery-export", failure_exception_types=[workflow.NondeterminismError]) diff --git a/posthog/temporal/batch_exports/heartbeat.py b/posthog/temporal/batch_exports/heartbeat.py index fdd21d0613eee..a873507be2be9 100644 --- a/posthog/temporal/batch_exports/heartbeat.py +++ b/posthog/temporal/batch_exports/heartbeat.py @@ -1,14 +1,14 @@ -import typing -import datetime as dt import collections.abc import dataclasses +import datetime as dt +import typing import structlog from posthog.temporal.common.heartbeat import ( + EmptyHeartbeatError, HeartbeatDetails, HeartbeatParseError, - EmptyHeartbeatError, NotEnoughHeartbeatValuesError, ) @@ -27,6 +27,7 @@ class BatchExportRangeHeartbeatDetails(HeartbeatDetails): """ done_ranges: list[DateRange] = dataclasses.field(default_factory=list) + records_completed: int = 0 _remaining: collections.abc.Sequence[typing.Any] = dataclasses.field(default_factory=tuple) @classmethod @@ -37,10 +38,11 @@ def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> d values. Moreover, we expect datetime values to be ISO-formatted strings. 
""" done_ranges: list[DateRange] = [] + records_completed = 0 remaining = super().deserialize_details(details) if len(remaining["_remaining"]) == 0: - return {"done_ranges": done_ranges, **remaining} + return {"done_ranges": done_ranges, "records_completed": records_completed, **remaining} first_detail = remaining["_remaining"][0] remaining["_remaining"] = remaining["_remaining"][1:] @@ -57,7 +59,18 @@ def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> d done_ranges.append(datetime_bounds) - return {"done_ranges": done_ranges, **remaining} + if len(remaining["_remaining"]) == 0: + return {"done_ranges": done_ranges, "records_completed": records_completed, **remaining} + + next_detail = remaining["_remaining"][0] + remaining["_remaining"] = remaining["_remaining"][1:] + + try: + records_completed = int(next_detail) + except (TypeError, ValueError) as e: + raise HeartbeatParseError("records_completed") from e + + return {"done_ranges": done_ranges, "records_completed": records_completed, **remaining} def serialize_details(self) -> tuple[typing.Any, ...]: """Serialize this into a tuple. @@ -69,7 +82,7 @@ def serialize_details(self) -> tuple[typing.Any, ...]: (start.isoformat() if start is not None else start, end.isoformat()) for (start, end) in self.done_ranges ] serialized_parent_details = super().serialize_details() - return (*serialized_parent_details[:-1], serialized_done_ranges, self._remaining) + return (*serialized_parent_details[:-1], serialized_done_ranges, self.records_completed, self._remaining) @property def empty(self) -> bool: diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index a8f0213abf1e8..3c70e5b747dea 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -534,6 +534,7 @@ async def flush( self.rows_exported_counter.add(records_since_last_flush) self.bytes_exported_counter.add(bytes_since_last_flush) + self.heartbeat_details.records_completed += records_since_last_flush self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start) @@ -604,11 +605,10 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records include_events=inputs.include_events, extra_query_parameters=extra_query_parameters, ) - records_completed = 0 record_batch_schema = await wait_for_schema_or_producer(queue, producer_task) if record_batch_schema is None: - return records_completed + return details.records_completed record_batch_schema = pa.schema( [field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"] @@ -675,7 +675,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records postgresql_table_schema=inputs.schema, postgresql_table_fields=schema_columns, ) - records_completed = await run_consumer( + await run_consumer( consumer=consumer, queue=queue, producer_task=producer_task, @@ -705,7 +705,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records merge_key=merge_key, ) - return records_completed + return details.records_completed @workflow.defn(name="postgres-export", failure_exception_types=[workflow.NondeterminismError]) diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index 9c0fc8b9119b5..7abbbc885e09f 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ 
b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -306,6 +306,7 @@ async def flush( self.rows_exported_counter.add(records_since_last_flush) self.bytes_exported_counter.add(bytes_since_last_flush) + self.heartbeat_details.records_completed += records_since_last_flush self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start) @@ -404,9 +405,10 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records extra_query_parameters=extra_query_parameters, max_record_batch_size_bytes=1024 * 1024 * 2, # 2MB ) + record_batch_schema = await wait_for_schema_or_producer(queue, producer_task) if record_batch_schema is None: - return 0 + return details.records_completed record_batch_schema = pa.schema( [field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"] @@ -474,7 +476,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records redshift_client=redshift_client, redshift_table=redshift_stage_table if requires_merge else redshift_table, ) - records_completed = await run_consumer( + await run_consumer( consumer=consumer, queue=queue, producer_task=producer_task, @@ -504,7 +506,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records merge_key=merge_key, ) - return records_completed + return details.records_completed @workflow.defn(name="redshift-export", failure_exception_types=[workflow.NondeterminismError]) diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index 6b32afeba9bb1..812ac9adc68a1 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -603,6 +603,7 @@ async def flush( else: self.heartbeat_details.append_upload_state(self.s3_upload.to_state()) + self.heartbeat_details.records_completed += records_since_last_flush self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start) async def close(self): @@ -772,11 +773,10 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted: max_record_batch_size_bytes=1024 * 1024 * 10, # 10MB use_latest_schema=True, ) - records_completed = 0 record_batch_schema = await wait_for_schema_or_producer(queue, producer_task) if record_batch_schema is None: - return records_completed + return details.records_completed record_batch_schema = pa.schema( # NOTE: For some reason, some batches set non-nullable fields as non-nullable, whereas other @@ -796,7 +796,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted: s3_upload=s3_upload, s3_inputs=inputs, ) - records_completed = await run_consumer( + await run_consumer( consumer=consumer, queue=queue, producer_task=producer_task, @@ -807,7 +807,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted: max_file_size_bytes=inputs.max_file_size_mb * 1024 * 1024 if inputs.max_file_size_mb else 0, ) - return records_completed + return details.records_completed @workflow.defn(name="s3-export", failure_exception_types=[workflow.NondeterminismError]) diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py index 0e3046a371048..ebdea7749e014 100644 --- a/posthog/temporal/batch_exports/snowflake_batch_export.py +++ b/posthog/temporal/batch_exports/snowflake_batch_export.py @@ -546,6 +546,7 @@ async def flush( self.rows_exported_counter.add(records_since_last_flush) 
self.bytes_exported_counter.add(bytes_since_last_flush) + self.heartbeat_details.records_completed += records_since_last_flush self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start) diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index 116a118ece249..cb6f352cb1d6f 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -1633,8 +1633,11 @@ def capture_heartbeat_details(*details): @pytest.mark.parametrize( "details", [ - ([(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())], 1), - ([(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())],), + ([(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())], 10, 1), + ( + [(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())], + 10, + ), ], ) def test_snowflake_heartbeat_details_parses_from_tuple(details):
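
Note (illustrative, not part of the patch): the heartbeat payload now carries the running record count after the list of completed date ranges, and deserialization falls back to 0 when an older heartbeat, serialized before this change, has no such entry. The sketch below is a minimal, standalone reproduction of that layout under assumed names (SketchHeartbeatDetails, DateRange as a plain tuple alias); it is not PostHog's BatchExportRangeHeartbeatDetails class, only the round-trip behaviour the patch relies on:

# Standalone sketch of the heartbeat layout introduced by this patch:
# (done_ranges, records_completed), with deserialization that tolerates
# older payloads that lack the records_completed entry.
import dataclasses
import datetime as dt
import typing

DateRange = tuple[dt.datetime | None, dt.datetime]


@dataclasses.dataclass
class SketchHeartbeatDetails:
    done_ranges: list[DateRange] = dataclasses.field(default_factory=list)
    records_completed: int = 0

    def serialize_details(self) -> tuple[typing.Any, ...]:
        # Mirror the patched serialize_details: ranges first, then the count.
        serialized_ranges = [
            (start.isoformat() if start is not None else None, end.isoformat())
            for start, end in self.done_ranges
        ]
        return (serialized_ranges, self.records_completed)

    @classmethod
    def deserialize_details(cls, details: tuple[typing.Any, ...]) -> "SketchHeartbeatDetails":
        done_ranges: list[DateRange] = []
        records_completed = 0

        if len(details) >= 1:
            for start, end in details[0]:
                done_ranges.append(
                    (
                        dt.datetime.fromisoformat(start) if start is not None else None,
                        dt.datetime.fromisoformat(end),
                    )
                )

        # Heartbeats emitted before this patch carry no count; default to 0
        # instead of failing to parse.
        if len(details) >= 2:
            records_completed = int(details[1])

        return cls(done_ranges=done_ranges, records_completed=records_completed)


if __name__ == "__main__":
    details = SketchHeartbeatDetails(
        done_ranges=[(dt.datetime(2025, 1, 9, 14), dt.datetime(2025, 1, 9, 15))],
        records_completed=10,
    )
    # Simulate a flush adding more records, as the patched flush() methods do.
    details.records_completed += 5

    payload = details.serialize_details()
    restored = SketchHeartbeatDetails.deserialize_details(payload)
    assert restored.records_completed == 15
    print(payload, restored)

Because flush() now accumulates records_since_last_flush into the heartbeat details, and the insert_into_*_activity functions changed here return details.records_completed rather than run_consumer()'s return value, an activity that resumes from a heartbeat after a retry reports the total records exported across attempts instead of only those exported after resuming, which is the behaviour the commit title describes.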