From ff1023516ab941893826c6e209efc2c04c8f0114 Mon Sep 17 00:00:00 2001 From: bbean Date: Wed, 27 Mar 2024 21:05:46 -0600 Subject: [PATCH] improve MemoryMonitor to be real time, and hopefully fix async garbage collection issues --- opencsp/common/lib/process/MemoryMonitor.py | 144 +++++++++----------- 1 file changed, 67 insertions(+), 77 deletions(-) diff --git a/opencsp/common/lib/process/MemoryMonitor.py b/opencsp/common/lib/process/MemoryMonitor.py index c6a8e00ff..063623665 100644 --- a/opencsp/common/lib/process/MemoryMonitor.py +++ b/opencsp/common/lib/process/MemoryMonitor.py @@ -2,6 +2,7 @@ import datetime import multiprocessing from multiprocessing.synchronize import Event as multiprocessing_event_type +from queue import Empty as QueueEmpty, Full as QueueFull import time from typing import Callable @@ -74,8 +75,8 @@ def __init__( self._start_datetime: datetime.datetime = None self._end_datetime: datetime.datetime = None self._process_finished = False - self._min_max_avg_usage = [10e10, 0, 0] - self._min_max_avg_free = [10e10, 0, 0] + self._min_max_avg_usage = [10e10, 0, 0, 0] + self._min_max_avg_free = [10e10, 0, 0, 0] def start(self): """Starts this monitor. Returns True if started, or False if it has been started previously.""" @@ -135,119 +136,108 @@ def _run(self): queue = multiprocessing.Queue() self._proc = multiprocessing.Process( - target=_monitor_sys_memory, - args=[ - self._max_lifetime_seconds, - self._stop_sig, - queue, - self._print_threshold, - self._print_on_new_min, - self._always_print, - self._log_func - ], + target=_monitor_sys_memory, args=[self._max_lifetime_seconds, self._stop_sig, queue] ) self._proc.start() + while self._proc.is_alive(): - # print("-", end="") + # Check if the child memory watching thread has timed out elapsed = time.time() - start_time if elapsed > self._max_lifetime_seconds + 3: - self._proc.terminate() + self._stop_sig.set() + i = 0 + while self._proc.is_alive() and i < 3: + time.sleep(1) + i += 1 + if self._proc.is_alive(): + self._proc.terminate() + + # Check if stop() has been called if self._stop_sig.is_set(): break - time.sleep(0.1) - self._process_finished = True - while not queue.empty(): - elapsed, sys_tot, sys_used, sys_free = queue.get() - dt = self._start_datetime + datetime.timedelta(seconds=elapsed) - self._log[dt] = sys_tot, sys_used, sys_free - self._min_max_avg_usage[0] = min(self._min_max_avg_usage[0], sys_used) - self._min_max_avg_usage[1] = max(self._min_max_avg_usage[1], sys_used) - self._min_max_avg_usage[2] += sys_used - self._min_max_avg_free[0] = min(self._min_max_avg_free[0], sys_free) - self._min_max_avg_free[1] = max(self._min_max_avg_free[1], sys_free) - self._min_max_avg_free[2] += sys_free - - if len(self._log) > 0: - self._min_max_avg_usage[2] /= len(self._log) - self._min_max_avg_free[2] /= len(self._log) + # Try to get the latest value from the child process + message: tuple[float, float, float, float] = None + try: + message = queue.get(timeout=1) + except QueueEmpty: + # meh, no message available yet + time.sleep(0.1) + if message is not None: + elapsed, sys_tot, sys_used, sys_free = message + + # check about printing on a new minimum free, or above a used threshold + do_print_min, do_print_threshold = False, False + if float("%0.1f" % sys_free) < self.min_free(): + do_print_min = self._print_on_new_min + if sys_used / sys_tot > self._print_threshold: + do_print_threshold = True + + # Print the returned value + if do_print_min or do_print_threshold or self._always_print: + self._log_func( + "MM: %s %s %s %s" + % ( + f"{int(elapsed):>3d}", + f"{sys_tot:0.1f}".rjust(5), + f"{sys_used:0.1f}".rjust(5), + f"{sys_free:0.1f}".rjust(5), + ) + ) + + # Register the returned value + self._register_value(elapsed, sys_tot, sys_used, sys_free) + + # Either the child process has finished, or we have force stopped it + self._process_finished = True self._end_datetime = datetime.datetime.now() + def _register_value(self, elapsed: float, sys_tot: float, sys_used: float, sys_free: float): + dt = self._start_datetime + datetime.timedelta(seconds=elapsed) + self._log[dt] = sys_tot, sys_used, sys_free + self._min_max_avg_usage[0] = min(self._min_max_avg_usage[0], sys_used) + self._min_max_avg_usage[1] = max(self._min_max_avg_usage[1], sys_used) + self._min_max_avg_usage[3] += sys_used + self._min_max_avg_usage[2] = self._min_max_avg_usage[3] / len(self._log) + self._min_max_avg_free[0] = min(self._min_max_avg_free[0], sys_free) + self._min_max_avg_free[1] = max(self._min_max_avg_free[1], sys_free) + self._min_max_avg_free[3] += sys_used + self._min_max_avg_free[2] = self._min_max_avg_free[3] / len(self._log) + def min_usage(self): - """Returns the minimum memory usage while the monitor was running, in GB.""" + """Returns the minimum memory usage, in GB.""" if not self.done(): return None return self._min_max_avg_usage[0] def max_usage(self): - """Returns the maximum memory usage while the monitor was running, in GB.""" - if not self.done(): - return None + """Returns the maximum memory usage, in GB.""" return self._min_max_avg_usage[1] def avg_usage(self): - """Returns the average memory usage while the monitor was running, in GB.""" - if not self.done(): - return None + """Returns the average memory usage, in GB.""" return self._min_max_avg_usage[2] def min_free(self): - """Returns the minimum memory free while the monitor was running, in GB.""" - if not self.done(): - return None + """Returns the minimum memory free, in GB.""" return self._min_max_avg_free[0] def max_free(self): - """Returns the maximum memory free while the monitor was running, in GB.""" - if not self.done(): - return None + """Returns the maximum memory free, in GB.""" return self._min_max_avg_free[1] def avg_free(self): """Returns the average memory free while the monitor was running, in GB.""" - if not self.done(): - return None return self._min_max_avg_free[2] -def _monitor_sys_memory( - max_lifetime_secs: int, - stop_sig: multiprocessing_event_type, - queue: multiprocessing.Queue, - print_threshold: float, - print_on_new_min: bool, - always_print: bool, - log_func: Callable, -): +def _monitor_sys_memory(max_lifetime_secs: int, stop_sig: multiprocessing_event_type, queue: multiprocessing.Queue): start = time.time() # print("monitor started") - prev_min = 10e10 while (elapsed := time.time() - start) < max_lifetime_secs: sys_tot, sys_used, sys_free = st.mem_status() - - sys_avail_for_min = float("%0.1f" % sys_free) - is_new_min = False - if sys_avail_for_min < prev_min: - prev_min = sys_avail_for_min - is_new_min = True - do_print_min = is_new_min and print_on_new_min - - used_ratio = sys_used / sys_tot - do_print_threshold = used_ratio > print_threshold - queue.put(tuple([elapsed, sys_tot, sys_used, sys_free])) - if do_print_min or do_print_threshold or always_print: - log_func( - "MM: %s %s %s %s" - % ( - f"{int(elapsed):>3d}", - f"{sys_tot:0.1f}".rjust(5), - f"{sys_used:0.1f}".rjust(5), - f"{sys_free:0.1f}".rjust(5), - ) - ) - if stop_sig.wait(1): break