diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 5adfc7d6..35961fb0 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -25,8 +25,8 @@ jobs:
       fail-fast: true
       matrix:
         os:
-          # - macos-12
-          # - macos-latest
+          - macos-12
+          - macos-latest
           # - windows-latest
           - ubuntu-latest
         python-version:
@@ -38,25 +38,16 @@ jobs:
           - 'pypy-3.8'
          - 'pypy-3.9'
           - 'pypy-3.10'
-        # exclude:
-        #   # No older Pythons on arm64 macos-latest
-        #   - python-version: '3.7'
-        #     os: macos-latest
-        #   - python-version: '3.8'
-        #     os: macos-latest
-        #   - python-version: '3.9'
-        #     os: macos-latest
-        #   - python-version: 'pypy-3.8'
-        #     os: macos-latest
-        #   - python-version: 'pypy-3.9'
-        #     os: macos-latest
-        # include:
-        #   - python-version: '3.7'
-        #     toxenv: lint
-        #     os: ubuntu-latest
-        #   - python-version: '3.7'
-        #     toxenv: typing
-        #     os: ubuntu-latest
+        exclude:
+          # No older Pythons on arm64 macos-latest
+          - python-version: '3.8'
+            os: macos-latest
+          - python-version: '3.9'
+            os: macos-latest
+          - python-version: 'pypy-3.8'
+            os: macos-latest
+          - python-version: 'pypy-3.9'
+            os: macos-latest
     steps:
       - name: Check out repository
         uses: actions/checkout@v4
@@ -77,9 +68,6 @@ jobs:
       - name: Run tests
         run: tox -e py -- -vv --cov-report=xml
 
-      - name: Run smoke tests
-        run: ./smoke-tests.sh
-
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v4
         with:
diff --git a/README.md b/README.md
index 0468bd03..1f80e1a2 100644
--- a/README.md
+++ b/README.md
@@ -9,28 +9,47 @@ A process wrapper script that monitors the execution of a command.
 
 ```shell
 > duct --help
-usage: duct [-h] [--sample-interval SAMPLE_INTERVAL]
-            [--output_prefix OUTPUT_PREFIX]
-            [--report-interval REPORT_INTERVAL]
+usage: duct [-h] [-p OUTPUT_PREFIX] [--sample-interval SAMPLE_INTERVAL]
+            [--report-interval REPORT_INTERVAL] [-c {all,none,stdout,stderr}]
+            [-o {all,none,stdout,stderr}]
+            [-t {all,system-summary,processes-samples}]
             command [arguments ...]
 
-Duct creates a new session to run a command and all its child processes, and
-the collects metrics for all processes in the session.
+Gathers metrics on a command and all its child processes.
 
 positional arguments:
   command               The command to execute.
-  arguments             Arguments for the command.
+  arguments             Arguments for the command. (default: None)
 
 options:
   -h, --help            show this help message and exit
-  --sample-interval SAMPLE_INTERVAL
+  -p OUTPUT_PREFIX, --output-prefix OUTPUT_PREFIX
+                        File string format to be used as a prefix for the
+                        files -- the captured stdout and stderr and the
+                        resource usage logs. The understood variables are
+                        {datetime}, {datetime_filesafe}, and {pid}. Leading
+                        directories will be created if they do not exist. You
+                        can also provide value via DUCT_OUTPUT_PREFIX env
+                        variable. (default:
+                        .duct/logs/{datetime_filesafe}-{pid}_)
+  --sample-interval SAMPLE_INTERVAL, --s-i SAMPLE_INTERVAL
                         Interval in seconds between status checks of the
-                        running process.
-  --output_prefix OUTPUT_PREFIX
-                        Directory in which all logs will be saved.
-  --report-interval REPORT_INTERVAL
+                        running process. Sample interval should be smaller than
+                        the runtime of the process or `duct` may underreport
+                        the number of processes started. (default: 1.0)
+  --report-interval REPORT_INTERVAL, --r-i REPORT_INTERVAL
                         Interval in seconds at which to report aggregated
-                        data.
+                        data. (default: 60.0)
+  -c {all,none,stdout,stderr}, --capture-outputs {all,none,stdout,stderr}
+                        Record stdout, stderr, all, or none to log files. You
+                        can also provide value via DUCT_CAPTURE_OUTPUTS env
+                        variable. (default: all)
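The `--output-prefix` documented above is a plain `str.format` template, expanded by `execute()` further down in this patch with `pid` and `datetime_filesafe` keyword arguments. A minimal sketch of that expansion (the concrete timestamp and PID are illustrative only):

```python
import os
from datetime import datetime

# Expand the default prefix the way duct's execute() does with str.format();
# {datetime} is also understood per the help text, but not used by the default.
template = ".duct/logs/{datetime_filesafe}-{pid}_"
prefix = template.format(
    datetime_filesafe=datetime.now().strftime("%Y.%m.%dT%H.%M.%S"),
    pid=os.getpid(),
)
print(prefix)             # e.g. .duct/logs/2024.05.17T10.23.45-12345_
print(prefix + "stdout")  # the captured-stdout log would land here
```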
+  -o {all,none,stdout,stderr}, --outputs {all,none,stdout,stderr}
+                        Print stdout, stderr, all, or none to stdout/stderr
+                        respectively. (default: all)
+  -t {all,system-summary,processes-samples}, --record-types {all,system-summary,processes-samples}
+                        Record system-summary, processes-samples, or all
+                        (default: all)
 ```
 
 ## Testing:
diff --git a/smoke-tests.sh b/smoke-tests.sh
deleted file mode 100755
index e7c39861..00000000
--- a/smoke-tests.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-# Smoketests
-rm -rf .duct/*
-duct --report-interval 2 --sample-interval 0.5 ./test_script.py -- --duration 6 --cpu-load 50000 --memory-size 500
-find .duct/ -name '*.json' -exec sh -c 'echo "File: $1)"; cat "$1" | jq' _ {} \;
diff --git a/src/duct.py b/src/duct.py
index e09edc55..1c2372cd 100755
--- a/src/duct.py
+++ b/src/duct.py
@@ -52,6 +52,7 @@ def __init__(
         self.arguments = arguments
         self.session_id = session_id
         self.gpus = []
+        self.env = None
         self.number = 0
         self.system_info = {}
         self.output_prefix = output_prefix
@@ -75,7 +76,7 @@ def collect_environment(self):
 
     def get_system_info(self):
         """Gathers system information related to CPU, GPU, memory, and environment variables."""
-        self.system_info["uid"] = os.environ["USER"]
+        self.system_info["uid"] = os.environ.get("USER")
         self.system_info["memory_total"] = os.sysconf("SC_PAGE_SIZE") * os.sysconf(
             "SC_PHYS_PAGES"
         )
@@ -103,13 +104,23 @@
         except subprocess.CalledProcessError:
             self.gpus = ["Failed to query GPU info"]
 
-    def update_max_resources(self, maxes, sample):
+    def calculate_total_usage(self, sample):
+        pmem = 0.0
+        pcpu = 0.0
+        for _pid, pinfo in sample.items():
+            pmem += pinfo["pmem"]
+            pcpu += pinfo["pcpu"]
+        totals = {"totals": {"pmem": pmem, "pcpu": pcpu}}
+        return totals
+
+    @staticmethod
+    def update_max_resources(maxes, sample):
         for pid in sample:
             if pid in maxes:
                 for key, value in sample[pid].items():
                     maxes[pid][key] = max(maxes[pid].get(key, value), value)
             else:
-                maxes[pid] = sample[pid]
+                maxes[pid] = sample[pid].copy()
 
     def collect_sample(self):
         process_data = {}
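The switch from `maxes[pid] = sample[pid]` to `maxes[pid] = sample[pid].copy()` is easy to overlook; a standalone sketch of the aliasing pitfall it closes (the values are made up):

```python
from collections import defaultdict

maxes = defaultdict(dict)
sample = {"pid1": {"pcpu": 5.0}}

# Without .copy(), the stored "max" and the live sample are one dict object,
# so mutating the next reading in place silently rewrites the recorded peak.
maxes["pid1"] = sample["pid1"]
sample["pid1"]["pcpu"] = 0.5
print(maxes["pid1"]["pcpu"])  # 0.5 -- the 5.0 peak was lost

# With .copy(), the recorded peak is an independent snapshot.
maxes["pid1"] = sample["pid1"].copy()
sample["pid1"]["pcpu"] = 0.1
print(maxes["pid1"]["pcpu"])  # still 0.5
```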
@@ -140,7 +151,7 @@
                         "timestamp": datetime.now().astimezone().isoformat(),
                     }
         except subprocess.CalledProcessError:
-            process_data["error"] = "Failed to query process data"
+            pass
         return process_data
 
     def write_pid_samples(self):
@@ -157,32 +168,43 @@ def finalize(self):
             print(Colors.OKGREEN)
         else:
             print(Colors.FAIL)
-
-        print("-----------------------------------------------------")
-        print("                    duct report")
-        print("-----------------------------------------------------")
         print(f"Exit Code: {self.process.returncode}")
         print(f"{Colors.OKCYAN}Command: {self.command}")
+        print(f"Log files location: {self.output_prefix}")
         print(f"Wall Clock Time: {self.elapsed_time}")
-        print(f"Number of Processes: {len(self.max_values)}")
-        for pid, values in self.max_values.items():
-            values.pop("timestamp")  # Meaningless
-            print(f"  {pid} Max Usage: {values}")
-
-
-def monitor_process(report, process, report_interval, sample_interval):
-    while True:
-        if process.poll() is not None:  # the passthrough command has finished
-            break
-        # print(f"Resource stats log path: {resource_stats_log_path}")
-        sample = report.collect_sample()
-        report.update_max_resources(report._sample, sample)
-        if report.elapsed_time >= (report.number + 1) * report_interval:
-            report.write_pid_samples()
-            report.update_max_resources(report.max_values, report._sample)
-            report._sample = defaultdict(dict)  # Reset sample
-            report.number += 1
-        time.sleep(sample_interval)
+        print(
+            f"Memory Peak Usage: {self.max_values.get('totals', {}).get('pmem', 'unknown')}%"
+        )
+        print(
+            f"CPU Peak Usage: {self.max_values.get('totals', {}).get('pcpu', 'unknown')}%"
+        )
+
+    def __repr__(self):
+        return json.dumps(
+            {
+                "Command": self.command,
+                "System": self.system_info,
+                "ENV": self.env,
+                "GPU": self.gpus,
+            }
+        )
+
+
+def monitor_process(report, process, report_interval, sample_interval, stop_event):
+    while not stop_event.wait(timeout=sample_interval):
+        while True:
+            if process.poll() is not None:  # the passthrough command has finished
+                break
+            # print(f"Resource stats log path: {resource_stats_log_path}")
+            sample = report.collect_sample()
+            totals = report.calculate_total_usage(sample)
+            report.update_max_resources(sample, totals)
+            report.update_max_resources(report._sample, sample)
+            if report.elapsed_time >= report.number * report_interval:
+                report.write_pid_samples()
+                report.update_max_resources(report.max_values, report._sample)
+                report._sample = defaultdict(dict)  # Reset sample
+                report.number += 1
 
 
 def create_and_parse_args():
@@ -210,7 +232,9 @@ def create_and_parse_args():
         "--s-i",
         type=float,
         default=float(os.getenv("DUCT_SAMPLE_INTERVAL", "1.0")),
-        help="Interval in seconds between status checks of the running process.",
+        help="Interval in seconds between status checks of the running process. "
+        "Sample interval should be smaller than the runtime of the process or `duct` may "
+        "underreport the number of processes started.",
     )
     parser.add_argument(
         "--report-interval",
         "--r-i",
         type=float,
@@ -306,8 +330,8 @@
     elif capture_outputs in ["all", "stdout"] and outputs in ["none", "stderr"]:
         stdout = open(f"{output_prefix}stdout", "w")
     elif capture_outputs in ["none", "stderr"] and outputs in ["all", "stdout"]:
-        stdout = subprocess.PIPE
-    else:
+        stdout = None
+    elif capture_outputs in ["none", "stderr"] and outputs in ["none", "stderr"]:
         stdout = subprocess.DEVNULL
 
     if capture_outputs in ["all", "stderr"] and outputs in ["all", "stderr"]:
@@ -316,12 +340,20 @@
     elif capture_outputs in ["all", "stderr"] and outputs in ["none", "stdout"]:
         stderr = open(f"{output_prefix}stderr", "w")
     elif capture_outputs in ["none", "stdout"] and outputs in ["all", "stderr"]:
-        stderr = subprocess.PIPE
-    else:
+        stderr = None
+    elif capture_outputs in ["none", "stdout"] and outputs in ["none", "stdout"]:
         stderr = subprocess.DEVNULL
     return stdout, stderr
 
 
+def safe_close_files(file_list):
+    for f in file_list:
+        try:
+            f.close()
+        except Exception:
+            pass
+
+
 def ensure_directories(path: str) -> None:
     if path.endswith(os.sep):  # If it ends in "/" (for linux) treat as a dir
         os.makedirs(path, exist_ok=True)
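The `subprocess.PIPE` → `None` change in `prepare_outputs` matters because of how `Popen` treats these values: `None` makes the child inherit duct's own stream (true pass-through), `DEVNULL` discards output, and `PIPE` creates a buffer that something must drain or the child can block once the pipe fills. A quick illustration of the three modes:

```python
import subprocess

# None: the child writes straight to our stdout -- the pass-through case.
subprocess.run(["echo", "inherited"], stdout=None)

# DEVNULL: output is discarded entirely.
subprocess.run(["echo", "discarded"], stdout=subprocess.DEVNULL)

# PIPE is only appropriate when the output is actually read back.
proc = subprocess.Popen(["echo", "captured"], stdout=subprocess.PIPE)
out, _ = proc.communicate()
print(out)  # b'captured\n'
```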
@@ -333,8 +365,12 @@
 
 
 def main():
-    """A wrapper to execute a command, monitor and log the process details."""
     args = create_and_parse_args()
+    execute(args)
+
+
+def execute(args):
+    """A wrapper to execute a command, monitor and log the process details."""
     datetime_filesafe = datetime.now().strftime("%Y.%m.%dT%H.%M.%S")
     duct_pid = os.getpid()
     formatted_output_prefix = args.output_prefix.format(
@@ -354,18 +390,19 @@
         stderr_file = stderr
 
     full_command = " ".join([str(args.command)] + args.arguments)
-    print(f"{Colors.OKCYAN}-----------------------------------------------------")
-    print(f"duct is executing {full_command}...")
-    print()
-    print(f"Log files will be written to {formatted_output_prefix}")
-    print(f"-----------------------------------------------------{Colors.ENDC}")
+    print(f"{Colors.OKCYAN}duct is executing {full_command}...")
+    print(f"Log files will be written to {formatted_output_prefix}{Colors.ENDC}")
     process = subprocess.Popen(
         [str(args.command)] + args.arguments,
         stdout=stdout_file,
         stderr=stderr_file,
         preexec_fn=os.setsid,
     )
-    session_id = os.getsid(process.pid)  # Get session ID of the new process
+    try:
+        session_id = os.getsid(process.pid)  # Get session ID of the new process
+    except ProcessLookupError:  # process has already finished
+        session_id = None
+
     report = Report(
         args.command,
         args.arguments,
@@ -374,18 +411,21 @@
         process,
         datetime_filesafe,
     )
+    stop_event = threading.Event()
     if args.record_types in ["all", "processes-samples"]:
         monitoring_args = [
             report,
             process,
             args.report_interval,
             args.sample_interval,
+            stop_event,
         ]
         monitoring_thread = threading.Thread(
             target=monitor_process, args=monitoring_args
         )
         monitoring_thread.start()
-        monitoring_thread.join()
+    else:
+        monitoring_thread = None
 
     if args.record_types in ["all", "system-summary"]:
         report.collect_environment()
@@ -400,15 +440,17 @@
             system_logs.write(str(report))
 
     process.wait()
+    stop_event.set()
+    if monitoring_thread is not None:
+        monitoring_thread.join()
+
+    # If we have any extra samples that haven't been written yet, do it now
+    if report._sample:
+        report.update_max_resources(report.max_values, report._sample)
+        report.write_pid_samples()
     report.process = process
-    if isinstance(stdout, TailPipe):
-        stdout_file.close()
-        stdout.close()
-    if isinstance(stderr, TailPipe):
-        stderr_file.close()
-        stderr.close()
     report.finalize()
-    print(f"Log files location: {report.output_prefix}")
+    safe_close_files([stdout_file, stdout, stderr_file, stderr])
 
 
 if __name__ == "__main__":
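The `stop_event` handed to `monitor_process` doubles as the sampling timer: `Event.wait(timeout=...)` sleeps at most one interval, but returns `True` immediately once `execute()` calls `stop_event.set()` after `process.wait()`, so the monitor exits promptly instead of sleeping out a full interval. A minimal standalone sketch of that pattern (the interval and the worker body are illustrative):

```python
import threading
import time

def monitor(stop_event: threading.Event, sample_interval: float) -> None:
    # wait() returns False on timeout (keep sampling) and True once set().
    while not stop_event.wait(timeout=sample_interval):
        print("collect one sample")

stop_event = threading.Event()
thread = threading.Thread(target=monitor, args=(stop_event, 0.1))
thread.start()
time.sleep(0.35)  # stand-in for process.wait()
stop_event.set()  # monitoring ends promptly, mid-interval if needed
thread.join()
```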
diff --git a/test/test_aggregation.py b/test/test_aggregation.py
deleted file mode 100644
index 4d496347..00000000
--- a/test/test_aggregation.py
+++ /dev/null
@@ -1,2 +0,0 @@
-def test_sanity():
-    pass
diff --git a/test/test_execution.py b/test/test_execution.py
new file mode 100644
index 00000000..c3db957d
--- /dev/null
+++ b/test/test_execution.py
@@ -0,0 +1,161 @@
+import argparse
+import os
+import shutil
+from unittest import mock
+import pytest
+from utils import assert_files
+from duct import execute
+
+
+@pytest.fixture
+def temp_output_dir(tmp_path):
+    yield str(tmp_path) + os.sep
+    shutil.rmtree(tmp_path)
+
+
+def test_sanity_green(temp_output_dir):
+    args = argparse.Namespace(
+        command="echo",
+        arguments=["hello", "world"],
+        output_prefix=temp_output_dir,
+        sample_interval=1.0,
+        report_interval=60.0,
+        capture_outputs="all",
+        outputs="all",
+        record_types="all",
+    )
+    execute(args)
+    # When runtime < sample_interval, we won't have a usage.json
+    expected_files = ["stdout", "stderr", "info.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+
+
+def test_sanity_red(temp_output_dir):
+    args = argparse.Namespace(
+        command="false",
+        arguments=[],
+        output_prefix=temp_output_dir,
+        sample_interval=1.0,
+        report_interval=60.0,
+        capture_outputs="all",
+        outputs="all",
+        record_types="all",
+    )
+    with mock.patch("sys.stdout", new_callable=mock.MagicMock) as mock_stdout:
+        execute(args)
+    mock_stdout.write.assert_has_calls([mock.call("Exit Code: 1")])
+
+    # We still should execute normally
+    expected_files = ["stdout", "stderr", "info.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+    # But no polling of the already failed command
+    not_expected_files = ["usage.json"]
+    assert_files(temp_output_dir, not_expected_files, exists=False)
+
+
+def test_outputs_full(temp_output_dir):
+    args = argparse.Namespace(
+        command="./test_script.py",
+        arguments=["--duration", "1"],
+        output_prefix=temp_output_dir,
+        sample_interval=0.01,
+        report_interval=0.1,
+        capture_outputs="all",
+        outputs="all",
+        record_types="all",
+    )
+    execute(args)
+    expected_files = ["stdout", "stderr", "info.json", "usage.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+
+
+def test_outputs_passthrough(temp_output_dir):
+    args = argparse.Namespace(
+        command="./test_script.py",
+        arguments=["--duration", "1"],
+        output_prefix=temp_output_dir,
+        sample_interval=0.01,
+        report_interval=0.1,
+        capture_outputs="none",
+        outputs="all",
+        record_types="all",
+    )
+    execute(args)
+    expected_files = ["info.json", "usage.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+    not_expected_files = ["stdout", "stderr"]
+    assert_files(temp_output_dir, not_expected_files, exists=False)
+
+
+def test_outputs_capture(temp_output_dir):
+    args = argparse.Namespace(
+        command="./test_script.py",
+        arguments=["--duration", "1"],
+        output_prefix=temp_output_dir,
+        sample_interval=0.01,
+        report_interval=0.1,
+        capture_outputs="all",
+        outputs="none",
+        record_types="all",
+    )
+    execute(args)
+    # TODO make this work: assert mock.call("this is a test of STDOUT\n") not in mock_stdout.write.mock_calls
+
+    expected_files = ["stdout", "stderr", "info.json", "usage.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+
+
+def test_outputs_none(temp_output_dir):
+    args = argparse.Namespace(
+        command="./test_script.py",
+        arguments=["--duration", "1"],
+        output_prefix=temp_output_dir,
+        sample_interval=0.01,
+        report_interval=0.1,
+        capture_outputs="none",
+        outputs="none",
+        record_types="all",
+    )
+    execute(args)
+    # assert mock.call("this is a test of STDOUT\n") not in mock_stdout.write.mock_calls
+
+    expected_files = ["info.json", "usage.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+
+    not_expected_files = ["stdout", "stderr"]
+    assert_files(temp_output_dir, not_expected_files, exists=False)
+
+
+def test_exit_before_first_sample(temp_output_dir):
+    args = argparse.Namespace(
+        command="ls",
+        arguments=[],
+        output_prefix=temp_output_dir,
+        sample_interval=0.1,
+        report_interval=0.1,
+        capture_outputs="all",
+        outputs="none",
+        record_types="all",
+    )
+    execute(args)
+    expected_files = ["stdout", "stderr", "info.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
+    not_expected_files = ["usage.json"]
+    assert_files(temp_output_dir, not_expected_files, exists=False)
+
+
+def test_run_less_than_report_interval(temp_output_dir):
+    args = argparse.Namespace(
+        command="sleep",
+        arguments=["0.01"],
+        output_prefix=temp_output_dir,
+        sample_interval=0.001,
+        report_interval=0.1,
+        capture_outputs="all",
+        outputs="none",
+        record_types="all",
+    )
+    execute(args)
+    # Specifically we need to assert that usage.json gets written anyway.
+    expected_files = ["stdout", "stderr", "usage.json", "info.json"]
+    assert_files(temp_output_dir, expected_files, exists=True)
diff --git a/test/test_prepare_outputs.py b/test/test_prepare_outputs.py
index 02ad3b59..bb8d8d28 100644
--- a/test/test_prepare_outputs.py
+++ b/test/test_prepare_outputs.py
@@ -50,14 +50,14 @@ def test_prepare_outputs_all_none():
 def test_prepare_outputs_none_stdout():
     output_prefix = "test_outputs_"
     stdout, stderr = prepare_outputs("none", "stdout", output_prefix)
-    assert stdout == subprocess.PIPE
+    assert stdout is None
     assert stderr == subprocess.DEVNULL
 
 
 def test_prepare_outputs_none_stderr():
     output_prefix = "test_outputs_"
     stdout, stderr = prepare_outputs("none", "stderr", output_prefix)
-    assert stderr == subprocess.PIPE
+    assert stderr is None
     assert stdout == subprocess.DEVNULL
 
 
diff --git a/test/test_report.py b/test/test_report.py
new file mode 100644
index 00000000..44ed7d9b
--- /dev/null
+++ b/test/test_report.py
@@ -0,0 +1,37 @@
+from collections import defaultdict
+from duct import Report
+
+ex0 = {"pid1": {"pcpu": 0.0}}
+ex1 = {"pid1": {"pcpu": 1.0}}
+ex2 = {"pid2": {"pcpu": 1.0}}
+ex2pids = {"pid1": {"pcpu": 0.0}, "pid2": {"pcpu": 0.0}}
+
+
+def test_update_max_resources_initial_values_one_pid():
+    maxes = defaultdict(dict)
+    Report.update_max_resources(maxes, ex0)
+    assert maxes == ex0
+
+
+def test_update_max_resources_max_values_one_pid():
+    maxes = defaultdict(dict)
+    Report.update_max_resources(maxes, ex0)
+    Report.update_max_resources(maxes, ex1)
+    assert maxes == ex1
+
+
+def test_update_max_resources_initial_values_two_pids():
+    maxes = defaultdict(dict)
+    Report.update_max_resources(maxes, ex2pids)
+    assert maxes == ex2pids
+
+
+def test_update_max_resources_max_update_values_two_pids():
+    maxes = defaultdict(dict)
+    Report.update_max_resources(maxes, ex2pids)
+    Report.update_max_resources(maxes, ex1)
+    Report.update_max_resources(maxes, ex2)
+    assert maxes.keys() == ex2pids.keys()
+    assert maxes != ex2pids
+    assert maxes["pid1"] == ex1["pid1"]
+    assert maxes["pid2"] == ex2["pid2"]
diff --git a/test/utils.py b/test/utils.py
index 7519e686..d62899dc 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -1,4 +1,5 @@
 from io import BytesIO
+from pathlib import Path
 
 
 class MockStream:
@@ -9,3 +10,16 @@ def __init__(self):
 
     def getvalue(self):
         return self.buffer.getvalue()
+
+
+def assert_files(parent_dir, file_list, exists=True):
+    if exists:
+        for file_path in file_list:
+            assert Path(
+                parent_dir + file_path
+            ).exists(), f"Expected file does not exist: {file_path}"
+    else:
+        for file_path in file_list:
+            assert not Path(
+                parent_dir + file_path
+            ).exists(), f"Unexpected file should not exist: {file_path}"
diff --git a/test_script.py b/test_script.py
index debe7e40..8cba8572 100755
--- a/test_script.py
+++ b/test_script.py
@@ -21,8 +21,9 @@ def consume_memory(size):
 
 
 def main(duration, cpu_load, memory_size):
+    print("this is a test of STDOUT")
+    print("this is a test of STDERR", file=sys.stderr)
     consume_memory(memory_size)
-    print("this is of test of STDERR: ERRRRRRRRRRRRRRR", file=sys.stderr)
     consume_cpu(duration, cpu_load)
     print(
         f"Test completed. Consumed {memory_size} MB for {duration} seconds with CPU load factor {cpu_load}."