Skip to content

Commit

Permalink
enh: introduce DCNumJobRunner.error_tb for errors happening in threads
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Dec 16, 2023
1 parent 8f5e6ef commit 915280c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- ref: rename event_count with image_count in background computation
- ref: do not print anything to stdout when computing background data
- ref: use data from background computer in DCNumJobRunner.get_status
- enh: introduce DCNumJobRunner.error_tb for errors happening in threads
- enh: improve logging verbosity
- enh: append job information as log entry in DCNumJobRunner output file
- enh: set chunk size for all feature data to 1MiB in HDF5Writer
Expand Down
17 changes: 15 additions & 2 deletions src/dcnum/logic/ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import socket
import threading
import time
import traceback
import uuid

import hdf5plugin
Expand Down Expand Up @@ -52,6 +53,7 @@ def __init__(self,
(defaults to hostname)
"""
super(DCNumJobRunner, self).__init__(*args, **kwargs)
self.error_tb = None
self.job = job
if tmp_suffix is None:
tmp_suffix = f"{socket.gethostname()}_{str(uuid.uuid4())[:5]}"
Expand Down Expand Up @@ -201,10 +203,10 @@ def close(self, delete_temporary_files=True):
# We don't have to delete self.path_temp_out, since this one
# is `rename`d to `self.job["path_out"]`.

def join(self, *args, **kwargs):
def join(self, delete_temporary_files=True, *args, **kwargs):
super(DCNumJobRunner, self).join(*args, **kwargs)
# Close only after join
self.close()
self.close(delete_temporary_files=delete_temporary_files)

def get_status(self):
bgpart = .1 # fraction of background
Expand All @@ -226,6 +228,17 @@ def get_status(self):
}

def run(self):
    """Execute the pipeline and record any failure on the instance

    This calls `run_pipeline`. If that raises, the formatted traceback
    is stored in `self.error_tb` and `self._state` is set to "error".

    Raises
    ------
    BaseException
        Whatever `run_pipeline` raised, but only if `is_alive()` returns
        False (i.e. `run` was called directly instead of via `start`).
        Otherwise the exception is swallowed and must be retrieved
        from `self.error_tb`.
    """
    try:
        self.run_pipeline()
    except BaseException:
        # Store the traceback *before* flagging the error state, so that
        # anyone who observes the "error" state from another thread can
        # rely on `error_tb` already being populated.
        self.error_tb = traceback.format_exc()
        self._state = "error"
        if not self.is_alive():
            # Thread has not been started. This means we are not running
            # in a thread but in the main process. Raise the exception.
            raise

def run_pipeline(self):
"""Execute the pipeline job"""
time_start = time.monotonic()
time_string = time.strftime("%Y-%m-%d-%H.%M.%S", time.gmtime())
Expand Down
17 changes: 17 additions & 0 deletions tests/test_logic_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,23 @@ def test_error_file_exists():
runner.run()


def test_error_file_exists_in_thread():
    """An existing output file must show up in `error_tb` for a threaded run"""
    source = retrieve_data("fmt-hdf5_cytoshot_full-features_2023.zip")
    path_in = source.with_name("input.rtdc")
    # Build the input file by concatenating the source data five times.
    with read.concatenated_hdf5_data(5 * [source], path_out=path_in):
        pass

    # Create the output file up front so that the pipeline stumbles over it.
    path_out = path_in.with_name("test_out.rtdc")
    path_out.touch()

    job = logic.DCNumPipelineJob(path_in=path_in,
                                 path_out=path_out,
                                 debug=True)
    runner = logic.DCNumJobRunner(job=job)
    # Run via `start` (not `run`), then wait for the runner to finish.
    runner.start()
    runner.join()

    # The error is not raised here; it is only available as a traceback.
    assert runner.error_tb is not None
    assert "FileExistsError" in runner.error_tb


def test_error_pipeline_log_file_remains():
path_orig = retrieve_data("fmt-hdf5_cytoshot_full-features_2023.zip")
path = path_orig.with_name("input.rtdc")
Expand Down

0 comments on commit 915280c

Please sign in to comment.