diff --git a/doc/changes/DM-46776.feature.rst b/doc/changes/DM-46776.feature.rst new file mode 100644 index 0000000000..3d4e8d0015 --- /dev/null +++ b/doc/changes/DM-46776.feature.rst @@ -0,0 +1,7 @@ +* Added ``Butler.retrieve_artifacts_zip`` and ``QuantumBackedButler.retrieve_artifacts_zip`` methods to retrieve the dataset artifacts and store them into a zip file. +* Added ``Butler.ingest_zip`` to ingest the contents of a Zip file. +* Added ``SerializedDatasetRefContainerV1`` class to allow a collection of ``DatasetRef`` to be serialized efficiently. + JSON serializations made using this class will be supported. +* Added ``--zip`` parameter to ``butler retrieve-artifacts``. +* Changed ``Butler.retrieveArtifacts`` to always write a JSON index file describing where the artifacts came from. +* Added a ``butler ingest-zip`` command-line tool for ingesting zip files created by ``butler retrieve-artifacts``. diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index c06935bea0..fbfedd6bb7 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -987,6 +987,39 @@ def find_dataset( """ raise NotImplementedError() + @abstractmethod + def retrieve_artifacts_zip( + self, + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + overwrite: bool = True, + ) -> ResourcePath: + """Retrieve artifacts from a Butler and place in ZIP file. + + Parameters + ---------- + refs : `~collections.abc.Iterable` [ `DatasetRef` ] + The datasets to be included in the zip file. + destination : `lsst.resources.ResourcePathExpression` + Directory to write the new ZIP file. This directory will + also be used as a staging area for the datasets being downloaded + from the datastore. + overwrite : `bool`, optional + If `False` the output Zip will not be written if a file of the + same name is already present in ``destination``. + + Returns + ------- + zip_file : `lsst.resources.ResourcePath` + The path to the new ZIP file. + + Raises + ------ + ValueError + Raised if there are no refs to retrieve. + """ + raise NotImplementedError() + @abstractmethod def retrieveArtifacts( self, @@ -1202,6 +1235,25 @@ def ingest( """ raise NotImplementedError() + @abstractmethod + def ingest_zip(self, zip_file: ResourcePathExpression, transfer: str = "auto") -> None: + """Ingest a Zip file into this butler. + + The Zip file must have been created by `retrieve_artifacts_zip`. + + Parameters + ---------- + zip_file : `lsst.resources.ResourcePathExpression` + Path to the Zip file. + transfer : `str`, optional + Method to use to transfer the Zip into the datastore. + + Notes + ----- + Run collections are created as needed. 
+ """ + raise NotImplementedError() + @abstractmethod def export( self, diff --git a/python/lsst/daf/butler/_dataset_ref.py b/python/lsst/daf/butler/_dataset_ref.py index 250e86e87e..9e25b98c14 100644 --- a/python/lsst/daf/butler/_dataset_ref.py +++ b/python/lsst/daf/butler/_dataset_ref.py @@ -34,13 +34,26 @@ "DatasetIdGenEnum", "DatasetRef", "SerializedDatasetRef", + "SerializedDatasetRefContainerV1", + "SerializedDatasetRefContainers", ] import enum +import logging import sys import uuid from collections.abc import Iterable, Mapping -from typing import TYPE_CHECKING, Any, ClassVar, Literal, Protocol, TypeAlias, runtime_checkable +from typing import ( + TYPE_CHECKING, + Annotated, + Any, + ClassVar, + Literal, + Protocol, + Self, + TypeAlias, + runtime_checkable, +) import pydantic from lsst.utils.classes import immutable @@ -50,7 +63,13 @@ from ._dataset_type import DatasetType, SerializedDatasetType from ._named import NamedKeyDict from .datastore.stored_file_info import StoredDatastoreItemInfo -from .dimensions import DataCoordinate, DimensionGroup, DimensionUniverse, SerializedDataCoordinate +from .dimensions import ( + DataCoordinate, + DimensionGroup, + DimensionUniverse, + SerializedDataCoordinate, + SerializedDataId, +) from .json import from_json_pydantic, to_json_pydantic from .persistence_context import PersistenceContextVars @@ -63,6 +82,9 @@ DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]] +_LOG = logging.getLogger(__name__) + + class AmbiguousDatasetError(Exception): """Raised when a `DatasetRef` is not resolved but should be. @@ -864,3 +886,153 @@ class associated with the dataset type of the other ref can be Cannot be changed after a `DatasetRef` is constructed. """ + + +class MinimalistSerializableDatasetRef(pydantic.BaseModel): + """Minimal information needed to define a DatasetRef. + + The ID is not included and is presumed to be the key to a mapping + to this information. + """ + + model_config = pydantic.ConfigDict(frozen=True) + + dataset_type_name: str + """Name of the dataset type.""" + + run: str + """Name of the RUN collection.""" + + data_id: SerializedDataId + """Data coordinate of this dataset.""" + + +class SerializedDatasetRefContainer(pydantic.BaseModel): + """Serializable model for a collection of DatasetRef. + + Dimension records are not included. + """ + + model_config = pydantic.ConfigDict(extra="allow", frozen=True) + container_version: str + + +class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer): + """Serializable model for a collection of DatasetRef. + + Dimension records are not included. + """ + + container_version: Literal["V1"] = "V1" + + universe_version: int + """Dimension universe version.""" + + universe_namespace: str + """Dimension universe namespace.""" + + dataset_types: dict[str, SerializedDatasetType] + """Dataset types indexed by their name.""" + + compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] + """Minimal dataset ref information indexed by UUID.""" + + def __len__(self) -> int: + """Return the number of datasets in the container.""" + return len(self.compact_refs) + + @classmethod + def from_refs(cls, refs: Iterable[DatasetRef]) -> Self: + """Construct a serializable form from a list of `DatasetRef`. + + Parameters + ---------- + refs : `~collections.abc.Iterable` [ `DatasetRef` ] + The datasets to include in the container. + """ + # The serialized DatasetRef contains a lot of duplicated information. 
+ # We also want to drop dimension records and assume that the records + # are already in the registry. + universe: DimensionUniverse | None = None + dataset_types: dict[str, SerializedDatasetType] = {} + compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {} + for ref in refs: + simple_ref = ref.to_simple() + dataset_type = simple_ref.datasetType + assert dataset_type is not None # For mypy + if universe is None: + universe = ref.datasetType.dimensions.universe + if (name := dataset_type.name) not in dataset_types: + dataset_types[name] = dataset_type + data_id = simple_ref.dataId + assert data_id is not None # For mypy + compact_refs[simple_ref.id] = MinimalistSerializableDatasetRef( + dataset_type_name=name, run=simple_ref.run, data_id=data_id.dataId + ) + if universe: + universe_version = universe.version + universe_namespace = universe.namespace + else: + # No refs so no universe. + universe_version = 0 + universe_namespace = "unknown" + return cls( + universe_version=universe_version, + universe_namespace=universe_namespace, + dataset_types=dataset_types, + compact_refs=compact_refs, + ) + + def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]: + """Construct the original `DatasetRef`. + + Parameters + ---------- + universe : `DimensionUniverse` + The universe to use when constructing the `DatasetRef`. + + Returns + ------- + refs : `list` [ `DatasetRef` ] + The `DatasetRef` that were serialized. + """ + if not self.compact_refs: + return [] + + if universe.namespace != self.universe_namespace: + raise RuntimeError( + f"Can not convert to refs in universe {universe.namespace} that were created from " + f"universe {self.universe_namespace}" + ) + + if universe.version != self.universe_version: + _LOG.warning( + "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. " + "Serialized with version %d but asked to use version %d.", + self.universe_version, + universe.version, + ) + + # Reconstruct the DatasetType objects. 
+ dataset_types = { + name: DatasetType.from_simple(dtype, universe=universe) + for name, dtype in self.dataset_types.items() + } + refs: list[DatasetRef] = [] + for id_, minimal in self.compact_refs.items(): + simple_data_id = SerializedDataCoordinate(dataId=minimal.data_id) + data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe) + ref = DatasetRef( + id=id_, + run=minimal.run, + datasetType=dataset_types[minimal.dataset_type_name], + dataId=data_id, + ) + refs.append(ref) + return refs + + +SerializedDatasetRefContainers: TypeAlias = Annotated[ + SerializedDatasetRefContainerV1, + pydantic.Field(discriminator="container_version"), +] diff --git a/python/lsst/daf/butler/_dataset_type.py b/python/lsst/daf/butler/_dataset_type.py index c7b546cc94..e9ba515ef9 100644 --- a/python/lsst/daf/butler/_dataset_type.py +++ b/python/lsst/daf/butler/_dataset_type.py @@ -676,7 +676,7 @@ def to_simple(self, minimal: bool = False) -> SerializedDatasetType: "name": self.name, "storageClass": self._storageClassName, "isCalibration": self._isCalibration, - "dimensions": list(self._dimensions.names), + "dimensions": list(self._dimensions.required), } if self._parentStorageClassName is not None: diff --git a/python/lsst/daf/butler/_formatter.py b/python/lsst/daf/butler/_formatter.py index 5cdd75fd82..e0378aa301 100644 --- a/python/lsst/daf/butler/_formatter.py +++ b/python/lsst/daf/butler/_formatter.py @@ -439,8 +439,8 @@ def read( # direct read from URI option and the contents of the Zip file must # be extracted. uri = self.file_descriptor.location.uri - if uri.fragment and uri.fragment.startswith("zip-path="): - _, path_in_zip = uri.fragment.split("=") + if uri.fragment and uri.unquoted_fragment.startswith("zip-path="): + _, _, path_in_zip = uri.unquoted_fragment.partition("=") # Open the Zip file using ResourcePath. with uri.open("rb") as fd: diff --git a/python/lsst/daf/butler/_quantum_backed.py b/python/lsst/daf/butler/_quantum_backed.py index b1594efcc8..067fb93c45 100644 --- a/python/lsst/daf/butler/_quantum_backed.py +++ b/python/lsst/daf/butler/_quantum_backed.py @@ -39,7 +39,7 @@ from typing import TYPE_CHECKING, Any import pydantic -from lsst.resources import ResourcePathExpression +from lsst.resources import ResourcePath, ResourcePathExpression from ._butler_config import ButlerConfig from ._config import Config @@ -51,6 +51,7 @@ from ._storage_class import StorageClass, StorageClassFactory from .datastore import Datastore from .datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData +from .datastores.file_datastore.retrieve_artifacts import retrieve_and_zip from .dimensions import DimensionUniverse from .registry.bridge.monolithic import MonolithicDatastoreRegistryBridgeManager from .registry.databases.sqlite import SqliteDatabase @@ -498,6 +499,38 @@ def pruneDatasets( # Point of no return for removing artifacts self._datastore.emptyTrash() + def retrieve_artifacts_zip( + self, + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + overwrite: bool = True, + ) -> ResourcePath: + """Retrieve artifacts from the graph and place in ZIP file. + + Parameters + ---------- + refs : `~collections.abc.Iterable` [ `DatasetRef` ] + The datasets to be included in the zip file. + destination : `lsst.resources.ResourcePathExpression` + Directory to write the new ZIP file. This directory will + also be used as a staging area for the datasets being downloaded + from the datastore. 
+ overwrite : `bool`, optional + If `False` the output Zip will not be written if a file of the + same name is already present in ``destination``. + + Returns + ------- + zip_file : `lsst.resources.ResourcePath` + The path to the new ZIP file. + + Raises + ------ + ValueError + Raised if there are no refs to retrieve. + """ + return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite) + def extract_provenance_data(self) -> QuantumProvenanceData: """Extract provenance information and datastore records from this butler. diff --git a/python/lsst/daf/butler/cli/cmd/__init__.py b/python/lsst/daf/butler/cli/cmd/__init__.py index 5b4c9b4164..fa12602ca1 100644 --- a/python/lsst/daf/butler/cli/cmd/__init__.py +++ b/python/lsst/daf/butler/cli/cmd/__init__.py @@ -35,6 +35,7 @@ "config_validate", "export_calibs", "ingest_files", + "ingest_zip", "prune_datasets", "query_collections", "query_data_ids", @@ -61,6 +62,7 @@ create, export_calibs, ingest_files, + ingest_zip, prune_datasets, query_collections, query_data_ids, diff --git a/python/lsst/daf/butler/cli/cmd/commands.py b/python/lsst/daf/butler/cli/cmd/commands.py index 119be85c20..8b55614115 100644 --- a/python/lsst/daf/butler/cli/cmd/commands.py +++ b/python/lsst/daf/butler/cli/cmd/commands.py @@ -602,17 +602,28 @@ def query_dimension_records(**kwargs: Any) -> None: default=False, help="If clobber, overwrite files if they exist locally.", ) +@click.option( + "--zip/--no-zip", + is_flag=True, + default=False, + help="Retrieve artifacts and place in a Zip file.", +) @options_file_option() def retrieve_artifacts(**kwargs: Any) -> None: """Retrieve file artifacts associated with datasets in a repository.""" verbose = kwargs.pop("verbose") transferred = script.retrieveArtifacts(**kwargs) - if verbose and transferred: - print(f"Transferred the following to {kwargs['destination']}:") - for uri in transferred: - print(uri) - print() - print(f"Number of artifacts retrieved into destination {kwargs['destination']}: {len(transferred)}") + if not transferred: + print("No datasets matched query.") + elif kwargs["zip"]: + print(f"Zip file written to {transferred[0]}") + else: + if verbose: + print(f"Transferred the following to {kwargs['destination']}:") + for uri in transferred: + print(uri) + print() + print(f"Number of artifacts retrieved into destination {kwargs['destination']}: {len(transferred)}") @click.command(cls=ButlerCommand) @@ -804,3 +815,18 @@ def export_calibs(*args: Any, **kwargs: Any) -> None: table = script.exportCalibs(*args, **kwargs) if table: table.pprint_all(align="<") + + +@click.command(cls=ButlerCommand) +@repo_argument(required=True) +@click.argument("zip", required=True) +@transfer_option() +def ingest_zip(**kwargs: Any) -> None: + """Ingest a Zip file created by retrieve-artifacts. + + ZIP is the URI to the Zip file that should be ingested. + + This command does not create dimension records and so any records must + be created by other means. 
+ """ + script.ingest_zip(**kwargs) diff --git a/python/lsst/daf/butler/datastore/_datastore.py b/python/lsst/daf/butler/datastore/_datastore.py index 8514aacc24..7b09beda60 100644 --- a/python/lsst/daf/butler/datastore/_datastore.py +++ b/python/lsst/daf/butler/datastore/_datastore.py @@ -64,6 +64,7 @@ from .._dataset_ref import DatasetRef from .._dataset_type import DatasetType from .._storage_class import StorageClass + from ..datastores.file_datastore.retrieve_artifacts import ArtifactIndexInfo from ..registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager from .record_data import DatastoreRecordData from .stored_file_info import StoredDatastoreItemInfo @@ -1023,7 +1024,9 @@ def retrieveArtifacts( transfer: str = "auto", preserve_path: bool = True, overwrite: bool = False, - ) -> list[ResourcePath]: + write_index: bool = True, + add_prefix: bool = False, + ) -> dict[ResourcePath, ArtifactIndexInfo]: """Retrieve the artifacts associated with the supplied refs. Parameters @@ -1045,22 +1048,44 @@ def retrieveArtifacts( overwrite : `bool`, optional If `True` allow transfers to overwrite existing files at the destination. + write_index : `bool`, optional + If `True` write a file at the top level containing a serialization + of a `ZipIndex` for the downloaded datasets. + add_prefix : `bool`, optional + If `True` and if ``preserve_path`` is `False`, apply a prefix to + the filenames corresponding to some part of the dataset ref ID. + This can be used to guarantee uniqueness. Returns ------- - targets : `list` of `lsst.resources.ResourcePath` - URIs of file artifacts in destination location. Order is not - preserved. + artifact_map : `dict` [ `lsst.resources.ResourcePath`, \ + `ArtifactIndexInfo` ] + Mapping of retrieved file to associated index information. Notes ----- For non-file datastores the artifacts written to the destination may not match the representation inside the datastore. For example - a hierarchichal data structure in a NoSQL database may well be stored + a hierarchical data structure in a NoSQL database may well be stored as a JSON file. """ raise NotImplementedError() + @abstractmethod + def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None: + """Ingest an indexed Zip file and contents. + + The Zip file must have an index file as created by `retrieveArtifacts`. + + Parameters + ---------- + zip_path : `lsst.resources.ResourcePath` + Path to the Zip file. + transfer : `str` + Method to use for transferring the Zip file into the datastore. + """ + raise NotImplementedError() + @abstractmethod def remove(self, datasetRef: DatasetRef) -> None: """Indicate to the Datastore that a Dataset can be removed. 
@@ -1451,6 +1476,9 @@ def getURIs(self, datasetRef: DatasetRef, predict: bool = False) -> DatasetRefUR def getURI(self, datasetRef: DatasetRef, predict: bool = False) -> ResourcePath: raise FileNotFoundError("This is a no-op datastore that can not access a real datastore") + def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None: + raise NotImplementedError("Can only ingest a Zip into a real datastore.") + def retrieveArtifacts( self, refs: Iterable[DatasetRef], @@ -1458,7 +1486,9 @@ def retrieveArtifacts( transfer: str = "auto", preserve_path: bool = True, overwrite: bool = False, - ) -> list[ResourcePath]: + write_index: bool = True, + add_prefix: bool = False, + ) -> dict[ResourcePath, ArtifactIndexInfo]: raise NotImplementedError("This is a no-op datastore that can not access a real datastore") def remove(self, datasetRef: DatasetRef) -> None: diff --git a/python/lsst/daf/butler/datastore/stored_file_info.py b/python/lsst/daf/butler/datastore/stored_file_info.py index ce9eb3303b..513d81e926 100644 --- a/python/lsst/daf/butler/datastore/stored_file_info.py +++ b/python/lsst/daf/butler/datastore/stored_file_info.py @@ -365,6 +365,13 @@ def update(self, **kwargs: Any) -> StoredFileInfo: def __reduce__(self) -> str | tuple[Any, ...]: return (self.from_record, (self.to_record(),)) + @property + def artifact_path(self) -> str: + """Path to dataset as stored in Datastore with fragments removed.""" + if "#" in self.path: + return self.path[: self.path.rfind("#")] + return self.path + class SerializedStoredFileInfo(pydantic.BaseModel): """Serialized representation of `StoredFileInfo` properties.""" @@ -378,11 +385,11 @@ class SerializedStoredFileInfo(pydantic.BaseModel): storage_class: str """Name of the StorageClass associated with Dataset.""" - component: str | None + component: str | None = None """Component associated with this file. 
Can be `None` if the file does not refer to a component of a composite.""" - checksum: str | None + checksum: str | None = None """Checksum of the serialized dataset.""" file_size: int diff --git a/python/lsst/daf/butler/datastores/chainedDatastore.py b/python/lsst/daf/butler/datastores/chainedDatastore.py index 09432a2c21..d2f1859fa7 100644 --- a/python/lsst/daf/butler/datastores/chainedDatastore.py +++ b/python/lsst/daf/butler/datastores/chainedDatastore.py @@ -38,7 +38,7 @@ from collections.abc import Callable, Collection, Iterable, Mapping, Sequence from typing import TYPE_CHECKING, Any -from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, FileDataset +from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, DimensionUniverse, FileDataset from lsst.daf.butler.datastore import ( DatasetRefURIs, Datastore, @@ -48,6 +48,7 @@ ) from lsst.daf.butler.datastore.constraints import Constraints from lsst.daf.butler.datastore.record_data import DatastoreRecordData +from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ArtifactIndexInfo, ZipIndex from lsst.resources import ResourcePath from lsst.utils import doImportType @@ -113,6 +114,9 @@ class ChainedDatastore(Datastore): datastoreConstraints: Sequence[Constraints | None] """Constraints to be applied to each of the child datastores.""" + universe: DimensionUniverse + """Dimension universe associated with the butler.""" + @classmethod def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None: """Set any filesystem-dependent config options for child Datastores to @@ -222,6 +226,8 @@ def __init__( else: self.datastoreConstraints = (None,) * len(self.datastores) + self.universe = bridgeManager.universe + log.debug("Created %s (%s)", self.name, ("ephemeral" if self.isEphemeral else "permanent")) @classmethod @@ -804,7 +810,9 @@ def retrieveArtifacts( transfer: str = "auto", preserve_path: bool = True, overwrite: bool = False, - ) -> list[ResourcePath]: + write_index: bool = True, + add_prefix: bool = False, + ) -> dict[ResourcePath, ArtifactIndexInfo]: """Retrieve the file artifacts associated with the supplied refs. Parameters @@ -826,12 +834,18 @@ def retrieveArtifacts( overwrite : `bool`, optional If `True` allow transfers to overwrite existing files at the destination. + write_index : `bool`, optional + If `True` write a file at the top level containing a serialization + of a `ZipIndex` for the downloaded datasets. + add_prefix : `bool`, optional + Add a prefix based on the DatasetId. Only used if ``preserve_path`` + is `False`. Returns ------- - targets : `list` of `lsst.resources.ResourcePath` - URIs of file artifacts in destination location. Order is not - preserved. + artifact_map : `dict` [ `lsst.resources.ResourcePath`, \ + `ArtifactIndexInfo` ] + Mapping of retrieved file to associated index information. """ if not destination.isdir(): raise ValueError(f"Destination location must refer to a directory. Given {destination}") @@ -855,7 +869,12 @@ def retrieveArtifacts( # cache is exactly what we should be doing. continue try: - datastore_refs = {ref for ref in pending if datastore.exists(ref)} + # Checking file existence is expensive. Have the option + # of checking whether the datastore knows of these datasets + # instead, which is fast but can potentially lead to + # retrieveArtifacts failing. 
+                knows = datastore.knows_these(pending)
+                datastore_refs = {ref for ref, exists in knows.items() if exists}
             except NotImplementedError:
                 # Some datastores may not support retrieving artifacts
                 continue
@@ -871,19 +890,92 @@ def retrieveArtifacts(
             raise RuntimeError(f"Some datasets were not found in any datastores: {pending}")

         # Now do the transfer.
-        targets: list[ResourcePath] = []
+        merged_artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
         for number, datastore_refs in grouped_by_datastore.items():
-            targets.extend(
-                self.datastores[number].retrieveArtifacts(
-                    datastore_refs,
-                    destination,
-                    transfer=transfer,
-                    preserve_path=preserve_path,
-                    overwrite=overwrite,
-                )
+            artifact_map = self.datastores[number].retrieveArtifacts(
+                datastore_refs,
+                destination,
+                transfer=transfer,
+                preserve_path=preserve_path,
+                overwrite=overwrite,
+                write_index=False,  # Disable index writing regardless.
+                add_prefix=add_prefix,
+            )
+            merged_artifact_map.update(artifact_map)
+
+        if write_index:
+            index = ZipIndex.from_artifact_map(refs, merged_artifact_map, destination)
+            index.write_index(destination)
+
+        return merged_artifact_map
+
+    def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None:
+        """Ingest an indexed Zip file and contents.
+
+        The Zip file must have an index file as created by `retrieveArtifacts`.
+
+        Parameters
+        ----------
+        zip_path : `lsst.resources.ResourcePath`
+            Path to the Zip file.
+        transfer : `str`
+            Method to use for transferring the Zip file into the datastore.
+
+        Notes
+        -----
+        Datastore constraints are bypassed with Zip ingest. A zip file can
+        contain multiple dataset types. Should the entire Zip be rejected
+        if one dataset type is in the constraints list? If configured to
+        reject everything, ingest should not be attempted.
+
+        The Zip file is given to each datastore in turn, ignoring datastores
+        where it is not supported. Ingest is deemed successful if any of the
+        datastores accept the file.
+        """
+        index = ZipIndex.from_zip_file(zip_path)
+        refs = index.refs.to_refs(self.universe)
+
+        # For now raise if any refs are not supported.
+        # Being selective will require that we return the ingested refs
+        # to the caller so that registry can be modified to remove the
+        # entries.
+        if any(not self.constraints.isAcceptable(ref) for ref in refs):
+            raise DatasetTypeNotSupportedError(
+                "Some of the refs in the given Zip file are not acceptable to this datastore."
             )
-        return targets
+        n_success = 0
+        final_exception: Exception | None = None
+        for number, (datastore, constraints) in enumerate(
+            zip(self.datastores, self.datastoreConstraints, strict=True)
+        ):
+            if datastore.isEphemeral:
+                continue
+
+            # There can be constraints for the datastore in the configuration
+            # of the chaining, or constraints in the configuration of the
+            # datastore itself.
+            if any(
+                (constraints is not None and not constraints.isAcceptable(ref))
+                or not datastore.constraints.isAcceptable(ref)
+                for ref in refs
+            ):
+                log.debug("Datastore %s skipping zip ingest due to constraints", datastore.name)
+                continue
+            try:
+                datastore.ingest_zip(zip_path, transfer=transfer)
+            except NotImplementedError:
+                continue
+            except Exception as e:
+                final_exception = e
+            else:
+                n_success += 1
+
+        if n_success:
+            return
+        if final_exception:
+            raise final_exception
+        raise RuntimeError("Ingest was not successful in any datastores.")

     def remove(self, ref: DatasetRef) -> None:
         """Indicate to the datastore that a dataset can be removed.
diff --git a/python/lsst/daf/butler/datastores/fileDatastore.py b/python/lsst/daf/butler/datastores/fileDatastore.py index a4d3000e07..177db5aa49 100644 --- a/python/lsst/daf/butler/datastores/fileDatastore.py +++ b/python/lsst/daf/butler/datastores/fileDatastore.py @@ -81,7 +81,10 @@ get_dataset_as_python_object_from_get_info, ) from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ( + ArtifactIndexInfo, + ZipIndex, determine_destination_for_retrieved_artifact, + unpack_zips, ) from lsst.daf.butler.datastores.fileDatastoreClient import ( FileDatastoreGetPayload, @@ -296,6 +299,8 @@ def __init__( else: self.cacheManager = DatastoreDisabledCacheManager("", universe=bridgeManager.universe) + self.universe = bridgeManager.universe + @classmethod def _create_from_config( cls, @@ -566,23 +571,50 @@ def _get_stored_records_associated_with_refs( records_by_ref[record["dataset_id"]].append(StoredFileInfo.from_record(record)) return records_by_ref - def _refs_associated_with_artifacts(self, paths: list[str | ResourcePath]) -> dict[str, set[DatasetId]]: + def _refs_associated_with_artifacts( + self, paths: Iterable[str | ResourcePath] + ) -> dict[str, set[DatasetId]]: """Return paths and associated dataset refs. Parameters ---------- paths : `list` of `str` or `lsst.resources.ResourcePath` - All the paths to include in search. + All the paths to include in search. These are exact matches + to the entries in the records table and can include fragments. Returns ------- mapping : `dict` of [`str`, `set` [`DatasetId`]] Mapping of each path to a set of associated database IDs. + These are artifacts and so any fragments are stripped from the + keys. """ - records = self._table.fetch(path=[str(path) for path in paths]) - result = defaultdict(set) - for row in records: - result[row["path"]].add(row["dataset_id"]) + # Group paths by those that have fragments and those that do not. + with_fragment = set() + without_fragment = set() + for rpath in paths: + spath = str(rpath) # Typing says can be ResourcePath so must force to string. + if "#" in spath: + spath, fragment = spath.rsplit("#", 1) + with_fragment.add(spath) + else: + without_fragment.add(spath) + + result: dict[str, set[DatasetId]] = defaultdict(set) + if without_fragment: + records = self._table.fetch(path=without_fragment) + for row in records: + path = row["path"] + result[path].add(row["dataset_id"]) + if with_fragment: + # Do a query per prefix. + for path in with_fragment: + records = self._table.fetch(path=f"{path}#%") + for row in records: + # Need to strip fragments before adding to dict. + row_path = row["path"] + artifact_path = row_path[: row_path.rfind("#")] + result[artifact_path].add(row["dataset_id"]) return result def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[DatasetId]: @@ -1946,6 +1978,71 @@ def _locations_to_URI( return uris + @staticmethod + def _find_missing_records( + datastore: FileDatastore, + refs: Iterable[DatasetRef], + missing_ids: set[DatasetId], + artifact_existence: dict[ResourcePath, bool] | None = None, + ) -> dict[DatasetId, list[StoredFileInfo]]: + if not missing_ids: + return {} + + if artifact_existence is None: + artifact_existence = {} + + found_records: dict[DatasetId, list[StoredFileInfo]] = defaultdict(list) + id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids} + + # This should be chunked in case we end up having to check + # the file store since we need some log output to show + # progress. 
+ for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000): + records = {} + for missing in missing_ids_chunk: + # Ask the source datastore where the missing artifacts + # should be. An execution butler might not know about the + # artifacts even if they are there. + expected = datastore._get_expected_dataset_locations_info(id_to_ref[missing]) + records[missing] = [info for _, info in expected] + + # Call the mexist helper method in case we have not already + # checked these artifacts such that artifact_existence is + # empty. This allows us to benefit from parallelism. + # datastore.mexists() itself does not give us access to the + # derived datastore record. + log.verbose("Checking existence of %d datasets unknown to datastore", len(records)) + ref_exists = datastore._process_mexists_records( + id_to_ref, records, False, artifact_existence=artifact_existence + ) + + # Now go through the records and propagate the ones that exist. + location_factory = datastore.locationFactory + for missing, record_list in records.items(): + # Skip completely if the ref does not exist. + ref = id_to_ref[missing] + if not ref_exists[ref]: + log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref) + continue + # Check for file artifact to decide which parts of a + # disassembled composite do exist. If there is only a + # single record we don't even need to look because it can't + # be a composite and must exist. + if len(record_list) == 1: + dataset_records = record_list + else: + dataset_records = [ + record + for record in record_list + if artifact_existence[record.file_location(location_factory).uri] + ] + assert len(dataset_records) > 0, "Disassembled composite should have had some files." + + # Rely on source_records being a defaultdict. + found_records[missing].extend(dataset_records) + log.verbose("Completed scan for missing data files") + return found_records + def retrieveArtifacts( self, refs: Iterable[DatasetRef], @@ -1953,7 +2050,9 @@ def retrieveArtifacts( transfer: str = "auto", preserve_path: bool = True, overwrite: bool = False, - ) -> list[ResourcePath]: + write_index: bool = True, + add_prefix: bool = False, + ) -> dict[ResourcePath, ArtifactIndexInfo]: """Retrieve the file artifacts associated with the supplied refs. Parameters @@ -1975,12 +2074,19 @@ def retrieveArtifacts( overwrite : `bool`, optional If `True` allow transfers to overwrite existing files at the destination. + write_index : `bool`, optional + If `True` write a file at the top level containing a serialization + of a `ZipIndex` for the downloaded datasets. + add_prefix : `bool`, optional + If `True` and if ``preserve_path`` is `False`, apply a prefix to + the filenames corresponding to some part of the dataset ref ID. + This can be used to guarantee uniqueness. Returns ------- - targets : `list` of `lsst.resources.ResourcePath` - URIs of file artifacts in destination location. Order is not - preserved. + artifact_map : `dict` [ `lsst.resources.ResourcePath`, \ + `ArtifactIndexInfo` ] + Mapping of retrieved file to associated index information. """ if not destination.isdir(): raise ValueError(f"Destination location must refer to a directory. Given {destination}") @@ -1992,22 +2098,150 @@ def retrieveArtifacts( # This also helps filter out duplicate DatasetRef in the request # that will map to the same underlying file transfer. 
to_transfer: dict[ResourcePath, ResourcePath] = {} + zips_to_transfer: set[ResourcePath] = set() - for ref in refs: - locations = self._get_dataset_locations_info(ref) - for location, _ in locations: + # Retrieve all the records in bulk indexed by ref.id. + records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) + + # Check for missing records. + known_ids = set(records) + log.debug("Number of datastore records found in database: %d", len(known_ids)) + requested_ids = {ref.id for ref in refs} + missing_ids = requested_ids - known_ids + + if missing_ids and not self.trustGetRequest: + raise ValueError(f"Number of datasets missing from this datastore: {len(missing_ids)}") + + missing_records = self._find_missing_records(self, refs, missing_ids) + records.update(missing_records) + + # One artifact can be used by multiple DatasetRef. + # e.g. DECam. + artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {} + # Sort to ensure that in many refs to one file situation the same + # ref is used for any prefix that might be added. + for ref in sorted(refs): + prefix = str(ref.id)[:8] + "-" if add_prefix else "" + for info in records[ref.id]: + location = info.file_location(self.locationFactory) source_uri = location.uri - target_uri = determine_destination_for_retrieved_artifact( - destination, location.pathInStore, preserve_path - ) - to_transfer[source_uri] = target_uri + # For DECam/zip we only want to copy once. + # For zip files we need to unpack so that they can be + # zipped up again if needed. + is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment + # We need to remove fragments for consistency. + cleaned_source_uri = source_uri.replace(fragment="", query="", params="") + if is_zip: + # Assume the DatasetRef definitions are within the Zip + # file itself and so can be dropped from loop. + zips_to_transfer.add(cleaned_source_uri) + elif cleaned_source_uri not in to_transfer: + target_uri = determine_destination_for_retrieved_artifact( + destination, location.pathInStore, preserve_path, prefix + ) + to_transfer[cleaned_source_uri] = target_uri + artifact_map[target_uri] = ArtifactIndexInfo.from_single(info.to_simple(), ref.id) + else: + target_uri = to_transfer[cleaned_source_uri] + artifact_map[target_uri].append(ref.id) # In theory can now parallelize the transfer log.debug("Number of artifacts to transfer to %s: %d", str(destination), len(to_transfer)) for source_uri, target_uri in to_transfer.items(): target_uri.transfer_from(source_uri, transfer=transfer, overwrite=overwrite) - return list(to_transfer.values()) + # Transfer the Zip files and unpack them. + zipped_artifacts = unpack_zips(zips_to_transfer, requested_ids, destination, preserve_path, overwrite) + artifact_map.update(zipped_artifacts) + + if write_index: + index = ZipIndex.from_artifact_map(refs, artifact_map, destination) + index.write_index(destination) + + return artifact_map + + def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None: + """Ingest an indexed Zip file and contents. + + The Zip file must have an index file as created by `retrieveArtifacts`. + + Parameters + ---------- + zip_path : `lsst.resources.ResourcePath` + Path to the Zip file. + transfer : `str` + Method to use for transferring the Zip file into the datastore. + + Notes + ----- + Datastore constraints are bypassed with Zip ingest. A zip file can + contain multiple dataset types. Should the entire Zip be rejected + if one dataset type is in the constraints list? 
+ + If any dataset is already present in the datastore the entire ingest + will fail. + """ + index = ZipIndex.from_zip_file(zip_path) + + # Refs indexed by UUID. + refs = index.refs.to_refs(universe=self.universe) + id_to_ref = {ref.id: ref for ref in refs} + + # Any failing constraints trigger entire failure. + if any(not self.constraints.isAcceptable(ref) for ref in refs): + raise DatasetTypeNotSupportedError( + "Some refs in the Zip file are not supported by this datastore" + ) + + # Transfer the Zip file into the datastore file system. + # There is no RUN as such to use for naming. + # Potentially could use the RUN from the first ref in the index + # There is no requirement that the contents of the Zip files share + # the same RUN. + # Could use the Zip UUID from the index + special "zips/" prefix. + if transfer is None: + # Indicated that the zip file is already in the right place. + if not zip_path.isabs(): + tgtLocation = self.locationFactory.fromPath(zip_path.ospath, trusted_path=False) + else: + pathInStore = zip_path.relative_to(self.root) + if pathInStore is None: + raise RuntimeError( + f"Unexpectedly learned that {zip_path} is not within datastore {self.root}" + ) + tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True) + elif transfer == "direct": + # Reference in original location. + tgtLocation = None + else: + # Name the zip file based on index contents. + tgtLocation = self.locationFactory.fromPath(index.calculate_zip_file_path_in_store()) + if not tgtLocation.uri.dirname().exists(): + log.debug("Folder %s does not exist yet.", tgtLocation.uri.dirname()) + tgtLocation.uri.dirname().mkdir() + + # Transfer the Zip file into the datastore. + tgtLocation.uri.transfer_from( + zip_path, transfer=transfer, transaction=self._transaction, overwrite=True + ) + + if tgtLocation is None: + path_in_store = str(zip_path) + else: + path_in_store = tgtLocation.pathInStore.path + + # Associate each file with a (DatasetRef, StoredFileInfo) tuple. + artifacts: list[tuple[DatasetRef, StoredFileInfo]] = [] + for path_in_zip, index_info in index.artifact_map.items(): + # Need to modify the info to include the path to the Zip file + # that was previously written to the datastore. + index_info.info.path = f"{path_in_store}#zip-path={path_in_zip}" + + info = StoredFileInfo.from_simple(index_info.info) + for id_ in index_info.ids: + artifacts.append((id_to_ref[id_], info)) + + self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT) def get( self, @@ -2299,36 +2533,49 @@ def emptyTrash(self, ignore_errors: bool = True) -> None: # This requires multiple copies of the trashed items trashed, artifacts_to_keep = trash_data - if artifacts_to_keep is None: + # Assume that # in path means there are fragments involved. The + # fragments can not be handled by the emptyTrash bridge call + # so need to be processed independently. + # The generator has to be converted to a list for multiple + # iterations. Clean up the typing so that multiple isinstance + # tests aren't needed later. + trashed_list = [(ref, ninfo) for ref, ninfo in trashed if isinstance(ninfo, StoredFileInfo)] + + if artifacts_to_keep is None or any("#" in info[1].path for info in trashed_list): # The bridge is not helping us so have to work it out # ourselves. This is not going to be as efficient. - trashed = list(trashed) - - # The instance check is for mypy since up to this point it - # does not know the type of info. 
- path_map = self._refs_associated_with_artifacts( - [info.path for _, info in trashed if isinstance(info, StoredFileInfo)] - ) - - for ref, info in trashed: - # Mypy needs to know this is not the base class - assert isinstance(info, StoredFileInfo), f"Unexpectedly got info of class {type(info)}" - - path_map[info.path].remove(ref.id) - if not path_map[info.path]: - del path_map[info.path] - - artifacts_to_keep = set(path_map) + # This mapping does not include the fragments. + if artifacts_to_keep is not None: + # This means we have already checked for non-fragment + # examples so can filter. + paths_to_check = {info.path for _, info in trashed_list if "#" in info.path} + else: + paths_to_check = {info.path for _, info in trashed_list} + + path_map = self._refs_associated_with_artifacts(paths_to_check) + + for ref, info in trashed_list: + path = info.artifact_path + # For disassembled composites in a Zip it is possible + # for the same path to correspond to the same dataset ref + # multiple times so trap for that. + if ref.id in path_map[path]: + path_map[path].remove(ref.id) + if not path_map[path]: + del path_map[path] + + slow_artifacts_to_keep = set(path_map) + if artifacts_to_keep is not None: + artifacts_to_keep.update(slow_artifacts_to_keep) + else: + artifacts_to_keep = slow_artifacts_to_keep - for ref, info in trashed: + for ref, info in trashed_list: # Should not happen for this implementation but need # to keep mypy happy. assert info is not None, f"Internal logic error in emptyTrash with ref {ref}." - # Mypy needs to know this is not the base class - assert isinstance(info, StoredFileInfo), f"Unexpectedly got info of class {type(info)}" - - if info.path in artifacts_to_keep: + if info.artifact_path in artifacts_to_keep: # This is a multi-dataset artifact and we are not # removing all associated refs. continue @@ -2453,55 +2700,10 @@ def transfer_from( len(missing_ids), len(requested_ids), ) - id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids} - - # This should be chunked in case we end up having to check - # the file store since we need some log output to show - # progress. - for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000): - records = {} - for missing in missing_ids_chunk: - # Ask the source datastore where the missing artifacts - # should be. An execution butler might not know about the - # artifacts even if they are there. - expected = source_datastore._get_expected_dataset_locations_info(id_to_ref[missing]) - records[missing] = [info for _, info in expected] - - # Call the mexist helper method in case we have not already - # checked these artifacts such that artifact_existence is - # empty. This allows us to benefit from parallelism. - # datastore.mexists() itself does not give us access to the - # derived datastore record. - log.verbose("Checking existence of %d datasets unknown to datastore", len(records)) - ref_exists = source_datastore._process_mexists_records( - id_to_ref, records, False, artifact_existence=artifact_existence - ) - - # Now go through the records and propagate the ones that exist. - location_factory = source_datastore.locationFactory - for missing, record_list in records.items(): - # Skip completely if the ref does not exist. - ref = id_to_ref[missing] - if not ref_exists[ref]: - log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref) - continue - # Check for file artifact to decide which parts of a - # disassembled composite do exist. 
If there is only a - # single record we don't even need to look because it can't - # be a composite and must exist. - if len(record_list) == 1: - dataset_records = record_list - else: - dataset_records = [ - record - for record in record_list - if artifact_existence[record.file_location(location_factory).uri] - ] - assert len(dataset_records) > 0, "Disassembled composite should have had some files." - - # Rely on source_records being a defaultdict. - source_records[missing].extend(dataset_records) - log.verbose("Completed scan for missing data files") + found_records = self._find_missing_records( + source_datastore, refs, missing_ids, artifact_existence + ) + source_records.update(found_records) # See if we already have these records target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) diff --git a/python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py b/python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py index 0445a1bd30..c143e82712 100644 --- a/python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py +++ b/python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py @@ -27,13 +27,258 @@ from __future__ import annotations -__all__ = ("determine_destination_for_retrieved_artifact",) +__all__ = ("determine_destination_for_retrieved_artifact", "retrieve_and_zip", "unpack_zips", "ZipIndex") +import logging +import tempfile +import uuid +import zipfile +from collections.abc import Iterable +from typing import ClassVar, Literal, Protocol, Self + +from lsst.daf.butler import ( + DatasetIdFactory, + DatasetRef, + SerializedDatasetRefContainers, + SerializedDatasetRefContainerV1, +) +from lsst.daf.butler.datastore.stored_file_info import SerializedStoredFileInfo from lsst.resources import ResourcePath, ResourcePathExpression +from pydantic import BaseModel + +_LOG = logging.getLogger(__name__) + + +class ArtifactIndexInfo(BaseModel): + """Information related to an artifact in an index.""" + + info: SerializedStoredFileInfo + """Datastore record information for this file artifact.""" + + ids: set[uuid.UUID] + """Dataset IDs for this artifact.""" + + def append(self, id_: uuid.UUID) -> None: + """Add an additional dataset ID. + + Parameters + ---------- + id_ : `uuid.UUID` + Additional dataset ID to associate with this artifact. + """ + self.ids.add(id_) + + @classmethod + def from_single(cls, info: SerializedStoredFileInfo, id_: uuid.UUID) -> Self: + """Create a mapping from a single ID and info. + + Parameters + ---------- + info : `SerializedStoredFileInfo` + Datastore record for this artifact. + id_ : `uuid.UUID` + First dataset ID to associate with this artifact. + """ + return cls(info=info, ids=[id_]) + + def subset(self, ids: Iterable[uuid.UUID]) -> Self: + """Replace the IDs with a subset of the IDs and return a new instance. + + Parameters + ---------- + ids : `~collections.abc.Iterable` [ `uuid.UUID` ] + Subset of IDs to keep. + + Returns + ------- + subsetted : `ArtifactIndexInfo` + New instance with the requested subset. + + Raises + ------ + ValueError + Raised if the given IDs is not a subset of the current IDs. + """ + subset = set(ids) + if subset - self.ids: + raise ValueError(f"Given subset of {subset} is not a subset of {self.ids}") + return type(self)(ids=subset, info=self.info.model_copy()) + + +class ZipIndex(BaseModel): + """Index of a Zip file of Butler datasets. 
+ + A file can be associated with multiple butler datasets and a single + butler dataset can be associated with multiple files. This model + provides the necessary information for ingesting back into a Butler + file datastore. + """ + + index_version: Literal["V1"] = "V1" + + refs: SerializedDatasetRefContainers + """Deduplicated information for all the `DatasetRef` in the index.""" + + artifact_map: dict[str, ArtifactIndexInfo] + """Mapping of each Zip member to associated lookup information.""" + + index_name: ClassVar[str] = "_butler_zip_index.json" + """Name to use when saving the index to a file.""" + + def generate_uuid5(self) -> uuid.UUID: + """Create a UUID based on the Zip index. + + Returns + ------- + id_ : `uuid.UUID` + A UUID5 created from the paths inside the Zip file. Guarantees + that if the Zip file is regenerated with exactly the same file + paths the same answer will be returned. + """ + # Options are: + # - uuid5 based on file paths in zip + # - uuid5 based on ref uuids. + # - checksum derived from the above. + # - uuid5 from file paths and dataset refs. + # Do not attempt to include file contents in UUID. + # Start with uuid5 from file paths. + data = ",".join(sorted(self.artifact_map.keys())) + # No need to come up with a different namespace. + return uuid.uuid5(DatasetIdFactory.NS_UUID, data) + + def __len__(self) -> int: + """Return the number of files in the Zip.""" + return len(self.artifact_map) + + def calculate_zip_file_name(self) -> str: + """Calculate the default name for the Zip file based on the index + contents. + + Returns + ------- + name : `str` + Name of the zip file based on index. + """ + return f"{self.generate_uuid5()}.zip" + + def calculate_zip_file_path_in_store(self) -> str: + """Calculate the relative path inside a datastore that should be + used for this Zip file. + + Returns + ------- + path_in_store : `str` + Relative path to use for Zip file in datastore. + """ + zip_name = self.calculate_zip_file_name() + return f"zips/{zip_name[:4]}/{zip_name}" + + def write_index(self, dir: ResourcePath) -> ResourcePath: + """Write the index to the specified directory. + + Parameters + ---------- + dir : `~lsst.resources.ResourcePath` + Directory to write the index file to. + + Returns + ------- + index_path : `~lsst.resources.ResourcePath` + Path to the index file that was written. + """ + index_path = dir.join(self.index_name, forceDirectory=False) + with index_path.open("w") as fd: + # Need to include unset/default values so that the version + # discriminator field for refs container appears in the + # serialization. + print(self.model_dump_json(), file=fd) + return index_path + + @classmethod + def calc_relative_paths( + cls, root: ResourcePath, paths: Iterable[ResourcePath] + ) -> dict[ResourcePath, str]: + """Calculate the path to use inside the Zip file from the full path. + + Parameters + ---------- + root : `lsst.resources.ResourcePath` + The reference root directory. + paths : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] + The paths to the files that should be included in the Zip file. + + Returns + ------- + abs_to_rel : `dict` [ `~lsst.resources.ResourcePath`, `str` ] + Mapping of the original file path to the relative path to use + in Zip file. + """ + file_to_relative: dict[ResourcePath, str] = {} + for p in paths: + # It is an error if there is no relative path. 
+            rel = p.relative_to(root)
+            if rel is None:
+                raise RuntimeError(f"Unexpectedly unable to calculate relative path of {p} to {root}.")
+            file_to_relative[p] = rel
+        return file_to_relative
+
+    @classmethod
+    def from_artifact_map(
+        cls,
+        refs: Iterable[DatasetRef],
+        artifact_map: dict[ResourcePath, ArtifactIndexInfo],
+        root: ResourcePath,
+    ) -> Self:
+        """Create an index from the mappings returned from
+        `Datastore.retrieveArtifacts`.
+
+        Parameters
+        ----------
+        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
+            Datasets present in the index.
+        artifact_map : `dict` [ `lsst.resources.ResourcePath`, `ArtifactIndexInfo` ]
+            Mapping of artifact path to information linking it to the
+            associated refs and datastore information.
+        root : `lsst.resources.ResourcePath`
+            Root path to be removed from all the paths before creating the
+            index.
+        """
+        if not refs:
+            return cls(
+                refs=SerializedDatasetRefContainerV1.from_refs(refs),
+                artifact_map={},
+            )
+
+        # Calculate the paths relative to the given root since the Zip file
+        # uses relative paths.
+        file_to_relative = cls.calc_relative_paths(root, artifact_map.keys())
+
+        simplified_refs = SerializedDatasetRefContainerV1.from_refs(refs)
+
+        # Convert the artifact mapping to relative path.
+        relative_artifact_map = {file_to_relative[path]: info for path, info in artifact_map.items()}
+
+        return cls(
+            refs=simplified_refs,
+            artifact_map=relative_artifact_map,
+        )
+
+    @classmethod
+    def from_zip_file(cls, zip_path: ResourcePath) -> Self:
+        """Given a path to a Zip file, return the index.
+
+        Parameters
+        ----------
+        zip_path : `lsst.resources.ResourcePath`
+            Path to the Zip file.
+        """
+        with zip_path.open("rb") as fd, zipfile.ZipFile(fd) as zf:
+            json_data = zf.read(cls.index_name)
+            return cls.model_validate_json(json_data)


 def determine_destination_for_retrieved_artifact(
-    destination_directory: ResourcePath, source_path: ResourcePath, preserve_path: bool
+    destination_directory: ResourcePath, source_path: ResourcePath, preserve_path: bool, prefix: str = ""
 ) -> ResourcePath:
     """Determine destination path for an artifact retrieved from a datastore.

@@ -44,10 +289,12 @@ def determine_destination_for_retrieved_artifact(
     source_path : `ResourcePath`
         Path to the source file to be transferred. This may be relative to
         the datastore root, or an absolute path.
-    preserve_path : `bool`, optional
+    preserve_path : `bool`
         If `True` the full path of the artifact within the datastore is
         preserved. If `False` the final file component of the path is used.
+    prefix : `str`, optional
+        Prefix to add to the file name if ``preserve_path`` is `False`.

     Returns
     -------
@@ -65,8 +312,175 @@ def determine_destination_for_retrieved_artifact(
         target_path = target_path.relativeToPathRoot
     else:
         target_path = source_path.basename()
+        if prefix:
+            target_path = prefix + target_path

     target_uri = destination_directory.join(target_path).abspath()
     if target_uri.relative_to(destination_directory) is None:
         raise ValueError(f"File path attempts to escape destination directory: '{source_path}'")

     return target_uri
+
+
+class RetrievalCallable(Protocol):
+    def __call__(
+        self,
+        refs: Iterable[DatasetRef],
+        destination: ResourcePath,
+        transfer: str,
+        preserve_path: bool,
+        overwrite: bool,
+        write_index: bool,
+        add_prefix: bool,
+    ) -> dict[ResourcePath, ArtifactIndexInfo]: ...
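# --- Editor's note (illustrative sketch, not part of the patch) ---------------
# Any bound method matching the RetrievalCallable protocol above can drive
# retrieve_and_zip(); in this changeset both DirectButler and
# QuantumBackedButler pass their datastore's retrieveArtifacts method, e.g.:
#
#     zip_path = retrieve_and_zip(
#         refs,                                  # Iterable[DatasetRef] to bundle
#         "/tmp/zip-staging",                    # destination directory (placeholder)
#         butler._datastore.retrieveArtifacts,   # the RetrievalCallable
#         overwrite=True,
#     )
#
# The callback downloads the artifacts into a temporary staging directory and
# returns the artifact map that retrieve_and_zip uses to build the ZipIndex.
# -------------------------------------------------------------------------------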
+ + +def retrieve_and_zip( + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + retrieval_callback: RetrievalCallable, + overwrite: bool = True, +) -> ResourcePath: + """Retrieve artifacts from a Butler and place in ZIP file. + + Parameters + ---------- + refs : `collections.abc.Iterable` [ `DatasetRef` ] + The datasets to be included in the zip file. Must all be from + the same dataset type. + destination : `lsst.resources.ResourcePath` + Directory to write the new ZIP file. This directory will + also be used as a staging area for the datasets being downloaded + from the datastore. + retrieval_callback : `~collections.abc.Callable` + Bound method for a function that can retrieve the artifacts and + return the metadata necessary for creating the zip index. For example + `lsst.daf.butler.datastore.Datastore.retrieveArtifacts`. + overwrite : `bool`, optional + If `False` the output Zip will not be written if a file of the + same name is already present in ``destination``. + + Returns + ------- + zip_file : `lsst.resources.ResourcePath` + The path to the new ZIP file. + + Raises + ------ + ValueError + Raised if there are no refs to retrieve. + """ + if not refs: + raise ValueError("Requested Zip file with no contents.") + + outdir = ResourcePath(destination, forceDirectory=True) + if not outdir.isdir(): + raise ValueError(f"Destination location must refer to a directory. Given {destination}") + + if not outdir.exists(): + outdir.mkdir() + + # Simplest approach: + # - create temp dir in destination + # - Run retrieveArtifacts to that temp dir + # - Create zip file with unique name. + # - Delete temp dir + # - Add index file to ZIP. + # - Return name of zip file. + with tempfile.TemporaryDirectory(dir=outdir.ospath, ignore_cleanup_errors=True) as tmpdir: + tmpdir_path = ResourcePath(tmpdir, forceDirectory=True) + # Retrieve the artifacts and write the index file. Strip paths. + artifact_map = retrieval_callback( + refs=refs, + destination=tmpdir_path, + transfer="auto", + preserve_path=False, + overwrite=False, + write_index=True, + add_prefix=True, + ) + + # Read the index to construct file name. + index_path = tmpdir_path.join(ZipIndex.index_name, forceDirectory=False) + index_json = index_path.read() + index = ZipIndex.model_validate_json(index_json) + + # Use unique name based on files in Zip. + zip_file_name = index.calculate_zip_file_name() + zip_path = outdir.join(zip_file_name, forceDirectory=False) + if not overwrite and zip_path.exists(): + raise FileExistsError(f"Output Zip at {zip_path} already exists but cannot overwrite.") + with zipfile.ZipFile(zip_path.ospath, "w") as zip: + zip.write(index_path.ospath, index_path.basename(), compress_type=zipfile.ZIP_DEFLATED) + for path, name in index.calc_relative_paths(tmpdir_path, list(artifact_map)).items(): + zip.write(path.ospath, name) + + return zip_path + + +def unpack_zips( + zips_to_transfer: Iterable[ResourcePath], + allowed_ids: set[uuid.UUID], + destination: ResourcePath, + preserve_path: bool, + overwrite: bool, +) -> dict[ResourcePath, ArtifactIndexInfo]: + """Transfer the Zip files and unpack them in the destination directory. + + Parameters + ---------- + zips_to_transfer : `~collections.abc.Iterable` \ + [ `~lsst.resources.ResourcePath` ] + Paths to Zip files to unpack. These must be Zip files that include + the index information and were created by the Butler. + allowed_ids : `set` [ `uuid.UUID` ] + All the possible dataset IDs for which artifacts should be extracted + from the Zip file. 
If an ID in the Zip file is not present in this + list the artifact will not be extracted from the Zip. + destination : `~lsst.resources.ResourcePath` + Output destination for the Zip contents. + preserve_path : `bool` + Whether to include subdirectories during extraction. If `True` a + directory will be made per Zip. + overwrite : `bool`, optional + If `True` allow transfers to overwrite existing files at the + destination. + + Returns + ------- + artifact_map : `dict` \ + [ `~lsst.resources.ResourcePath`, `ArtifactIndexInfo` ] + Path linking Zip contents location to associated artifact information. + """ + artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {} + for source_uri in zips_to_transfer: + _LOG.debug("Unpacking zip file %s", source_uri) + # Assume that downloading to temporary location is more efficient + # than trying to read the contents remotely. + with ResourcePath.temporary_uri(suffix=".zip") as temp: + temp.transfer_from(source_uri, transfer="auto") + index = ZipIndex.from_zip_file(temp) + + if preserve_path: + subdir = ResourcePath( + index.calculate_zip_file_path_in_store(), forceDirectory=False, forceAbsolute=False + ).dirname() + outdir = destination.join(subdir) + else: + outdir = destination + outdir.mkdir() + with temp.open("rb") as fd, zipfile.ZipFile(fd) as zf: + for path_in_zip, artifact_info in index.artifact_map.items(): + # Skip if this specific dataset ref is not requested. + included_ids = artifact_info.ids & allowed_ids + if included_ids: + # Do not apply a new prefix since the zip file + # should already have a prefix. + output_path = outdir.join(path_in_zip, forceDirectory=False) + if not overwrite and output_path.exists(): + raise FileExistsError( + f"Destination path '{output_path}' already exists. " + "Extraction from Zip cannot be completed." + ) + zf.extract(path_in_zip, path=outdir.ospath) + artifact_map[output_path] = artifact_info.subset(included_ids) + return artifact_map diff --git a/python/lsst/daf/butler/datastores/inMemoryDatastore.py b/python/lsst/daf/butler/datastores/inMemoryDatastore.py index 57bba73335..ecd0e1b9ff 100644 --- a/python/lsst/daf/butler/datastores/inMemoryDatastore.py +++ b/python/lsst/daf/butler/datastores/inMemoryDatastore.py @@ -50,6 +50,7 @@ if TYPE_CHECKING: from lsst.daf.butler import Config, DatasetType, LookupKey from lsst.daf.butler.datastore import DatastoreOpaqueTable + from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ArtifactIndexInfo from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager log = logging.getLogger(__name__) @@ -516,6 +517,9 @@ def getURI(self, ref: DatasetRef, predict: bool = False) -> ResourcePath: raise AssertionError(f"Unexpectedly got no URI for in-memory datastore for {ref}") return primary + def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None: + raise NotImplementedError("Can only ingest a Zip into a file datastore.") + def retrieveArtifacts( self, refs: Iterable[DatasetRef], @@ -523,7 +527,9 @@ def retrieveArtifacts( transfer: str = "auto", preserve_path: bool = True, overwrite: bool | None = False, - ) -> list[ResourcePath]: + write_index: bool = True, + add_prefix: bool = False, + ) -> dict[ResourcePath, ArtifactIndexInfo]: """Retrieve the file artifacts associated with the supplied refs. Parameters @@ -545,6 +551,13 @@ def retrieveArtifacts( overwrite : `bool`, optional If `True` allow transfers to overwrite existing files at the destination. 
+ write_index : `bool`, optional + If `True` write a file at the top level containing a serialization + of a `ZipIndex` for the downloaded datasets. + add_prefix : `bool`, optional + If `True` and if ``preserve_path`` is `False`, apply a prefix to + the filenames corresponding to some part of the dataset ref ID. + This can be used to guarantee uniqueness. Notes ----- diff --git a/python/lsst/daf/butler/dimensions/_coordinate.py b/python/lsst/daf/butler/dimensions/_coordinate.py index d6f4189e9c..d511a49cac 100644 --- a/python/lsst/daf/butler/dimensions/_coordinate.py +++ b/python/lsst/daf/butler/dimensions/_coordinate.py @@ -709,7 +709,7 @@ def to_simple(self, minimal: bool = False) -> SerializedDataCoordinate: else: records = None - return SerializedDataCoordinate(dataId=dict(self.mapping), records=records) + return SerializedDataCoordinate(dataId=dict(self.required), records=records) @classmethod def from_simple( diff --git a/python/lsst/daf/butler/direct_butler/_direct_butler.py b/python/lsst/daf/butler/direct_butler/_direct_butler.py index 7c89d77d2f..892bcaa8f5 100644 --- a/python/lsst/daf/butler/direct_butler/_direct_butler.py +++ b/python/lsst/daf/butler/direct_butler/_direct_butler.py @@ -41,6 +41,7 @@ import logging import numbers import os +import uuid import warnings from collections import Counter, defaultdict from collections.abc import Iterable, Iterator, MutableMapping, Sequence @@ -62,11 +63,13 @@ from .._dataset_type import DatasetType from .._deferredDatasetHandle import DeferredDatasetHandle from .._exceptions import DatasetNotFoundError, DimensionValueError, EmptyQueryResultError, ValidationError +from .._file_dataset import FileDataset from .._limited_butler import LimitedButler from .._registry_shim import RegistryShim from .._storage_class import StorageClass, StorageClassFactory from .._timespan import Timespan from ..datastore import Datastore, NullDatastore +from ..datastores.file_datastore.retrieve_artifacts import ZipIndex, retrieve_and_zip from ..dimensions import DataCoordinate, Dimension from ..direct_query_driver import DirectQueryDriver from ..progress import Progress @@ -87,7 +90,6 @@ from lsst.resources import ResourceHandleProtocol from .._dataset_ref import DatasetId - from .._file_dataset import FileDataset from ..datastore import DatasetRefURIs from ..dimensions import DataId, DataIdValue, DimensionElement, DimensionRecord, DimensionUniverse from ..registry import CollectionArgType, Registry @@ -1290,6 +1292,14 @@ def find_dataset( return ref + def retrieve_artifacts_zip( + self, + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + overwrite: bool = True, + ) -> ResourcePath: + return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite) + def retrieveArtifacts( self, refs: Iterable[DatasetRef], @@ -1299,13 +1309,16 @@ def retrieveArtifacts( overwrite: bool = False, ) -> list[ResourcePath]: # Docstring inherited. - return self._datastore.retrieveArtifacts( + outdir = ResourcePath(destination) + artifact_map = self._datastore.retrieveArtifacts( refs, - ResourcePath(destination), + outdir, transfer=transfer, preserve_path=preserve_path, overwrite=overwrite, + write_index=True, ) + return list(artifact_map) def exists( self, @@ -1501,17 +1514,68 @@ def pruneDatasets( self._datastore.emptyTrash() @transactional - def ingest( - self, - *datasets: FileDataset, - transfer: str | None = "auto", - record_validation_info: bool = True, - ) -> None: - # Docstring inherited. 
+ def ingest_zip(self, zip_file: ResourcePathExpression, transfer: str = "auto") -> None: + """Ingest a Zip file into this butler. + + The Zip file must have been created by `retrieve_artifacts_zip`. + + Parameters + ---------- + zip_file : `lsst.resources.ResourcePathExpression` + Path to the Zip file. + transfer : `str`, optional + Method to use to transfer the Zip into the datastore. + + Notes + ----- + Run collections are created as needed. + """ if not self.isWriteable(): raise TypeError("Butler is read-only.") - _LOG.verbose("Ingesting %d file dataset%s.", len(datasets), "" if len(datasets) == 1 else "s") + zip_path = ResourcePath(zip_file) + index = ZipIndex.from_zip_file(zip_path) + _LOG.verbose( + "Ingesting %s containing %d datasets and %d files.", zip_path, len(index.refs), len(index) + ) + + # Need to ingest the refs into registry. Re-use the standard ingest + # code by reconstructing FileDataset from the index. + refs = index.refs.to_refs(universe=self.dimensions) + id_to_ref = {ref.id: ref for ref in refs} + datasets: list[FileDataset] = [] + processed_ids: set[uuid.UUID] = set() + for path_in_zip, index_info in index.artifact_map.items(): + # Disassembled composites need to check this ref isn't already + # included. + unprocessed = {id_ for id_ in index_info.ids if id_ not in processed_ids} + if not unprocessed: + continue + dataset = FileDataset(refs=[id_to_ref[id_] for id_ in unprocessed], path=path_in_zip) + datasets.append(dataset) + processed_ids.update(unprocessed) + + # Ingest doesn't create the RUN collections so we have to do that + # here. + runs = {ref.run for ref in refs} + for run in runs: + registered = self.collections.register(run) + if registered: + _LOG.verbose("Created RUN collection %s as part of zip ingest", run) + + # Do not need expanded dataset refs so can ignore the return value. + self._ingest_file_datasets(datasets) + + try: + self._datastore.ingest_zip(zip_path, transfer=transfer) + except IntegrityError as e: + raise ConflictingDefinitionError(f"Datastore already contains one or more datasets: {e}") from e + + def _ingest_file_datasets( + self, + datasets: Sequence[FileDataset], + ) -> None: + # Docstring inherited. if not datasets: return @@ -1583,7 +1647,10 @@ def ingest( # guarantee that they are expanded and Datastore will need # the records. imported_refs = self._registry._importDatasets(refs_to_import, expand=True) - assert set(imported_refs) == set(refs_to_import) + + # Since refs can come from an external butler, there is no + # guarantee that the DatasetType values match. Compare IDs. + assert {ref.id for ref in imported_refs} == {ref.id for ref in refs_to_import} # Replace all the refs in the FileDataset with expanded versions. # Pull them off in the order we put them on the list. @@ -1592,6 +1659,22 @@ def ingest( dataset.refs = imported_refs[:n_dataset_refs] del imported_refs[:n_dataset_refs] + @transactional + def ingest( + self, + *datasets: FileDataset, + transfer: str | None = "auto", + record_validation_info: bool = True, + ) -> None: + # Docstring inherited. + if not datasets: + return + if not self.isWriteable(): + raise TypeError("Butler is read-only.") + _LOG.verbose("Ingesting %d file dataset%s.", len(datasets), "" if len(datasets) == 1 else "s") + + self._ingest_file_datasets(datasets) + # Bulk-insert everything into Datastore. 
        # We do not know if any of the registry entries already existed
        # (_importDatasets only complains if they exist but differ) so
diff --git a/python/lsst/daf/butler/formatters/json.py b/python/lsst/daf/butler/formatters/json.py
index 298adc73c2..2599831c81 100644
--- a/python/lsst/daf/butler/formatters/json.py
+++ b/python/lsst/daf/butler/formatters/json.py
@@ -63,7 +63,13 @@ def read_from_uri(self, uri: ResourcePath, component: str | None = None, expecte
             data = pytype.model_validate_json(json_bytes)
         else:
             # This can raise ValueError.
-            data = from_json(json_bytes)
+            try:
+                data = from_json(json_bytes)
+            except ValueError as e:
+                # Report on the first few bytes of the file.
+                bytes_str = json_bytes[:60].decode(errors="replace")
+                e.add_note(f"Error parsing JSON bytes starting with {bytes_str!r}")
+                raise
 
         return data
 
diff --git a/python/lsst/daf/butler/formatters/yaml.py b/python/lsst/daf/butler/formatters/yaml.py
index 61706f4095..400c68a156 100644
--- a/python/lsst/daf/butler/formatters/yaml.py
+++ b/python/lsst/daf/butler/formatters/yaml.py
@@ -49,7 +49,14 @@ class YamlFormatter(TypelessFormatter):
 
     def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
         # Can not use ResourcePath.open()
-        data = yaml.safe_load(uri.read())
+        yaml_bytes = uri.read()
+        try:
+            data = yaml.safe_load(yaml_bytes)
+        except Exception as e:
+            # Report on the first few bytes of the file.
+            bytes_str = yaml_bytes[:60].decode(errors="replace")
+            e.add_note(f"Error parsing YAML bytes starting with {bytes_str!r}")
+            raise
         return data
 
     def to_bytes(self, in_memory_dataset: Any) -> bytes:
diff --git a/python/lsst/daf/butler/registry/opaque.py b/python/lsst/daf/butler/registry/opaque.py
index 2062c448cc..09d1c18461 100644
--- a/python/lsst/daf/butler/registry/opaque.py
+++ b/python/lsst/daf/butler/registry/opaque.py
@@ -98,7 +98,10 @@ def replace(self, *data: dict, transaction: DatastoreTransaction | None = None)
         # the database itself providing any rollback functionality.
         self._db.replace(self._table, *data)
 
-    def fetch(self, **where: Any) -> Iterator[sqlalchemy.RowMapping]:
+    def fetch(
+        self,
+        **where: Any,
+    ) -> Iterator[sqlalchemy.RowMapping]:
         # Docstring inherited from OpaqueTableStorage.
 
         def _batch_in_clause(
@@ -123,8 +126,12 @@ def _batch_in_clauses(**where: Any) -> Iterator[sqlalchemy.sql.expression.Column
             if isinstance(v, list | tuple | set):
                 batches.append(_batch_in_clause(column, v))
             else:
-                # single "batch" for a regular eq operator
-                batches.append([column == v])
+                if isinstance(v, str) and v.endswith("%"):
+                    # Special case prefix queries.
+                    batches.append([column.startswith(v[:-1])])
+                else:
+                    # single "batch" for a regular eq operator
+                    batches.append([column == v])
 
         for clauses in itertools.product(*batches):
             yield sqlalchemy.sql.and_(*clauses)
diff --git a/python/lsst/daf/butler/registry/queries/_results.py b/python/lsst/daf/butler/registry/queries/_results.py
index c643e7ce09..fbb904a0ec 100644
--- a/python/lsst/daf/butler/registry/queries/_results.py
+++ b/python/lsst/daf/butler/registry/queries/_results.py
@@ -47,6 +47,7 @@ from typing import Any, Self
 
 from deprecated.sphinx import deprecated
+from lsst.utils.introspection import find_outside_stacklevel
 
 from ..._dataset_ref import DatasetRef
 from ..._dataset_type import DatasetType
@@ -537,6 +538,9 @@ def limit(self, limit: int, offset: int | None | EllipsisType = ...) -> Self:
                 "'offset' parameter should no longer be used. It is not supported by the new query system."
" Will be removed after v28.", FutureWarning, + stacklevel=find_outside_stacklevel( + "lsst.daf.butler", allow_modules={"lsst.daf.butler.registry.tests"} + ), ) if offset is None or offset is ...: offset = 0 diff --git a/python/lsst/daf/butler/remote_butler/_remote_butler.py b/python/lsst/daf/butler/remote_butler/_remote_butler.py index bda66fc20a..8f6c8a9914 100644 --- a/python/lsst/daf/butler/remote_butler/_remote_butler.py +++ b/python/lsst/daf/butler/remote_butler/_remote_butler.py @@ -38,7 +38,11 @@ from deprecated.sphinx import deprecated from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ( + ArtifactIndexInfo, + ZipIndex, determine_destination_for_retrieved_artifact, + retrieve_and_zip, + unpack_zips, ) from lsst.daf.butler.datastores.fileDatastoreClient import ( FileDatastoreGetPayload, @@ -414,14 +418,16 @@ def find_dataset( ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions) return apply_storage_class_override(ref, dataset_type, storage_class) - def retrieveArtifacts( + def _retrieve_artifacts( self, refs: Iterable[DatasetRef], destination: ResourcePathExpression, transfer: str = "auto", preserve_path: bool = True, overwrite: bool = False, - ) -> list[ResourcePath]: + write_index: bool = True, + add_prefix: bool = False, + ) -> dict[ResourcePath, ArtifactIndexInfo]: destination = ResourcePath(destination).abspath() if not destination.isdir(): raise ValueError(f"Destination location must refer to a directory. Given {destination}.") @@ -429,21 +435,72 @@ def retrieveArtifacts( if transfer not in ("auto", "copy"): raise ValueError("Only 'copy' and 'auto' transfer modes are supported.") - output_uris: list[ResourcePath] = [] - for ref in refs: + requested_ids = {ref.id for ref in refs} + have_copied: dict[ResourcePath, ResourcePath] = {} + artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {} + # Sort to ensure that in many refs to one file situation the same + # ref is used for any prefix that might be added. + for ref in sorted(refs): + prefix = str(ref.id)[:8] + "-" if add_prefix else "" file_info = _to_file_payload(self._get_file_info_for_ref(ref)).file_info for file in file_info: source_uri = ResourcePath(str(file.url)) - relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False) - target_uri = determine_destination_for_retrieved_artifact( - destination, relative_path, preserve_path - ) - # Because signed URLs expire, we want to do the transfer soon - # after retrieving the URL. - target_uri.transfer_from(source_uri, transfer="copy", overwrite=overwrite) - output_uris.append(target_uri) + # For DECam/zip we only want to copy once. + # For zip files we need to unpack so that they can be + # zipped up again if needed. + is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment + cleaned_source_uri = source_uri.replace(fragment="", query="", params="") + if is_zip: + if cleaned_source_uri not in have_copied: + zipped_artifacts = unpack_zips( + [cleaned_source_uri], requested_ids, destination, preserve_path, overwrite + ) + artifact_map.update(zipped_artifacts) + have_copied[cleaned_source_uri] = cleaned_source_uri + elif cleaned_source_uri not in have_copied: + relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False) + target_uri = determine_destination_for_retrieved_artifact( + destination, relative_path, preserve_path, prefix + ) + # Because signed URLs expire, we want to do the transfer + # soon after retrieving the URL. 
+ target_uri.transfer_from(source_uri, transfer="copy", overwrite=overwrite) + have_copied[cleaned_source_uri] = target_uri + artifact_map[target_uri] = ArtifactIndexInfo.from_single(file.datastoreRecords, ref.id) + else: + target_uri = have_copied[cleaned_source_uri] + artifact_map[target_uri].append(ref.id) + + if write_index: + index = ZipIndex.from_artifact_map(refs, artifact_map, destination) + index.write_index(destination) + + return artifact_map + + def retrieve_artifacts_zip( + self, + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + overwrite: bool = True, + ) -> ResourcePath: + return retrieve_and_zip(refs, destination, self._retrieve_artifacts, overwrite) - return output_uris + def retrieveArtifacts( + self, + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + transfer: str = "auto", + preserve_path: bool = True, + overwrite: bool = False, + ) -> list[ResourcePath]: + artifact_map = self._retrieve_artifacts( + refs, + destination, + transfer, + preserve_path, + overwrite, + ) + return list(artifact_map) def exists( self, @@ -498,6 +555,10 @@ def ingest( # Docstring inherited. raise NotImplementedError() + def ingest_zip(self, zip_file: ResourcePathExpression, transfer: str = "auto") -> None: + # Docstring inherited. + raise NotImplementedError() + def export( self, *, diff --git a/python/lsst/daf/butler/script/__init__.py b/python/lsst/daf/butler/script/__init__.py index 964e1ddef7..45c2428a60 100644 --- a/python/lsst/daf/butler/script/__init__.py +++ b/python/lsst/daf/butler/script/__init__.py @@ -35,6 +35,7 @@ from .createRepo import createRepo from .exportCalibs import exportCalibs from .ingest_files import ingest_files +from .ingest_zip import ingest_zip from .queryCollections import queryCollections from .queryDataIds import queryDataIds from .queryDatasets import QueryDatasets diff --git a/python/lsst/daf/butler/script/ingest_zip.py b/python/lsst/daf/butler/script/ingest_zip.py new file mode 100644 index 0000000000..13f1695c31 --- /dev/null +++ b/python/lsst/daf/butler/script/ingest_zip.py @@ -0,0 +1,52 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+from __future__ import annotations + +__all__ = ["ingest_zip"] + +from .._butler import Butler + + +def ingest_zip( + repo: str, + zip: str, + transfer: str = "auto", +) -> None: + """Ingest a Zip file into the given butler. + + Parameters + ---------- + repo : `str` + URI string of the Butler repo to use. + zip : `str` + URI string of the location of the Zip file. + transfer : `str`, optional + Transfer mode to use for ingest. + """ + butler = Butler.from_config(repo, writeable=True) + + butler.ingest_zip(zip, transfer=transfer) diff --git a/python/lsst/daf/butler/script/queryDatasets.py b/python/lsst/daf/butler/script/queryDatasets.py index 838fe7fe50..739ea8eb30 100644 --- a/python/lsst/daf/butler/script/queryDatasets.py +++ b/python/lsst/daf/butler/script/queryDatasets.py @@ -191,6 +191,7 @@ def __init__( warnings.warn( "No --collections specified. The --collections argument will become mandatory after v28.", FutureWarning, + stacklevel=2, ) glob = list(glob) if not glob: @@ -198,6 +199,7 @@ def __init__( "No dataset types specified. Explicitly specifying dataset types will become mandatory" " after v28. Specify '*' to match the current behavior of querying all dataset types.", FutureWarning, + stacklevel=2, ) # show_uri requires a datastore. diff --git a/python/lsst/daf/butler/script/retrieveArtifacts.py b/python/lsst/daf/butler/script/retrieveArtifacts.py index a39e80862a..842e0fae00 100644 --- a/python/lsst/daf/butler/script/retrieveArtifacts.py +++ b/python/lsst/daf/butler/script/retrieveArtifacts.py @@ -54,6 +54,7 @@ def retrieveArtifacts( transfer: str, preserve_path: bool, clobber: bool, + zip: bool, ) -> list[ResourcePath]: """Parameters are those required for querying datasets plus a destination URI. @@ -89,18 +90,21 @@ def retrieveArtifacts( destination directory, else only the filename will be used. clobber : `bool` If `True` allow transfers to overwrite files at the destination. + zip : `bool` + If `True` retrieve the datasets and place in a zip file. Returns ------- transferred : `list` of `lsst.resources.ResourcePath` - The destination URIs of every transferred artifact. + The destination URIs of every transferred artifact or a list with a + single entry of the name of the zip file. """ query_types = dataset_type or "*" query_collections: tuple[str, ...] = collections or ("*",) butler = Butler.from_config(repo, writeable=False) - # Need to store in list so we can count the number to give some feedback + # Need to store in set so we can count the number to give some feedback # to caller. 
query = QueryDatasets( butler=butler, @@ -112,10 +116,17 @@ def retrieveArtifacts( order_by=order_by, show_uri=False, ) - refs = list(itertools.chain(*query.getDatasets())) + refs = set(itertools.chain(*query.getDatasets())) log.info("Number of datasets matching query: %d", len(refs)) + if not refs: + return [] + + if not zip: + transferred = butler.retrieveArtifacts( + refs, destination=destination, transfer=transfer, preserve_path=preserve_path, overwrite=clobber + ) + else: + zip_file = butler.retrieve_artifacts_zip(refs, destination=destination, overwrite=clobber) + transferred = [zip_file] - transferred = butler.retrieveArtifacts( - refs, destination=destination, transfer=transfer, preserve_path=preserve_path, overwrite=clobber - ) return transferred diff --git a/python/lsst/daf/butler/tests/hybrid_butler.py b/python/lsst/daf/butler/tests/hybrid_butler.py index ebe8824015..41181de15c 100644 --- a/python/lsst/daf/butler/tests/hybrid_butler.py +++ b/python/lsst/daf/butler/tests/hybrid_butler.py @@ -206,6 +206,14 @@ def find_dataset( **kwargs, ) + def retrieve_artifacts_zip( + self, + refs: Iterable[DatasetRef], + destination: ResourcePathExpression, + overwrite: bool = True, + ) -> ResourcePath: + return self._remote_butler.retrieve_artifacts_zip(refs, destination, overwrite) + def retrieveArtifacts( self, refs: Iterable[DatasetRef], @@ -242,6 +250,10 @@ def _exists_many( def removeRuns(self, names: Iterable[str], unstore: bool = True) -> None: return self._direct_butler.removeRuns(names, unstore) + def ingest_zip(self, zip_file: ResourcePathExpression, transfer: str = "auto") -> None: + # Docstring inherited. + return self._direct_butler.ingest_zip(zip_file, transfer=transfer) + def ingest( self, *datasets: FileDataset, diff --git a/tests/data/zip_index.json b/tests/data/zip_index.json new file mode 100644 index 0000000000..3c37b39a51 --- /dev/null +++ b/tests/data/zip_index.json @@ -0,0 +1,305 @@ +{ + "index_version": "V1", + "refs": { + "container_version": "V1", + "universe_version": 7, + "universe_namespace": "daf_butler", + "dataset_types": { + "raw": { + "name": "raw", + "storageClass": "Exposure", + "dimensions": [ + "instrument", + "detector", + "exposure" + ], + "parentStorageClass": null, + "isCalibration": false + }, + "calexp": { + "name": "calexp", + "storageClass": "ExposureF", + "dimensions": [ + "instrument", + "detector", + "visit" + ], + "parentStorageClass": null, + "isCalibration": false + }, + "isr_metadata": { + "name": "isr_metadata", + "storageClass": "TaskMetadata", + "dimensions": [ + "instrument", + "detector", + "exposure" + ], + "parentStorageClass": null, + "isCalibration": false + } + }, + "compact_refs": { + "3f452f1d-b8a5-5202-8190-4e723004a6ba": { + "dataset_type_name": "raw", + "run": "DECam/raw/all", + "data_id": { + "instrument": "DECam", + "detector": 56, + "exposure": 411371 + } + }, + "5d32ede2-f2ca-5d3b-b900-bba4a00c3bf9": { + "dataset_type_name": "raw", + "run": "DECam/raw/all", + "data_id": { + "instrument": "DECam", + "detector": 60, + "exposure": 411371 + } + }, + "d24d9a3c-1004-42e7-88bb-a38ebf364084": { + "dataset_type_name": "calexp", + "run": "demo_collection", + "data_id": { + "instrument": "HSC", + "detector": 10, + "visit": 903342 + } + }, + "ac08556d-d6ab-43c5-9307-3bb7132548fd": { + "dataset_type_name": "isr_metadata", + "run": "demo_collection", + "data_id": { + "instrument": "HSC", + "detector": 10, + "exposure": 903342 + } + } + } + }, + "artifact_map": { + 
"Users/timj/work/lsstsw/build/testdata_decam/hits2015-zeroed/c4d_150218_052850_ori.fits.fz": { + "info": { + "formatter": "lsst.obs.decam.rawFormatter.DarkEnergyCameraRawFormatter", + "path": "file:///Users/timj/work/lsstsw/build/testdata_decam/hits2015-zeroed/c4d_150218_052850_ori.fits.fz", + "storage_class": "Exposure", + "component": null, + "checksum": null, + "file_size": 198720 + }, + "ids": [ + "3f452f1d-b8a5-5202-8190-4e723004a6ba", + "5d32ede2-f2ca-5d3b-b900-bba4a00c3bf9" + ] + }, + "demo_collection/calexp.apCorrMap/20130617/r/HSC-R/903342/calexp_apCorrMap_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.apCorrMap/20130617/r/HSC-R/903342/calexp_apCorrMap_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "ApCorr", + "component": "apCorrMap", + "checksum": null, + "file_size": 46080 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.detector/20130617/r/HSC-R/903342/calexp_detector_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.detector/20130617/r/HSC-R/903342/calexp_detector_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "Detector", + "component": "detector", + "checksum": null, + "file_size": 1725120 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.filter/20130617/r/HSC-R/903342/calexp_filter_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.filter/20130617/r/HSC-R/903342/calexp_filter_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "FilterLabel", + "component": "filter", + "checksum": null, + "file_size": 17280 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.id/20130617/r/HSC-R/903342/calexp_id_HSC_r_HSC-R_903342_1_36_demo_collection.json": { + "info": { + "formatter": "lsst.daf.butler.formatters.json.JsonFormatter", + "path": "demo_collection/calexp.id/20130617/r/HSC-R/903342/calexp_id_HSC_r_HSC-R_903342_1_36_demo_collection.json", + "storage_class": "int", + "component": "id", + "checksum": null, + "file_size": 9 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.image/20130617/r/HSC-R/903342/calexp_image_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsExposure.FitsImageFormatter", + "path": "demo_collection/calexp.image/20130617/r/HSC-R/903342/calexp_image_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "ImageF", + "component": "image", + "checksum": null, + "file_size": 31271040 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.mask/20130617/r/HSC-R/903342/calexp_mask_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsExposure.FitsMaskFormatter", + "path": "demo_collection/calexp.mask/20130617/r/HSC-R/903342/calexp_mask_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "MaskX", + "component": "mask", + "checksum": null, + "file_size": 627840 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + 
"demo_collection/calexp.metadata/20130617/r/HSC-R/903342/calexp_metadata_HSC_r_HSC-R_903342_1_36_demo_collection.yaml": { + "info": { + "formatter": "lsst.daf.butler.formatters.yaml.YamlFormatter", + "path": "demo_collection/calexp.metadata/20130617/r/HSC-R/903342/calexp_metadata_HSC_r_HSC-R_903342_1_36_demo_collection.yaml", + "storage_class": "PropertyList", + "component": "metadata", + "checksum": null, + "file_size": 14887 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.photoCalib/20130617/r/HSC-R/903342/calexp_photoCalib_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.photoCalib/20130617/r/HSC-R/903342/calexp_photoCalib_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "PhotoCalib", + "component": "photoCalib", + "checksum": null, + "file_size": 28800 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.psf/20130617/r/HSC-R/903342/calexp_psf_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.psf/20130617/r/HSC-R/903342/calexp_psf_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "Psf", + "component": "psf", + "checksum": null, + "file_size": 192960 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.summaryStats/20130617/r/HSC-R/903342/calexp_summaryStats_HSC_r_HSC-R_903342_1_36_demo_collection.yaml": { + "info": { + "formatter": "lsst.daf.butler.formatters.yaml.YamlFormatter", + "path": "demo_collection/calexp.summaryStats/20130617/r/HSC-R/903342/calexp_summaryStats_HSC_r_HSC-R_903342_1_36_demo_collection.yaml", + "storage_class": "ExposureSummaryStats", + "component": "summaryStats", + "checksum": null, + "file_size": 1332 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.transmissionCurve/20130617/r/HSC-R/903342/calexp_transmissionCurve_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.transmissionCurve/20130617/r/HSC-R/903342/calexp_transmissionCurve_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "TransmissionCurve", + "component": "transmissionCurve", + "checksum": null, + "file_size": 201600 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.validPolygon/20130617/r/HSC-R/903342/calexp_validPolygon_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.validPolygon/20130617/r/HSC-R/903342/calexp_validPolygon_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "Polygon", + "component": "validPolygon", + "checksum": null, + "file_size": 17280 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.variance/20130617/r/HSC-R/903342/calexp_variance_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsExposure.FitsImageFormatter", + "path": "demo_collection/calexp.variance/20130617/r/HSC-R/903342/calexp_variance_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "ImageF", + "component": "variance", + "checksum": null, + "file_size": 24782400 + }, 
+ "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.visitInfo/20130617/r/HSC-R/903342/calexp_visitInfo_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.visitInfo/20130617/r/HSC-R/903342/calexp_visitInfo_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "VisitInfo", + "component": "visitInfo", + "checksum": null, + "file_size": 28800 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/calexp.wcs/20130617/r/HSC-R/903342/calexp_wcs_HSC_r_HSC-R_903342_1_36_demo_collection.fits": { + "info": { + "formatter": "lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter", + "path": "demo_collection/calexp.wcs/20130617/r/HSC-R/903342/calexp_wcs_HSC_r_HSC-R_903342_1_36_demo_collection.fits", + "storage_class": "Wcs", + "component": "wcs", + "checksum": null, + "file_size": 25920 + }, + "ids": [ + "d24d9a3c-1004-42e7-88bb-a38ebf364084" + ] + }, + "demo_collection/isr_metadata/20130617/HSCA90334200/isr_metadata_HSC_HSC-R_HSCA90334200_1_36_demo_collection.json": { + "info": { + "formatter": "lsst.daf.butler.formatters.json.JsonFormatter", + "path": "demo_collection/isr_metadata/20130617/HSCA90334200/isr_metadata_HSC_HSC-R_HSCA90334200_1_36_demo_collection.json", + "storage_class": "TaskMetadata", + "component": null, + "checksum": null, + "file_size": 7686 + }, + "ids": [ + "ac08556d-d6ab-43c5-9307-3bb7132548fd" + ] + } + } +} diff --git a/tests/test_butler.py b/tests/test_butler.py index ac696dba0a..9f24d6ee85 100644 --- a/tests/test_butler.py +++ b/tests/test_butler.py @@ -88,6 +88,7 @@ def mock_aws(*args: Any, **kwargs: Any) -> Any: # type: ignore[no-untyped-def] ) from lsst.daf.butler.datastore import NullDatastore from lsst.daf.butler.datastore.file_templates import FileTemplate, FileTemplateValidationError +from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex from lsst.daf.butler.datastores.fileDatastore import FileDatastore from lsst.daf.butler.direct_butler import DirectButler from lsst.daf.butler.registry import ( @@ -361,6 +362,8 @@ def runPutGetTest(self, storageClass: StorageClass, datasetTypeName: str) -> But ) self.assertGreater(len(transferred), 0) artifacts = list(ResourcePath.findFileResources([destination])) + # Filter out the index file. + artifacts = [a for a in artifacts if a.basename() != ZipIndex.index_name] self.assertEqual(set(transferred), set(artifacts)) for artifact in transferred: @@ -921,6 +924,53 @@ def testPytypePutCoercion(self) -> None: test_dict3 = butler.get(this_type, dataId=dataId, visit=425) self.assertEqual(get_full_type_name(test_dict3), "dict") + def test_ingest_zip(self) -> None: + """Create butler, export data, delete data, import from Zip.""" + butler, dataset_type = self.create_butler( + run=self.default_run, storageClass="StructuredData", datasetTypeName="metrics" + ) + + metric = makeExampleMetrics() + refs = [] + for visit in (423, 424, 425): + ref = butler.put(metric, dataset_type, instrument="DummyCamComp", visit=visit) + refs.append(ref) + + # Retrieve a Zip file. + with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: + zip = butler.retrieve_artifacts_zip(refs, destination=tmpdir) + + # Ingest will fail. + with self.assertRaises(ConflictingDefinitionError): + butler.ingest_zip(zip) + + # Clear out the collection. 
+ butler.removeRuns([self.default_run], unstore=True) + self.assertFalse(butler.exists(refs[0])) + + butler.ingest_zip(zip, transfer="copy") + + # Check that the refs can be read again. + _ = [butler.get(ref) for ref in refs] + + uri = butler.getURI(refs[2]) + self.assertTrue(uri.exists()) + + # Delete one dataset. The Zip file should still exist and allow + # remaining refs to be read. + butler.pruneDatasets([refs[0]], purge=True, unstore=True) + self.assertTrue(uri.exists()) + + metric2 = butler.get(refs[1]) + self.assertEqual(metric2, metric, msg=f"{metric2} != {metric}") + + butler.removeRuns([self.default_run], unstore=True) + self.assertFalse(uri.exists()) + self.assertFalse(butler.exists(refs[-1])) + + with self.assertRaises(ValueError): + butler.retrieve_artifacts_zip([], destination=".") + def testIngest(self) -> None: butler = self.create_empty_butler(run=self.default_run) @@ -2137,6 +2187,9 @@ class InMemoryDatastoreButlerTestCase(ButlerTests, unittest.TestCase): def testIngest(self) -> None: pass + def test_ingest_zip(self) -> None: + pass + class ClonedSqliteButlerTestCase(InMemoryDatastoreButlerTestCase, unittest.TestCase): """Test that a Butler with a Sqlite registry still works after cloning.""" diff --git a/tests/test_cliCmdPruneDatasets.py b/tests/test_cliCmdPruneDatasets.py index 59c4c2274d..8c16097151 100644 --- a/tests/test_cliCmdPruneDatasets.py +++ b/tests/test_cliCmdPruneDatasets.py @@ -80,6 +80,9 @@ def getRefs(): def makeQueryDatasets(*args, **kwargs): """Return a query datasets object.""" + if not kwargs.get("glob"): + # Use all dataset types if not specified. + kwargs["glob"] = ("*",) return QueryDatasets(*args, **kwargs) @@ -98,7 +101,7 @@ def setUp(self): @staticmethod def makeQueryDatasetsArgs(*, repo, **kwargs): expectedArgs = dict( - repo=repo, collections=(), where="", find_first=True, show_uri=False, glob=tuple() + repo=repo, collections=("*",), where="", find_first=True, show_uri=False, glob=tuple() ) expectedArgs.update(kwargs) return expectedArgs diff --git a/tests/test_cliCmdRetrieveArtifacts.py b/tests/test_cliCmdRetrieveArtifacts.py index 85e7bd9396..28690816dd 100644 --- a/tests/test_cliCmdRetrieveArtifacts.py +++ b/tests/test_cliCmdRetrieveArtifacts.py @@ -75,7 +75,7 @@ def testRetrieveAll(self): self.assertTrue(result.stdout.endswith(": 6\n"), f"Expected 6 got: {result.stdout}") artifacts = self.find_files(destdir) - self.assertEqual(len(artifacts), 6, f"Expected 6 artifacts: {artifacts}") + self.assertEqual(len(artifacts), 7, f"Expected 7 artifacts including index: {artifacts}") self.assertIn(f"{destdir}{prefix}", str(artifacts[1])) def testRetrieveSubset(self): @@ -95,7 +95,27 @@ def testRetrieveSubset(self): self.assertEqual(result.exit_code, 0, clickResultMsg(result)) self.assertTrue(result.stdout.endswith(": 3\n"), f"Expected 3 got: {result.stdout}") artifacts = self.find_files(destdir) - self.assertEqual(len(artifacts), 3, f"Expected 3 artifacts: {artifacts}") + self.assertEqual(len(artifacts), 4, f"Expected 4 artifacts including index: {artifacts}") + + def testRetrieveAsZip(self): + runner = LogCliRunner() + with runner.isolated_filesystem(): + destdir = "tmp1/" + result = runner.invoke( + cli, + [ + "retrieve-artifacts", + self.root, + destdir, + "--where", + "instrument='DummyCamComp' AND visit=423", + "--zip", + ], + ) + self.assertEqual(result.exit_code, 0, clickResultMsg(result)) + self.assertIn(".zip", result.stdout) + artifacts = self.find_files(destdir) + self.assertEqual(len(artifacts), 1, f"Expected one zip file: {artifacts}") 
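As a usage note, the index embedded in a Zip created by ``retrieve_artifacts_zip`` can be inspected without extracting the data files; this sketch assumes a hypothetical repository path and Zip file name:

    # Sketch: read the index out of a Butler-created Zip file.
    from lsst.daf.butler import Butler
    from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
    from lsst.resources import ResourcePath

    butler = Butler.from_config("path/to/repo")  # hypothetical repo
    index = ZipIndex.from_zip_file(ResourcePath("my_export.zip"))  # hypothetical Zip

    print(len(index))       # number of artifacts listed in the index
    print(len(index.refs))  # number of datasets in the serialized container
    for path_in_zip, info in index.artifact_map.items():
        print(path_in_zip, info.ids)  # dataset UUIDs served by each file

    # The DatasetRefs can be reconstructed given a dimension universe.
    refs = index.refs.to_refs(universe=butler.dimensions)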
def testOverwriteLink(self): runner = LogCliRunner() diff --git a/tests/test_datasets.py b/tests/test_datasets.py index fc13898cde..0e8123254e 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -26,6 +26,7 @@ # along with this program. If not, see . import copy +import os import pickle import unittest import uuid @@ -34,12 +35,18 @@ DataCoordinate, DatasetRef, DatasetType, + DimensionConfig, DimensionUniverse, FileDataset, + SerializedDatasetRefContainerV1, StorageClass, StorageClassFactory, ) from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo +from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex +from lsst.resources import ResourcePath + +TESTDIR = os.path.abspath(os.path.dirname(__file__)) """Tests for datasets module. """ @@ -738,6 +745,41 @@ def testFileDataset(self) -> None: with self.assertRaises(ValueError): FileDataset(path="other.yaml", refs=[ref, ref2]) + def test_container(self) -> None: + ref1 = DatasetRef(self.datasetType, self.dataId, run="somerun") + ref2 = ref1.replace(run="somerun2") + + container = SerializedDatasetRefContainerV1.from_refs([ref1, ref2]) + self.assertEqual(len(container), 2) + + new_refs = container.to_refs(universe=self.universe) + self.assertEqual(new_refs, [ref1, ref2]) + + +class ZipIndexTestCase(unittest.TestCase): + """Test that a ZipIndex can be read.""" + + def test_v1(self): + """Read a v1 serialization.""" + path = os.path.join(TESTDIR, "data", "zip_index.json") + with open(path) as fd: + index = ZipIndex.model_validate_json(fd.read()) + + self.assertEqual(index.index_version, "V1") + self.assertEqual(len(index), 17) + self.assertEqual(len(index.refs), 4) + + # Reconstruct the refs using the required universe. + universe_version = index.refs.universe_version + namespace = index.refs.universe_namespace + universe_path = ResourcePath( + f"resource://lsst.daf.butler/configs/old_dimensions/{namespace}_universe{universe_version}.yaml" + ) + dimension_config = DimensionConfig(universe_path) + universe = DimensionUniverse(dimension_config) + refs = index.refs.to_refs(universe=universe) + self.assertEqual(len(refs), 4) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_quantumBackedButler.py b/tests/test_quantumBackedButler.py index 984abc22e6..97e5276057 100644 --- a/tests/test_quantumBackedButler.py +++ b/tests/test_quantumBackedButler.py @@ -27,6 +27,7 @@ import json import os +import tempfile import unittest import unittest.mock from typing import cast @@ -43,6 +44,7 @@ RegistryConfig, StorageClass, ) +from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex from lsst.daf.butler.direct_butler import DirectButler from lsst.daf.butler.registry import _RegistryFactory from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir @@ -218,6 +220,16 @@ def test_getput(self) -> None: self.assertEqual(qbb._actual_output_refs, set(self.output_refs)) + # Retrieve them as a Zip artifact. + with tempfile.TemporaryDirectory() as tmpdir: + zip = qbb.retrieve_artifacts_zip(self.output_refs, destination=tmpdir) + + index = ZipIndex.from_zip_file(zip) + zip_refs = index.refs.to_refs(universe=qbb.dimensions) + self.assertEqual(len(zip_refs), 4) + self.assertEqual(set(zip_refs), set(self.output_refs)) + self.assertEqual(len(index.artifact_map), 4) # Count number of artifacts in Zip. 
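For context, a rough end-to-end sketch of the intended workflow between two repositories; the repository paths and collection are placeholders, ``Butler.query_datasets`` is used only for illustration, and any query yielding ``DatasetRef`` objects would work the same way:

    # Sketch: move datasets between repositories via a Zip file.
    from lsst.daf.butler import Butler

    source = Butler.from_config("source_repo")
    refs = source.query_datasets("calexp", collections="demo_collection")

    # Bundle the artifacts plus a JSON index into a single Zip file in an
    # existing destination directory.
    zip_path = source.retrieve_artifacts_zip(refs, destination="/tmp/exports")

    # Ingest the Zip into another repository; RUN collections are created
    # as needed by ingest_zip.
    dest = Butler.from_config("dest_repo", writeable=True)
    dest.ingest_zip(zip_path, transfer="copy")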
+
     def test_getDeferred(self) -> None:
         """Test for getDeferred method"""
         quantum = self.make_quantum()
diff --git a/tests/test_remote_butler.py b/tests/test_remote_butler.py
index e1ed3522c2..891d40b868 100644
--- a/tests/test_remote_butler.py
+++ b/tests/test_remote_butler.py
@@ -138,6 +138,17 @@ def test_retrieve_artifacts_security(self):
             ResourcePath("/tmp/output_directory/not/relative.txt"),
         )
 
+        # Test prefixing.
+        self.assertEqual(
+            determine_destination_for_retrieved_artifact(
+                ResourcePath("/tmp/output_directory/"),
+                ResourcePath("file:///not/relative.txt"),
+                preserve_path=False,
+                prefix="prefix-",
+            ),
+            ResourcePath("/tmp/output_directory/prefix-relative.txt"),
+        )
+
 
 class RemoteButlerRegistryTests(RegistryTests):
     """Tests for RemoteButler's `Registry` shim."""
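Finally, since ``DirectButler.retrieveArtifacts`` now always passes ``write_index=True``, a ``ZipIndex`` JSON file is written alongside the retrieved artifacts; callers that scan the destination directory may want to separate it from the data files, as the updated ``runPutGetTest`` above does. A small sketch with an assumed destination path:

    # Sketch: separate the JSON index from the retrieved data files.
    from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
    from lsst.resources import ResourcePath

    destination = ResourcePath("/tmp/retrieved/", forceDirectory=True)  # assumed path
    everything = list(ResourcePath.findFileResources([destination]))

    data_files = [p for p in everything if p.basename() != ZipIndex.index_name]
    index_files = [p for p in everything if p.basename() == ZipIndex.index_name]

    # The index records which dataset UUIDs each retrieved file serves.
    if index_files:
        index = ZipIndex.model_validate_json(index_files[0].read())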