Start adding docs to QueryPlan.
TallJimbo committed Mar 8, 2024
1 parent 288535e commit 38e83e8
Showing 2 changed files with 285 additions and 125 deletions.
240 changes: 120 additions & 120 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
@@ -334,14 +334,14 @@ def build_query(
order_by: Iterable[qt.OrderExpression] = (),
find_first_dataset: str | None = None,
) -> tuple[QueryPlan, QueryBuilder]:
plan, builder = self._analyze_query(tree, final_columns, order_by, find_first_dataset)
self._apply_query_joins(plan.joins, builder.joiner)
self._apply_query_projection(plan.projection, builder)
builder = self._apply_find_first(plan.find_first, builder)
plan, builder = self.analyze_query(tree, final_columns, order_by, find_first_dataset)
self.apply_query_joins(plan.joins, builder.joiner)
self.apply_query_projection(plan.projection, builder)
builder = self.apply_query_find_first(plan.find_first, builder)
builder.columns = plan.final_columns
return plan, builder

def _analyze_query(
def analyze_query(
self,
tree: qt.QueryTree,
final_columns: qt.ColumnSet,
@@ -384,7 +384,7 @@ def _analyze_query(

find_first_plan = None
if find_first_dataset is not None:
find_first_plan = QueryFindFirstPlan(find_first_dataset, joins_plan.datasets[find_first_dataset])
find_first_plan = QueryFindFirstPlan(joins_plan.datasets[find_first_dataset])
# If we're doing a find-first search and there's a calibration
# collection in play, we need to make sure the rows coming out of
# the base query have only one timespan for each data ID +
@@ -404,118 +404,7 @@
)
return plan, builder

def _analyze_query_tree(self, tree: qt.QueryTree) -> tuple[QueryJoinsPlan, QueryBuilder]:
# Delegate to the dimensions manager to rewrite the predicate and
# start a QueryBuilder to cover any spatial overlap
# joins or constraints. We'll return that SqlBuilder at the end.
(
predicate,
builder,
) = self.managers.dimensions.process_query_overlaps(
tree.dimensions,
tree.predicate,
tree.get_joined_dimension_groups(),
)
result = QueryJoinsPlan(predicate=predicate, columns=builder.columns)
# We also check that the predicate doesn't reference any dimensions
# without constraining their governor dimensions, since that's a
# particularly easy mistake to make and it's almost never intentional.
# We also allow the registry data ID values to provide governor values.
where_columns = qt.ColumnSet(self.universe.empty.as_group())
result.predicate.gather_required_columns(where_columns)
for governor in where_columns.dimensions.governors:
if governor not in result.constraint_data_id:
if governor in self._defaults.dataId.dimensions:
result.constraint_data_id[governor] = self._defaults.dataId[governor]
else:
raise qt.InvalidQueryError(
f"Query 'where' expression references a dimension dependent on {governor} without "
"constraining it directly."
)
# Add materializations, which can also bring in more postprocessing.
for m_key, m_dimensions in tree.materializations.items():
_, m_postprocessing = self._materializations[m_key]
result.materializations[m_key] = m_dimensions
# When a query is materialized, the new tree has an empty
# (trivially true) predicate because the original was used to make
# the materialized rows. But the original postprocessing isn't
# executed when the materialization happens, so we have to include
# it here.
builder.postprocessing.spatial_join_filtering.extend(m_postprocessing.spatial_join_filtering)
builder.postprocessing.spatial_where_filtering.extend(m_postprocessing.spatial_where_filtering)
# Add data coordinate uploads.
result.data_coordinate_uploads.update(tree.data_coordinate_uploads)
# Add dataset_searches and filter out collections that don't have the
# right dataset type or governor dimensions.
for dataset_type_name, dataset_search in tree.datasets.items():
resolved_dataset_search = self._resolve_dataset_search(
dataset_type_name, dataset_search, result.constraint_data_id
)
result.datasets[dataset_type_name] = resolved_dataset_search
if not resolved_dataset_search.collection_records:
result.messages.append(f"Search for dataset type {dataset_type_name!r} is doomed to fail.")
result.messages.extend(resolved_dataset_search.messages)
return result, builder

def _resolve_dataset_search(
self,
dataset_type_name: str,
dataset_search: qt.DatasetSearch,
constraint_data_id: Mapping[str, DataIdValue],
) -> ResolvedDatasetSearch:
result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions)
for collection_record, collection_summary in self._resolve_collection_path(
dataset_search.collections
):
rejected: bool = False
if result.name not in collection_summary.dataset_types.names:
result.messages.append(
f"No datasets of type {result.name!r} in collection {collection_record.name}."
)
rejected = True
for governor in constraint_data_id.keys() & collection_summary.governors.keys():
if constraint_data_id[governor] not in collection_summary.governors[governor]:
result.messages.append(
f"No datasets with {governor}={constraint_data_id[governor]!r} "
f"in collection {collection_record.name}."
)
rejected = True
if not rejected:
if collection_record.type is CollectionType.CALIBRATION:
result.is_calibration_search = True
result.collection_records.append(collection_record)
if result.dimensions != self.get_dataset_type(dataset_type_name).dimensions.as_group():
# This is really for server-side defensiveness; it's hard to
# imagine the query getting different dimensions for a dataset
# type in two calls to the same query driver.
raise qt.InvalidQueryError(
f"Incorrect dimensions {result.dimensions} for dataset {dataset_type_name} "
f"in query (vs. {self.get_dataset_type(dataset_type_name).dimensions.as_group()})."
)
return result

def _resolve_collection_path(
self, collections: Iterable[str]
) -> list[tuple[CollectionRecord, CollectionSummary]]:
result: list[tuple[CollectionRecord, CollectionSummary]] = []
done: set[str] = set()

def recurse(collection_names: Iterable[str]) -> None:
for collection_name in collection_names:
if collection_name not in done:
done.add(collection_name)
record = self.managers.collections.find(collection_name)

if record.type is CollectionType.CHAINED:
recurse(cast(ChainedCollectionRecord, record).children)
else:
result.append((record, self.managers.datasets.getCollectionSummary(record)))

recurse(collections)

return result

def _apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None:
def apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None:
# Process data coordinate upload joins.
for upload_key, upload_dimensions in plan.data_coordinate_uploads.items():
joiner.join(
@@ -558,7 +447,7 @@ def _apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None:
# Add the WHERE clause to the joiner.
joiner.where(plan.predicate.visit(SqlColumnVisitor(joiner, self)))

def _apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilder) -> None:
def apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilder) -> None:
builder.columns = plan.columns
if not plan.needs_dimension_distinct and not builder.postprocessing.check_validity_match_count:
# Rows are already unique; nothing else to do in this method.
@@ -673,7 +562,7 @@ def _apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuild
unique_keys.append(builder.joiner.fields[logical_table][field])
builder.group_by = unique_keys

def _apply_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuilder) -> QueryBuilder:
def apply_query_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuilder) -> QueryBuilder:
if not plan:
return builder
# The query we're building looks like this:
Expand Down Expand Up @@ -722,6 +611,117 @@ def _apply_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuild
del builder.joiner.special["_ROWNUM"]
return builder

def _analyze_query_tree(self, tree: qt.QueryTree) -> tuple[QueryJoinsPlan, QueryBuilder]:
# Delegate to the dimensions manager to rewrite the predicate and
# start a QueryBuilder to cover any spatial overlap
# joins or constraints. We'll return that SqlBuilder at the end.
(
predicate,
builder,
) = self.managers.dimensions.process_query_overlaps(
tree.dimensions,
tree.predicate,
tree.get_joined_dimension_groups(),
)
result = QueryJoinsPlan(predicate=predicate, columns=builder.columns)
# We also check that the predicate doesn't reference any dimensions
# without constraining their governor dimensions, since that's a
# particularly easy mistake to make and it's almost never intentional.
# We also allow the registry data ID values to provide governor values.
where_columns = qt.ColumnSet(self.universe.empty.as_group())
result.predicate.gather_required_columns(where_columns)
for governor in where_columns.dimensions.governors:
if governor not in result.constraint_data_id:
if governor in self._defaults.dataId.dimensions:
result.constraint_data_id[governor] = self._defaults.dataId[governor]
else:
raise qt.InvalidQueryError(
f"Query 'where' expression references a dimension dependent on {governor} without "
"constraining it directly."
)
# Add materializations, which can also bring in more postprocessing.
for m_key, m_dimensions in tree.materializations.items():
_, m_postprocessing = self._materializations[m_key]
result.materializations[m_key] = m_dimensions
# When a query is materialized, the new tree has an empty
# (trivially true) predicate because the original was used to make
# the materialized rows. But the original postprocessing isn't
# executed when the materialization happens, so we have to include
# it here.
builder.postprocessing.spatial_join_filtering.extend(m_postprocessing.spatial_join_filtering)
builder.postprocessing.spatial_where_filtering.extend(m_postprocessing.spatial_where_filtering)
# Add data coordinate uploads.
result.data_coordinate_uploads.update(tree.data_coordinate_uploads)
# Add dataset_searches and filter out collections that don't have the
# right dataset type or governor dimensions.
for dataset_type_name, dataset_search in tree.datasets.items():
resolved_dataset_search = self._resolve_dataset_search(
dataset_type_name, dataset_search, result.constraint_data_id
)
result.datasets[dataset_type_name] = resolved_dataset_search
if not resolved_dataset_search.collection_records:
result.messages.append(f"Search for dataset type {dataset_type_name!r} is doomed to fail.")
result.messages.extend(resolved_dataset_search.messages)
return result, builder

def _resolve_dataset_search(
self,
dataset_type_name: str,
dataset_search: qt.DatasetSearch,
constraint_data_id: Mapping[str, DataIdValue],
) -> ResolvedDatasetSearch:
result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions)
for collection_record, collection_summary in self._resolve_collection_path(
dataset_search.collections
):
rejected: bool = False
if result.name not in collection_summary.dataset_types.names:
result.messages.append(
f"No datasets of type {result.name!r} in collection {collection_record.name}."
)
rejected = True
for governor in constraint_data_id.keys() & collection_summary.governors.keys():
if constraint_data_id[governor] not in collection_summary.governors[governor]:
result.messages.append(
f"No datasets with {governor}={constraint_data_id[governor]!r} "
f"in collection {collection_record.name}."
)
rejected = True
if not rejected:
if collection_record.type is CollectionType.CALIBRATION:
result.is_calibration_search = True
result.collection_records.append(collection_record)
if result.dimensions != self.get_dataset_type(dataset_type_name).dimensions.as_group():
# This is really for server-side defensiveness; it's hard to
# imagine the query getting different dimensions for a dataset
# type in two calls to the same query driver.
raise qt.InvalidQueryError(
f"Incorrect dimensions {result.dimensions} for dataset {dataset_type_name} "
f"in query (vs. {self.get_dataset_type(dataset_type_name).dimensions.as_group()})."
)
return result

def _resolve_collection_path(
self, collections: Iterable[str]
) -> list[tuple[CollectionRecord, CollectionSummary]]:
result: list[tuple[CollectionRecord, CollectionSummary]] = []
done: set[str] = set()

def recurse(collection_names: Iterable[str]) -> None:
for collection_name in collection_names:
if collection_name not in done:
done.add(collection_name)
record = self.managers.collections.find(collection_name)

if record.type is CollectionType.CHAINED:
recurse(cast(ChainedCollectionRecord, record).children)
else:
result.append((record, self.managers.datasets.getCollectionSummary(record)))

recurse(collections)

return result

def _join_materialization(
self,
joiner: QueryJoiner,
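For readers following along, the sequence in build_query above can now be driven from outside the class, since the planning stages are public after this change. The snippet below is an illustrative sketch only, not part of the commit: it assumes a DirectQueryDriver instance `driver`, a QueryTree `tree`, and a ColumnSet `final_columns` constructed elsewhere, and simply repeats the calls that build_query itself makes.

# Illustrative sketch, not part of this commit.  `driver`, `tree`, and
# `final_columns` are assumed to exist; the calls mirror build_query above.
order_by = ()              # no ORDER BY terms in this sketch
find_first_dataset = None  # no find-first search in this sketch

plan, builder = driver.analyze_query(tree, final_columns, order_by, find_first_dataset)
driver.apply_query_joins(plan.joins, builder.joiner)                # join tables and apply the WHERE clause
driver.apply_query_projection(plan.projection, builder)             # add GROUP BY / DISTINCT only if needed
builder = driver.apply_query_find_first(plan.find_first, builder)   # returns builder unchanged when plan.find_first is None
builder.columns = plan.final_columns                                # restore the originally requested columns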
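A related note on the find-first stage: the collapsed body of apply_query_find_first appears to rank rows per data ID and keep only the first one (the temporary `_ROWNUM` column deleted at the end of the method points at a window-function implementation). As a rough, purely hypothetical illustration of those semantics in plain Python (every name below is invented for this sketch):

# Hypothetical sketch of the find-first semantics; not code from this commit.
# Each input row carries a data ID, the rank of the collection it came from
# (smaller rank means earlier in the search path), and an arbitrary payload.
def find_first_rows(rows):
    """Keep one payload per data ID, taken from the best-ranked collection."""
    best = {}
    for data_id, rank, payload in rows:
        if data_id not in best or rank < best[data_id][0]:
            best[data_id] = (rank, payload)
    return {data_id: payload for data_id, (rank, payload) in best.items()}

The SQL version in the driver presumably does the same thing per data-ID partition of the base query's rows, which would explain why the `_ROWNUM` helper column is dropped from the joiner's special columns once the filtering is in place.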
