Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-40441: first batch of deprecations from RFC-949 #287

Merged
merged 19 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7274e9b
Add VERBOSE logging around QG saving.
TallJimbo Aug 31, 2023
0473f1e
Switch to new QG generation interfaces in CmdLineFwk.
TallJimbo Aug 25, 2023
5a50b19
Remove unused **kwargs from SimplePipelineExecutor.from_pipeline.
TallJimbo Aug 28, 2023
20d0a87
Modernize SimplePipelineExecutor via PipelineGraph/QGB.
TallJimbo Aug 28, 2023
c3c0d63
Use new QG builders in SeparablePipelineExecutor.
TallJimbo Aug 29, 2023
a299263
Drop usage of PipelineDatasetTypes in tests.
TallJimbo Aug 30, 2023
fd6236c
Drop usage of PipelineDatasetTypes in CmdLineFwk.
TallJimbo Aug 30, 2023
d51d1fc
Rework tests to avoid changing registry after QG generation.
TallJimbo Aug 31, 2023
c3d9f8c
Use PipelineGraph in PreExecInit.
TallJimbo Aug 30, 2023
d8a2085
Support TaskNode and deprecate TaskDef in TaskFactory.
TallJimbo Sep 1, 2023
a789093
Support TaskNode and deprecate TaskDef in LogCapture.
TallJimbo Sep 1, 2023
42318dd
Support TaskNode and deprecate TaskDef in executors.
TallJimbo Sep 1, 2023
ec74e39
Replace util.filterTasks with util.filterTaskNodes.
TallJimbo Sep 1, 2023
38f6a44
Avoid toExpandedPipeline, Pipeline.__iter__ in ShowInfo and tests.
TallJimbo Sep 2, 2023
45a5595
Silence deprecation warning in dotTools.
TallJimbo Sep 2, 2023
10e92fd
Handle missing dataset types for prerequisite inputs more carefully.
TallJimbo Sep 5, 2023
6d58cc1
Add 'deprecated' package type stub dependency.
TallJimbo Mar 28, 2024
3f35ac0
Bump deprecation->removal version from v26 to v27.
TallJimbo Apr 8, 2024
fab8c00
Modernize requirements.txt.
TallJimbo Apr 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import logging
from types import SimpleNamespace

from lsst.pipe.base.graphBuilder import DatasetQueryConstraintVariant
from lsst.pipe.base.all_dimensions_quantum_graph_builder import DatasetQueryConstraintVariant

from ... import CmdLineFwk

Expand Down
70 changes: 37 additions & 33 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import getpass
import logging
import shutil
from collections.abc import Iterable, Mapping, Sequence
from collections.abc import Mapping, Sequence
from types import SimpleNamespace

import astropy.units as u
Expand All @@ -50,7 +50,6 @@
CollectionType,
Config,
DatasetId,
DatasetRef,
DatasetType,
DimensionUniverse,
LimitedButler,
Expand All @@ -65,16 +64,17 @@
from lsst.daf.butler.registry.wildcards import CollectionWildcard
from lsst.pipe.base import (
ExecutionResources,
GraphBuilder,
Instrument,
Pipeline,
PipelineDatasetTypes,
PipelineGraph,
QuantumGraph,
TaskDef,
TaskFactory,
buildExecutionButler,
)
from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
from lsst.pipe.base.pipeline_graph import NodeType
from lsst.utils import doImportType
from lsst.utils.logging import getLogger
from lsst.utils.threads import disable_implicit_threading

from .dotTools import graph2dot, pipeline2dot
Expand All @@ -87,7 +87,7 @@
# Local non-exported definitions --
# ----------------------------------

_LOG = logging.getLogger(__name__)
_LOG = getLogger(__name__)


class _OutputChainedCollectionInfo:
Expand Down Expand Up @@ -415,7 +415,7 @@ def defineDatastoreCache() -> None:
_LOG.debug("Defining shared datastore cache directory to %s", cache_dir)

@classmethod
def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Iterable[TaskDef] | None = None) -> Butler:
def makeWriteButler(cls, args: SimpleNamespace, pipeline_graph: PipelineGraph | None = None) -> Butler:
"""Return a read-write butler initialized to write to and read from
the collections specified by the given command-line arguments.

Expand All @@ -424,7 +424,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Iterable[TaskDef] | No
args : `types.SimpleNamespace`
Parsed command-line arguments. See class documentation for the
construction parameter of the same name.
taskDefs : iterable of `TaskDef`, optional
pipeline_graph : `lsst.pipe.base.PipelineGraph`, optional
Definitions for tasks in a pipeline. This argument is only needed
if ``args.replace_run`` is `True` and ``args.prune_replaced`` is
"unstore".
Expand All @@ -446,12 +446,17 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Iterable[TaskDef] | No
if args.prune_replaced == "unstore":
# Remove datasets from datastore
with butler.transaction():
refs: Iterable[DatasetRef] = butler.registry.queryDatasets(..., collections=replaced)
# we want to remove regular outputs but keep
# initOutputs, configs, and versions.
if taskDefs is not None:
initDatasetNames = set(PipelineDatasetTypes.initOutputNames(taskDefs))
refs = [ref for ref in refs if ref.datasetType.name not in initDatasetNames]
# we want to remove regular outputs from this pipeline,
# but keep initOutputs, configs, and versions.
if pipeline_graph is not None:
refs = [
ref
for ref in butler.registry.queryDatasets(..., collections=replaced)
if (
(producer := pipeline_graph.producer_of(ref.datasetType.name)) is not None
and producer.key.node_type is NodeType.TASK # i.e. not TASK_INIT
)
]
butler.pruneDatasets(refs, unstore=True, disassociate=False)
elif args.prune_replaced == "purge":
# Erase entire collection and all datasets, need to remove
Expand Down Expand Up @@ -618,21 +623,25 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
if args.show_qgraph_header:
print(QuantumGraph.readHeader(args.qgraph))
else:
task_defs = list(pipeline.toExpandedPipeline())
pipeline_graph = pipeline.to_graph()
if args.mock:
from lsst.pipe.base.tests.mocks import mock_task_defs
from lsst.pipe.base.tests.mocks import mock_pipeline_graph

task_defs = mock_task_defs(
task_defs,
pipeline_graph = mock_pipeline_graph(
pipeline_graph,
unmocked_dataset_types=args.unmocked_dataset_types,
force_failures=args.mock_failure,
)
# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(
butler.registry,
skipExistingIn=args.skip_existing_in,
clobberOutputs=args.clobber_outputs,
datastore=butler._datastore if args.qgraph_datastore_records else None,
graph_builder = AllDimensionsQuantumGraphBuilder(
pipeline_graph,
butler,
where=args.data_query,
skip_existing_in=args.skip_existing_in if args.skip_existing_in is not None else (),
clobber=args.clobber_outputs,
dataset_query_constraint=args.dataset_query_constraint,
input_collections=collections,
output_run=run,
)
# accumulate metadata
metadata = {
Expand All @@ -648,15 +657,7 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
"time": f"{datetime.datetime.now()}",
}
assert run is not None, "Butler output run collection must be defined"
qgraph = graphBuilder.makeGraph(
task_defs,
collections,
run,
args.data_query,
metadata=metadata,
datasetQueryConstraint=args.dataset_query_constraint,
dataId=pipeline.get_data_id(butler.dimensions),
)
qgraph = graph_builder.build(metadata, attach_datastore_records=args.qgraph_datastore_records)
if args.show_qgraph_header:
qgraph.buildAndPrintHeader()

Expand All @@ -666,6 +667,7 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
self._summarize_qgraph(qgraph)

if args.save_qgraph:
_LOG.verbose("Writing QuantumGraph to %r.", args.save_qgraph)
qgraph.saveUri(args.save_qgraph)

if args.save_single_quanta:
Expand All @@ -675,9 +677,11 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
sqgraph.saveUri(uri)

if args.qgraph_dot:
_LOG.verbose("Writing quantum graph DOT visualization to %r.", args.qgraph_dot)
graph2dot(qgraph, args.qgraph_dot)

if args.execution_butler_location:
_LOG.verbose("Writing execution butler to %r.", args.execution_butler_location)
butler = Butler.from_config(args.butler_config)
assert isinstance(butler, DirectButler), "Execution butler needs DirectButler"
newArgs = copy.deepcopy(args)
Expand Down Expand Up @@ -775,7 +779,7 @@ def runPipeline(
# Make butler instance. QuantumGraph should have an output run defined,
# but we ignore it here and let command line decide actual output run.
if butler is None:
butler = _ButlerFactory.makeWriteButler(args, graph.iterTaskGraph())
butler = _ButlerFactory.makeWriteButler(args, graph.pipeline_graph)

if args.skip_existing:
args.skip_existing_in += (butler.run,)
Expand Down
7 changes: 6 additions & 1 deletion python/lsst/ctrl/mpexec/dotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import html
import io
import re
import warnings
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -280,7 +281,11 @@ def expand_dimensions(connection: connectionTypes.BaseConnection) -> list[str]:

allDatasets: set[str | tuple[str, str]] = set()
if isinstance(pipeline, Pipeline):
pipeline = pipeline.toExpandedPipeline()
# TODO: DM-40639 will rewrite this code and finish off the deprecation
# of toExpandedPipeline.
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
pipeline = pipeline.toExpandedPipeline()

# The next two lines are a workaround until DM-29658 at which time metadata
# connections should start working with the above code
Expand Down
39 changes: 30 additions & 9 deletions python/lsst/ctrl/mpexec/log_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@
import os
import shutil
import tempfile
import warnings
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from logging import FileHandler

from lsst.daf.butler import Butler, FileDataset, LimitedButler, Quantum
from lsst.daf.butler.logging import ButlerLogRecordHandler, ButlerLogRecords, ButlerMDC, JsonLogFormatter
from lsst.pipe.base import InvalidQuantumError, TaskDef
from lsst.pipe.base.pipeline_graph import TaskNode
from lsst.utils.introspection import find_outside_stacklevel

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -85,13 +88,17 @@ def from_full(cls, butler: Butler) -> LogCapture:
return cls(butler, butler)

@contextmanager
def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCaptureFlag]:
def capture_logging(
self, task_node: TaskDef | TaskNode, /, quantum: Quantum
) -> Iterator[_LogCaptureFlag]:
"""Configure logging system to capture logs for execution of this task.

Parameters
----------
taskDef : `lsst.pipe.base.TaskDef`
The task definition.
task_node : `lsst.pipe.base.TaskDef` or \
`~lsst.pipe.base.pipeline_graph.TaskNode`
The task definition. Support for `~lsst.pipe.base.TaskDef` is
deprecated and will be removed after v27.
quantum : `~lsst.daf.butler.Quantum`
Single Quantum instance.

Expand All @@ -103,23 +110,37 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa

.. code-block:: py

with self.capture_logging(taskDef, quantum):
with self.capture_logging(task_node, quantum):
# Run quantum and capture logs.

This method can also set up logging to attach task- or
quantum-specific information to log messages. Potentially this can
take into account some info from task configuration as well.
"""
# Include the quantum dataId and task label in the MDC.
mdc = {"LABEL": taskDef.label, "RUN": ""}
mdc = {"LABEL": task_node.label, "RUN": ""}
if quantum.dataId:
mdc["LABEL"] += f":{quantum.dataId}"
if self.full_butler is not None:
mdc["RUN"] = self.full_butler.run or ""
ctx = _LogCaptureFlag()

if isinstance(task_node, TaskDef):
# TODO: remove this block and associated docs and annotations on
# DM-40443.
log_dataset_name = task_node.logOutputDatasetName
warnings.warn(
"Passing TaskDef instances to LogCapture is deprecated and will not be supported after v27.",
FutureWarning,
find_outside_stacklevel("lsst.ctrl.mpexec"),
)
else:
log_dataset_name = (
task_node.log_output.dataset_type_name if task_node.log_output is not None else None
)

# Add a handler to the root logger to capture execution log output.
if taskDef.logOutputDatasetName is not None:
if log_dataset_name is not None:
# Either accumulate into ButlerLogRecords or stream JSON records to
# file and ingest that (ingest is possible only with full butler).
if self.stream_json_logs and self.full_butler is not None:
Expand All @@ -132,7 +153,7 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
tmpdir = tempfile.mkdtemp(prefix="butler-temp-logs-")

# Construct a file to receive the log records and "touch" it.
log_file = os.path.join(tmpdir, f"butler-log-{taskDef.label}.json")
log_file = os.path.join(tmpdir, f"butler-log-{task_node.label}.json")
with open(log_file, "w"):
pass
log_handler_file = FileHandler(log_file)
Expand All @@ -147,7 +168,7 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
logging.getLogger().removeHandler(log_handler_file)
log_handler_file.close()
if ctx.store:
self._ingest_log_records(quantum, taskDef.logOutputDatasetName, log_file)
self._ingest_log_records(quantum, log_dataset_name, log_file)
shutil.rmtree(tmpdir, ignore_errors=True)

else:
Expand All @@ -161,7 +182,7 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
# Ensure that the logs are stored in butler.
logging.getLogger().removeHandler(log_handler_memory)
if ctx.store:
self._store_log_records(quantum, taskDef.logOutputDatasetName, log_handler_memory)
self._store_log_records(quantum, log_dataset_name, log_handler_memory)
log_handler_memory.records.clear()

else:
Expand Down
Loading
Loading