Extract dataset query logic from CLI
Pull out the "query all datasets" logic from the query-datasets CLI command to a separate function.  In an upcoming commit this will be used to implement `Butler.query_all_datasets`.
dhirving committed Oct 29, 2024
1 parent 6333457 commit 07a53c3
Showing 4 changed files with 223 additions and 71 deletions.
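The commit message mentions that an upcoming commit will use this helper to implement `Butler.query_all_datasets`. A hypothetical sketch of such a wrapper, for context only: the method does not exist in this commit, and the flat-list return behaviour shown here is an assumption, not part of the change.

# Hypothetical sketch only -- not part of this commit.
from lsst.daf.butler._query_all_datasets import query_all_datasets


def butler_query_all_datasets(butler, **kwargs):
    """Collect the per-dataset-type pages into a single flat list of refs."""
    refs = []
    for page in query_all_datasets(butler, **kwargs):
        refs.extend(page.data)
    return refs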
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/_butler_collections.py
@@ -461,6 +461,8 @@ def _group_by_dataset_type(
            Mapping of the dataset type name to its corresponding list of
            collection names.
        """
        # Although this is marked as private, it is called from outside this
        # class by other functions internal to daf_butler.
        dataset_type_collections: dict[str, list[str]] = defaultdict(list)
        for info in collection_infos:
            if info.dataset_types is None:
186 changes: 186 additions & 0 deletions python/lsst/daf/butler/_query_all_datasets.py
@@ -0,0 +1,186 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import logging
from collections.abc import Iterable, Iterator, Mapping
from typing import Any, NamedTuple

from lsst.utils.iteration import ensure_iterable

from ._butler import Butler
from ._dataset_ref import DatasetRef
from ._exceptions import InvalidQueryError, MissingDatasetTypeError
from .dimensions import DataId
from .utils import has_globs

_LOG = logging.getLogger(__name__)


class DatasetsPage(NamedTuple):
    dataset_type: str
    data: list[DatasetRef]


def query_all_datasets(
    butler: Butler,
    *,
    collections: str | Iterable[str] | None = None,
    name: str | Iterable[str] = "*",
    find_first: bool = True,
    data_id: DataId | None = None,
    where: str = "",
    bind: Mapping[str, Any] | None = None,
    with_dimension_records: bool = False,
    limit: int | None = None,
    order_by: Iterable[str] | str | None = None,
    **kwargs: Any,
) -> Iterator[DatasetsPage]:
"""Query for dataset refs from multiple types simultaneously.
Parameters
----------
butler : `Butler`
Butler instance to use for executing queries.
collections : `str` or `~collections.abc.Iterable` [ `str` ], optional
The collection or collections to search, in order. If not provided
or `None`, the default collection search path for this butler is
used.
name : `str` or `~collections.abc.Iterable` [ `str` ], optional
Names or name patterns (glob-style) that returned dataset type
names must match. If an iterable, items are OR'd together. The
default is to include all dataset types in the given collections.
find_first : `bool`, optional
If `True` (default), for each result data ID, only yield one
`DatasetRef` of each `DatasetType`, from the first collection in
which a dataset of that dataset type appears (according to the
order of ``collections`` passed in).
data_id : `dict` or `DataCoordinate`, optional
A data ID whose key-value pairs are used as equality constraints in
the query.
where : `str`, optional
A string expression similar to a SQL WHERE clause. May involve any
column of a dimension table or (as a shortcut for the primary key
column of a dimension table) dimension name. See
:ref:`daf_butler_dimension_expressions` for more information.
bind : `~collections.abc.Mapping`, optional
Mapping containing literal values that should be injected into the
``where`` expression, keyed by the identifiers they replace. Values
of collection type can be expanded in some cases; see
:ref:`daf_butler_dimension_expressions_identifiers` for more
information.
with_dimension_records : `bool`, optional
If `True` (default is `False`) then returned data IDs will have
dimension records.
limit : `int` or `None`, optional
Upper limit on the number of returned records. `None` can be used
if no limit is wanted. A limit of ``0`` means that the query will
be executed and validated but no results will be returned.
order_by : `~collections.abc.Iterable` [`str`] or `str`, optional
Names of the columns/dimensions to use for ordering returned data
IDs. Column name can be prefixed with minus (``-``) to use
descending ordering. Results are ordered only within each dataset
type, they are not globally ordered across all results.
**kwargs
Additional keyword arguments are forwarded to
`DataCoordinate.standardize` when processing the ``data_id``
argument (and may be used to provide a constraining data ID even
when the ``data_id`` argument is `None`).
Raises
------
MissingDatasetTypeError
When no dataset types match ``name``, or an explicit (non-glob)
dataset type in ``name`` does not exist.
InvalidQueryError
If the parameters to the query are inconsistent or malformed.
Returns
-------
pages : `~collections.abc.Iterator` [ `DatasetsPage` ]
`DatasetRef` results matching the given query criteria, grouped by
dataset type.
"""
    if collections is None:
        collections = butler.collections.defaults
    else:
        collections = list(ensure_iterable(collections))
        if find_first and has_globs(collections):
            raise InvalidQueryError("Can not use wildcards in collections when find_first=True")

    missing_types: list[str] = []
    dataset_type_query = list(ensure_iterable(name))
    dataset_types = set(butler.registry.queryDatasetTypes(dataset_type_query, missing=missing_types))
    if len(dataset_types) == 0:
        raise MissingDatasetTypeError(f"No dataset types found for query {dataset_type_query}")
    if len(missing_types) > 0:
        raise MissingDatasetTypeError(f"Dataset types not found: {missing_types}")

    # Expand the collections query and include summary information.
    query_collections_info = butler.collections.query_info(
        collections,
        include_summary=True,
        flatten_chains=True,
        include_chains=False,
        summary_datasets=dataset_types,
    )

    # Only iterate over dataset types that are relevant for the query.
    dataset_type_names = {dataset_type.name for dataset_type in dataset_types}
    dataset_type_collections = butler.collections._group_by_dataset_type(
        dataset_type_names, query_collections_info
    )
    n_dataset_types = len(dataset_types)
    if (n_filtered := len(dataset_type_collections)) != n_dataset_types:
        _LOG.debug("Filtered %d dataset types down to %d", n_dataset_types, n_filtered)
    else:
        _LOG.debug("Processing %d dataset type%s", n_dataset_types, "" if n_dataset_types == 1 else "s")

    for dt, filtered_collections in sorted(dataset_type_collections.items()):
        _LOG.debug("Querying dataset type %s", dt)
        results = butler.query_datasets(
            dt,
            collections=filtered_collections,
            find_first=find_first,
            with_dimension_records=with_dimension_records,
            data_id=data_id,
            order_by=order_by,
            explain=False,
            where=where,
            bind=bind,
            limit=limit,
            **kwargs,
        )

        yield DatasetsPage(dataset_type=dt, data=results)

        if limit is not None:
            # Track how much of the limit has been used up by each query.
            limit -= len(results)
            if limit <= 0:
                break
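A minimal usage sketch of the function defined above; the repository path, collection glob, and dataset type pattern are illustrative placeholders, not values from this commit.

from lsst.daf.butler import Butler
from lsst.daf.butler._query_all_datasets import query_all_datasets

# "my_repo", "HSC/runs/*" and "deepCoadd*" are placeholder values.
butler = Butler("my_repo")
# Wildcard collections are only allowed with find_first=False (see above).
for page in query_all_datasets(
    butler,
    collections=["HSC/runs/*"],
    name="deepCoadd*",
    find_first=False,
    limit=1000,
):
    print(page.dataset_type, len(page.data))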
100 changes: 32 additions & 68 deletions python/lsst/daf/butler/script/queryDatasets.py
@@ -30,14 +30,15 @@
import warnings
from collections import defaultdict
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING

import numpy as np
from astropy.table import Table as AstropyTable

from .._butler import Butler
from .._exceptions import MissingDatasetTypeError
from .._query_all_datasets import query_all_datasets
from ..cli.utils import sortAstropyTable
from ..utils import has_globs

if TYPE_CHECKING:
from lsst.daf.butler import DatasetRef
@@ -256,83 +257,46 @@ def getDatasets(self) -> Iterator[list[DatasetRef]]:
Dataset references matching the given query criteria grouped
by dataset type.
"""
datasetTypes = self._dataset_type_glob
query_collections: Iterable[str] = self._collections_wildcard or ["*"]

# Currently need to use old interface to get all the matching
# dataset types and loop over the dataset types executing a new
# query each time.
dataset_types = set(self.butler.registry.queryDatasetTypes(datasetTypes or ...))
n_dataset_types = len(dataset_types)
if n_dataset_types == 0:
_LOG.info("The given dataset type, %s, is not known to this butler.", datasetTypes)
return

# Expand the collections query and include summary information.
query_collections_info = self.butler.collections.query_info(
query_collections,
include_summary=True,
flatten_chains=True,
include_chains=False,
summary_datasets=dataset_types,
)
expanded_query_collections = [c.name for c in query_collections_info]
if self._find_first and has_globs(query_collections):
raise RuntimeError("Can not use wildcards in collections when find_first=True")
query_collections = expanded_query_collections

# Only iterate over dataset types that are relevant for the query.
dataset_type_names = {dataset_type.name for dataset_type in dataset_types}
dataset_type_collections = self.butler.collections._group_by_dataset_type(
dataset_type_names, query_collections_info
)

if (n_filtered := len(dataset_type_collections)) != n_dataset_types:
_LOG.info("Filtered %d dataset types down to %d", n_dataset_types, n_filtered)
else:
_LOG.info("Processing %d dataset type%s", n_dataset_types, "" if n_dataset_types == 1 else "s")

# Accumulate over dataset types.
limit = self._limit
warn_limit = False
unlimited = True if limit == 0 else False
if limit < 0:
# Must track this limit in the loop rather than relying on
# butler.query_datasets() because this loop knows there are more
# possible dataset types to query.
if self._limit < 0:
# Negative limit means we should warn if the limit is exceeded.
warn_limit = True
limit = abs(limit) + 1 # +1 to tell us we hit the limit.
for dt, collections in sorted(dataset_type_collections.items()):
kwargs: dict[str, Any] = {}
if self._where:
kwargs["where"] = self._where
# API uses 0 to mean "check query but return nothing" and None
# to mean "unlimited".
kwargs["limit"] = None if unlimited else limit
_LOG.debug("Querying dataset type %s with %s", dt, kwargs)
results = self.butler.query_datasets(
dt,
collections=collections,
limit = abs(self._limit) + 1 # +1 to tell us we hit the limit.
elif self._limit == 0:
# 0 means 'unlimited' in the CLI.
limit = None
else:
limit = self._limit

try:
pages = query_all_datasets(
self.butler,
collections=query_collections,
find_first=self._find_first,
name=self._dataset_type_glob,
with_dimension_records=True,
where=self._where,
limit=limit,
order_by=self._order_by,
explain=False,
**kwargs,
)
if not unlimited:
limit -= len(results)
if warn_limit and limit == 0 and results:
datasets_found = 0
for dataset_type, refs in pages:
datasets_found += len(refs)
if warn_limit and limit is not None and datasets_found >= limit:
# We asked for one too many so must remove that from
# the list.
results.pop(-1)
_LOG.debug("Got %d results for dataset type %s", len(results), dt)
yield results

if not unlimited and limit == 0:
if warn_limit:
refs.pop(-1)
_LOG.warning(
"Requested limit of %d hit for number of datasets returned. "
"Use --limit to increase this limit.",
self._limit,
limit,
)
break

yield refs

_LOG.debug("Got %d results for dataset type %s", len(refs), dataset_type)
except MissingDatasetTypeError as e:
_LOG.info(str(e))
return
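For reference, the CLI limit convention handled above can be restated in isolation: a negative ``--limit`` means warn when the limit is exceeded (so one extra result is requested to detect this), ``0`` means unlimited, and a positive value passes through unchanged. A standalone sketch of that translation follows; the helper name ``_translate_cli_limit`` is ours, not part of the commit.

def _translate_cli_limit(cli_limit: int) -> tuple[int | None, bool]:
    """Map the CLI --limit value onto the query_all_datasets limit argument.

    Returns the limit to pass to the query and whether to warn when it is hit.
    """
    if cli_limit < 0:
        # Negative: warn if exceeded; request one extra result to detect it.
        return abs(cli_limit) + 1, True
    if cli_limit == 0:
        # Zero means "unlimited" on the command line, None in the API.
        return None, False
    return cli_limit, False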
6 changes: 3 additions & 3 deletions tests/test_cliCmdQueryDatasets.py
@@ -32,7 +32,7 @@
import unittest

from astropy.table import Table as AstropyTable
from lsst.daf.butler import CollectionType, StorageClassFactory, script
from lsst.daf.butler import CollectionType, InvalidQueryError, StorageClassFactory, script
from lsst.daf.butler.tests import addDatasetType
from lsst.daf.butler.tests.utils import ButlerTestHelper, MetricTestRepo, makeTestTempDir, removeTestTempDir
from lsst.resources import ResourcePath
@@ -287,7 +287,7 @@ def testWhere(self):

self.assertAstropyTablesEqual(tables, expectedTables, filterColumns=True)

with self.assertRaises(RuntimeError):
with self.assertRaises(InvalidQueryError):
self._queryDatasets(repo=testRepo.butler, collections="*", find_first=True, glob="*")

def testGlobDatasetType(self):
@@ -630,7 +630,7 @@ def testFindFirstAndCollections(self):
self.assertAstropyTablesEqual(tables, expectedTables, filterColumns=True)

# Verify that globs are not supported with find_first=True.
with self.assertRaises(RuntimeError):
with self.assertRaises(InvalidQueryError):
self._queryDatasets(
repo=testRepo.butler, collections=["*"], show_uri=True, find_first=True, glob="*"
)