diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 55dacb59e60e4..8246db39eb6ad 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -49,6 +49,7 @@
 )
 from posthog.temporal.batch_exports.temporary_file import (
     BatchExportTemporaryFile,
+    UnsupportedFileFormatError,
     WriterFormat,
 )
 from posthog.temporal.batch_exports.utils import set_status_to_running_task
@@ -71,6 +72,8 @@
     "RecordBatchConsumerNonRetryableExceptionGroup",
     # Invalid S3 endpoint URL
     "InvalidS3EndpointError",
+    # Invalid file_format input
+    "UnsupportedFileFormatError",
 ]

 FILE_FORMAT_EXTENSIONS = {
@@ -107,7 +110,11 @@ def get_s3_key(inputs) -> str:
     """Return an S3 key given S3InsertInputs."""
     template_variables = get_allowed_template_variables(inputs)
     key_prefix = inputs.prefix.format(**template_variables)
-    file_extension = FILE_FORMAT_EXTENSIONS[inputs.file_format]
+
+    try:
+        file_extension = FILE_FORMAT_EXTENSIONS[inputs.file_format]
+    except KeyError:
+        raise UnsupportedFileFormatError(inputs.file_format, "S3")

     base_file_name = f"{inputs.data_interval_start}-{inputs.data_interval_end}"
     if inputs.compression is not None:
diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index 3dab18c67b283..85594e7ad4fa0 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -39,6 +39,7 @@
     insert_into_s3_activity,
     s3_default_fields,
 )
+from posthog.temporal.batch_exports.temporary_file import UnsupportedFileFormatError
 from posthog.temporal.common.clickhouse import ClickHouseClient
 from posthog.temporal.tests.batch_exports.utils import mocked_start_batch_export_run
 from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse
@@ -561,6 +562,51 @@ async def test_insert_into_s3_activity_puts_data_into_s3_using_async(
     )


+@pytest.mark.parametrize("model", [BatchExportModel(name="events", schema=None)])
+@pytest.mark.parametrize("file_format", ["invalid"])
+async def test_insert_into_s3_activity_fails_on_invalid_file_format(
+    clickhouse_client,
+    bucket_name,
+    minio_client,
+    activity_environment,
+    compression,
+    exclude_events,
+    file_format,
+    data_interval_start,
+    data_interval_end,
+    model: BatchExportModel | BatchExportSchema | None,
+    ateam,
+):
+    """Test the insert_into_s3_activity function fails with an invalid file format."""
+    batch_export_schema: BatchExportSchema | None = None
+    batch_export_model: BatchExportModel | None = None
+    if isinstance(model, BatchExportModel):
+        batch_export_model = model
+    elif model is not None:
+        batch_export_schema = model
+
+    insert_inputs = S3InsertInputs(
+        bucket_name=bucket_name,
+        region="us-east-1",
+        prefix="any",
+        team_id=ateam.pk,
+        data_interval_start=data_interval_start.isoformat(),
+        data_interval_end=data_interval_end.isoformat(),
+        aws_access_key_id="object_storage_root_user",
+        aws_secret_access_key="object_storage_root_password",
+        endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
+        compression=compression,
+        exclude_events=exclude_events,
+        file_format=file_format,
+        batch_export_schema=batch_export_schema,
+        batch_export_model=batch_export_model,
+    )
+
+    with pytest.raises(UnsupportedFileFormatError):
+        with override_settings(BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2):
+            await activity_environment.run(insert_into_s3_activity, insert_inputs)
+
+
 @pytest_asyncio.fixture
 async def s3_batch_export(
     ateam,
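Note: the diff imports UnsupportedFileFormatError from posthog/temporal/batch_exports/temporary_file.py but never shows its definition. As a minimal sketch only, here is what such an exception class could look like; the (file_format, destination) signature is inferred from the call site UnsupportedFileFormatError(inputs.file_format, "S3") in get_s3_key, and the base class and message wording are assumptions, not the actual PostHog implementation:

    class UnsupportedFileFormatError(Exception):
        """Raised when a batch export is configured with a file format it cannot write.

        Hypothetical sketch: the real definition lives in temporary_file.py and is
        not part of this diff. Only the two-argument constructor is grounded in the
        diff's call site; everything else here is illustrative.
        """

        def __init__(self, file_format: str, destination: str) -> None:
            super().__init__(f"{file_format} is not a supported file format for {destination} batch exports")

Because the class name is listed in the workflow's non-retryable error names, raising it from get_s3_key fails the batch export run immediately instead of retrying an input that can never succeed.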