diff --git a/.chatgpt/2024-04-21.md b/.chatgpt/2024-04-21.md deleted file mode 100644 index 76751b56..00000000 --- a/.chatgpt/2024-04-21.md +++ /dev/null @@ -1,276 +0,0 @@ -User -i need a python script that is a "wrapper" meaning that it functions like the time cmd, it executes the next arg as the command, and passes all subsequent args as args to the command-- but prior, during, and following the execution, the script runs additional processes to monitor the session, including the process executing the wrapped command and all child processes -ChatGPT - -Creating a Python script that acts as a wrapper to execute a command and monitor the session can be achieved using Python's subprocess module for running the external command, and potentially the psutil module for monitoring the processes. This script will: - - Take a command and its arguments as input. - Execute the command while capturing its start time and end time. - Monitor and log details about the process and its child processes before, during, and after its execution. - -First, ensure you have the psutil library installed, which helps in retrieving information on running processes. You can install it using pip: - -bash - -pip install psutil - -Here's a basic version of the Python script that functions as a wrapper: - -python - -import subprocess -import psutil -import sys -import time - -def monitor_processes(initial_procs): - """ Monitor and log details about specified processes. """ - for proc in initial_procs: - try: - p = psutil.Process(proc.pid) - print(f"PID: {p.pid}, Name: {p.name()}, CPU usage: {p.cpu_percent(interval=1)}%, Memory: {p.memory_info().rss}") - except (psutil.NoSuchProcess, psutil.AccessDenied): - print(f"Process {proc.pid} no longer exists or access denied.") - -def main(command, *args): - """ A wrapper to execute a command, monitor and log the process details. """ - try: - # Start monitoring processes - initial_procs = list(psutil.process_iter(['pid', 'name'])) - - # Start the process - print("Starting the command...") - start_time = time.time() - process = subprocess.Popen([command] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - # Monitor during execution - try: - while True: - monitor_processes([process]) - if process.poll() is not None: - break - time.sleep(1) # Delay for a bit to avoid too much logging - except KeyboardInterrupt: - print("Monitoring interrupted.") - - # Collect outputs - stdout, stderr = process.communicate() - end_time = time.time() - - # Final monitoring snapshot - print("Command finished. Final process details:") - monitor_processes([process]) - - # Output results - print(f"Command executed in {end_time - start_time:.2f} seconds.") - print("STDOUT:", stdout.decode()) - print("STDERR:", stderr.decode()) - - except Exception as e: - print(f"Failed to execute command: {str(e)}") - -if __name__ == "__main__": - if len(sys.argv) > 1: - main(sys.argv[1], *sys.argv[2:]) - else: - print("Usage: python wrapper.py [arguments...]") - -How to Use the Script - - Save the script as wrapper.py. - Run it from the command line by specifying the command and any arguments you want to pass to it. For example: - - bash - - python wrapper.py ls -l /home - -This script will print out process details before and during the execution of the command and show the command output and execution time. You can extend this script by adding more sophisticated monitoring features or integrating with other tools for specific monitoring needs. 
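As a concrete example of such an extension: the loop above only samples the wrapped process itself, while the original request also asks about child processes. With psutil this could be covered by walking the process tree; a minimal sketch (the helper name monitor_process_tree is illustrative, not part of the script above):

python

def monitor_process_tree(root_pid):
    """Log CPU and memory for the wrapped process and all of its children."""
    try:
        root = psutil.Process(root_pid)
        procs = [root] + root.children(recursive=True)
    except psutil.NoSuchProcess:
        print(f"Process {root_pid} no longer exists.")
        return
    for p in procs:
        try:
            print(f"PID: {p.pid}, Name: {p.name()}, CPU usage: {p.cpu_percent(interval=0.1)}%, Memory: {p.memory_info().rss}")
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            print(f"Process {p.pid} no longer exists or access denied.")

Calling monitor_process_tree(process.pid) in place of monitor_processes([process]) inside the monitoring loop would then report every descendant on each sample.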
-User -add shebang and use as a terminal command in the $PATH -ChatGPT - -To use the Python script as a terminal command, we'll add a shebang line at the beginning of the script and provide instructions on how to make the script executable and accessible from anywhere in the system by adding it to the $PATH. -Step 1: Add Shebang Line - -Add the following shebang line at the top of your Python script (wrapper.py). This line tells the system that the script should be run using Python. - -python - -#!/usr/bin/env python3 - -Step 2: Update the Script - -Here's your updated script with the shebang included: - -python - -#!/usr/bin/env python3 - -import subprocess -import psutil -import sys -import time - -def monitor_processes(initial_procs): - """ Monitor and log details about specified processes. """ - for proc in initial_procs: - try: - p = psutil.Process(proc.pid) - print(f"PID: {p.pid}, Name: {p.name()}, CPU usage: {p.cpu_percent(interval=1)}%, Memory: {p.memory_info().rss}") - except (psutil.NoSuchProcess, psutil.AccessDenied): - print(f"Process {proc.pid} no longer exists or access denied.") - -def main(command, *args): - """ A wrapper to execute a command, monitor and log the process details. """ - try: - # Start monitoring processes - initial_procs = list(psutil.process_iter(['pid', 'name'])) - - # Start the process - print("Starting the command...") - start_time = time.time() - process = subprocess.Popen([command] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - # Monitor during execution - try: - while True: - monitor_processes([process]) - if process.poll() is not None: - break - time.sleep(1) # Delay for a bit to avoid too much logging - except KeyboardInterrupt: - print("Monitoring interrupted.") - - # Collect outputs - stdout, stderr = process.communicate() - end_time = time.time() - - # Final monitoring snapshot - print("Command finished. Final process details:") - monitor_processes([process]) - - # Output results - print(f"Command executed in {end_time - start_time:.2f} seconds.") - print("STDOUT:", stdout.decode()) - print("STDERR:", stderr.decode()) - - except Exception as e: - print(f"Failed to execute command: {str(e)}") - -if __name__ == "__main__": - if len(sys.argv) > 1: - main(sys.argv[1], *sys.argv[2:]) - else: - print("Usage: python wrapper.py [arguments...]") - -Step 3: Make the Script Executable - -Change the permissions of your script to make it executable. You can do this with the following command in your terminal: - -bash - -chmod +x wrapper.py - -Step 4: Add to $PATH - -To make the script accessible from anywhere on your system, move it to a directory that's included in your $PATH. A common location is /usr/local/bin/, but you can choose another if it fits your configuration better. - -bash - -sudo mv wrapper.py /usr/local/bin/wrapper - -Now, you should be able to call the script from anywhere in your terminal just by typing wrapper followed by your command and its arguments. For example: - -bash - -wrapper ls -l /home - -This setup lets you use the wrapper as a regular terminal command across your system. -User -make it a native python -- ie no pip install pyutil -ChatGPT - -If you prefer to create a native Python wrapper script without using the psutil library, you'll need to rely on native modules like subprocess and perhaps os to manage and monitor processes. However, without psutil, your options for monitoring detailed metrics about processes (like memory and CPU usage) become limited. 
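That said, on Linux some of this information is still reachable from the standard library by reading the /proc filesystem. A rough, Linux-only sketch of fetching a process's resident memory (get_rss_kib is an illustrative helper, not part of the script below):

python

def get_rss_kib(pid):
    """Return the resident set size of a process in KiB, or None if unavailable (Linux /proc only)."""
    try:
        with open(f"/proc/{pid}/status") as status_file:
            for line in status_file:
                if line.startswith("VmRSS:"):
                    return int(line.split()[1])  # VmRSS is reported in kB
    except OSError:
        return None
    return None

The portable version below sticks to a simple liveness check instead.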
- -Here's a revised version of the script that does not require psutil. This version can monitor the process by checking its existence and status, but it lacks the ability to fetch CPU or memory usage details: - -python - -#!/usr/bin/env python3 - -import subprocess -import sys -import time -import os - -def monitor_process(pid): - """ Monitor and log basic details about the process. """ - try: - # Check if the process is still running - os.kill(pid, 0) - print(f"Process {pid} is still running.") - except OSError: - print(f"Process {pid} has terminated.") - -def main(command, *args): - """ A wrapper to execute a command, monitor and log the process details. """ - try: - # Start the process - print("Starting the command...") - start_time = time.time() - process = subprocess.Popen([command] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - # Monitor during execution - try: - while True: - monitor_process(process.pid) - if process.poll() is not None: - break - time.sleep(1) # Delay for a bit to avoid too much logging - except KeyboardInterrupt: - print("Monitoring interrupted.") - - # Collect outputs - stdout, stderr = process.communicate() - end_time = time.time() - - # Output results - print(f"Command executed in {end_time - start_time:.2f} seconds.") - print("STDOUT:", stdout.decode()) - print("STDERR:", stderr.decode()) - - except Exception as e: - print(f"Failed to execute command: {str(e)}") - -if __name__ == "__main__": - if len(sys.argv) > 1: - main(sys.argv[1], *sys.argv[2:]) - else: - print("Usage: wrapper [arguments...]") - -Steps to Make It a Terminal Command - -Step 1: Save the script in a file named wrapper.py. - -Step 2: Make the script executable: - -bash - -chmod +x wrapper.py - -Step 3: Move it to a directory in your $PATH (e.g., /usr/local/bin/): - -bash - -sudo mv wrapper.py /usr/local/bin/wrapper - -Step 4: Use the script by calling wrapper followed by your command: - -bash - -wrapper ls -l /home - -Limitations - -This script only checks if the process is still running and does not gather detailed metrics about CPU and memory usage. For more detailed process monitoring without third-party libraries, you would typically need to directly parse system files like those in /proc on Linux systems, which can be complex and not easily portable across different operating systems. - diff --git a/.chatgpt/2024-04-22.md b/.chatgpt/2024-04-22.md deleted file mode 100644 index 55b190f4..00000000 --- a/.chatgpt/2024-04-22.md +++ /dev/null @@ -1,469 +0,0 @@ -User - -prior to executing the command, start a new session, this way we will track all child processes also - -ChatGPT - -To ensure that your Python wrapper script starts a new session when executing the command, you can use the setsid function from the os module. By starting a new session, the command executed by subprocess.Popen becomes the session leader, which means all child processes spawned by this command will be part of this new session. This is especially useful for tracking and managing all related child processes more effectively. - -Here’s how you can update your script to start a new session before executing the command: - -python - -#!/usr/bin/env python3 - -import subprocess -import sys -import time -import os -import argparse - -def monitor_process(pid): - """ Monitor and log basic details about the process. 
""" - try: - # Check if the process is still running - os.kill(pid, 0) - print(f"Process {pid} is still running.") - except OSError: - print(f"Process {pid} has terminated.") - -def main(command, args, sample_interval): - """ A wrapper to execute a command, monitor and log the process details. """ - try: - # Start the process - print("Starting the command...") - start_time = time.time() - # Use setsid to start a new session - process = subprocess.Popen([command] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) - - # Monitor during execution - try: - while True: - monitor_process(process.pid) - if process.poll() is not None: - break - time.sleep(sample_interval) # Parameterized delay - except KeyboardInterrupt: - print("Monitoring interrupted.") - - # Collect outputs - stdout, stderr = process.communicate() - end_time = time.time() - - # Output results - print(f"Command executed in {end_time - start_time:.2f} seconds.") - print("STDOUT:", stdout.decode()) - print("STDERR:", stderr.decode()) - - except Exception as e: - print(f"Failed to execute command: {str(e)}") - -if __name__ == "__main__": - # Setup argparse to handle command line arguments - parser = argparse.ArgumentParser(description="A process wrapper script that monitors the execution of a command.") - parser.add_argument('command', help="The command to execute.") - parser.add_argument('arguments', nargs='*', help="Arguments for the command.") - parser.add_argument('--sample-interval', type=float, default=1.0, help="Interval in seconds between status checks of the running process.") - - args = parser.parse_args() - - main(args.command, args.arguments, args.sample_interval) - -Key Changes: - - Added preexec_fn=os.setsid to subprocess.Popen. This function is called in the child process just before the child is executed. os.setsid() makes the child process a session leader, which gives it control over all child processes it may spawn, allowing you to track and manage these processes more effectively. - -Usage: - -This script should be saved, made executable (chmod +x wrapper.py), and can be run with: - -bash - -./wrapper.py ls --sample-interval 0.5 - -In this command, ls is the command being executed and monitored, and --sample-interval 0.5 specifies that the monitoring process checks the command's status every 0.5 seconds. The command and its children will run in a new session, making it easier to manage and terminate them collectively if needed. - - -User: - -next we are going to aggregate the samples. ie, if we are polling every second, we only want to record aggregate data every minute. whats a good name for this second var -ChatGPT - -For aggregating samples taken at a specific interval into larger time units, it's helpful to use a variable name that clearly communicates its purpose. Since you're collecting samples every second and want to aggregate this data over a period of one minute, a good variable name could be something that reflects the idea of a reporting or aggregation interval. - -Here are some suggested names for this variable: - - aggregate_interval - This name clearly indicates that the value stored in this variable determines the interval over which data will be aggregated. - report_interval - Similar to aggregate_interval, this name suggests that data will be compiled and reported at this interval. - summary_interval - This is another good option that hints at summarizing or aggregating data over a set period. 
- collection_period - This term emphasizes the duration over which data is collected before being processed or reported. - -Any of these names would be suitable, with aggregate_interval perhaps being the most directly descriptive of the intended functionality. -User - 20 def main(command, args, sample_interval): ->> 21 """ A wrapper to execute a command, monitor and log the process details. """ # E: line too long (80 > 79 characters) - 22 try: - 23 # Start the process - 24 print("Starting the command...") - 25 start_time = time.time() - 26 # Use setsid to start a new session ->> 27 process = subprocess.Popen([command] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) # E: line too long (122 > 79 characters) - 28 - 29 # Monitor during execution - 30 try: - 31 while True: - 32 monitor_process(process.pid) - 33 if process.poll() is not None: - 34 break - 35 time.sleep(sample_interval) # Parameterized delay - 36 except KeyboardInterrupt: - 37 print("Monitoring interrupted.") - 38 - 39 # Collect outputs - 40 stdout, stderr = process.communicate() - 41 end_time = time.time() - 42 - 43 # Output results - 44 print(f"Command executed in {end_time - start_time:.2f} seconds.") - 45 print("STDOUT:", stdout.decode()) - 46 print("STDERR:", stderr.decode()) - 47 - 48 except Exception as e: - 49 print(f"Failed to execute command: {str(e)}") - 50 - 51 - 52 if __name__ == "__main__": - 53 # Setup argparse to handle command line arguments ->> 54 parser = argparse.ArgumentParser(description="A process wrapper script that monitors the execution of a command.") # E: line too long (118 > 79 characters) - 55 parser.add_argument('command', help="The command to execute.") ->> 56 parser.add_argument('arguments', nargs='*', help="Arguments for the command.") # E: line too long (82 > 79 characters) ->> 57 parser.add_argument('--sample-interval', type=float, default=1.0, help="Interval in seconds between status checks of the running process.") # E: line too long (143 > 79 characters) - 58 - 59 args = parser.parse_args() - 60 - 61 main(args.command, args.arguments, args.sample_interval) - -ok i like report_interval. add it -ChatGPT - -To add report_interval to the script along with sample_interval, we'll modify the script to include this new argument. We'll use argparse to parse the report_interval from the command line, allowing the user to specify how frequently aggregated data should be reported. - -Here’s how you can integrate report_interval into the script: - -python - -#!/usr/bin/env python3 - -import subprocess -import sys -import time -import os -import argparse - -def monitor_process(pid, elapsed_time, report_interval, process_data): - """ Monitor and log basic details about the process. """ - try: - # Check if the process is still running - os.kill(pid, 0) - # Aggregating data here, for demonstration we just record the event - process_data.append(f"Process {pid} checked at {elapsed_time} seconds") - if elapsed_time >= report_interval: - print("\n".join(process_data)) - process_data.clear() - except OSError: - print(f"Process {pid} has terminated.") - -def main(command, args, sample_interval, report_interval): - """ A wrapper to execute a command, monitor and log the process details. 
""" - try: - # Start the process - print("Starting the command...") - start_time = time.time() - process = subprocess.Popen([command] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) - - process_data = [] - elapsed_time = 0 - - # Monitor during execution - try: - while True: - current_time = time.time() - elapsed_time = current_time - start_time - monitor_process(process.pid, elapsed_time, report_interval, process_data) - if process.poll() is not None: - break - time.sleep(sample_interval) # Parameterized delay - except KeyboardInterrupt: - print("Monitoring interrupted.") - - # Collect outputs - stdout, stderr = process.communicate() - end_time = time.time() - - # Output results - print(f"Command executed in {end_time - start_time:.2f} seconds.") - print("STDOUT:", stdout.decode()) - print("STDERR:", stderr.decode()) - - except Exception as e: - print(f"Failed to execute command: {str(e)}") - -if __name__ == "__main__": - # Setup argparse to handle command line arguments - parser = argparse.ArgumentParser(description="A process wrapper script that monitors the execution of a command.") - parser.add_argument('command', help="The command to execute.") - parser.add_argument('arguments', nargs='*', help="Arguments for the command.") - parser.add_argument('--sample-interval', type=float, default=1.0, help="Interval in seconds between status checks of the running process.") - parser.add_argument('--report-interval', type=float, default=60.0, help="Interval in seconds at which to report aggregated data.") - - args = parser.parse_args() - - main(args.command, args.arguments, args.sample_interval, args.report_interval) - -Key Changes: - - Added report_interval: This is parsed via argparse and used to control how often aggregated data is printed. - Aggregated Data Handling: Inside the monitor_process function, data is aggregated into process_data, and once the elapsed time reaches the report_interval, the collected data is printed and the list is cleared. - -Usage: - -To use this script, specify the report_interval along with the sample_interval: - -bash - -./wrapper.py ls --sample-interval 1 --report-interval 60 - -In this example, the script checks the status of the process every second (sample_interval) and reports aggregated data every 60 seconds (report_interval). This setup allows for detailed monitoring and periodic reporting, making the script versatile for different monitoring scenarios. - -User -Next we need to improve our testing. -ChatGPT - -Improving testing for a script like the process wrapper you've developed involves several steps. These include testing individual components (unit tests), testing the script as a whole (integration tests), and possibly using mock objects to simulate interactions with the system environment and external commands. Here's a structured approach to improving the testing of your wrapper script: -1. Unit Testing - -Unit tests should focus on testing each function in isolation, ensuring that each part behaves as expected. - - Testing monitor_process(): - Verify that it correctly logs process checks. - Ensure it handles non-existent processes (using mock to simulate OSError). - Check that data aggregation and reporting work correctly at specified intervals. - - Testing main(): - Mock subprocesses to test handling of different command outputs and errors. - Test response to different argparse inputs to ensure correct parsing and error handling. 
- -To implement these tests, Python's unittest framework and unittest.mock for mocking are useful tools. -2. Integration Testing - -Integration tests will check how well the components work together and with the Python environment: - - Script Execution: Test the script with real commands where possible, such as ls, echo, and other harmless system commands to ensure they are executed, monitored, and logged correctly. - Error Handling: Run the script with commands that fail or produce errors to ensure that error handling works as expected. - Signal Handling: Test how the script handles interruptions like KeyboardInterrupt. - -3. Mocking External Commands - -Using unittest.mock, you can simulate subprocess behaviors to test how your script handles various outputs and states without needing to execute real commands: - - Mock subprocess.Popen: You can mock this method to simulate command execution and control the outputs and process characteristics (like PID, return status). - Simulate Long-running Processes: Test how the script monitors processes over extended periods. - -4. Test Automation - -Automating your tests can greatly enhance the reliability of your testing process: - - Continuous Integration (CI): Set up a CI pipeline using tools like Jenkins, GitHub Actions, or GitLab CI to run tests automatically when changes are pushed to your repository. - Test Coverage Analysis: Use tools like coverage.py to ensure that your tests cover all parts of your script. This will help identify any parts of your script that may not be tested adequately. - -Example Unit Test Using unittest and unittest.mock - -Here’s a basic example of what unit testing might look like for the monitor_process function using unittest and mock: - -python - -import unittest -from unittest.mock import patch, MagicMock -import your_script # import your script here - -class TestWrapperScript(unittest.TestCase): - @patch('your_script.os.kill') - def test_monitor_process_running(self, mock_kill): - process_data = [] - your_script.monitor_process(1234, 0, 60, process_data) - self.assertIn("Process 1234 checked at 0 seconds", process_data) - - @patch('your_script.os.kill', side_effect=OSError) - def test_monitor_process_terminated(self, mock_kill): - with self.assertLogs('your_script', level='INFO') as log: - your_script.monitor_process(1234, 0, 60, []) - self.assertIn("Process 1234 has terminated", log.output) - -if __name__ == '__main__': - unittest.main() - -This is just a start, and the complexity of your tests will grow depending on how complex your script is and how robust you want your testing suite to be. Make sure each component of your script is covered by tests to ensure reliability and maintainability. -User -i think id prefer to create a test script that can be passed in as the command rather than ls or others. The test script should be configurable to consume CPU, memory, and time, which can produce allow us to generate reports on longer running, heavier processes. -ChatGPT - -Creating a test script that you can pass in as the command for your wrapper script is an excellent idea, especially for controlled testing of resource consumption (CPU, memory) and process duration. This allows you to simulate a variety of conditions to thoroughly test how your wrapper handles different types of processes. -Test Script Requirements: - - Configurability: The script should accept command-line arguments to specify how much CPU and memory it should consume, and for how long it should run. 
- CPU Consumption: It should be able to perform operations that consume a significant amount of CPU cycles. - Memory Consumption: It should allocate a specified amount of memory. - Execution Time: It should run for a designated period. - -Python Test Script Example - -Here’s a simple Python script that meets these requirements. It uses command-line arguments to control its behavior: - -python - -#!/usr/bin/env python3 -import time -import sys -import argparse - -def consume_cpu(duration, load): - """ Function to consume CPU proportional to 'load' for 'duration' seconds """ - end_time = time.time() + duration - while time.time() < end_time: - for _ in range(load): - pass # Busy-wait - -def consume_memory(size): - """ Function to consume amount of memory specified by 'size' in megabytes """ - # Create a list of size MB - bytes_in_mb = 1024 * 1024 - memory = bytearray(size * bytes_in_mb) - -def main(duration, cpu_load, memory_size): - consume_memory(memory_size) - consume_cpu(duration, cpu_load) - print(f"Test completed. Consumed {memory_size} MB for {duration} seconds with CPU load factor {cpu_load}.") - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Test script to consume CPU and memory.") - parser.add_argument('--duration', type=int, default=60, help="Duration to run the test in seconds.") - parser.add_argument('--cpu-load', type=int, default=10000, help="Load factor to simulate CPU usage.") - parser.add_argument('--memory-size', type=int, default=10, help="Amount of memory to allocate in MB.") - - args = parser.parse_args() - main(args.duration, args.cpu_load, args.memory_size) - -Usage - - Save this script as test_script.py. - Make it executable: chmod +x test_script.py. - Run using your wrapper script: wrapper ./test_script.py -- --duration 120 --cpu-load 50000 --memory-size 50 - -Explanation - - consume_cpu function: Simulates CPU load by performing a number of no-op operations based on the load factor within a loop that lasts for the specified duration. - consume_memory function: Allocates a bytearray of the specified size in MB. This occupies memory but doesn't do much else, simulating a memory load. - Command-line arguments: Allow you to specify how long the script runs, how hard it works the CPU, and how much memory it uses. - -This script provides a controlled environment for testing your wrapper under different load conditions. You can adjust the cpu_load and memory_size to see how your monitoring script reacts to high CPU usage and significant memory consumption over a specified duration. This makes it very useful for stress testing and verifying the robustness of your process monitoring and logging mechanisms. -ChatGPT can make mistakes. Consider checking important information. - -User - 43 monitor_process(process.pid, elapsed_time, report_interval, process_data) - -here we are using the process pid, but instead we should use the session id to retrieve a list of pids to monitor -ChatGPT - -To monitor all processes within a session rather than just a single process identified by its PID, you need to modify the script to work with session IDs and retrieve a list of PIDs within that session. This approach allows you to monitor not only the main process but also all its child processes. - -In Unix-like operating systems, each process belongs to a session, and each session can contain multiple processes. To implement this functionality, you need to: - - Retrieve the session ID of the process started by your wrapper. 
- Find all processes that belong to this session. - Monitor all these processes as part of your reporting. - -Here's how you can modify your script to incorporate these changes: -Modifications to the Script - -First, let's modify the monitor_process function to handle a session ID and find all PIDs in that session. - -python - -import os -import subprocess -import sys -import time -import argparse - -def get_processes_in_session(session_id): - """Retrieve all PIDs belonging to the given session ID.""" - pids = [] - for pid in os.listdir('/proc'): - if pid.isdigit(): - try: - with open(os.path.join('/proc', pid, 'stat'), 'r') as f: - data = f.read().split() - if int(data[5]) == session_id: # Check session ID in stat file - pids.append(int(pid)) - except IOError: # proc has already terminated - continue - return pids - -def monitor_processes(session_id, elapsed_time, report_interval, process_data): - """Monitor and log details about all processes in the given session.""" - pids = get_processes_in_session(session_id) - for pid in pids: - try: - os.kill(pid, 0) - process_data.append(f"Process {pid} checked at {elapsed_time} seconds") - except OSError: - process_data.append(f"Process {pid} has terminated.") - - if elapsed_time >= report_interval: - print("\n".join(process_data)) - process_data.clear() - -def main(command, args, sample_interval, report_interval): - try: - print("Starting the command...") - start_time = time.time() - process = subprocess.Popen([command] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) - - session_id = os.getsid(process.pid) # Get session ID of the new process - process_data = [] - elapsed_time = 0 - - while True: - current_time = time.time() - elapsed_time = current_time - start_time - monitor_processes(session_id, elapsed_time, report_interval, process_data) - if process.poll() is not None: - break - time.sleep(sample_interval) - - stdout, stderr = process.communicate() - end_time = time.time() - print(f"Command executed in {end_time - start_time:.2f} seconds.") - print("STDOUT:", stdout.decode()) - print("STDERR:", stderr.decode()) - - except Exception as e: - print(f"Failed to execute command: {str(e)}") - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="A process wrapper script that monitors the execution of a command.") - parser.add_argument('command', help="The command to execute.") - parser.add_argument('arguments', nargs='*', help="Arguments for the command.") - parser.add_argument('--sample-interval', type=float, default=1.0, help="Interval in seconds between status checks of the running process.") - parser.add_argument('--report-interval', type=float, default=60.0, help="Interval in seconds at which to report aggregated data.") - - args = parser.parse_args() - main(args.command, args.arguments, args.sample_interval, args.report_interval) - -Key Points - - Process Session: The session ID is obtained using os.getsid(process.pid), which retrieves the session ID of the lead process. - /proc File System: The script now reads from /proc//stat to find all processes in the same session. This is specific to Linux-like systems. - Monitoring All Processes: It now monitors all processes in the session, not just the initial process. - -This modification allows you to comprehensively monitor all subprocesses spawned by the initial command, providing more detailed insights into the behavior of complex commands that spawn multiple processes. 
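One caveat on the /proc parsing above: in /proc/<pid>/stat the second field (the command name) is wrapped in parentheses and may itself contain spaces, so indexing the naive split at position 5 only works for simple command names. A more defensive variant could split on the closing parenthesis first (get_session_id is an illustrative helper, not part of the script above):

python

def get_session_id(pid):
    """Return the session ID for a PID from /proc/<pid>/stat (Linux only)."""
    with open(f"/proc/{pid}/stat") as stat_file:
        contents = stat_file.read()
    # Everything after the last ')' is whitespace-separated:
    # state, ppid, pgrp, session, tty_nr, ...
    fields = contents.rsplit(")", 1)[1].split()
    return int(fields[3])

get_processes_in_session could then compare get_session_id(int(pid)) == session_id rather than indexing the raw split of the stat line.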
diff --git a/.chatgpt/2024-04-23.md b/.chatgpt/2024-04-23.md deleted file mode 100644 index 4018ccb8..00000000 --- a/.chatgpt/2024-04-23.md +++ /dev/null @@ -1,366 +0,0 @@ -User -Current tree - -ustin@fancy  ~/devel/duct   wrapper-smon-pydra-rewrite ± tree -. -├── profiler.py -├── profilers -│ ├── pydra.py -│ └── smon.py -├── pyproject.toml -├── README.md -├── sandbox -│ ├── abandoning_parent.sh -│ ├── add_git_notes -│ ├── consume_mem.py -│ ├── duct_time -│ ├── initial-thoughts.md -│ ├── profile.sh -│ └── top_test.sh -├── setup.cfg -├── smon -├── src -│ ├── duct.py -│ ├── profilers.py -│ └── __pycache__ -│ ├── duct.cpython-311.pyc -│ └── profilers.cpython-311.pyc -└── test_script.py - -5 directories, 19 files - -Heres how profilers are used >> 64 def generate_subreport(session_id, elapsed_time, report_interval, report, subreport): # E: line too long (85 > 79 characters) - 65 """Monitor and log details about all processes in the given session.""" - 66 if elapsed_time >= (subreport.number+1) * report_interval: - 67 report.subreports.append(subreport) - 68 subreport = SubReport(subreport.number+1) - 69 - 70 pids = get_processes_in_session(session_id) - 71 for pid in pids: - 72 profilers.pid_dummy_monitor(pid, elapsed_time, subreport) - 73 - 74 return subreport - 75 - -and heres src/profilers.py - - 1 import os - 2 - 3 - 4 def pid_dummy_monitor(pid, elapsed_time, subreport): - 5 try: - 6 os.kill(pid, 0) ->> 7 subreport.pids_dummy[pid].append(f"Process {pid} checked at {elapsed_time} seconds") # E: line too long (92 > 79 characters) - 8 except OSError: - 9 subreport.pids_dummy[pid].append(f"Process {pid} has terminated.") - - -Heres smon, a script with similar intentions. Please give me a new profilers.py with the metrics that smon collects. - - austin@fancy  ~/devel/duct   wrapper-smon-pydra-rewrite ± cat smon -#!/usr/bin/env python3 - -import os -import subprocess -import json -import time -import sys -import shutil - -name="_smon.out" - -sid=os.getsid(os.getpid()) - - -def get_size(start_path = '.'): - total_size = 0 - for dirpath, dirnames, filenames in os.walk(start_path): - for f in filenames: - fp = os.path.join(dirpath, f) - total_size += os.path.getsize(fp) - return total_size - -with open(name, "w") as outfile: - - env = {} - #put some batch scheduler specific env - for k in os.environ: - if k.startswith(("PBS_", "SLURM_", "OSG")): - env[k] = os.environ[k] - - #figure out max PPN - max_ppn = os.sysconf('SC_NPROCESSORS_CONF') #default to all available cores - if "PBS_NUM_PPN" in os.environ: - max_ppn = int(os.environ["PBS_NUM_PPN"]) - #from https://slurm.schedmd.com/srun.html - if "SLURM_CPUS_ON_NODE" in os.environ: #incase SLURM_NTASKS is not set? - max_ppn = int(os.environ["SLURM_CPUS_ON_NODE"]) - if "SLURM_NTASKS" in os.environ: - max_ppn = int(os.environ["SLURM_NTASKS"]) - - #figure out max mem - max_mem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') - max_walltime = None - if "PBS_JOBID" in os.environ: - #PBS doesn't expose requested memory in ENV.. I need to query from qstat - out=subprocess.check_output(["qstat", "-f1", os.environ["PBS_JOBID"]]).decode("utf-8") - qstat={} - for line in out.split("\n"): - delpos = line.find("=") - if delpos == -1: continue - k=line[0:delpos].strip() - v=line[delpos+1:].strip() - qstat[k] = v - - if "Resource_List.vmem" in qstat: - #only carbonate has this - max_mem_str = qstat["Resource_List.vmem"] #64gb, etc.. - if max_mem_str.endswith("gb"): - max_mem = int(max_mem_str[0:-2])*1024*1024*1024 #pbs treats gb and GB the same.. 
- - if "Walltime.Remaining" in qstat: - max_walltime = int(qstat["Walltime.Remaining"]) - - if "SLURM_MEM_PER_NODE" in os.environ: - #Default units are megabytes unless the SchedulerParameters configuration parameter includes the "default_gbytes" option for gigabytes. - #https://slurm.schedmd.com/sbatch.html - max_mem = int(os.environ["SLURM_MEM_PER_NODE"])*1024*1024 - - if "PBS_WALLTIME" in os.environ: - max_walltime = int(os.environ["PBS_WALLTIME"]) - - #TODO - figure out how to find walltime for slurm - #https://confluence.csiro.au/display/SC/Reference+Guide%3A+Migrating+from+Torque+to+SLURMt - - #query for gpu info - gpus = None - if shutil.which("nvidia-smi") is not None: - try: - out=subprocess.check_output(["nvidia-smi", "--query-gpu=index,name,pci.bus_id,driver_version,memory.total,compute_mode", "--format=csv"]).decode("utf-8") - lines=out.strip().split("\n") - header=lines.pop(0) - gpus = [] - for line in lines: - cols=line.split(", ") - gpus.append({ - "index": cols[0], - "name": cols[1], - "bus_id": cols[2], - "driver_version": cols[3], - "memory.total": cols[4], - "compute_mode": cols[5], - }) - except subprocess.CalledProcessError as e: - print(e) - - #dump info that doesn't change on the first entry - json.dump({ - "time": time.time(), - "uname": os.uname(), #os/kernerl/hostname version - "cpu_total": os.sysconf('SC_NPROCESSORS_CONF'), - "cpu_requested": max_ppn, - - "gpus": gpus, - - "memory_total": os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES'), - "memory_requested": max_mem, - - "walltime_requested": max_walltime, - - "sid": sid, - "uid": os.environ['USER'], - - "env": env, - - }, outfile) - - outfile.write("\n") - outfile.flush() - - #now start infinite loop! - while True: - - #query gpu usage - if gpus != None: - gpus = [] - out=subprocess.check_output(["nvidia-smi", "--query-gpu=index,name,pstate,temperature.gpu,utilization.gpu,utilization.memory", "--format=csv"]).decode("utf-8") - lines=out.strip().split("\n") - header=lines.pop(0) - for line in lines: - cols=line.split(", ") - gpus.append({ - "index": cols[0], - "name": cols[1], - "pstate": cols[2], #The current performance state for the GPU. States range from P0 (maximum performance) to P12 (minimum performance). - "temperature.gpu": int(cols[3]), - "utilization.gpu": int(cols[4][:-1]), #remove % - "utilization.memory": int(cols[5][:-1]), #remove % - #"fan.speed": int(cols[6][:-1]), #remove % ([N/A] on bridges2) - }) - - #query process under current session (query every 2 seconds for 1 minute) - processes = {} - for i in range(30): - out=subprocess.check_output(["ps", "-s", str(sid), "ho", "pid,pcpu,pmem,rss,vsz,etime,cmd"]).decode("utf-8") - for line in out.split("\n"): - if line == "": - continue - tokens=line.split() - pid=tokens[0] - pcpu=float(tokens[1]) - pmem=float(tokens[2]) - rss=int(tokens[3]) - vsz=int(tokens[4]) - etime=tokens[5] - cmd=' '.join(tokens[6:]) - - #ignore myself. - if cmd.startswith("ps -s"): - continue - - #ignore smon - if cmd.startswith("python ./smon"): - continue - - #etime == elapsed time .. don't include process that just got started? (TODO why did I do this?) 
- if etime == "00:00": - continue - - if not pid in processes: - processes[pid] = [] - processes[pid].append({"pid": pid, "pcpu": pcpu, "pmem": pmem, "rss": rss, "vsz": vsz, "etime": etime, "cmd": cmd}) - - time.sleep(2) - - #aggregate(max) processes list for each pid - processes_groups = [] - for pid in processes: - group = processes[pid] - last = group[len(group)-1] - agg = {"pid": last["pid"], "pcpu": 0, "pmem": 0, "rss": 0, "vsz": 0, "etime": last["etime"], "cmd": last["cmd"]} - #pick max value - for p in group: - agg["pcpu"]=max(agg["pcpu"], p["pcpu"]) - agg["pmem"]=max(agg["pmem"], p["pmem"]) - agg["rss"]=max(agg["rss"], p["rss"]) - agg["vsz"]=max(agg["vsz"], p["vsz"]) - - processes_groups.append(agg) - - #query disk usage - disks = [] - try: - out=subprocess.check_output(["du", "-s"]).decode("utf-8") - for line in out.split("\n"): - if line == "": - continue - tokens=line.split() - size=int(tokens[0]) - path=tokens[1] - disks.append({"path": path, "size": size}) - except subprocess.CalledProcessError as err: - #sometimes files go missing while running du.. but that's ok - print(err) - - json.dump({ - "time": time.time(), - "processes": processes_groups, - "disks": disks, - "gpus": gpus, - "memory_avail": os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_AVPHYS_PAGES'), - #"memory_avail": psutil.virtual_memory().available, - #"memory_used": psutil.virtual_memory().used, - #"memory_free": psutil.virtual_memory().free, - }, outfile) - - outfile.write("\n") - outfile.flush() -ChatGPT - -To create a new version of profilers.py that incorporates the metrics collection similar to what the smon script does, we need to adapt some of its functionality to be suitable for inclusion in a Python module. This means structuring the code to collect system and process metrics in a function or class method that can be called, possibly storing or returning the gathered data in a structured format (like a dictionary) that could be further processed or serialized to JSON. 
- -Here's a revised version of profilers.py based on your smon script details: -New profilers.py Implementation - -python - -import subprocess -import os -import time -import json - -def get_system_info(): - """Gathers system information related to CPU, GPU, memory, and environment variables.""" - info = { - 'cpu_total': os.sysconf('SC_NPROCESSORS_CONF'), - 'memory_total': os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES'), - 'env': {k: v for k, v in os.environ.items() if k.startswith(("PBS_", "SLURM_", "OSG"))}, - } - - # GPU information - if shutil.which("nvidia-smi"): - try: - gpu_info = subprocess.check_output(["nvidia-smi", "--query-gpu=index,name,pci.bus_id,driver_version,memory.total,compute_mode", "--format=csv"], text=True).strip().split('\n')[1:] - info['gpus'] = [dict(zip(gpu_info[0].split(", "), gpu.split(", "))) for gpu in gpu_info[1:]] - except subprocess.CalledProcessError: - info['gpus'] = "Failed to query GPU info" - - return info - -def monitor_processes(session_id): - """Monitor processes for the given session ID and collect resource usage metrics.""" - process_data = {} - try: - output = subprocess.check_output(["ps", "-s", str(session_id), "-o", "pid,pcpu,pmem,rss,vsz,etime,cmd"], text=True) - for line in output.splitlines()[1:]: - if line: - pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6) - process_data[pid] = { - 'pcpu': float(pcpu), - 'pmem': float(pmem), - 'rss': int(rss), - 'vsz': int(vsz), - 'etime': etime, - 'cmd': cmd - } - except subprocess.CalledProcessError: - process_data['error'] = "Failed to query process data" - - return process_data - -def pid_dummy_monitor(pid, elapsed_time, subreport): - """A dummy function to simulate process monitoring and logging.""" - try: - os.kill(pid, 0) # Check if the process is still running - subreport.pids_dummy[pid].append(f"Process {pid} checked at {elapsed_time} seconds") - except OSError: - subreport.pids_dummy[pid].append(f"Process {pid} has terminated.") - -def collect_metrics(): - """Main function to collect and dump metrics periodically.""" - session_id = os.getsid(os.getpid()) - system_info = get_system_info() - process_metrics = monitor_processes(session_id) - - # Here you would handle further data processing, storing, or reporting - with open("_smon.out", "w") as outfile: - json.dump({ - "time": time.time(), - "system_info": system_info, - "process_metrics": process_metrics - }, outfile, indent=4) - outfile.write("\n") - - # Implement a loop or event-based system for continuous monitoring if necessary - -if __name__ == "__main__": - collect_metrics() - -Explanation: - - get_system_info(): This function collects general system information, including CPU, memory, and optional GPU details. - monitor_processes(session_id): Function to fetch and parse process details for a given session. - pid_dummy_monitor(): Updated the function to keep its original intent but include the session-based monitoring for consistency. - collect_metrics(): Central function that orchestrates the collection of data and writes it to a file. - -This version of profilers.py adapts concepts from the smon script into more modular functions that can be integrated into a larger Python application or used for specific profiling tasks. Adjust the script according to specific needs, ensuring that any required permissions or system commands are compatible with the deployment environment. 
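If the per-cycle max aggregation that smon performs is also wanted, the snapshots returned by monitor_processes could be collapsed once per reporting interval with a helper roughly like this (aggregate_samples is an illustrative addition, not part of the module above):

python

def aggregate_samples(samples):
    """Collapse a list of monitor_processes() snapshots into one entry per PID,
    keeping the maximum observed value of each numeric field, as smon does."""
    aggregated = {}
    for snapshot in samples:
        for pid, metrics in snapshot.items():
            if pid == "error":
                continue
            current = aggregated.setdefault(pid, dict(metrics))
            for key in ("pcpu", "pmem", "rss", "vsz"):
                current[key] = max(current[key], metrics[key])
            current["etime"] = metrics["etime"]  # keep the most recent elapsed time
            current["cmd"] = metrics["cmd"]
    return aggregated

collect_metrics could then call monitor_processes(session_id) every few seconds, append each snapshot to a list, and write aggregate_samples(snapshots) out once per report interval, mirroring smon's two-second sampling and one-minute reporting loop.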
diff --git a/src/duct.py b/src/duct.py index 15986d50..79a929f1 100755 --- a/src/duct.py +++ b/src/duct.py @@ -3,10 +3,9 @@ from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime -import errno import json import os -import pprint +from pathlib import Path import shutil import subprocess import sys @@ -213,63 +212,56 @@ def create_and_parse_args(): return parser.parse_args() -class TeeStream: - """TeeStream simultaneously streams to standard output (stdout) and a specified file.""" +class TailPipe: + """TailPipe simultaneously streams to an output stream (stdout or stderr) and a specified file.""" - listener_fd: int - writer_fd: int - file: TextIO + TAIL_CYCLE_TIME = 0.01 - def __init__(self, file_path: str) -> None: - self.file = open(file_path, "w") - ( - self.listener_fd, - self.writer_fd, - ) = os.openpty() # Use pseudo-terminal to simulate terminal behavior - - def fileno(self) -> int: - """Return the file descriptor to be used by subprocess as stdout/stderr.""" - return self.listener_fd + def __init__(self, file_path, buffer): + self.file_path = file_path + self.buffer = buffer + self.stop_event = None + self.infile = None + self.thread = None def start(self): - """Start a thread to read from the main_fd and write to stdout and the file.""" - thread = threading.Thread(target=self._redirect_output, daemon=True) - thread.start() - - def _redirect_output(self): - with os.fdopen(self.listener_fd, "rb", buffering=0) as stream: - while True: - try: - data = stream.read(1024) - except OSError as e: - if e.errno == errno.EIO: # The file has been closed - break - else: - raise - if not data: # Still open, but no new data to write - break - sys.stdout.buffer.write(data) - sys.stdout.buffer.flush() - self.file.write( - data.decode("utf-8", "replace") - ) # Handling decoding errors - self.file.flush() + Path(self.file_path).touch() + self.stop_event = threading.Event() + self.infile = open(self.file_path, "rb") + self.thread = threading.Thread(target=self._tail, daemon=True) + self.thread.start() + + def fileno(self): + return self.infile.fileno() + + def _catch_up(self): + data = self.infile.read() + if data: + self.buffer.write(data) + self.buffer.flush() + + def _tail(self): + try: + while not self.stop_event.is_set(): + self._catch_up() + time.sleep(TailPipe.TAIL_CYCLE_TIME) + + # After stop event, collect and passthrough data one last time + self._catch_up() + except Exception: + raise + finally: + self.buffer.flush() def close(self): - """Close the slave fd and the file when done.""" - os.close(self.writer_fd) - self.file.close() + self.stop_event.set() + self.thread.join() + self.infile.close() -def monitor_process( - stdout, stderr, report, process, report_interval, sample_interval, output_prefix -): +def monitor_process(report, process, report_interval, sample_interval, output_prefix): while True: if process.poll() is not None: # the passthrough command has finished - if hasattr(stdout, "close"): - stdout.close() - if hasattr(stderr, "close"): - stderr.close() break elapsed_time = time.time() - report.start_time @@ -291,14 +283,13 @@ def monitor_process( def prepare_outputs( capture_outputs: str, outputs: str, output_prefix: str -) -> Tuple[Union[TextIO, TeeStream, int], Union[TextIO, TeeStream, int]]: - stdout: Union[TextIO, TeeStream, int] - stderr: Union[TextIO, TeeStream, int] +) -> Tuple[Union[TextIO, TailPipe, int], Union[TextIO, TailPipe, int]]: + stdout: Union[TextIO, TailPipe, int] + stderr: Union[TextIO, TailPipe, int] - # 
Code remains the same if capture_outputs in ["all", "stdout"] and outputs in ["all", "stdout"]: - stdout = TeeStream(f"{output_prefix}stdout") - stdout.start() # type: ignore + stdout = TailPipe(f"{output_prefix}stdout", buffer=sys.stdout.buffer) + stdout.start() elif capture_outputs in ["all", "stdout"] and outputs in ["none", "stderr"]: stdout = open(f"{output_prefix}stdout", "w") elif capture_outputs in ["none", "stderr"] and outputs in ["all", "stdout"]: @@ -307,8 +298,8 @@ def prepare_outputs( stdout = subprocess.DEVNULL if capture_outputs in ["all", "stderr"] and outputs in ["all", "stderr"]: - stderr = TeeStream(f"{output_prefix}stderr") - stderr.start() # type: ignore + stderr = TailPipe(f"{output_prefix}stderr", buffer=sys.stderr.buffer) + stderr.start() elif capture_outputs in ["all", "stderr"] and outputs in ["none", "stdout"]: stderr = open(f"{output_prefix}stderr", "w") elif capture_outputs in ["none", "stdout"] and outputs in ["all", "stderr"]: @@ -347,21 +338,25 @@ def main(): stdout, stderr = prepare_outputs( args.capture_outputs, args.outputs, formatted_output_prefix ) + if isinstance(stdout, TailPipe): + stdout_file = open(stdout.file_path, "wb") + else: + stdout_file = stdout + if isinstance(stderr, TailPipe): + stderr_file = open(stderr.file_path, "wb") + else: + stderr_file = stderr + process = subprocess.Popen( [str(args.command)] + args.arguments, - stdout=stdout, - stderr=stderr, + stdout=stdout_file, + stderr=stderr_file, preexec_fn=os.setsid, ) session_id = os.getsid(process.pid) # Get session ID of the new process report = Report(args.command, session_id) - report.collect_environment() - report.get_system_info() - if args.record_types in ["all", "processes-samples"]: monitoring_args = [ - stdout, - stderr, report, process, args.report_interval, @@ -375,13 +370,22 @@ def main(): monitoring_thread.join() if args.record_types in ["all", "system-summary"]: + report.collect_environment() + report.get_system_info() system_info_path = f"{formatted_output_prefix}info.json" with open(system_info_path, "a") as system_logs: report.end_time = time.time() report.run_time_seconds = f"{report.end_time - report.start_time}" report.get_system_info() system_logs.write(str(report)) - pprint.pprint(report, width=120) + + process.wait() + if isinstance(stdout, TailPipe): + stdout_file.close() + stdout.close() + if isinstance(stderr, TailPipe): + stderr_file.close() + stderr.close() if __name__ == "__main__": diff --git a/test/cat_to_err.py b/test/cat_to_err.py new file mode 100755 index 00000000..8967b07f --- /dev/null +++ b/test/cat_to_err.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +def cat_to_stream(path, buffer): + with open(path, "rb") as infile: + buffer.write(infile.read()) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Cat to stderr") + parser.add_argument("path", help="Path to the file to be catted") + args = parser.parse_args() + cat_to_stream(args.path, sys.stderr.buffer) diff --git a/test/test_aggregation.py b/test/test_aggregation.py new file mode 100644 index 00000000..4d496347 --- /dev/null +++ b/test/test_aggregation.py @@ -0,0 +1,2 @@ +def test_sanity(): + pass diff --git a/test/test_helpers.py b/test/test_helpers.py new file mode 100644 index 00000000..346d16b2 --- /dev/null +++ b/test/test_helpers.py @@ -0,0 +1,40 @@ +import os +from unittest import mock +import pytest +from duct import ensure_directories + + +def ensure_directoiries(path: str) -> None: + if path.endswith(os.sep): # If it ends in "/" 
(for linux) treat as a dir + os.makedirs(path, exist_ok=True) + else: + # Path does not end with a separator, treat the last part as a filename + directory = os.path.dirname(path) + if directory: # If there's a directory part, create it + os.makedirs(directory, exist_ok=True) + + +@pytest.mark.parametrize( + "path", + [ + "directory/", + "nested/directory/", + "/abs/path/", + ], +) +@mock.patch("duct.os.makedirs") +def test_ensure_directories_with_dirs(mock_mkdir, path): + ensure_directories(path) + mock_mkdir.assert_called_once_with(path, exist_ok=True) + + +@mock.patch("duct.os.makedirs") +def test_ensure_directories_with_file(mock_mkdir): + ensure_directories("just_a_file_name") + mock_mkdir.assert_not_called() + + +@mock.patch("duct.os.makedirs") +def test_ensure_directories_with_filepart_and_directory_part(mock_mkdir): + ensure_directories("nested/dir/file_name") + mock_mkdir.assert_called_once_with("nested/dir", exist_ok=True) diff --git a/test/test_prepare_outputs.py b/test/test_prepare_outputs.py index 666b8896..02ad3b59 100644 --- a/test/test_prepare_outputs.py +++ b/test/test_prepare_outputs.py @@ -1,28 +1,35 @@ import subprocess from unittest.mock import MagicMock, call, patch +from utils import MockStream from duct import prepare_outputs -def test_prepare_outputs_all_stdout(): +@patch("sys.stdout", new_callable=lambda: MockStream()) +def test_prepare_outputs_all_stdout(mock_stdout): output_prefix = "test_outputs_" - with patch("duct.TeeStream") as mock_tee_stream, patch( + with patch("duct.TailPipe") as mock_tee_stream, patch( "builtins.open", new_callable=MagicMock ) as mock_open: mock_tee_stream.return_value.start = MagicMock() stdout, stderr = prepare_outputs("all", "stdout", output_prefix) - mock_tee_stream.assert_called_with(f"{output_prefix}stdout") + mock_tee_stream.assert_called_with( + f"{output_prefix}stdout", buffer=mock_stdout.buffer + ) assert stdout == mock_tee_stream.return_value assert stderr == mock_open.return_value -def test_prepare_outputs_all_stderr(): +@patch("sys.stderr", new_callable=lambda: MockStream()) +def test_prepare_outputs_all_stderr(mock_stderr): output_prefix = "test_outputs_" - with patch("duct.TeeStream") as mock_tee_stream, patch( + with patch("duct.TailPipe") as mock_tee_stream, patch( "builtins.open", new_callable=MagicMock ) as mock_open: mock_tee_stream.return_value.start = MagicMock() stdout, stderr = prepare_outputs("all", "stderr", output_prefix) - mock_tee_stream.assert_called_with(f"{output_prefix}stderr") + mock_tee_stream.assert_called_with( + f"{output_prefix}stderr", buffer=mock_stderr.buffer + ) assert stdout == mock_open.return_value assert stderr == mock_tee_stream.return_value @@ -54,15 +61,17 @@ def test_prepare_outputs_none_stderr(): assert stdout == subprocess.DEVNULL -def test_prepare_outputs_all_all(): +@patch("sys.stderr", new_callable=lambda: MockStream()) +@patch("sys.stdout", new_callable=lambda: MockStream()) +def test_prepare_outputs_all_all(mock_stdout, mock_stderr): output_prefix = "test_outputs_" - with patch("duct.TeeStream") as mock_tee_stream: + with patch("duct.TailPipe") as mock_tee_stream: mock_tee_stream.return_value.start = MagicMock() stdout, stderr = prepare_outputs("all", "all", output_prefix) assert stdout == mock_tee_stream.return_value assert stderr == mock_tee_stream.return_value calls = [ - call(f"{output_prefix}stdout"), - call(f"{output_prefix}stderr"), + call(f"{output_prefix}stdout", buffer=mock_stdout.buffer), + call(f"{output_prefix}stderr", buffer=mock_stderr.buffer), ] 
mock_tee_stream.assert_has_calls(calls, any_order=True) diff --git a/test/test_tailpipe.py b/test/test_tailpipe.py new file mode 100644 index 00000000..0f082091 --- /dev/null +++ b/test/test_tailpipe.py @@ -0,0 +1,73 @@ +import os +import subprocess +import tempfile +from unittest.mock import patch +import pytest +from utils import MockStream +from duct import TailPipe + +# 10^7 line fixture is about 70MB +FIXTURE_LIST = [f"ten_{i}" for i in range(1, 8)] + + +@pytest.fixture(scope="module", params=FIXTURE_LIST) +def fixture_path(request, tmp_path_factory): + num_lines_exponent = int(request.param.split("_")[1]) + base_temp_dir = tmp_path_factory.mktemp("fixture_data") + file_path = base_temp_dir / f"{request.param}.txt" + with open(file_path, "w") as f: + for i in range(10**num_lines_exponent): + f.write(f"{i}\n") + # print(f"10 ^ {num_lines_exponent}: {10 ** num_lines_exponent}") + # print(f"Fixture file size: {os.path.getsize(file_path)} bytes") + yield str(file_path) + + os.remove(file_path) + + +@pytest.mark.parametrize("fixture_path", FIXTURE_LIST, indirect=True) +@patch("sys.stdout", new_callable=lambda: MockStream()) +def test_high_throughput_stdout(mock_stdout, fixture_path): + with tempfile.NamedTemporaryFile(mode="wb") as tmpfile: + process = subprocess.Popen( + ["cat", fixture_path], + stdout=tmpfile, + ) + stream = TailPipe(tmpfile.name, mock_stdout.buffer) + stream.start() + process.wait() + stream.close() + + assert process.returncode == 0 + with open(fixture_path, "rb") as fixture: + expected = fixture.read() + assert mock_stdout.getvalue() == expected + + +@pytest.mark.parametrize("fixture_path", FIXTURE_LIST, indirect=True) +@patch("sys.stderr", new_callable=lambda: MockStream()) +def test_high_throughput_stderr(mock_stderr, fixture_path): + with tempfile.NamedTemporaryFile(mode="wb") as tmpfile: + process = subprocess.Popen( + ["./test/cat_to_err.py", fixture_path], + stdout=subprocess.DEVNULL, + stderr=tmpfile, + ) + stream = TailPipe(tmpfile.name, mock_stderr.buffer) + stream.start() + process.wait() + stream.close() + + assert process.returncode == 0 + with open(fixture_path, "rb") as fixture: + expected = fixture.read() + assert mock_stderr.getvalue() == expected + + +@patch("sys.stdout", new_callable=lambda: MockStream()) +def test_close(mock_stdout): + with tempfile.NamedTemporaryFile(mode="wb") as tmpfile: + stream = TailPipe(tmpfile.name, mock_stdout.buffer) + stream.start() + stream.close() + assert stream.infile.closed diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 00000000..7519e686 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,11 @@ +from io import BytesIO + + +class MockStream: + """Mocks stderr or stdout""" + + def __init__(self): + self.buffer = BytesIO() + + def getvalue(self): + return self.buffer.getvalue() diff --git a/test_logs.py b/test_logs.py index 479ac371..580dbb81 100755 --- a/test_logs.py +++ b/test_logs.py @@ -8,8 +8,8 @@ def consume_cpu(duration, _): """Function to consume CPU proportional to 'load' for 'duration' seconds""" for i in range(duration): - print(f"out: {i}") - print(f"err: {i}", file=sys.stderr) + print(f"out: {i}", file=sys.stdout, flush=True) + print(f"err: {i}", file=sys.stderr, flush=True) time.sleep(1) @@ -21,15 +21,17 @@ def consume_memory(size): def main(duration, cpu_load, memory_size): - print("Printing something to STDOUT at start") - print("Printing something to STDERR at start", file=sys.stderr) + print("Printing something to STDOUT at start", file=sys.stdout, flush=True) + print("Printing 
something to STDERR at start", file=sys.stderr, flush=True) consume_memory(memory_size) consume_cpu(duration, cpu_load) print( - f"Test completed. Consumed {memory_size} MB for {duration} seconds with CPU load factor {cpu_load}." + f"Test completed. Consumed {memory_size} MB for {duration} seconds with CPU load factor {cpu_load}.", + file=sys.stdout, + flush=True, ) - print("Printing something to STDOUT at finish") - print("Printing something to STDERR at finish", file=sys.stderr) + print("Printing something to STDOUT at finish", file=sys.stdout, flush=True) + print("Printing something to STDERR at finish", file=sys.stderr, flush=True) if __name__ == "__main__": diff --git a/tox.ini b/tox.ini index d2d7530c..d74292de 100644 --- a/tox.ini +++ b/tox.ini @@ -33,7 +33,6 @@ commands = mypy src test [pytest] -addopts = --cov=duct --no-cov-on-fail filterwarnings = error norecursedirs = test/data markers =