Merge pull request #15 from MITLibraries/TIMX-432-rework-dataset-partitions

Rework dataset partitions to only year, month, day
jonavellecuerdo authored Dec 12, 2024
2 parents 5769260 + 8a30ca3 commit e1c0c6a
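
The change collapses the dataset's previous source/run_date/run_type/run_id/action hive partitioning down to year, month, and day columns derived from each record's run_date. As a rough sketch of that derivation, inferred from the tests in this diff (the helper name is hypothetical, not part of the library):

from datetime import datetime

def derive_partitions(run_date: str) -> tuple[str, str, str]:
    # Parse the 'YYYY-MM-DD' run_date -- the same '%Y-%m-%d' format the
    # DatasetRecord validation error in the tests below references -- and
    # return zero-padded year/month/day partition strings.
    parsed = datetime.strptime(run_date, "%Y-%m-%d")
    return f"{parsed.year:04d}", f"{parsed.month:02d}", f"{parsed.day:02d}"

# derive_partitions("2024-12-01") -> ("2024", "12", "01"), which maps to a
# hive-style path such as .../year=2024/month=12/day=01/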
Showing 8 changed files with 390 additions and 436 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -156,3 +156,6 @@ cython_debug/

# PyCharm
.idea/

# VSCode
.vscode
8 changes: 4 additions & 4 deletions Pipfile
@@ -4,6 +4,7 @@ verify_ssl = true
name = "pypi"

[packages]
attrs = "*"
boto3 = "*"
duckdb = "*"
pandas = "*"
@@ -14,15 +15,14 @@ black = "*"
boto3-stubs = {version = "*", extras = ["s3"]}
coveralls = "*"
ipython = "*"
moto = "*"
mypy = "*"
pandas-stubs = "*"
pre-commit = "*"
pytest-mock = "*"
pyarrow-stubs = "*"
pytest = "*"
ruff = "*"
setuptools = "*"
pandas-stubs = "*"
moto = "*"
pytest-mock = "*"

[requires]
python_version = "3.12"
476 changes: 246 additions & 230 deletions Pipfile.lock

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions tests/conftest.py
@@ -51,12 +51,7 @@ def sample_records_iter_without_partitions():

def _records_iter(num_records):
return generate_sample_records(
num_records,
source=None,
run_date=None,
run_type=None,
action=None,
run_id=None,
num_records, run_date="invalid run-date", year=None, month=None, day=None
)

return _records_iter
216 changes: 86 additions & 130 deletions tests/test_dataset_write.py
@@ -1,23 +1,24 @@
# ruff: noqa: S105, S106, SLF001, PLR2004, PD901, D209, D205

import datetime
import math
import os
import re
from datetime import UTC, datetime
from unittest.mock import patch

import pyarrow.dataset as ds
import pytest

from tests.utils import generate_sample_records
from timdex_dataset_api.dataset import (
MAX_ROWS_PER_FILE,
TIMDEX_DATASET_SCHEMA,
DatasetNotLoadedError,
TIMDEXDataset,
)
from timdex_dataset_api.exceptions import InvalidDatasetRecordError
from timdex_dataset_api.record import DatasetRecord


def test_dataset_record_serialization():
def test_dataset_record_init():
values = {
"timdex_record_id": "alma:123",
"source_record": b"<record><title>Hello World.</title></record>",
@@ -26,38 +27,35 @@ def test_dataset_record_serialization():
"run_date": "2024-12-01",
"run_type": "full",
"action": "index",
"run_id": "abc123",
"run_id": "000-111-aaa-bbb",
}
dataset_record = DatasetRecord(**values)
assert dataset_record.to_dict() == values
record = DatasetRecord(**values)
assert record
assert (record.year, record.month, record.day) == (
"2024",
"12",
"01",
)


def test_dataset_record_serialization_with_partition_values_provided():
dataset_record = DatasetRecord(
timdex_record_id="alma:123",
source_record=b"<record><title>Hello World.</title></record>",
transformed_record=b"""{"title":["Hello World."]}""",
)
partition_values = {
"source": "alma",
"run_date": "2024-12-01",
"run_type": "daily",
"action": "index",
"run_id": "000-111-aaa-bbb",
}
assert dataset_record.to_dict(partition_values=partition_values) == {
def test_dataset_record_init_with_invalid_run_date_raise_error():
values = {
"timdex_record_id": "alma:123",
"source_record": b"<record><title>Hello World.</title></record>",
"transformed_record": b"""{"title":["Hello World."]}""",
"source": "alma",
"run_date": "2024-12-01",
"run_type": "daily",
"source": "libguides",
"run_date": "-12-01",
"run_type": "full",
"action": "index",
"run_id": "000-111-aaa-bbb",
}
with pytest.raises(
ValueError, match=re.escape("time data '-12-01' does not match format '%Y-%m-%d'")
):
DatasetRecord(**values)


def test_dataset_record_serialization_missing_partition_raise_error():
def test_dataset_record_serialization():
values = {
"timdex_record_id": "alma:123",
"source_record": b"<record><title>Hello World.</title></record>",
@@ -66,14 +64,22 @@ def test_dataset_record_serialization_missing_partition_raise_error():
"run_date": "2024-12-01",
"run_type": "full",
"action": "index",
"run_id": None, # <------ missing partition here
"run_id": "abc123",
}
dataset_record = DatasetRecord(**values)
with pytest.raises(
InvalidDatasetRecordError,
match="Partition values are missing: run_id",
):
assert dataset_record.to_dict() == values
assert dataset_record.to_dict() == {
"timdex_record_id": "alma:123",
"source_record": b"<record><title>Hello World.</title></record>",
"transformed_record": b"""{"title":["Hello World."]}""",
"source": "libguides",
"run_date": datetime(2024, 12, 1).astimezone(UTC),
"run_type": "full",
"action": "index",
"run_id": "abc123",
"year": "2024",
"month": "12",
"day": "01",
}


def test_dataset_write_records_to_new_dataset(new_dataset, sample_records_iter):
@@ -134,52 +140,6 @@ def test_dataset_write_to_multiple_locations_raise_error(sample_records_iter):
timdex_dataset.write(sample_records_iter(10))


def test_dataset_write_mixin_partition_values_used(
new_dataset, sample_records_iter_without_partitions
):
partition_values = {
"source": "alma",
"run_date": "2024-12-01",
"run_type": "daily",
"action": "index",
"run_id": "000-111-aaa-bbb",
}
_written_files = new_dataset.write(
sample_records_iter_without_partitions(10),
partition_values=partition_values,
)
new_dataset.reload()

# load as pandas dataframe and assert column values
df = new_dataset.dataset.to_table().to_pandas()
row = df.iloc[0]
assert row.source == partition_values["source"]
assert row.run_date == datetime.date(2024, 12, 1)
assert row.run_type == partition_values["run_type"]
assert row.action == partition_values["action"]
assert row.action == partition_values["action"]


def test_dataset_write_schema_partitions_correctly_ordered(
new_dataset, sample_records_iter
):
written_files = new_dataset.write(
sample_records_iter(10),
partition_values={
"source": "alma",
"run_date": "2024-12-01",
"run_type": "daily",
"run_id": "000-111-aaa-bbb",
"action": "index",
},
)
file = written_files[0]
assert (
"/source=alma/run_date=2024-12-01/run_type=daily"
"/run_id=000-111-aaa-bbb/action=index/" in file.path
)


def test_dataset_write_schema_applied_to_dataset(new_dataset, sample_records_iter):
new_dataset.write(sample_records_iter(10))

@@ -194,67 +154,63 @@ def test_dataset_write_schema_applied_to_dataset(new_dataset, sample_records_iter):
assert set(dataset.schema.names) == set(TIMDEX_DATASET_SCHEMA.names)


def test_dataset_write_partition_deleted_when_written_to_again(
new_dataset, sample_records_iter
):
"""This tests the existing_data_behavior="delete_matching" configuration when writing
to a dataset."""
partition_values = {
"source": "alma",
"run_date": "2024-12-01",
"run_type": "daily",
"action": "index",
"run_id": "000-111-aaa-bbb",
}
def test_dataset_write_partition_for_single_source(new_dataset, sample_records_iter):
written_files = new_dataset.write(sample_records_iter(10))
assert len(written_files) == 1
assert os.path.exists(new_dataset.location)
assert "year=2024/month=12/day=01" in written_files[0].path

# perform FIRST write to run_date="2024-12-01"
written_files_1 = new_dataset.write(
sample_records_iter(10),
partition_values=partition_values,
)

# assert that files from first write are present at this time
assert os.path.exists(written_files_1[0].path)
def test_dataset_write_partition_for_multiple_sources(new_dataset, sample_records_iter):
# perform write for source="alma" and run_date="2024-12-01"
written_files_source_a = new_dataset.write(sample_records_iter(10))
new_dataset.reload()

# perform unrelated write with new run_date to confirm this is untouched during delete
new_partition_values = partition_values.copy()
new_partition_values["run_date"] = "2024-12-15"
new_partition_values["run_id"] = "222-333-ccc-ddd"
written_files_x = new_dataset.write(
sample_records_iter(7),
partition_values=new_partition_values,
)
assert os.path.exists(written_files_source_a[0].path)
assert new_dataset.row_count == 10

# perform SECOND write to run_date="2024-12-01", expecting this to delete everything
# under this combination of partitions (i.e. the first write)
written_files_2 = new_dataset.write(
sample_records_iter(10),
partition_values=partition_values,
# perform write for source="libguides" and run_date="2024-12-01"
written_files_source_b = new_dataset.write(
generate_sample_records(
num_records=7, timdex_record_id_prefix="libguides", source="libguides"
)
)

new_dataset.reload()

# assert 17 rows: second write for run_date="2024-12-01" @ 10 rows +
# run_date="2024-12-15" @ 5 rows
assert os.path.exists(written_files_source_b[0].path)
assert os.path.exists(written_files_source_a[0].path)
assert new_dataset.row_count == 17

# assert that files from first run_date="2024-12-01" are gone, second exist
# and files from run_date="2024-12-15" also exist
assert not os.path.exists(written_files_1[0].path)
assert os.path.exists(written_files_2[0].path)
assert os.path.exists(written_files_x[0].path)

def test_dataset_write_partition_ignore_existing_data(new_dataset, sample_records_iter):
# perform two (2) writes for source="alma" and run_date="2024-12-01"
written_files_source_a0 = new_dataset.write(sample_records_iter(10))
written_files_source_a1 = new_dataset.write(sample_records_iter(10))
new_dataset.reload()

def test_dataset_write_missing_partitions_raise_error(new_dataset, sample_records_iter):
missing_partition_values = {
"source": "libguides",
"run_date": None,
"run_type": None,
"action": None,
"run_id": None,
}
with pytest.raises(InvalidDatasetRecordError, match="Partition values are missing"):
_ = new_dataset.write(
sample_records_iter(10),
partition_values=missing_partition_values,
)
# assert that both files exist and no overwriting occurs
assert os.path.exists(written_files_source_a0[0].path)
assert os.path.exists(written_files_source_a1[0].path)
assert new_dataset.row_count == 20


@patch("timdex_dataset_api.dataset.uuid.uuid4")
def test_dataset_write_partition_overwrite_files_with_same_name(
mock_uuid, new_dataset, sample_records_iter
):
"""This test is to demonstrate existing_data_behavior="overwrite_or_ignore".
It is extremely unlikely for the uuid.uuid4 method to generate duplicate values,
so for testing purposes, this method is patched to return the same value
and therefore generate similarly named files.
"""
mock_uuid.return_value = "abc"

# perform two (2) writes for source="alma" and run_date="2024-12-01"
_ = new_dataset.write(sample_records_iter(10))
written_files_source_a1 = new_dataset.write(sample_records_iter(7))
new_dataset.reload()

# assert that only the second file exists and overwriting occurs
assert os.path.exists(written_files_source_a1[0].path)
assert new_dataset.row_count == 7
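
The last two tests above exercise pyarrow's existing_data_behavior="overwrite_or_ignore" write mode against the new year/month/day layout. The standalone call below is a hedged illustration only; the table contents, base_dir, and basename_template are invented for the example, and TIMDEXDataset.write itself is not shown in this diff.

import pyarrow as pa
import pyarrow.dataset as ds

# Minimal stand-in table for a batch of serialized DatasetRecords.
table = pa.table(
    {
        "timdex_record_id": ["alma:123"],
        "year": ["2024"],
        "month": ["12"],
        "day": ["01"],
    }
)

ds.write_dataset(
    table,
    base_dir="/tmp/timdex_dataset",  # assumed location for the example
    format="parquet",
    partitioning=ds.partitioning(
        pa.schema(
            [("year", pa.string()), ("month", pa.string()), ("day", pa.string())]
        ),
        flavor="hive",
    ),
    # With "overwrite_or_ignore", existing files are replaced only when a new
    # file shares their basename, which is why the test patches uuid.uuid4.
    basename_template="records-abc-{i}.parquet",
    existing_data_behavior="overwrite_or_ignore",
)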
2 changes: 1 addition & 1 deletion timdex_dataset_api/__init__.py
@@ -3,7 +3,7 @@
from timdex_dataset_api.dataset import TIMDEXDataset
from timdex_dataset_api.record import DatasetRecord

__version__ = "0.2.0"
__version__ = "0.3.0"

__all__ = [
"DatasetRecord",