Add DirectButler.find_dataset
timj committed Oct 27, 2023
1 parent a983c5e commit 5fd972c
Showing 11 changed files with 289 additions and 18 deletions.
11 changes: 10 additions & 1 deletion python/lsst/daf/butler/__init__.py
@@ -79,7 +79,16 @@

# Do not import or lift symbols from 'server' or 'server_models'.
# Import the registry subpackage directly for other symbols.
from .registry import CollectionSearch, CollectionType, MissingDatasetTypeError, Registry, RegistryConfig
from .registry import (
CollectionArgType,
CollectionSearch,
CollectionType,
MissingCollectionError,
MissingDatasetTypeError,
NoDefaultCollectionError,
Registry,
RegistryConfig,
)
from .transfers import RepoExportContext, YamlRepoExportBackend, YamlRepoImportBackend
from .version import *

80 changes: 79 additions & 1 deletion python/lsst/daf/butler/_butler.py
@@ -48,9 +48,10 @@
from ._file_dataset import FileDataset
from ._limited_butler import LimitedButler
from ._storage_class import StorageClass
from ._timespan import Timespan
from .datastore import DatasetRefURIs, Datastore
from .dimensions import DataId, DimensionConfig
from .registry import Registry, RegistryConfig, _RegistryFactory
from .registry import CollectionArgType, Registry, RegistryConfig, _RegistryFactory
from .repo_relocation import BUTLER_ROOT_TAG
from .transfers import RepoExportContext

@@ -798,6 +799,83 @@ def get_dataset_type(self, name: str) -> DatasetType:
"""
raise NotImplementedError()

@abstractmethod
def find_dataset(
self,
datasetType: DatasetType | str,
dataId: DataId | None = None,
*,
collections: CollectionArgType | None = None,
timespan: Timespan | None = None,
datastore_records: bool = False,
**kwargs: Any,
) -> DatasetRef | None:
"""Find a dataset given its `DatasetType` and data ID.
This can be used to obtain a `DatasetRef` that permits the dataset to
be read from a `Datastore`. If the dataset is a component and can not
be found using the provided dataset type, a dataset ref for the parent
will be returned instead but with the correct dataset type.
Parameters
----------
datasetType : `DatasetType` or `str`
A `DatasetType` or the name of one. If this is a `DatasetType`
instance, its storage class will be respected and propagated to
the output, even if it differs from the dataset type definition
in the registry, as long as the storage classes are convertible.
dataId : `dict` or `DataCoordinate`, optional
A `dict`-like object containing the `Dimension` links that identify
the dataset within a collection.
collections : collection expression, optional
An expression that fully or partially identifies the collections to
search for the dataset; see
:ref:`daf_butler_collection_expressions` for more information.
Defaults to ``self.defaults.collections``.
timespan : `Timespan`, optional
A timespan that the validity range of the dataset must overlap.
If not provided, any `~CollectionType.CALIBRATION` collections
matched by the ``collections`` argument will not be searched.
**kwargs
Additional keyword arguments passed to
`DataCoordinate.standardize` to convert ``dataId`` to a true
`DataCoordinate` or augment an existing one.
Returns
-------
ref : `DatasetRef`
A reference to the dataset, or `None` if no matching Dataset
was found.
Raises
------
lsst.daf.butler.NoDefaultCollectionError
Raised if ``collections`` is `None` and
``self.collections`` is `None`.
LookupError
Raised if one or more data ID keys are missing.
lsst.daf.butler.registry.MissingDatasetTypeError
Raised if the dataset type does not exist.
lsst.daf.butler.MissingCollectionError
Raised if any of ``collections`` does not exist in the registry.
Notes
-----
This method simply returns `None` and does not raise an exception even
when the set of collections searched is intrinsically incompatible with
the dataset type, e.g. if ``datasetType.isCalibration() is False``, but
only `~CollectionType.CALIBRATION` collections are being searched.
This may make it harder to debug some lookup failures, but the behavior
is intentional; we consider it more important that failed searches are
reported consistently, regardless of the reason, and that adding
additional collections that do not contain a match to the search path
never changes the behavior.
This method handles component dataset types automatically, though most
other registry operations do not.
"""
raise NotImplementedError()
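
As a usage sketch (not part of this diff): the repository path, dataset type name, and data ID values below are hypothetical, but the call pattern follows the signature above.

    from lsst.daf.butler import Butler

    # Hypothetical repository and default collections.
    butler = Butler("/path/to/repo", collections=["HSC/runs/RC2"])

    # Keyword arguments are forwarded to DataCoordinate.standardize,
    # so the data ID can be given inline.
    ref = butler.find_dataset("calexp", instrument="HSC", visit=903334, detector=42)
    if ref is None:
        print("No matching dataset found in the searched collections.")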

@abstractmethod
def retrieveArtifacts(
self,
22 changes: 21 additions & 1 deletion python/lsst/daf/butler/direct_butler.py
@@ -76,6 +76,7 @@
)
from .progress import Progress
from .registry import (
CollectionArgType,
CollectionType,
ConflictingDefinitionError,
DataIdError,
@@ -846,7 +847,7 @@ def _findDatasetRef(
)
# Always look up the DatasetRef, even if one is given, to ensure it is
# present in the current collection.
ref = self._registry.findDataset(
ref = self.find_dataset(
datasetType,
dataId,
collections=collections,
@@ -1321,6 +1322,25 @@ def getURI(
def get_dataset_type(self, name: str) -> DatasetType:
return self._registry.getDatasetType(name)

def find_dataset(
self,
datasetType: DatasetType | str,
dataId: DataId | None = None,
*,
collections: CollectionArgType | None = None,
timespan: Timespan | None = None,
datastore_records: bool = False,
**kwargs: Any,
) -> DatasetRef | None:
return self._registry.findDataset(
datasetType,
dataId,
collections=collections,
timespan=timespan,
datastore_records=datastore_records,
**kwargs,
)
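
Because DirectButler simply forwards to Registry.findDataset, a calibration lookup with a validity-range constraint works through the same call; a sketch with hypothetical dataset type, collection, and timestamp:

    from astropy.time import Time
    from lsst.daf.butler import Butler, Timespan

    butler = Butler("/path/to/repo")  # hypothetical repo
    moment = Time("2023-10-27T00:00:00", scale="tai")
    # Restrict the search to calibrations whose validity range covers `moment`.
    ref = butler.find_dataset(
        "bias",
        instrument="HSC",
        detector=42,
        collections="HSC/calib",
        timespan=Timespan(moment, moment),
    )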

def retrieveArtifacts(
self,
refs: Iterable[DatasetRef],
8 changes: 5 additions & 3 deletions python/lsst/daf/butler/registry/_registry.py
@@ -27,15 +27,15 @@

from __future__ import annotations

__all__ = ("Registry",)
__all__ = ("Registry", "CollectionArgType")

import contextlib
import logging
import re
from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator, Mapping, Sequence
from types import EllipsisType
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, TypeAlias

from .._dataset_association import DatasetAssociation
from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
@@ -64,7 +64,9 @@
_LOG = logging.getLogger(__name__)

# Type alias for the ``collections`` argument.
CollectionArgType = str | re.Pattern | Iterable[str | re.Pattern] | EllipsisType | CollectionWildcard
CollectionArgType: TypeAlias = (
str | re.Pattern | Iterable[str | re.Pattern] | EllipsisType | CollectionWildcard
)
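
The alias admits several spellings of the same argument; a sketch with hypothetical collection names:

    import re

    # All of these are valid CollectionArgType values.
    single = "HSC/runs/RC2"                            # one collection name
    pattern = re.compile(r"HSC/runs/.*")               # regex matched against names
    mixed = ["HSC/defaults", re.compile("u/timj/.*")]  # iterable mixing both
    everything = ...                                   # Ellipsis: search all collections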


class Registry(ABC):
78 changes: 75 additions & 3 deletions python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -40,17 +40,20 @@
from .._butler_config import ButlerConfig
from .._config import Config
from .._dataset_existence import DatasetExistence
from .._dataset_ref import DatasetIdGenEnum, DatasetRef
from .._dataset_ref import DatasetIdGenEnum, DatasetRef, SerializedDatasetRef
from .._dataset_type import DatasetType, SerializedDatasetType
from .._deferredDatasetHandle import DeferredDatasetHandle
from .._file_dataset import FileDataset
from .._limited_butler import LimitedButler
from .._storage_class import StorageClass
from .._timespan import Timespan
from ..datastore import DatasetRefURIs
from ..dimensions import DataId, DimensionConfig, DimensionUniverse
from ..registry import Registry, RegistryDefaults
from ..dimensions import DataCoordinate, DataId, DimensionConfig, DimensionUniverse, SerializedDataCoordinate
from ..registry import CollectionArgType, NoDefaultCollectionError, Registry, RegistryDefaults
from ..registry.wildcards import CollectionWildcard
from ..transfers import RepoExportContext
from ._config import RemoteButlerConfigModel
from .server import FindDatasetModel


class RemoteButler(Butler):
@@ -100,6 +103,39 @@ def dimensions(self) -> DimensionUniverse:
self._dimensions = DimensionUniverse(config)
return self._dimensions

def _simplify_dataId(
self, dataId: DataId | None, **kwargs: int | str
) -> SerializedDataCoordinate | None:
"""Take a generic Data ID and convert it to a serializable form.
Parameters
----------
dataId : `dict`, `None`, `DataCoordinate`
The data ID to serialize.
**kwargs : `dict`
Additional values that should be included if this is not
a `DataCoordinate`.
Returns
-------
data_id : `SerializedDataCoordinate` or `None`
A serializable form.
"""
if dataId is None and not kwargs:
return None
if isinstance(dataId, DataCoordinate):
return dataId.to_simple()

if dataId is None:
data_id = kwargs
else:
# Change variable because DataId is immutable and mypy complains.
data_id = dict(dataId)
data_id.update(kwargs)

# Assume we can treat it as a dict.
return SerializedDataCoordinate(dataId=data_id)
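
For illustration, the serialized form this helper produces can be built directly; the dimension names and the exact dumped shape are assumptions here:

    from lsst.daf.butler import SerializedDataCoordinate

    # A plain dict merged with keyword overrides ends up wrapped like this.
    serialized = SerializedDataCoordinate(dataId={"instrument": "HSC", "visit": 903334})
    # Roughly {"dataId": {"instrument": "HSC", "visit": 903334}, ...} when dumped.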

def transaction(self) -> AbstractContextManager[None]:
"""Will always raise NotImplementedError.
Transactions are not supported by RemoteButler.
@@ -183,6 +219,42 @@ def get_dataset_type(self, name: str) -> DatasetType:
response.raise_for_status()
return DatasetType.from_simple(SerializedDatasetType(**response.json()), universe=self.dimensions)

def find_dataset(
self,
datasetType: DatasetType | str,
dataId: DataId | None = None,
*,
collections: CollectionArgType | None = None,
timespan: Timespan | None = None,
datastore_records: bool = False,
**kwargs: Any,
) -> DatasetRef | None:
if collections is None:
if not self.collections:
raise NoDefaultCollectionError(
"No collections provided to find_dataset, and no defaults from butler construction."
)
collections = self.collections
# Temporary hack: assume strings for collections. In the future we
# want to construct a CollectionWildcard and filter it through the
# collection cache to generate the list of collection names.
wildcards = CollectionWildcard.from_expression(collections)

if isinstance(datasetType, DatasetType):
datasetType = datasetType.name

query = FindDatasetModel(
dataId=self._simplify_dataId(dataId, **kwargs), collections=wildcards.strings
)

path = f"find_dataset/{datasetType}"
response = self._client.post(
self._get_url(path), json=query.model_dump(mode="json", exclude_unset=True)
)
response.raise_for_status()

result = response.json()
if result is None:
    return None
return DatasetRef.from_simple(SerializedDatasetRef(**result), universe=self.dimensions)
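
For illustration, the request body this method sends can be constructed standalone; the data ID values are hypothetical and the import path follows the server package exports added below:

    from lsst.daf.butler import SerializedDataCoordinate
    from lsst.daf.butler.remote_butler.server import FindDatasetModel

    query = FindDatasetModel(
        dataId=SerializedDataCoordinate(dataId={"instrument": "HSC", "visit": 903334}),
        collections=["HSC/runs/RC2"],
    )
    # POSTed to <server>/butler/v1/find_dataset/<dataset_type> as JSON.
    payload = query.model_dump(mode="json", exclude_unset=True)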

def retrieveArtifacts(
self,
refs: Iterable[DatasetRef],
1 change: 1 addition & 0 deletions python/lsst/daf/butler/remote_butler/server/__init__.py
@@ -27,3 +27,4 @@

from ._factory import *
from ._server import *
from ._server_models import *
53 changes: 52 additions & 1 deletion python/lsst/daf/butler/remote_butler/server/_server.py
@@ -35,9 +35,16 @@

from fastapi import Depends, FastAPI
from fastapi.middleware.gzip import GZipMiddleware
from lsst.daf.butler import Butler, SerializedDatasetType
from lsst.daf.butler import (
Butler,
DataCoordinate,
SerializedDataCoordinate,
SerializedDatasetRef,
SerializedDatasetType,
)

from ._factory import Factory
from ._server_models import FindDatasetModel

BUTLER_ROOT = "ci_hsc_gen3/DATA"

@@ -56,6 +63,26 @@ def factory_dependency() -> Factory:
return Factory(butler=_make_global_butler())


def unpack_dataId(butler: Butler, data_id: SerializedDataCoordinate | None) -> DataCoordinate | None:
"""Convert the serialized dataId back to full DataCoordinate.
Parameters
----------
butler : `lsst.daf.butler.Butler`
The butler to use for registry and universe.
data_id : `SerializedDataCoordinate` or `None`
The serialized form.
Returns
-------
dataId : `DataCoordinate` or `None`
The DataId usable by registry.
"""
if data_id is None:
return None
return DataCoordinate.from_simple(data_id, registry=butler.registry)


@app.get("/butler/v1/universe", response_model=dict[str, Any])
def get_dimension_universe(factory: Factory = Depends(factory_dependency)) -> dict[str, Any]:
"""Allow remote client to get dimensions definition."""
@@ -78,3 +105,27 @@ def get_dataset_type(
butler = factory.create_butler()
datasetType = butler.get_dataset_type(dataset_type_name)
return datasetType.to_simple()


# Not yet supported: Timespan is not yet a pydantic model.
# The collections parameter assumes the client side has resolved regexes.
@app.post(
"/butler/v1/find_dataset/{dataset_type}",
summary="Retrieve this dataset definition from collection, dataset type, and dataId",
response_model=SerializedDatasetRef,
response_model_exclude_unset=True,
response_model_exclude_defaults=True,
response_model_exclude_none=True,
)
def find_dataset(
dataset_type: str,
query: FindDatasetModel,
factory: Factory = Depends(factory_dependency),
) -> SerializedDatasetRef | None:
collection_query = query.collections if query.collections else None

butler = factory.create_butler()
ref = butler.find_dataset(
dataset_type, dataId=unpack_dataId(butler, query.dataId), collections=collection_query
)
return ref.to_simple() if ref else None
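
A sketch of exercising the endpoint with FastAPI's test client, assuming ``app`` is importable from the server module and using hypothetical data ID values:

    from fastapi.testclient import TestClient
    from lsst.daf.butler.remote_butler.server import app

    client = TestClient(app)
    response = client.post(
        "/butler/v1/find_dataset/calexp",
        json={"dataId": {"dataId": {"instrument": "HSC", "visit": 903334}},
              "collections": ["HSC/runs/RC2"]},
    )
    print(response.json())  # SerializedDatasetRef payload, or null if not found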
11 changes: 11 additions & 0 deletions python/lsst/daf/butler/remote_butler/server/_server_models.py
@@ -26,3 +26,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Models used for client/server communication."""

__all__ = ["FindDatasetModel"]

from lsst.daf.butler import SerializedDataCoordinate

from ..._compat import _BaseModelCompat


class FindDatasetModel(_BaseModelCompat):
dataId: SerializedDataCoordinate | None
collections: list[str]