Fix error logging and handling within the parallel region.
lohedges committed Nov 2, 2023
1 parent 1852d09 commit 3c730d1
Showing 2 changed files with 24 additions and 16 deletions.
src/somd2/runner/_dynamics.py (2 changes: 0 additions & 2 deletions)
@@ -274,7 +274,6 @@ def generate_lam_vals(lambda_base, increment):
             df = self._system.energy_trajectory(to_alchemlyb=True)
             if _ == 0:
                 # Not including speed in checkpoints for now.
-
                 f = _dataframe_to_parquet(
                     df,
                     metadata={
@@ -287,7 +286,6 @@ def generate_lam_vals(lambda_base, increment):
                     filepath=self._config.output_directory,
                     filename=f"energy_traj_{self._lambda_array.index(self._lambda_val)}.parquet",
                 )
-                print(f)
             else:
                 _parquet_append(
                     f,
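The hunks above drop a stray blank line and a leftover debug `print(f)` from the checkpointing code, which writes the energy trajectory to parquet on the first iteration and appends to it afterwards. somd2's `_dataframe_to_parquet` and `_parquet_append` helpers are not shown in this diff, so the sketch below reproduces only the general first-write-then-append pattern with plain pyarrow; every name in it is an assumption.

    # Minimal sketch of a write-once-then-append checkpoint helper using
    # pyarrow. 'writer' plays the role of the handle 'f' in the diff above.
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    def checkpoint(df: pd.DataFrame, path: str, writer=None):
        """Create the parquet file on the first call; append after."""
        table = pa.Table.from_pandas(df)
        if writer is None:
            # First checkpoint: open the file with the frame's schema.
            writer = pq.ParquetWriter(path, table.schema)
        # Later checkpoints land as extra row groups in the same file.
        writer.write_table(table)
        return writer

Each call after the first adds a row group to the same file, which is the usual way to "append" to parquet without rewriting it; all frames must share one schema, and the writer should be closed when the trajectory is complete.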
src/somd2/runner/_runner.py (38 changes: 24 additions & 14 deletions)
@@ -365,10 +365,11 @@ def run(self):
         Returns
         --------
-        results : [str]
-            List of simulation results.
+        results : [bool]
+            List of simulation results. (Currently whether the simulation finished
+            successfully or not.)
         """
-        results = []
+        results = self._manager.list()
         if self._config.run_parallel and (self._config.num_lambda is not None):
             # Create shared resources.
             self._create_shared_resources()
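`run` now collects results in a `multiprocessing.Manager` list rather than a plain Python list, presumably so the results can be shared safely across the processes the runner spawns. A manager list is a proxy backed by a server process, so appends made from another process are visible everywhere; appends to an ordinary list inside a worker only mutate that worker's copy. A self-contained illustration of the difference (nothing here is somd2 API):

    # A manager list survives appends from a child process; a plain list
    # passed to the child is copied, so the parent never sees the append.
    from multiprocessing import Manager, Process

    def worker(shared, plain):
        shared.append(True)   # visible to the parent via the proxy
        plain.append(True)    # mutates the child's copy only

    if __name__ == "__main__":
        with Manager() as manager:
            shared = manager.list()
            plain = []
            p = Process(target=worker, args=(shared, plain))
            p.start()
            p.join()
            print(list(shared))  # [True]
            print(plain)         # []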
@@ -403,18 +404,20 @@ def run(self):
             import concurrent.futures as _futures
 
             with _futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
+                jobs = {}
                 for lambda_value in self._lambda_values:
-                    kwargs = {"lambda_value": lambda_value}
-                    jobs = {executor.submit(self.run_window, **kwargs): lambda_value}
+                    jobs[executor.submit(self.run_window, lambda_value)] = lambda_value
                 try:
                     for job in _futures.as_completed(jobs):
-                        lam = jobs[job]
+                        lambda_value = jobs[job]
                         try:
                             result = job.result()
                         except Exception as e:
-                            _logger.warning(f"Lambda = {lam} failed with {e}")
-                            pass
-                        else:
+                            result = False
+                            _logger.error(
+                                f"Exception raised for lambda = {lambda_value}: {e}"
+                            )
+                        with self._lock:
                             results.append(result)
                 # Kill all current and future jobs if keyboard interrupt.
                 except KeyboardInterrupt:
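This hunk is the heart of the fix. The old loop rebuilt the `jobs` dict on every iteration, so only the last submitted future was ever tracked by `as_completed`, and a failed window was merely warned about and never recorded. The new code accumulates every future, logs a proper error when one raises, records `False` so the caller can see that a window failed, and guards the shared results list with the lock. A self-contained sketch of the same pattern, with `work` and `values` as hypothetical stand-ins for `run_window` and the lambda schedule:

    # Submit all jobs up front, then collect them as they complete,
    # recording False (instead of silently skipping) on failure.
    import concurrent.futures as futures

    def work(value):
        if value == 0.5:
            raise RuntimeError("simulated failure")
        return True

    if __name__ == "__main__":
        values = [0.0, 0.5, 1.0]
        results = []
        with futures.ProcessPoolExecutor(max_workers=2) as executor:
            jobs = {executor.submit(work, v): v for v in values}
            for job in futures.as_completed(jobs):
                value = jobs[job]
                try:
                    result = job.result()
                except Exception as e:
                    result = False
                    print(f"Exception raised for lambda = {value}: {e}")
                results.append(result)
        print(results)  # e.g. [True, False, True], in completion order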
@@ -430,14 +433,17 @@
                     for i in range(0, self._config.num_lambda)
                 ]
                 for lambda_value in self._lambda_values:
-                    result = self.run_window(lambda_value)
+                    try:
+                        result = self.run_window(lambda_value)
+                    except:
+                        result = False
                     results.append(result)
 
         else:
             raise ValueError(
                 "Vanilla MD not currently supported. Please set num_lambda > 1."
             )
+        # Results as boolean list - window finished
 
         return results

    def run_window(self, lambda_value):
@@ -459,7 +465,6 @@ def run_window(self, lambda_value):
         result: str
             The result of the simulation.
         """
-        _logger.info(f"Running lambda = {lambda_value}")
 
         def _run(sim):
             # This function is complex due to the mixture of options for minimisation and dynamics
@@ -506,10 +511,13 @@ def _run(sim):
                     gpu_num = self._gpu_pool[0]
                     self._remove_gpu_from_pool(gpu_num)
                     if lambda_value is not None:
-                        print(f"Running lambda = {lambda_value} on GPU {gpu_num}")
+                        _logger.info(
+                            f"Running lambda = {lambda_value} on GPU {gpu_num}"
+                        )
                 # Assumes that device for non-parallel GPU jobs is 0
                 else:
                     gpu_num = 0
+                    _logger.info("Running lambda = {lambda_value} on GPU 0")
                 self._initialise_simulation(system, lambda_value, device=gpu_num)
                 try:
                     df, lambda_grad, speed = _run(self._sim)
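The GPU branch above replaces a bare `print` with proper logging when a window claims a device from the shared pool. The pool itself (`self._gpu_pool`, `self._remove_gpu_from_pool`) is defined elsewhere in the class and not shown in this diff; the sketch below is only a guess at the general shape of such lock-guarded bookkeeping, with every name assumed:

    # Hypothetical lock-guarded GPU pool of the general shape used above.
    from multiprocessing import Manager

    class GPUPool:
        def __init__(self, manager, num_gpus):
            self._pool = manager.list(range(num_gpus))
            self._lock = manager.Lock()

        def acquire(self):
            # Pop the first free device (cf. _remove_gpu_from_pool).
            with self._lock:
                gpu_num = self._pool[0]
                self._pool.remove(gpu_num)
            return gpu_num

        def release(self, gpu_num):
            # Return the device to the pool once the window finishes.
            with self._lock:
                self._pool.append(gpu_num)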
@@ -526,6 +534,8 @@ def _run(sim):
 
             # All other platforms.
             else:
+                _logger.info(f"Running lambda = {lambda_value}")
+
                 self._initialise_simulation(system, lambda_value)
                 try:
                     df, lambda_grad, speed = _run(self._sim)
@@ -548,4 +558,4 @@ def _run(sim):
             )
             del system
             _logger.success("Lambda = {} complete".format(lambda_value))
-            return f"Lambda = {lambda_value} complete"
+            return True
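With this change `run_window` reports success as a plain boolean instead of a formatted string, matching the updated `run` docstring. Since the parallel path appends results in completion order rather than lambda order, the list tells the caller how many windows failed rather than which; a hedged usage sketch (constructing `runner` is outside this diff):

    # Hypothetical caller: count failed lambda windows from run().
    results = runner.run()
    n_failed = sum(1 for ok in results if not ok)
    if n_failed:
        print(f"{n_failed} lambda window(s) failed; see the log for details")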
