Skip to content

Commit

Permalink
Dry up maxing/aggregation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
asmacdo committed May 28, 2024
1 parent 110acc4 commit 05f3d25
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ def get_system_info(self):
except subprocess.CalledProcessError:
self.gpus = ["Failed to query GPU info"]

def update_max_resources(self, maxes, sample):
for pid in sample:
if pid in maxes:
for key, value in sample[pid].items():
maxes[pid][key] = max(maxes[pid].get(key, value), value)
else:
maxes[pid] = sample[pid]

def collect_sample(self):
process_data = {}
try:
Expand All @@ -120,7 +128,7 @@ def collect_sample(self):
if line:
pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6)

pid_sample = {
process_data[pid] = {
# %CPU
"pcpu": float(pcpu),
# %MEM
Expand All @@ -131,26 +139,16 @@ def collect_sample(self):
"vsz": int(vsz),
"timestamp": datetime.utcnow().isoformat() + "Z",
}
if pid in self._sample:
for key, value in pid_sample.items():
self._sample[pid][key] = max(
self._sample[pid].get(key, value), value
)
else:
self._sample[pid] = pid_sample

except subprocess.CalledProcessError:
process_data["error"] = "Failed to query process data"
return process_data

def write_pid_samples(self):
resource_stats_log_path = f"{self.output_prefix}usage.json"
for pid, pinfo in self._sample.items():
print(pid)
print(self.datetime_filesafe)
pid_resources_log_path = resource_stats_log_path.format(
pid=pid, datetime_filesafe=self.datetime_filesafe
)
print(pid_resources_log_path)
ensure_directories(pid_resources_log_path)
with open(pid_resources_log_path, "a") as resource_statistics_log:
resource_statistics_log.write(json.dumps(pinfo) + "\n")
Expand Down Expand Up @@ -192,9 +190,11 @@ def monitor_process(report, process, report_interval, sample_interval):
if process.poll() is not None: # the passthrough command has finished
break
# print(f"Resource stats log path: {resource_stats_log_path}")
report.collect_sample()
sample = report.collect_sample()
report.update_max_resources(report._sample, sample)
if report.elapsed_time >= (report.number + 1) * report_interval:
report.write_pid_samples()
report.update_max_resources(report.max_values, report._sample)
report._sample = defaultdict(dict) # Reset sample
report.number += 1
time.sleep(sample_interval)
Expand Down Expand Up @@ -355,7 +355,6 @@ def main():
formatted_output_prefix = args.output_prefix.format(
datetime_filesafe=datetime_filesafe, pid=duct_pid
)
print(formatted_output_prefix)
ensure_directories(formatted_output_prefix)
stdout, stderr = prepare_outputs(
args.capture_outputs, args.outputs, formatted_output_prefix
Expand Down

0 comments on commit 05f3d25

Please sign in to comment.