Serialize datastore records
When serializing DatasetRef, we now serialize its datastore records as well.  This is needed to support get() in RemoteButler.
dhirving committed Nov 22, 2023
1 parent 0a2fea0 commit c262315
Showing 5 changed files with 137 additions and 11 deletions.
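For orientation, here is a rough sketch of what the new datastoreRecords field adds to a serialized DatasetRef. The key names follow the SerializedDatasetRef and SerializedStoredFileInfoData models introduced below; the UUID, run, formatter, component, and storage class values are copied from the tests in this commit, while the path, checksum, and file size are invented placeholders.

# Illustrative only: approximate payload produced by DatasetRef.to_simple()
# once datastore records are included. Keys mirror the Pydantic models in
# this commit; "path", "checksum", and "file_size" values are placeholders.
example_serialized_ref = {
    "id": "e15ab039-bc8b-4135-87c5-90902a7c0b22",
    "datasetType": {},  # SerializedDatasetType payload, omitted here
    "dataId": {},  # SerializedDataCoordinate payload, omitted here
    "run": "ingest/run",
    "component": None,
    # Grouped by opaque table name; usually there is just one table.
    "datastoreRecords": {
        "file_datastore_records": [
            {
                "type": "file",
                "data": {
                    "formatter": "lsst.daf.butler.tests.testFormatters.MetricsExampleDataFormatter",
                    "path": "ingest/run/test_metric_comp.data/placeholder.yaml",
                    "storage_class": "StructuredDataDataTest",
                    "component": "data",
                    "checksum": None,
                    "file_size": 1234,
                },
            }
        ]
    },
}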
43 changes: 41 additions & 2 deletions python/lsst/daf/butler/_dataset_ref.py
@@ -50,7 +50,7 @@
from ._config_support import LookupKey
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .datastore.stored_file_info import StoredDatastoreItemInfo, SerializedStoredDatastoreItemInfo
from .dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars
@@ -62,6 +62,7 @@
# Per-dataset records grouped by opaque table name, usually there is just one
# opaque table.
DatasetDatastoreRecords: TypeAlias = Mapping[str, Iterable[StoredDatastoreItemInfo]]
SerializedDatasetDatastoreRecords: TypeAlias = dict[str, list[SerializedStoredDatastoreItemInfo]]


class AmbiguousDatasetError(Exception):
@@ -191,6 +192,7 @@ class SerializedDatasetRef(_BaseModelCompat):
dataId: SerializedDataCoordinate | None = None
run: StrictStr | None = None
component: StrictStr | None = None
datastoreRecords: SerializedDatasetDatastoreRecords | None = None

if PYDANTIC_V2:
# Can not use "after" validator since in some cases the validator
@@ -231,6 +233,7 @@ def direct(
datasetType: dict[str, Any] | None = None,
dataId: dict[str, Any] | None = None,
component: str | None = None,
datastoreRecords: dict[str, Any] | None = None,
) -> SerializedDatasetRef:
"""Construct a `SerializedDatasetRef` directly without validators.
@@ -249,6 +252,11 @@
SerializedDatasetType.direct(**datasetType) if datasetType is not None else None
)
serialized_dataId = SerializedDataCoordinate.direct(**dataId) if dataId is not None else None
serialized_datastore_records = (
pydantic.parse_obj_as(SerializedDatasetDatastoreRecords, datastoreRecords)
if datastoreRecords is not None
else None
)

node = cls.model_construct(
_fields_set=_serializedDatasetRefFieldsSet,
@@ -257,6 +265,7 @@
dataId=serialized_dataId,
run=sys.intern(run),
component=component,
datastoreRecords=serialized_datastore_records,
)

return node
@@ -404,6 +413,9 @@ def to_simple(self, minimal: bool = False) -> SerializedDatasetRef:
Use minimal serialization. Requires Registry to convert
back to a full type.
datastore_records : `bool`, optional
Include the datastore records in the serialized object.
Returns
-------
simple : `dict` or `int`
@@ -427,6 +439,7 @@ def from_simple(
dataId=self.dataId.to_simple(),
run=self.run,
id=self.id,
datastoreRecords=_serialize_datastore_records(self._datastore_records),
)

@classmethod
@@ -516,11 +529,13 @@ def from_simple(
"Run collection name is missing from serialized representation. "
f"Encountered with {simple!r}{dstr}."
)

ref = cls(
datasetType,
dataId,
id=simple.id,
run=simple.run,
datastore_records=_deserialize_datastore_records(simple.datastoreRecords),
)
if cache is not None:
if ref.datasetType.component() is not None:
@@ -856,8 +871,32 @@ class associated with the dataset type of the other ref can be
Cannot be changed after a `DatasetRef` is constructed.
"""

datastore_records: DatasetDatastoreRecords | None
_datastore_records: DatasetDatastoreRecords | None
"""Optional datastore records (`DatasetDatastoreRecords`).
Cannot be changed after a `DatasetRef` is constructed.
"""


def _serialize_datastore_records(
datastore_records: DatasetDatastoreRecords | None,
) -> SerializedDatasetDatastoreRecords | None:
if datastore_records is None:
return None

serialized_datastore_records: SerializedDatasetDatastoreRecords = {}
for opaque_name, records in datastore_records.items():
serialized_datastore_records[opaque_name] = [record.to_model() for record in records]
return serialized_datastore_records


def _deserialize_datastore_records(
serialized_datastore_records: SerializedDatasetDatastoreRecords | None,
) -> DatasetDatastoreRecords | None:
if serialized_datastore_records is None:
return None

datastore_records: dict[str, list[StoredDatastoreItemInfo]] = {}
for opaque_table, records in serialized_datastore_records.items():
datastore_records[opaque_table] = [StoredDatastoreItemInfo.from_model(r) for r in records]
return datastore_records
67 changes: 65 additions & 2 deletions python/lsst/daf/butler/datastore/stored_file_info.py
@@ -27,13 +27,19 @@

from __future__ import annotations

__all__ = ("StoredDatastoreItemInfo", "StoredFileInfo")
__all__ = (
"StoredDatastoreItemInfo",
"StoredFileInfo",
"SerializedStoredFileInfo",
"SerializedStoredFileInfoData",
)

import inspect
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

from lsst.daf.butler._compat import _BaseModelCompat
from lsst.resources import ResourcePath
from lsst.utils import doImportType
from lsst.utils.introspection import get_full_type_name
@@ -99,6 +105,16 @@ def to_record(self, **kwargs: Any) -> dict[str, Any]:
"""
raise NotImplementedError()

def to_model(self) -> SerializedStoredDatastoreItemInfo:
"""Return the record contents as a Pydantic model"""
raise NotImplementedError()

@classmethod
def from_model(cls, model: SerializedStoredDatastoreItemInfo) -> StoredDatastoreItemInfo:
# Currently the FileDatastore is the only Datastore that supports
# serialization of its datastore records.
return StoredFileInfo.from_model(model)

def update(self, **kwargs: Any) -> StoredDatastoreItemInfo:
"""Create a new class with everything retained apart from the
specified values.
@@ -260,6 +276,14 @@ def to_record(self, **kwargs: Any) -> dict[str, Any]:
**kwargs,
)

def to_model(self) -> SerializedStoredDatastoreItemInfo:
record = self.to_record()
# We allow None on the model but the record contains a "null string"
# instead.
record["component"] = self.component
data = SerializedStoredFileInfoData.model_validate(record)
return SerializedStoredFileInfo(type="file", data=data)

def file_location(self, factory: LocationFactory) -> Location:
"""Return the location of artifact.
@@ -307,6 +331,10 @@ def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredF
)
return info

@classmethod
def from_model(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo:
return cls.from_record(dict(model.data))

def update(self, **kwargs: Any) -> StoredFileInfo:
new_args = {}
for k in self.__slots__:
@@ -320,3 +348,38 @@ def update(self, **kwargs: Any) -> StoredFileInfo:

def __reduce__(self) -> str | tuple[Any, ...]:
return (self.from_record, (self.to_record(),))


class SerializedStoredFileInfoData(_BaseModelCompat):
"""Serialized representation of `StoredFileInfo` properties"""

formatter: str
"""Fully-qualified name of Formatter."""

path: str
"""Path to dataset within Datastore."""

storage_class: str
"""Name of the StorageClass associated with Dataset."""

component: str | None
"""Component associated with this file. Can be None if the file does
not refer to a component of a composite."""

checksum: str | None
"""Checksum of the serialized dataset."""

file_size: int
"""Size of the serialized dataset in bytes."""


class SerializedStoredFileInfo(_BaseModelCompat):
"""Serialized representation of `StoredFileInfo`"""

type: Literal["file"]
data: SerializedStoredFileInfoData


# In theory this may become a discriminated union in the future, to support
# multiple DataStore types.
SerializedStoredDatastoreItemInfo = SerializedStoredFileInfo
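
As a hypothetical illustration of that comment (not part of this commit, and assuming Pydantic v2 rather than the _BaseModelCompat shim), the existing type: Literal["file"] tag is exactly what a discriminated union would dispatch on if a second record type were ever added; the in-memory model below is invented for the example.

# Hypothetical sketch of the discriminated union mentioned above; assumes
# Pydantic v2. The "memory" record type is invented for illustration only.
from typing import Annotated, Literal, Union

import pydantic


class ExampleFileInfo(pydantic.BaseModel):
    type: Literal["file"]
    path: str


class ExampleInMemoryInfo(pydantic.BaseModel):
    type: Literal["memory"]
    cache_key: str


ExampleItemInfo = Annotated[
    Union[ExampleFileInfo, ExampleInMemoryInfo],
    pydantic.Field(discriminator="type"),
]

# Validation picks the model matching the "type" tag.
adapter = pydantic.TypeAdapter(ExampleItemInfo)
record = adapter.validate_python({"type": "file", "path": "a/b.fits"})
assert isinstance(record, ExampleFileInfo)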
4 changes: 0 additions & 4 deletions python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -262,8 +262,6 @@ def get_dataset(
"dimension_records": dimension_records,
"datastore_records": datastore_records,
}
if datastore_records:
raise ValueError("Datastore records can not yet be returned in client/server butler.")
if storage_class:
params["storage_class"] = storage_class_name
response = self._client.get(self._get_url(path), params=params)
@@ -295,8 +293,6 @@ def find_dataset(
# cache to generate list of collection names.
wildcards = CollectionWildcard.from_expression(collections)

if datastore_records:
raise ValueError("Datastore records can not yet be returned in client/server butler.")
if timespan:
raise ValueError("Timespan can not yet be used in butler client/server.")

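Dropping these two guards is what actually lets records flow to the client. Below is a minimal client-side sketch of the newly allowed call, assuming an already-configured Butler and using the private _datastore_records attribute exercised by the tests further down.

# Sketch only: fetch a dataset's datastore records over client/server by UUID.
# get_dataset(..., datastore_records=True) and the private _datastore_records
# attribute are taken from this diff; everything else is illustrative.
import uuid

from lsst.daf.butler import Butler


def fetch_record_paths(butler: Butler, dataset_id: uuid.UUID) -> list[str]:
    """Return the datastore-internal paths recorded for a dataset, if any."""
    ref = butler.get_dataset(dataset_id, datastore_records=True)
    if ref is None or ref._datastore_records is None:
        return []
    # Records are grouped by opaque table name; for the file datastore each
    # entry is a StoredFileInfo carrying a path attribute.
    return [info.path for records in ref._datastore_records.values() for info in records]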
6 changes: 3 additions & 3 deletions tests/test_datasets.py
@@ -709,13 +709,13 @@ def testJson(self) -> None:
s = ref.to_json()
self.assertEqual(DatasetRef.from_json(s, universe=self.universe), ref)

# Also test ref with datastore records, serialization does not
# preserve those.
# Also test ref with datastore records.
ref = self._make_datastore_records(ref, "/path1", "/path2")
s = ref.to_json()
ref2 = DatasetRef.from_json(s, universe=self.universe)
self.assertEqual(ref2, ref)
self.assertIsNone(ref2._datastore_records)
self.assertIsNotNone(ref2._datastore_records)
self.assertEqual(ref2._datastore_records, ref._datastore_records)

def testFileDataset(self) -> None:
ref = DatasetRef(self.datasetType, self.dataId, run="somerun")
28 changes: 28 additions & 0 deletions tests/test_server.py
@@ -29,6 +29,8 @@
import unittest
import uuid

from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo

try:
# Failing to import any of these should disable the tests.
from fastapi.testclient import TestClient
@@ -127,6 +129,7 @@ def test_find_dataset(self):
self.assertIsInstance(ref, DatasetRef)
self.assertEqual(ref.id, uuid.UUID("e15ab039-bc8b-4135-87c5-90902a7c0b22"))
self.assertFalse(ref.dataId.hasRecords())
self.assertIsNone(ref._datastore_records)

# Try again with variation of parameters.
ref_new = self.butler.find_dataset(
@@ -174,6 +177,31 @@ def test_find_dataset(self):
self.assertIsNone(self.butler.get_dataset(uuid.uuid4()))
self.assertIsNone(self.butler.get_dataset(uuid.uuid4(), storage_class="NumpyArray"))

def test_find_dataset_datastore_records(self):
def check_datastore_records(ref_to_check):
records = ref_to_check._datastore_records
self.assertIsNotNone(records)
file_info = records["file_datastore_records"]
self.assertEqual(len(file_info), 3)
for f in file_info:
self.assertIsInstance(f, StoredFileInfo)
self.assertEqual(
file_info[0].formatter, "lsst.daf.butler.tests.testFormatters.MetricsExampleDataFormatter"
)
self.assertEqual(file_info[0].component, "data")
self.assertEqual(file_info[0].storageClass.name, "StructuredDataDataTest")

ref = self.butler.find_dataset(
"test_metric_comp",
{"instrument": "DummyCamComp", "visit": 423},
collections="ingest/run",
datastore_records=True,
)
check_datastore_records(ref)

ref2 = self.butler.get_dataset(ref.id, datastore_records=True)
check_datastore_records(ref2)

def test_instantiate_via_butler_http_search(self):
"""Ensure that the primary Butler constructor's automatic search logic
correctly locates and reads the configuration file and ends up with a