From 3c730d1931ede9b4ec575a862868d739dc7ee2f7 Mon Sep 17 00:00:00 2001
From: Lester Hedges
Date: Thu, 2 Nov 2023 16:55:44 +0000
Subject: [PATCH] Fix error logging and handling within the parallel region.

---
 src/somd2/runner/_dynamics.py |  2 --
 src/somd2/runner/_runner.py   | 38 ++++++++++++++++++++++++++--------------
 2 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/src/somd2/runner/_dynamics.py b/src/somd2/runner/_dynamics.py
index 55babcc..e899927 100644
--- a/src/somd2/runner/_dynamics.py
+++ b/src/somd2/runner/_dynamics.py
@@ -274,7 +274,6 @@ def generate_lam_vals(lambda_base, increment):
                 df = self._system.energy_trajectory(to_alchemlyb=True)
                 if _ == 0:
                     # Not including speed in checkpoints for now.
-
                     f = _dataframe_to_parquet(
                         df,
                         metadata={
@@ -287,7 +286,6 @@ def generate_lam_vals(lambda_base, increment):
                         filepath=self._config.output_directory,
                         filename=f"energy_traj_{self._lambda_array.index(self._lambda_val)}.parquet",
                     )
-                    print(f)
                 else:
                     _parquet_append(
                         f,
diff --git a/src/somd2/runner/_runner.py b/src/somd2/runner/_runner.py
index b7205ce..3c91b61 100644
--- a/src/somd2/runner/_runner.py
+++ b/src/somd2/runner/_runner.py
@@ -365,10 +365,11 @@ def run(self):
         Returns
         --------
 
-        results : [str]
-            List of simulation results.
+        results : [bool]
+            List of simulation results. (Currently whether the simulation finished
+            successfully or not.)
         """
-        results = []
+        results = self._manager.list()
         if self._config.run_parallel and (self._config.num_lambda is not None):
             # Create shared resources.
             self._create_shared_resources()
@@ -403,18 +404,20 @@ def run(self):
             import concurrent.futures as _futures
 
             with _futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
+                jobs = {}
                 for lambda_value in self._lambda_values:
-                    kwargs = {"lambda_value": lambda_value}
-                    jobs = {executor.submit(self.run_window, **kwargs): lambda_value}
+                    jobs[executor.submit(self.run_window, lambda_value)] = lambda_value
                 try:
                     for job in _futures.as_completed(jobs):
-                        lam = jobs[job]
+                        lambda_value = jobs[job]
                         try:
                             result = job.result()
                         except Exception as e:
-                            _logger.warning(f"Lambda = {lam} failed with {e}")
-                            pass
-                        else:
+                            result = False
+                            _logger.error(
+                                f"Exception raised for lambda = {lambda_value}: {e}"
+                            )
+                        with self._lock:
                             results.append(result)
                 # Kill all current and future jobs if keyboard interrupt.
                 except KeyboardInterrupt:
@@ -430,14 +433,17 @@ def run(self):
                     for i in range(0, self._config.num_lambda)
                 ]
                 for lambda_value in self._lambda_values:
-                    result = self.run_window(lambda_value)
+                    try:
+                        result = self.run_window(lambda_value)
+                    except Exception:
+                        result = False
                     results.append(result)
 
         else:
             raise ValueError(
                 "Vanilla MD not currently supported. Please set num_lambda > 1."
             )
-        # Results as boolean list - window finished
+
         return results
 
     def run_window(self, lambda_value):
@@ -459,7 +465,6 @@ def _run(sim):
         result: str
             The result of the simulation.
""" - _logger.info(f"Running lambda = {lambda_value}") def _run(sim): # This function is complex due to the mixture of options for minimisation and dynamics @@ -506,10 +511,13 @@ def _run(sim): gpu_num = self._gpu_pool[0] self._remove_gpu_from_pool(gpu_num) if lambda_value is not None: - print(f"Running lambda = {lambda_value} on GPU {gpu_num}") + _logger.info( + f"Running lambda = {lambda_value} on GPU {gpu_num}" + ) # Assumes that device for non-parallel GPU jobs is 0 else: gpu_num = 0 + _logger.info("Running lambda = {lambda_value} on GPU 0") self._initialise_simulation(system, lambda_value, device=gpu_num) try: df, lambda_grad, speed = _run(self._sim) @@ -526,6 +534,8 @@ def _run(sim): # All other platforms. else: + _logger.info(f"Running lambda = {lambda_value}") + self._initialise_simulation(system, lambda_value) try: df, lambda_grad, speed = _run(self._sim) @@ -548,4 +558,4 @@ def _run(sim): ) del system _logger.success("Lambda = {} complete".format(lambda_value)) - return f"Lambda = {lambda_value} complete" + return True