From f549e3084ccfa9ba5f7a08aef40087da3a5c0c76 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Wed, 24 Apr 2024 09:55:19 -0500 Subject: [PATCH 01/20] Fixup: Remove profilers.py, all used code was moved into duct.py in ^HEAD --- src/profilers.py | 29 ----------------------------- 1 file changed, 29 deletions(-) delete mode 100644 src/profilers.py diff --git a/src/profilers.py b/src/profilers.py deleted file mode 100644 index 7eb89e4f..00000000 --- a/src/profilers.py +++ /dev/null @@ -1,29 +0,0 @@ -import subprocess -import os -import time -import json - - -def get_processes_in_session(session_id): - """Retrieve all PIDs belonging to the given session ID.""" - pids = [] - for pid in os.listdir('/proc'): - if pid.isdigit(): - try: - with open(os.path.join('/proc', pid, 'stat'), 'r') as f: - data = f.read().split() - if int(data[5]) == session_id: # Check session ID in stat file - pids.append(int(pid)) - except IOError: # proc has already terminated - continue - return pids - - - -def pid_dummy_monitor(pid, elapsed_time, subreport): - """A dummy function to simulate process monitoring and logging.""" - try: - os.kill(pid, 0) # Check if the process is still running - subreport.pids_dummy[pid].append(f"Process {pid} checked at {elapsed_time} seconds") - except OSError: - subreport.pids_dummy[pid].append(f"Process {pid} has terminated.") From 47eced5896fcd0ff6109a7c7f80cb359aff26c14 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Wed, 24 Apr 2024 09:55:36 -0500 Subject: [PATCH 02/20] Move logic out of Report constructor --- src/duct.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/duct.py b/src/duct.py index 1781997c..fdd19b5f 100755 --- a/src/duct.py +++ b/src/duct.py @@ -10,7 +10,6 @@ from collections import defaultdict from dataclasses import dataclass, field -import profilers __version__ = "0.0.1" ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") @@ -23,19 +22,22 @@ def __init__(self, command, session_id): self.start_time = time.time() self.command = command self.session_id = session_id - self.system_info = {"uid": os.environ["USER"]} - self.env = ( - {k: v for k, v in os.environ.items() if k.startswith(ENV_PREFIXES)}, - ) self.gpu = None self.unaggregated_samples = [] self.stdout = "" self.stderr = "" self.number = 0 - self.get_system_info() + self.system_info = {} + + # TODO property? 
+    def collect_environment(self):
+        self.env = (
+            {k: v for k, v in os.environ.items() if k.startswith(ENV_PREFIXES)},
+        )
 
     def get_system_info(self):
         """Gathers system information related to CPU, GPU, memory, and environment variables."""
+        self.system_info["uid"] = os.environ["USER"]
         self.system_info["memory_total"] = os.sysconf("SC_PAGE_SIZE") * os.sysconf(
             "SC_PHYS_PAGES"
         )
@@ -164,6 +166,8 @@ def main():
     )
     session_id = os.getsid(process.pid)  # Get session ID of the new process
     report = Report(args.command, session_id)
+    report.collect_environment()
+    report.get_system_info()
 
     while True:
         elapsed_time = time.time() - report.start_time

From 73ec9f90264e9f7e79621f5e551676a33b61baaa Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Wed, 24 Apr 2024 09:59:18 -0500
Subject: [PATCH 03/20] Pull args out of main

Since we are making main the 'explicit, readable' layer, let's simplify it

---
 src/duct.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/duct.py b/src/duct.py
index fdd19b5f..ac7b7287 100755
--- a/src/duct.py
+++ b/src/duct.py
@@ -131,8 +131,7 @@ def serialize(self):
     }
 
 
-def main():
-    """A wrapper to execute a command, monitor and log the process details."""
+def create_and_parse_args():
     parser = argparse.ArgumentParser(
         description="A process wrapper script that monitors the execution of a command."
     )
@@ -155,8 +154,12 @@ def main():
         default=60.0,
         help="Interval in seconds at which to report aggregated data.",
     )
-    args = parser.parse_args()
+    return parser.parse_args()
 
+
+def main():
+    """A wrapper to execute a command, monitor and log the process details."""
+    args = create_and_parse_args()
     try:
         process = subprocess.Popen(
             [str(args.command)] + args.arguments.copy(),

From 6a7ed737bfb09878e9ea3da709d7b4719ead8312 Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Wed, 24 Apr 2024 10:26:13 -0500
Subject: [PATCH 04/20] write system information

---
 src/duct.py | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/duct.py b/src/duct.py
index ac7b7287..0b91c2b6 100755
--- a/src/duct.py
+++ b/src/duct.py
@@ -148,6 +148,12 @@ def create_and_parse_args():
         type=str,
         default="TODO_BETTER.stats.json",
     )
+    parser.add_argument(
+        "--system_log_path",
+        type=str,
+        default="TODO_BETTER.system.json",
+    )
+
     parser.add_argument(
         "--report-interval",
         type=float,
@@ -171,6 +177,8 @@ def main():
     report = Report(args.command, session_id)
     report.collect_environment()
     report.get_system_info()
+    with open(args.system_log_path, "a") as system_logs:
+        system_logs.write(str(report))
 
     while True:
         elapsed_time = time.time() - report.start_time
@@ -187,9 +195,13 @@ def main():
             break
         time.sleep(args.sample_interval)
 
+    with open(args.system_log_path, "a") as system_logs:
+        report.run_time_seconds = f"{report.end_time - report.start_time}"
+        report.end_time = time.time()
+        report.get_system_info()
+        # TODO redundant. maybe just write new stuff or wait to write the begining?
+ system_logs.write(str(report)) stdout, stderr = process.communicate() - report.end_time = time.time() - report.run_time_seconds = f"{report.end_time - report.start_time}" report.stdout = stdout.decode() report.stderr = stderr.decode() pprint.pprint(report, width=120) From 3dd2d6e61d17a409c5a9071729d829444b5f40fb Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Wed, 24 Apr 2024 14:06:21 -0500 Subject: [PATCH 05/20] mv stdout stderr out of report --- src/duct.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/duct.py b/src/duct.py index 0b91c2b6..90854ff5 100755 --- a/src/duct.py +++ b/src/duct.py @@ -24,8 +24,6 @@ def __init__(self, command, session_id): self.session_id = session_id self.gpu = None self.unaggregated_samples = [] - self.stdout = "" - self.stderr = "" self.number = 0 self.system_info = {} @@ -153,7 +151,6 @@ def create_and_parse_args(): type=str, default="TODO_BETTER.system.json", ) - parser.add_argument( "--report-interval", type=float, @@ -177,8 +174,6 @@ def main(): report = Report(args.command, session_id) report.collect_environment() report.get_system_info() - with open(args.system_log_path, "a") as system_logs: - system_logs.write(str(report)) while True: elapsed_time = time.time() - report.start_time @@ -196,15 +191,14 @@ def main(): time.sleep(args.sample_interval) with open(args.system_log_path, "a") as system_logs: - report.run_time_seconds = f"{report.end_time - report.start_time}" report.end_time = time.time() + report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() - # TODO redundant. maybe just write new stuff or wait to write the begining? system_logs.write(str(report)) stdout, stderr = process.communicate() - report.stdout = stdout.decode() - report.stderr = stderr.decode() pprint.pprint(report, width=120) + print(f"STDOUT: {stdout.decode()}") + print(f"STDERR: {stderr.decode()}") except Exception as e: print(f"Failed to execute command: {str(e)}") From acf724374676c2cd596982381f0e91156a410352 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 13:29:19 -0500 Subject: [PATCH 06/20] Add output prefix argument Clean up output and ignore default log dir --- .../2024-04-16T15:35:32-04:00_802267.err | 0 .../2024-04-16T15:35:32-04:00_802267.out | 32 ------------------- ...024-04-16T15:35:32-04:00_802267.stats.json | 1 - .../2024-04-16T15:37:51-04:00_803473.err | 0 .../2024-04-16T15:37:51-04:00_803473.out | 32 ------------------- ...024-04-16T15:37:51-04:00_803473.stats.json | 1 - .gitignore | 2 ++ src/duct.py | 31 ++++++++++-------- 8 files changed, 19 insertions(+), 80 deletions(-) delete mode 100644 .duct/run-logs/2024-04-16T15:35:32-04:00_802267.err delete mode 100644 .duct/run-logs/2024-04-16T15:35:32-04:00_802267.out delete mode 100644 .duct/run-logs/2024-04-16T15:35:32-04:00_802267.stats.json delete mode 100644 .duct/run-logs/2024-04-16T15:37:51-04:00_803473.err delete mode 100644 .duct/run-logs/2024-04-16T15:37:51-04:00_803473.out delete mode 100644 .duct/run-logs/2024-04-16T15:37:51-04:00_803473.stats.json diff --git a/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.err b/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.err deleted file mode 100644 index e69de29b..00000000 diff --git a/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.out b/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.out deleted file mode 100644 index 3f3a4574..00000000 --- a/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.out +++ /dev/null @@ -1,32 +0,0 @@ -Started 5 for 802273 
-abandoning_pare(802273)-+-pstree(802280) - |-python3(802275) - |-python3(802276) - |-python3(802277) - |-python3(802278) - `-python3(802279) -Starting one more in subprocess -[1] Running "$@" & -[2] Running "$@" & -[3] Running "$@" & -[4]- Running "$@" & -[5]+ Running "$@" & -abandoning_pare(802273)-+-pstree(802283) - |-python3(802275) - |-python3(802276) - |-python3(802277) - |-python3(802278) - `-python3(802279) -waiting -PID of this process: 802275 - -PID of this process: 802277 - -PID of this process: 802279 - -PID of this process: 802276 - -PID of this process: 802278 - -PID of this process: 802282 - diff --git a/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.stats.json b/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.stats.json deleted file mode 100644 index 078ed9d4..00000000 --- a/.duct/run-logs/2024-04-16T15:35:32-04:00_802267.stats.json +++ /dev/null @@ -1 +0,0 @@ -{"cpu-user-time": 10.48, "cpu-system-time": "2.39", "cpu-perc": "497%", "memory-max-kb": 1053280, "memory-avg-kb": 0, "elapsed-time": 2.58, "cpu-time": 2.39 } diff --git a/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.err b/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.err deleted file mode 100644 index e69de29b..00000000 diff --git a/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.out b/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.out deleted file mode 100644 index 8ceba84a..00000000 --- a/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.out +++ /dev/null @@ -1,32 +0,0 @@ -Started 5 for 803479 -abandoning_pare(803479)-+-pstree(803486) - |-python3(803481) - |-python3(803482) - |-python3(803483) - |-python3(803484) - `-python3(803485) -Starting one more in subprocess -[1] Running "$@" & -[2] Running "$@" & -[3] Running "$@" & -[4]- Running "$@" & -[5]+ Running "$@" & -abandoning_pare(803479)-+-pstree(803489) - |-python3(803481) - |-python3(803482) - |-python3(803483) - |-python3(803484) - `-python3(803485) -waiting -PID of this process: 803481 - -PID of this process: 803485 - -PID of this process: 803482 - -PID of this process: 803484 - -PID of this process: 803483 - -PID of this process: 803488 - diff --git a/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.stats.json b/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.stats.json deleted file mode 100644 index cb0b5c42..00000000 --- a/.duct/run-logs/2024-04-16T15:37:51-04:00_803473.stats.json +++ /dev/null @@ -1 +0,0 @@ -{"cpu-user-time": 10.40, "cpu-system-time": "2.67", "cpu-perc": "494%", "memory-max-kb": 1053664, "memory-avg-kb": 0, "elapsed-time": 2.64, "cpu-time": 2.67 } diff --git a/.gitignore b/.gitignore index 70546a14..492504e0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ __pycache__/ src/__pycache__/ duct.egg-info/ + +.duct/ diff --git a/src/duct.py b/src/duct.py index 90854ff5..3514790c 100755 --- a/src/duct.py +++ b/src/duct.py @@ -9,6 +9,7 @@ import time from collections import defaultdict from dataclasses import dataclass, field +from datetime import datetime __version__ = "0.0.1" @@ -87,7 +88,8 @@ def collect_sample(self): def aggregate_samples(self): max_values = {} - for sample in self.unaggregated_samples: + while self.unaggregated_samples: + sample = self.unaggregated_samples.pop() for pid, metrics in sample.items(): if pid not in max_values: max_values[pid] = metrics.copy() # Make a copy of the metrics for the first entry @@ -95,7 +97,6 @@ def aggregate_samples(self): # Update each metric to the maximum found so far for key in metrics: max_values[pid][key] = max(max_values[pid][key], metrics[key]) - return max_values def 
__repr__(self): @@ -130,6 +131,9 @@ def serialize(self): def create_and_parse_args(): + now = datetime.now() + # 'pure' iso 8601 does not make good filenames + file_safe_iso = now.strftime('%Y-%m-%d.%H-%M-%S') parser = argparse.ArgumentParser( description="A process wrapper script that monitors the execution of a command." ) @@ -142,14 +146,9 @@ def create_and_parse_args(): help="Interval in seconds between status checks of the running process.", ) parser.add_argument( - "--stats_log_path", - type=str, - default="TODO_BETTER.stats.json", - ) - parser.add_argument( - "--system_log_path", + "--output_prefix", type=str, - default="TODO_BETTER.system.json", + default=os.getenv("DUCT_OUTPUT_PREFIX", f".duct/run-logs/{file_safe_iso}") ) parser.add_argument( "--report-interval", @@ -163,6 +162,7 @@ def create_and_parse_args(): def main(): """A wrapper to execute a command, monitor and log the process details.""" args = create_and_parse_args() + os.makedirs(args.output_prefix, exist_ok=True) try: process = subprocess.Popen( [str(args.command)] + args.arguments.copy(), @@ -180,17 +180,17 @@ def main(): report.collect_sample() if elapsed_time >= (report.number + 1) * args.report_interval: aggregated = report.aggregate_samples() - print(aggregated) - with open(args.stats_log_path, "a") as resource_statistics_log: - aggregated["elapsed_time"] = elapsed_time - resource_statistics_log.write(json.dumps(aggregated)) + for pid, pinfo in aggregated.items(): + with open(f"{args.output_prefix}/{pid}_resource_usage.json", "a") as resource_statistics_log: + pinfo["elapsed_time"] = elapsed_time + resource_statistics_log.write(json.dumps(aggregated)) report.number += 1 if process.poll() is not None: # the passthrough command has finished break time.sleep(args.sample_interval) - with open(args.system_log_path, "a") as system_logs: + with open(f"{args.output_prefix}/{report.session_id}", "a") as system_logs: report.end_time = time.time() report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() @@ -201,6 +201,9 @@ def main(): print(f"STDERR: {stderr.decode()}") except Exception as e: + import traceback + traceback.print_exc() + import ipdb; ipdb.set_trace() print(f"Failed to execute command: {str(e)}") From 124083b2c1d67aa7ef76ffa3fcc51125a768fa14 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 15:05:10 -0500 Subject: [PATCH 07/20] fix system file --- src/duct.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/duct.py b/src/duct.py index 3514790c..6d1afff8 100755 --- a/src/duct.py +++ b/src/duct.py @@ -106,8 +106,6 @@ def __repr__(self): "System": self.system_info, "ENV": self.env, "GPU": self.gpu, - "STDOUT": self.stdout, - "STDERR": self.stderr, } ) @@ -190,7 +188,7 @@ def main(): break time.sleep(args.sample_interval) - with open(f"{args.output_prefix}/{report.session_id}", "a") as system_logs: + with open(f"{args.output_prefix}/system-report.session-{report.session_id}.json", "a") as system_logs: report.end_time = time.time() report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() From 0de7150e30b7cdabc9e2096c8634e9b2650293db Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 15:06:43 -0500 Subject: [PATCH 08/20] Add smoketest --- smoke-test.sh | 3 +++ 1 file changed, 3 insertions(+) create mode 100755 smoke-test.sh diff --git a/smoke-test.sh b/smoke-test.sh new file mode 100755 index 00000000..be392ea6 --- /dev/null +++ b/smoke-test.sh @@ -0,0 +1,3 @@ +rm -rf .duct/* +duct 
--report-interval 2 --sample-interval 0.5 ./test_script.py -- --duration 6 --cpu-load 50000 --memory-size 50000 +find .duct/ -name '*.json' -exec sh -c 'echo "File: $1)"; cat "$1" | jq' _ {} \; From 937be2c3f3fb5e821b7f8be5fcd26406f0180877 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 15:31:47 -0500 Subject: [PATCH 09/20] Use datalad-install tox config --- tox.ini | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 tox.ini diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..78b8aeb9 --- /dev/null +++ b/tox.ini @@ -0,0 +1,76 @@ +[tox] +envlist = lint,typing,py37,py38,py39,py310,py311,py312,pypy3 +skip_missing_interpreters = True +isolated_build = True +minversion = 3.3.0 + +[testenv] +# HOME needs to be set in order for brew to work: +passenv = + GITHUB_TOKEN + HOME +deps = + pytest + pytest-cov + pytest-mock +commands = + pytest {posargs} test + +[testenv:lint] +deps = + flake8 + flake8-bugbear + flake8-builtins + flake8-unused-arguments +commands = + flake8 src test + +[testenv:typing] +deps = + mypy + {[testenv]deps} +commands = + mypy src test + +[pytest] +addopts = --cov=datalad_installer --no-cov-on-fail +filterwarnings = error +norecursedirs = test/data +markers = + ci_only: Only run when --ci is given + ghauth: May use GitHub token + ghauth_required: Requires GitHub token + miniconda: Installs miniconda + needs_sudo: Requires passwordless sudo + +[coverage:run] +branch = True +parallel = True + +[coverage:paths] +source = + src + .tox/**/site-packages + +[coverage:report] +precision = 2 +show_missing = True + +[flake8] +doctests = True +exclude = .*/,build/,dist/,test/data,venv/ +max-line-length = 100 +unused-arguments-ignore-stub-functions = True +select = A,B,B902,B950,C,E,E242,F,U100,W +ignore = A003,B005,E203,E262,E266,E501,W503 + +[isort] +atomic = True +force_sort_within_sections = True +honor_noqa = True +lines_between_sections = 0 +profile = black +reverse_relative = True +sort_relative_in_force_sorted_sections = True +src_paths = src + From 5fe7fdb143c1b0847a4c809bfc8c423a8c617e53 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 15:33:22 -0500 Subject: [PATCH 10/20] blacken --- smoke-test.sh | 2 +- src/duct.py | 50 +++++++++++++++++++++++++++++++++----------------- test_script.py | 2 +- tox.ini | 1 - 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/smoke-test.sh b/smoke-test.sh index be392ea6..6a0ce188 100755 --- a/smoke-test.sh +++ b/smoke-test.sh @@ -1,3 +1,3 @@ rm -rf .duct/* duct --report-interval 2 --sample-interval 0.5 ./test_script.py -- --duration 6 --cpu-load 50000 --memory-size 50000 -find .duct/ -name '*.json' -exec sh -c 'echo "File: $1)"; cat "$1" | jq' _ {} \; +find .duct/ -name '*.json' -exec sh -c 'echo "File: $1)"; cat "$1" | jq' _ {} \; diff --git a/src/duct.py b/src/duct.py index 6d1afff8..953556cd 100755 --- a/src/duct.py +++ b/src/duct.py @@ -1,16 +1,14 @@ #!/usr/bin/env python3 import argparse +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime import json import os import pprint import shutil import subprocess -import sys import time -from collections import defaultdict -from dataclasses import dataclass, field -from datetime import datetime - __version__ = "0.0.1" ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") @@ -67,22 +65,31 @@ def get_system_info(self): def collect_sample(self): process_data = {} try: - output = subprocess.check_output(["ps", "-s", 
str(self.session_id), "-o", "pid,pcpu,pmem,rss,vsz,etime,cmd"], text=True) + output = subprocess.check_output( + [ + "ps", + "-s", + str(self.session_id), + "-o", + "pid,pcpu,pmem,rss,vsz,etime,cmd", + ], + text=True, + ) for line in output.splitlines()[1:]: if line: pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6) process_data[pid] = { # %CPU - 'pcpu': float(pcpu), + "pcpu": float(pcpu), # %MEM - 'pmem': float(pmem), + "pmem": float(pmem), # Memory Resident Set Size - 'rss': int(rss), + "rss": int(rss), # Virtual Memory size - 'vsz': int(vsz), + "vsz": int(vsz), } except subprocess.CalledProcessError: - process_data['error'] = "Failed to query process data" + process_data["error"] = "Failed to query process data" self.unaggregated_samples.append(process_data) @@ -92,7 +99,9 @@ def aggregate_samples(self): sample = self.unaggregated_samples.pop() for pid, metrics in sample.items(): if pid not in max_values: - max_values[pid] = metrics.copy() # Make a copy of the metrics for the first entry + max_values[ + pid + ] = metrics.copy() # Make a copy of the metrics for the first entry else: # Update each metric to the maximum found so far for key in metrics: @@ -131,7 +140,7 @@ def serialize(self): def create_and_parse_args(): now = datetime.now() # 'pure' iso 8601 does not make good filenames - file_safe_iso = now.strftime('%Y-%m-%d.%H-%M-%S') + file_safe_iso = now.strftime("%Y-%m-%d.%H-%M-%S") parser = argparse.ArgumentParser( description="A process wrapper script that monitors the execution of a command." ) @@ -146,7 +155,7 @@ def create_and_parse_args(): parser.add_argument( "--output_prefix", type=str, - default=os.getenv("DUCT_OUTPUT_PREFIX", f".duct/run-logs/{file_safe_iso}") + default=os.getenv("DUCT_OUTPUT_PREFIX", f".duct/run-logs/{file_safe_iso}"), ) parser.add_argument( "--report-interval", @@ -179,7 +188,9 @@ def main(): if elapsed_time >= (report.number + 1) * args.report_interval: aggregated = report.aggregate_samples() for pid, pinfo in aggregated.items(): - with open(f"{args.output_prefix}/{pid}_resource_usage.json", "a") as resource_statistics_log: + with open( + f"{args.output_prefix}/{pid}_resource_usage.json", "a" + ) as resource_statistics_log: pinfo["elapsed_time"] = elapsed_time resource_statistics_log.write(json.dumps(aggregated)) report.number += 1 @@ -188,7 +199,9 @@ def main(): break time.sleep(args.sample_interval) - with open(f"{args.output_prefix}/system-report.session-{report.session_id}.json", "a") as system_logs: + with open( + f"{args.output_prefix}/system-report.session-{report.session_id}.json", "a" + ) as system_logs: report.end_time = time.time() report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() @@ -200,8 +213,11 @@ def main(): except Exception as e: import traceback + traceback.print_exc() - import ipdb; ipdb.set_trace() + import ipdb + + ipdb.set_trace() print(f"Failed to execute command: {str(e)}") diff --git a/test_script.py b/test_script.py index 0ea18519..debe7e40 100755 --- a/test_script.py +++ b/test_script.py @@ -17,7 +17,7 @@ def consume_memory(size): """Function to consume amount of memory specified by 'size' in megabytes""" # Create a list of size MB bytes_in_mb = 1024 * 1024 - memory = bytearray(size * bytes_in_mb) + _memory = bytearray(size * bytes_in_mb) # noqa def main(duration, cpu_load, memory_size): diff --git a/tox.ini b/tox.ini index 78b8aeb9..fe5e28b0 100644 --- a/tox.ini +++ b/tox.ini @@ -73,4 +73,3 @@ profile = black reverse_relative = True sort_relative_in_force_sorted_sections 
= True
 src_paths = src
-

From 0d990b6497a6e092d5fcde2e83178a44ecbd7376 Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Thu, 25 Apr 2024 15:56:34 -0500
Subject: [PATCH 11/20] trim mem requirement

---
 smoke-test.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/smoke-test.sh b/smoke-test.sh
index 6a0ce188..eb20db37 100755
--- a/smoke-test.sh
+++ b/smoke-test.sh
@@ -1,3 +1,3 @@
 rm -rf .duct/*
-duct --report-interval 2 --sample-interval 0.5 ./test_script.py -- --duration 6 --cpu-load 50000 --memory-size 50000
+duct --report-interval 2 --sample-interval 0.5 ./test_script.py -- --duration 6 --cpu-load 50000 --memory-size 500
 find .duct/ -name '*.json' -exec sh -c 'echo "File: $1)"; cat "$1" | jq' _ {} \;

From 79f6094b7442de377fa95b2a5c8019d16fb78296 Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Thu, 25 Apr 2024 16:08:25 -0500
Subject: [PATCH 12/20] Doc cleanup

---
 README.md   | 17 +++++++++++++----
 src/duct.py |  3 ++-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 24b037ac..b11df3c7 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,14 @@ A process wrapper script that monitors the execution of a command.
 
 ```shell
 > duct --help
-duct [-h] [--sample-interval SAMPLE_INTERVAL] [--report-interval REPORT_INTERVAL] [--] command [arguments ...]
-Duct creates a new session to run a command and all its child processes, and the collects metrics for all processes in the session.
+usage: duct [-h] [--sample-interval SAMPLE_INTERVAL]
+            [--output_prefix OUTPUT_PREFIX]
+            [--report-interval REPORT_INTERVAL]
+            command [arguments ...]
+
+Duct creates a new session to run a command and all its child processes, and
+then collects metrics for all processes in the session.
 
 positional arguments:
   command               The command to execute.
@@ -17,9 +22,13 @@ positional arguments:
 options:
   -h, --help            show this help message and exit
   --sample-interval SAMPLE_INTERVAL
-                        Interval in seconds between status checks of the running process.
+                        Interval in seconds between status checks of the
+                        running process.
+  --output_prefix OUTPUT_PREFIX
+                        Directory in which all logs will be saved.
   --report-interval REPORT_INTERVAL
-                        Interval in seconds at which to report aggregated data.
+                        Interval in seconds at which to report aggregated
+                        data.
 ```
 
 ## Testing:

diff --git a/src/duct.py b/src/duct.py
index 953556cd..ebd2fee8 100755
--- a/src/duct.py
+++ b/src/duct.py
@@ -142,7 +142,7 @@ def create_and_parse_args():
     # 'pure' iso 8601 does not make good filenames
     file_safe_iso = now.strftime("%Y-%m-%d.%H-%M-%S")
     parser = argparse.ArgumentParser(
-        description="A process wrapper script that monitors the execution of a command."
+        description="Gathers metrics on a command and all its child processes."
     )
     parser.add_argument("command", help="The command to execute.")
     parser.add_argument("arguments", nargs="*", help="Arguments for the command.")
@@ -156,6 +156,7 @@ def create_and_parse_args():
         "--output_prefix",
         type=str,
         default=os.getenv("DUCT_OUTPUT_PREFIX", f".duct/run-logs/{file_safe_iso}"),
+        help="Directory in which all logs will be saved.",
     )
     parser.add_argument(
         "--report-interval",

From 09524092bf44e548e7372f56ae4faddf81d3cbb4 Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Thu, 25 Apr 2024 16:13:44 -0500
Subject: [PATCH 13/20] Add codespell CI, from https://raw.githubusercontent.com/datalad/datalad-installer/master/.github/workflows/test.yml

---
 .github/workflows/codespell.yaml | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
 create mode 100644 .github/workflows/codespell.yaml

diff --git a/.github/workflows/codespell.yaml b/.github/workflows/codespell.yaml
new file mode 100644
index 00000000..6f32efea
--- /dev/null
+++ b/.github/workflows/codespell.yaml
@@ -0,0 +1,19 @@
+---
+name: Codespell
+
+on:
+  push:
+    branches: [master]
+  pull_request:
+    branches: [master]
+
+jobs:
+  codespell:
+    name: Check for spelling errors
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Codespell
+        uses: codespell-project/actions-codespell@v2

From 9c2b1e2727b170fa113a9c301eaa3e4a483b868e Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Thu, 25 Apr 2024 16:20:55 -0500
Subject: [PATCH 14/20] Fix codespell, this project uses main instead of master

---
 .github/workflows/codespell.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/codespell.yaml b/.github/workflows/codespell.yaml
index 6f32efea..3bda7023 100644
--- a/.github/workflows/codespell.yaml
+++ b/.github/workflows/codespell.yaml
@@ -3,9 +3,9 @@ name: Codespell
 
 on:
   push:
-    branches: [master]
+    branches: [main]
   pull_request:
-    branches: [master]
+    branches: [main]
 
 jobs:
   codespell:

From 5112021c4b05ac24af6fcc25817a8d5ef6275f4d Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Thu, 25 Apr 2024 16:28:15 -0500
Subject: [PATCH 15/20] silly commit: added this from GitHub to trigger actions for the first time.
--- .github/workflows/pylint.yml | 23 ----------------------- 1 file changed, 23 deletions(-) delete mode 100644 .github/workflows/pylint.yml diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml deleted file mode 100644 index c73e032c..00000000 --- a/.github/workflows/pylint.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: Pylint - -on: [push] - -jobs: - build: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.8", "3.9", "3.10"] - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install pylint - - name: Analysing the code with pylint - run: | - pylint $(git ls-files '*.py') From 3a08033da627ecdb9c9a5f3f8fe7aaee1c4a6a55 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 16:29:57 -0500 Subject: [PATCH 16/20] Remove example code from friendly projects --- profilers/pydra.py | 310 --------------------------------------------- profilers/smon.py | 219 -------------------------------- 2 files changed, 529 deletions(-) delete mode 100644 profilers/pydra.py delete mode 100644 profilers/smon.py diff --git a/profilers/pydra.py b/profilers/pydra.py deleted file mode 100644 index 272dbe8c..00000000 --- a/profilers/pydra.py +++ /dev/null @@ -1,310 +0,0 @@ -"""Utilities to keep track of performance and resource utilization.""" - -from pathlib import Path -import psutil -import threading -from time import time - -# Init variables -_MB = 1024.0**2 - - -class ResourceMonitor(threading.Thread): - """A thread to monitor a specific PID with a certain frequency to a file.""" - - def __init__(self, pid, interval=5, logdir=None, fname=None): - """ - Initialize this monitor. - - Parameters - ---------- - pid : :obj:`int` - The process id (PID) - interval : :obj:`float` - Frequency with with the resources will be estimated. - logdir : :obj:`os.pathlike` - Filesystem location of the logging folder. - fname : :obj:`str` - Filename for the log file. 
- - """ - if fname is None: - fname = "proc-%d_time-%s_freq-%0.2f.log" % (pid, time(), interval) - if logdir is None: - logdir = Path.cwd() - self._fname = logdir / fname - self._logfile = open(self._fname, "w") - self._interval = interval - - # Leave process initialized and make first sample - self._process = psutil.Process(pid) - self._sample(cpu_interval=0.2) - - # Start thread - threading.Thread.__init__(self) - self._event = threading.Event() - - @property - def fname(self): - """Get/set the internal filename.""" - return self._fname - - def stop(self): - """Stop monitoring.""" - if not self._event.is_set(): - self._event.set() - self.join() - self._sample() - self._logfile.flush() - self._logfile.close() - - def _sample(self, cpu_interval=None): - cpu = 0.0 - rss = 0.0 - vms = 0.0 - try: - with self._process.oneshot(): - cpu += self._process.cpu_percent(interval=cpu_interval) - mem_info = self._process.memory_info() - rss += mem_info.rss - vms += mem_info.vms - except psutil.NoSuchProcess: - pass - - # Iterate through child processes and get number of their threads - try: - children = self._process.children(recursive=True) - except psutil.NoSuchProcess: - children = [] - - for child in children: - try: - with child.oneshot(): - cpu += child.cpu_percent() - mem_info = child.memory_info() - rss += mem_info.rss - vms += mem_info.vms - except psutil.NoSuchProcess: - pass - - print(f"{time():f},{cpu:f},{rss / _MB:f},{vms / _MB:f}", file=self._logfile) - self._logfile.flush() - - def run(self): - """Core monitoring function, called by start().""" - start_time = time() - wait_til = start_time - while not self._event.is_set(): - self._sample() - wait_til += self._interval - self._event.wait(max(0, wait_til - time())) - - -# Log node stats function -def log_nodes_cb(node, status): - """ - Record node run statistics to a log file as json dictionaries. 
- - Parameters - ---------- - node : nipype.pipeline.engine.Node - the node being logged - status : string - acceptable values are 'start', 'end'; otherwise it is - considered and error - - Returns - ------- - None - this function does not return any values, it logs the node - status info to the callback logger - - """ - if status != "end": - return - - # Import packages - import logging - import json - - status_dict = { - "name": node.name, - "id": node._id, - "start": getattr(node.result.runtime, "startTime"), - "finish": getattr(node.result.runtime, "endTime"), - "duration": getattr(node.result.runtime, "duration"), - "runtime_threads": getattr(node.result.runtime, "cpu_percent", "N/A"), - "runtime_memory_gb": getattr(node.result.runtime, "mem_peak_gb", "N/A"), - "estimated_memory_gb": node.mem_gb, - "num_threads": node.n_procs, - } - - if status_dict["start"] is None or status_dict["finish"] is None: - status_dict["error"] = True - - # Dump string to log - logging.getLogger("callback").debug(json.dumps(status_dict)) - - -def get_system_total_memory_gb(): - """Get the total RAM of the running system, in GB.""" - # Import packages - import os - import sys - - # Get memory - if "linux" in sys.platform: - with open("/proc/meminfo") as f_in: - meminfo_lines = f_in.readlines() - mem_total_line = [line for line in meminfo_lines if "MemTotal" in line][0] - mem_total = float(mem_total_line.split()[1]) - memory_gb = mem_total / (1024.0**2) - elif "darwin" in sys.platform: - mem_str = os.popen("sysctl hw.memsize").read().strip().split(" ")[-1] - memory_gb = float(mem_str) / (1024.0**3) - else: - err_msg = "System platform: %s is not supported" - raise Exception(err_msg) - - # Return memory - return memory_gb - - -def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False): - """ - Get the RAM and threads utilized by a given process. - - Parameters - ---------- - pid : integer - the process ID of process to profile - mem_mb : float - the high memory watermark so far during process execution (in MB) - num_threads: int - the high thread watermark so far during process execution - - Returns - ------- - mem_mb : float - the new high memory watermark of process (MB) - num_threads : float - the new high thread watermark of process - - """ - try: - mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc)) - num_threads = max(num_threads, _get_num_threads(pid)) - except Exception as exc: - print("Could not get resources used by process.\n%s", exc) - - return mem_mb, num_threads - - -# Get number of threads for process -def _get_num_threads(pid): - """ - Get the number of threads a process is using. 
- - Parameters - ---------- - pid : integer - the process ID of process to profile - - Returns - ------- - num_threads : int - the number of threads that the process is using - - """ - try: - proc = psutil.Process(pid) - # If process is running - if proc.status() == psutil.STATUS_RUNNING: - num_threads = proc.num_threads() - elif proc.num_threads() > 1: - tprocs = [psutil.Process(thr.id) for thr in proc.threads()] - alive_tprocs = [ - tproc for tproc in tprocs if tproc.status() == psutil.STATUS_RUNNING - ] - num_threads = len(alive_tprocs) - else: - num_threads = 1 - - child_threads = 0 - # Iterate through child processes and get number of their threads - for child in proc.children(recursive=True): - # Leaf process - if len(child.children()) == 0: - # If process is running, get its number of threads - if child.status() == psutil.STATUS_RUNNING: - child_thr = child.num_threads() - # If its not necessarily running, but still multi-threaded - elif child.num_threads() > 1: - # Cast each thread as a process and check for only running - tprocs = [psutil.Process(thr.id) for thr in child.threads()] - alive_tprocs = [ - tproc - for tproc in tprocs - if tproc.status() == psutil.STATUS_RUNNING - ] - child_thr = len(alive_tprocs) - # Otherwise, no threads are running - else: - child_thr = 0 - # Increment child threads - child_threads += child_thr - except psutil.NoSuchProcess: - return None - - # Number of threads is max between found active children and parent - num_threads = max(child_threads, num_threads) - - # Return number of threads found - return num_threads - - -# Get ram usage of process -def _get_ram_mb(pid, pyfunc=False): - """ - Get the RAM usage of a process and its children. - - Parameters - ---------- - pid : integer - the PID of the process to get RAM usage of - pyfunc : boolean (optional); default=False - a flag to indicate if the process is a python function; - when Pythons are multithreaded via multiprocess or threading, - children functions include their own memory + parents. if this - is set, the parent memory will removed from children memories - - - Returns - ------- - mem_mb : float - the memory RAM in MB utilized by the process PID - - See Also - -------- - See `this thread - `__. 
- - """ - try: - # Init parent - parent = psutil.Process(pid) - # Get memory of parent - parent_mem = parent.memory_info().rss - mem_mb = parent_mem / _MB - # Iterate through child processes - for child in parent.children(recursive=True): - child_mem = child.memory_info().rss - if pyfunc: - child_mem -= parent_mem - mem_mb += child_mem / _MB - except psutil.NoSuchProcess: - return None - - # Return memory - return mem_mb - diff --git a/profilers/smon.py b/profilers/smon.py deleted file mode 100644 index 55aa205d..00000000 --- a/profilers/smon.py +++ /dev/null @@ -1,219 +0,0 @@ -#!/usr/bin/env python3 - -import os -import subprocess -import json -import time -import sys -import shutil - -name="_smon.out" - -sid=os.getsid(os.getpid()) - - -def get_size(start_path = '.'): - total_size = 0 - for dirpath, dirnames, filenames in os.walk(start_path): - for f in filenames: - fp = os.path.join(dirpath, f) - total_size += os.path.getsize(fp) - return total_size - -with open(name, "w") as outfile: - - env = {} - #put some batch scheduler specific env - for k in os.environ: - if k.startswith(("PBS_", "SLURM_", "OSG")): - env[k] = os.environ[k] - - #figure out max PPN - max_ppn = os.sysconf('SC_NPROCESSORS_CONF') #default to all available cores - if "PBS_NUM_PPN" in os.environ: - max_ppn = int(os.environ["PBS_NUM_PPN"]) - #from https://slurm.schedmd.com/srun.html - if "SLURM_CPUS_ON_NODE" in os.environ: #incase SLURM_NTASKS is not set? - max_ppn = int(os.environ["SLURM_CPUS_ON_NODE"]) - if "SLURM_NTASKS" in os.environ: - max_ppn = int(os.environ["SLURM_NTASKS"]) - - #figure out max mem - max_mem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') - max_walltime = None - if "PBS_JOBID" in os.environ: - #PBS doesn't expose requested memory in ENV.. I need to query from qstat - out=subprocess.check_output(["qstat", "-f1", os.environ["PBS_JOBID"]]).decode("utf-8") - qstat={} - for line in out.split("\n"): - delpos = line.find("=") - if delpos == -1: continue - k=line[0:delpos].strip() - v=line[delpos+1:].strip() - qstat[k] = v - - if "Resource_List.vmem" in qstat: - #only carbonate has this - max_mem_str = qstat["Resource_List.vmem"] #64gb, etc.. - if max_mem_str.endswith("gb"): - max_mem = int(max_mem_str[0:-2])*1024*1024*1024 #pbs treats gb and GB the same.. - - if "Walltime.Remaining" in qstat: - max_walltime = int(qstat["Walltime.Remaining"]) - - if "SLURM_MEM_PER_NODE" in os.environ: - #Default units are megabytes unless the SchedulerParameters configuration parameter includes the "default_gbytes" option for gigabytes. 
- #https://slurm.schedmd.com/sbatch.html - max_mem = int(os.environ["SLURM_MEM_PER_NODE"])*1024*1024 - - if "PBS_WALLTIME" in os.environ: - max_walltime = int(os.environ["PBS_WALLTIME"]) - - #TODO - figure out how to find walltime for slurm - #https://confluence.csiro.au/display/SC/Reference+Guide%3A+Migrating+from+Torque+to+SLURMt - - #query for gpu info - gpus = None - if shutil.which("nvidia-smi") is not None: - try: - out=subprocess.check_output(["nvidia-smi", "--query-gpu=index,name,pci.bus_id,driver_version,memory.total,compute_mode", "--format=csv"]).decode("utf-8") - lines=out.strip().split("\n") - header=lines.pop(0) - gpus = [] - for line in lines: - cols=line.split(", ") - gpus.append({ - "index": cols[0], - "name": cols[1], - "bus_id": cols[2], - "driver_version": cols[3], - "memory.total": cols[4], - "compute_mode": cols[5], - }) - except subprocess.CalledProcessError as e: - print(e) - - #dump info that doesn't change on the first entry - json.dump({ - "time": time.time(), - "uname": os.uname(), #os/kernerl/hostname version - "cpu_total": os.sysconf('SC_NPROCESSORS_CONF'), - "cpu_requested": max_ppn, - - "gpus": gpus, - - "memory_total": os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES'), - "memory_requested": max_mem, - - "walltime_requested": max_walltime, - - "sid": sid, - "uid": os.environ['USER'], - - "env": env, - - }, outfile) - - outfile.write("\n") - outfile.flush() - - #now start infinite loop! - while True: - - #query gpu usage - if gpus != None: - gpus = [] - out=subprocess.check_output(["nvidia-smi", "--query-gpu=index,name,pstate,temperature.gpu,utilization.gpu,utilization.memory", "--format=csv"]).decode("utf-8") - lines=out.strip().split("\n") - header=lines.pop(0) - for line in lines: - cols=line.split(", ") - gpus.append({ - "index": cols[0], - "name": cols[1], - "pstate": cols[2], #The current performance state for the GPU. States range from P0 (maximum performance) to P12 (minimum performance). - "temperature.gpu": int(cols[3]), - "utilization.gpu": int(cols[4][:-1]), #remove % - "utilization.memory": int(cols[5][:-1]), #remove % - #"fan.speed": int(cols[6][:-1]), #remove % ([N/A] on bridges2) - }) - - #query process under current session (query every 2 seconds for 1 minute) - processes = {} - for i in range(30): - out=subprocess.check_output(["ps", "-s", str(sid), "ho", "pid,pcpu,pmem,rss,vsz,etime,cmd"]).decode("utf-8") - for line in out.split("\n"): - if line == "": - continue - tokens=line.split() - pid=tokens[0] - pcpu=float(tokens[1]) - pmem=float(tokens[2]) - rss=int(tokens[3]) - vsz=int(tokens[4]) - etime=tokens[5] - cmd=' '.join(tokens[6:]) - - #ignore myself. - if cmd.startswith("ps -s"): - continue - - #ignore smon - if cmd.startswith("python ./smon"): - continue - - #etime == elapsed time .. don't include process that just got started? (TODO why did I do this?) 
- if etime == "00:00": - continue - - if not pid in processes: - processes[pid] = [] - processes[pid].append({"pid": pid, "pcpu": pcpu, "pmem": pmem, "rss": rss, "vsz": vsz, "etime": etime, "cmd": cmd}) - - time.sleep(2) - - #aggregate(max) processes list for each pid - processes_groups = [] - for pid in processes: - group = processes[pid] - last = group[len(group)-1] - agg = {"pid": last["pid"], "pcpu": 0, "pmem": 0, "rss": 0, "vsz": 0, "etime": last["etime"], "cmd": last["cmd"]} - #pick max value - for p in group: - agg["pcpu"]=max(agg["pcpu"], p["pcpu"]) - agg["pmem"]=max(agg["pmem"], p["pmem"]) - agg["rss"]=max(agg["rss"], p["rss"]) - agg["vsz"]=max(agg["vsz"], p["vsz"]) - - processes_groups.append(agg) - - #query disk usage - disks = [] - try: - out=subprocess.check_output(["du", "-s"]).decode("utf-8") - for line in out.split("\n"): - if line == "": - continue - tokens=line.split() - size=int(tokens[0]) - path=tokens[1] - disks.append({"path": path, "size": size}) - except subprocess.CalledProcessError as err: - #sometimes files go missing while running du.. but that's ok - print(err) - - json.dump({ - "time": time.time(), - "processes": processes_groups, - "disks": disks, - "gpus": gpus, - "memory_avail": os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_AVPHYS_PAGES'), - #"memory_avail": psutil.virtual_memory().available, - #"memory_used": psutil.virtual_memory().used, - #"memory_free": psutil.virtual_memory().free, - }, outfile) - - outfile.write("\n") - outfile.flush() - - From ec74b3085347ed4f92cd8d97b6e095e2f6aee0fb Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 16:55:46 -0500 Subject: [PATCH 17/20] Delete unused sandbox --- sandbox/abandoning_parent.sh | 23 ----------- sandbox/add_git_notes | 19 --------- sandbox/consume_mem.py | 32 --------------- sandbox/duct_time | 29 -------------- sandbox/initial-thoughts.md | 78 ------------------------------------ sandbox/profile.sh | 24 ----------- sandbox/top_test.sh | 38 ------------------ 7 files changed, 243 deletions(-) delete mode 100755 sandbox/abandoning_parent.sh delete mode 100755 sandbox/add_git_notes delete mode 100755 sandbox/consume_mem.py delete mode 100755 sandbox/duct_time delete mode 100644 sandbox/initial-thoughts.md delete mode 100755 sandbox/profile.sh delete mode 100755 sandbox/top_test.sh diff --git a/sandbox/abandoning_parent.sh b/sandbox/abandoning_parent.sh deleted file mode 100755 index e581cfdf..00000000 --- a/sandbox/abandoning_parent.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -nchildren=$1 -shift - -for i in `seq 1 $nchildren`; do - "$@" & -done - -echo "Started $nchildren for $$" -pstree -c -p "$$" - -echo "Starting the same number in subprocesses" - -for i in `seq 1 $nchildren`; do - ( "$@" & ) -done - -jobs - -pstree -c -p "$$" -echo "waiting" -wait diff --git a/sandbox/add_git_notes b/sandbox/add_git_notes deleted file mode 100755 index 99f02e23..00000000 --- a/sandbox/add_git_notes +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash - -set -eu - -: ${DUCT_OUTPUT_PREFIX:=.duct/run-logs/`date --iso-8601=sec`_$$} - -export DUCT_OUTPUT_PREFIX - -STATS_FILE="${DUCT_OUTPUT_PREFIX}.stats.json" - -current_commit=$(git rev-parse HEAD) - -"$@" - -new_commit=$(git rev-parse HEAD) - -if [ "$current_commit" != "$new_commit" ] && [ -e "$STATS_FILE" ]; then - git notes add -m "$STATS_FILE" -F "$STATS_FILE" -fi diff --git a/sandbox/consume_mem.py b/sandbox/consume_mem.py deleted file mode 100755 index 5d0abed5..00000000 --- a/sandbox/consume_mem.py +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env python3 
- -import os -import sys -import time - - -def consume_memory(size_in_mb): - """ - Consumes approximately `size_in_mb` megabytes of memory. - - Parameters: - size_in_mb (int): Amount of memory to consume in megabytes. - """ - bytes_per_mb = 1024 * 1024 # 1MB is 1024 * 1024 bytes - data = bytearray(size_in_mb * bytes_per_mb) - print(type(data)) - - -if __name__ == "__main__": - if len(sys.argv) != 3: - print("Usage: python consume_memory.py ") - sys.exit(1) - - print(f"PID of this process: {os.getpid()}") - size_in_mb = int(sys.argv[1]) - time_to_spend = int(sys.argv[2]) - - consume_memory(size_in_mb) - t0 = time.time() - while time.time() - t0 < time_to_spend: - pass diff --git a/sandbox/duct_time b/sandbox/duct_time deleted file mode 100755 index 549c51f9..00000000 --- a/sandbox/duct_time +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bash -# e.g. `duct_time ./abandoning_parent.sh 5 ./consume_mem.py 100 200` - -: ${DUCT_OUTPUT_PREFIX:=.duct/run-logs/`date --iso-8601=sec`_$$} - -mkdir -p "$(dirname ${DUCT_OUTPUT_PREFIX})" - -STDOUT_FILE="${DUCT_OUTPUT_PREFIX}.out" -STDERR_FILE="${DUCT_OUTPUT_PREFIX}.err" -STATS_FILE="${DUCT_OUTPUT_PREFIX}.stats.json" - -# Be capable to support options to pass through stdout stderr - -/usr/bin/time \ - -o "$STATS_FILE" \ - --format='{"cpu-user-time": %U, "cpu-system-time": "%S", "cpu-perc": "%P", "memory-max-kb": %M, "memory-avg-kb": %K, "elapsed-time": %e, "cpu-time": %S }' \ - "$@" 2>"$STDERR_FILE" | tee "$STDOUT_FILE" - -# Option to add: remove empty files -for f in "$DUCT_OUTPUT_PREFIX"*; do - if [ -e "$f" ] && [ ! -s "$f" ]; then - echo "D: removing empty $f" >&2 - rm "$f" - fi -done - -echo "Logs are saved under $DUCT_OUTPUT_PREFIX*" >&2 -echo "stats: $(cat "$STATS_FILE")" - diff --git a/sandbox/initial-thoughts.md b/sandbox/initial-thoughts.md deleted file mode 100644 index ab827a81..00000000 --- a/sandbox/initial-thoughts.md +++ /dev/null @@ -1,78 +0,0 @@ -# con/duct-or - helper to execute and monitor a process - -The origin: in ReproMan (reproman run) we do already collect stdout/stderr but otherwise do not collect any statistics about execution of a command. -In DataLad (datalad run) we do not collect anything, and just passthrough the stdout/stderr. -But capturing/storing such logs are most often desired. -Moreover knowing maximum memory utilization could be critical for adequate resource requests for batch jobs. - -## ChatGPT prototype - -For the request/question: - -is there a helper for linux to run a command while that helper would also capture and store somewhere std output and stderr, while also monitoring underlying process(es) for their memory consumption, start/stop endtimes, etc and then provide a json-formatted record with that upon completion (also with the exit code of the command). - -it gave: - -```bash -#!/bin/bash - -# Command to execute -COMMAND="$*" - -# Redirect stdout and stderr to files -STDOUT_FILE=$(mktemp) -STDERR_FILE=$(mktemp) - -# Record start time -START_TIME=$(date +%s) - -# Execute command -eval "$COMMAND" 1>$STDOUT_FILE 2>$STDERR_FILE & -PID=$! - -# Wait for the command to finish -wait $PID -EXIT_CODE=$? 
- -# Record end time -END_TIME=$(date +%s) - -# Get memory usage (max resident set size) -MEMORY_USAGE=$(cat /proc/$PID/status | grep VmHWM | awk '{print $2}') - -# Generate JSON output -jq -n \ - --arg cmd "$COMMAND" \ - --arg stdout "$(cat $STDOUT_FILE)" \ - --arg stderr "$(cat $STDERR_FILE)" \ - --argjson exit_code "$EXIT_CODE" \ - --argjson start_time "$START_TIME" \ - --argjson end_time "$END_TIME" \ - --argjson memory_usage "$MEMORY_USAGE" \ - '{command: $cmd, stdout: $stdout, stderr: $stderr, exit_code: $exit_code, start_time: $start_time, end_time: $end_time, memory_usage_kb: $memory_usage}' - -# Clean up -rm $STDOUT_FILE $STDERR_FILE -``` - -## Desired features/usecases - -- be configurable. Most likely -- output filenames - -## Initial implementation details - -- Since destined ATM for tools which are in datalad space can just use DataLad's Runner construct, smth like - - out = Runner().run([cmd[0], '-c', 'datalad.dummy=this'] + cmd[1:], protocol=StdOutErrCapture) - -but we might want custom/derived protocol so we can capture and also at once output... - -- For monitoring -- we need stats on the entire "child tree" and it could be under singularity... I guess we would not be able to monitor server'ish docker - - - may be via cgroups somehow? - - may be there is already a tool? how brainlife does it? - -- Would be nice to collect it through time evolution to plots the graphs but then in the .json itself have only summaries over those monitored parameters - - - again -- look into brainlife? diff --git a/sandbox/profile.sh b/sandbox/profile.sh deleted file mode 100755 index 6a31b754..00000000 --- a/sandbox/profile.sh +++ /dev/null @@ -1,24 +0,0 @@ -DUCT_PROFILER="smon.py" -CHILD_PROCESSES=5 -MEM_KB=10 -HOLD_MEM_TIME=2 - - -ps -p $$ -o sid= -rm -f _smon.out -# TODO why doesnt CHILD_PROCESSES work? -# seq: invalid floating point argument: ‘./consume_mem.py’ -# To me looks like it thinks consume_mem.py is an int, which means that $CHILD_PROCESSES is emptystring? -# setsid bash -c 'ps -p $$ -o sid=; $DUCT_PROFILER & jobs ; ./abandoning_parent.sh $CHILD_PROCESSES ./consume_mem.py $MEM_KB $HOLD_MEM_TIME ; kill %1' - -# Does DUCT_PROFILER need to be run in same sid? Yes I think it does because it retrieves target SID from OS -setsid bash -c 'ps -p $$ -o sid=; $DUCT_PROFILER ./abandoning_parent.sh 5 ./consume_mem.py 100 5' - -# ./abandoning_parent.sh $CHILD_PROCESSES sleep 1 - - - -# ps -p $$ Process id of current script -# -o sid= only show SID, = hides header -# & jobs run cmd in background, show active jobs until they are done -# kill %1 kill current PID diff --git a/sandbox/top_test.sh b/sandbox/top_test.sh deleted file mode 100755 index 4fe005b3..00000000 --- a/sandbox/top_test.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/bash - -OUTPUT_FILE=/work2/03201/jbwexler/frontera/duct_test/duct_output.txt -COMMAND="datalad containers-run \ - --container-name containers/bids-mriqc \ - --input sourcedata \ - --output . \ - '{inputs}' '{outputs}' participant --participant-label sub-02 -w workdir" - -rm "$OUTPUT_FILE" -$COMMAND & -# Get the process ID of the command and its child processes -PID=$(pgrep -d',' -P $$) - -# Function to clean up and exit -cleanup_exit() { - echo "Cleaning up..." 
- kill $PID # Kill the background process - exit 0 -} - -# Trap Ctrl+C and call the cleanup function -trap 'cleanup_exit' INT - -# Header for the output file -echo "Time MemoryUsage(kB) CPUUsage(%)" > $OUTPUT_FILE - -# Monitor CPU and memory usage every 10 seconds -while ps -p $PID > /dev/null; do - # Get the process IDs of the command and its child processes - PIDS=$(pgrep -d',' -P $PID) - - TIME=$(date +"%Y-%m-%d %H:%M:%S") - MEMORY_USAGE=$(top -p "$PIDS" -b -n 1 | awk 'NR>7 { sum += $6; } END { print sum; }') - CPU_USAGE=$(top -p "$PIDS" -b -n 1 | awk 'NR>7 { sum += $9; } END { print sum; }') - echo "$TIME $MEMORY_USAGE $CPU_USAGE" >> $OUTPUT_FILE - sleep 1 -done \ No newline at end of file From 0269ca789387671b7a315f0b51e56cf17b658c00 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 20:35:01 -0500 Subject: [PATCH 18/20] Run smoke test with CI Rename to run-tests.sh so we can reuse datalad-installer verbatim --- .github/workflows/test.yaml | 95 +++++++++++++++++++++++++++++++++++ smoke-test.sh => run-tests.sh | 0 2 files changed, 95 insertions(+) create mode 100644 .github/workflows/test.yaml rename smoke-test.sh => run-tests.sh (100%) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 00000000..e5504f27 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,95 @@ +name: Test + +on: + pull_request: + branches: + - main + push: + branches: + - main + schedule: + - cron: '0 6 * * *' + +defaults: + run: + shell: bash + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.ref_name }} + cancel-in-progress: true + +jobs: + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: + - macos-12 + - macos-latest + - windows-latest + - ubuntu-latest + python-version: + - '3.7' + - '3.8' + - '3.9' + - '3.10' + - '3.11' + - '3.12' + - 'pypy-3.8' + - 'pypy-3.9' + - 'pypy-3.10' + toxenv: [py] + exclude: + # No older Pythons on arm64 macos-latest + - python-version: '3.7' + os: macos-latest + - python-version: '3.8' + os: macos-latest + - python-version: '3.9' + os: macos-latest + - python-version: 'pypy-3.8' + os: macos-latest + - python-version: 'pypy-3.9' + os: macos-latest + include: + - python-version: '3.7' + toxenv: lint + os: ubuntu-latest + - python-version: '3.7' + toxenv: typing + os: ubuntu-latest + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip wheel + python -m pip install --upgrade --upgrade-strategy=eager tox + + - name: Run tests + if: matrix.toxenv == 'py' + run: ./run-tests.sh + # run: ./run-tests.sh -e py -- -vv --ci --cov-report=xml + env: + GITHUB_TOKEN: ${{ secrets.GH_DOWNLOAD_TOKEN }} + + # - name: Run generic tests + # if: matrix.toxenv != 'py' + # run: tox -e ${{ matrix.toxenv }} + # + # - name: Upload coverage to Codecov + # if: matrix.toxenv == 'py' + # uses: codecov/codecov-action@v4 + # with: + # fail_ci_if_error: false + # token: ${{ secrets.CODECOV_TOKEN }} + # name: ${{ matrix.python-version }} + +# vim:set et sts=2: diff --git a/smoke-test.sh b/run-tests.sh similarity index 100% rename from smoke-test.sh rename to run-tests.sh From d7f47f742ea2147627e6aa12f9b736bb3cdab2b6 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 20:56:05 -0500 Subject: [PATCH 19/20] disable some parts of datalad-installer, and install our 
code --- .github/workflows/test.yaml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index e5504f27..2722d79e 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -73,17 +73,22 @@ jobs: python -m pip install --upgrade pip wheel python -m pip install --upgrade --upgrade-strategy=eager tox + - name: Install `duct` + run: pip install . + - name: Run tests - if: matrix.toxenv == 'py' + # if: matrix.toxenv == 'py' run: ./run-tests.sh # run: ./run-tests.sh -e py -- -vv --ci --cov-report=xml - env: - GITHUB_TOKEN: ${{ secrets.GH_DOWNLOAD_TOKEN }} + # env: + # GITHUB_TOKEN: ${{ secrets.GH_DOWNLOAD_TOKEN }} + # TODO # - name: Run generic tests # if: matrix.toxenv != 'py' # run: tox -e ${{ matrix.toxenv }} # + # TODO # - name: Upload coverage to Codecov # if: matrix.toxenv == 'py' # uses: codecov/codecov-action@v4 From 213eabd8641821b298193c7ea959e523a03368a9 Mon Sep 17 00:00:00 2001 From: Austin Macdonald Date: Thu, 25 Apr 2024 20:58:54 -0500 Subject: [PATCH 20/20] temporarily restrict matrix to save resources --- .github/workflows/test.yaml | 60 ++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 2722d79e..6b56035f 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -22,43 +22,43 @@ jobs: test: runs-on: ${{ matrix.os }} strategy: - fail-fast: false + fail-fast: true matrix: os: - - macos-12 - - macos-latest - - windows-latest + # - macos-12 + # - macos-latest + # - windows-latest - ubuntu-latest python-version: - - '3.7' - - '3.8' - - '3.9' - - '3.10' + # - '3.7' + # - '3.8' + # - '3.9' + # - '3.10' - '3.11' - '3.12' - - 'pypy-3.8' - - 'pypy-3.9' - - 'pypy-3.10' + # - 'pypy-3.8' + # - 'pypy-3.9' + # - 'pypy-3.10' toxenv: [py] - exclude: - # No older Pythons on arm64 macos-latest - - python-version: '3.7' - os: macos-latest - - python-version: '3.8' - os: macos-latest - - python-version: '3.9' - os: macos-latest - - python-version: 'pypy-3.8' - os: macos-latest - - python-version: 'pypy-3.9' - os: macos-latest - include: - - python-version: '3.7' - toxenv: lint - os: ubuntu-latest - - python-version: '3.7' - toxenv: typing - os: ubuntu-latest + # exclude: + # # No older Pythons on arm64 macos-latest + # - python-version: '3.7' + # os: macos-latest + # - python-version: '3.8' + # os: macos-latest + # - python-version: '3.9' + # os: macos-latest + # - python-version: 'pypy-3.8' + # os: macos-latest + # - python-version: 'pypy-3.9' + # os: macos-latest + # include: + # - python-version: '3.7' + # toxenv: lint + # os: ubuntu-latest + # - python-version: '3.7' + # toxenv: typing + # os: ubuntu-latest steps: - name: Check out repository uses: actions/checkout@v4