Skip to content

Commit

Permalink
Feature flag support for v2 transform command
Browse files Browse the repository at this point in the history
Why these changes are being introduced:

We are adding feature flag support for parquet dataset
writing (ETL_VERSION=2); the first step is to support
an updated transform command for Transmogrifier.

How this addresses that need:
* Builds out the feature-flagged pathway _etl_v2_generate_transform_commands_method
* Adds comments marking which code is feature-flagged and should
be revisited after the v2 work is established

Side effects of this change:
* If ETL_VERSION=2 for this application, new pathways will
be used for transform command generation

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-441
  • Loading branch information
ghukill committed Dec 19, 2024
1 parent 8e09243 commit 5214451
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 16 deletions.
24 changes: 21 additions & 3 deletions lambdas/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def generate_transform_commands(
input_data: dict,
run_date: str,
timdex_bucket: str,
run_id: str,
) -> dict[str, list[dict]]:
"""Generate task run command for TIMDEX transform."""
# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
Expand All @@ -82,7 +83,9 @@ def generate_transform_commands(
extract_output_files, input_data, run_date, timdex_bucket
)
case 2:
return _etl_v2_generate_transform_commands_method()
return _etl_v2_generate_transform_commands_method(
extract_output_files, input_data, timdex_bucket, run_id
)


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
Expand Down Expand Up @@ -114,8 +117,23 @@ def _etl_v1_generate_transform_commands_method(


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
def _etl_v2_generate_transform_commands_method() -> dict[str, list[dict]]:
raise NotImplementedError
def _etl_v2_generate_transform_commands_method(
extract_output_files: list[str],
input_data: dict,
timdex_bucket: str,
run_id: str,
) -> dict[str, list[dict]]:
files_to_transform: list[dict] = []
source = input_data["source"]
for extract_output_file in extract_output_files:
transform_command = [
f"--input-file=s3://{timdex_bucket}/{extract_output_file}",
f"--output-location=s3://{timdex_bucket}/dataset",
f"--source={source}",
f"--run-id={run_id}",
]
files_to_transform.append({"transform-command": transform_command})
return {"files-to-transform": files_to_transform}


def generate_load_commands(
Expand Down
1 change: 1 addition & 0 deletions lambdas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"TIMDEX_S3_EXTRACT_BUCKET_ID",
"WORKSPACE",
}
# NOTE: FEATURE FLAG: add "run-id" after v1 pathways are removed
REQUIRED_FIELDS = ("next-step", "run-date", "run-type", "source")
REQUIRED_OAI_HARVEST_FIELDS = ("oai-pmh-host", "oai-metadata-format")
VALID_DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ")
Expand Down
4 changes: 3 additions & 1 deletion lambdas/format_input.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
import uuid

from lambdas import alma_prep, commands, config, errors, helpers

Expand All @@ -19,6 +20,7 @@ def lambda_handler(event: dict, _context: dict) -> dict:
run_type = event["run-type"]
source = event["source"]
next_step = event["next-step"]
run_id = event.get("run-id", str(uuid.uuid4()))
timdex_bucket = os.environ["TIMDEX_S3_EXTRACT_BUCKET_ID"]

result = {
Expand Down Expand Up @@ -67,7 +69,7 @@ def lambda_handler(event: dict, _context: dict) -> dict:
)
result["next-step"] = "load"
result["transform"] = commands.generate_transform_commands(
extract_output_files, event, run_date, timdex_bucket
extract_output_files, event, run_date, timdex_bucket, run_id
)

elif next_step == "load":
Expand Down
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,16 @@ def mocked_s3():
def s3_client():
# ruff: noqa: PT022
yield boto3.client("s3")


@pytest.fixture
def run_id():
    # Fixed, human-readable run id so tests can assert on the exact
    # --run-id argument value in generated commands.
    return "run-abc-123"


# NOTE: FEATURE FLAG: remove after v2 work is complete
@pytest.fixture
def etl_version_2(monkeypatch):
    # Force the v2 ETL pathway for the requesting test by setting the
    # ETL_VERSION env var; the version number is returned so tests can
    # reference the active version if needed.
    etl_version = 2
    monkeypatch.setenv("ETL_VERSION", f"{etl_version}")
    return etl_version
28 changes: 16 additions & 12 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_generate_extract_command_geoharvester():
}


def test_generate_transform_commands_required_input_fields():
def test_generate_transform_commands_required_input_fields(etl_version_2, run_id):
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
Expand All @@ -88,23 +88,27 @@ def test_generate_transform_commands_required_input_fields():
"testsource/testsource-2022-01-02-full-extracted-records-to-index.xml"
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket"
extract_output_files,
input_data,
"2022-01-02",
"test-timdex-bucket",
run_id,
) == {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-full-extracted-records-to-index.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-full-transformed-records-to-index.json",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
}
]
}


def test_generate_transform_commands_all_input_fields():
def test_generate_transform_commands_all_input_fields(etl_version_2, run_id):
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
Expand All @@ -117,34 +121,34 @@ def test_generate_transform_commands_all_input_fields():
"testsource/testsource-2022-01-02-daily-extracted-records-to-delete.xml",
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket"
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", run_id
) == {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-index_01.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-index_01.json",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-index_02.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-index_02.json",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
"--output-location=s3://test-timdex-bucket/dataset",
"--source=testsource",
f"--run-id={run_id}",
]
},
]
Expand Down
79 changes: 79 additions & 0 deletions tests/test_commands_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# ruff: noqa: FBT003

from lambdas import commands

# NOTE: FEATURE FLAG: this file can be FULLY removed after v2 work is complete


def test_generate_transform_commands_required_input_fields(run_id):
    """A single full-run extract file yields one v1 transform command."""
    input_data = {
        "next-step": "transform",
        "run-date": "2022-01-02T12:13:14Z",
        "run-type": "full",
        "source": "testsource",
    }
    extract_output_files = [
        "testsource/testsource-2022-01-02-full-extracted-records-to-index.xml"
    ]
    expected = {
        "files-to-transform": [
            {
                "transform-command": [
                    "--input-file=s3://test-timdex-bucket/testsource/"
                    "testsource-2022-01-02-full-extracted-records-to-index.xml",
                    "--output-file=s3://test-timdex-bucket/testsource/"
                    "testsource-2022-01-02-full-transformed-records-to-index.json",
                    "--source=testsource",
                ]
            }
        ]
    }
    result = commands.generate_transform_commands(
        extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", run_id
    )
    assert result == expected


def test_generate_transform_commands_all_input_fields(run_id):
    """Multiple daily extract files each yield their own v1 transform command."""
    input_data = {
        "next-step": "transform",
        "run-date": "2022-01-02T12:13:14Z",
        "run-type": "daily",
        "source": "testsource",
    }
    extract_output_files = [
        "testsource/testsource-2022-01-02-daily-extracted-records-to-index_01.xml",
        "testsource/testsource-2022-01-02-daily-extracted-records-to-index_02.xml",
        "testsource/testsource-2022-01-02-daily-extracted-records-to-delete.xml",
    ]
    # One expected command list per extracted file, in input order.
    expected_commands = [
        [
            "--input-file=s3://test-timdex-bucket/testsource/"
            "testsource-2022-01-02-daily-extracted-records-to-index_01.xml",
            "--output-file=s3://test-timdex-bucket/testsource/"
            "testsource-2022-01-02-daily-transformed-records-to-index_01.json",
            "--source=testsource",
        ],
        [
            "--input-file=s3://test-timdex-bucket/testsource/"
            "testsource-2022-01-02-daily-extracted-records-to-index_02.xml",
            "--output-file=s3://test-timdex-bucket/testsource/"
            "testsource-2022-01-02-daily-transformed-records-to-index_02.json",
            "--source=testsource",
        ],
        [
            "--input-file=s3://test-timdex-bucket/testsource/"
            "testsource-2022-01-02-daily-extracted-records-to-delete.xml",
            "--output-file=s3://test-timdex-bucket/testsource/"
            "testsource-2022-01-02-daily-transformed-records-to-delete.txt",
            "--source=testsource",
        ],
    ]
    expected = {
        "files-to-transform": [
            {"transform-command": command} for command in expected_commands
        ]
    }
    result = commands.generate_transform_commands(
        extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", run_id
    )
    assert result == expected

0 comments on commit 5214451

Please sign in to comment.