fix: Track records completed in heartbeat (#26686)
tomasfarias authored Jan 9, 2025
1 parent fabcf0c commit d29e3b8
Showing 7 changed files with 42 additions and 22 deletions.
7 changes: 4 additions & 3 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -563,6 +563,7 @@ async def flush(
self.rows_exported_counter.add(records_since_last_flush)
self.bytes_exported_counter.add(bytes_since_last_flush)

+ self.heartbeat_details.records_completed += records_since_last_flush
self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)


@@ -639,7 +640,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
- return 0
+ return details.records_completed

record_batch_schema = pa.schema(
# NOTE: For some reason, some batches set non-nullable fields as non-nullable, whereas other
@@ -716,7 +717,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
bigquery_table=bigquery_stage_table if can_perform_merge else bigquery_table,
table_schema=stage_schema if can_perform_merge else schema,
)
- records_completed = await run_consumer(
+ await run_consumer(
consumer=consumer,
queue=queue,
producer_task=producer_task,
@@ -740,7 +741,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
stage_fields_cast_to_json=json_columns,
)

- return records_completed
+ return details.records_completed


@workflow.defn(name="bigquery-export", failure_exception_types=[workflow.NondeterminismError])
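The change above follows a pattern repeated in the Postgres, Redshift, S3, and Snowflake activities below: the consumer's `flush()` now accumulates into the heartbeat details, and the activity returns `details.records_completed` instead of a local counter. A minimal, self-contained sketch of the idea (the `Sketch*` names are stand-ins, not the actual PostHog consumer or heartbeat classes):

```python
import dataclasses


@dataclasses.dataclass
class SketchHeartbeatDetails:
    """Stand-in for BatchExportRangeHeartbeatDetails; only the new counter is modeled."""

    records_completed: int = 0


@dataclasses.dataclass
class SketchConsumer:
    """Stand-in consumer whose flush() runs once per batch of exported records."""

    heartbeat_details: SketchHeartbeatDetails

    def flush(self, records_since_last_flush: int) -> None:
        # The commit's key change: count completed records in the heartbeat details.
        self.heartbeat_details.records_completed += records_since_last_flush


def sketch_insert_activity(batch_sizes: list[int]) -> int:
    """Stand-in for insert_into_*_activity: return the heartbeat-tracked total."""
    details = SketchHeartbeatDetails()
    consumer = SketchConsumer(heartbeat_details=details)
    for size in batch_sizes:
        consumer.flush(size)
    # Before this commit the activities returned a local records_completed;
    # now they return the value tracked in the heartbeat details.
    return details.records_completed


assert sketch_insert_activity([100, 250, 50]) == 400
```

Routing the counter through the heartbeat details means it is part of what Temporal persists on each heartbeat, so a retried activity attempt can resume counting from the last recorded value rather than starting again at zero.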
25 changes: 19 additions & 6 deletions posthog/temporal/batch_exports/heartbeat.py
@@ -1,14 +1,14 @@
- import typing
- import datetime as dt
import collections.abc
import dataclasses
+ import datetime as dt
+ import typing

import structlog

from posthog.temporal.common.heartbeat import (
+ EmptyHeartbeatError,
HeartbeatDetails,
HeartbeatParseError,
- EmptyHeartbeatError,
NotEnoughHeartbeatValuesError,
)

@@ -27,6 +27,7 @@ class BatchExportRangeHeartbeatDetails(HeartbeatDetails):
"""

done_ranges: list[DateRange] = dataclasses.field(default_factory=list)
+ records_completed: int = 0
_remaining: collections.abc.Sequence[typing.Any] = dataclasses.field(default_factory=tuple)

@classmethod
@@ -37,10 +38,11 @@ def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> d
values. Moreover, we expect datetime values to be ISO-formatted strings.
"""
done_ranges: list[DateRange] = []
+ records_completed = 0
remaining = super().deserialize_details(details)

if len(remaining["_remaining"]) == 0:
- return {"done_ranges": done_ranges, **remaining}
+ return {"done_ranges": done_ranges, "records_completed": records_completed, **remaining}

first_detail = remaining["_remaining"][0]
remaining["_remaining"] = remaining["_remaining"][1:]
@@ -57,7 +59,18 @@ def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> d

done_ranges.append(datetime_bounds)

- return {"done_ranges": done_ranges, **remaining}
+ if len(remaining["_remaining"]) == 0:
+     return {"done_ranges": done_ranges, "records_completed": records_completed, **remaining}
+
+ next_detail = remaining["_remaining"][0]
+ remaining["_remaining"] = remaining["_remaining"][1:]
+
+ try:
+     records_completed = int(next_detail)
+ except (TypeError, ValueError) as e:
+     raise HeartbeatParseError("records_completed") from e
+
+ return {"done_ranges": done_ranges, "records_completed": records_completed, **remaining}

def serialize_details(self) -> tuple[typing.Any, ...]:
"""Serialize this into a tuple.
@@ -69,7 +82,7 @@ def serialize_details(self) -> tuple[typing.Any, ...]:
(start.isoformat() if start is not None else start, end.isoformat()) for (start, end) in self.done_ranges
]
serialized_parent_details = super().serialize_details()
- return (*serialized_parent_details[:-1], serialized_done_ranges, self._remaining)
+ return (*serialized_parent_details[:-1], serialized_done_ranges, self.records_completed, self._remaining)

@property
def empty(self) -> bool:
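To make the new wire format concrete, here is a hedged, self-contained sketch of the round trip `records_completed` makes through the serialized tuple. It mirrors the layout in the diff above (done ranges first, then `records_completed`, then any remaining details) but omits the parent-class details and the HeartbeatParseError handling, so it is an illustration rather than the actual `BatchExportRangeHeartbeatDetails` implementation:

```python
import datetime as dt


def serialize_details(done_ranges, records_completed, remaining=()):
    """Mirror the new layout: done ranges first, then records_completed, then the rest."""
    serialized_done_ranges = [
        (start.isoformat() if start is not None else None, end.isoformat())
        for (start, end) in done_ranges
    ]
    return (serialized_done_ranges, records_completed, *remaining)


def deserialize_details(details):
    """Parse done ranges and, when present, the records_completed integer after them."""
    done_ranges, records_completed = [], 0
    remaining = list(details)

    if remaining:
        done_ranges = [
            (
                dt.datetime.fromisoformat(start) if start is not None else None,
                dt.datetime.fromisoformat(end),
            )
            for (start, end) in remaining.pop(0)
        ]

    if remaining:  # heartbeats recorded before this change simply keep the 0 default
        records_completed = int(remaining.pop(0))

    return {
        "done_ranges": done_ranges,
        "records_completed": records_completed,
        "_remaining": tuple(remaining),
    }


now = dt.datetime.now(dt.timezone.utc)
parsed = deserialize_details(serialize_details([(None, now)], records_completed=42))
assert parsed["records_completed"] == 42
assert parsed["done_ranges"] == [(None, now)]
```

Because `records_completed` is only read when something follows the done ranges, heartbeat payloads serialized before this change still deserialize cleanly and fall back to the 0 default.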
8 changes: 4 additions & 4 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -534,6 +534,7 @@ async def flush(
self.rows_exported_counter.add(records_since_last_flush)
self.bytes_exported_counter.add(bytes_since_last_flush)

+ self.heartbeat_details.records_completed += records_since_last_flush
self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)


@@ -604,11 +605,10 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
include_events=inputs.include_events,
extra_query_parameters=extra_query_parameters,
)
- records_completed = 0

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
- return records_completed
+ return details.records_completed

record_batch_schema = pa.schema(
[field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
@@ -675,7 +675,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
postgresql_table_schema=inputs.schema,
postgresql_table_fields=schema_columns,
)
- records_completed = await run_consumer(
+ await run_consumer(
consumer=consumer,
queue=queue,
producer_task=producer_task,
@@ -705,7 +705,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
merge_key=merge_key,
)

- return records_completed
+ return details.records_completed


@workflow.defn(name="postgres-export", failure_exception_types=[workflow.NondeterminismError])
8 changes: 5 additions & 3 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -306,6 +306,7 @@ async def flush(
self.rows_exported_counter.add(records_since_last_flush)
self.bytes_exported_counter.add(bytes_since_last_flush)

+ self.heartbeat_details.records_completed += records_since_last_flush
self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)


@@ -404,9 +405,10 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
extra_query_parameters=extra_query_parameters,
max_record_batch_size_bytes=1024 * 1024 * 2, # 2MB
)

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
- return 0
+ return details.records_completed

record_batch_schema = pa.schema(
[field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
@@ -474,7 +476,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
redshift_client=redshift_client,
redshift_table=redshift_stage_table if requires_merge else redshift_table,
)
- records_completed = await run_consumer(
+ await run_consumer(
consumer=consumer,
queue=queue,
producer_task=producer_task,
@@ -504,7 +506,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
merge_key=merge_key,
)

- return records_completed
+ return details.records_completed


@workflow.defn(name="redshift-export", failure_exception_types=[workflow.NondeterminismError])
8 changes: 4 additions & 4 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -603,6 +603,7 @@ async def flush(
else:
self.heartbeat_details.append_upload_state(self.s3_upload.to_state())

+ self.heartbeat_details.records_completed += records_since_last_flush
self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)

async def close(self):
@@ -772,11 +773,10 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
max_record_batch_size_bytes=1024 * 1024 * 10, # 10MB
use_latest_schema=True,
)
- records_completed = 0

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
- return records_completed
+ return details.records_completed

record_batch_schema = pa.schema(
# NOTE: For some reason, some batches set non-nullable fields as non-nullable, whereas other
@@ -796,7 +796,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
s3_upload=s3_upload,
s3_inputs=inputs,
)
- records_completed = await run_consumer(
+ await run_consumer(
consumer=consumer,
queue=queue,
producer_task=producer_task,
@@ -807,7 +807,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
max_file_size_bytes=inputs.max_file_size_mb * 1024 * 1024 if inputs.max_file_size_mb else 0,
)

- return records_completed
+ return details.records_completed


@workflow.defn(name="s3-export", failure_exception_types=[workflow.NondeterminismError])
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -546,6 +546,7 @@ async def flush(
self.rows_exported_counter.add(records_since_last_flush)
self.bytes_exported_counter.add(bytes_since_last_flush)

+ self.heartbeat_details.records_completed += records_since_last_flush
self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)


7 changes: 5 additions & 2 deletions
@@ -1633,8 +1633,11 @@ def capture_heartbeat_details(*details):
@pytest.mark.parametrize(
"details",
[
- ([(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())], 1),
- ([(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())],),
+ ([(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())], 10, 1),
+ (
+     [(dt.datetime.now().isoformat(), dt.datetime.now().isoformat())],
+     10,
+ ),
],
)
def test_snowflake_heartbeat_details_parses_from_tuple(details):
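The updated parametrization reflects that the element after the done-ranges list is now the `records_completed` value, with anything after it still treated as remaining details. A tiny standalone illustration of the shapes being exercised (the values are made up for the example):

```python
import datetime as dt

now = dt.datetime.now().isoformat()

# done_ranges, records_completed=10, with and without a trailing detail
with_remaining = ([(now, now)], 10, 1)
without_remaining = ([(now, now)], 10)

for details in (with_remaining, without_remaining):
    done_ranges, records_completed, *rest = details
    assert isinstance(done_ranges, list)
    assert records_completed == 10
```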
