Skip to content

Commit

Permalink
improve MemoryMonitor to be real time, and hopefully fix async garbag…
Browse files Browse the repository at this point in the history
…e collection issues
  • Loading branch information
bbean23 committed Mar 28, 2024
1 parent d30a981 commit ff10235
Showing 1 changed file with 67 additions and 77 deletions.
144 changes: 67 additions & 77 deletions opencsp/common/lib/process/MemoryMonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import multiprocessing
from multiprocessing.synchronize import Event as multiprocessing_event_type
from queue import Empty as QueueEmpty, Full as QueueFull
import time
from typing import Callable

Expand Down Expand Up @@ -74,8 +75,8 @@ def __init__(
self._start_datetime: datetime.datetime = None
self._end_datetime: datetime.datetime = None
self._process_finished = False
self._min_max_avg_usage = [10e10, 0, 0]
self._min_max_avg_free = [10e10, 0, 0]
self._min_max_avg_usage = [10e10, 0, 0, 0]
self._min_max_avg_free = [10e10, 0, 0, 0]

def start(self):
"""Starts this monitor. Returns True if started, or False if it has been started previously."""
Expand Down Expand Up @@ -135,119 +136,108 @@ def _run(self):

queue = multiprocessing.Queue()
self._proc = multiprocessing.Process(
target=_monitor_sys_memory,
args=[
self._max_lifetime_seconds,
self._stop_sig,
queue,
self._print_threshold,
self._print_on_new_min,
self._always_print,
self._log_func
],
target=_monitor_sys_memory, args=[self._max_lifetime_seconds, self._stop_sig, queue]
)
self._proc.start()

while self._proc.is_alive():
# print("-", end="")
# Check if the child memory watching thread has timed out
elapsed = time.time() - start_time
if elapsed > self._max_lifetime_seconds + 3:
self._proc.terminate()
self._stop_sig.set()
i = 0
while self._proc.is_alive() and i < 3:
time.sleep(1)
i += 1
if self._proc.is_alive():
self._proc.terminate()

# Check if stop() has been called
if self._stop_sig.is_set():
break
time.sleep(0.1)
self._process_finished = True

while not queue.empty():
elapsed, sys_tot, sys_used, sys_free = queue.get()
dt = self._start_datetime + datetime.timedelta(seconds=elapsed)
self._log[dt] = sys_tot, sys_used, sys_free
self._min_max_avg_usage[0] = min(self._min_max_avg_usage[0], sys_used)
self._min_max_avg_usage[1] = max(self._min_max_avg_usage[1], sys_used)
self._min_max_avg_usage[2] += sys_used
self._min_max_avg_free[0] = min(self._min_max_avg_free[0], sys_free)
self._min_max_avg_free[1] = max(self._min_max_avg_free[1], sys_free)
self._min_max_avg_free[2] += sys_free

if len(self._log) > 0:
self._min_max_avg_usage[2] /= len(self._log)
self._min_max_avg_free[2] /= len(self._log)
# Try to get the latest value from the child process
message: tuple[float, float, float, float] = None
try:
message = queue.get(timeout=1)
except QueueEmpty:
# meh, no message available yet
time.sleep(0.1)
if message is not None:
elapsed, sys_tot, sys_used, sys_free = message

# check about printing on a new minimum free, or above a used threshold
do_print_min, do_print_threshold = False, False
if float("%0.1f" % sys_free) < self.min_free():
do_print_min = self._print_on_new_min
if sys_used / sys_tot > self._print_threshold:
do_print_threshold = True

# Print the returned value
if do_print_min or do_print_threshold or self._always_print:
self._log_func(
"MM: %s %s %s %s"
% (
f"{int(elapsed):>3d}",
f"{sys_tot:0.1f}".rjust(5),
f"{sys_used:0.1f}".rjust(5),
f"{sys_free:0.1f}".rjust(5),
)
)

# Register the returned value
self._register_value(elapsed, sys_tot, sys_used, sys_free)

# Either the child process has finished, or we have force stopped it
self._process_finished = True
self._end_datetime = datetime.datetime.now()

def _register_value(self, elapsed: float, sys_tot: float, sys_used: float, sys_free: float):
dt = self._start_datetime + datetime.timedelta(seconds=elapsed)
self._log[dt] = sys_tot, sys_used, sys_free
self._min_max_avg_usage[0] = min(self._min_max_avg_usage[0], sys_used)
self._min_max_avg_usage[1] = max(self._min_max_avg_usage[1], sys_used)
self._min_max_avg_usage[3] += sys_used
self._min_max_avg_usage[2] = self._min_max_avg_usage[3] / len(self._log)
self._min_max_avg_free[0] = min(self._min_max_avg_free[0], sys_free)
self._min_max_avg_free[1] = max(self._min_max_avg_free[1], sys_free)
self._min_max_avg_free[3] += sys_used
self._min_max_avg_free[2] = self._min_max_avg_free[3] / len(self._log)

def min_usage(self):
"""Returns the minimum memory usage while the monitor was running, in GB."""
"""Returns the minimum memory usage, in GB."""
if not self.done():
return None
return self._min_max_avg_usage[0]

def max_usage(self):
"""Returns the maximum memory usage while the monitor was running, in GB."""
if not self.done():
return None
"""Returns the maximum memory usage, in GB."""
return self._min_max_avg_usage[1]

def avg_usage(self):
"""Returns the average memory usage while the monitor was running, in GB."""
if not self.done():
return None
"""Returns the average memory usage, in GB."""
return self._min_max_avg_usage[2]

def min_free(self):
"""Returns the minimum memory free while the monitor was running, in GB."""
if not self.done():
return None
"""Returns the minimum memory free, in GB."""
return self._min_max_avg_free[0]

def max_free(self):
"""Returns the maximum memory free while the monitor was running, in GB."""
if not self.done():
return None
"""Returns the maximum memory free, in GB."""
return self._min_max_avg_free[1]

def avg_free(self):
"""Returns the average memory free while the monitor was running, in GB."""
if not self.done():
return None
return self._min_max_avg_free[2]


def _monitor_sys_memory(
max_lifetime_secs: int,
stop_sig: multiprocessing_event_type,
queue: multiprocessing.Queue,
print_threshold: float,
print_on_new_min: bool,
always_print: bool,
log_func: Callable,
):
def _monitor_sys_memory(max_lifetime_secs: int, stop_sig: multiprocessing_event_type, queue: multiprocessing.Queue):
start = time.time()
# print("monitor started")

prev_min = 10e10
while (elapsed := time.time() - start) < max_lifetime_secs:
sys_tot, sys_used, sys_free = st.mem_status()

sys_avail_for_min = float("%0.1f" % sys_free)
is_new_min = False
if sys_avail_for_min < prev_min:
prev_min = sys_avail_for_min
is_new_min = True
do_print_min = is_new_min and print_on_new_min

used_ratio = sys_used / sys_tot
do_print_threshold = used_ratio > print_threshold

queue.put(tuple([elapsed, sys_tot, sys_used, sys_free]))
if do_print_min or do_print_threshold or always_print:
log_func(
"MM: %s %s %s %s"
% (
f"{int(elapsed):>3d}",
f"{sys_tot:0.1f}".rjust(5),
f"{sys_used:0.1f}".rjust(5),
f"{sys_free:0.1f}".rjust(5),
)
)

if stop_sig.wait(1):
break

Expand Down

0 comments on commit ff10235

Please sign in to comment.