From 74b2515eb93d46e7f6e0d6ce6ae0e9dc694dbb40 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 24 Jan 2025 11:18:29 -0500 Subject: [PATCH 1/4] Update to reflect QPG summary change. QuantumInfo no longer has 'caveats' key; instead you should get its final run to look at the ceveats on that. --- tests/test_simple_pipeline_executor.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index 9c6b5748..dabab14e 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -261,15 +261,17 @@ def test_partial_outputs_success(self): prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph]) (quantum_key_a,) = prov.quanta["a"] quantum_info_a = prov.get_quantum_info(quantum_key_a) + _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a) self.assertEqual( - quantum_info_a["caveats"], + quantum_run_a.caveats, QuantumSuccessCaveats.ALL_OUTPUTS_MISSING | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING | QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR, ) (quantum_key_b,) = prov.quanta["b"] quantum_info_b = prov.get_quantum_info(quantum_key_b) - self.assertEqual(quantum_info_b["caveats"], QuantumSuccessCaveats.NO_CAVEATS) + _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b) + self.assertEqual(quantum_run_b.caveats, QuantumSuccessCaveats.NO_CAVEATS) def test_no_work_found(self): """Test executing two quanta where the first raises @@ -304,16 +306,18 @@ def test_no_work_found(self): prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph]) (quantum_key_a,) = prov.quanta["a"] quantum_info_a = prov.get_quantum_info(quantum_key_a) + _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a) self.assertEqual( - quantum_info_a["caveats"], + quantum_run_a.caveats, QuantumSuccessCaveats.ALL_OUTPUTS_MISSING | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING | QuantumSuccessCaveats.NO_WORK, ) (quantum_key_b,) = prov.quanta["b"] quantum_info_b = prov.get_quantum_info(quantum_key_b) + _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b) self.assertEqual( - quantum_info_b["caveats"], + quantum_run_b.caveats, QuantumSuccessCaveats.ALL_OUTPUTS_MISSING | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING | QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED From 3473648687abd08814fa31a2522b106772571820 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 24 Jan 2025 11:19:31 -0500 Subject: [PATCH 2/4] Rewrite pipetask report brief to delegate to pipe_base. The main quantum and dataset summary tables are unchanged, but the code is now in pipe_base. The JSON-like printouts of data ID information have been replaced with new tables. --- python/lsst/ctrl/mpexec/cli/script/report.py | 129 ++----------------- tests/test_cliCmdReport.py | 3 - 2 files changed, 14 insertions(+), 118 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 65f59773..0ae1b347 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -243,129 +243,28 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo brief : `bool` Only display short (counts-only) summary on stdout. This includes counts and not error messages or data_ids (similar to BPS report). - This option will still report all `cursed` datasets and `wonky` - quanta. """ - quanta_table = [] - failed_quanta_table = [] - wonky_quanta_table = [] - for label, task_summary in summary.tasks.items(): - if task_summary.n_wonky > 0: - print( - f"{label} has produced wonky quanta. Recommend processing cease until the issue is resolved." - ) - for quantum_summary in task_summary.wonky_quanta: - wonky_quanta_table.append( - { - "Task": label, - "Data ID": quantum_summary.data_id, - "Runs and Status": quantum_summary.runs, - "Messages": quantum_summary.messages, - } - ) - if len(task_summary.caveats) > 1: - caveats = "(multiple)" - elif len(task_summary.caveats) == 1: - ((code, data_ids),) = task_summary.caveats.items() - caveats = f"{code}({len(data_ids)})" - else: - caveats = "" - quanta_table.append( - { - "Task": label, - "Unknown": task_summary.n_unknown, - "Successful": task_summary.n_successful, - "Caveats": caveats, - "Blocked": task_summary.n_blocked, - "Failed": task_summary.n_failed, - "Wonky": task_summary.n_wonky, - "TOTAL": sum( - [ - task_summary.n_successful, - task_summary.n_unknown, - task_summary.n_blocked, - task_summary.n_failed, - task_summary.n_wonky, - ] - ), - "EXPECTED": task_summary.n_expected, - } - ) - if task_summary.failed_quanta: - for quantum_summary in task_summary.failed_quanta: - failed_quanta_table.append( - { - "Task": label, - "Data ID": quantum_summary.data_id, - "Runs and Status": quantum_summary.runs, - "Messages": quantum_summary.messages, - } - ) - quanta = Table(quanta_table) - quanta.pprint_all() + summary.make_quantum_table().pprint_all() print("") print("Caveat codes:") for k, v in QuantumSuccessCaveats.legend().items(): print(f"{k}: {v}") print("") - # Dataset loop - dataset_table = [] - cursed_datasets = [] - unsuccessful_datasets = {} - for dataset_type_name, dataset_type_summary in summary.datasets.items(): - dataset_table.append( - { - "Dataset": dataset_type_name, - "Visible": dataset_type_summary.n_visible, - "Shadowed": dataset_type_summary.n_shadowed, - "Predicted Only": dataset_type_summary.n_predicted_only, - "Unsuccessful": dataset_type_summary.n_unsuccessful, - "Cursed": dataset_type_summary.n_cursed, - "TOTAL": sum( - [ - dataset_type_summary.n_visible, - dataset_type_summary.n_shadowed, - dataset_type_summary.n_predicted_only, - dataset_type_summary.n_unsuccessful, - dataset_type_summary.n_cursed, - ] - ), - "EXPECTED": dataset_type_summary.n_expected, - } - ) - if dataset_type_summary.n_cursed > 0: - for cursed_dataset in dataset_type_summary.cursed_datasets: - print( - f"{dataset_type_name} has cursed quanta with message(s) {cursed_dataset.messages}. " - "Recommend processing cease until the issue is resolved." - ) - cursed_datasets.append( - { - "Dataset Type": dataset_type_name, - "Producer Data Id": cursed_dataset.producer_data_id, - } - ) - if dataset_type_summary.n_unsuccessful > 0: - unsuccessful_datasets[dataset_type_name] = dataset_type_summary.unsuccessful_datasets - datasets = Table(dataset_table) - datasets.pprint_all() - curse_table = Table(cursed_datasets) - # Display wonky quanta - if wonky_quanta_table: - print("Wonky Quanta") - pprint.pprint(wonky_quanta_table) - # Display cursed datasets - if cursed_datasets: - print("Cursed Datasets") - curse_table.pprint_all() + if exception_table := summary.make_exception_table(): + exception_table.pprint_all() + print("") + summary.make_dataset_table().pprint_all() + print("") if full_output_filename: with open(full_output_filename, "w") as stream: stream.write(summary.model_dump_json(indent=2)) else: if not brief: - if failed_quanta_table: - print("Failed Quanta") - pprint.pprint(failed_quanta_table) - if unsuccessful_datasets: - print("Unsuccessful Datasets") - pprint.pprint(unsuccessful_datasets) + for task_label, bad_quantum_table in summary.make_bad_quantum_tables().items(): + print(f"{task_label} failures:") + bad_quantum_table.pprint_all() + print("") + for dataset_type_name, bad_dataset_table in summary.make_bad_dataset_tables().items(): + print(f"{dataset_type_name} failures:") + bad_dataset_table.pprint_all() + print("") diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index a9902b02..c2d90d82 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -167,9 +167,6 @@ def test_report(self): self.assertIn("TOTAL", result_v2_terminal_out.stdout) self.assertIn("EXPECTED", result_v2_terminal_out.stdout) - # Check that title from the error summary appears - self.assertIn("Unsuccessful Datasets", result_v2_terminal_out.stdout) - # Test cli for the QPG brief option result_v2_brief = self.runner.invoke( pipetask_cli, From 03ef1df61ed2a11b81903e2dc9f622044b6e5be1 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Mon, 3 Feb 2025 11:32:09 -0500 Subject: [PATCH 3/4] Add tests of execution status and exception reporting. --- tests/test_simple_pipeline_executor.py | 140 +++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index dabab14e..b0d8eeb2 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -268,10 +268,68 @@ def test_partial_outputs_success(self): | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING | QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR, ) + self.assertEqual( + quantum_run_a.exception.type_name, + "lsst.pipe.base.tests.mocks.MockAlgorithmError", + ) + self.assertEqual( + quantum_run_a.exception.metadata, + {"badness": 12}, + ) (quantum_key_b,) = prov.quanta["b"] quantum_info_b = prov.get_quantum_info(quantum_key_b) _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b) self.assertEqual(quantum_run_b.caveats, QuantumSuccessCaveats.NO_CAVEATS) + prov_summary = prov.to_summary(self.butler) + # One partial-outputs case, with an empty data ID: + self.assertEqual(prov_summary.tasks["a"].caveats, {"*P": [{}]}) + self.assertEqual( + prov_summary.tasks["a"].exceptions.keys(), {"lsst.pipe.base.tests.mocks.MockAlgorithmError"} + ) + self.assertEqual( + prov_summary.tasks["a"] + .exceptions["lsst.pipe.base.tests.mocks.MockAlgorithmError"][0] + .exception.metadata, + {"badness": 12}, + ) + # No caveats for the second task, since it didn't need the first task's + # output anyway. + self.assertEqual(prov_summary.tasks["b"].caveats, {}) + self.assertEqual(prov_summary.tasks["b"].exceptions, {}) + # Check table forms for summaries of the same information. + quantum_table = prov_summary.make_quantum_table() + self.assertEqual(list(quantum_table["Task"]), ["a", "b"]) + self.assertEqual(list(quantum_table["Unknown"]), [0, 0]) + self.assertEqual(list(quantum_table["Successful"]), [1, 1]) + self.assertEqual(list(quantum_table["Caveats"]), ["*P(1)", ""]) + self.assertEqual(list(quantum_table["Blocked"]), [0, 0]) + self.assertEqual(list(quantum_table["Failed"]), [0, 0]) + self.assertEqual(list(quantum_table["Wonky"]), [0, 0]) + self.assertEqual(list(quantum_table["TOTAL"]), [1, 1]) + self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1]) + dataset_table = prov_summary.make_dataset_table() + self.assertEqual( + list(dataset_table["Dataset"]), + ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"], + ) + self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 1, 1, 1]) + self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1]) + self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1]) + exception_table = prov_summary.make_exception_table() + self.assertEqual(list(exception_table["Task"]), ["a"]) + self.assertEqual( + list(exception_table["Exception"]), ["lsst.pipe.base.tests.mocks.MockAlgorithmError"] + ) + self.assertEqual(list(exception_table["Count"]), [1]) + bad_quantum_tables = prov_summary.make_bad_quantum_tables() + self.assertEqual(bad_quantum_tables.keys(), {"a"}) + self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["SUCCESSFUL(P)"]) + self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), ["MockAlgorithmError"]) + self.assertFalse(prov_summary.make_bad_dataset_tables()) def test_no_work_found(self): """Test executing two quanta where the first raises @@ -323,6 +381,39 @@ def test_no_work_found(self): | QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED | QuantumSuccessCaveats.NO_WORK, ) + prov_summary = prov.to_summary(self.butler) + # One NoWorkFound, raised by runQuantum, with an empty data ID: + self.assertEqual(prov_summary.tasks["a"].caveats, {"*N": [{}]}) + self.assertEqual(prov_summary.tasks["a"].exceptions, {}) + # One NoWorkFound, raised by adjustQuantum, with an empty data ID. + self.assertEqual(prov_summary.tasks["b"].caveats, {"*A": [{}]}) + self.assertEqual(prov_summary.tasks["b"].exceptions, {}) + # Check table forms for summaries of the same information. + quantum_table = prov_summary.make_quantum_table() + self.assertEqual(list(quantum_table["Task"]), ["a", "b"]) + self.assertEqual(list(quantum_table["Unknown"]), [0, 0]) + self.assertEqual(list(quantum_table["Successful"]), [1, 1]) + self.assertEqual(list(quantum_table["Caveats"]), ["*N(1)", "*A(1)"]) + self.assertEqual(list(quantum_table["Blocked"]), [0, 0]) + self.assertEqual(list(quantum_table["Failed"]), [0, 0]) + self.assertEqual(list(quantum_table["Wonky"]), [0, 0]) + self.assertEqual(list(quantum_table["TOTAL"]), [1, 1]) + self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1]) + dataset_table = prov_summary.make_dataset_table() + self.assertEqual( + list(dataset_table["Dataset"]), + ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"], + ) + self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 0, 1, 1]) + self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 1, 0, 0]) + self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1]) + self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1]) + self.assertFalse(prov_summary.make_exception_table()) + self.assertFalse(prov_summary.make_bad_quantum_tables()) + self.assertFalse(prov_summary.make_bad_dataset_tables()) def test_partial_outputs_failure(self): """Test executing two quanta where the first raises @@ -361,6 +452,55 @@ def test_partial_outputs_failure(self): executor.run(register_dataset_types=True) self.assertFalse(self.butler.exists("intermediate")) self.assertFalse(self.butler.exists("output")) + prov = qpg.QuantumProvenanceGraph() + prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph]) + (quantum_key_a,) = prov.quanta["a"] + quantum_info_a = prov.get_quantum_info(quantum_key_a) + _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a) + self.assertEqual(quantum_run_a.status, qpg.QuantumRunStatus.FAILED) + self.assertIsNone(quantum_run_a.caveats) + self.assertIsNone(quantum_run_a.exception) + (quantum_key_b,) = prov.quanta["b"] + quantum_info_b = prov.get_quantum_info(quantum_key_b) + self.assertEqual(quantum_info_b["status"], qpg.QuantumInfoStatus.BLOCKED) + prov_summary = prov.to_summary(self.butler) + # One partial-outputs failure case for the first task. + self.assertEqual(prov_summary.tasks["a"].n_failed, 1) + # No direct failures, but one blocked for the second + self.assertEqual(prov_summary.tasks["b"].n_failed, 0) + self.assertEqual(prov_summary.tasks["b"].n_blocked, 1) + # Check table forms for summaries of the same information. + quantum_table = prov_summary.make_quantum_table() + self.assertEqual(list(quantum_table["Task"]), ["a", "b"]) + self.assertEqual(list(quantum_table["Unknown"]), [0, 0]) + self.assertEqual(list(quantum_table["Successful"]), [0, 0]) + self.assertEqual(list(quantum_table["Caveats"]), ["", ""]) + self.assertEqual(list(quantum_table["Blocked"]), [0, 1]) + self.assertEqual(list(quantum_table["Failed"]), [1, 0]) + self.assertEqual(list(quantum_table["Wonky"]), [0, 0]) + self.assertEqual(list(quantum_table["TOTAL"]), [1, 1]) + self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1]) + dataset_table = prov_summary.make_dataset_table() + self.assertEqual( + list(dataset_table["Dataset"]), + ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"], + ) + # Note that a_log is UNSUCCESSFUL, not VISIBLE, despite being present + # in butler because those categories are mutually exclusive and we + # don't want to consider any outputs of failed quanta to be successful. + self.assertEqual(list(dataset_table["Visible"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Predicted Only"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["Unsuccessful"]), [1, 1, 1, 1, 1, 1]) + self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0]) + self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1]) + self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1]) + self.assertFalse(prov_summary.make_exception_table()) + bad_quantum_tables = prov_summary.make_bad_quantum_tables() + self.assertEqual(bad_quantum_tables.keys(), {"a"}) + self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["FAILED"]) + self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), [""]) + self.assertFalse(prov_summary.make_bad_dataset_tables()) def test_existence_check_skips(self): """Test that pre-execution existence checks are not performed for From 78a2d0d8ed3455ca21e3938a557862967f5ecd02 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Thu, 6 Feb 2025 12:51:35 -0500 Subject: [PATCH 4/4] Move QPG summary printing to pipe_base. --- python/lsst/ctrl/mpexec/cli/script/report.py | 37 +++++--------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 0ae1b347..733ddaef 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -30,7 +30,7 @@ from astropy.table import Table from lsst.daf.butler import Butler -from lsst.pipe.base import QuantumGraph, QuantumSuccessCaveats +from lsst.pipe.base import QuantumGraph from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary @@ -235,36 +235,17 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo Parameters ---------- summary : `QuantumProvenanceGraph.Summary` - This `Pydantic` model contains all the information derived from the - `QuantumProvenanceGraph`. + This `Pydantic` model contains all the information derived from the + `QuantumProvenanceGraph`. full_output_filename : `str | None` - Name of the JSON file in which to store summary information, if - passed. + Name of the JSON file in which to store summary information, if + passed. brief : `bool` - Only display short (counts-only) summary on stdout. This includes - counts and not error messages or data_ids (similar to BPS report). + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + Ignored (considered `False`) if ``full_output_filename`` is passed. """ - summary.make_quantum_table().pprint_all() - print("") - print("Caveat codes:") - for k, v in QuantumSuccessCaveats.legend().items(): - print(f"{k}: {v}") - print("") - if exception_table := summary.make_exception_table(): - exception_table.pprint_all() - print("") - summary.make_dataset_table().pprint_all() - print("") + summary.pprint(brief=(brief or bool(full_output_filename))) if full_output_filename: with open(full_output_filename, "w") as stream: stream.write(summary.model_dump_json(indent=2)) - else: - if not brief: - for task_label, bad_quantum_table in summary.make_bad_quantum_tables().items(): - print(f"{task_label} failures:") - bad_quantum_table.pprint_all() - print("") - for dataset_type_name, bad_dataset_table in summary.make_bad_dataset_tables().items(): - print(f"{dataset_type_name} failures:") - bad_dataset_table.pprint_all() - print("")