Skip to content

Commit

Permalink
Track spikes, bugfixes, test execution, and enable osx (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
asmacdo authored Jun 7, 2024
1 parent 1e53ff1 commit 8b1e218
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 92 deletions.
36 changes: 12 additions & 24 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ jobs:
fail-fast: true
matrix:
os:
# - macos-12
# - macos-latest
- macos-12
- macos-latest
# - windows-latest
- ubuntu-latest
python-version:
Expand All @@ -38,25 +38,16 @@ jobs:
- 'pypy-3.8'
- 'pypy-3.9'
- 'pypy-3.10'
# exclude:
# # No older Pythons on arm64 macos-latest
# - python-version: '3.7'
# os: macos-latest
# - python-version: '3.8'
# os: macos-latest
# - python-version: '3.9'
# os: macos-latest
# - python-version: 'pypy-3.8'
# os: macos-latest
# - python-version: 'pypy-3.9'
# os: macos-latest
# include:
# - python-version: '3.7'
# toxenv: lint
# os: ubuntu-latest
# - python-version: '3.7'
# toxenv: typing
# os: ubuntu-latest
exclude:
# No older Pythons on arm64 macos-latest
- python-version: '3.8'
os: macos-latest
- python-version: '3.9'
os: macos-latest
- python-version: 'pypy-3.8'
os: macos-latest
- python-version: 'pypy-3.9'
os: macos-latest
steps:
- name: Check out repository
uses: actions/checkout@v4
Expand All @@ -77,9 +68,6 @@ jobs:
- name: Run tests
run: tox -e py -- -vv --cov-report=xml

- name: Run smoke tests
run: ./smoke-tests.sh

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
Expand Down
43 changes: 31 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,47 @@ A process wrapper script that monitors the execution of a command.
```shell
> duct --help

usage: duct [-h] [--sample-interval SAMPLE_INTERVAL]
[--output_prefix OUTPUT_PREFIX]
[--report-interval REPORT_INTERVAL]
usage: duct [-h] [-p OUTPUT_PREFIX] [--sample-interval SAMPLE_INTERVAL]
[--report-interval REPORT_INTERVAL] [-c {all,none,stdout,stderr}]
[-o {all,none,stdout,stderr}]
[-t {all,system-summary,processes-samples}]
command [arguments ...]

Duct creates a new session to run a command and all its child processes, and
then collects metrics for all processes in the session.
Gathers metrics on a command and all its child processes.

positional arguments:
command The command to execute.
arguments Arguments for the command.
arguments Arguments for the command. (default: None)

options:
-h, --help show this help message and exit
--sample-interval SAMPLE_INTERVAL
-p OUTPUT_PREFIX, --output-prefix OUTPUT_PREFIX
File string format to be used as a prefix for the
files -- the captured stdout and stderr and the
resource usage logs. The understood variables are
{datetime}, {datetime_filesafe}, and {pid}. Leading
directories will be created if they do not exist. You
can also provide value via DUCT_OUTPUT_PREFIX env
variable. (default:
.duct/logs/{datetime_filesafe}-{pid}_)
--sample-interval SAMPLE_INTERVAL, --s-i SAMPLE_INTERVAL
Interval in seconds between status checks of the
running process.
--output_prefix OUTPUT_PREFIX
Directory in which all logs will be saved.
--report-interval REPORT_INTERVAL
running process. Sample interval should be smaller than
the runtime of the process or `duct` may underreport
the number of processes started. (default: 1.0)
--report-interval REPORT_INTERVAL, --r-i REPORT_INTERVAL
Interval in seconds at which to report aggregated
data.
data. (default: 60.0)
-c {all,none,stdout,stderr}, --capture-outputs {all,none,stdout,stderr}
Record stdout, stderr, all, or none to log files. You
can also provide value via DUCT_CAPTURE_OUTPUTS env
variable. (default: all)
-o {all,none,stdout,stderr}, --outputs {all,none,stdout,stderr}
Print stdout, stderr, all, or none to stdout/stderr
respectively. (default: all)
-t {all,system-summary,processes-samples}, --record-types {all,system-summary,processes-samples}
Record system-summary, processes-samples, or all
(default: all)
```
## Testing:
Expand Down
4 changes: 0 additions & 4 deletions smoke-tests.sh

This file was deleted.

136 changes: 89 additions & 47 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
self.arguments = arguments
self.session_id = session_id
self.gpus = []
self.env = None
self.number = 0
self.system_info = {}
self.output_prefix = output_prefix
Expand All @@ -75,7 +76,7 @@ def collect_environment(self):

def get_system_info(self):
"""Gathers system information related to CPU, GPU, memory, and environment variables."""
self.system_info["uid"] = os.environ["USER"]
self.system_info["uid"] = os.environ.get("USER")
self.system_info["memory_total"] = os.sysconf("SC_PAGE_SIZE") * os.sysconf(
"SC_PHYS_PAGES"
)
Expand Down Expand Up @@ -103,13 +104,23 @@ def get_system_info(self):
except subprocess.CalledProcessError:
self.gpus = ["Failed to query GPU info"]

def update_max_resources(self, maxes, sample):
def calculate_total_usage(self, sample):
    """Aggregate per-process %mem and %cpu from one sample into totals.

    Returns a dict of the shape {"totals": {"pmem": float, "pcpu": float}}
    summed over every process entry in *sample*.
    """
    mem_total = sum((info["pmem"] for info in sample.values()), 0.0)
    cpu_total = sum((info["pcpu"] for info in sample.values()), 0.0)
    return {"totals": {"pmem": mem_total, "pcpu": cpu_total}}

@staticmethod
def update_max_resources(maxes, sample):
for pid in sample:
if pid in maxes:
for key, value in sample[pid].items():
maxes[pid][key] = max(maxes[pid].get(key, value), value)
else:
maxes[pid] = sample[pid]
maxes[pid] = sample[pid].copy()

def collect_sample(self):
process_data = {}
Expand Down Expand Up @@ -140,7 +151,7 @@ def collect_sample(self):
"timestamp": datetime.now().astimezone().isoformat(),
}
except subprocess.CalledProcessError:
process_data["error"] = "Failed to query process data"
pass
return process_data

def write_pid_samples(self):
Expand All @@ -157,32 +168,43 @@ def finalize(self):
print(Colors.OKGREEN)
else:
print(Colors.FAIL)

print("-----------------------------------------------------")
print(" duct report")
print("-----------------------------------------------------")
print(f"Exit Code: {self.process.returncode}")
print(f"{Colors.OKCYAN}Command: {self.command}")
print(f"Log files location: {self.output_prefix}")
print(f"Wall Clock Time: {self.elapsed_time}")
print(f"Number of Processes: {len(self.max_values)}")
for pid, values in self.max_values.items():
values.pop("timestamp") # Meaningless
print(f" {pid} Max Usage: {values}")


def monitor_process(report, process, report_interval, sample_interval):
    """Poll the wrapped command, accumulating resource samples until it exits.

    Intended to run until ``process`` finishes: each iteration collects one
    sample into ``report._sample``, and once a reporting window of
    ``report_interval`` seconds has elapsed the window is flushed to disk
    and reset.  Sleeps ``sample_interval`` seconds between polls.
    """
    while True:
        if process.poll() is not None:  # the passthrough command has finished
            break
        # print(f"Resource stats log path: {resource_stats_log_path}")
        sample = report.collect_sample()
        # Fold this poll's per-pid stats into the current window's maxima.
        report.update_max_resources(report._sample, sample)
        if report.elapsed_time >= (report.number + 1) * report_interval:
            # Window complete: persist it, roll its maxima into the overall
            # max_values, then start a fresh (empty) window.
            report.write_pid_samples()
            report.update_max_resources(report.max_values, report._sample)
            report._sample = defaultdict(dict)  # Reset sample
            report.number += 1
        time.sleep(sample_interval)
print(
f"Memory Peak Usage: {self.max_values.get('totals', {}).get('pmem', 'unknown')}%"
)
print(
f"CPU Peak Usage: {self.max_values.get('totals', {}).get('pcpu', 'unknown')}%"
)

def __repr__(self):
    """Serialize the report header (command, system, env, GPUs) as JSON."""
    payload = {
        "Command": self.command,
        "System": self.system_info,
        "ENV": self.env,
        "GPU": self.gpus,
    }
    return json.dumps(payload)


def monitor_process(report, process, report_interval, sample_interval, stop_event):
while not stop_event.wait(timeout=sample_interval):
while True:
if process.poll() is not None: # the passthrough command has finished
break
# print(f"Resource stats log path: {resource_stats_log_path}")
sample = report.collect_sample()
totals = report.calculate_total_usage(sample)
report.update_max_resources(sample, totals)
report.update_max_resources(report._sample, sample)
if report.elapsed_time >= report.number * report_interval:
report.write_pid_samples()
report.update_max_resources(report.max_values, report._sample)
report._sample = defaultdict(dict) # Reset sample
report.number += 1


def create_and_parse_args():
Expand Down Expand Up @@ -210,7 +232,9 @@ def create_and_parse_args():
"--s-i",
type=float,
default=float(os.getenv("DUCT_SAMPLE_INTERVAL", "1.0")),
help="Interval in seconds between status checks of the running process.",
help="Interval in seconds between status checks of the running process. "
"Sample interval should be larger than the runtime of the process or `duct` may "
"underreport the number of processes started.",
)
parser.add_argument(
"--report-interval",
Expand Down Expand Up @@ -306,8 +330,8 @@ def prepare_outputs(
elif capture_outputs in ["all", "stdout"] and outputs in ["none", "stderr"]:
stdout = open(f"{output_prefix}stdout", "w")
elif capture_outputs in ["none", "stderr"] and outputs in ["all", "stdout"]:
stdout = subprocess.PIPE
else:
stdout = None
elif capture_outputs in ["none", "stderr"] and outputs in ["none", "stderr"]:
stdout = subprocess.DEVNULL

if capture_outputs in ["all", "stderr"] and outputs in ["all", "stderr"]:
Expand All @@ -316,12 +340,20 @@ def prepare_outputs(
elif capture_outputs in ["all", "stderr"] and outputs in ["none", "stdout"]:
stderr = open(f"{output_prefix}stderr", "w")
elif capture_outputs in ["none", "stdout"] and outputs in ["all", "stderr"]:
stderr = subprocess.PIPE
else:
stderr = None
elif capture_outputs in ["none", "stdout"] and outputs in ["none", "stdout"]:
stderr = subprocess.DEVNULL
return stdout, stderr


def safe_close_files(file_list):
    """Best-effort close of a heterogeneous list of output handles.

    Entries may be real file objects, ``None``, or ``subprocess`` integer
    sentinels such as ``DEVNULL``/``PIPE``; entries without a callable
    ``close`` are skipped explicitly rather than relying on a blanket
    exception swallow, so only genuine close() failures are suppressed.
    Never raises: cleanup must not mask the wrapped command's outcome.
    """
    for f in file_list:
        close = getattr(f, "close", None)
        if not callable(close):
            continue  # e.g. None, or an int like subprocess.DEVNULL
        try:
            close()
        except Exception:
            pass  # deliberate: double-close / broken pipe are ignorable here


def ensure_directories(path: str) -> None:
if path.endswith(os.sep): # If it ends in "/" (for linux) treat as a dir
os.makedirs(path, exist_ok=True)
Expand All @@ -333,8 +365,12 @@ def ensure_directories(path: str) -> None:


def main():
    """Entry point: parse CLI arguments and run the monitored command."""
    execute(create_and_parse_args())


def execute(args):
"""A wrapper to execute a command, monitor and log the process details."""
datetime_filesafe = datetime.now().strftime("%Y.%m.%dT%H.%M.%S")
duct_pid = os.getpid()
formatted_output_prefix = args.output_prefix.format(
Expand All @@ -354,18 +390,19 @@ def main():
stderr_file = stderr

full_command = " ".join([str(args.command)] + args.arguments)
print(f"{Colors.OKCYAN}-----------------------------------------------------")
print(f"duct is executing {full_command}...")
print()
print(f"Log files will be written to {formatted_output_prefix}")
print(f"-----------------------------------------------------{Colors.ENDC}")
print(f"{Colors.OKCYAN}duct is executing {full_command}...")
print(f"Log files will be written to {formatted_output_prefix}{Colors.ENDC}")
process = subprocess.Popen(
[str(args.command)] + args.arguments,
stdout=stdout_file,
stderr=stderr_file,
preexec_fn=os.setsid,
)
session_id = os.getsid(process.pid) # Get session ID of the new process
try:
session_id = os.getsid(process.pid) # Get session ID of the new process
except ProcessLookupError: # process has already finished
session_id = None

report = Report(
args.command,
args.arguments,
Expand All @@ -374,18 +411,21 @@ def main():
process,
datetime_filesafe,
)
stop_event = threading.Event()
if args.record_types in ["all", "processes-samples"]:
monitoring_args = [
report,
process,
args.report_interval,
args.sample_interval,
stop_event,
]
monitoring_thread = threading.Thread(
target=monitor_process, args=monitoring_args
)
monitoring_thread.start()
monitoring_thread.join()
else:
monitoring_thread = None

if args.record_types in ["all", "system-summary"]:
report.collect_environment()
Expand All @@ -400,15 +440,17 @@ def main():
system_logs.write(str(report))

process.wait()
stop_event.set()
if monitoring_thread is not None:
monitoring_thread.join()

# If we have any extra samples that haven't been written yet, do it now
if report._sample:
report.update_max_resources(report.max_values, report._sample)
report.write_pid_samples()
report.process = process
if isinstance(stdout, TailPipe):
stdout_file.close()
stdout.close()
if isinstance(stderr, TailPipe):
stderr_file.close()
stderr.close()
report.finalize()
print(f"Log files location: {report.output_prefix}")
safe_close_files([stdout_file, stdout, stderr_file, stderr])


if __name__ == "__main__":
Expand Down
2 changes: 0 additions & 2 deletions test/test_aggregation.py

This file was deleted.

Loading

0 comments on commit 8b1e218

Please sign in to comment.