From ff6ea4a77d3cf3c6c8923f1561f77555b2ef8cff Mon Sep 17 00:00:00 2001
From: Tim Jenness
Date: Thu, 23 Jan 2025 17:26:27 -0700
Subject: [PATCH] Add ability to pass provenance through butler.put

---
 python/lsst/daf/butler/__init__.py            |  1 +
 python/lsst/daf/butler/_butler.py             |  5 ++
 python/lsst/daf/butler/_dataset_provenance.py | 67 +++++++++++++++++++
 python/lsst/daf/butler/_formatter.py          |  7 ++
 python/lsst/daf/butler/_limited_butler.py     |  6 +-
 python/lsst/daf/butler/_quantum_backed.py     |  5 +-
 .../daf/butler/_storage_class_delegate.py     |  9 ++-
 .../lsst/daf/butler/datastore/_datastore.py   | 12 +++-
 .../daf/butler/datastores/chainedDatastore.py |  9 ++-
 .../daf/butler/datastores/fileDatastore.py    | 26 +++++--
 .../butler/datastores/inMemoryDatastore.py    | 10 ++-
 .../butler/direct_butler/_direct_butler.py    |  9 ++-
 .../butler/remote_butler/_remote_butler.py    |  2 +
 python/lsst/daf/butler/tests/hybrid_butler.py |  6 +-
 14 files changed, 153 insertions(+), 21 deletions(-)
 create mode 100644 python/lsst/daf/butler/_dataset_provenance.py

diff --git a/python/lsst/daf/butler/__init__.py b/python/lsst/daf/butler/__init__.py
index acbfd4e929..c323241de7 100644
--- a/python/lsst/daf/butler/__init__.py
+++ b/python/lsst/daf/butler/__init__.py
@@ -47,6 +47,7 @@
 from ._config_support import LookupKey
 from ._dataset_association import *
 from ._dataset_existence import *
+from ._dataset_provenance import *
 from ._dataset_ref import *
 from ._dataset_type import *
 from ._deferredDatasetHandle import *
diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py
index c8f350e5f5..b68c148d04 100644
--- a/python/lsst/daf/butler/_butler.py
+++ b/python/lsst/daf/butler/_butler.py
@@ -59,6 +59,7 @@
 if TYPE_CHECKING:
     from ._dataset_existence import DatasetExistence
+    from ._dataset_provenance import DatasetProvenance
     from ._dataset_ref import DatasetId, DatasetRef
     from ._dataset_type import DatasetType
     from ._deferredDatasetHandle import DeferredDatasetHandle
@@ -664,6 +665,7 @@ def put(
         dataId: DataId | None = None,
         *,
         run: str | None = None,
+        provenance: DatasetProvenance | None = None,
         **kwargs: Any,
     ) -> DatasetRef:
         """Store and register a dataset.
@@ -683,6 +685,9 @@ def put(
         run : `str`, optional
             The name of the run the dataset should be added to, overriding
             ``self.run``. Not used if a resolved `DatasetRef` is provided.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Not supported by all serialization mechanisms.
         **kwargs
             Additional keyword arguments used to augment or construct a
             `DataCoordinate`. See `DataCoordinate.standardize`
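
For illustration, the new keyword would be used like this (a sketch, not
part of the patch; the repository path, run, collection, dataset type
names, and data ID are all hypothetical):

    from lsst.daf.butler import Butler, DatasetProvenance

    butler = Butler.from_config("repo", run="u/example/run")
    prov = DatasetProvenance()
    # Record the datasets this output was derived from.
    for ref in butler.query_datasets("exampleInput", collections="u/example/inputs"):
        prov.add_input(ref)
    butler.put(output_object, "exampleOutput", provenance=prov, instrument="HSC")

Here ``output_object`` is assumed to be an in-memory dataset matching the
storage class of the hypothetical ``exampleOutput`` dataset type.
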
diff --git a/python/lsst/daf/butler/_dataset_provenance.py b/python/lsst/daf/butler/_dataset_provenance.py
new file mode 100644
index 0000000000..812015d8b8
--- /dev/null
+++ b/python/lsst/daf/butler/_dataset_provenance.py
@@ -0,0 +1,67 @@
+# This file is part of daf_butler.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively.  If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ("DatasetProvenance",)
+
+import typing
+import uuid
+
+import pydantic
+
+from ._dataset_ref import DatasetRef, SerializedDatasetRef
+
+
+class DatasetProvenance(pydantic.BaseModel):
+    """Provenance of a single `DatasetRef`."""
+
+    inputs: list[SerializedDatasetRef] = pydantic.Field(default_factory=list)
+    """The input datasets."""
+    quantum_id: uuid.UUID | None = None
+    """Identifier of the Quantum that was executed."""
+    _uuids: set[uuid.UUID] = pydantic.PrivateAttr(default_factory=set)
+
+    @pydantic.model_validator(mode="after")
+    def populate_cache(self) -> typing.Self:
+        for ref in self.inputs:
+            self._uuids.add(ref.id)
+        return self
+
+    def add_input(self, ref: DatasetRef) -> None:
+        """Add an input dataset to the provenance.
+
+        Parameters
+        ----------
+        ref : `DatasetRef`
+            A dataset to register as an input.
+        """
+        if ref.id in self._uuids:
+            # Already registered.
+            return
+        self._uuids.add(ref.id)
+        self.inputs.append(ref.to_simple())
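
A short sketch of how the class behaves (``ref_a`` and ``ref_b`` are
assumed to be resolved `DatasetRef` instances): duplicate inputs are
silently dropped via the private UUID cache, and that cache is rebuilt by
the model validator when a serialized provenance is round-tripped through
pydantic.

    import uuid

    from lsst.daf.butler import DatasetProvenance

    prov = DatasetProvenance(quantum_id=uuid.uuid4())
    prov.add_input(ref_a)
    prov.add_input(ref_b)
    prov.add_input(ref_a)  # Duplicate; ignored because its UUID is cached.
    assert len(prov.inputs) == 2
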
diff --git a/python/lsst/daf/butler/_formatter.py b/python/lsst/daf/butler/_formatter.py
index 4ebfbfaa0b..637e7f35ff 100644
--- a/python/lsst/daf/butler/_formatter.py
+++ b/python/lsst/daf/butler/_formatter.py
@@ -60,6 +60,7 @@
 log = logging.getLogger(__name__)

 if TYPE_CHECKING:
+    from ._dataset_provenance import DatasetProvenance
     from ._dataset_ref import DatasetRef
     from ._dataset_type import DatasetType
     from ._storage_class import StorageClass
@@ -98,6 +99,10 @@ class FormatterV2:
         Parameters to control how the dataset is serialized.
     write_recipes : `dict`, optional
         Detailed write recipes indexed by recipe name.
+    provenance : `DatasetProvenance` or `None`, optional
+        Any provenance that should be attached to the serialized dataset.
+        Can be ignored by a formatter.
+
     **kwargs
         Additional arguments that will be ignored but allow for
         `Formatter` V1 parameters to be given.
@@ -169,6 +174,7 @@ def __init__(
         ref: DatasetRef,
         write_parameters: Mapping[str, Any] | None = None,
         write_recipes: Mapping[str, Any] | None = None,
+        provenance: DatasetProvenance | None = None,
         # Compatibility parameters. Unused in v2.
         **kwargs: Any,
     ):
@@ -198,6 +204,7 @@ def __init__(
         self._write_parameters = write_parameters
         self._write_recipes = self.validate_write_recipes(write_recipes)
+        self._provenance = provenance

     def __str__(self) -> str:
         return f"{self.name()}@{self.file_descriptor.location.uri}"
diff --git a/python/lsst/daf/butler/_limited_butler.py b/python/lsst/daf/butler/_limited_butler.py
index 38d4fcdc60..da04e6c5e0 100644
--- a/python/lsst/daf/butler/_limited_butler.py
+++ b/python/lsst/daf/butler/_limited_butler.py
@@ -36,6 +36,7 @@
 from lsst.resources import ResourcePath

+from ._dataset_provenance import DatasetProvenance
 from ._dataset_ref import DatasetRef
 from ._deferredDatasetHandle import DeferredDatasetHandle
 from ._storage_class import StorageClass, StorageClassFactory
@@ -64,7 +65,7 @@ def isWriteable(self) -> bool:
         raise NotImplementedError()

     @abstractmethod
-    def put(self, obj: Any, ref: DatasetRef, /) -> DatasetRef:
+    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
         """Store a dataset that already has a UUID and ``RUN`` collection.

         Parameters
@@ -73,6 +74,9 @@ def put(self, obj: Any, ref: DatasetRef, /) -> DatasetRef:
             The dataset.
         ref : `DatasetRef`
             Resolved reference for a not-yet-stored dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Not supported by all serialization mechanisms.

         Returns
         -------
diff --git a/python/lsst/daf/butler/_quantum_backed.py b/python/lsst/daf/butler/_quantum_backed.py
index 0f3909dde0..de776357df 100644
--- a/python/lsst/daf/butler/_quantum_backed.py
+++ b/python/lsst/daf/butler/_quantum_backed.py
@@ -43,6 +43,7 @@
 from ._butler_config import ButlerConfig
 from ._config import Config
+from ._dataset_provenance import DatasetProvenance
 from ._dataset_ref import DatasetId, DatasetRef
 from ._dataset_type import DatasetType
 from ._deferredDatasetHandle import DeferredDatasetHandle
@@ -453,11 +454,11 @@ def dimensions(self) -> DimensionUniverse:
         # Docstring inherited.
         return self._dimensions

-    def put(self, obj: Any, ref: DatasetRef, /) -> DatasetRef:
+    def put(self, obj: Any, ref: DatasetRef, /, provenance: DatasetProvenance | None = None) -> DatasetRef:
         # Docstring inherited.
         if ref.id not in self._predicted_outputs:
             raise RuntimeError("Cannot `put` dataset that was not predicted as an output.")
-        self._datastore.put(obj, ref)
+        self._datastore.put(obj, ref, provenance=provenance)
         self._actual_output_refs.add(ref)
         return ref
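
In quantum-backed execution the same object can be threaded through
`QuantumBackedButler.put`. A sketch (``qbb``, ``quantum_id``,
``input_refs``, ``output``, and ``output_ref`` are assumed to be supplied
by the executing harness):

    from lsst.daf.butler import DatasetProvenance

    prov = DatasetProvenance(quantum_id=quantum_id)
    for ref in input_refs:
        prov.add_input(ref)
    qbb.put(output, output_ref, provenance=prov)
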
diff --git a/python/lsst/daf/butler/_storage_class_delegate.py b/python/lsst/daf/butler/_storage_class_delegate.py
index 072a063068..dc8d881762 100644
--- a/python/lsst/daf/butler/_storage_class_delegate.py
+++ b/python/lsst/daf/butler/_storage_class_delegate.py
@@ -40,7 +40,7 @@
 from lsst.utils.introspection import get_full_type_name

 if TYPE_CHECKING:
-    from lsst.daf.butler import DatasetRef
+    from lsst.daf.butler import DatasetProvenance, DatasetRef

     from ._storage_class import StorageClass
@@ -335,7 +335,9 @@ def disassemble(

         return components

-    def add_provenance(self, inMemoryDataset: Any, ref: DatasetRef) -> Any:
+    def add_provenance(
+        self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
+    ) -> Any:
         """Add provenance to the composite dataset.

         Parameters
@@ -344,6 +346,9 @@
         inMemoryDataset : `object`
             The composite dataset to serialize.
         ref : `DatasetRef`
             The dataset associated with this in-memory dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Can be ignored by a delegate.

         Returns
         -------
diff --git a/python/lsst/daf/butler/datastore/_datastore.py b/python/lsst/daf/butler/datastore/_datastore.py
index 7b09beda60..bc30ba6025 100644
--- a/python/lsst/daf/butler/datastore/_datastore.py
+++ b/python/lsst/daf/butler/datastore/_datastore.py
@@ -61,6 +61,7 @@
     from .. import ddl
     from .._config_support import LookupKey
+    from .._dataset_provenance import DatasetProvenance
     from .._dataset_ref import DatasetRef
     from .._dataset_type import DatasetType
     from .._storage_class import StorageClass
@@ -626,7 +627,9 @@ def prepare_get_for_external_client(self, ref: DatasetRef) -> object | None:
         raise NotImplementedError()

     @abstractmethod
-    def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
+    def put(
+        self, inMemoryDataset: Any, datasetRef: DatasetRef, provenance: DatasetProvenance | None = None
+    ) -> None:
         """Write a `InMemoryDataset` with a given `DatasetRef` to the store.

         Parameters
@@ -635,6 +638,9 @@ def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
             The Dataset to store.
         datasetRef : `DatasetRef`
             Reference to the associated Dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Not supported by all serialization mechanisms.
         """
         raise NotImplementedError("Must be implemented by subclass")
@@ -1449,7 +1455,9 @@ def get(
     ) -> Any:
         raise FileNotFoundError("This is a no-op datastore that can not access a real datastore")

-    def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
+    def put(
+        self, inMemoryDataset: Any, datasetRef: DatasetRef, provenance: DatasetProvenance | None = None
+    ) -> None:
         raise NotImplementedError("This is a no-op datastore that can not access a real datastore")

     def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]:
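
A delegate that wants to honor the new hook could override it along these
lines. This is hypothetical (the patch only extends the entry point), and
the dict-like ``metadata`` attribute on the in-memory type is an
assumption about the composite being stored:

    from typing import Any

    from lsst.daf.butler import DatasetProvenance, DatasetRef, StorageClassDelegate


    class MetadataAttachingDelegate(StorageClassDelegate):
        def add_provenance(
            self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
        ) -> Any:
            # Copy the input UUIDs into the dataset's own metadata so that
            # they are serialized alongside the data.
            if provenance is not None:
                inMemoryDataset.metadata["lsst_butler_inputs"] = [str(i.id) for i in provenance.inputs]
            return inMemoryDataset
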
diff --git a/python/lsst/daf/butler/datastores/chainedDatastore.py b/python/lsst/daf/butler/datastores/chainedDatastore.py
index d2f1859fa7..5a98b77ac3 100644
--- a/python/lsst/daf/butler/datastores/chainedDatastore.py
+++ b/python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -53,7 +53,7 @@
 from lsst.utils import doImportType

 if TYPE_CHECKING:
-    from lsst.daf.butler import Config, DatasetType, LookupKey, StorageClass
+    from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey, StorageClass
     from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager
     from lsst.resources import ResourcePathExpression
@@ -420,7 +420,7 @@ def _get_matching_datastore(self, ref: DatasetRef) -> Datastore | None:

         return None

-    def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
+    def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
         """Write a InMemoryDataset with a given `DatasetRef` to each
         datastore.
@@ -435,6 +435,9 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
             The dataset to store.
         ref : `DatasetRef`
             Reference to the associated Dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Not supported by all serialization mechanisms.

         Raises
         ------
@@ -468,7 +471,7 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
             else:
                 npermanent += 1
             try:
-                datastore.put(inMemoryDataset, ref)
+                datastore.put(inMemoryDataset, ref, provenance=provenance)
                 nsuccess += 1
                 if not datastore.isEphemeral:
                     isPermanent = True
diff --git a/python/lsst/daf/butler/datastores/fileDatastore.py b/python/lsst/daf/butler/datastores/fileDatastore.py
index b3fa21266f..6d96302874 100644
--- a/python/lsst/daf/butler/datastores/fileDatastore.py
+++ b/python/lsst/daf/butler/datastores/fileDatastore.py
@@ -108,7 +108,7 @@
 from sqlalchemy import BigInteger, String

 if TYPE_CHECKING:
-    from lsst.daf.butler import LookupKey
+    from lsst.daf.butler import DatasetProvenance, LookupKey
     from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager

 log = getLogger(__name__)
@@ -830,13 +830,17 @@ def _prepare_for_direct_get(
             parameters=parameters,
         )

-    def _determine_put_formatter_location(self, ref: DatasetRef) -> tuple[Location, Formatter | FormatterV2]:
+    def _determine_put_formatter_location(
+        self, ref: DatasetRef, provenance: DatasetProvenance | None = None
+    ) -> tuple[Location, Formatter | FormatterV2]:
         """Calculate the formatter and output location to use for put.

         Parameters
         ----------
         ref : `DatasetRef`
             Reference to the associated Dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.

         Returns
         -------
@@ -865,6 +869,7 @@ def _determine_put_formatter_location(self, ref: DatasetRef) -> tuple[Location,
                 FileDescriptor(location, storageClass=storageClass, component=ref.datasetType.component()),
                 dataId=ref.dataId,
                 ref=ref,
+                provenance=provenance,
             )
         except KeyError as e:
             raise DatasetTypeNotSupportedError(
@@ -1275,7 +1280,9 @@ def _calculate_ingested_datastore_name(

         return location

-    def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) -> StoredFileInfo:
+    def _write_in_memory_to_artifact(
+        self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
+    ) -> StoredFileInfo:
         """Write out in memory dataset to datastore.

         Parameters
@@ -1284,6 +1291,9 @@
         inMemoryDataset : `object`
             Dataset to write to datastore.
         ref : `DatasetRef`
             Registry information associated with this dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Not supported by all formatters.

         Returns
         -------
@@ -1302,7 +1312,7 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
                 f"Dataset {ref} has been rejected by this datastore via configuration."
             )

-        location, formatter = self._determine_put_formatter_location(ref)
+        location, formatter = self._determine_put_formatter_location(ref, provenance)

         # The external storage class can differ from the registry storage
         # class AND the given in-memory dataset might not match any of the
@@ -2313,7 +2323,7 @@ def prepare_get_for_external_client(self, ref: DatasetRef) -> FileDatastoreGetPa
         )

     @transactional
-    def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
+    def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
         """Write a InMemoryDataset with a given `DatasetRef` to the store.

         Parameters
         ----------
         inMemoryDataset : `object`
             The dataset to store.
         ref : `DatasetRef`
             Reference to the associated Dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Can be ignored by a formatter or delegate.

         Raises
         ------
@@ -2358,11 +2371,12 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
                     # DatasetType does not refer to the types of components
                     # So we construct one ourselves.
                     compRef = ref.makeComponentRef(component)
+                    # Provenance has already been attached above.
                     storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef)
                     artifacts.append((compRef, storedInfo))
         else:
             # Write the entire thing out
-            storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref)
+            storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref, provenance=provenance)
             artifacts.append((ref, storedInfo))

         self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)
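
On the write path FileDatastore now hands the provenance to the formatter
constructor, so a FormatterV2 subclass can choose to serialize it. A
sketch (assuming the pydantic v2 ``model_dump`` API and the ``to_bytes``
entry point of the V2 formatter interface; this class is not part of the
patch):

    import json
    from typing import Any

    from lsst.daf.butler import FormatterV2


    class JsonWithProvenanceFormatter(FormatterV2):
        default_extension = ".json"

        def to_bytes(self, in_memory_dataset: Any) -> bytes:
            payload = {"value": in_memory_dataset}
            if self._provenance is not None:
                # Embed the input refs and quantum ID next to the data.
                payload["provenance"] = self._provenance.model_dump(mode="json")
            return json.dumps(payload).encode()
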
diff --git a/python/lsst/daf/butler/datastores/inMemoryDatastore.py b/python/lsst/daf/butler/datastores/inMemoryDatastore.py
index ecd0e1b9ff..a9d53701d1 100644
--- a/python/lsst/daf/butler/datastores/inMemoryDatastore.py
+++ b/python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -48,7 +48,7 @@
 from lsst.resources import ResourcePath, ResourcePathExpression

 if TYPE_CHECKING:
-    from lsst.daf.butler import Config, DatasetType, LookupKey
+    from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey
     from lsst.daf.butler.datastore import DatastoreOpaqueTable
     from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ArtifactIndexInfo
     from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager
@@ -374,7 +374,7 @@ def get(
         # Last minute type conversion.
         return refStorageClass.coerce_type(inMemoryDataset)

-    def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
+    def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
         """Write a InMemoryDataset with a given `DatasetRef` to the store.

         Parameters
@@ -383,6 +383,8 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
             The dataset to store.
         ref : `DatasetRef`
             Reference to the associated Dataset.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.

         Raises
         ------
@@ -415,6 +417,10 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
         if not delegate or not delegate.can_accept(inMemoryDataset):
             inMemoryDataset = ref.datasetType.storageClass.coerce_type(inMemoryDataset)

+        # Update provenance, forwarding what was passed to this put.
+        if delegate:
+            inMemoryDataset = delegate.add_provenance(inMemoryDataset, ref, provenance=provenance)
+
         self.datasets[ref.id] = inMemoryDataset
         log.debug("Store %s in %s", ref, self.name)
diff --git a/python/lsst/daf/butler/direct_butler/_direct_butler.py b/python/lsst/daf/butler/direct_butler/_direct_butler.py
index 4095fd2af5..973ef25a77 100644
--- a/python/lsst/daf/butler/direct_butler/_direct_butler.py
+++ b/python/lsst/daf/butler/direct_butler/_direct_butler.py
@@ -90,6 +90,7 @@
 if TYPE_CHECKING:
     from lsst.resources import ResourceHandleProtocol

+    from .._dataset_provenance import DatasetProvenance
     from .._dataset_ref import DatasetId
     from ..datastore import DatasetRefURIs
     from ..dimensions import DataId, DataIdValue, DimensionElement, DimensionRecord, DimensionUniverse
@@ -945,6 +946,7 @@ def put(
         dataId: DataId | None = None,
         *,
         run: str | None = None,
+        provenance: DatasetProvenance | None = None,
         **kwargs: Any,
     ) -> DatasetRef:
         """Store and register a dataset.
@@ -964,6 +966,9 @@ def put(
         run : `str`, optional
             The name of the run the dataset should be added to, overriding
             ``self.run``. Not used if a resolved `DatasetRef` is provided.
+        provenance : `DatasetProvenance` or `None`, optional
+            Any provenance that should be attached to the serialized dataset.
+            Not supported by all serialization mechanisms.
         **kwargs
             Additional keyword arguments used to augment or construct a
             `DataCoordinate`. See `DataCoordinate.standardize`
@@ -998,7 +1003,7 @@ def put(
             # with another write, the content of stored data may be
             # unpredictable.
             try:
-                self._datastore.put(obj, datasetRefOrType)
+                self._datastore.put(obj, datasetRefOrType, provenance=provenance)
             except IntegrityError as e:
                 raise ConflictingDefinitionError(f"Datastore already contains dataset: {e}") from e
             return datasetRefOrType
@@ -1014,7 +1019,7 @@ def put(
         # Add Registry Dataset entry.
         dataId = self._registry.expandDataId(dataId, dimensions=datasetType.dimensions, **kwargs)
         (ref,) = self._registry.insertDatasets(datasetType, run=run, dataIds=[dataId])
-        self._datastore.put(obj, ref)
+        self._datastore.put(obj, ref, provenance=provenance)

         return ref
diff --git a/python/lsst/daf/butler/remote_butler/_remote_butler.py b/python/lsst/daf/butler/remote_butler/_remote_butler.py
index 9bcb824551..52d9689188 100644
--- a/python/lsst/daf/butler/remote_butler/_remote_butler.py
+++ b/python/lsst/daf/butler/remote_butler/_remote_butler.py
@@ -86,6 +86,7 @@
 )

 if TYPE_CHECKING:
+    from .._dataset_provenance import DatasetProvenance
     from .._file_dataset import FileDataset
     from .._limited_butler import LimitedButler
     from .._timespan import Timespan
@@ -238,6 +239,7 @@ def put(
         dataId: DataId | None = None,
         *,
         run: str | None = None,
+        provenance: DatasetProvenance | None = None,
         **kwargs: Any,
     ) -> DatasetRef:
         # Docstring inherited.
diff --git a/python/lsst/daf/butler/tests/hybrid_butler.py b/python/lsst/daf/butler/tests/hybrid_butler.py
index 2d60a5a447..a517723204 100644
--- a/python/lsst/daf/butler/tests/hybrid_butler.py
+++ b/python/lsst/daf/butler/tests/hybrid_butler.py
@@ -37,6 +37,7 @@
 from .._butler import Butler
 from .._butler_collections import ButlerCollections
 from .._dataset_existence import DatasetExistence
+from .._dataset_provenance import DatasetProvenance
 from .._dataset_ref import DatasetId, DatasetRef
 from .._dataset_type import DatasetType
 from .._deferredDatasetHandle import DeferredDatasetHandle
@@ -92,9 +93,12 @@ def put(
         dataId: DataId | None = None,
         *,
         run: str | None = None,
+        provenance: DatasetProvenance | None = None,
         **kwargs: Any,
     ) -> DatasetRef:
-        return self._direct_butler.put(obj, datasetRefOrType, dataId, run=run, **kwargs)
+        return self._direct_butler.put(
+            obj, datasetRefOrType, dataId, run=run, provenance=provenance, **kwargs
+        )

     def getDeferred(
         self,
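
Because `LimitedButler.put` now declares the keyword, helper code can be
written once against the base interface and work with DirectButler,
RemoteButler, QuantumBackedButler, and the test HybridButler alike. A
final sketch of that pattern (the helper itself is hypothetical):

    from collections.abc import Iterable
    from typing import Any

    from lsst.daf.butler import DatasetProvenance, DatasetRef, LimitedButler


    def put_with_provenance(
        butler: LimitedButler, obj: Any, ref: DatasetRef, inputs: Iterable[DatasetRef]
    ) -> DatasetRef:
        """Store ``obj`` at ``ref``, recording ``inputs`` as its provenance."""
        prov = DatasetProvenance()
        for input_ref in inputs:
            prov.add_input(input_ref)
        return butler.put(obj, ref, provenance=prov)
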