Write output to parquet dataset
Why these changes are being introduced:

Transmogrifier is being updated to write its output to a
TIMDEX parquet dataset instead of standalone JSON and TXT files.
The timdex-dataset-api library is used to perform the actual
dataset writing.

These changes also make preliminary updates to the README and CLI
arguments, moving toward parquet dataset writing as the default behavior
for Transmogrifier. However, "v1" behavior of writing to JSON and TXT
files is still supported, and will remain so until the parquet dataset
work is finalized.

How this addresses that need:
* Installs timdex-dataset-api as an application dependency
  * temporarily pinned to v0.2.0, but the pin will be removed
* Uses the timdex_dataset_api.TIMDEXDataset class to write transformed
and source records to the dataset (sketched below)
* Changes the -o CLI argument alias from --output-file to
--output-location

Side effects of this change:
* None: with ETL_VERSION=1 (the default), behavior remains the same

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-405
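
As a rough sketch of the new v2 write path (illustrative only: the TIMDEXDataset
constructor and write method shown here are assumptions based on this commit's
described usage, not a verified library contract):

```python
# Hypothetical sketch, not this commit's actual implementation.
from timdex_dataset_api import TIMDEXDataset


def write_records(dataset_records, output_location: str) -> None:
    """Write an iterable of DatasetRecords to a TIMDEX parquet dataset."""
    timdex_dataset = TIMDEXDataset(location=output_location)  # assumed signature
    timdex_dataset.write(dataset_records)  # assumed method name
```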
ghukill committed Dec 11, 2024
1 parent d3a6018 commit 5efd355
Showing 10 changed files with 636 additions and 443 deletions.
3 changes: 1 addition & 2 deletions Pipfile
@@ -10,19 +10,18 @@ click = "*"
jsonlines = "*"
lxml = "*"
lxml-stubs = "*"
pyarrow = "*"
python-dateutil = "*"
sentry-sdk = "*"
smart-open = {version = "*", extras = ["s3"]}
types-python-dateutil = "*"
timdex-dataset-api = {ref = "8bf085a", git = "git+https://github.com/MITLibraries/timdex-dataset-api.git"}

[dev-packages]
black = "*"
coveralls = "*"
ipython = "*"
mypy = "*"
pre-commit = "*"
pyarrow-stubs = "*"
pytest = "*"
ruff = "*"
safety = "*"
650 changes: 388 additions & 262 deletions Pipfile.lock

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions README.md
@@ -2,7 +2,7 @@

An application to transform source records to the TIMDEX data model to facilitate ingest into an OpenSearch index.

TIMDEX ingests records from various sources with different metadata formats, necessitating an application to transform those source records to a common metadata format, the TIMDEX data model in this case. This application processes both XML and JSON source records and outputs a JSON file of records formatted according to the TIMDEX data model.
TIMDEX ingests records from various sources with different metadata formats, necessitating an application to transform those source records to a common metadata format, the TIMDEX data model in this case. This application processes source records, creates records formatted according to the TIMDEX data model, and writes them to a TIMDEX parquet dataset.

```mermaid
---
@@ -14,21 +14,21 @@ flowchart TD
GeoData
MARC
transmogrifier((transmogrifier))
JSON
timdex-dataset
timdex-index-manager
ArchivesSpace[("ArchivesSpace<br>(EAD XML)")] --> transmogrifier
DSpace[("DSpace<br>(METS XML)")] --> transmogrifier
GeoData[("GeoData<br>(Aardvark JSON)")] --> transmogrifier
MARC[("Alma<br>(MARCXML)")] --> transmogrifier
transmogrifier --> JSON["TIMDEX JSON"]
JSON[TIMDEX JSON file] --> timdex-index-manager((timdex-index-manager))
transmogrifier --> timdex-dataset["TIMDEX Parquet Dataset"]
timdex-dataset["TIMDEX Parquet Dataset"] --> timdex-index-manager((timdex-index-manager))
```

The TIMDEX data model is designed to produce records that can be successfully ingested into an OpenSearch index and contains data fields that are broadly applicable to various types of records. `transmogrifier` contains different validators to ensure that the record is structured properly and that certain types of values, such as dates, align with OpenSearch's expectations.
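
As a toy illustration of the kind of check such a validator might perform (hypothetical; this is not the application's actual validation code):

```python
from datetime import datetime


def is_valid_iso_date(value: str) -> bool:
    """Return True if value parses as an ISO 8601 date, e.g. '2024-06-03'."""
    try:
        datetime.fromisoformat(value)
    except ValueError:
        return False
    return True
```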

Each source is defined with configuration values and a dedicated transform class to process records from that source. For each transform class, various errors and warnings are logged. Some errors are logged and the entire source record is skipped because the severity implies it should not be processed until fixed, while others are merely logged as warnings for later review. The application also determines which records are marked as deleted in each source and removes those records from the OpenSearch index.

After the JSON file of transformed records is produced, it is processed by `timdex-index-manager` for ingest into an OpenSearch index.
After Transmogrifier writes the transformed records to the TIMDEX parquet dataset, the dataset is processed by `timdex-index-manager` for ingest into an OpenSearch index.
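
For a sense of what downstream consumption might look like, here is a hypothetical read of the dataset with `pyarrow` (the dataset path and column names are assumptions drawn from this commit's tests, not a documented contract):

```python
import pyarrow.dataset as ds

# Hypothetical: location and column names are assumptions based on the
# DatasetRecord fields exercised in this commit's tests.
dataset = ds.dataset("output/dataset", format="parquet")
indexable = dataset.to_table(filter=ds.field("action") == "index")
print(indexable.num_rows)
```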

## Development

@@ -65,8 +65,11 @@ Usage: -c [OPTIONS]
Options:
  -i, --input-file TEXT       Filepath for harvested input records to
                              transform  [required]
  -o, --output-file TEXT      Filepath to write output TIMDEX JSON records
                              to  [required]
  --output-file TEXT          Filepath to write output TIMDEX JSON records
                              to. NOTE: this option will be removed when
                              output to parquet is finalized.
  -o, --output-location TEXT  Location of TIMDEX parquet dataset to write
                              to.
  -s, --source [alma|aspace|dspace|jpal|libguides|gismit|gisogm|researchdatabases|whoas|zenodo]
                              Source records were harvested from, must
                              choose from list of options  [required]
31 changes: 30 additions & 1 deletion tests/conftest.py
@@ -51,7 +51,6 @@ def runner():

@pytest.fixture
def generic_transformer():

    class GenericTransformer(Transformer):
        def parse_source_file(self):
            pass
@@ -229,3 +228,33 @@ def timdex_record_all_fields_and_subfields():
        subjects=[timdex.Subject(value=["Stuff"], kind="LCSH")],
        summary=["This is data."],
    )


# timdex parquet dataset ##########################


@pytest.fixture
def run_id():
    return "run-abc-123"


@pytest.fixture
def empty_dataset_location(tmp_path):
    return str(tmp_path / "dataset")


@pytest.fixture
def libguides_input_file():
    return (
        "tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml"
    )


@pytest.fixture
def libguides_transformer(monkeypatch, run_id, libguides_input_file):
    monkeypatch.setenv("ETL_VERSION", "2")
    return Transformer.load(
        "libguides",
        libguides_input_file,
        run_id=run_id,
    )
14 changes: 14 additions & 0 deletions tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<records>
<!-- valid record to index -->
<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header><identifier>oai:libguides.com:guides/175846</identifier><datestamp>2024-02-27T18:27:05Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>Materials Science &amp; Engineering</dc:title><dc:creator>Ye Li</dc:creator><dc:subject>Engineering</dc:subject><dc:subject>Science</dc:subject><dc:description>Useful databases and other research tips for materials science.</dc:description><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-19 17:55:27</dc:date><dc:identifier>https://libguides.mit.edu/materials</dc:identifier></oai_dc:dc></metadata></record>

<!-- deleted record -->
<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header status="deleted"><identifier>oai:libguides.com:guides/175849</identifier><datestamp>2024-05-21T18:36:58Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>Country Data &amp; Analysis</dc:title><dc:creator>Nicholas Albaugh</dc:creator><dc:subject>Business &amp; management</dc:subject><dc:description>This is the subject guide for Country Data &amp; Analysis</dc:description><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-26 00:51:04</dc:date><dc:identifier>https://libguides.mit.edu/country</dc:identifier></oai_dc:dc></metadata></record>

<!-- skipped record -->
<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header><identifier>oai:libguides.com:guides/175853</identifier><datestamp>2024-03-26T20:15:38Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>News, Newspapers, and Current Events</dc:title><dc:creator>Tina Chan</dc:creator><dc:subject>Interdisciplinary</dc:subject><dc:description>This is the subject guide for News</dc:description><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-26 21:29:54</dc:date><dc:identifier>https://libguides.mit.edu/news</dc:identifier></oai_dc:dc></metadata></record>

<!-- unhandled exception record -->
<record xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><header><identifier>oai:libguides.com:guides/175855</identifier><datestamp>2021-07-19T09:31:31Z</datestamp><setSpec>guides</setSpec></header><metadata><oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd"><dc:title>Biography</dc:title><dc:creator>Tina Chan</dc:creator><dc:subject>Interdisciplinary</dc:subject><dc:publisher>MIT Libraries</dc:publisher><dc:date>2008-06-26 22:05:13</dc:date><dc:identifier>https://libguides.mit.edu/biography</dc:identifier></oai_dc:dc></metadata></record>
</records>
90 changes: 79 additions & 11 deletions tests/sources/test_transformer.py
@@ -1,10 +1,14 @@
# ruff: noqa: PLR2004

import uuid
# ruff: noqa: PLR2004, SLF001, D202
import datetime
import json
from unittest import mock

import pytest
from lxml import etree
from timdex_dataset_api.record import DatasetRecord

import transmogrifier.models as timdex
from transmogrifier.exceptions import DeletedRecordEvent, SkippedRecordEvent
from transmogrifier.sources.transformer import Transformer
from transmogrifier.sources.xml.datacite import Datacite

@@ -79,13 +83,77 @@ def test_create_locations_from_spatial_subjects_success(timdex_record_required_f
    ]


def test_transformer_run_id_explicitly_passed(generic_transformer):
    run_id = "abc123"
    transformer = generic_transformer("alma", [], run_id=run_id)
    assert transformer.run_id == run_id


def test_transformer_run_id_none_passed_generates_uuid(generic_transformer):
    transformer = generic_transformer("alma", [], run_id=None)
    assert transformer.run_id is not None
    assert uuid.UUID(transformer.run_id)


def test_transformer_get_run_data_from_source_file_and_run_id(
    libguides_transformer, libguides_input_file, run_id
):
    assert libguides_transformer.get_run_data(libguides_input_file, run_id) == {
        "source": "libguides",
        "run_date": "2024-06-03",
        "run_type": "full",
        "run_id": run_id,
    }
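
The test above implies that run metadata is parsed from the input filename. A hypothetical sketch of such parsing, consistent with this test's expectations (the actual `get_run_data` implementation is not shown in this diff; the helper name and regex are illustrative):

```python
import re


def parse_run_data(source_file: str, run_id: str) -> dict:
    """Parse run metadata from a filename like
    'libguides-2024-06-03-full-extracted-records-to-index.xml'.
    """
    filename = source_file.split("/")[-1]
    match = re.match(
        r"(?P<source>\w+)-(?P<run_date>\d{4}-\d{2}-\d{2})-(?P<run_type>full|daily)-",
        filename,
    )
    if not match:
        raise ValueError(f"could not parse run data from: {filename}")
    return {**match.groupdict(), "run_id": run_id}
```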


def test_transformer_next_iter_yields_dataset_records(libguides_transformer):
    assert isinstance(next(libguides_transformer), DatasetRecord)


def test_transform_next_iter_sets_valid_source_and_transformed_records(
    libguides_transformer,
):
    record = next(libguides_transformer)

    # parse source record XML
    assert isinstance(record.source_record, bytes)
    source_record = etree.fromstring(record.source_record)
    assert isinstance(source_record, etree._Element)

    # parse transformed record
    assert isinstance(record.transformed_record, bytes)
    transformed_record = json.loads(record.transformed_record)
    assert isinstance(transformed_record, dict)


def test_transform_next_iter_uses_run_data_parsed_from_source_file(
    libguides_transformer, libguides_input_file, run_id
):
    record = next(libguides_transformer)
    run_data = libguides_transformer.get_run_data(libguides_input_file, run_id)
    assert record.run_date == datetime.datetime.strptime(
        run_data["run_date"], "%Y-%m-%d"
    ).astimezone(datetime.UTC)
    assert record.run_type == run_data["run_type"]
    assert record.run_id == run_id


@pytest.mark.parametrize(
    ("transform_exception", "expected_action"),
    [
        (None, "index"),
        (DeletedRecordEvent(timdex_record_id="libguides:guides-0"), "delete"),
        (SkippedRecordEvent(source_record_id="guides-0"), "skip"),
        (RuntimeError("totally unhandled event"), "error"),
    ],
)
def test_transformer_action_column_based_on_transformation_exception_handling(
    libguides_transformer, transform_exception, expected_action
):
    """While Transmogrifier is often considered just an application that transforms a
    source record into a TIMDEX record, it also determines whether a source record
    should be indexed, deleted, or skipped (no action taken), and handles records that
    cannot be transformed (unhandled errors). This 'action' that Transmogrifier
    determines for each record is captured in the 'action' column of the TIMDEX parquet
    dataset.

    This test ensures that the 'action' column values are correct given behavior
    (exception handling) during transformation of each record.
    """

    if transform_exception:
        with mock.patch.object(libguides_transformer, "transform") as mocked_transform:
            mocked_transform.side_effect = transform_exception
            record = next(libguides_transformer)
            assert mocked_transform.called
    else:
        record = next(libguides_transformer)
    assert record.action == expected_action
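
A condensed, hypothetical sketch of the exception-to-action mapping this test exercises (the Transformer's real iteration logic is not fully shown in this diff):

```python
from transmogrifier.exceptions import DeletedRecordEvent, SkippedRecordEvent


def determine_action(transform, source_record) -> str:
    """Map the outcome of transforming one record to an 'action' column value."""
    try:
        transform(source_record)
    except DeletedRecordEvent:
        return "delete"
    except SkippedRecordEvent:
        return "skip"
    except Exception:  # unhandled errors become "error" rows
        return "error"
    return "index"
```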
24 changes: 14 additions & 10 deletions tests/test_cli.py
@@ -13,7 +13,7 @@ def test_transform_no_sentry_not_verbose(caplog, monkeypatch, runner, tmp_path):
        [
            "-i",
            "tests/fixtures/datacite/datacite_records.xml",
            "-o",
            "--output-file",
            outfile,
            "-s",
            "jpal",
@@ -39,7 +39,7 @@ def test_transform_with_sentry_and_verbose(caplog, monkeypatch, runner, tmp_path
        [
            "-i",
            "tests/fixtures/datacite/datacite_records.xml",
            "-o",
            "--output-file",
            outfile,
            "-s",
            "jpal",
@@ -66,7 +66,7 @@ def test_transform_no_records(runner, tmp_path):
        [
            "-i",
            "tests/fixtures/no_records.xml",
            "-o",
            "--output-file",
            outfile,
            "-s",
            "dspace",
@@ -83,7 +83,7 @@ def test_transform_deleted_records(caplog, runner, tmp_path):
        [
            "-i",
            "tests/fixtures/record_deleted.xml",
            "-o",
            "--output-file",
            outfile,
            "-s",
            "jpal",
@@ -96,7 +96,8 @@
    ) in caplog.text


def test_transform_run_id_argument_passed_and_used(caplog, runner, tmp_path):
def test_transform_run_id_argument_passed_and_used(monkeypatch, caplog, runner, tmp_path):
    monkeypatch.setenv("ETL_VERSION", "2")
    caplog.set_level("INFO")
    run_id = "abc123"
    with mock.patch(
@@ -112,15 +113,18 @@ def test_transform_run_id_argument_passed_and_used(caplog, runner, tmp_path):
"-r",
run_id,
"-i",
"tests/fixtures/datacite/datacite_records.xml",
"tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml",
"-o",
"/tmp/records.json",
"/tmp/dataset",
],
)
assert f"run_id set: '{run_id}'" in caplog.text


def test_transform_run_id_argument_not_passed_and_uuid_minted(caplog, runner, tmp_path):
def test_transform_run_id_argument_not_passed_and_uuid_minted(
    monkeypatch, caplog, runner, tmp_path
):
    monkeypatch.setenv("ETL_VERSION", "2")
    caplog.set_level("INFO")
    with mock.patch(
        "transmogrifier.sources.transformer.Transformer.transform_and_write_output_files"
@@ -133,9 +137,9 @@ def test_transform_run_id_argument_not_passed_and_uuid_minted(caplog, runner, tm
"-s",
"alma",
"-i",
"tests/fixtures/datacite/datacite_records.xml",
"tests/fixtures/dataset/libguides-2024-06-03-full-extracted-records-to-index.xml",
"-o",
"/tmp/records.json",
"/tmp/dataset",
],
)
assert "explicit run_id not passed, minting new UUID" in caplog.text
2 changes: 1 addition & 1 deletion tests/test_temporary_feature_flagging.py
@@ -64,7 +64,7 @@ def test_cli_etl_version_v1_invokes_v1_code(
        [
            "-i",
            "/does/not/exist/alma-2023-01-13-full-extracted-records-to-index_01.xml",
            "-o",
            "--output-file",
            "/does/not/exist/libguides.json",
            "-s",
            "libguides",
21 changes: 17 additions & 4 deletions transmogrifier/cli.py
@@ -23,10 +23,16 @@
help="Filepath for harvested input records to transform",
)
@click.option(
"-o",
"--output-file",
required=True,
help="Filepath to write output TIMDEX JSON records to",
required=False,
help="Filepath to write output TIMDEX JSON records to. NOTE: this option will be "
"removed when output to parquet is finalized.",
)
@click.option(
"-o",
"--output-location",
required=False,
help="Location of TIMDEX parquet dataset to write to.",
)
@click.option(
"-s",
@@ -50,6 +56,7 @@ def main(
    source: str,
    input_file: str,
    output_file: str,
    output_location: str,
    run_id: str,
    verbose: bool,  # noqa: FBT001
) -> None:
@@ -65,9 +72,15 @@
    etl_version = get_etl_version()
    match etl_version:
        case 1:
            if output_file is None:
                message = "--output-file must be set when using ETL_VERSION=1"
                raise RuntimeError(message)
            transformer.transform_and_write_output_files(output_file)
        case 2:
            transformer.write_to_parquet_dataset(output_file)
            if output_location is None:
                message = "-o / --output-location must be set when using ETL_VERSION=2"
                raise RuntimeError(message)
            transformer.write_to_parquet_dataset(output_location)

    logger.info(
        (
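
To make the two modes concrete, a hypothetical invocation of each path via click's test runner (file paths here are illustrative; option names mirror this diff):

```python
from click.testing import CliRunner

from transmogrifier.cli import main

runner = CliRunner()

# v1 (default): write standalone JSON/TXT output via --output-file
runner.invoke(
    main,
    ["-s", "libguides", "-i", "input.xml", "--output-file", "output.json"],
)

# v2: write to a TIMDEX parquet dataset via -o / --output-location
runner.invoke(
    main,
    ["-s", "libguides", "-i", "input.xml", "-o", "/tmp/dataset"],
    env={"ETL_VERSION": "2"},
)
```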