Skip to content

Commit

Permalink
Support zip unpacking in retrieve artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
timj committed Oct 28, 2024
1 parent 5829de4 commit 9dc43b4
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 11 deletions.
24 changes: 17 additions & 7 deletions python/lsst/daf/butler/datastores/fileDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
ArtifactIndexInfo,
ZipIndex,
determine_destination_for_retrieved_artifact,
unpack_zips,
)
from lsst.daf.butler.datastores.fileDatastoreClient import (
FileDatastoreGetPayload,
Expand Down Expand Up @@ -2100,6 +2101,7 @@ def retrieveArtifacts(
# This also helps filter out duplicate DatasetRef in the request
# that will map to the same underlying file transfer.
to_transfer: dict[ResourcePath, ResourcePath] = {}
zips_to_transfer: set[ResourcePath] = set()

# Retrieve all the records in bulk indexed by ref.id.
records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
Expand All @@ -2125,10 +2127,16 @@ def retrieveArtifacts(
location = info.file_location(self.locationFactory)
source_uri = location.uri
# For DECam/zip we only want to copy once.
# For zip files we need to unpack so that they can be
# zipped up again if needed.
is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment
# We need to remove fragments for consistency.
# TODO: Unzip zip files on retrieval and merge indexes.
cleaned_source_uri = source_uri.replace(fragment="", query="", params="")
if cleaned_source_uri not in to_transfer:
if is_zip:
# Assume the DatasetRef definitions are within the Zip
# file itself and so can be dropped from loop.
zips_to_transfer.add(cleaned_source_uri)

Check warning on line 2138 in python/lsst/daf/butler/datastores/fileDatastore.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastores/fileDatastore.py#L2138

Added line #L2138 was not covered by tests
elif cleaned_source_uri not in to_transfer:
target_uri = determine_destination_for_retrieved_artifact(
destination, location.pathInStore, preserve_path, prefix
)
Expand All @@ -2143,11 +2151,15 @@ def retrieveArtifacts(
for source_uri, target_uri in to_transfer.items():
target_uri.transfer_from(source_uri, transfer=transfer, overwrite=overwrite)

# Transfer the Zip files and unpack them.
zipped_artifacts = unpack_zips(zips_to_transfer, requested_ids, destination, preserve_path)
artifact_map.update(zipped_artifacts)

if write_index:
index = ZipIndex.from_artifact_map(refs, artifact_map, destination)
index.write_index(destination)

return list(to_transfer.values()), artifact_map
return list(artifact_map.keys()), artifact_map

def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None:
"""Ingest an indexed Zip file and contents.
Expand Down Expand Up @@ -2203,10 +2215,8 @@ def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None:
# Reference in original location.
tgtLocation = None

Check warning on line 2216 in python/lsst/daf/butler/datastores/fileDatastore.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastores/fileDatastore.py#L2216

Added line #L2216 was not covered by tests
else:
zip_name = index.calculate_zip_file_name()
# Zip name is UUID so add subdir of first few characters of UUID
# to spread things out if there are many Zip files.
tgtLocation = self.locationFactory.fromPath(f"zips/{zip_name[:4]}/{zip_name}")
# Name the zip file based on index contents.
tgtLocation = self.locationFactory.fromPath(index.calculate_zip_file_path_in_store())
if not tgtLocation.uri.dirname().exists():
log.debug("Folder %s does not exist yet.", tgtLocation.uri.dirname())
tgtLocation.uri.dirname().mkdir()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from __future__ import annotations

__all__ = ("determine_destination_for_retrieved_artifact", "retrieve_and_zip", "ZipIndex")
__all__ = ("determine_destination_for_retrieved_artifact", "retrieve_and_zip", "unpack_zips", "ZipIndex")

import logging
import tempfile
Expand Down Expand Up @@ -209,6 +209,29 @@ def from_single(cls, info: SerializedStoredFileInfo, id_: uuid.UUID) -> Self:
"""
return cls(info=info, ids=[id_])

def subset(self, ids: Iterable[uuid.UUID]) -> Self:
"""Replace the IDs with a subset of the IDs and return a new instance.
Parameters
----------
ids : `~collections.abc.Iterable` [ `uuid.UUID` ]
Subset of IDs to keep.
Returns
-------
subsetted : `ArtifactIndexInfo`
New instance with the requested subset.
Raises
------
ValueError
Raised if the given IDs is not a subset of the current IDs.
"""
subset = set(ids)

Check warning on line 230 in python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py#L230

Added line #L230 was not covered by tests
if subset - self.ids:
raise ValueError(f"Given subset of {subset} is not a subset of {self.ids}")
return type(self)(ids=subset, info=self.info.model_copy())

Check warning on line 233 in python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py#L232-L233

Added lines #L232 - L233 were not covered by tests


class ZipIndex(BaseModel):
"""Index of a Zip file of Butler datasets.
Expand Down Expand Up @@ -264,6 +287,18 @@ def calculate_zip_file_name(self) -> str:
"""
return f"{self.generate_uuid5()}.zip"

def calculate_zip_file_path_in_store(self) -> str:
    """Calculate the relative path inside a datastore that should be
    used for this Zip file.

    Returns
    -------
    path_in_store : `str`
        Relative path to use for Zip file in datastore.
    """
    # Spread files across subdirectories keyed on the first few
    # characters of the (UUID-derived) zip file name.
    name = self.calculate_zip_file_name()
    return f"zips/{name[:4]}/{name}"

def write_index(self, dir: ResourcePath) -> ResourcePath:
"""Write the index to the specified directory.
Expand Down Expand Up @@ -478,3 +513,63 @@ def retrieve_and_zip(
zip.write(path.ospath, name)

return zip_path


def unpack_zips(
    zips_to_transfer: Iterable[ResourcePath],
    allowed_ids: set[uuid.UUID],
    destination: ResourcePath,
    preserve_path: bool,
) -> dict[ResourcePath, ArtifactIndexInfo]:
    """Transfer the Zip files and unpack them in the destination directory.

    Parameters
    ----------
    zips_to_transfer : `~collections.abc.Iterable` \
            [ `~lsst.resources.ResourcePath` ]
        Paths to Zip files to unpack. These must be Zip files that include
        the index information and were created by the Butler.
    allowed_ids : `set` [ `uuid.UUID` ]
        All the possible dataset IDs for which artifacts should be extracted
        from the Zip file. If an ID in the Zip file is not present in this
        list the artifact will not be extracted from the Zip.
    destination : `~lsst.resources.ResourcePath`
        Output destination for the Zip contents.
    preserve_path : `bool`
        Whether to include subdirectories during extraction. If `True` a
        directory will be made per Zip.

    Returns
    -------
    artifact_map : `dict` \
            [ `~lsst.resources.ResourcePath`, `ArtifactIndexInfo` ]
        Path linking Zip contents location to associated artifact information.
    """
    artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
    for zip_uri in zips_to_transfer:
        _LOG.debug("Unpacking zip file %s", zip_uri)
        # Assume that downloading to temporary location is more efficient
        # than trying to read the contents remotely.
        with ResourcePath.temporary_uri(suffix=".zip") as local_zip:
            local_zip.transfer_from(zip_uri, transfer="auto")
            index = ZipIndex.from_zip_file(local_zip)

            if not preserve_path:
                extract_dir = destination
            else:
                # Mirror the datastore layout: one subdirectory per Zip,
                # derived from where this Zip would live in a datastore.
                zip_subdir = ResourcePath(
                    index.calculate_zip_file_path_in_store(), forceDirectory=False, forceAbsolute=False
                ).dirname()
                extract_dir = destination.join(zip_subdir)
            extract_dir.mkdir()
            with local_zip.open("rb") as fd, zipfile.ZipFile(fd) as zf:
                for path_in_zip, artifact_info in index.artifact_map.items():
                    # Skip if this specific dataset ref is not requested.
                    included_ids = artifact_info.ids & allowed_ids
                    if not included_ids:
                        continue
                    # Do not apply a new prefix since the zip file
                    # should already have a prefix.
                    zf.extract(path_in_zip, path=extract_dir.ospath)
                    extracted_path = extract_dir.join(path_in_zip, forceDirectory=False)
                    artifact_map[extracted_path] = artifact_info.subset(included_ids)
    return artifact_map
18 changes: 15 additions & 3 deletions python/lsst/daf/butler/remote_butler/_remote_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
ZipIndex,
determine_destination_for_retrieved_artifact,
retrieve_and_zip,
unpack_zips,
)
from lsst.daf.butler.datastores.fileDatastoreClient import (
FileDatastoreGetPayload,
Expand Down Expand Up @@ -434,6 +435,7 @@ def _retrieve_artifacts(
if transfer not in ("auto", "copy"):
raise ValueError("Only 'copy' and 'auto' transfer modes are supported.")

requested_ids = {ref.id for ref in refs}
output_uris: list[ResourcePath] = []
have_copied: dict[ResourcePath, ResourcePath] = {}
artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
Expand All @@ -442,10 +444,20 @@ def _retrieve_artifacts(
file_info = _to_file_payload(self._get_file_info_for_ref(ref)).file_info
for file in file_info:
source_uri = ResourcePath(str(file.url))
# For decam/zip situation we only want to copy once.
# TODO: Unzip zip files on retrieval and merge indexes.
# For DECam/zip we only want to copy once.
# For zip files we need to unpack so that they can be
# zipped up again if needed.
is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment
cleaned_source_uri = source_uri.replace(fragment="", query="", params="")
if cleaned_source_uri not in have_copied:
if is_zip:
if cleaned_source_uri not in have_copied:
zipped_artifacts = unpack_zips(

Check warning on line 454 in python/lsst/daf/butler/remote_butler/_remote_butler.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/_remote_butler.py#L454

Added line #L454 was not covered by tests
[cleaned_source_uri], requested_ids, destination, preserve_path
)
artifact_map.update(zipped_artifacts)
have_copied[cleaned_source_uri] = cleaned_source_uri
output_uris.extend(artifact_map.keys())

Check warning on line 459 in python/lsst/daf/butler/remote_butler/_remote_butler.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/_remote_butler.py#L457-L459

Added lines #L457 - L459 were not covered by tests
elif cleaned_source_uri not in have_copied:
relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False)
target_uri = determine_destination_for_retrieved_artifact(
destination, relative_path, preserve_path, prefix
Expand Down

0 comments on commit 9dc43b4

Please sign in to comment.