Skip to content

Commit

Permalink
fix: Raise exception on invalid file format (#27149)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Jan 3, 2025
1 parent 7196fd9 commit 12eb711
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
9 changes: 8 additions & 1 deletion posthog/temporal/batch_exports/s3_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from posthog.temporal.batch_exports.temporary_file import (
BatchExportTemporaryFile,
UnsupportedFileFormatError,
WriterFormat,
)
from posthog.temporal.batch_exports.utils import set_status_to_running_task
Expand All @@ -71,6 +72,8 @@
"RecordBatchConsumerNonRetryableExceptionGroup",
# Invalid S3 endpoint URL
"InvalidS3EndpointError",
# Invalid file_format input
"UnsupportedFileFormatError",
]

FILE_FORMAT_EXTENSIONS = {
Expand Down Expand Up @@ -107,7 +110,11 @@ def get_s3_key(inputs) -> str:
"""Return an S3 key given S3InsertInputs."""
template_variables = get_allowed_template_variables(inputs)
key_prefix = inputs.prefix.format(**template_variables)
file_extension = FILE_FORMAT_EXTENSIONS[inputs.file_format]

try:
file_extension = FILE_FORMAT_EXTENSIONS[inputs.file_format]
except KeyError:
raise UnsupportedFileFormatError(inputs.file_format, "S3")

base_file_name = f"{inputs.data_interval_start}-{inputs.data_interval_end}"
if inputs.compression is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
insert_into_s3_activity,
s3_default_fields,
)
from posthog.temporal.batch_exports.temporary_file import UnsupportedFileFormatError
from posthog.temporal.common.clickhouse import ClickHouseClient
from posthog.temporal.tests.batch_exports.utils import mocked_start_batch_export_run
from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse
Expand Down Expand Up @@ -561,6 +562,51 @@ async def test_insert_into_s3_activity_puts_data_into_s3_using_async(
)


@pytest.mark.parametrize("model", [BatchExportModel(name="events", schema=None)])
@pytest.mark.parametrize("file_format", ["invalid"])
async def test_insert_into_s3_activity_fails_on_invalid_file_format(
    clickhouse_client,
    bucket_name,
    minio_client,
    activity_environment,
    compression,
    exclude_events,
    file_format,
    data_interval_start,
    data_interval_end,
    model: BatchExportModel | BatchExportSchema | None,
    ateam,
):
    """Assert `insert_into_s3_activity` raises `UnsupportedFileFormatError` for an unknown format.

    The `file_format` parametrization injects a value that is not present in
    `FILE_FORMAT_EXTENSIONS`, so the activity should fail before writing anything.
    """
    # Dispatch `model` into the matching inputs field; only one of the two is set.
    batch_export_model: BatchExportModel | None = None
    batch_export_schema: BatchExportSchema | None = None
    if isinstance(model, BatchExportModel):
        batch_export_model = model
    elif model is not None:
        batch_export_schema = model

    insert_inputs = S3InsertInputs(
        bucket_name=bucket_name,
        region="us-east-1",
        prefix="any",
        team_id=ateam.pk,
        data_interval_start=data_interval_start.isoformat(),
        data_interval_end=data_interval_end.isoformat(),
        aws_access_key_id="object_storage_root_user",
        aws_secret_access_key="object_storage_root_password",
        endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
        compression=compression,
        exclude_events=exclude_events,
        file_format=file_format,
        batch_export_schema=batch_export_schema,
        batch_export_model=batch_export_model,
    )

    # pytest.raises stays outermost so the expected error propagates out of the
    # settings override; the activity must raise rather than export any data.
    with (
        pytest.raises(UnsupportedFileFormatError),
        override_settings(BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2),
    ):
        await activity_environment.run(insert_into_s3_activity, insert_inputs)


@pytest_asyncio.fixture
async def s3_batch_export(
ateam,
Expand Down

0 comments on commit 12eb711

Please sign in to comment.