Establish feature flag branching for parquet work
Why these changes are being introduced:
* We will use a feature flag approach while modifying
TIMDEX Pipeline Lambdas to generate the required commands
for the Transform step to output files to a parquet dataset
and for the Load step to read records from a parquet dataset
and index records into TIMDEX.

We are doing so with the goal of full backwards compatibility until
that refactor is complete.

How this addresses that need:
* Utilizes a new optional env var 'ETL_VERSION' to provide an ETL version
for code to branch on.
* The Format Input Lambda function handler is updated to pass the ETL version
to the functions that generate commands for the Transform and Load steps.
* Adds branching logic and submethods to the functions that generate commands
for the Transform and Load steps.
* All feature flag branching logic is marked with comments so it can be
removed when the development work is complete.

Side effects of this change:
* TIMDEX Pipeline Lambdas remain fully backwards compatible, either when
the env var 'ETL_VERSION' is absent or when its value equals '1'.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-412
jonavellecuerdo committed Dec 3, 2024
1 parent ed33a71 commit 9f4d3ba
Showing 5 changed files with 74 additions and 14 deletions.
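As context for the diffs below, here is a minimal sketch (not part of the commit) of the feature-flag pattern the commit message describes: an optional 'ETL_VERSION' env var is read once, validated, and used to dispatch between the existing v1 code path and the placeholder v2 (parquet) path.

import os
from typing import Literal


def get_etl_version() -> Literal[1, 2]:
    """Read the optional ETL_VERSION env var, defaulting to 1 (current behavior)."""
    etl_version = int(os.environ.get("ETL_VERSION", "1"))
    if etl_version not in (1, 2):
        message = f"ETL_VERSION '{etl_version}' not supported"
        raise ValueError(message)
    return etl_version  # type: ignore[return-value]


def generate_commands(etl_version: Literal[1, 2]) -> dict:
    """Toy dispatcher mirroring the real command generators in this commit."""
    # NOTE: FEATURE FLAG: branching is removed once the v2 (parquet) work lands
    match etl_version:
        case 1:
            return {"files-to-transform": []}  # existing v1 behavior, unchanged
        case 2:
            raise NotImplementedError  # v2 parquet path, not yet implemented

The diffs below apply this same pattern to generate_transform_commands and generate_load_commands.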
50 changes: 46 additions & 4 deletions lambdas/commands.py
@@ -1,4 +1,5 @@
import logging
from typing import Literal

from lambdas import config, helpers

@@ -72,10 +73,24 @@ def generate_transform_commands(
input_data: dict,
run_date: str,
timdex_bucket: str,
) -> dict:
etl_version: Literal[1, 2],
) -> dict[str, list[dict]]:
"""Generate task run command for TIMDEX transform."""
files_to_transform: list[dict] = []
# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
match etl_version:
case 1:
return _etl_v1_generate_transform_commands_method(
extract_output_files, input_data, run_date, timdex_bucket
)
case 2:
return _etl_v2_generate_transform_commands_method()


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
def _etl_v1_generate_transform_commands_method(
extract_output_files: list[str], input_data: dict, run_date: str, timdex_bucket: str
) -> dict[str, list[dict]]:
files_to_transform: list[dict] = []
source = input_data["source"]
transform_output_prefix = helpers.generate_step_output_prefix(
source, run_date, input_data["run-type"], "transform"
@@ -96,14 +111,36 @@ def generate_transform_commands(
]

files_to_transform.append({"transform-command": transform_command})

return {"files-to-transform": files_to_transform}


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
def _etl_v2_generate_transform_commands_method() -> dict[str, list[dict]]:
raise NotImplementedError


def generate_load_commands(
transform_output_files: list[str], run_type: str, source: str, timdex_bucket: str
transform_output_files: list[str],
run_type: str,
source: str,
timdex_bucket: str,
etl_version: Literal[1, 2],
) -> dict:
"""Generate task run command for loading records into OpenSearch."""
# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
match etl_version:
case 1:
return _etl_v1_generate_load_commands_method(
transform_output_files, run_type, source, timdex_bucket
)
case 2:
return _etl_v2_generate_load_commands_method()


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
def _etl_v1_generate_load_commands_method(
transform_output_files: list[str], run_type: str, source: str, timdex_bucket: str
) -> dict:
if run_type == "daily":
files_to_index = []
files_to_delete = []
@@ -169,3 +206,8 @@ def generate_load_commands(
return {
"failure": "Something unexpected went wrong. Please check input and try again."
}


# NOTE: FEATURE FLAG: branching logic + method removed after v2 work is complete
def _etl_v2_generate_load_commands_method() -> dict:
raise NotImplementedError
10 changes: 10 additions & 0 deletions lambdas/config.py
@@ -1,5 +1,6 @@
import logging
import os
from typing import Literal

GIS_SOURCES = ["gismit", "gisogm"]
INDEX_ALIASES = {
@@ -51,6 +52,15 @@ def configure_logger(
)


# NOTE: FEATURE FLAG: function will be removed after v2 work is complete
def get_etl_version() -> Literal[1, 2]:
etl_version = int(os.environ.get("ETL_VERSION", "1"))
if etl_version not in [1, 2]:
message = f"ETL_VERSION '{etl_version}' not supported"
raise ValueError(message)
return etl_version # type: ignore[return-value]


def validate_input(input_data: dict) -> None:
"""Validate input to the lambda function.
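A quick usage sketch (not part of the commit) of the expected behavior of config.get_etl_version, based on the implementation above:

import os

from lambdas import config  # package layout as shown in this commit

os.environ.pop("ETL_VERSION", None)
assert config.get_etl_version() == 1  # absent env var keeps current (v1) behavior

os.environ["ETL_VERSION"] = "2"
assert config.get_etl_version() == 2  # opts into the new parquet code path

os.environ["ETL_VERSION"] = "3"
try:
    config.get_etl_version()
except ValueError as error:
    print(error)  # ETL_VERSION '3' not supported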
10 changes: 8 additions & 2 deletions lambdas/format_input.py
@@ -10,6 +10,8 @@
def lambda_handler(event: dict, _context: dict) -> dict:
"""Format data into the necessary input for TIMDEX pipeline processing."""
config.verify_env()
# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
etl_version = config.get_etl_version()
verbose = config.check_verbosity(event.get("verbose", False))
config.configure_logger(logging.getLogger(), verbose)
logger.debug(json.dumps(event))
@@ -66,8 +68,10 @@ def lambda_handler(event: dict, _context: dict) -> dict:
source,
)
result["next-step"] = "load"

# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
result["transform"] = commands.generate_transform_commands(
extract_output_files, event, run_date, timdex_bucket
extract_output_files, event, run_date, timdex_bucket, etl_version
)

elif next_step == "load":
@@ -84,8 +88,10 @@ def lambda_handler(event: dict, _context: dict) -> dict:
"the provided date and source, something likely went wrong."
)
return result

# NOTE: FEATURE FLAG: branching logic will be removed after v2 work is complete
result["load"] = commands.generate_load_commands(
transform_output_files, run_type, source, timdex_bucket
transform_output_files, run_type, source, timdex_bucket, etl_version
)

return result
2 changes: 2 additions & 0 deletions tests/conftest.py
@@ -13,6 +13,8 @@ def _test_env(monkeypatch):
monkeypatch.setenv("TIMDEX_ALMA_EXPORT_BUCKET_ID", "test-alma-bucket")
monkeypatch.setenv("TIMDEX_S3_EXTRACT_BUCKET_ID", "test-timdex-bucket")
monkeypatch.setenv("WORKSPACE", "test")
# NOTE: FEATURE FLAG: remove after v2 work is complete
monkeypatch.setenv("ETL_VERSION", "1")


@pytest.fixture(autouse=True)
16 changes: 8 additions & 8 deletions tests/test_commands.py
@@ -88,7 +88,7 @@ def test_generate_transform_commands_required_input_fields():
"testsource/testsource-2022-01-02-full-extracted-records-to-index.xml"
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket"
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", 1
) == {
"files-to-transform": [
{
@@ -117,7 +117,7 @@ def test_generate_transform_commands_all_input_fields():
"testsource/testsource-2022-01-02-daily-extracted-records-to-delete.xml",
]
assert commands.generate_transform_commands(
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket"
extract_output_files, input_data, "2022-01-02", "test-timdex-bucket", 1
) == {
"files-to-transform": [
{
@@ -158,7 +158,7 @@ def test_generate_load_commands_daily():
"testsource/testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
assert commands.generate_load_commands(
transform_output_files, "daily", "testsource", "test-timdex-bucket"
transform_output_files, "daily", "testsource", "test-timdex-bucket", 1
) == {
"files-to-index": [
{
@@ -200,7 +200,7 @@ def test_generate_load_commands_full_not_aliased():
"testsource/testsource-2022-01-02-full-transformed-records-to-index.json"
]
assert commands.generate_load_commands(
transform_output_files, "full", "testsource", "test-timdex-bucket"
transform_output_files, "full", "testsource", "test-timdex-bucket", 1
) == {
"create-index-command": ["create", "--index", "testsource-2022-01-02t12-13-14"],
"files-to-index": [
@@ -230,7 +230,7 @@ def test_generate_load_commands_full_aliased():
"alma/alma-2022-01-02-full-transformed-records-to-index.json"
]
assert commands.generate_load_commands(
transform_output_files, "full", "alma", "test-timdex-bucket"
transform_output_files, "full", "alma", "test-timdex-bucket", 1
) == {
"create-index-command": ["create", "--index", "alma-2022-01-02t12-13-14"],
"files-to-index": [
@@ -262,7 +262,7 @@ def test_generate_load_commands_full_with_deletes_logs_error(caplog):
"alma/alma-2022-01-02-full-transformed-records-to-delete.txt"
]
commands.generate_load_commands(
transform_output_files, "full", "alma", "test-timdex-bucket"
transform_output_files, "full", "alma", "test-timdex-bucket", 1
)
assert (
"lambdas.commands",
@@ -272,10 +272,10 @@ def test_generate_load_commands_full_with_deletes_logs_error(caplog):
) in caplog.record_tuples


def test_generate_load_command_unexpected_input():
def test_generate_load_commands_unexpected_input():
transform_output_files = [
"alma/alma-2022-01-02-full-transformed-records-to-index.json"
]
assert commands.generate_load_commands(
transform_output_files, "wrong", "alma", "test-timdex-bucket"
transform_output_files, "wrong", "alma", "test-timdex-bucket", 1
) == {"failure": "Something unexpected went wrong. Please check input and try again."}
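A possible follow-up test (hypothetical, not part of this commit) that exercises the v2 placeholder branches directly, confirming they raise until the parquet work lands:

import pytest

from lambdas import commands


def test_generate_transform_commands_etl_v2_not_implemented():
    # The v2 submethod is only a placeholder, so dispatching with
    # etl_version=2 is expected to raise until that work is complete.
    with pytest.raises(NotImplementedError):
        commands.generate_transform_commands(
            [], {}, "2022-01-02", "test-timdex-bucket", 2
        )


def test_generate_load_commands_etl_v2_not_implemented():
    with pytest.raises(NotImplementedError):
        commands.generate_load_commands(
            [], "full", "testsource", "test-timdex-bucket", 2
        )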
