Skip to content

Commit

Permalink
Add output option matrix, including TeeStream
Browse files Browse the repository at this point in the history
For stderr and stddout, users can choose to log, print, or both.
  • Loading branch information
asmacdo committed Apr 29, 2024
1 parent c8ffdd1 commit e7901e8
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 5 deletions.
89 changes: 84 additions & 5 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import pprint
import shutil
import subprocess
import sys
import threading
import time

__version__ = "0.0.1"
Expand Down Expand Up @@ -164,18 +166,94 @@ def create_and_parse_args():
default=60.0,
help="Interval in seconds at which to report aggregated data.",
)
parser.add_argument(
"--capture-outputs",
type=str,
default="all",
choices=["all", "none", "stdout", "stderr"],
help="Record stdout, stderr, both, or neither to log files.",
)
parser.add_argument(
"--outputs",
type=str,
default="all",
choices=["all", "none", "stdout", "stderr"],
help="print stdout, stderr, both, or neither to log files.",
)
return parser.parse_args()


class TeeStream:
def __init__(self, file_path):
self.file = open(file_path, "w")
(
self.master_fd,
self.slave_fd,
) = os.openpty() # Use pseudo-terminal to simulate terminal behavior

def fileno(self):
"""Return the file descriptor to be used by subprocess as stdout/stderr."""
return self.slave_fd

def start(self):
"""Start a thread to read from the master_fd and write to stdout and the file."""
thread = threading.Thread(target=self._redirect_output, daemon=True)
thread.start()

def _redirect_output(self):
with os.fdopen(self.master_fd, "rb", buffering=0) as stream:
while True:
try:
data = stream.read(1024) # Read larger blocks of data
except OSError: # The file has been closed
break
if not data: # Still open, nothing to do
break
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
self.file.write(
data.decode("utf-8", "replace")
) # Handling decoding errors
self.file.flush()

def close(self):
"""Close the slave fd and the file when done."""
os.close(self.slave_fd)
self.file.close()


def main():
"""A wrapper to execute a command, monitor and log the process details."""
args = create_and_parse_args()
os.makedirs(args.output_prefix, exist_ok=True)

if args.capture_outputs in ["all", "stdout"] and args.outputs in ["all", "stdout"]:
stdout = TeeStream(args.output_prefix + "/stdout.txt")
stdout.start()
elif args.capture_outputs in ["none", "stderr"] and args.outputs in [
"all",
"stdout",
]:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL

if args.capture_outputs in ["all", "stderr"] and args.outputs in ["all", "stderr"]:
stderr = TeeStream(args.output_prefix + "/stderr.txt")
stderr.start()
elif args.capture_outputs in ["none", "stdout"] and args.outputs in [
"all",
"stderr",
]:
stderr = subprocess.PIPE
else:
stderr = subprocess.DEVNULL

try:
process = subprocess.Popen(
[str(args.command)] + args.arguments.copy(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=stdout,
stderr=stderr,
preexec_fn=os.setsid,
)
session_id = os.getsid(process.pid) # Get session ID of the new process
Expand All @@ -197,6 +275,10 @@ def main():
report.number += 1

if process.poll() is not None: # the passthrough command has finished
if isinstance(stdout, TeeStream):
stdout.close()
if isinstance(stderr, TeeStream):
stderr.close()
break
time.sleep(args.sample_interval)

Expand All @@ -207,10 +289,7 @@ def main():
report.run_time_seconds = f"{report.end_time - report.start_time}"
report.get_system_info()
system_logs.write(str(report))
stdout, stderr = process.communicate()
pprint.pprint(report, width=120)
print(f"STDOUT: {stdout.decode()}")
print(f"STDERR: {stderr.decode()}")

except Exception as e:
import traceback
Expand Down
53 changes: 53 additions & 0 deletions test_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python3

import argparse
import sys
import time


def consume_cpu(duration, _):
"""Function to consume CPU proportional to 'load' for 'duration' seconds"""
for i in range(duration):
print(f"out: {i}")
print(f"err: {i}", file=sys.stderr)
time.sleep(1)


def consume_memory(size):
"""Function to consume amount of memory specified by 'size' in megabytes"""
# Create a list of size MB
bytes_in_mb = 1024 * 1024
_memory = bytearray(size * bytes_in_mb) # noqa


def main(duration, cpu_load, memory_size):
print("Printing something to STDOUT at start")
print("Printing something to STDERR at start", file=sys.stderr)
consume_memory(memory_size)
consume_cpu(duration, cpu_load)
print(
f"Test completed. Consumed {memory_size} MB for {duration} seconds with CPU load factor {cpu_load}."
)
print("Printing something to STDOUT at finish")
print("Printing something to STDERR at finish", file=sys.stderr)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Test script to consume CPU and memory."
)
parser.add_argument(
"--duration", type=int, default=60, help="Duration to run the test in seconds."
)
parser.add_argument(
"--cpu-load", type=int, default=10000, help="Load factor to simulate CPU usage."
)
parser.add_argument(
"--memory-size",
type=int,
default=10,
help="Amount of memory to allocate in MB.",
)

args = parser.parse_args()
main(args.duration, args.cpu_load, args.memory_size)

0 comments on commit e7901e8

Please sign in to comment.