From b41ed45df34d799e469be8cb06abcf3a5e03cf00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20M=C3=BCller?= Date: Wed, 9 Oct 2024 00:29:20 +0200 Subject: [PATCH] enh: request single threaded operations when going into mp.Process (close #17) --- CHANGELOG | 3 + .../feat/feat_background/bg_roll_median.py | 10 +++ .../feat/feat_background/bg_sparse_median.py | 10 +++ src/dcnum/feat/queue_event_extractor.py | 8 ++ src/dcnum/logic/ctrl.py | 27 ------ src/dcnum/os_env_st.py | 85 +++++++++++++++++++ src/dcnum/segm/segmenter_mpo.py | 11 ++- tests/conftest.py | 7 +- 8 files changed, 128 insertions(+), 33 deletions(-) create mode 100644 src/dcnum/os_env_st.py diff --git a/CHANGELOG b/CHANGELOG index f1a9bcb..3648e66 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +0.25.3 + - enh: request single threaded operations when going into mp.Process (#17) + - enh: new module `dcnum.single_thread_osenv` 0.25.2 - setup: officially support numpy 2 - setup: pin dependencies diff --git a/src/dcnum/feat/feat_background/bg_roll_median.py b/src/dcnum/feat/feat_background/bg_roll_median.py index 510af5c..598f655 100644 --- a/src/dcnum/feat/feat_background/bg_roll_median.py +++ b/src/dcnum/feat/feat_background/bg_roll_median.py @@ -4,6 +4,8 @@ import numpy as np from scipy import ndimage +from ...os_env_st import RequestSingleThreaded, confirm_single_threaded + from .base import mp_spawn, Background @@ -241,6 +243,8 @@ def __init__(self, job_queue, counter, shared_input, shared_output, def run(self): """Main loop of worker process (breaks when `self.counter` <0)""" + # confirm single-threadedness (prints to log) + confirm_single_threaded() # Create the ctypes arrays here instead of during __init__, because # for some reason they are copied in __init__ and not mapped. shared_input = np.ctypeslib.as_array( @@ -261,6 +265,12 @@ def run(self): with self.counter.get_lock(): self.counter.value += 1 + def start(self): + # Set all relevant os environment variables such libraries in the + # new process only use single-threaded computation. + with RequestSingleThreaded(): + mp_spawn.Process.start(self) + def compute_median_for_slice(shared_input, shared_output, kernel_size, output_size, job_slice): diff --git a/src/dcnum/feat/feat_background/bg_sparse_median.py b/src/dcnum/feat/feat_background/bg_sparse_median.py index a72b4ab..4624ae4 100644 --- a/src/dcnum/feat/feat_background/bg_sparse_median.py +++ b/src/dcnum/feat/feat_background/bg_sparse_median.py @@ -6,6 +6,8 @@ from ...read import HDF5Data +from ...os_env_st import RequestSingleThreaded, confirm_single_threaded + from .base import mp_spawn, Background @@ -436,6 +438,8 @@ def __init__(self, job_queue, counter, shared_input, shared_output, def run(self): """Main loop of worker process (breaks when `self.counter` <0)""" + # confirm single-threadedness (prints to log) + confirm_single_threaded() # Create the ctypes arrays here instead of during __init__, because # for some reason they are copied in __init__ and not mapped. shared_input = np.ctypeslib.as_array( @@ -468,3 +472,9 @@ def run(self): # overwrite_input=False) with self.counter.get_lock(): self.counter.value += 1 + + def start(self): + # Set all relevant os environment variables such libraries in the + # new process only use single-threaded computation. + with RequestSingleThreaded(): + mp_spawn.Process.start(self) diff --git a/src/dcnum/feat/queue_event_extractor.py b/src/dcnum/feat/queue_event_extractor.py index 7a860ab..28cfb01 100644 --- a/src/dcnum/feat/queue_event_extractor.py +++ b/src/dcnum/feat/queue_event_extractor.py @@ -10,6 +10,7 @@ import numpy as np +from ..os_env_st import RequestSingleThreaded, confirm_single_threaded from ..meta.ppid import kwargs_to_ppid, ppid_to_kwargs from ..read import HDF5Data @@ -325,6 +326,7 @@ def process_label(self, label, index): def run(self): """Main loop of worker process""" + confirm_single_threaded() self.worker_monitor[self.worker_index] = 0 # Don't wait for these two queues when joining workers self.raw_queue.cancel_join_thread() @@ -399,6 +401,12 @@ def __init__(self, *args, **kwargs): super(EventExtractorProcess, self).__init__( name="EventExtractorProcess", *args, **kwargs) + def start(self): + # Set all relevant os environment variables such libraries in the + # new process only use single-threaded computation. + with RequestSingleThreaded(): + mp_spawn.Process.start(self) + class EventExtractorThread(QueueEventExtractor, threading.Thread): """Threading worker for debugging (only one single thread)""" diff --git a/src/dcnum/logic/ctrl.py b/src/dcnum/logic/ctrl.py index 46a4c32..4a12121 100644 --- a/src/dcnum/logic/ctrl.py +++ b/src/dcnum/logic/ctrl.py @@ -128,33 +128,6 @@ def __init__(self, self.logger = logging.getLogger(f"dcnum.Runner-{self.pphash[:2]}") - # Sanity checks - for os_env in ["MKL_NUM_THREADS", "NUMBA_NUM_THREADS", - "NUMEXPR_NUM_THREADS", "NUMPY_NUM_THREADS", - "OPENBLAS_NUM_THREADS", "OMP_NUM_THREADS", - "VECLIB_MAXIMUM_THREADS"]: - # You should disable multithreading for all major tools that - # use dcnum.logic. We don't want multithreading, because dcnum - # uses linear code and relies on multiprocessing for - # parallelization. This has to be done before importing numpy - # or any other library affected. In your scripts, you can use: - # - # os.environ.setdefault("MKL_NUM_THREADS", "1") - # os.environ.setdefault("NUMBA_NUM_THREADS", "1") - # os.environ.setdefault("NUMEXPR_NUM_THREADS", "1") - # os.environ.setdefault("NUMPY_NUM_THREADS", "1") - # os.environ.setdefault("OPENBLAS_NUM_THREADS", "1") - # os.environ.setdefault("OMP_NUM_THREADS", "1") - # os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1") - # - val_act = os.environ.get(os_env) - if val_act != "1": - self.logger.warning( - f"Make sure to set the environment variable {os_env} to " - f"'1' (disables multithreading)! Other values will reduce " - f"performance and your system may become inresponsive. " - f"The current value is '{val_act}'.") - def __enter__(self): return self diff --git a/src/dcnum/os_env_st.py b/src/dcnum/os_env_st.py new file mode 100644 index 0000000..e694839 --- /dev/null +++ b/src/dcnum/os_env_st.py @@ -0,0 +1,85 @@ +import logging +import os + + +logger = logging.getLogger(__name__) + +#: environment variables that set number of threads +os_env_threading = [ + "MKL_NUM_THREADS", + "NUMBA_NUM_THREADS", + "NUMEXPR_NUM_THREADS", + "NUMPY_NUM_THREADS", + "OMP_NUM_THREADS", + "OPENBLAS_NUM_THREADS", + "VECLIB_MAXIMUM_THREADS", +] + + +class RequestSingleThreaded: + """Context manager for starting a process with specific environment + + When entering the context, the environment variables defined in + `os_env_threading` are all set to "1", telling the relevant libraries + that they should work in single-threaded mode. + When exiting the context, these environment variables are reset to + their original values (or unset if applicable). + + Note that it makes only sense to use this context manager when + starting new multiprocessing processes. When the process spawns, + the environment from the current thread is copied. Setting the + environment variable after e.g. importing numpy has no effect + on how many threads numpy will use. + """ + def __init__(self): + self.previous_env = {} + + def __enter__(self): + """Ask nicely for single-threaded computation using `os.environ` + + Note that this only affects new processes in which the + relevant libraries have not yet been imported. + """ + for key in os_env_threading: + if key in os.environ: + self.previous_env[key] = os.environ[key] + os.environ[key] = "1" + return self + + def __exit__(self, type, value, traceback): + """Restore the previous environment""" + for key in os_env_threading: + if key not in self.previous_env: + os.environ.pop(key) + else: + os.environ[key] = self.previous_env[key] + + +def confirm_single_threaded(): + """Warn via logs when environment variables are not set to single thread""" + # Sanity checks + for os_env in os_env_threading: + # You should disable multithreading for all major tools that + # use dcnum.logic. We don't want multithreading, because dcnum + # uses linear code and relies on multiprocessing for + # parallelization. This has to be done before importing numpy + # or any other library affected. In your scripts, you can use: + # + val_act = os.environ.get(os_env) + if val_act != "1": + logger.warning( + f"Make sure to set the environment variable {os_env} to " + f"'1' (disables multithreading)! Other values will reduce " + f"performance and your system may become unresponsive. " + f"The current value is '{val_act}'.") + + +def request_single_threaded(): + """Set the environment variable to single thread + + This function must be called before importing the multithreaded + libraries (such as numpy) in order for them to pick up the + environment variables. + """ + for key in os_env_threading: + os.environ[key] = "1" diff --git a/src/dcnum/segm/segmenter_mpo.py b/src/dcnum/segm/segmenter_mpo.py index e3938f9..4bbfb22 100644 --- a/src/dcnum/segm/segmenter_mpo.py +++ b/src/dcnum/segm/segmenter_mpo.py @@ -7,6 +7,8 @@ import numpy as np import scipy.ndimage as ndi +from ..os_env_st import RequestSingleThreaded, confirm_single_threaded + from .segmenter import Segmenter @@ -325,7 +327,8 @@ def __init__(self, self.sl_stop = sl_stop def run(self): - # print(f"Running {self} in PID {os.getpid()}") + # confirm single-threadedness (prints to log) + confirm_single_threaded() # We have to create the numpy-versions of the mp.RawArrays here, # otherwise we only get some kind of copy in the new process # when we use "spawn" instead of "fork". @@ -369,6 +372,12 @@ class MPOSegmenterWorkerProcess(MPOSegmenterWorker, mp_spawn.Process): def __init__(self, *args, **kwargs): super(MPOSegmenterWorkerProcess, self).__init__(*args, **kwargs) + def start(self): + # Set all relevant os environment variables such libraries in the + # new process only use single-threaded computation. + with RequestSingleThreaded(): + mp_spawn.Process.start(self) + class MPOSegmenterWorkerThread(MPOSegmenterWorker, threading.Thread): def __init__(self, *args, **kwargs): diff --git a/tests/conftest.py b/tests/conftest.py index a97ecfa..999ba19 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,13 +4,10 @@ import tempfile import time +from dcnum.os_env_st import request_single_threaded -# Make all libraries use only one single thread. -os.environ.setdefault("OMP_NUM_THREADS", "1") -os.environ.setdefault("MKL_NUM_THREADS", "1") -os.environ.setdefault("NUMEXPR_NUM_THREADS", "1") -os.environ.setdefault("NUMBA_NUM_THREADS", "1") +request_single_threaded() TMPDIR = tempfile.mkdtemp(prefix=time.strftime( "dcnum_test_%H.%M_"))