Skip to content

Commit

Permalink
Introduce 'block' parameter to ThrottleExecutor
Browse files Browse the repository at this point in the history
Blocks on submit rather than enqueuing. Can be used to constrain
memory usage.
  • Loading branch information
rohanpm committed Apr 27, 2022
1 parent cc83fce commit 857642c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 7 deletions.
27 changes: 24 additions & 3 deletions more_executors/_impl/throttle.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ class ThrottleExecutor(CanCustomizeBind, Executor):
- Where `count` is used to initialize this executor, if there
are already `count` futures submitted to the delegate executor and not
yet :meth:`~concurrent.futures.Future.done`, additional callables will
be queued and only submitted to the delegate executor once there are
less than `count` futures in progress.
either be queued or will block on submit, and will only be submitted
to the delegate executor once there are less than `count` futures in
progress.
.. versionadded:: 1.9.0
"""

def __init__(self, delegate, count, logger=None, name="default"):
def __init__(self, delegate, count, logger=None, name="default", block=False):
"""
Parameters:
delegate (~concurrent.futures.Executor):
Expand All @@ -84,6 +85,13 @@ def __init__(self, delegate, count, logger=None, name="default"):
.. versionadded:: 2.5.0
block (bool)
If ``True``, calls to ``submit()`` on this executor may block if
there are already ``count`` futures in progress.
Otherwise, calls to ``submit()`` will always return immediately
and callables will be queued internally.
logger (~logging.Logger):
a logger used for messages from this executor
Expand All @@ -92,10 +100,14 @@ def __init__(self, delegate, count, logger=None, name="default"):
.. versionchanged:: 2.7.0
Introduced ``name``.
.. versionchanged:: 2.11.0
Introduced ``block``.
"""
self._log = LogWrapper(
logger if logger else logging.getLogger("ThrottleExecutor")
)
self._block = block
self._name = name
self._delegate = delegate
self._to_submit = deque()
Expand All @@ -120,6 +132,8 @@ def __init__(self, delegate, count, logger=None, name="default"):

def submit(self, fn, *args, **kwargs): # pylint: disable=arguments-differ
with self._shutdown.ensure_alive():
self._block_until_ready(self._eval_throttle())

out = ThrottleFuture(self)
track_future(out, type="throttle", executor=self._name)

Expand All @@ -140,6 +154,13 @@ def shutdown(self, wait=True, **_kwargs):
if wait:
self._thread.join(MAX_TIMEOUT)

def _block_until_ready(self, throttle_val):
while self._block and not self._shutdown.is_shutdown:
if len(self._to_submit) < throttle_val:
return
self._log.debug("%s: throttling on submit", self._name)
self._event.wait(30.0)

def _eval_throttle(self):
try:
self._last_throttle = self._throttle()
Expand Down
2 changes: 2 additions & 0 deletions more_executors/_impl/throttle.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ThrottleExecutor(ExecutorProtocol):
count: int | Callable[[], int],
logger: logging.Logger | None = ...,
name: str = ...,
block: bool = ...,
): ...
def __enter__(self) -> ThrottleExecutor: ...

Expand All @@ -25,5 +26,6 @@ class TypedThrottleExecutor(TypedExecutorProtocol[A, B]):
count: int | Callable[[], int],
logger: logging.Logger | None = ...,
name: str = ...,
block: bool = ...,
): ...
def __enter__(self) -> TypedThrottleExecutor[A, B]: ...
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_install_requires():

setup(
name="more-executors",
version="2.10.1",
version="2.11.0",
author="Rohan McGovern",
author_email="[email protected]",
packages=find_packages(exclude=["tests", "tests.*"]),
Expand Down
11 changes: 8 additions & 3 deletions tests/test_throttle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

from threading import Lock
import time

Expand All @@ -6,7 +8,8 @@
from more_executors import Executors, ThrottleExecutor


def test_throttle():
@pytest.mark.parametrize("block", [True, False])
def test_throttle(block):
THREADS = 8
COUNT = 3
samples = []
Expand All @@ -27,7 +30,9 @@ def record(x):
running_now.remove(x)

futures = []
executor = ThrottleExecutor(Executors.thread_pool(max_workers=THREADS), count=COUNT)
executor = ThrottleExecutor(
Executors.thread_pool(max_workers=THREADS), count=COUNT, block=block
)
with executor:
for i in range(0, 1000):
future = executor.submit(record, i)
Expand All @@ -51,6 +56,6 @@ def record(x):

def test_with_throttle():
assert_that(
Executors.sync(name="throttle-test").with_throttle(4),
Executors.sync(name="throttle-test").with_throttle(4, block=True),
instance_of(ThrottleExecutor),
)

0 comments on commit 857642c

Please sign in to comment.