Skip to content

Commit

Permalink
Add dedicated types for process stats *et alii*
Browse files Browse the repository at this point in the history
  • Loading branch information
jwodder committed Jun 11, 2024
1 parent 8c6989c commit c0b9e6a
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 89 deletions.
136 changes: 84 additions & 52 deletions src/duct.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import asdict, dataclass
from dataclasses import asdict, dataclass, field
from datetime import datetime
import json
import os
Expand Down Expand Up @@ -38,6 +37,59 @@ class SystemInfo:
cpu_total: int


@dataclass
class ProcessStats:
# %CPU
pcpu: float
# %MEM
pmem: float
# Memory Resident Set Size
rss: int
# Virtual Memory size
vsz: int
timestamp: str

def max(self, other: ProcessStats) -> ProcessStats:
return ProcessStats(
pcpu=max(self.pcpu, other.pcpu),
pmem=max(self.pmem, other.pmem),
rss=max(self.rss, other.rss),
vsz=max(self.vsz, other.vsz),
timestamp=max(self.timestamp, other.timestamp),
)


@dataclass
class Sample:
stats: dict[int, ProcessStats] = field(default_factory=dict)
total_pmem: float = 0.0
total_pcpu: float = 0.0

def add(self, pid: int, stats: ProcessStats) -> None:
self.total_pmem += stats.pmem
self.total_pcpu += stats.pcpu
self.stats[pid] = stats

def max(self: Sample, other: Sample) -> Sample:
output = Sample()
for pid in self.stats.keys() | other.stats.keys():
if (mine := self.stats.get(pid)) is not None:
if (theirs := other.stats.get(pid)) is not None:
output.add(pid, mine.max(theirs))
else:
output.add(pid, mine)
else:
output.add(pid, other.stats[pid])
output.total_pmem = max(self.total_pmem, other.total_pmem)
output.total_pcpu = max(self.total_pcpu, other.total_pcpu)
return output

def for_json(self) -> dict[str, Any]:
d = {str(pid): asdict(stats) for pid, stats in self.stats.items()}
d["totals"] = {"pmem": self.total_pmem, "pcpu": self.total_pcpu}
return d


class Report:
"""Top level report"""

Expand All @@ -59,9 +111,9 @@ def __init__(
self.number = 0
self.system_info: SystemInfo | None = None
self.output_prefix = output_prefix
self.max_values: dict[str, dict[str, Any]] = defaultdict(dict)
self.max_values = Sample()
self.process = process
self._sample: dict[str, dict[str, Any]] = defaultdict(dict)
self._sample: Sample | None = None
self.datetime_filesafe = datetime_filesafe
self.end_time: float | None = None
self.run_time_seconds: str | None = None
Expand Down Expand Up @@ -107,31 +159,9 @@ def get_system_info(self) -> None:
except subprocess.CalledProcessError:
self.gpus = None

def calculate_total_usage(
self, sample: dict[str, dict[str, Any]]
) -> dict[str, dict[str, float]]:
pmem = 0.0
pcpu = 0.0
for _pid, pinfo in sample.items():
pmem += pinfo["pmem"]
pcpu += pinfo["pcpu"]
totals = {"totals": {"pmem": pmem, "pcpu": pcpu}}
return totals

@staticmethod
def update_max_resources(
maxes: dict[str, dict[str, Any]], sample: dict[str, dict[str, Any]]
) -> None:
for pid in sample:
if pid in maxes:
for key, value in sample[pid].items():
maxes[pid][key] = max(maxes[pid].get(key, value), value)
else:
maxes[pid] = sample[pid].copy()

def collect_sample(self) -> dict[str, dict[str, int | float | str]]:
def collect_sample(self) -> Sample:
assert self.session_id is not None
process_data: dict[str, dict[str, int | float | str]] = {}
sample = Sample()
try:
output = subprocess.check_output(
[
Expand All @@ -146,29 +176,29 @@ def collect_sample(self) -> dict[str, dict[str, int | float | str]]:
for line in output.splitlines()[1:]:
if line:
pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6)
process_data[pid] = {
# %CPU
"pcpu": float(pcpu),
# %MEM
"pmem": float(pmem),
# Memory Resident Set Size
"rss": int(rss),
# Virtual Memory size
"vsz": int(vsz),
"timestamp": datetime.now().astimezone().isoformat(),
}
sample.add(
int(pid),
ProcessStats(
pcpu=float(pcpu),
pmem=float(pmem),
rss=int(rss),
vsz=int(vsz),
timestamp=datetime.now().astimezone().isoformat(),
),
)
except subprocess.CalledProcessError:
pass
return process_data
return sample

def write_pid_samples(self) -> None:
assert self._sample is not None
resource_stats_log_path = f"{self.output_prefix}usage.json"
with open(resource_stats_log_path, "a") as resource_statistics_log:
resource_statistics_log.write(json.dumps(self._sample) + "\n")
resource_statistics_log.write(json.dumps(self._sample.for_json()) + "\n")

def print_max_values(self) -> None:
for pid, maxes in self.max_values.items():
print(f"PID {pid} Maximum Values: {maxes}")
for pid, maxes in self.max_values.stats.items():
print(f"PID {pid} Maximum Values: {asdict(maxes)}")

Check warning on line 201 in src/duct.py

View check run for this annotation

Codecov / codecov/patch

src/duct.py#L201

Added line #L201 was not covered by tests

def finalize(self) -> None:
if not self.process.returncode:
Expand All @@ -180,10 +210,12 @@ def finalize(self) -> None:
print(f"Log files location: {self.output_prefix}")
print(f"Wall Clock Time: {self.elapsed_time:.3f} sec")
print(
f"Memory Peak Usage: {self.max_values.get('totals', {}).get('pmem', 'unknown')}%"
"Memory Peak Usage:",
f"{self.max_values.total_pmem}%" if self.max_values.stats else "unknown%",
)
print(
f"CPU Peak Usage: {self.max_values.get('totals', {}).get('pcpu', 'unknown')}%"
"CPU Peak Usage:",
f"{self.max_values.total_pcpu}%" if self.max_values.stats else "unknown%",
)

def __repr__(self) -> str:
Expand Down Expand Up @@ -308,13 +340,13 @@ def monitor_process(
break
# print(f"Resource stats log path: {resource_stats_log_path}")
sample = report.collect_sample()
totals = report.calculate_total_usage(sample)
report.update_max_resources(sample, totals)
report.update_max_resources(report._sample, sample)
report._sample = (
report._sample.max(sample) if report._sample is not None else sample
)
if report.elapsed_time >= report.number * report_interval:
report.write_pid_samples()
report.update_max_resources(report.max_values, report._sample)
report._sample = defaultdict(dict) # Reset sample
report.max_values = report.max_values.max(report._sample)
report._sample = None # Reset sample
report.number += 1


Expand Down Expand Up @@ -499,8 +531,8 @@ def execute(args: Arguments) -> None:
monitoring_thread.join()

# If we have any extra samples that haven't been written yet, do it now
if report._sample:
report.update_max_resources(report.max_values, report._sample)
if report._sample is not None:
report.max_values = report.max_values.max(report._sample)
report.write_pid_samples()
report.process = process
report.finalize()
Expand Down
85 changes: 48 additions & 37 deletions test/test_report.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,49 @@
from __future__ import annotations
from collections import defaultdict
from duct import Report

ex0 = {"pid1": {"pcpu": 0.0}}
ex1 = {"pid1": {"pcpu": 1.0}}
ex2 = {"pid2": {"pcpu": 1.0}}
ex2pids = {"pid1": {"pcpu": 0.0}, "pid2": {"pcpu": 0.0}}


def test_update_max_resources_initial_values_one_pid() -> None:
maxes: dict[str, dict[str, float]] = defaultdict(dict)
Report.update_max_resources(maxes, ex0)
assert maxes == ex0


def test_update_max_resources_max_values_one_pid() -> None:
maxes: dict[str, dict[str, float]] = defaultdict(dict)
Report.update_max_resources(maxes, ex0)
Report.update_max_resources(maxes, ex1)
assert maxes == ex1


def test_update_max_resources_initial_values_two_pids() -> None:
maxes: dict[str, dict[str, float]] = defaultdict(dict)
Report.update_max_resources(maxes, ex2pids)
assert maxes == ex2pids


def test_update_max_resources_max_update_values_two_pids() -> None:
maxes: dict[str, dict[str, float]] = defaultdict(dict)
Report.update_max_resources(maxes, ex2pids)
Report.update_max_resources(maxes, ex1)
Report.update_max_resources(maxes, ex2)
assert maxes.keys() == ex2pids.keys()
assert maxes != ex2pids
assert maxes["pid1"] == ex1["pid1"]
assert maxes["pid2"] == ex2["pid2"]
from duct import ProcessStats, Sample

stat0 = ProcessStats(
pcpu=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00"
)

stat1 = ProcessStats(
pcpu=1.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:13:23-04:00"
)


def test_sample_max_initial_values_one_pid() -> None:
maxes = Sample()
ex0 = Sample()
ex0.add(1, stat0)
maxes = maxes.max(ex0)
assert maxes.stats == {1: stat0}


def test_sample_max_one_pid() -> None:
maxes = Sample()
maxes.add(1, stat0)
ex1 = Sample()
ex1.add(1, stat1)
maxes = maxes.max(ex1)
assert maxes.stats == {1: stat1}


def test_sample_max_initial_values_two_pids() -> None:
maxes = Sample()
ex0 = Sample()
ex0.add(1, stat0)
ex0.add(2, stat0)
maxes = maxes.max(ex0)
assert maxes.stats == {1: stat0, 2: stat0}


def test_sample_maxtwo_pids() -> None:
maxes = Sample()
maxes.add(1, stat0)
maxes.add(2, stat0)
ex1 = Sample()
ex1.add(1, stat1)
maxes = maxes.max(ex1)
ex2 = Sample()
ex2.add(2, stat1)
maxes = maxes.max(ex2)
assert maxes.stats == {1: stat1, 2: stat1}

0 comments on commit c0b9e6a

Please sign in to comment.