From ab12144ff0af15aa3ce6c3a7106bda8c037489c5 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Thu, 14 Mar 2024 15:08:39 -0400 Subject: [PATCH] More docs. DO NOT MERGE (squash instead). --- .../direct_query_driver/_convert_results.py | 29 +- .../daf/butler/direct_query_driver/_driver.py | 316 +++++++++-- .../direct_query_driver/_postprocessing.py | 123 ++++- .../direct_query_driver/_query_builder.py | 502 ++++++++++++++---- python/lsst/daf/butler/queries/_base.py | 16 +- 5 files changed, 794 insertions(+), 192 deletions(-) diff --git a/python/lsst/daf/butler/direct_query_driver/_convert_results.py b/python/lsst/daf/butler/direct_query_driver/_convert_results.py index bf0bda3948..13f5fe558a 100644 --- a/python/lsst/daf/butler/direct_query_driver/_convert_results.py +++ b/python/lsst/daf/butler/direct_query_driver/_convert_results.py @@ -48,12 +48,31 @@ def convert_dimension_record_results( next_key: PageKey | None, name_shrinker: NameShrinker, ) -> DimensionRecordResultPage: + """Convert a raw SQL result iterable into a page of `DimensionRecord` + query results. + + Parameters + ---------- + raw_rows : `~collections.abc.Iterable` [ `sqlalchemy.Row` ] + Iterable of SQLAlchemy rows, with `Postprocessing` filters already + applied. + spec : `DimensionRecordResultSpec` + Specification for result objects. + next_key : `PageKey` or `None` + Key for the next page to add into the returned page object. + name_shrinker : `NameShrinker` + Object used to ensure dataset type field names fit inside the database + engine's identifier size limit. Unnecessary for dimension fields, + which are restricted to fit in the identifier size limit when a + universe is initialized. + + Returns + ------- + result_page : `DimensionRecordResultPage` + Page object that holds a `DimensionRecord` container. + """ record_set = DimensionRecordSet(spec.element) - columns = spec.get_result_columns() - column_mapping = [ - (field, name_shrinker.shrink(columns.get_qualified_name(spec.element.name, field))) - for field in spec.element.schema.names - ] + column_mapping = [(field, field) for field in spec.element.schema.names] record_cls = spec.element.RecordClass if not spec.element.temporal: for raw_row in raw_rows: diff --git a/python/lsst/daf/butler/direct_query_driver/_driver.py b/python/lsst/daf/butler/direct_query_driver/_driver.py index a6e153a9e7..eb2c621987 100644 --- a/python/lsst/daf/butler/direct_query_driver/_driver.py +++ b/python/lsst/daf/butler/direct_query_driver/_driver.py @@ -32,7 +32,7 @@ __all__ = ("DirectQueryDriver",) import logging -from collections.abc import Iterable, Iterator, Mapping, Sequence +from collections.abc import Iterable, Iterator, Mapping, Sequence, Set from contextlib import ExitStack from typing import TYPE_CHECKING, Any, cast, overload @@ -97,6 +97,10 @@ class DirectQueryDriver(QueryDriver): raw_page_size : `int`, optional Number of database rows to fetch for each result page. The actual number of rows in a page may be smaller due to postprocessing. + constant_rows_limit : `int`, optional + Maximum number of uploaded rows to include in queries via + `Database.constant_rows`; above this limit a temporary table is used + instead. postprocessing_filter_factor : `int`, optional The number of database rows we expect to have to fetch to yield a single output row for queries that involve postprocessing. 
This is @@ -112,6 +116,7 @@ def __init__( managers: RegistryManagerInstances, defaults: RegistryDefaults, raw_page_size: int = 10000, + constant_rows_limit: int = 1000, postprocessing_filter_factor: int = 10, ): self.db = db @@ -119,10 +124,11 @@ def __init__( self._universe = universe self._defaults = defaults self._materializations: dict[qt.MaterializationKey, tuple[sqlalchemy.Table, Postprocessing]] = {} - self._upload_tables: dict[qt.DataCoordinateUploadKey, sqlalchemy.Table] = {} + self._upload_tables: dict[qt.DataCoordinateUploadKey, sqlalchemy.FromClause] = {} self._exit_stack: ExitStack | None = None self._raw_page_size = raw_page_size self._postprocessing_filter_factor = postprocessing_filter_factor + self._constant_rows_limit = constant_rows_limit self._active_pages: dict[PageKey, tuple[Iterator[Sequence[sqlalchemy.Row]], Postprocessing]] = {} self._name_shrinker = NameShrinker(self.db.dialect.max_identifier_length) @@ -173,17 +179,11 @@ def execute(self, result_spec: ResultSpec, tree: qt.QueryTree) -> ResultPage: builder.postprocessing.limit = result_spec.limit else: sql_select = sql_select.limit(result_spec.limit) - if result_spec.offset: - if builder.postprocessing: - sql_select = sql_select.offset(result_spec.offset) - else: - builder.postprocessing.offset = result_spec.offset if builder.postprocessing.limit is not None: - # We might want to fetch many fewer rows that the default page - # size if we have to implement offset and limit in postprocessing. + # We might want to fetch many fewer rows than the default page + # size if we have to implement limit in postprocessing. raw_page_size = min( - self._postprocessing_filter_factor - * (builder.postprocessing.offset + builder.postprocessing.limit), + self._postprocessing_filter_factor * builder.postprocessing.limit, self._raw_page_size, ) cursor = self._exit_stack.enter_context( @@ -209,6 +209,7 @@ def fetch_next_page(self, result_spec: DatasetRefResultSpec, key: PageKey) -> Da def fetch_next_page(self, result_spec: GeneralResultSpec, key: PageKey) -> GeneralResultPage: ... def fetch_next_page(self, result_spec: ResultSpec, key: PageKey) -> ResultPage: + # Docstring inherited. raw_page_iter, postprocessing = self._active_pages.pop(key) return self._process_page(raw_page_iter, result_spec, postprocessing) @@ -248,10 +249,15 @@ def upload_data_coordinates( QueryBuilder.EMPTY_COLUMNS_NAME, dtype=QueryBuilder.EMPTY_COLUMNS_TYPE, nullable=True ) ) - table = self._exit_stack.enter_context(self.db.temporary_table(table_spec)) - self.db.insert(table, *(dict(zip(dimensions.required, values)) for values in rows)) + dict_rows = [dict(zip(dimensions.required, values)) for values in rows] + from_clause: sqlalchemy.FromClause + if len(dict_rows) > self._constant_rows_limit: + from_clause = self._exit_stack.enter_context(self.db.temporary_table(table_spec)) + self.db.insert(from_clause, *dict_rows) + else: + from_clause = self.db.constant_rows(table_spec.fields, *dict_rows) key = uuid.uuid4() - self._upload_tables[key] = table + self._upload_tables[key] = from_clause return key def count( @@ -310,7 +316,7 @@ def any(self, tree: qt.QueryTree, *, execute: bool, exact: bool) -> bool: def explain_no_results(self, tree: qt.QueryTree, execute: bool) -> Iterable[str]: # Docstring inherited. 
- plan, _ = self.build_query(tree, qt.ColumnSet(tree.dimensions)) + plan, _ = self.analyze_query(tree, qt.ColumnSet(tree.dimensions)) if plan.joins.messages or not execute: return plan.joins.messages # TODO: guess at ways to split up query that might fail or succeed if @@ -334,6 +340,36 @@ def build_query( order_by: Iterable[qt.OrderExpression] = (), find_first_dataset: str | None = None, ) -> tuple[QueryPlan, QueryBuilder]: + """Convert a query description into a mostly-completed `QueryBuilder`. + + Parameters + ---------- + tree : `.queries.tree.QueryTree` + Description of the joins and row filters in the query. + final_columns : `.queries.tree.ColumnSet` + Final output columns that should be emitted by the SQL query. + order_by : `~collections.abc.Iterable` [ \ + `.queries.tree.OrderExpression` ], optional + Column expressions to sort by. + find_first_dataset : `str` or `None`, optional + Name of a dataset type for which only one result row for each data + ID should be returned, with the collections searched in order. + + Returns + ------- + plan : `QueryPlan` + Plan used to transform the query into SQL, including some + information (e.g. diagnostics about doomed-to-fail dataset + searches) that isn't transferred into the builder itself. + builder : `QueryBuilder` + Builder object that can be used to create a SQL SELECT via its + `~QueryBuilder.select` method. We return this instead of a + `sqlalchemy.Select` object itself to allow different methods to + customize the SELECT clause (e.g. `count` can replace the + columns selected with ``COUNT(*)``). + """ + # See the QueryPlan docs for an overview of what these stages of query + # construction do. plan, builder = self.analyze_query(tree, final_columns, order_by, find_first_dataset) self.apply_query_joins(plan.joins, builder.joiner) self.apply_query_projection(plan.projection, builder) @@ -348,19 +384,54 @@ def analyze_query( order_by: Iterable[qt.OrderExpression] = (), find_first_dataset: str | None = None, ) -> tuple[QueryPlan, QueryBuilder]: + """Construct a plan for building a query and initialize a builder. + + Parameters + ---------- + tree : `.queries.tree.QueryTree` + Description of the joins and row filters in the query. + final_columns : `.queries.tree.ColumnSet` + Final output columns that should be emitted by the SQL query. + order_by : `~collections.abc.Iterable` [ \ + `.queries.tree.OrderExpression` ], optional + Column expressions to sort by. + find_first_dataset : `str` or `None`, optional + Name of a dataset type for which only one result row for each data + ID should be returned, with the collections searched in order. + + Returns + ------- + plan : `QueryPlan` + Plan used to transform the query into SQL, including some + information (e.g. diagnostics about doomed-to-fail dataset + searches) that isn't transferred into the builder itself. + builder : `QueryBuilder` + Builder object initialized with overlap joins and constraints + potentially included, with the remainder still present in + `QueryJoinsPlan.predicate`. + """ + # The fact that this method returns both a QueryPlan and an initial + # QueryBuilder (rather than just a QueryPlan) is a tradeoff that lets + # DimensionRecordStorageManager.process_query_overlaps (which is called + # by the `_analyze_query_tree` call below) pull out overlap expressions + # from the predicate at the same time it turns them into SQL table + # joins (in the builder). 
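+ # A purely illustrative sketch (not part of this patch) of the full
+ # pipeline this method begins, in the order `build_query` chains it;
+ # `plan.find_first` is an assumed attribute name for the find-first
+ # component of the plan:
+ #
+ #     plan, builder = self.analyze_query(tree, final_columns, order_by, find_first_dataset)
+ #     self.apply_query_joins(plan.joins, builder.joiner)
+ #     self.apply_query_projection(plan.projection, builder)
+ #     builder = self.apply_query_find_first(plan.find_first, builder)
+ #     sql_select = builder.select()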
joins_plan, builder = self._analyze_query_tree(tree) # The "projection" columns differ from the final columns by not - # omitting any dimension keys (since that makes it easier to reason - # about), including any columns needed by order_by terms, and including - # the dataset rank if there's a find-first search in play. + # omitting any dimension keys (this keeps queries for different result + # types more similar during construction), including any columns needed + # only by order_by terms, and including the collection key if we need + # it for GROUP BY or DISTINCT. projection_plan = QueryProjectionPlan( final_columns.copy(), joins_plan.datasets, find_first_dataset=find_first_dataset ) projection_plan.columns.restore_dimension_keys() for term in order_by: term.gather_required_columns(projection_plan.columns) - + # The projection gets interesting if it does not have all of the + # dimensions of the "joins" stage, because that means it needs to do + # a GROUP BY or DISTINCT ON to get unique rows. if projection_plan.columns.dimensions != joins_plan.columns.dimensions: assert projection_plan.columns.dimensions.issubset(joins_plan.columns.dimensions) # We're going from a larger set of dimensions to a smaller set, @@ -369,7 +440,7 @@ def analyze_query( # If there are any dataset fields being propagated through that # projection and there is more than one collection, we need to # include the collection_key column so we can use that as one of - # the DISTINCT ON or GROUP BY columns. + # the DISTINCT or GROUP BY columns. for dataset_type, fields_for_dataset in projection_plan.columns.dataset_fields.items(): if len(joins_plan.datasets[dataset_type].collection_records) > 1: fields_for_dataset.add("collection_key") @@ -388,11 +459,12 @@ def analyze_query( # If we're doing a find-first search and there's a calibration # collection in play, we need to make sure the rows coming out of # the base query have only one timespan for each data ID + - # collection, and we can only do that with a GROUP BY and COUNT. + # collection, and we can only do that with a GROUP BY and COUNT + # that we inspect in postprocessing. if find_first_plan.search.is_calibration_search: builder.postprocessing.check_validity_match_count = True - # The base query also needs to include all columns needed by the + # The joins-stage query also needs to include all columns needed by the # downstream projection query. joins_plan.columns.update(projection_plan.columns) @@ -405,6 +477,18 @@ def analyze_query( return plan, builder def apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None: + """Modify a `QueryJoiner` to include all tables and other FROM and + WHERE clause terms needed. + + Parameters + ---------- + plan : `QueryJoinPlan` + Component of a `QueryPlan` relevant for the "joins" stage. + joiner : `QueryJoiner` + Component of a `QueryBuilder` that holds the FROM and WHERE + clauses. This is expected to be initialized by `analyze_query` + and will be modified in-place on return. + """ # Process data coordinate upload joins. for upload_key, upload_dimensions in plan.data_coordinate_uploads.items(): joiner.join( @@ -416,12 +500,11 @@ def apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None: for materialization_key, materialization_spec in plan.materializations.items(): self._join_materialization(joiner, materialization_key, materialization_spec) # Process dataset joins. 
- for dataset_type, dataset_search in plan.datasets.items(): + for dataset_search in plan.datasets.values(): self._join_dataset_search( joiner, - dataset_type, dataset_search, - plan.columns, + plan.columns.dataset_fields[dataset_search.name], ) # Join in dimension element tables that we know we need relationships # or columns from. @@ -448,6 +531,19 @@ def apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None: joiner.where(plan.predicate.visit(SqlColumnVisitor(joiner, self))) def apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilder) -> None: + """Modify `QueryBuilder` to reflect the "projection" stage of query + construction, which can involve a GROUP BY or DISTINCT [ON] clause + that enforces uniqueness. + + Parameters + ---------- + plan : `QueryProjectionPlan` + Component of a `QueryPlan` relevant for the "projection" stage. + builder : `QueryBuilder` + Builder object that will be modified in place. Expected to be + initialized by `analyze_query` and further modified by + `apply_query_joins`. + """ builder.columns = plan.columns if not plan.needs_dimension_distinct and not builder.postprocessing.check_validity_match_count: # Rows are already unique; nothing else to do in this method. @@ -563,6 +659,25 @@ def apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilde builder.group_by = unique_keys def apply_query_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuilder) -> QueryBuilder: + """Modify an under-construction SQL query to return only one row for + each data ID, searching collections in order. + + Parameters + ---------- + plan : `QueryFindFirstPlan` or `None` + Component of a `QueryPlan` relevant for the "find first" stage. + builder : `QueryBuilder` + Builder object as produced by `apply_query_projection`. This + object should be considered to be consumed by this method - the + same instance may or may not be returned, and if it is not + returned, its state is not defined. + + Returns + ------- + builder : `QueryBuilder` + Modified query builder that includes the find-first resolution, if + one was needed. + """ if not plan: return builder # The query we're building looks like this: @@ -584,10 +699,16 @@ def apply_query_find_first(self, plan: QueryFindFirstPlan | None, builder: Query # WHERE # {dst}_window.rownum = 1; # - # The outermost SELECT will be represented by the SqlProjection we - # return. - # The SqlProjection we're given corresponds to the Common Table + # The outermost SELECT will be represented by the QueryBuilder we + # return. The QueryBuilder we're given corresponds to the Common Table # Expression (CTE) at the top. + # + # For SQLite only, we could use a much simpler GROUP BY instead, + # because it extends the standard to do exactly what we want when MIN + # or MAX appears once and a column does not have an aggregate function + # (https://www.sqlite.org/quirks.html). But since that doesn't work + # with PostgreSQL it doesn't help us. + # builder = builder.nested(cte=True, force=True) # We start by filling out the "window" SELECT statement... 
partition_by = [builder.joiner.dimension_keys[d][0] for d in builder.columns.dimensions.required] @@ -612,9 +733,28 @@ def apply_query_find_first(self, plan: QueryFindFirstPlan | None, builder: Query return builder def _analyze_query_tree(self, tree: qt.QueryTree) -> tuple[QueryJoinsPlan, QueryBuilder]: - # Delegate to the dimensions manager to rewrite the predicate and - # start a QueryBuilder to cover any spatial overlap - # joins or constraints. We'll return that SqlBuilder at the end. + """Start constructing a plan for building a query from a + `.queries.tree.QueryTree`. + + Parameters + ---------- + tree : `.queries.tree.QueryTree` + Description of the joins and row filters in the query. + + Returns + ------- + plan : `QueryJoinsPlan` + Initial component of the plan relevant for the "joins" stage, + including all joins and columns needed by ``tree``. Additional + columns will be added to this plan later. + builder : `QueryBuilder` + Builder object initialized with overlap joins and constraints + potentially included, with the remainder still present in + `QueryJoinPlans.predicate`. + """ + # Delegate to the dimensions manager to rewrite the predicate and start + # a QueryBuilder to cover any spatial overlap joins or constraints. + # We'll return that QueryBuilder at the end. ( predicate, builder, @@ -670,6 +810,25 @@ def _resolve_dataset_search( dataset_search: qt.DatasetSearch, constraint_data_id: Mapping[str, DataIdValue], ) -> ResolvedDatasetSearch: + """Resolve the collections that should actually be searched for + datasets of a particular type. + + Parameters + ---------- + dataset_type_name : `str` + Name of the dataset being searched for. + dataset_search : `.queries.tree.DatasetSearch` + Struct holding the dimensions and original collection search path. + constraint_data_id : `~collections.abc.Mapping` + Data ID mapping derived from the query predicate that may be used + to filter out some collections based on their governor dimensions. + + Returns + ------- + resolved : `ResolvedDatasetSearch` + Struct that extends `dataset_search`` with the dataset type name + and resolved collection records. + """ result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions) for collection_record, collection_summary in self._resolve_collection_path( dataset_search.collections @@ -704,9 +863,29 @@ def _resolve_dataset_search( def _resolve_collection_path( self, collections: Iterable[str] ) -> list[tuple[CollectionRecord, CollectionSummary]]: + """Expand an ordered iterable of collection names into a list of + collection records and summaries. + + Parameters + ---------- + collections : `~collections.abc.Iterable` [ `str` ] + Ordered iterable of collections. + + Returns + ------- + resolved : `list` [ `tuple` [ `.registry.interfaces.CollectionRecord`,\ + `.registry.CollectionSummary` ] ] + Tuples of collection record and summary. `~CollectionType.CHAINED` + collections are flattened out and not included. + """ result: list[tuple[CollectionRecord, CollectionSummary]] = [] done: set[str] = set() + # Eventually we really want this recursive Python code to be replaced + # by a recursive SQL query, especially if we extend this method to + # support collection glob patterns to support public APIs we don't yet + # have in the new query system (but will need to add). 
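+ # Worked example (illustrative only; the collection names are
+ # hypothetical): if "defaults" is a CHAINED collection whose children
+ # are ["runA", "calib"], then
+ #
+ #     self._resolve_collection_path(["defaults", "runA"])
+ #
+ # returns the (record, summary) pairs for "runA" and then "calib", with
+ # the second "runA" skipped via `done` and the CHAINED collection itself
+ # left out.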
+ def recurse(collection_names: Iterable[str]) -> None: for collection_name in collection_names: if collection_name not in done: @@ -725,32 +904,54 @@ def recurse(collection_names: Iterable[str]) -> None: def _join_materialization( self, joiner: QueryJoiner, - materialization_key: qt.MaterializationKey, + key: qt.MaterializationKey, dimensions: DimensionGroup, ) -> None: + """Join a materialization into an under-construction query. + + Parameters + ---------- + joiner : `QueryJoiner` + Component of a `QueryBuilder` that holds the FROM and WHERE + clauses. This will be modified in-place on return. + key : `.queries.tree.MaterializationKey` + Unique identifier created for this materialization when it was + created. + dimensions : `DimensionGroup` + Dimensions of the materialization. + """ columns = qt.ColumnSet(dimensions) - table, postprocessing = self._materializations[materialization_key] + table, postprocessing = self._materializations[key] joiner.join(QueryJoiner(self.db, table).extract_columns(columns, postprocessing)) def _join_dataset_search( self, joiner: QueryJoiner, - dataset_type: str, - processed_dataset_search: ResolvedDatasetSearch, - columns: qt.ColumnSet, + resolved_search: ResolvedDatasetSearch, + fields: Set[str], ) -> None: - storage = self.managers.datasets[dataset_type] + """Join a dataset search into an under-construction query. + + Parameters + ---------- + joiner : `QueryJoiner` + Component of a `QueryBuilder` that holds the FROM and WHERE + clauses. This will be modified in-place on return. + resolved_search : `ResolvedDatasetSearch` + Struct that describes the dataset type and collections. + fields : `~collections.abc.Set` [ `str` ] + Dataset fields to include. + """ + storage = self.managers.datasets[resolved_search.name] # The next two asserts will need to be dropped (and the implications # dealt with instead) if materializations start having dataset fields. - assert dataset_type not in joiner.fields, "Dataset fields have unexpectedly already been joined in." assert ( - dataset_type not in joiner.timespans + resolved_search.name not in joiner.fields + ), "Dataset fields have unexpectedly already been joined in." + assert ( + resolved_search.name not in joiner.timespans ), "Dataset timespan has unexpectedly already been joined in." - joiner.join( - storage.make_query_joiner( - processed_dataset_search.collection_records, columns.dataset_fields[dataset_type] - ) - ) + joiner.join(storage.make_query_joiner(resolved_search.collection_records, fields)) def _process_page( self, @@ -758,10 +959,26 @@ def _process_page( result_spec: ResultSpec, postprocessing: Postprocessing, ) -> ResultPage: - try: - raw_page = next(raw_page_iter) - except StopIteration: - raw_page = tuple() + """Process a query result iterator into a `ResultPage` object. + + Parameters + ---------- + raw_page_iterator : `~collections.abc.Iterator` [ \ + `~collections.abc.Sequence` [ `sqlalchemy.Row` ] ] + Iterator over sequences of SQLAlchemy result-row objects, with each + sequence corresponding to a page. + result_spec : `.queries.ResultSpec` + Description of the result type. + postprocessing : `Postprocessing` + Object that filters and/or verifies result rows to perform + operations we sometimes can't perform in SQL. + + Returns + ------- + result_page : `ResultPage` + Page of results of the type described by ``result_spec``. 
+ """ + raw_page = next(raw_page_iter, tuple()) if len(raw_page) == self._raw_page_size: # There's some chance we got unlucky and this page exactly finishes # off the query, and we won't know the next page does not exist @@ -771,10 +988,11 @@ def _process_page( self._active_pages[next_key] = (raw_page_iter, postprocessing) else: next_key = None + postprocessed_rows = postprocessing.apply(raw_page) match result_spec: case DimensionRecordResultSpec(): return convert_dimension_record_results( - postprocessing.apply(raw_page), + postprocessed_rows, result_spec, next_key, self._name_shrinker, diff --git a/python/lsst/daf/butler/direct_query_driver/_postprocessing.py b/python/lsst/daf/butler/direct_query_driver/_postprocessing.py index 2dc4ead742..8001d06732 100644 --- a/python/lsst/daf/butler/direct_query_driver/_postprocessing.py +++ b/python/lsst/daf/butler/direct_query_driver/_postprocessing.py @@ -27,7 +27,7 @@ from __future__ import annotations -__all__ = ("Postprocessing", "ValidityRangeMatchError") +__all__ = ("Postprocessing",) from collections.abc import Iterable, Iterator from typing import TYPE_CHECKING, ClassVar @@ -35,40 +35,64 @@ import sqlalchemy from lsst.sphgeom import DISJOINT, Region +from ..queries import ValidityRangeMatchError from ..queries import tree as qt if TYPE_CHECKING: from ..dimensions import DimensionElement -class ValidityRangeMatchError(RuntimeError): - pass +class Postprocessing: + """A helper object that filters and checks SQL-query result rows to perform + operations we can't [fully] perform in the SQL query. + Notes + ----- + Postprocessing objects are initialized with no parameters to do nothing + when applied; they are modified as needed in place as the query is built. + + Postprocessing objects evaluate to `True` in a boolean context only when + they might perform actual row filtering. They may still perform checks + when they evaluate to `False`. + """ -class Postprocessing: def __init__(self) -> None: - self.spatial_join_filtering: list[tuple[DimensionElement, DimensionElement]] = [] - self.spatial_where_filtering: list[tuple[DimensionElement, Region]] = [] + self.spatial_join_filtering = [] + self.spatial_where_filtering = [] self.check_validity_match_count: bool = False - self._offset: int = 0 self._limit: int | None = None VALIDITY_MATCH_COUNT: ClassVar[str] = "_VALIDITY_MATCH_COUNT" + """The field name used for the special result column that holds the number + of matching find-first calibration datasets for each data ID. - @property - def offset(self) -> int: - return self._offset + When present, the value of this column must be one for all rows. + """ - @offset.setter - def offset(self, value: int) -> None: - if value and not self: - raise RuntimeError( - "Postprocessing should only implement 'offset' if it needs to do spatial filtering." - ) - self._offset = value + spatial_join_filtering: list[tuple[DimensionElement, DimensionElement]] + """Pairs of dimension elements whose regions must overlap; rows with + any non-overlap pair will be filtered out. + """ + + spatial_where_filtering: list[tuple[DimensionElement, Region]] + """Dimension elements and regions that must overlap; rows with any + non-overlap pair will be filtered out. 
+ """ + + check_validity_match_count: bool + """If `True`, result rows will include a special column that counts the + number of matching datasets in each collection for each data ID, and + postprocessing should check that the value of this column is one for + every row (and raise `.queries.ValidityRangeMatchError` if it is not). + """ @property def limit(self) -> int | None: + """The maximum number of rows to return, or `None` for no limit. + + This is only set when other postprocess filtering makes it impossible + to apply directly in SQL. + """ return self._limit @limit.setter @@ -83,11 +107,27 @@ def __bool__(self) -> bool: return bool(self.spatial_join_filtering or self.spatial_where_filtering) def gather_columns_required(self, columns: qt.ColumnSet) -> None: + """Add all columns required to perform postprocessing to the given + column set. + + Parameters + ---------- + columns : `.queries.tree.ColumnSet` + Column set to modify in place. + """ for element in self.iter_region_dimension_elements(): columns.update_dimensions(element.minimal_group) columns.dimension_fields[element.name].add("region") def iter_region_dimension_elements(self) -> Iterator[DimensionElement]: + """Iterate over the dimension elements whose regions are needed for + postprocessing. + + Returns + ------- + elements : `~collections.abc.Iterator` [ `DimensionElement` ] + Iterator over dimension element objects. + """ for a, b in self.spatial_join_filtering: yield a yield b @@ -95,6 +135,21 @@ def iter_region_dimension_elements(self) -> Iterator[DimensionElement]: yield element def iter_missing(self, columns: qt.ColumnSet) -> Iterator[DimensionElement]: + """Iterate over the columns needed for postprocessing that are not in + the given `.queries.tree.ColumnSet`. + + Parameters + ---------- + columns : `.queries.tree.ColumnSet` + Columns that should not be returned by this method. These are + typically the columns included in a query even in the absence of + postprocessing. + + Returns + ------- + elements : `~collections.abc.Iterator` [ `DimensionElement` ] + Iterator over dimension element objects. + """ done: set[DimensionElement] = set() for element in self.iter_region_dimension_elements(): if element not in done: @@ -103,8 +158,30 @@ def iter_missing(self, columns: qt.ColumnSet) -> Iterator[DimensionElement]: done.add(element) def apply(self, rows: Iterable[sqlalchemy.Row]) -> Iterable[sqlalchemy.Row]: - if not self: + """Apply the postprocessing to an iterable of SQL result rows. + + Parameters + ---------- + rows : `~collections.abc.Iterable` [ `sqlalchemy.Row` ] + Rows to process. + + Returns + ------- + processed : `~collections.abc.Iterable` [ `sqlalchemy.Row` ] + Rows that pass the postprocessing filters and checks. + + Notes + ----- + This method decreases `limit` in place if it is not `None`, such that + the same `Postprocessing` instance can be applied to each page in a + sequence of result pages. This means a single `Postprocessing` object + can only be used for a single SQL query, and should be discarded when + iteration over the results of that query is complete. 
+ """ + if not (self or self.check_validity_match_count): yield from rows + if self._limit == 0: + return joins = [ ( qt.ColumnSet.get_qualified_name(a.name, "region"), @@ -116,6 +193,7 @@ def apply(self, rows: Iterable[sqlalchemy.Row]) -> Iterable[sqlalchemy.Row]: (qt.ColumnSet.get_qualified_name(element.name, "region"), region) for element, region in self.spatial_where_filtering ] + for row in rows: m = row._mapping if any(m[a].relate(m[b]) & DISJOINT for a, b in joins) or any( @@ -128,11 +206,8 @@ def apply(self, rows: Iterable[sqlalchemy.Row]) -> Iterable[sqlalchemy.Row]: "'where' needs to be added, but it could also mean that multiple validity ranges " "overlap a single output data ID." ) - if self._offset: - self._offset -= 1 - continue - if self._limit == 0: - break - yield row if self._limit is not None: self._limit -= 1 + if self._limit == 0: + return + yield row diff --git a/python/lsst/daf/butler/direct_query_driver/_query_builder.py b/python/lsst/daf/butler/direct_query_driver/_query_builder.py index 0fed41564b..8378bedfab 100644 --- a/python/lsst/daf/butler/direct_query_driver/_query_builder.py +++ b/python/lsst/daf/butler/direct_query_driver/_query_builder.py @@ -48,132 +48,60 @@ @dataclasses.dataclass -class QueryJoiner: - db: Database - from_clause: sqlalchemy.FromClause | None = None - where_terms: list[sqlalchemy.ColumnElement[bool]] = dataclasses.field(default_factory=list) - - dimension_keys: NonemptyMapping[str, list[sqlalchemy.ColumnElement]] = dataclasses.field( - default_factory=lambda: NonemptyMapping(list) - ) - - fields: NonemptyMapping[str, dict[str, sqlalchemy.ColumnElement[Any]]] = dataclasses.field( - default_factory=lambda: NonemptyMapping(dict) - ) - - timespans: dict[str, TimespanDatabaseRepresentation] = dataclasses.field(default_factory=dict) - - special: dict[str, sqlalchemy.ColumnElement[Any]] = dataclasses.field(default_factory=dict) - - name_shrinker: NameShrinker | None = None - - @property - def sql_columns(self) -> sqlalchemy.ColumnCollection: - assert self.from_clause is not None - return self.from_clause.columns +class QueryBuilder: + """A struct used to represent an under-construction SQL SELECT query. - def extract_dimensions(self, dimensions: Iterable[str], **kwargs: str) -> QueryJoiner: - assert self.from_clause is not None, "Cannot extract columns with no FROM clause." - for dimension_name in dimensions: - self.dimension_keys[dimension_name].append(self.from_clause.columns[dimension_name]) - for k, v in kwargs.items(): - self.dimension_keys[v].append(self.from_clause.columns[k]) - return self + This object's methods frequently "consume" ``self``, by either returning + it after modification or returning related copy that may share state with + the original. Users should be careful never to use consumed instances, and + are recommended to reuse the same variable name to make that hard to do + accidentally. + """ - def extract_columns( - self, - columns: qt.ColumnSet, - postprocessing: Postprocessing | None = None, - special: Iterable[str] = (), - ) -> QueryJoiner: - assert self.from_clause is not None, "Cannot extract columns with no FROM clause." 
- if self.name_shrinker is None: - self.name_shrinker = self._make_name_shrinker() - for logical_table, field in columns: - name = columns.get_qualified_name(logical_table, field) - if field is None: - self.dimension_keys[logical_table].append(self.from_clause.columns[name]) - else: - name = self.name_shrinker.shrink(name) - if columns.is_timespan(logical_table, field): - self.timespans[logical_table] = self.db.getTimespanRepresentation().from_columns( - self.from_clause.columns, name - ) - else: - self.fields[logical_table][field] = self.from_clause.columns[name] - if postprocessing is not None: - for element in postprocessing.iter_missing(columns): - self.fields[element.name]["region"] = self.from_clause.columns[ - self.name_shrinker.shrink(columns.get_qualified_name(element.name, "region")) - ] - if postprocessing.check_validity_match_count: - self.special[postprocessing.VALIDITY_MATCH_COUNT] = self.from_clause.columns[ - postprocessing.VALIDITY_MATCH_COUNT - ] - for name in special: - self.special[name] = self.from_clause.columns[name] - return self + joiner: QueryJoiner + """Struct representing the SQL FROM and WHERE clauses, as well as the + columns *available* to the query (but not necessarily in the SELECT + clause). + """ - def join(self, other: QueryJoiner) -> QueryJoiner: - join_on: list[sqlalchemy.ColumnElement] = [] - for dimension_name in self.dimension_keys.keys() & other.dimension_keys.keys(): - for column1, column2 in itertools.product( - self.dimension_keys[dimension_name], other.dimension_keys[dimension_name] - ): - join_on.append(column1 == column2) - self.dimension_keys[dimension_name].extend(other.dimension_keys[dimension_name]) - if self.from_clause is None: - self.from_clause = other.from_clause - elif other.from_clause is not None: - self.from_clause = self.from_clause.join(other.from_clause, onclause=sqlalchemy.and_(*join_on)) - self.where_terms += other.where_terms - self.special.update(other.special) - if other.name_shrinker: - if self.name_shrinker is not None: - self.name_shrinker.update(other.name_shrinker) - else: - self.name_shrinker = other.name_shrinker - return self + columns: qt.ColumnSet + """Columns to include the SELECT clause. - def where(self, *arg: sqlalchemy.ColumnElement[bool]) -> QueryJoiner: - self.where_terms.extend(arg) - return self + This does not include columns required only by `postprocessing` and columns + in `QueryJoiner.special`, which are also always included in the SELECT + clause. + """ - def to_builder( - self, - columns: qt.ColumnSet, - postprocessing: Postprocessing | None = None, - distinct: bool | Sequence[sqlalchemy.ColumnElement[Any]] = (), - group_by: Sequence[sqlalchemy.ColumnElement[Any]] = (), - ) -> QueryBuilder: - return QueryBuilder( - self, - columns, - postprocessing=postprocessing if postprocessing is not None else Postprocessing(), - distinct=distinct, - group_by=group_by, - ) + postprocessing: Postprocessing = dataclasses.field(default_factory=Postprocessing) + """Postprocessing that will be needed in Python after the SQL query has + been executed. + """ - def _make_name_shrinker(self) -> NameShrinker: - return NameShrinker(self.db.dialect.max_identifier_length, 6) + distinct: bool | Sequence[sqlalchemy.ColumnElement[Any]] = () + """A representation of a DISTINCT or DISTINCT ON clause. + If `True`, this represents a SELECT DISTINCT. If a non-empty sequence, + this represents a SELECT DISTINCT ON. If `False` or an empty sequence, + there is no DISTINCT clause. 
+ """ -@dataclasses.dataclass -class QueryBuilder: - joiner: QueryJoiner - columns: qt.ColumnSet - postprocessing: Postprocessing = dataclasses.field(default_factory=Postprocessing) - distinct: bool | Sequence[sqlalchemy.ColumnElement[Any]] = () group_by: Sequence[sqlalchemy.ColumnElement[Any]] = () + """A representation of a GROUP BY clause. + + If not-empty, a GROUP BY clause with these columns is added. This + generally requires that every `sqlalchemy.ColumnElement` held in the nested + `joiner` that is part of `columns` must either be part of `group_by` or + hold an aggregate function. + """ EMPTY_COLUMNS_NAME: ClassVar[str] = "IGNORED" - """Name of the column added to a SQL ``SELECT`` query in order to represent - relations that have no real columns. + """Name of the column added to a SQL SELECT clause in order to construct + queries that have no real columns. """ EMPTY_COLUMNS_TYPE: ClassVar[type] = sqlalchemy.Boolean - """Type of the column added to a SQL ``SELECT`` query in order to represent - relations that have no real columns. + """Type of the column added to a SQL SELECT clause in order to construct + queries that have no real columns. """ def __post_init__(self) -> None: @@ -202,7 +130,15 @@ def handle_empty_columns( columns.append(sqlalchemy.sql.literal(True).label(cls.EMPTY_COLUMNS_NAME)) return columns - def select(self) -> sqlalchemy.sql.Select: + def select(self) -> sqlalchemy.Select: + """Transfrom this builder into a SQLAlchemy representation of a SELECT + query. + + Returns + ------- + select : `sqlalchemy.Select` + SQLAlchemy SELECT statement. + """ if self.joiner.name_shrinker is None: self.joiner.name_shrinker = self.joiner._make_name_shrinker() sql_columns: list[sqlalchemy.ColumnElement[Any]] = [] @@ -245,10 +181,46 @@ def select(self) -> sqlalchemy.sql.Select: return result def join(self, other: QueryJoiner) -> QueryBuilder: + """Join tables, subqueries, and WHERE clauses from another query into + this one, in place. + + Parameters + ---------- + other : `QueryJoiner` + Object holding the FROM and WHERE clauses to add to this one. + JOIN ON clauses are generated via the dimension keys in common. + + Returns + ------- + self : `QueryBuilder` + This `QueryBuilder` instance (never a copy); returned to enable + method-chaining. + """ self.joiner.join(other) return self def to_joiner(self, cte: bool = False, force: bool = False) -> QueryJoiner: + """Convert this builder into a `QueryJoiner`, nesting it in a subquery + or common table expression only if needed to apply DISTINCT or GROUP BY + clauses. + + This method consumes ``self``. + + Parameters + ---------- + cte : `bool`, optional + If `True`, nest via a common table expression instead of a + subquery. + force : `bool`, optional + If `True`, nest via a subquery or common table expression even if + there is no DISTINCT or GROUP BY. + + Returns + ------- + joiner : `QueryJoiner` + QueryJoiner` with at least all columns in `columns` available. + This may or may not be the `joiner` attribute of this object. 
+ """ if force or self.distinct or self.group_by: sql_from_clause = self.select().cte() if cte else self.select().subquery() return QueryJoiner( @@ -257,6 +229,28 @@ def to_joiner(self, cte: bool = False, force: bool = False) -> QueryJoiner: return self.joiner def nested(self, cte: bool = False, force: bool = False) -> QueryBuilder: + """Convert this builder into a `QueryBuiler` that is guaranteed to have + no DISTINCT or GROUP BY, nesting it in a subquery or common table + expression only if needed to apply any current DISTINCT or GROUP BY + clauses. + + This method consumes ``self``. + + Parameters + ---------- + cte : `bool`, optional + If `True`, nest via a common table expression instead of a + subquery. + force : `bool`, optional + If `True`, nest via a subquery or common table expression even if + there is no DISTINCT or GROUP BY. + + Returns + ------- + builder : `QueryBuilder` + `QueryBuilder` with at least all columns in `columns` available. + This may or may not be the `builder` attribute of this object. + """ return QueryBuilder( self.to_joiner(cte=cte, force=force), columns=self.columns, postprocessing=self.postprocessing ) @@ -265,6 +259,20 @@ def union_subquery( self, others: Iterable[QueryBuilder], ) -> QueryJoiner: + """Combine this builder with others to make a SELECT UNION subquery. + + Parameters + ---------- + others : `~collections.abc.Iterable` [ `QueryBuilder` ] + Other query builders to union with. Their `columns` attributes + must be the same as those of ``self``. + + Returns + ------- + joiner : `QueryJoiner` + `QueryJoiner` with at least all columns in `columns` available. + This may or may not be the `joiner` attribute of this object. + """ select0 = self.select() other_selects = [other.select() for other in others] return QueryJoiner( @@ -274,6 +282,15 @@ def union_subquery( ).extract_columns(self.columns, self.postprocessing) def make_table_spec(self) -> ddl.TableSpec: + """Make a specification that can be used to create a table to store + this query's outputs. + + Returns + ------- + spec : `.ddl.TableSpec` + Table specification for this query's result columns (including + those from `postprocessing` and `QueryJoiner.special`). + """ assert not self.joiner.special, "special columns not supported in make_table_spec" if self.joiner.name_shrinker is None: self.joiner.name_shrinker = self.joiner._make_name_shrinker() @@ -295,3 +312,262 @@ def make_table_spec(self) -> ddl.TableSpec: ) ) return results + + +@dataclasses.dataclass +class QueryJoiner: + """A struct used to represent the FROM and WHERE clauses of an + under-construction SQL SELECT query. + + This object's methods frequently "consume" ``self``, by either returning + it after modification or returning related copy that may share state with + the original. Users should be careful never to use consumed instances, and + are recommended to reuse the same variable name to make that hard to do + accidentally. + """ + + db: Database + """Object that abstracts over the database engine.""" + + from_clause: sqlalchemy.FromClause | None = None + """SQLAlchemy representation of the FROM clause. + + This is initialized to `None` but in almost all cases is immediately + replaced. 
+ """ + + where_terms: list[sqlalchemy.ColumnElement[bool]] = dataclasses.field(default_factory=list) + """Sequence of WHERE clause terms to be combined with AND.""" + + dimension_keys: NonemptyMapping[str, list[sqlalchemy.ColumnElement]] = dataclasses.field( + default_factory=lambda: NonemptyMapping(list) + ) + """Mapping of dimension keys included in the FROM clause. + + Nested lists correspond to different tables that have the same dimension + key (which should all have equal values for all result rows). + """ + + fields: NonemptyMapping[str, dict[str, sqlalchemy.ColumnElement[Any]]] = dataclasses.field( + default_factory=lambda: NonemptyMapping(dict) + ) + """Mapping of columns that are neither dimension keys nor timespans. + + Inner and outer keys correspond to the "logical table" and "field" pairs + that result from iterating over `~.queries.tree.ColumnSet`, with the former + either a dimension element name or dataset type name. + """ + + timespans: dict[str, TimespanDatabaseRepresentation] = dataclasses.field(default_factory=dict) + """Mapping of timespan columns. + + Keys are "logical tables" - dimension element names or dataset type names. + """ + + special: dict[str, sqlalchemy.ColumnElement[Any]] = dataclasses.field(default_factory=dict) + """Special columns that are available from the FROM clause and + automatically included in the SELECT clause when this joiner is nested + within a `QueryBuilder`. + + These columns are not part of the dimension universe and are not associated + with a dataset. They are never returned to users, even if they may be + included in raw SQL results. + """ + + name_shrinker: NameShrinker | None = None + """An object that can be used to shrink field names to fit within the + identifier limit of the database engine. + + This is important for PostgreSQL (which has a 64-character limit) and + dataset fields, since dataset type names are used to qualify those and they + can be quite long. `DimensionUniverse` guarantees at construction that + dimension names and fully-qualified dimension fields do not exceed this + limit. + """ + + @property + def sql_columns(self) -> sqlalchemy.ColumnCollection: + """The full set of SQL columns available in the FROM clause. + + This property may only be accessed if `from_clause` is not `None`. + """ + assert self.from_clause is not None + return self.from_clause.columns + + def extract_dimensions(self, dimensions: Iterable[str], **kwargs: str) -> QueryJoiner: + """Add dimension key columns from `from_clause` into `dimension_keys`. + + Parameters + ---------- + dimensions : `~collections.abc.Iterable` [ `str` ] + Names of dimensions to include, assuming that their names in + `sql_columns` are just the dimension names. + **kwargs : `str` + Additional dimensions to include, with the names in `sql_columns` + as keys and the actual dimension names as values. + + Returns + ------- + self : `QueryJoiner` + This `QueryJoiner` instance (never a copy). Provided to enable + method chaining. + """ + assert self.from_clause is not None, "Cannot extract columns with no FROM clause." + for dimension_name in dimensions: + self.dimension_keys[dimension_name].append(self.from_clause.columns[dimension_name]) + for k, v in kwargs.items(): + self.dimension_keys[v].append(self.from_clause.columns[k]) + return self + + def extract_columns( + self, + columns: qt.ColumnSet, + postprocessing: Postprocessing | None = None, + special: Iterable[str] = (), + ) -> QueryJoiner: + """Add columns from `from_clause` into `dimension_keys`. 
+ + Parameters + ---------- + columns : `.queries.tree.ColumnSet` + Columns to include, assuming that + `.queries.tree.ColumnSet.get_qualified_name` corresponds to the + name used in `sql_columns` (after name shrinking). + postprocessing : `Postprocessing`, optional + Postprocessing object whose needed columns should also be included. + special : `~collections.abc.Iterable` [ `str` ], optional + Additional special columns to extract. + + Returns + ------- + self : `QueryJoiner` + This `QueryJoiner` instance (never a copy). Provided to enable + method chaining. + """ + assert self.from_clause is not None, "Cannot extract columns with no FROM clause." + if self.name_shrinker is None: + self.name_shrinker = self._make_name_shrinker() + for logical_table, field in columns: + name = columns.get_qualified_name(logical_table, field) + if field is None: + self.dimension_keys[logical_table].append(self.from_clause.columns[name]) + else: + name = self.name_shrinker.shrink(name) + if columns.is_timespan(logical_table, field): + self.timespans[logical_table] = self.db.getTimespanRepresentation().from_columns( + self.from_clause.columns, name + ) + else: + self.fields[logical_table][field] = self.from_clause.columns[name] + if postprocessing is not None: + for element in postprocessing.iter_missing(columns): + self.fields[element.name]["region"] = self.from_clause.columns[ + self.name_shrinker.shrink(columns.get_qualified_name(element.name, "region")) + ] + if postprocessing.check_validity_match_count: + self.special[postprocessing.VALIDITY_MATCH_COUNT] = self.from_clause.columns[ + postprocessing.VALIDITY_MATCH_COUNT + ] + for name in special: + self.special[name] = self.from_clause.columns[name] + return self + + def join(self, other: QueryJoiner) -> QueryJoiner: + """Combine this `QueryJoiner` with another via an INNER JOIN on + dimension keys. + + This method consumes ``self``. + + Parameters + ---------- + other : `QueryJoiner` + Other joiner to combine with this one. + + Returns + ------- + joined : `QueryJoiner` + A `QueryJoiner` with all columns present in either operand, with + its `from_clause` representing a SQL INNER JOIN where the dimension + key columns common to both operands are constrained to be equal. + If either operand does not have `from_clause`, the other's is used. + The `where_terms` of the two operands are concatenated, + representing a logical AND (with no attempt at deduplication). + """ + join_on: list[sqlalchemy.ColumnElement] = [] + for dimension_name in self.dimension_keys.keys() & other.dimension_keys.keys(): + for column1, column2 in itertools.product( + self.dimension_keys[dimension_name], other.dimension_keys[dimension_name] + ): + join_on.append(column1 == column2) + self.dimension_keys[dimension_name].extend(other.dimension_keys[dimension_name]) + if self.from_clause is None: + self.from_clause = other.from_clause + elif other.from_clause is not None: + self.from_clause = self.from_clause.join(other.from_clause, onclause=sqlalchemy.and_(*join_on)) + self.where_terms += other.where_terms + self.special.update(other.special) + if other.name_shrinker: + if self.name_shrinker is not None: + self.name_shrinker.update(other.name_shrinker) + else: + self.name_shrinker = other.name_shrinker + return self + + def where(self, *args: sqlalchemy.ColumnElement[bool]) -> QueryJoiner: + """Add a WHERE clause term. + + Parameters + ---------- + *args : `sqlalchemy.ColumnElement` + SQL boolean column expressions to be combined with AND. 
+ + Returns + ------- + self : `QueryJoiner` + This `QueryJoiner` instance (never a copy). Provided to enable + method chaining. + """ + self.where_terms.extend(args) + return self + + def to_builder( + self, + columns: qt.ColumnSet, + postprocessing: Postprocessing | None = None, + distinct: bool | Sequence[sqlalchemy.ColumnElement[Any]] = (), + group_by: Sequence[sqlalchemy.ColumnElement[Any]] = (), + ) -> QueryBuilder: + """Convert this joiner into a `QueryBuilder` by providing SELECT clause + columns and optional DISTINCT or GROUP BY clauses. + + This method consumes ``self``. + + Parameters + ---------- + columns : `~.queries.tree.ColumnSet` + Regular columns to include in the SELECT clause. + postprocessing : `Postprocessing`, optional + Additional processing to be performed on result rows after executing + the SQL query. + distinct : `bool` or `~collections.abc.Sequence` [ \ + `sqlalchemy.ColumnElement` ], optional + Specification of the DISTINCT clause (see `QueryBuilder.distinct`). + group_by : `~collections.abc.Sequence` [ \ + `sqlalchemy.ColumnElement` ], optional + Specification of the GROUP BY clause (see `QueryBuilder.group_by`). + + Returns + ------- + builder : `QueryBuilder` + New query builder. + """ + return QueryBuilder( + self, + columns, + postprocessing=postprocessing if postprocessing is not None else Postprocessing(), + distinct=distinct, + group_by=group_by, + ) + + def _make_name_shrinker(self) -> NameShrinker: + return NameShrinker(self.db.dialect.max_identifier_length, 6) diff --git a/python/lsst/daf/butler/queries/_base.py b/python/lsst/daf/butler/queries/_base.py index c23baf66f8..5d81870bfc 100644 --- a/python/lsst/daf/butler/queries/_base.py +++ b/python/lsst/daf/butler/queries/_base.py @@ -27,7 +27,7 @@ from __future__ import annotations -__all__ = ("QueryBase", "QueryResultsBase") +__all__ = ("QueryBase", "QueryResultsBase", "ValidityRangeMatchError") from abc import ABC, abstractmethod from collections.abc import Iterable, Mapping, Set @@ -40,6 +40,20 @@ from .tree import OrderExpression, Predicate, QueryTree +class ValidityRangeMatchError(RuntimeError): + """Exception raised when a find-first calibration dataset query does not + fully resolve validity ranges. + + For a find-first query involving a calibration dataset to work, either the + query's result rows need to include a temporal dimension or the query needs + to be constrained temporally, such that each result row corresponds to a + unique calibration dataset. This exception can be raised if those + dimensions or constraints are missing, or if a temporal dimension timespan + overlaps multiple validity ranges (e.g. the recommended bias changes in the + middle of an exposure). + """ + + class QueryBase(ABC): """Common base class for `Query` and all `QueryResult` objects.
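Illustrative sketch (not part of the patch; it assumes `db`, `table`, and `columns` are already in hand and uses only calls that appear in this diff) of how the `QueryJoiner` and `QueryBuilder` structs documented in _query_builder.py compose:

    joiner = QueryJoiner(db, table).extract_columns(columns)  # FROM/WHERE side
    builder = joiner.to_builder(columns)  # choose the SELECT columns
    sql_select = builder.select()  # a sqlalchemy.Select ready to execute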