diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py
index 20eb35966dc223..de8ffedcfd3a42 100644
--- a/posthog/batch_exports/models.py
+++ b/posthog/batch_exports/models.py
@@ -106,13 +106,6 @@ class Status(models.TextChoices):
     records_total_count = models.IntegerField(
         null=True, help_text="The total count of records that should be exported in this BatchExportRun."
     )
-    expected_records_count = models.IntegerField(
-        null=True,
-        help_text=(
-            "The total count of records that we expect to be exported in this BatchExportRun. "
-            "This is updated automatically post-batch export by a monitoring job."
-        ),
-    )
 
     @property
     def workflow_id(self) -> str:
diff --git a/posthog/temporal/batch_exports/monitoring.py b/posthog/temporal/batch_exports/monitoring.py
index 05bac0448bb8a0..97eaf6c2430d90 100644
--- a/posthog/temporal/batch_exports/monitoring.py
+++ b/posthog/temporal/batch_exports/monitoring.py
@@ -11,6 +11,7 @@
 from posthog.batch_exports.sql import EVENT_COUNT_BY_INTERVAL
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.common.clickhouse import get_client
+from posthog.temporal.common.heartbeat import Heartbeater
 
 
 class BatchExportNotFoundError(Exception):
@@ -41,6 +42,7 @@ class BatchExportMonitoringInputs:
 @dataclass
 class BatchExportDetails:
     id: UUID
+    team_id: int
     interval: str
     exclude_events: list[str]
     include_events: list[str]
@@ -69,6 +71,7 @@ async def get_batch_export(batch_export_id: UUID) -> BatchExportDetails:
     config = batch_export.destination.config
     return BatchExportDetails(
         id=batch_export.id,
+        team_id=batch_export.team_id,
         interval=batch_export.interval,
         exclude_events=config.get("exclude_events", []),
         include_events=config.get("include_events", []),
@@ -120,7 +123,7 @@ async def get_event_counts(inputs: GetEventCountsInputs) -> GetEventCountsOutput
         "include_events": inputs.include_events,
         "exclude_events": inputs.exclude_events,
     }
-    async with get_client() as client:
+    async with Heartbeater(), get_client() as client:
         if not await client.is_alive():
             raise ConnectionError("Cannot establish connection to ClickHouse")
 
@@ -146,13 +149,14 @@ async def update_batch_export_runs(inputs: UpdateBatchExportRunsInputs) -> int:
     """Update BatchExportRuns with the expected number of events."""
     total_rows_updated = 0
-    for result in inputs.results:
-        total_rows_updated += await aupdate_records_total_count(
-            batch_export_id=inputs.batch_export_id,
-            interval_start=dt.datetime.strptime(result.interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
-            interval_end=dt.datetime.strptime(result.interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
-            count=result.count,
-        )
+    async with Heartbeater():
+        for result in inputs.results:
+            total_rows_updated += await aupdate_records_total_count(
+                batch_export_id=inputs.batch_export_id,
+                interval_start=dt.datetime.strptime(result.interval_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
+                interval_end=dt.datetime.strptime(result.interval_end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.UTC),
+                count=result.count,
+            )
     activity.logger.info(f"Updated {total_rows_updated} BatchExportRuns")
     return total_rows_updated
@@ -186,11 +190,9 @@ async def run(self, inputs: BatchExportMonitoringInputs):
             inputs.batch_export_id,
             start_to_close_timeout=dt.timedelta(minutes=1),
             retry_policy=RetryPolicy(
-                maximum_attempts=3,
                 initial_interval=dt.timedelta(seconds=20),
-                non_retryable_error_types=["NoActiveBatchExportsFoundError"],
+                non_retryable_error_types=["BatchExportNotFoundError", "NoValidBatchExportsFoundError"],
             ),
-            heartbeat_timeout=dt.timedelta(minutes=1),
         )
 
         # time interval to check is not the previous hour but the hour before that
@@ -204,7 +206,7 @@ async def run(self, inputs: BatchExportMonitoringInputs):
         total_events = await workflow.execute_activity(
             get_event_counts,
             GetEventCountsInputs(
-                team_id=inputs.team_id,
+                team_id=batch_export_details.team_id,
                 interval=batch_export_details.interval,
                 overall_interval_start=interval_start_str,
                 overall_interval_end=interval_end_str,