Skip to content

Commit

Permalink
fix: Better file and multiprocessing error handling (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
JuanPedroGHM authored Dec 19, 2024
1 parent 50553ff commit d12a177
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 16 deletions.
11 changes: 8 additions & 3 deletions perun/api/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,14 @@ def monitor(args: argparse.Namespace):
log.debug(f"Cmd args: {cmd_args}")
if not args.binary:
scriptPath = Path(cmd)
assert scriptPath.exists()
assert scriptPath.is_file()
assert scriptPath.suffix == ".py"
try:
assert scriptPath.exists()
assert scriptPath.is_file()
assert scriptPath.suffix == ".py"
except AssertionError:
log.error(
f"Invalid script path. File {scriptPath} does not exist or is not a python script."
)

sys.path.insert(0, str(scriptPath.parent.absolute()))
app = Application(scriptPath, config, args=tuple(sys.argv[1:]))
Expand Down
18 changes: 17 additions & 1 deletion perun/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,10 @@ def monitor_application(
log.info(f"Rank {self.comm.Get_rank()}: Monitoring start")
multirun_nodes: Dict[str, DataNode] = {}
self.warmup_round = False
for i in range(self.config.getint("benchmarking", "rounds")):
i = 0
rounds = self.config.getint("benchmarking", "rounds")
while i < rounds:
log.info(f"Rank {self.comm.Get_rank()}: Starting run {i}")
status, runNode, last_result = self._monitor.run_application(
str(i), record=True
)
Expand All @@ -351,15 +354,28 @@ def monitor_application(
elif status == MonitorStatus.FILE_NOT_FOUND:
log.error(f"Rank {self.comm.Get_rank()}: App not found")
return
elif status == MonitorStatus.SP_ERROR:
log.error(
f"Rank {self.comm.Get_rank()}: Failed to start run {i}, saving previous runs (if any), and exiting."
)
self._monitor.status = MonitorStatus.PROCESSING
# Ideally this should just retry running the application again, hoping for the perunSubprocess to work, but this is not working as expected because of heat's incrementalSVD, so we will just exit out of the loop for now. This should be fixed in the future.
# This should still save the data from the previous run, so it should be fine.

# continue
break

if self.comm.Get_rank() == 0 and runNode:
log.info(f"Rank {self.comm.Get_rank()}: Processing run {i}")
runNode.metadata = {**runNode.metadata, **self.l_host_metadata}
for node in runNode.nodes.values():
node.metadata["mpi_ranks"] = self.host_rank[node.id]

runNode = processDataNode(runNode, self.config)
multirun_nodes[str(i)] = runNode

i += 1

# Get app node data if it exists
if self.comm.Get_rank() == 0 and len(multirun_nodes) > 0:
multirun_node = self._process_multirun(multirun_nodes)
Expand Down
79 changes: 67 additions & 12 deletions perun/monitoring/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import enum
import logging
import multiprocessing
import pprint as pp
import time
from configparser import ConfigParser
Expand Down Expand Up @@ -42,6 +43,7 @@ class MonitorStatus(enum.Enum):
PROCESSING = enum.auto()
SCRIPT_ERROR = enum.auto()
PERUN_ERROR = enum.auto()
SP_ERROR = enum.auto()
MPI_ERROR = enum.auto()
FILE_NOT_FOUND = enum.auto()
SUCCESS = enum.auto()
Expand Down Expand Up @@ -161,6 +163,7 @@ def run_application(
)
except Exception as e:
self.status = MonitorStatus.SCRIPT_ERROR
result = None
log.error(
f"Rank {self._comm.Get_rank()}: Found error on monitored application: {str(self._app)}"
)
Expand All @@ -186,6 +189,9 @@ def _run_python_app(
f"Rank {self._comm.Get_rank()} - Local Backendens : {pp.pformat(self._l_assigned_sensors)}"
)
self.queue = Queue()
log.info(
f"Rank {self._comm.Get_rank()}: {self.queue}, {self._backends}, {self._l_assigned_sensors}, {self._config}, {self.sp_ready_event}, {self.start_event}, {self.stop_event}, {self._config.getfloat('monitor', 'sampling_period')}"
)
self.perunSP = Process(
target=perunSubprocess,
args=[
Expand All @@ -200,19 +206,50 @@ def _run_python_app(
self._config.getfloat("monitor", "sampling_period"),
],
)
log.info(f"Rank {self._comm.Get_rank()}: Starting monitoring subprocess")
self.perunSP.start()
log.debug(f"Rank {self._comm.Get_rank()}: Alive: {self.perunSP.is_alive()}")
log.debug(f"Rank {self._comm.Get_rank()}: SP PID: {self.perunSP.pid}")
log.debug(
f"Rank {self._comm.Get_rank()}: SP Exit Code: {self.perunSP.exitcode}"
)
log.info(f"Rank {self._comm.Get_rank()}: Monitoring subprocess started")
else:
self.sp_ready_event.set() # type: ignore

event_set = self.sp_ready_event.wait(30) # type: ignore
if self.perunSP and not event_set:
log.error(
f"Rank {self._comm.Get_rank()}: Children: {multiprocessing.active_children()}"
)
log.error(
f"Rank {self._comm.Get_rank()}: Monitoring subprocess did not start in time"
)
log.error(f"Rank {self._comm.Get_rank()}: Alive: {self.perunSP.is_alive()}")
log.error(f"Rank {self._comm.Get_rank()}: SP PID: {self.perunSP.pid}")
log.error(f"Rank {self._comm.Get_rank()}: SP Exit Code: {self.perunSP}")
self.status = MonitorStatus.SP_ERROR
self._close_subprocess()

log.info(f"Rank {self._comm.Get_rank()}: Waiting for everyones status")
self.all_status = self._comm.allgather(self.status)
if MonitorStatus.SP_ERROR in self.all_status:
log.error(f"Rank {self._comm.Get_rank()}: Stopping run")
log.error(
f"Rank {self._comm.Get_rank()}: Children: {multiprocessing.active_children()}"
)

self.status = MonitorStatus.SP_ERROR
self._reset_subprocess_handlers()

return self.status, None, None

# 3) Start application
self.sp_ready_event.wait() # type: ignore
log.info(f"Rank {self._comm.Get_rank()}: Starting App")
self.local_regions = LocalRegions()
self._comm.barrier()

log.info(f"Rank {self._comm.Get_rank()}: Started App")
self.status = MonitorStatus.RUNNING
self.start_event.set() # type: ignore
starttime_ns = time.time_ns()
self.status = MonitorStatus.RUNNING
try:
app_result = self._app.run()
except SystemExit:
Expand Down Expand Up @@ -336,16 +373,13 @@ def _process_single_run(
If the rank spawned a subprocess, returns the data node with the data.
"""
if self.queue and self.perunSP:
log.info(f"Rank {self._comm.Get_rank()}: Getting queue contents")
log.info(f"Rank {self._comm.Get_rank()}: Collecting queue data.")
nodeData = self.queue.get(block=True)
log.info(f"Rank {self._comm.Get_rank()}: Got queue contents")
log.info(f"Rank {self._comm.Get_rank()}: Waiting for subprocess to close")
self.perunSP.join()
self.perunSP.close()
log.info(f"Rank {self._comm.Get_rank()}: Subprocess closed")
self.queue.close()
log.info(f"Rank {self._comm.Get_rank()}: Closing subprocess.")
self._close_subprocess()
else:
nodeData = None
self._reset_subprocess_handlers()

log.info(f"Rank {self._comm.Get_rank()}: Gathering data.")

Expand Down Expand Up @@ -379,3 +413,24 @@ def _process_single_run(

return runNode
return None

def _close_subprocess(self) -> None:
    """Shut down the monitoring subprocess and its result queue.

    Gives the subprocess up to 30 seconds to exit on its own; if it is
    still alive after that, terminates it and waits for it to die. Any
    non-zero (or signal-induced, negative) exit code is logged as a
    warning. The process handle and queue are released afterwards, and
    the subprocess bookkeeping is reset regardless of whether a
    subprocess existed on this rank.
    """
    if self.perunSP and self.queue:
        # Grace period: let the subprocess flush its data and exit cleanly.
        self.perunSP.join(30)
        if self.perunSP.exitcode is None:
            # Still running after the timeout — force it down.
            log.warning(
                f"Rank {self._comm.Get_rank()}: Monitoring subprocess did not close in time, terminating."
            )
            self.perunSP.terminate()
            self.perunSP.join()
        # Truthy exitcode covers both error codes (> 0) and signal
        # terminations (< 0); 0 means a clean exit.
        if self.perunSP.exitcode:
            log.warning(
                f"Rank {self._comm.Get_rank()}: Monitoring subprocess exited with code {self.perunSP.exitcode}"
            )

        # Release the OS resources held by the Process handle; safe now
        # that the process has been joined.
        self.perunSP.close()
        self.queue.close()
        self.queue = None
        log.info(f"Rank {self._comm.Get_rank()}: Monitoring subprocess closed")

    self._reset_subprocess_handlers()
1 change: 1 addition & 0 deletions perun/monitoring/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def perunSubprocess(
sampling_period : float
Sampling period in seconds
"""
log.debug(f"Rank {rank}: Subprocess: Entered perunSubprocess")
(
timesteps,
t_metadata,
Expand Down

0 comments on commit d12a177

Please sign in to comment.