From c0b9e6a782a7a3d3105fbc08f80dcd17ccc9c3be Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Tue, 11 Jun 2024 10:23:28 -0400 Subject: [PATCH] Add dedicated types for process stats *et alii* --- src/duct.py | 136 +++++++++++++++++++++++++++----------------- test/test_report.py | 85 +++++++++++++++------------ 2 files changed, 132 insertions(+), 89 deletions(-) diff --git a/src/duct.py b/src/duct.py index 89e476f5..64584097 100755 --- a/src/duct.py +++ b/src/duct.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 from __future__ import annotations import argparse -from collections import defaultdict from collections.abc import Iterable -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from datetime import datetime import json import os @@ -38,6 +37,59 @@ class SystemInfo: cpu_total: int +@dataclass +class ProcessStats: + # %CPU + pcpu: float + # %MEM + pmem: float + # Memory Resident Set Size + rss: int + # Virtual Memory size + vsz: int + timestamp: str + + def max(self, other: ProcessStats) -> ProcessStats: + return ProcessStats( + pcpu=max(self.pcpu, other.pcpu), + pmem=max(self.pmem, other.pmem), + rss=max(self.rss, other.rss), + vsz=max(self.vsz, other.vsz), + timestamp=max(self.timestamp, other.timestamp), + ) + + +@dataclass +class Sample: + stats: dict[int, ProcessStats] = field(default_factory=dict) + total_pmem: float = 0.0 + total_pcpu: float = 0.0 + + def add(self, pid: int, stats: ProcessStats) -> None: + self.total_pmem += stats.pmem + self.total_pcpu += stats.pcpu + self.stats[pid] = stats + + def max(self: Sample, other: Sample) -> Sample: + output = Sample() + for pid in self.stats.keys() | other.stats.keys(): + if (mine := self.stats.get(pid)) is not None: + if (theirs := other.stats.get(pid)) is not None: + output.add(pid, mine.max(theirs)) + else: + output.add(pid, mine) + else: + output.add(pid, other.stats[pid]) + output.total_pmem = max(self.total_pmem, other.total_pmem) + output.total_pcpu = max(self.total_pcpu, other.total_pcpu) + return output + + def for_json(self) -> dict[str, Any]: + d = {str(pid): asdict(stats) for pid, stats in self.stats.items()} + d["totals"] = {"pmem": self.total_pmem, "pcpu": self.total_pcpu} + return d + + class Report: """Top level report""" @@ -59,9 +111,9 @@ def __init__( self.number = 0 self.system_info: SystemInfo | None = None self.output_prefix = output_prefix - self.max_values: dict[str, dict[str, Any]] = defaultdict(dict) + self.max_values = Sample() self.process = process - self._sample: dict[str, dict[str, Any]] = defaultdict(dict) + self._sample: Sample | None = None self.datetime_filesafe = datetime_filesafe self.end_time: float | None = None self.run_time_seconds: str | None = None @@ -107,31 +159,9 @@ def get_system_info(self) -> None: except subprocess.CalledProcessError: self.gpus = None - def calculate_total_usage( - self, sample: dict[str, dict[str, Any]] - ) -> dict[str, dict[str, float]]: - pmem = 0.0 - pcpu = 0.0 - for _pid, pinfo in sample.items(): - pmem += pinfo["pmem"] - pcpu += pinfo["pcpu"] - totals = {"totals": {"pmem": pmem, "pcpu": pcpu}} - return totals - - @staticmethod - def update_max_resources( - maxes: dict[str, dict[str, Any]], sample: dict[str, dict[str, Any]] - ) -> None: - for pid in sample: - if pid in maxes: - for key, value in sample[pid].items(): - maxes[pid][key] = max(maxes[pid].get(key, value), value) - else: - maxes[pid] = sample[pid].copy() - - def collect_sample(self) -> dict[str, dict[str, int | float | str]]: + def collect_sample(self) -> Sample: assert self.session_id is not None - process_data: dict[str, dict[str, int | float | str]] = {} + sample = Sample() try: output = subprocess.check_output( [ @@ -146,29 +176,29 @@ def collect_sample(self) -> dict[str, dict[str, int | float | str]]: for line in output.splitlines()[1:]: if line: pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6) - process_data[pid] = { - # %CPU - "pcpu": float(pcpu), - # %MEM - "pmem": float(pmem), - # Memory Resident Set Size - "rss": int(rss), - # Virtual Memory size - "vsz": int(vsz), - "timestamp": datetime.now().astimezone().isoformat(), - } + sample.add( + int(pid), + ProcessStats( + pcpu=float(pcpu), + pmem=float(pmem), + rss=int(rss), + vsz=int(vsz), + timestamp=datetime.now().astimezone().isoformat(), + ), + ) except subprocess.CalledProcessError: pass - return process_data + return sample def write_pid_samples(self) -> None: + assert self._sample is not None resource_stats_log_path = f"{self.output_prefix}usage.json" with open(resource_stats_log_path, "a") as resource_statistics_log: - resource_statistics_log.write(json.dumps(self._sample) + "\n") + resource_statistics_log.write(json.dumps(self._sample.for_json()) + "\n") def print_max_values(self) -> None: - for pid, maxes in self.max_values.items(): - print(f"PID {pid} Maximum Values: {maxes}") + for pid, maxes in self.max_values.stats.items(): + print(f"PID {pid} Maximum Values: {asdict(maxes)}") def finalize(self) -> None: if not self.process.returncode: @@ -180,10 +210,12 @@ def finalize(self) -> None: print(f"Log files location: {self.output_prefix}") print(f"Wall Clock Time: {self.elapsed_time:.3f} sec") print( - f"Memory Peak Usage: {self.max_values.get('totals', {}).get('pmem', 'unknown')}%" + "Memory Peak Usage:", + f"{self.max_values.total_pmem}%" if self.max_values.stats else "unknown%", ) print( - f"CPU Peak Usage: {self.max_values.get('totals', {}).get('pcpu', 'unknown')}%" + "CPU Peak Usage:", + f"{self.max_values.total_pcpu}%" if self.max_values.stats else "unknown%", ) def __repr__(self) -> str: @@ -308,13 +340,13 @@ def monitor_process( break # print(f"Resource stats log path: {resource_stats_log_path}") sample = report.collect_sample() - totals = report.calculate_total_usage(sample) - report.update_max_resources(sample, totals) - report.update_max_resources(report._sample, sample) + report._sample = ( + report._sample.max(sample) if report._sample is not None else sample + ) if report.elapsed_time >= report.number * report_interval: report.write_pid_samples() - report.update_max_resources(report.max_values, report._sample) - report._sample = defaultdict(dict) # Reset sample + report.max_values = report.max_values.max(report._sample) + report._sample = None # Reset sample report.number += 1 @@ -499,8 +531,8 @@ def execute(args: Arguments) -> None: monitoring_thread.join() # If we have any extra samples that haven't been written yet, do it now - if report._sample: - report.update_max_resources(report.max_values, report._sample) + if report._sample is not None: + report.max_values = report.max_values.max(report._sample) report.write_pid_samples() report.process = process report.finalize() diff --git a/test/test_report.py b/test/test_report.py index 5cdb9916..0846e9da 100644 --- a/test/test_report.py +++ b/test/test_report.py @@ -1,38 +1,49 @@ from __future__ import annotations -from collections import defaultdict -from duct import Report - -ex0 = {"pid1": {"pcpu": 0.0}} -ex1 = {"pid1": {"pcpu": 1.0}} -ex2 = {"pid2": {"pcpu": 1.0}} -ex2pids = {"pid1": {"pcpu": 0.0}, "pid2": {"pcpu": 0.0}} - - -def test_update_max_resources_initial_values_one_pid() -> None: - maxes: dict[str, dict[str, float]] = defaultdict(dict) - Report.update_max_resources(maxes, ex0) - assert maxes == ex0 - - -def test_update_max_resources_max_values_one_pid() -> None: - maxes: dict[str, dict[str, float]] = defaultdict(dict) - Report.update_max_resources(maxes, ex0) - Report.update_max_resources(maxes, ex1) - assert maxes == ex1 - - -def test_update_max_resources_initial_values_two_pids() -> None: - maxes: dict[str, dict[str, float]] = defaultdict(dict) - Report.update_max_resources(maxes, ex2pids) - assert maxes == ex2pids - - -def test_update_max_resources_max_update_values_two_pids() -> None: - maxes: dict[str, dict[str, float]] = defaultdict(dict) - Report.update_max_resources(maxes, ex2pids) - Report.update_max_resources(maxes, ex1) - Report.update_max_resources(maxes, ex2) - assert maxes.keys() == ex2pids.keys() - assert maxes != ex2pids - assert maxes["pid1"] == ex1["pid1"] - assert maxes["pid2"] == ex2["pid2"] +from duct import ProcessStats, Sample + +stat0 = ProcessStats( + pcpu=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00" +) + +stat1 = ProcessStats( + pcpu=1.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:13:23-04:00" +) + + +def test_sample_max_initial_values_one_pid() -> None: + maxes = Sample() + ex0 = Sample() + ex0.add(1, stat0) + maxes = maxes.max(ex0) + assert maxes.stats == {1: stat0} + + +def test_sample_max_one_pid() -> None: + maxes = Sample() + maxes.add(1, stat0) + ex1 = Sample() + ex1.add(1, stat1) + maxes = maxes.max(ex1) + assert maxes.stats == {1: stat1} + + +def test_sample_max_initial_values_two_pids() -> None: + maxes = Sample() + ex0 = Sample() + ex0.add(1, stat0) + ex0.add(2, stat0) + maxes = maxes.max(ex0) + assert maxes.stats == {1: stat0, 2: stat0} + + +def test_sample_maxtwo_pids() -> None: + maxes = Sample() + maxes.add(1, stat0) + maxes.add(2, stat0) + ex1 = Sample() + ex1.add(1, stat1) + maxes = maxes.max(ex1) + ex2 = Sample() + ex2.add(2, stat1) + maxes = maxes.max(ex2) + assert maxes.stats == {1: stat1, 2: stat1}