Skip to content

Commit

Permalink
Add heartbeat
Browse files: browse the repository at this point in the history
  • Loading branch information
rossgray committed Dec 10, 2024
1 parent 29ed769 commit 5f2e8ed
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
7 changes: 0 additions & 7 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,6 @@ class Status(models.TextChoices):
records_total_count = models.IntegerField(
null=True, help_text="The total count of records that should be exported in this BatchExportRun."
)
expected_records_count = models.IntegerField(
null=True,
help_text=(
"The total count of records that we expect to be exported in this BatchExportRun. "
"This is updated automatically post-batch export by a monitoring job."
),
)

@property
def workflow_id(self) -> str:
Expand Down
26 changes: 14 additions & 12 deletions posthog/temporal/batch_exports/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from posthog.batch_exports.sql import EVENT_COUNT_BY_INTERVAL
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater


class BatchExportNotFoundError(Exception):
Expand Down Expand Up @@ -41,6 +42,7 @@ class BatchExportMonitoringInputs:
@dataclass
class BatchExportDetails:
id: UUID
team_id: int
interval: str
exclude_events: list[str]
include_events: list[str]
Expand Down Expand Up @@ -69,6 +71,7 @@ async def get_batch_export(batch_export_id: UUID) -> BatchExportDetails:
config = batch_export.destination.config
return BatchExportDetails(
id=batch_export.id,
team_id=batch_export.team_id,
interval=batch_export.interval,
exclude_events=config.get("exclude_events", []),
include_events=config.get("include_events", []),
Expand Down Expand Up @@ -120,7 +123,7 @@ async def get_event_counts(inputs: GetEventCountsInputs) -> GetEventCountsOutput
"include_events": inputs.include_events,
"exclude_events": inputs.exclude_events,
}
async with get_client() as client:
async with Heartbeater(), get_client() as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

Expand All @@ -146,13 +149,14 @@ async def update_batch_export_runs(inputs: UpdateBatchExportRunsInputs) -> int:
"""Update BatchExportRuns with the expected number of events."""

total_rows_updated = 0
for result in inputs.results:
total_rows_updated += await aupdate_records_total_count(
batch_export_id=inputs.batch_export_id,
interval_start=dt.datetime.strptime(result.interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
interval_end=dt.datetime.strptime(result.interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
count=result.count,
)
async with Heartbeater():
for result in inputs.results:
total_rows_updated += await aupdate_records_total_count(
batch_export_id=inputs.batch_export_id,
interval_start=dt.datetime.strptime(result.interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
interval_end=dt.datetime.strptime(result.interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
count=result.count,
)
activity.logger.info(f"Updated {total_rows_updated} BatchExportRuns")
return total_rows_updated

Expand Down Expand Up @@ -186,11 +190,9 @@ async def run(self, inputs: BatchExportMonitoringInputs):
inputs.batch_export_id,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
maximum_attempts=3,
initial_interval=dt.timedelta(seconds=20),
non_retryable_error_types=["NoActiveBatchExportsFoundError"],
non_retryable_error_types=["BatchExportNotFoundError", "NoValidBatchExportsFoundError"],
),
heartbeat_timeout=dt.timedelta(minutes=1),
)

# time interval to check is not the previous hour but the hour before that
Expand All @@ -204,7 +206,7 @@ async def run(self, inputs: BatchExportMonitoringInputs):
total_events = await workflow.execute_activity(
get_event_counts,
GetEventCountsInputs(
team_id=inputs.team_id,
team_id=batch_export_details.team_id,
interval=batch_export_details.interval,
overall_interval_start=interval_start_str,
overall_interval_end=interval_end_str,
Expand Down

0 comments on commit 5f2e8ed

Please sign in to comment.