From 05f3d253706bfadb2ae5fa308eecc5c9673cda9f Mon Sep 17 00:00:00 2001
From: Austin Macdonald
Date: Tue, 28 May 2024 10:49:05 -0500
Subject: [PATCH] Dry up maxing/aggregation logic

---
Review note: this copy of the patch was amended during review.
update_max_resources now (a) stores non-dict entries, such as the "error"
string set by collect_sample, as-is instead of calling .items() on them, and
(b) copies a pid's first sample instead of aliasing it. Indentation and blank
context lines were reconstructed from a whitespace-collapsed copy, and the
hashes in the headers still refer to the original commit, so re-check the
hunks against src/duct.py before applying.

 src/duct.py | 37 +++++++++++++++++++++++--------------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/src/duct.py b/src/duct.py
index a2cf5f6e..14762998 100755
--- a/src/duct.py
+++ b/src/duct.py
@@ -103,6 +103,24 @@ def get_system_info(self):
         except subprocess.CalledProcessError:
             self.gpus = ["Failed to query GPU info"]
 
+    def update_max_resources(self, maxes, sample):
+        """Fold ``sample`` into ``maxes``, keeping the largest value per key.
+
+        Both arguments map a pid to that process's stats dict. ``maxes`` is
+        updated in place; ``sample`` is left untouched.
+        """
+        for pid, stats in sample.items():
+            if not isinstance(stats, dict):
+                # collect_sample stores a plain string under "error" when ps
+                # fails; there is nothing to max, so keep the latest message.
+                maxes[pid] = stats
+            elif pid in maxes:
+                for key, value in stats.items():
+                    maxes[pid][key] = max(maxes[pid].get(key, value), value)
+            else:
+                # Copy so that later in-place maxing cannot mutate the sample.
+                maxes[pid] = dict(stats)
+
     def collect_sample(self):
         process_data = {}
         try:
@@ -120,7 +138,7 @@ def collect_sample(self):
                 if line:
                     pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6)
 
-                    pid_sample = {
+                    process_data[pid] = {
                         # %CPU
                         "pcpu": float(pcpu),
                         # %MEM
@@ -131,26 +149,16 @@ def collect_sample(self):
                         "vsz": int(vsz),
                         "timestamp": datetime.utcnow().isoformat() + "Z",
                     }
-                    if pid in self._sample:
-                        for key, value in pid_sample.items():
-                            self._sample[pid][key] = max(
-                                self._sample[pid].get(key, value), value
-                            )
-                    else:
-                        self._sample[pid] = pid_sample
-
         except subprocess.CalledProcessError:
             process_data["error"] = "Failed to query process data"
+        return process_data
 
     def write_pid_samples(self):
         resource_stats_log_path = f"{self.output_prefix}usage.json"
         for pid, pinfo in self._sample.items():
-            print(pid)
-            print(self.datetime_filesafe)
             pid_resources_log_path = resource_stats_log_path.format(
                 pid=pid, datetime_filesafe=self.datetime_filesafe
             )
-            print(pid_resources_log_path)
             ensure_directories(pid_resources_log_path)
             with open(pid_resources_log_path, "a") as resource_statistics_log:
                 resource_statistics_log.write(json.dumps(pinfo) + "\n")
@@ -192,9 +200,11 @@ def monitor_process(report, process, report_interval, sample_interval):
         if process.poll() is not None:  # the passthrough command has finished
             break
         # print(f"Resource stats log path: {resource_stats_log_path}")
-        report.collect_sample()
+        sample = report.collect_sample()
+        report.update_max_resources(report._sample, sample)
         if report.elapsed_time >= (report.number + 1) * report_interval:
             report.write_pid_samples()
+            report.update_max_resources(report.max_values, report._sample)
             report._sample = defaultdict(dict)  # Reset sample
             report.number += 1
         time.sleep(sample_interval)
@@ -355,7 +365,6 @@ def main():
     formatted_output_prefix = args.output_prefix.format(
         datetime_filesafe=datetime_filesafe, pid=duct_pid
     )
-    print(formatted_output_prefix)
     ensure_directories(formatted_output_prefix)
     stdout, stderr = prepare_outputs(
         args.capture_outputs, args.outputs, formatted_output_prefix