Add initial implementation of query_all_datasets
Add a method for querying multiple dataset types simultaneously, currently hidden as `Butler._query_all_datasets`.

This implementation reuses the existing search logic from the query-datasets CLI.
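
A rough usage sketch of the new entry point (the repository path is hypothetical and the collection/dataset-type names are taken from the test data below; the leading underscore marks the API as provisional):

```python
from lsst.daf.butler import Butler

butler = Butler("/path/to/repo")  # hypothetical repository

# Query several dataset types in one call; glob patterns select the types.
refs = butler._query_all_datasets(
    "imported_r",             # collection(s) to search, in order
    name=["bias", "flat*"],   # dataset type names or glob patterns
    where="detector = 2 and instrument = 'Cam1'",
)
for ref in refs:
    print(ref.datasetType.name, ref.dataId, ref.run)
```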
dhirving committed Nov 4, 2024
1 parent a23c5b2 commit e2f48d5
Showing 5 changed files with 255 additions and 16 deletions.
116 changes: 109 additions & 7 deletions python/lsst/daf/butler/_butler.py
@@ -1729,13 +1729,6 @@ def query_datasets(
            collection wildcard is passed when ``find_first`` is `True`, or
            when ``collections`` is `None` and default butler collections are
            not defined.
-
-        Notes
-        -----
-        When multiple dataset types are queried in a single call, the results
-        of this operation are equivalent to querying for each dataset type
-        separately in turn, and no information about the relationships between
-        datasets of different types is included.
        """
        if data_id is None:
            data_id = DataCoordinate.make_empty(self.dimensions)
@@ -1878,6 +1871,115 @@ def query_dimension_records(
            raise EmptyQueryResultError(list(result.explain_no_results()))
        return dimension_records

    def _query_all_datasets(
        self,
        collections: str | Iterable[str] | None = None,
        *,
        name: str | Iterable[str] = "*",
        find_first: bool = True,
        data_id: DataId | None = None,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        limit: int | None = -20_000,
        **kwargs: Any,
    ) -> list[DatasetRef]:
        """Query for datasets of potentially multiple types.

        Parameters
        ----------
        collections : `str` or `~collections.abc.Iterable` [ `str` ], optional
            The collection or collections to search, in order. If not provided
            or `None`, the default collection search path for this butler is
            used.
        name : `str` or `~collections.abc.Iterable` [ `str` ], optional
            Names or name patterns (glob-style) that returned dataset type
            names must match. If an iterable, items are OR'd together. The
            default is to include all dataset types in the given collections.
        find_first : `bool`, optional
            If `True` (default), for each result data ID, only yield one
            `DatasetRef` of each `DatasetType`, from the first collection in
            which a dataset of that dataset type appears (according to the
            order of ``collections`` passed in).
        data_id : `dict` or `DataCoordinate`, optional
            A data ID whose key-value pairs are used as equality constraints in
            the query.
        where : `str`, optional
            A string expression similar to a SQL WHERE clause. May involve any
            column of a dimension table or (as a shortcut for the primary key
            column of a dimension table) dimension name. See
            :ref:`daf_butler_dimension_expressions` for more information.
        bind : `~collections.abc.Mapping`, optional
            Mapping containing literal values that should be injected into the
            ``where`` expression, keyed by the identifiers they replace. Values
            of collection type can be expanded in some cases; see
            :ref:`daf_butler_dimension_expressions_identifiers` for more
            information.
        limit : `int` or `None`, optional
            Upper limit on the number of returned records. `None` can be used
            if no limit is wanted. A limit of ``0`` means that the query will
            be executed and validated but no results will be returned.
            If a negative value is given, a warning will be issued if the
            number of results is capped by that limit. If no limit is
            provided, by default a maximum of 20,000 records will be returned.
        **kwargs
            Additional keyword arguments are forwarded to
            `DataCoordinate.standardize` when processing the ``data_id``
            argument (and may be used to provide a constraining data ID even
            when the ``data_id`` argument is `None`).

        Raises
        ------
        MissingDatasetTypeError
            When no dataset types match ``name``, or an explicit (non-glob)
            dataset type in ``name`` does not exist.
        InvalidQueryError
            If the parameters to the query are inconsistent or malformed.
        MissingCollectionError
            If a given collection is not found.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            Dataset references matching the given query criteria. Nested data
            IDs are guaranteed to include values for all implied dimensions
            (i.e. `DataCoordinate.hasFull` will return `True`), but will not
            include dimension records (`DataCoordinate.hasRecords` will be
            `False`).
        """
        from ._query_all_datasets import query_all_datasets

        if collections is None:
            collections = list(self.collections.defaults)
        else:
            collections = list(ensure_iterable(collections))

        warn_limit = False
        if limit is not None and limit < 0:
            # Add one to the limit so we can detect if we have exceeded it.
            limit = abs(limit) + 1
            warn_limit = True

        result = []
        for page in query_all_datasets(
            self,
            collections=collections,
            name=name,
            find_first=find_first,
            data_id=data_id,
            where=where,
            limit=limit,
            bind=bind,
            **kwargs,
        ):
            result.extend(page.data)

        if warn_limit and limit is not None and len(result) >= limit:
            # Remove the extra dataset we added for the limit check.
            result.pop()
            _LOG.warning("More datasets are available than the requested limit of %d.", limit - 1)

        return result

    def clone(
        self,
        *,
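A note on the limit convention used above, which mirrors the query-datasets CLI: a negative ``limit`` means "cap the results at ``abs(limit)`` and warn if the cap was hit", and the implementation over-fetches by one row so that a capped result is distinguishable from one that exactly filled the limit. A minimal sketch of the same pattern in isolation (``fetch_page`` is a hypothetical stand-in for the real query machinery):

```python
import logging

_LOG = logging.getLogger(__name__)


def capped_fetch(fetch_page, limit: int) -> list:
    """Fetch up to abs(limit) rows; warn about truncation if limit < 0."""
    warn_limit = limit < 0
    cap = abs(limit)
    # Over-fetch by one row so "exactly cap" and "more than cap" results
    # can be told apart after the query has run.
    rows = list(fetch_page(cap + 1 if warn_limit else cap))
    if warn_limit and len(rows) > cap:
        del rows[cap:]  # Drop the sentinel row added by the over-fetch.
        _LOG.warning("More rows are available than the requested limit of %d.", cap)
    return rows
```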
10 changes: 3 additions & 7 deletions python/lsst/daf/butler/_query_all_datasets.py
@@ -52,7 +52,7 @@ class DatasetsPage(NamedTuple):
def query_all_datasets(
    butler: Butler,
    *,
-    collections: str | Iterable[str] | None = None,
+    collections: list[str],
    name: str | Iterable[str] = "*",
    find_first: bool = True,
    data_id: DataId | None = None,
@@ -69,8 +69,8 @@
    ----------
    butler : `Butler`
        Butler instance to use for executing queries.
-    collections : `str` or `~collections.abc.Iterable` [ `str` ], optional
-        The collection or collections to search, in order. If not provided
-        or `None`, the default collection search path for this butler is
-        used.
+    collections : `list` [ `str` ]
+        The collections to search, in order.
    name : `str` or `~collections.abc.Iterable` [ `str` ], optional
@@ -128,10 +128,6 @@ def query_all_datasets(
        `DatasetRef` results matching the given query criteria, grouped by
        dataset type.
    """
-    if collections is None:
-        collections = list(butler.collections.defaults)
-    else:
-        collections = list(ensure_iterable(collections))
    if find_first and has_globs(collections):
        raise InvalidQueryError("Can not use wildcards in collections when find_first=True")
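With the default-collection handling hoisted into `Butler._query_all_datasets`, this helper now receives a concrete, ordered `list[str]`, and the only validation left is the wildcard guard: with ``find_first=True`` a glob such as ``"*g"`` would leave the collection search order undefined, so it is rejected. A sketch of the kind of check ``has_globs`` presumably performs (an assumption; the real implementation lives elsewhere in the package):

```python
GLOB_CHARS = ("*", "?", "[")


def has_globs_sketch(collections: list[str]) -> bool:
    # A name containing any glob metacharacter is a pattern rather than a
    # concrete collection name, so an ordered search is not well defined.
    return any(char in name for name in collections for char in GLOB_CHARS)
```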
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/script/queryDatasets.py
@@ -259,7 +259,7 @@ def getDatasets(self) -> Iterator[list[DatasetRef]]:
            Dataset references matching the given query criteria grouped
            by dataset type.
        """
-        query_collections: Iterable[str] = self._collections_wildcard or ["*"]
+        query_collections = self._collections_wildcard or ["*"]

        warn_limit = False
        if self._limit < 0:
136 changes: 135 additions & 1 deletion python/lsst/daf/butler/tests/butler_queries.py
@@ -44,8 +44,14 @@

from .._butler import Butler
from .._collection_type import CollectionType
from .._dataset_ref import DatasetRef
from .._dataset_type import DatasetType
-from .._exceptions import EmptyQueryResultError, InvalidQueryError
+from .._exceptions import (
+    EmptyQueryResultError,
+    InvalidQueryError,
+    MissingCollectionError,
+    MissingDatasetTypeError,
+)
from .._timespan import Timespan
from ..dimensions import DataCoordinate, DimensionRecord
from ..direct_query_driver import DirectQueryDriver
@@ -2007,6 +2013,130 @@ def test_unusual_column_literals(self) -> None:
        names = [x.full_name for x in result]
        self.assertEqual(names, ["Ba"])

    def test_query_all_datasets(self) -> None:
        butler = self.make_butler("base.yaml", "datasets.yaml")

        # Make sure that refs are coming out well-formed.
        datasets = butler._query_all_datasets("imported_r", where="detector = 2", instrument="Cam1")
        datasets.sort(key=lambda ref: ref.datasetType.name)
        self.assertEqual(len(datasets), 2)
        bias = datasets[0]
        self.assertEqual(bias.datasetType.name, "bias")
        self.assertEqual(bias.dataId["instrument"], "Cam1")
        self.assertEqual(bias.dataId["detector"], 2)
        self.assertEqual(bias.run, "imported_r")
        self.assertEqual(bias.id, UUID("87f3e68d-258d-41b7-8ea5-edf3557ccb30"))
        flat = datasets[1]
        self.assertEqual(flat.datasetType.name, "flat")
        self.assertEqual(flat.dataId["instrument"], "Cam1")
        self.assertEqual(flat.dataId["detector"], 2)
        self.assertEqual(flat.dataId["physical_filter"], "Cam1-R1")
        self.assertEqual(flat.dataId["band"], "r")
        self.assertEqual(flat.run, "imported_r")
        self.assertEqual(flat.id, UUID("c1296796-56c5-4acf-9b49-40d920c6f840"))

        # Querying for everything finds everything.
        results = butler._query_all_datasets("*", find_first=False)
        self.assertEqual(len(results), 13)

        # Constraining by data ID works.
        detector_1_ids = ("d0bb04cd-d697-4a83-ba53-cdfcd58e3a0c", "e15ab039-bc8b-4135-87c5-90902a7c0b22")
        results = butler._query_all_datasets(
            "*", data_id={"detector": 1, "instrument": "Cam1"}, find_first=False
        )
        self.assertCountEqual(detector_1_ids, _ref_uuids(results))

        # Bind values work.
        results = butler._query_all_datasets(
            "*", where="detector=my_bind and instrument='Cam1'", bind={"my_bind": 1}, find_first=False
        )
        self.assertCountEqual(detector_1_ids, _ref_uuids(results))

        # find_first requires ordered collections.
        with self.assertRaisesRegex(InvalidQueryError, "Can not use wildcards"):
            results = butler._query_all_datasets("*")

        butler.collections.register("chain", CollectionType.CHAINED)
        butler.collections.redefine_chain("chain", ["imported_g", "imported_r"])
        results = butler._query_all_datasets(
            "chain", where="detector=2 and instrument = 'Cam1'", find_first=True
        )
        # find_first searches the collection chain in order.
        self.assertCountEqual(
            _ref_uuids(results),
            [
                "51352db4-a47a-447c-b12d-a50b206b17cd",  # imported_g bias
                "60c8a65c-7290-4c38-b1de-e3b1cdcf872d",  # imported_g flat
                "c1296796-56c5-4acf-9b49-40d920c6f840",  # imported_r flat
                # There is also a bias dataset with detector=2 in imported_r,
                # but it is masked by the presence of the same data ID in
                # imported_g.
            ],
        )

        # Collection wildcard searches work.
        results = butler._query_all_datasets(
            "*g", where="detector=1 and instrument = 'Cam1'", find_first=False
        )
        self.assertEqual(_ref_uuids(results), ["e15ab039-bc8b-4135-87c5-90902a7c0b22"])

        # We raise for missing collections with explicit names.
        with self.assertRaises(MissingCollectionError):
            results = butler._query_all_datasets("nonexistent")
        # We don't raise for collection wildcard searches that find nothing.
        results = butler._query_all_datasets("nonexistent*", find_first=False)
        self.assertEqual(results, [])

        # Dataset type searches work.
        results = butler._query_all_datasets(
            "*", name="b*", where="detector=1 and instrument = 'Cam1'", find_first=False
        )
        self.assertEqual(_ref_uuids(results), ["e15ab039-bc8b-4135-87c5-90902a7c0b22"])

        # Missing dataset types raise.
        with self.assertRaises(MissingDatasetTypeError):
            results = butler._query_all_datasets("chain", name=["notfound", "imported_g"])
        with self.assertRaises(MissingDatasetTypeError):
            results = butler._query_all_datasets("chain", name="notfound*")

        # Limit of 3 lands at the boundary of a dataset type.
        # Limit of 4 is in the middle of a dataset type.
        for limit in [3, 4]:
            with self.subTest(limit=limit):
                results = butler._query_all_datasets("imported_g", limit=limit)
                self.assertEqual(len(results), limit)
                with self.assertLogs(level="WARNING") as log:
                    results = butler._query_all_datasets("imported_g", limit=-limit)
                self.assertEqual(len(results), limit)
                self.assertIn("requested limit", log.output[0])

        results = butler._query_all_datasets("imported_g", limit=0)
        self.assertEqual(len(results), 0)

        # 'where' constraints that don't apply to all dataset types follow the
        # same rules as query_datasets.
        results = butler._query_all_datasets(
            "*", where="detector = 2 and band = 'g' and instrument = 'Cam1'", find_first=False
        )
        self.assertCountEqual(
            _ref_uuids(results),
            [
                # bias does not have 'band'
                "51352db4-a47a-447c-b12d-a50b206b17cd",
                "87f3e68d-258d-41b7-8ea5-edf3557ccb30",
                # flat does have 'band', and we filter based on it
                "60c8a65c-7290-4c38-b1de-e3b1cdcf872d",
            ],
        )

        # Default collections and data ID apply.
        butler.registry.defaults = RegistryDefaults(collections="imported_g")
        results = butler._query_all_datasets(where="detector = 2")
        self.assertCountEqual(
            _ref_uuids(results),
            ["51352db4-a47a-447c-b12d-a50b206b17cd", "60c8a65c-7290-4c38-b1de-e3b1cdcf872d"],
        )

def _get_exposure_ids_from_dimension_records(dimension_records: Iterable[DimensionRecord]) -> list[int]:
    output = []
@@ -2016,3 +2146,7 @@ def _get_exposure_ids_from_dimension_records(dimension_records: Iterable[DimensionRecord]) -> list[int]:
        output.append(id)

    return output


def _ref_uuids(refs: list[DatasetRef]) -> list[str]:
    return [str(ref.id) for ref in refs]
7 changes: 7 additions & 0 deletions tests/test_cliCmdQueryDatasets.py
@@ -357,6 +357,13 @@ def test_limit_order(self):
        ]
        self.assertAstropyTablesEqual(tables, expectedTables, filterColumns=True)

        # Same as the previous test, but with a positive limit so no warning
        # is issued.
        tables = self._queryDatasets(
            repo=testRepo.butler, limit=1, order_by=("visit"), collections="*", glob="*"
        )
        self.assertAstropyTablesEqual(tables, expectedTables, filterColumns=True)

        with self.assertLogs("lsst.daf.butler.script.queryDatasets", level="WARNING") as cm:
            tables = self._queryDatasets(
                repo=testRepo.butler, limit=-1, order_by=("-visit"), collections="*", glob="*"
