From 2bd83e4d5ae005883318217a66c93daf88a9d7e3 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Mon, 11 Nov 2024 15:20:50 -0600 Subject: [PATCH] enh: Add con-duct ls Fixes: https://github.com/con/duct/issues/185 --- README.md | 10 ++- setup.cfg | 2 + src/con_duct/__main__.py | 7 +- src/con_duct/suite/ls.py | 173 +++++++++++++++++++++++++++++++++++++ src/con_duct/suite/main.py | 44 ++++++++++ test/test_ls.py | 73 ++++++++++++++++ test/test_suite.py | 141 ++++++++++++++++++++++++++++++ tox.ini | 1 + 8 files changed, 444 insertions(+), 7 deletions(-) create mode 100644 src/con_duct/suite/ls.py create mode 100644 test/test_ls.py diff --git a/README.md b/README.md index c3fede9..8fd2b0c 100644 --- a/README.md +++ b/README.md @@ -139,12 +139,14 @@ usage: con-duct [options] A suite of commands to manage or manipulate con-duct logs. positional arguments: - {pp,plot} Available subcommands - pp Pretty print a JSON log. - plot Plot resource usage for an execution. + {pp,plot,ls} Available subcommands + pp Pretty print a JSON log. + plot Plot resource usage for an execution. + ls Print execution information for all runs matching + DUCT_OUTPUT_PREFIX. options: - -h, --help show this help message and exit + -h, --help show this help message and exit ``` diff --git a/setup.cfg b/setup.cfg index 318ddd8..520ff91 100644 --- a/setup.cfg +++ b/setup.cfg @@ -58,6 +58,8 @@ where = src [options.extras_require] all = matplotlib + PyYAML + pyout [options.entry_points] diff --git a/src/con_duct/__main__.py b/src/con_duct/__main__.py index 402aa76..8a6ce3d 100755 --- a/src/con_duct/__main__.py +++ b/src/con_duct/__main__.py @@ -28,6 +28,9 @@ lgr = logging.getLogger("con-duct") DEFAULT_LOG_LEVEL = os.environ.get("DUCT_LOG_LEVEL", "INFO").upper() +DUCT_OUTPUT_PREFIX = os.getenv( + "DUCT_OUTPUT_PREFIX", ".duct/logs/{datetime_filesafe}-{pid}_" +) ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") SUFFIXES = { "stdout": "stdout", @@ -712,9 +715,7 @@ def from_argv( "-p", "--output-prefix", type=str, - default=os.getenv( - "DUCT_OUTPUT_PREFIX", ".duct/logs/{datetime_filesafe}-{pid}_" - ), + default=DUCT_OUTPUT_PREFIX, help="File string format to be used as a prefix for the files -- the captured " "stdout and stderr and the resource usage logs. The understood variables are " "{datetime}, {datetime_filesafe}, and {pid}. 
" diff --git a/src/con_duct/suite/ls.py b/src/con_duct/suite/ls.py new file mode 100644 index 0000000..f3c9e03 --- /dev/null +++ b/src/con_duct/suite/ls.py @@ -0,0 +1,173 @@ +import argparse +from collections import OrderedDict +import json +import logging +from typing import Any, Dict, List, Optional +from packaging.version import Version + +try: + import pyout # type: ignore +except ImportError: + pyout = None +import yaml +from con_duct.__main__ import SummaryFormatter + +lgr = logging.getLogger(__name__) + +VALUE_TRANSFORMATION_MAP: Dict[str, str] = { + "exit_code": "{value!E}", + "wall_clock_time": "{value:.3f} sec", + "peak_rss": "{value!S}", + "memory_total": "{value!S}", + "average_rss": "{value!S}", + "peak_vsz": "{value!S}", + "average_vsz": "{value!S}", + "peak_pmem": "{value:.2f!N}%", + "average_pmem": "{value:.2f!N}%", + "peak_pcpu": "{value:.2f!N}%", + "average_pcpu": "{value:.2f!N}%", + "start_time": "{value:.2f!N}", + "end_time": "{value:.2f!N}", +} + +NON_TRANSFORMED_FIELDS: List[str] = [ + "hostname", + "uid", + "user", + "gpu", + "duct_version", + "schema_version", + "command", + "prefix", + "num_samples", + "num_reports", + "stderr", + "usage", + "info", + "prefix", +] + +LS_FIELD_CHOICES: List[str] = ( + list(VALUE_TRANSFORMATION_MAP.keys()) + NON_TRANSFORMED_FIELDS +) +MINIMUM_SCHEMA_VERSION: str = "0.2.0" + + +def load_duct_runs(info_files: List[str]) -> List[Dict[str, Any]]: + loaded: List[Dict[str, Any]] = [] + for info_file in info_files: + with open(info_file) as file: + try: + this: Dict[str, Any] = json.load(file) + # this["prefix"] is the path at execution time, could have moved + this["prefix"] = info_file.split("info.json")[0] + if Version(this["schema_version"]) >= Version(MINIMUM_SCHEMA_VERSION): + loaded.append(this) + else: + # TODO lower log level once --log-level is respected + lgr.warning( + f"Skipping {this['prefix']}, schema version {this['schema_version']} " + f"is below minimum schema version {MINIMUM_SCHEMA_VERSION}." + ) + except Exception as exc: + lgr.warning("Failed to load file %s: %s", file, exc) + return loaded + + +def process_run_data( + run_data_list: List[Dict[str, Any]], fields: List[str], formatter: SummaryFormatter +) -> List[OrderedDict[str, Any]]: + output_rows: List[OrderedDict[str, Any]] = [] + for row in run_data_list: + flattened = _flatten_dict(row) + try: + restricted = _restrict_row(fields, flattened) + except KeyError: + lgr.warning( + "Failed to pick fields of interest from a record, skipping. Record was: %s", + list(flattened), + ) + continue + formatted = _format_row(restricted, formatter) + output_rows.append(formatted) + return output_rows + + +def _flatten_dict(d: Dict[str, Any]) -> Dict[str, Any]: + items: List[tuple[str, Any]] = [] + for k, v in d.items(): + if isinstance(v, dict): + items.extend(_flatten_dict(v).items()) + else: + items.append((k, v)) + return dict(items) + + +def _restrict_row(field_list: List[str], row: Dict[str, Any]) -> OrderedDict[str, Any]: + restricted: OrderedDict[str, Any] = OrderedDict() + # prefix is the "primary key", its the only field guaranteed to be unique. 
diff --git a/src/con_duct/suite/ls.py b/src/con_duct/suite/ls.py
new file mode 100644
index 0000000..f3c9e03
--- /dev/null
+++ b/src/con_duct/suite/ls.py
@@ -0,0 +1,173 @@
+import argparse
+from collections import OrderedDict
+import json
+import logging
+from typing import Any, Dict, List, Optional
+from packaging.version import Version
+
+try:
+    import pyout  # type: ignore
+except ImportError:
+    pyout = None
+import yaml
+from con_duct.__main__ import SummaryFormatter
+
+lgr = logging.getLogger(__name__)
+
+# Maps each field to the SummaryFormatter template used to render its value.
+VALUE_TRANSFORMATION_MAP: Dict[str, str] = {
+    "exit_code": "{value!E}",
+    "wall_clock_time": "{value:.3f} sec",
+    "peak_rss": "{value!S}",
+    "memory_total": "{value!S}",
+    "average_rss": "{value!S}",
+    "peak_vsz": "{value!S}",
+    "average_vsz": "{value!S}",
+    "peak_pmem": "{value:.2f!N}%",
+    "average_pmem": "{value:.2f!N}%",
+    "peak_pcpu": "{value:.2f!N}%",
+    "average_pcpu": "{value:.2f!N}%",
+    "start_time": "{value:.2f!N}",
+    "end_time": "{value:.2f!N}",
+}
+
+# Fields rendered as-is, with no formatting template applied.
+NON_TRANSFORMED_FIELDS: List[str] = [
+    "hostname",
+    "uid",
+    "user",
+    "gpu",
+    "duct_version",
+    "schema_version",
+    "command",
+    "prefix",
+    "num_samples",
+    "num_reports",
+    "stderr",
+    "usage",
+    "info",
+]
+
+LS_FIELD_CHOICES: List[str] = (
+    list(VALUE_TRANSFORMATION_MAP.keys()) + NON_TRANSFORMED_FIELDS
+)
+MINIMUM_SCHEMA_VERSION: str = "0.2.0"
+
+
+def load_duct_runs(info_files: List[str]) -> List[Dict[str, Any]]:
+    """Load each info.json file, skipping runs below MINIMUM_SCHEMA_VERSION."""
+    loaded: List[Dict[str, Any]] = []
+    for info_file in info_files:
+        with open(info_file) as file:
+            try:
+                this: Dict[str, Any] = json.load(file)
+                # this["prefix"] is the path at execution time, could have moved
+                this["prefix"] = info_file.split("info.json")[0]
+                if Version(this["schema_version"]) >= Version(MINIMUM_SCHEMA_VERSION):
+                    loaded.append(this)
+                else:
+                    # TODO lower log level once --log-level is respected
+                    lgr.warning(
+                        f"Skipping {this['prefix']}, schema version {this['schema_version']} "
+                        f"is below minimum schema version {MINIMUM_SCHEMA_VERSION}."
+                    )
+            except Exception as exc:
+                lgr.warning("Failed to load file %s: %s", info_file, exc)
+    return loaded
+
+
+def process_run_data(
+    run_data_list: List[Dict[str, Any]], fields: List[str], formatter: SummaryFormatter
+) -> List[OrderedDict[str, Any]]:
+    """Flatten each run record, restrict it to the requested fields, and format values."""
+    output_rows: List[OrderedDict[str, Any]] = []
+    for row in run_data_list:
+        flattened = _flatten_dict(row)
+        try:
+            restricted = _restrict_row(fields, flattened)
+        except KeyError:
+            lgr.warning(
+                "Failed to pick fields of interest from a record, skipping. Record was: %s",
+                list(flattened),
+            )
+            continue
+        formatted = _format_row(restricted, formatter)
+        output_rows.append(formatted)
+    return output_rows
+
+
+def _flatten_dict(d: Dict[str, Any]) -> Dict[str, Any]:
+    """Merge nested dicts into a single level; parent keys are dropped, not prefixed."""
+    items: List[tuple[str, Any]] = []
+    for k, v in d.items():
+        if isinstance(v, dict):
+            items.extend(_flatten_dict(v).items())
+        else:
+            items.append((k, v))
+    return dict(items)
+
+
+def _restrict_row(field_list: List[str], row: Dict[str, Any]) -> OrderedDict[str, Any]:
+    restricted: OrderedDict[str, Any] = OrderedDict()
+    # prefix is the "primary key"; it is the only field guaranteed to be unique.
+    restricted["prefix"] = row["prefix"]
+    for field in field_list:
+        if field != "prefix" and field in row:
+            restricted[field.split(".")[-1]] = row[field]
+    return restricted
+
+
+def _format_row(
+    row: OrderedDict[str, Any], formatter: SummaryFormatter
+) -> OrderedDict[str, Any]:
+    transformed: OrderedDict[str, Any] = OrderedDict()
+    for col, value in row.items():
+        transformation: Optional[str] = VALUE_TRANSFORMATION_MAP.get(col)
+        if transformation is not None:
+            value = formatter.format(transformation, value=value)
+        transformed[col] = value
+    return transformed
+
+
+def pyout_ls(run_data_list: List[OrderedDict[str, Any]]) -> None:
+    """Render the runs as a table using pyout."""
+    if pyout is None:
+        raise RuntimeError("pyout is required for this output format.")
+
+    with pyout.Tabular(
+        style=dict(
+            header_=dict(bold=True, transform=str.upper),
+        ),
+        mode="final",
+    ) as table:
+        for row in run_data_list:
+            table(row)
+
+
+def ls(args: argparse.Namespace) -> int:
+    info_files = [path for path in args.paths if path.endswith("info.json")]
+    run_data_raw = load_duct_runs(info_files)
+    formatter = SummaryFormatter(enable_colors=args.colors)
+    output_rows = process_run_data(run_data_raw, args.fields, formatter)
+
+    if args.format == "auto":
+        args.format = "summaries" if pyout is None else "pyout"
+
+    if args.format == "summaries":
+        for row in output_rows:
+            for col, value in row.items():
+                if col != "prefix":
+                    col = f"\t{col}"
+                print(f"{col.replace('_', ' ').title()}: {value}")
+    elif args.format == "pyout":
+        if pyout is None:
+            raise RuntimeError("Install pyout for pyout output")
+        pyout_ls(output_rows)
+    elif args.format == "json":
+        print(json.dumps(output_rows))
+    elif args.format == "json_pp":
+        print(json.dumps(output_rows, indent=2))
+    elif args.format == "yaml":
+        plain_rows = [dict(row) for row in output_rows]
+        print(yaml.dump(plain_rows, default_flow_style=False))
+    else:
+        raise RuntimeError(
+            f"Unexpected format encountered: {args.format}. This should have been caught by argparse.",
+        )
+    return 0
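
A rough sketch of how the pieces above compose, using a made-up record shaped like the output of `load_duct_runs()` (all field values here are hypothetical):

```python
from con_duct.__main__ import SummaryFormatter
from con_duct.suite.ls import process_run_data

run = {
    "prefix": "/tmp/.duct/logs/2024.11.11T15.20.50-12345_",
    "command": "echo hello",
    "exit_code": 0,
    "wall_clock_time": 1.2345678,
}
formatter = SummaryFormatter(enable_colors=False)
rows = process_run_data([run], ["command", "wall_clock_time"], formatter)
# "prefix" always leads; wall_clock_time is rendered via "{value:.3f} sec".
# rows[0] -> OrderedDict([('prefix', '/tmp/.duct/logs/2024.11.11T15.20.50-12345_'),
#                         ('command', 'echo hello'),
#                         ('wall_clock_time', '1.235 sec')])
```
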
" + f"Available choices: {', '.join(LS_FIELD_CHOICES)}.", + choices=LS_FIELD_CHOICES, + default=[ + "command", + "exit_code", + "wall_clock_time", + "peak_rss", + ], + ) + parser_ls.add_argument( + "--colors", + action="store_true", + default=os.getenv("DUCT_COLORS", False), + help="Use colors in duct output.", + ) + parser_ls.add_argument( + "paths", + nargs="*", + default=[f"{DUCT_OUTPUT_PREFIX[:DUCT_OUTPUT_PREFIX.index('{')]}*"], + help="Path to duct report files, only `info.json` would be considered.", + ) + parser_ls.set_defaults(func=ls) + args = parser.parse_args(argv) if args.command is None: diff --git a/test/test_ls.py b/test/test_ls.py new file mode 100644 index 0000000..ca12493 --- /dev/null +++ b/test/test_ls.py @@ -0,0 +1,73 @@ +import json +from unittest.mock import mock_open, patch +from con_duct.__main__ import SummaryFormatter +from con_duct.suite.ls import ( + _flatten_dict, + _restrict_row, + load_duct_runs, + process_run_data, +) + + +def test_load_duct_runs_sanity() -> None: + mock_json = json.dumps( + {"schema_version": "0.2.1", "prefix": "/test/path_", "command": "echo hello"} + ) + with patch("builtins.open", mock_open(read_data=mock_json)): + result = load_duct_runs(["/test/path_info.json"]) + assert len(result) == 1 + assert result[0]["prefix"] == "/test/path_" + + +def test_load_duct_runs_skips_unsupported_schema() -> None: + mock_json = json.dumps( + {"schema_version": "0.1.1", "prefix": "/test/path_", "command": "echo hello"} + ) + with patch("builtins.open", mock_open(read_data=mock_json)): + result = load_duct_runs(["/test/path_info.json"]) + assert len(result) == 0 + + +def test_load_duct_runs_uses_filenames_not_stored_prefix() -> None: + mock_json = json.dumps( + { + "schema_version": "0.2.1", + "prefix": "/test/not_anymore_", + "command": "echo hello", + } + ) + with patch("builtins.open", mock_open(read_data=mock_json)): + result = load_duct_runs(["/actual_filepath_info.json"]) + assert len(result) == 1 + assert result[0]["prefix"] == "/actual_filepath_" + + +def test_flatten_dict() -> None: + nested = {"a": {"b": 1, "c": 2}, "d": 3} + result = _flatten_dict(nested) + assert result == {"b": 1, "c": 2, "d": 3} + + +def test_restrict_row() -> None: + row = {"prefix": "/test/path", "exit_code": 0, "extra": "ignore"} + fields = ["exit_code"] + result = _restrict_row(fields, row) + assert "prefix" in result + assert "exit_code" in result + assert "extra" not in result + + +def test_process_run_data() -> None: + run_data = [ + { + "prefix": "/test/path", + "exit_code": 0, + "wall_clock_time": 0.12345678, + } + ] + formatter = SummaryFormatter(enable_colors=False) + result = process_run_data(run_data, ["wall_clock_time"], formatter) + assert isinstance(result, list) + assert result[0]["prefix"] == "/test/path" + assert "exit_code" not in result[0] + assert result[0]["wall_clock_time"] == "0.123 sec" diff --git a/test/test_suite.py b/test/test_suite.py index 719b505..673a977 100644 --- a/test/test_suite.py +++ b/test/test_suite.py @@ -1,9 +1,16 @@ import argparse +import contextlib +from io import StringIO +import json +import os +import tempfile from typing import Any import unittest from unittest.mock import MagicMock, mock_open, patch import pytest +import yaml from con_duct.suite import main, plot, pprint_json +from con_duct.suite.ls import MINIMUM_SCHEMA_VERSION, ls class TestSuiteHelpers(unittest.TestCase): @@ -136,3 +143,137 @@ def test_matplotlib_plot_invalid_json( ) assert main.execute(args) == 1 mock_plot_save.assert_not_called() + + +class 
diff --git a/test/test_suite.py b/test/test_suite.py
index 719b505..673a977 100644
--- a/test/test_suite.py
+++ b/test/test_suite.py
@@ -1,9 +1,16 @@
 import argparse
+import contextlib
+from io import StringIO
+import json
+import os
+import tempfile
 from typing import Any
 import unittest
 from unittest.mock import MagicMock, mock_open, patch
 
 import pytest
+import yaml
 
 from con_duct.suite import main, plot, pprint_json
+from con_duct.suite.ls import MINIMUM_SCHEMA_VERSION, ls
 
 
 class TestSuiteHelpers(unittest.TestCase):
@@ -136,3 +143,137 @@ def test_matplotlib_plot_invalid_json(
         )
         assert main.execute(args) == 1
         mock_plot_save.assert_not_called()
+
+
+class TestLS(unittest.TestCase):
+    def setUp(self) -> None:
+        """Create a temporary directory and test files."""
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.files = {
+            "file1_info.json": {
+                "schema_version": MINIMUM_SCHEMA_VERSION,
+                "prefix": "test1",
+            },
+            "file2_info.json": {
+                "schema_version": MINIMUM_SCHEMA_VERSION,
+                "prefix": "test2",
+            },
+            "file3_info.json": {"schema_version": "0.1.0", "prefix": "old_version"},
+            "not_matching.json": {
+                "schema_version": MINIMUM_SCHEMA_VERSION,
+                "prefix": "no_match",
+            },
+        }
+        for filename, content in self.files.items():
+            with open(os.path.join(self.temp_dir.name, filename), "w") as f:
+                json.dump(content, f)
+
+    def tearDown(self) -> None:
+        """Clean up the temporary directory."""
+        self.temp_dir.cleanup()
+
+    def _run_ls(self, paths: list[str], fmt: str) -> str:
+        """Helper function to run ls() and capture stdout."""
+        args = argparse.Namespace(
+            paths=[os.path.join(self.temp_dir.name, path) for path in paths],
+            colors=False,
+            fields=["prefix", "schema_version"],
+            format=fmt,
+            func=ls,
+        )
+        buf = StringIO()
+        with contextlib.redirect_stdout(buf):
+            exit_code = ls(args)
+        assert exit_code == 0
+        return buf.getvalue().strip()
+
+    def test_ls_sanity(self) -> None:
+        """Basic sanity test to ensure ls() runs without crashing."""
+        just_file1 = ["file1_info.json"]
+        result = self._run_ls(just_file1, "summaries")
+
+        assert "Prefix:" in result
+        prefixes = [
+            line.split(":", 1)[1].strip()
+            for line in result.splitlines()
+            if line.startswith("Prefix:")
+        ]
+        assert len(prefixes) == 1
+        assert any("file1" in p for p in prefixes)
+
+    def test_ls_multiple_paths(self) -> None:
+        """ls() should report one entry per given info.json path."""
+        files_1_and_2 = ["file1_info.json", "file2_info.json"]
+        result = self._run_ls(files_1_and_2, "summaries")
+
+        assert "Prefix:" in result
+        prefixes = [
+            line.split(":", 1)[1].strip()
+            for line in result.splitlines()
+            if line.startswith("Prefix:")
+        ]
+        assert len(prefixes) == 2
+        assert any("file1" in p for p in prefixes)
+        assert any("file2" in p for p in prefixes)
+
+    def test_ls_ignore_old_schema(self) -> None:
+        """Runs below the minimum schema version should be skipped."""
+        files_1_2_3 = ["file1_info.json", "file2_info.json", "file3_info.json"]
+        result = self._run_ls(files_1_2_3, "summaries")
+
+        assert "Prefix:" in result
+        prefixes = [
+            line.split(":", 1)[1].strip()
+            for line in result.splitlines()
+            if line.startswith("Prefix:")
+        ]
+        assert len(prefixes) == 2
+        assert any("file1" in p for p in prefixes)
+        assert any("file2" in p for p in prefixes)
+        # file3 does not meet minimum schema version
+        assert "file3" not in result
+
+    def test_ls_ignore_non_infojson(self) -> None:
+        """Paths not ending in info.json should be ignored."""
+        all_files = ["file1_info.json", "file2_info.json", "not_matching.json"]
+        result = self._run_ls(all_files, "summaries")
+
+        assert "Prefix:" in result
+        prefixes = [
+            line.split(":", 1)[1].strip()
+            for line in result.splitlines()
+            if line.startswith("Prefix:")
+        ]
+        assert len(prefixes) == 2
+        assert any("file1" in p for p in prefixes)
+        assert any("file2" in p for p in prefixes)
+        # does not end in info.json
+        assert "not_matching.json" not in result
+
+    def test_ls_json_output(self) -> None:
+        """Test JSON output format."""
+        result = self._run_ls(["file1_info.json"], "json")
+        parsed = json.loads(result)
+        assert len(parsed) == 1
+        assert "prefix" in parsed[0]
+    def test_ls_json_pp_output(self) -> None:
+        """Test pretty-printed JSON output format."""
+        result = self._run_ls(["file1_info.json"], "json_pp")
+        parsed = json.loads(result)
+        assert len(parsed) == 1
+        assert "prefix" in parsed[0]
+
+    def test_ls_yaml_output(self) -> None:
+        """Test YAML output format."""
+        result = self._run_ls(["file1_info.json"], "yaml")
+        parsed = yaml.safe_load(result)
+        assert len(parsed) == 1
+        assert "prefix" in parsed[0]
+
+    def test_ls_pyout_output(self) -> None:
+        """Test pyout tabular output format."""
+        result = self._run_ls(["file1_info.json"], "pyout")
+        # pyout header
+        assert "PREFIX" in result
+        assert os.path.join(self.temp_dir.name, "file1_") in result
diff --git a/tox.ini b/tox.ini
index 4868b96..4e58001 100644
--- a/tox.ini
+++ b/tox.ini
@@ -26,6 +26,7 @@ commands =
 deps =
     mypy
     data-science-types  # TODO replace archived, https://github.com/wearepal/data-science-types
+    types-PyYAML
     {[testenv]deps}
 commands = mypy src test
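
With the patch applied, the subcommand can also be driven programmatically; a minimal sketch (the log path is hypothetical, and explicit paths are passed because argparse does not glob-expand the default pattern):

```python
from con_duct.suite.main import main

# Equivalent to: con-duct ls --format yaml .duct/logs/2024.11.11T15.20.50-12345_info.json
main(["ls", "--format", "yaml", ".duct/logs/2024.11.11T15.20.50-12345_info.json"])
```
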