Skip to content

Commit

Permalink
enh: request single threaded operations when going into mp.Process (close #17)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Oct 8, 2024
1 parent bc03356 commit b41ed45
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 33 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
0.25.3
- enh: request single threaded operations when going into mp.Process (#17)
- enh: new module `dcnum.single_thread_osenv`
0.25.2
- setup: officially support numpy 2
- setup: pin dependencies
Expand Down
10 changes: 10 additions & 0 deletions src/dcnum/feat/feat_background/bg_roll_median.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import numpy as np
from scipy import ndimage

from ...os_env_st import RequestSingleThreaded, confirm_single_threaded

from .base import mp_spawn, Background


Expand Down Expand Up @@ -241,6 +243,8 @@ def __init__(self, job_queue, counter, shared_input, shared_output,

def run(self):
"""Main loop of worker process (breaks when `self.counter` <0)"""
# confirm single-threadedness (prints to log)
confirm_single_threaded()
# Create the ctypes arrays here instead of during __init__, because
# for some reason they are copied in __init__ and not mapped.
shared_input = np.ctypeslib.as_array(
Expand All @@ -261,6 +265,12 @@ def run(self):
with self.counter.get_lock():
self.counter.value += 1

def start(self):
    """Start the worker process with a single-threaded environment

    Overrides `multiprocessing.Process.start`. When the process
    spawns, the environment of the current process is copied, so
    setting the threading-related environment variables for the
    duration of the spawn makes libraries in the child default to
    single-threaded computation (see `RequestSingleThreaded`).
    """
    # Set all relevant os environment variables such that libraries in
    # the new process only use single-threaded computation.
    with RequestSingleThreaded():
        mp_spawn.Process.start(self)


def compute_median_for_slice(shared_input, shared_output, kernel_size,
output_size, job_slice):
Expand Down
10 changes: 10 additions & 0 deletions src/dcnum/feat/feat_background/bg_sparse_median.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from ...read import HDF5Data

from ...os_env_st import RequestSingleThreaded, confirm_single_threaded

from .base import mp_spawn, Background


Expand Down Expand Up @@ -436,6 +438,8 @@ def __init__(self, job_queue, counter, shared_input, shared_output,

def run(self):
"""Main loop of worker process (breaks when `self.counter` <0)"""
# confirm single-threadedness (prints to log)
confirm_single_threaded()
# Create the ctypes arrays here instead of during __init__, because
# for some reason they are copied in __init__ and not mapped.
shared_input = np.ctypeslib.as_array(
Expand Down Expand Up @@ -468,3 +472,9 @@ def run(self):
# overwrite_input=False)
with self.counter.get_lock():
self.counter.value += 1

def start(self):
    """Start the worker process with a single-threaded environment

    Overrides `multiprocessing.Process.start`. When the process
    spawns, the environment of the current process is copied, so
    setting the threading-related environment variables for the
    duration of the spawn makes libraries in the child default to
    single-threaded computation (see `RequestSingleThreaded`).
    """
    # Set all relevant os environment variables such that libraries in
    # the new process only use single-threaded computation.
    with RequestSingleThreaded():
        mp_spawn.Process.start(self)
8 changes: 8 additions & 0 deletions src/dcnum/feat/queue_event_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import numpy as np

from ..os_env_st import RequestSingleThreaded, confirm_single_threaded
from ..meta.ppid import kwargs_to_ppid, ppid_to_kwargs
from ..read import HDF5Data

Expand Down Expand Up @@ -325,6 +326,7 @@ def process_label(self, label, index):

def run(self):
"""Main loop of worker process"""
confirm_single_threaded()
self.worker_monitor[self.worker_index] = 0
# Don't wait for these two queues when joining workers
self.raw_queue.cancel_join_thread()
def __init__(self, *args, **kwargs):
    """Process-based event-extraction worker

    Counterpart of `EventExtractorThread`; forwards all arguments to
    the parent classes and fixes the process name for easier
    identification in logs and process listings.
    """
    # Zero-argument super() (Python 3 idiom) resolves the MRO
    # identically to the explicit two-argument form it replaces.
    super().__init__(name="EventExtractorProcess", *args, **kwargs)

def start(self):
    """Start the worker process with a single-threaded environment

    Overrides `multiprocessing.Process.start`. When the process
    spawns, the environment of the current process is copied, so
    setting the threading-related environment variables for the
    duration of the spawn makes libraries in the child default to
    single-threaded computation (see `RequestSingleThreaded`).
    """
    # Set all relevant os environment variables such that libraries in
    # the new process only use single-threaded computation.
    with RequestSingleThreaded():
        mp_spawn.Process.start(self)


class EventExtractorThread(QueueEventExtractor, threading.Thread):
"""Threading worker for debugging (only one single thread)"""
Expand Down
27 changes: 0 additions & 27 deletions src/dcnum/logic/ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,6 @@ def __init__(self,

self.logger = logging.getLogger(f"dcnum.Runner-{self.pphash[:2]}")

# Sanity checks
for os_env in ["MKL_NUM_THREADS", "NUMBA_NUM_THREADS",
"NUMEXPR_NUM_THREADS", "NUMPY_NUM_THREADS",
"OPENBLAS_NUM_THREADS", "OMP_NUM_THREADS",
"VECLIB_MAXIMUM_THREADS"]:
# You should disable multithreading for all major tools that
# use dcnum.logic. We don't want multithreading, because dcnum
# uses linear code and relies on multiprocessing for
# parallelization. This has to be done before importing numpy
# or any other library affected. In your scripts, you can use:
#
# os.environ.setdefault("MKL_NUM_THREADS", "1")
# os.environ.setdefault("NUMBA_NUM_THREADS", "1")
# os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
# os.environ.setdefault("NUMPY_NUM_THREADS", "1")
# os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
# os.environ.setdefault("OMP_NUM_THREADS", "1")
# os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1")
#
val_act = os.environ.get(os_env)
if val_act != "1":
self.logger.warning(
f"Make sure to set the environment variable {os_env} to "
f"'1' (disables multithreading)! Other values will reduce "
f"performance and your system may become inresponsive. "
f"The current value is '{val_act}'.")

def __enter__(self):
    """Context-manager entry; returns the instance itself (no setup)."""
    return self

Expand Down
85 changes: 85 additions & 0 deletions src/dcnum/os_env_st.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
import os


logger = logging.getLogger(__name__)

#: environment variables that set number of threads
os_env_threading = [
    "MKL_NUM_THREADS",
    "NUMBA_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
    "NUMPY_NUM_THREADS",
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
]


class RequestSingleThreaded:
    """Context manager for starting a process with a specific environment

    When entering the context, the environment variables defined in
    :const:`os_env_threading` are all set to "1", telling the relevant
    libraries that they should work in single-threaded mode.

    When exiting the context, these environment variables are reset to
    their original values (or unset if applicable).

    Note that it only makes sense to use this context manager when
    starting new multiprocessing processes. When the process spawns,
    the environment from the current thread is copied. Setting the
    environment variables after e.g. importing numpy has no effect
    on how many threads numpy will use.
    """
    def __init__(self):
        # original values of the modified variables, restored on exit
        self.previous_env = {}

    def __enter__(self):
        """Ask nicely for single-threaded computation using `os.environ`

        Note that this only affects new processes in which the
        relevant libraries have not yet been imported.
        """
        for key in os_env_threading:
            if key in os.environ:
                self.previous_env[key] = os.environ[key]
            os.environ[key] = "1"
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Restore the previous environment"""
        for key in os_env_threading:
            if key in self.previous_env:
                os.environ[key] = self.previous_env[key]
            else:
                # The variable was not set before entering the context;
                # remove it again. Use a default so that a variable
                # deleted *inside* the context does not raise KeyError.
                os.environ.pop(key, None)


def confirm_single_threaded():
    """Warn via logs when environment variables are not set to single thread

    Checks every variable in :const:`os_env_threading` and emits one
    warning per variable whose value is not "1".
    """
    # Sanity checks. Multithreading should be disabled, because dcnum
    # uses linear code and relies on multiprocessing for
    # parallelization. The variables must be set before importing
    # numpy or any other library affected; this can be done by
    # calling `request_single_threaded` early in a script.
    for os_env in os_env_threading:
        val_act = os.environ.get(os_env)
        if val_act != "1":
            # lazy %-formatting: only rendered if the warning is emitted
            logger.warning(
                "Make sure to set the environment variable %s to "
                "'1' (disables multithreading)! Other values will reduce "
                "performance and your system may become unresponsive. "
                "The current value is '%s'.", os_env, val_act)


def request_single_threaded():
    """Set the environment variables to single thread

    This function must be called before importing the multithreaded
    libraries (such as numpy) in order for them to pick up the
    environment variables.
    """
    os.environ.update((key, "1") for key in os_env_threading)
11 changes: 10 additions & 1 deletion src/dcnum/segm/segmenter_mpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import numpy as np
import scipy.ndimage as ndi

from ..os_env_st import RequestSingleThreaded, confirm_single_threaded

from .segmenter import Segmenter


Expand Down Expand Up @@ -325,7 +327,8 @@ def __init__(self,
self.sl_stop = sl_stop

def run(self):
# print(f"Running {self} in PID {os.getpid()}")
# confirm single-threadedness (prints to log)
confirm_single_threaded()
# We have to create the numpy-versions of the mp.RawArrays here,
# otherwise we only get some kind of copy in the new process
# when we use "spawn" instead of "fork".
Expand Down Expand Up @@ -369,6 +372,12 @@ class MPOSegmenterWorkerProcess(MPOSegmenterWorker, mp_spawn.Process):
def __init__(self, *args, **kwargs):
    """Process-based segmentation worker; forwards all arguments."""
    # Zero-argument super() (Python 3 idiom) resolves the MRO of
    # MPOSegmenterWorker and mp_spawn.Process identically to the
    # explicit two-argument form it replaces.
    super().__init__(*args, **kwargs)

def start(self):
    """Start the worker process with a single-threaded environment

    Overrides `multiprocessing.Process.start`. When the process
    spawns, the environment of the current process is copied, so
    setting the threading-related environment variables for the
    duration of the spawn makes libraries in the child default to
    single-threaded computation (see `RequestSingleThreaded`).
    """
    # Set all relevant os environment variables such that libraries in
    # the new process only use single-threaded computation.
    with RequestSingleThreaded():
        mp_spawn.Process.start(self)


class MPOSegmenterWorkerThread(MPOSegmenterWorker, threading.Thread):
def __init__(self, *args, **kwargs):
Expand Down
7 changes: 2 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@
import tempfile
import time

from dcnum.os_env_st import request_single_threaded

# Make all libraries use only one single thread.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
os.environ.setdefault("NUMBA_NUM_THREADS", "1")

request_single_threaded()

TMPDIR = tempfile.mkdtemp(prefix=time.strftime(
"dcnum_test_%H.%M_"))
Expand Down

0 comments on commit b41ed45

Please sign in to comment.