Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use None rather than 0 prior to measurement #135

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 60 additions & 50 deletions src/con_duct/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import sys
import threading
import time
from typing import IO, Any, TextIO
from typing import IO, Any, Optional, TextIO
from . import __version__

ENV_PREFIXES = ("PBS_", "SLURM_", "OSG")
Expand Down Expand Up @@ -162,19 +162,34 @@ def prepare_paths(self, clobber: bool, capture_outputs: Outputs) -> None:

@dataclass
class Averages:
rss: float = 0.0
vsz: float = 0.0
pmem: float = 0.0
pcpu: float = 0.0
rss: Optional[float] = None
vsz: Optional[float] = None
pmem: Optional[float] = None
pcpu: Optional[float] = None
num_samples: int = 0

def update(self: Averages, other: Sample) -> None:
assert_num(other.total_rss, other.total_vsz, other.total_pmem, other.total_pcpu)
self.num_samples += 1
self.rss += (other.total_rss - self.rss) / self.num_samples
self.vsz += (other.total_vsz - self.vsz) / self.num_samples
self.pmem += (other.total_pmem - self.pmem) / self.num_samples
self.pcpu += (other.total_pcpu - self.pcpu) / self.num_samples
if not self.num_samples:
self.num_samples += 1
self.rss = other.total_rss
self.vsz = other.total_vsz
self.pmem = other.total_pmem
self.pcpu = other.total_pcpu
else:
assert self.rss is not None
assert self.vsz is not None
assert self.pmem is not None
assert self.pcpu is not None
assert other.total_rss is not None
assert other.total_vsz is not None
assert other.total_pmem is not None
assert other.total_pcpu is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note, since it is not relevant to this PR: I have an odd feeling about this code, in that `self` (Averages) is making assumptions about `other` (Sample).

self.num_samples += 1
self.rss += (other.total_rss - self.rss) / self.num_samples
self.vsz += (other.total_vsz - self.vsz) / self.num_samples
self.pmem += (other.total_pmem - self.pmem) / self.num_samples
self.pcpu += (other.total_pcpu - self.pcpu) / self.num_samples
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just another note: I feel odd about this math here... unrelated to this PR though, so just a note.


@classmethod
def from_sample(cls, sample: Sample) -> Averages:
Expand All @@ -194,17 +209,17 @@ def from_sample(cls, sample: Sample) -> Averages:
class Sample:
stats: dict[int, ProcessStats] = field(default_factory=dict)
averages: Averages = field(default_factory=Averages)
total_rss: int = 0
total_vsz: int = 0
total_pmem: float = 0.0
total_pcpu: float = 0.0
total_rss: Optional[int] = None
total_vsz: Optional[int] = None
total_pmem: Optional[float] = None
total_pcpu: Optional[float] = None
timestamp: str = "" # TS of last sample collected

def add_pid(self, pid: int, stats: ProcessStats) -> None:
    """Fold one process's stats into this sample's running totals.

    Each total starts out as ``None`` (nothing measured yet) and is treated
    as zero the first time a process is added. The per-process ``stats`` are
    also stored under ``pid``, and ``timestamp`` is advanced to the latest
    timestamp seen.

    Args:
        pid: Key under which ``stats`` is recorded in ``self.stats``.
        stats: Measurements for that process; its ``rss``, ``vsz``, ``pmem``,
            ``pcpu`` and ``timestamp`` attributes are read.
    """
    # The fallback must be parenthesized: `total or 0 + x` parses as
    # `total or (0 + x)`, which silently stops accumulating as soon as the
    # total is non-zero.
    self.total_rss = (self.total_rss or 0) + stats.rss
    self.total_vsz = (self.total_vsz or 0) + stats.vsz
    # pmem accumulates from total_pmem (not total_vsz).
    self.total_pmem = (self.total_pmem or 0) + stats.pmem
    self.total_pcpu = (self.total_pcpu or 0) + stats.pcpu
    self.stats[pid] = stats
    # Timestamps are strings; max() keeps the lexicographically greatest one.
    self.timestamp = max(self.timestamp, stats.timestamp)

Expand All @@ -218,10 +233,14 @@ def max(self: Sample, other: Sample) -> Sample:
output.add_pid(pid, mine)
else:
output.add_pid(pid, other.stats[pid])
output.total_pmem = max(self.total_pmem, other.total_pmem)
output.total_pcpu = max(self.total_pcpu, other.total_pcpu)
output.total_rss = max(self.total_rss, other.total_rss)
output.total_vsz = max(self.total_vsz, other.total_vsz)
assert other.total_pmem is not None
assert other.total_pcpu is not None
assert other.total_rss is not None
assert other.total_vsz is not None
output.total_pmem = max(self.total_pmem or 0, other.total_pmem)
output.total_pcpu = max(self.total_pcpu or 0, other.total_pcpu)
output.total_rss = max(self.total_rss or 0, other.total_rss)
output.total_vsz = max(self.total_vsz or 0, other.total_vsz)
return output

def for_json(self) -> dict[str, Any]:
Expand Down Expand Up @@ -312,7 +331,7 @@ def get_system_info(self) -> None:
except subprocess.CalledProcessError:
self.gpus = None

def collect_sample(self) -> Sample:
def collect_sample(self) -> Optional[Sample]:
assert self.session_id is not None
sample = Sample()
try:
Expand Down Expand Up @@ -341,8 +360,8 @@ def collect_sample(self) -> Sample:
timestamp=datetime.now().astimezone().isoformat(),
),
)
except subprocess.CalledProcessError:
pass
except subprocess.CalledProcessError: # when session_id has no processes
return None
return sample

def write_subreport(self) -> None:
Expand All @@ -359,30 +378,14 @@ def execution_summary(self) -> dict[str, Any]:
"command": self.command,
"logs_prefix": self.log_paths.prefix,
"wall_clock_time": self.elapsed_time,
"peak_rss": (
self.max_values.total_rss if self.max_values.stats else "unknown"
),
"average_rss": (
self.averages.rss if self.averages.num_samples >= 1 else "unknown"
),
"peak_vsz": (
self.max_values.total_vsz if self.max_values.stats else "unknown"
),
"average_vsz": (
self.averages.vsz if self.averages.num_samples >= 1 else "unknown"
),
"peak_pmem": (
self.max_values.total_pmem if self.max_values.stats else "unknown"
),
"average_pmem": (
self.averages.pmem if self.averages.num_samples >= 1 else "unknown"
),
"peak_pcpu": (
self.max_values.total_pcpu if self.max_values.stats else "unknown"
),
"average_pcpu": (
self.averages.pcpu if self.averages.num_samples >= 1 else "unknown"
),
"peak_rss": self.max_values.total_rss,
"average_rss": self.averages.rss,
"peak_vsz": self.max_values.total_vsz,
"average_vsz": self.averages.vsz,
"peak_pmem": self.max_values.total_pmem,
"average_pmem": self.averages.pmem,
"peak_pcpu": self.max_values.total_pcpu,
"average_pcpu": self.averages.pcpu,
"num_samples": self.averages.num_samples,
"num_reports": self.number,
}
Expand All @@ -403,7 +406,10 @@ def dump_json(self) -> str:

@cached_property
def execution_summary_formatted(self) -> str:
    """Render the execution summary through ``summary_format`` for display.

    Values that are ``None`` in ``execution_summary`` are shown as the string
    ``"unknown"``. Only ``None`` is substituted, so legitimate falsy
    measurements such as ``0`` are kept as-is.

    Returns:
        ``self.summary_format`` filled in via ``str.format_map``.
    """
    human_readable = {
        key: "unknown" if value is None else value
        for key, value in self.execution_summary.items()
    }
    return self.summary_format.format_map(human_readable)


@dataclass
Expand Down Expand Up @@ -543,6 +549,10 @@ def monitor_process(
break
sample = report.collect_sample()
# Report averages should be updated prior to sample aggregation
if (
sample is None
): # passthrough has probably finished before sample could be collected
continue
report.averages.update(sample)
if report.current_sample is None:
sample.averages = Averages.from_sample(sample)
Expand Down
28 changes: 25 additions & 3 deletions test/test_report.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from __future__ import annotations
from datetime import datetime
from unittest import mock
import pytest
from con_duct.__main__ import Averages, ProcessStats, Sample
from con_duct.__main__ import (
EXECUTION_SUMMARY_FORMAT,
Averages,
ProcessStats,
Report,
Sample,
)

stat0 = ProcessStats(
pcpu=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00"
Expand Down Expand Up @@ -102,7 +109,7 @@ def test_averages_three_samples() -> None:
(0, 0.0, 0, 0.0),
(2.5, 3.5, 8192, 16384),
(100.0, 99.9, 65536, 131072),
]
],
)
def test_process_stats_green(pcpu: float, pmem: float, rss: int, vsz: int) -> None:
# Assert does not raise
Expand All @@ -123,7 +130,7 @@ def test_process_stats_green(pcpu: float, pmem: float, rss: int, vsz: int) -> No
(1, 2, "one", 4),
(1, 2, 3, "value"),
("2", "fail", "or", "more"),
]
],
)
def test_process_stats_red(pcpu: float, pmem: float, rss: int, vsz: int) -> None:
with pytest.raises(AssertionError):
Expand All @@ -134,3 +141,18 @@ def test_process_stats_red(pcpu: float, pmem: float, rss: int, vsz: int) -> None
vsz=vsz,
timestamp=datetime.now().astimezone().isoformat(),
)


@mock.patch("con_duct.__main__.LogPaths")
@mock.patch("con_duct.__main__.subprocess.Popen")
def test_execution_summary_formatted(
    mock_popen: mock.MagicMock, mock_log_paths: mock.MagicMock
) -> None:
    """The formatted summary of a fresh Report shows "unknown", never "None"."""
    mock_log_paths.prefix = "mock_prefix"
    # Build a Report around mocks only; no sample is collected before the
    # summary is formatted.
    report_args = (
        "_cmd",
        [],
        None,
        mock_popen,
        mock_log_paths,
        EXECUTION_SUMMARY_FORMAT,
        False,
    )
    formatted = Report(*report_args).execution_summary_formatted

    assert "None" not in formatted
    assert "unknown" in formatted
Loading