From fee8dd452e73272a2a9203e42d5c7092a52b9117 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Thu, 2 Jan 2025 22:14:40 +0100 Subject: [PATCH] fix(data-warehouse): Fix for decimal overflow tables (#27228) --- .../pipelines/pipeline/delta_table_helper.py | 12 +++++++++++- .../workflow_activities/import_data_sync.py | 6 +++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/pipeline/delta_table_helper.py b/posthog/temporal/data_imports/pipelines/pipeline/delta_table_helper.py index 64cbbda922863..63191aaf918cd 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline/delta_table_helper.py +++ b/posthog/temporal/data_imports/pipelines/pipeline/delta_table_helper.py @@ -6,8 +6,10 @@ from dlt.common.normalizers.naming.snake_case import NamingConvention import deltalake as deltalake from django.conf import settings +from sentry_sdk import capture_exception from posthog.settings.base_variables import TEST from posthog.warehouse.models import ExternalDataJob +from posthog.warehouse.s3 import get_s3_client class DeltaTableHelper: @@ -66,7 +68,15 @@ def get_delta_table(self) -> deltalake.DeltaTable | None: storage_options = self._get_credentials() if deltalake.DeltaTable.is_deltatable(table_uri=delta_uri, storage_options=storage_options): - return deltalake.DeltaTable(table_uri=delta_uri, storage_options=storage_options) + try: + return deltalake.DeltaTable(table_uri=delta_uri, storage_options=storage_options) + except Exception as e: + # Temp fix for bugged tables + capture_exception(e) + if "parse decimal overflow" in "".join(e.args): + s3 = get_s3_client() + s3.delete(delta_uri, recursive=True) + return None return None diff --git a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py index a208e6bef95c8..a4696cf453410 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py @@ -164,7 +164,11 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs): ExternalDataSource.Type.MYSQL, ExternalDataSource.Type.MSSQL, ]: - if is_posthog_team(inputs.team_id) or is_enabled_for_team(inputs.team_id): + if ( + is_posthog_team(inputs.team_id) + or is_enabled_for_team(inputs.team_id) + or settings.TEMPORAL_TASK_QUEUE == DATA_WAREHOUSE_TASK_QUEUE_V2 + ): from posthog.temporal.data_imports.pipelines.sql_database_v2 import sql_source_for_type else: from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type