diff --git a/src/duct.py b/src/duct.py index 81c387c4..f7ee57a7 100755 --- a/src/duct.py +++ b/src/duct.py @@ -8,6 +8,8 @@ import pprint import shutil import subprocess +import sys +import threading import time __version__ = "0.0.1" @@ -164,18 +166,94 @@ def create_and_parse_args(): default=60.0, help="Interval in seconds at which to report aggregated data.", ) + parser.add_argument( + "--capture-outputs", + type=str, + default="all", + choices=["all", "none", "stdout", "stderr"], + help="Record stdout, stderr, both, or neither to log files.", + ) + parser.add_argument( + "--outputs", + type=str, + default="all", + choices=["all", "none", "stdout", "stderr"], + help="print stdout, stderr, both, or neither to log files.", + ) return parser.parse_args() +class TeeStream: + def __init__(self, file_path): + self.file = open(file_path, "w") + ( + self.master_fd, + self.slave_fd, + ) = os.openpty() # Use pseudo-terminal to simulate terminal behavior + + def fileno(self): + """Return the file descriptor to be used by subprocess as stdout/stderr.""" + return self.slave_fd + + def start(self): + """Start a thread to read from the master_fd and write to stdout and the file.""" + thread = threading.Thread(target=self._redirect_output, daemon=True) + thread.start() + + def _redirect_output(self): + with os.fdopen(self.master_fd, "rb", buffering=0) as stream: + while True: + try: + data = stream.read(1024) # Read larger blocks of data + except OSError: # The file has been closed + break + if not data: # Still open, nothing to do + break + sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() + self.file.write( + data.decode("utf-8", "replace") + ) # Handling decoding errors + self.file.flush() + + def close(self): + """Close the slave fd and the file when done.""" + os.close(self.slave_fd) + self.file.close() + + def main(): """A wrapper to execute a command, monitor and log the process details.""" args = create_and_parse_args() os.makedirs(args.output_prefix, exist_ok=True) + + if args.capture_outputs in ["all", "stdout"] and args.outputs in ["all", "stdout"]: + stdout = TeeStream(args.output_prefix + "/stdout.txt") + stdout.start() + elif args.capture_outputs in ["none", "stderr"] and args.outputs in [ + "all", + "stdout", + ]: + stdout = subprocess.PIPE + else: + stdout = subprocess.DEVNULL + + if args.capture_outputs in ["all", "stderr"] and args.outputs in ["all", "stderr"]: + stderr = TeeStream(args.output_prefix + "/stderr.txt") + stderr.start() + elif args.capture_outputs in ["none", "stdout"] and args.outputs in [ + "all", + "stderr", + ]: + stderr = subprocess.PIPE + else: + stderr = subprocess.DEVNULL + try: process = subprocess.Popen( [str(args.command)] + args.arguments.copy(), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=stdout, + stderr=stderr, preexec_fn=os.setsid, ) session_id = os.getsid(process.pid) # Get session ID of the new process @@ -197,6 +275,10 @@ def main(): report.number += 1 if process.poll() is not None: # the passthrough command has finished + if isinstance(stdout, TeeStream): + stdout.close() + if isinstance(stderr, TeeStream): + stderr.close() break time.sleep(args.sample_interval) @@ -207,10 +289,7 @@ def main(): report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() system_logs.write(str(report)) - stdout, stderr = process.communicate() pprint.pprint(report, width=120) - print(f"STDOUT: {stdout.decode()}") - print(f"STDERR: {stderr.decode()}") except Exception as e: import traceback diff --git a/test_logs.py b/test_logs.py new file mode 100755 index 00000000..479ac371 --- /dev/null +++ b/test_logs.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 + +import argparse +import sys +import time + + +def consume_cpu(duration, _): + """Function to consume CPU proportional to 'load' for 'duration' seconds""" + for i in range(duration): + print(f"out: {i}") + print(f"err: {i}", file=sys.stderr) + time.sleep(1) + + +def consume_memory(size): + """Function to consume amount of memory specified by 'size' in megabytes""" + # Create a list of size MB + bytes_in_mb = 1024 * 1024 + _memory = bytearray(size * bytes_in_mb) # noqa + + +def main(duration, cpu_load, memory_size): + print("Printing something to STDOUT at start") + print("Printing something to STDERR at start", file=sys.stderr) + consume_memory(memory_size) + consume_cpu(duration, cpu_load) + print( + f"Test completed. Consumed {memory_size} MB for {duration} seconds with CPU load factor {cpu_load}." + ) + print("Printing something to STDOUT at finish") + print("Printing something to STDERR at finish", file=sys.stderr) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Test script to consume CPU and memory." + ) + parser.add_argument( + "--duration", type=int, default=60, help="Duration to run the test in seconds." + ) + parser.add_argument( + "--cpu-load", type=int, default=10000, help="Load factor to simulate CPU usage." + ) + parser.add_argument( + "--memory-size", + type=int, + default=10, + help="Amount of memory to allocate in MB.", + ) + + args = parser.parse_args() + main(args.duration, args.cpu_load, args.memory_size)