
DM-46363: Allow ObscoreExporter to work without SqlRegistry #19

Merged: 3 commits, merged Sep 17, 2024
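In short, the exporter previously reached into Butler internals: it grabbed self.butler._registry, asserted it was a SqlRegistry, built a SqlQueryBackend, and threaded the resulting query context through record_factory and exposure_region. The diffs below drop all of that in favor of the public Butler.registry interface, so the exporter no longer cares which Registry implementation it is given. A condensed before/after sketch assembled from the obscore_exporter.py hunks shown later (these are excerpts that use the exporter's own attributes, not a standalone script):

# Before (removed by this PR): tied to a direct SQL registry.
registry = self.butler._registry  # non-public attribute
backend = SqlQueryBackend(registry._db, registry._managers, registry.dimension_record_cache)
context = backend.context()
refs = registry.queryDatasets(dataset_type_name, collections=collections, where=self.config.where)
record = self.record_factory(ref, context)

# After: public API only; no SqlRegistry assumption and no context to thread.
refs = self.butler.registry.queryDatasets(
    dataset_type_name, collections=collections, where=self.config.where
)
record = self.record_factory(ref)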
Changes from all commits
.pre-commit-config.yaml (6 changes: 3 additions & 3 deletions)
@@ -8,7 +8,7 @@ repos:
   - id: end-of-file-fixer
   - id: trailing-whitespace
 - repo: https://github.com/psf/black
-  rev: 24.4.2
+  rev: 24.8.0
   hooks:
   - id: black
   # It is recommended to specify the latest version of Python
@@ -23,10 +23,10 @@ repos:
     name: isort (python)
 - repo: https://github.com/astral-sh/ruff-pre-commit
   # Ruff version.
-  rev: v0.5.2
+  rev: v0.6.5
   hooks:
   - id: ruff
 - repo: https://github.com/numpy/numpydoc
-  rev: "v1.7.0"
+  rev: "v1.8.0"
   hooks:
   - id: numpydoc-validation
mypy.ini (8 changes: 8 additions & 0 deletions)
@@ -16,6 +16,14 @@
 ignore_missing_imports = True
 ignore_errors = True

+[mypy-lsst.daf.butler.*]
+ignore_missing_imports = False
+ignore_errors = True
+
+[mypy-lsst.utils.*]
+ignore_missing_imports = False
+ignore_errors = True
+
 [mypy-lsst.dax.obscore.*]
 ignore_missing_imports = False
 ignore_errors = False
python/lsst/dax/obscore/obscore_exporter.py (21 changes: 6 additions & 15 deletions)
@@ -27,7 +27,7 @@
 import io
 import logging
 from collections.abc import Iterator
-from typing import TYPE_CHECKING, Any, cast
+from typing import Any, cast

 import pyarrow
 import sqlalchemy
@@ -38,18 +38,13 @@
     RecordFactory,
     SpatialObsCorePlugin,
 )
-from lsst.daf.butler.registry.queries import SqlQueryBackend
-from lsst.daf.butler.registry.sql_registry import SqlRegistry
 from lsst.sphgeom import Region
 from pyarrow import RecordBatch, Schema
 from pyarrow.csv import CSVWriter, WriteOptions
 from pyarrow.parquet import ParquetWriter

 from . import ExporterConfig

-if TYPE_CHECKING:
-    from lsst.daf.butler.registry.queries import SqlQueryContext
-
 _LOG = logging.getLogger(__name__)

 # Map few standard Python types to pyarrow types
@@ -197,7 +192,7 @@ def __init__(self, registry: Registry):
         # Maps instrument and exposure ID to a visit ID
         self._exposure_to_visit: dict[str, dict[int, int]] = {}

-    def exposure_region(self, dataId: DataCoordinate, context: SqlQueryContext) -> Region | None:
+    def exposure_region(self, dataId: DataCoordinate) -> Region | None:
         # Docstring is inherited from a base class.
         registry = self.registry
         instrument = cast(str, dataId["instrument"])
@@ -344,15 +339,11 @@ def _make_record_batches(self, batch_size: int = 10_000) -> Iterator[RecordBatch]:
         if not collections:
             collections = ...

-        # Have to use non-public Registry interface.
-        registry = self.butler._registry  # type: ignore
-        assert isinstance(registry, SqlRegistry), "Registry must be SqlRegistry"
-        backend = SqlQueryBackend(registry._db, registry._managers, registry.dimension_record_cache)
-
-        context = backend.context()
         for dataset_type_name in self.config.dataset_types:
             _LOG.debug("Reading data for dataset %s", dataset_type_name)
-            refs = registry.queryDatasets(dataset_type_name, collections=collections, where=self.config.where)
+            refs = self.butler.registry.queryDatasets(
+                dataset_type_name, collections=collections, where=self.config.where
+            )

             # need dimension records
             refs = refs.expanded()

Review comment from a repository member on the new queryDatasets call: "This is rewritten in #18 to use butler.query() but rebasing should be pretty straightforward."
@@ -362,7 +353,7 @@
             _LOG.debug("New record, dataId=%s", dataId.mapping)
             # _LOG.debug("New record, records=%s", dataId.records)

-            record = self.record_factory(ref, context)
+            record = self.record_factory(ref)
             if record is None:
                 continue

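Picking up the review note above: #18 reportedly rewrites this loop against the newer butler.query() interface. A rough sketch of that direction, assuming the context-manager API the comment refers to; the result-object methods used here (datasets(), where(), with_dimension_records()) are assumptions, not taken from this PR:

# Hypothetical sketch of the #18 direction; method names are assumed.
with self.butler.query() as query:
    for dataset_type_name in self.config.dataset_types:
        results = query.datasets(dataset_type_name, collections=collections)
        if self.config.where:
            results = results.where(self.config.where)
        for ref in results.with_dimension_records():  # stands in for refs.expanded()
            record = self.record_factory(ref)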
python/lsst/dax/obscore/script/obscore_update_table.py (7 changes: 2 additions & 5 deletions)
@@ -34,7 +34,6 @@
     ObsCoreLiveTableManager,
     ObsCoreManagerConfig,
 )
-from lsst.daf.butler.registry.queries import SqlQueryBackend
 from lsst.daf.butler.registry.sql_registry import SqlRegistry
 from lsst.utils import iteration

@@ -75,16 +74,14 @@ def obscore_update_table(
         for ref in refs:
             _LOG.info("Will be adding dataset %s", ref)
     else:
-        backend = SqlQueryBackend(registry._db, registry._managers, registry.dimension_record_cache)
-        context = backend.context()
         count = 0
         if collection_record.type is CollectionType.RUN:
             # Limit record number in single insert.
             for refs_chunk in iteration.chunk_iterable(refs):
-                count += manager.add_datasets(refs_chunk, context)
+                count += manager.add_datasets(refs_chunk)
         elif collection_record.type is CollectionType.TAGGED:
             for refs_chunk in iteration.chunk_iterable(refs):
-                count += manager.associate(refs_chunk, collection_record, context)
+                count += manager.associate(refs_chunk, collection_record)
         else:
             raise ValueError(f"Unexpected collection type: {collection_record.type}")
         end_time = time.time()
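A note on the loop above: lsst.utils.iteration.chunk_iterable is what bounds the size of each insert, yielding the refs in fixed-size chunks. A quick self-contained illustration (chunk_size=3 is arbitrary; the script relies on the helper's default):

from lsst.utils import iteration

# Yields successive fixed-size chunks; the last chunk may be short.
for chunk in iteration.chunk_iterable(range(8), chunk_size=3):
    print(list(chunk))
# Prints: [0, 1, 2] then [3, 4, 5] then [6, 7]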
tests/test_exporter.py (2 changes: 1 addition & 1 deletion)
@@ -52,7 +52,7 @@ def make_butler(self) -> Butler:
         config = Config()
         config["root"] = self.root
         config["registry", "db"] = f"sqlite:///{self.root}/gen3.sqlite3"
-        butler = Butler(Butler.makeRepo(self.root, config=config), writeable=True)
+        butler = Butler.from_config(Butler.makeRepo(self.root, config=config), writeable=True)
         DatastoreMock.apply(butler)
         return butler

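The test change tracks a daf_butler API shift: Butler.from_config is the factory for opening a concrete butler from a repository config, replacing direct construction via Butler(...). A minimal sketch of creating and opening a throwaway repo in this style, mirroring the test (the root path is illustrative):

from lsst.daf.butler import Butler, Config

root = "/tmp/obscore_test_repo"  # illustrative location
config = Config()
config["root"] = root
config["registry", "db"] = f"sqlite:///{root}/gen3.sqlite3"
# makeRepo writes the repository config; from_config opens a Butler on it.
butler = Butler.from_config(Butler.makeRepo(root, config=config), writeable=True)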