diff --git a/README.md b/README.md index c7f24027..05703883 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ extend the behavior of `Future` objects. - Futures with implicit retry - Futures with implicit cancel on executor shutdown - Futures with transformed output values +- Futures with a default timeout - Futures resolved by a caller-provided polling function - Synchronous executor - Convenience API for creating executors diff --git a/more_executors/__init__.py b/more_executors/__init__.py index 79fd9970..40152ce8 100644 --- a/more_executors/__init__.py +++ b/more_executors/__init__.py @@ -14,6 +14,6 @@ This documentation was built from an unknown revision. """ -__all__ = ['map', 'retry', 'poll', 'sync', 'cancel_on_shutdown', 'Executors'] +__all__ = ['map', 'retry', 'poll', 'sync', 'timeout', 'cancel_on_shutdown', 'Executors'] from more_executors._executors import Executors diff --git a/more_executors/_executors.py b/more_executors/_executors.py index d51c64f9..0b587bb0 100644 --- a/more_executors/_executors.py +++ b/more_executors/_executors.py @@ -4,6 +4,7 @@ from more_executors.map import MapExecutor from more_executors.retry import RetryExecutor, ExceptionRetryPolicy from more_executors.poll import PollExecutor +from more_executors.timeout import TimeoutExecutor from more_executors.cancel_on_shutdown import CancelOnShutdownExecutor from more_executors.sync import SyncExecutor @@ -25,6 +26,7 @@ class Executors(object): 'with_retry', 'with_map', 'with_poll', + 'with_timeout', 'with_cancel_on_shutdown', ] @@ -91,6 +93,19 @@ def with_poll(cls, executor, fn, cancel_fn=None, default_interval=5.0): - `default_interval`: default interval between polls, in seconds.""" return cls.wrap(PollExecutor(executor, fn, cancel_fn, default_interval)) + @classmethod + def with_timeout(cls, executor, timeout): + """Wrap an executor in a `more_executors.timeout.TimeoutExecutor`. + + Returned futures will have a default timeout applied on calls to + `future.result()` and `future.exception()`. + + - `timeout`: default timeout, in seconds (float) + + *Since version 1.6.0* + """ + return cls.wrap(TimeoutExecutor(executor, timeout)) + @classmethod def with_cancel_on_shutdown(cls, executor): """Wrap an executor in a `more_executors.cancel_on_shutdown.CancelOnShutdownExecutor`. diff --git a/more_executors/timeout.py b/more_executors/timeout.py new file mode 100644 index 00000000..ab6a8af4 --- /dev/null +++ b/more_executors/timeout.py @@ -0,0 +1,53 @@ +"""Create futures with a default timeout.""" +from concurrent.futures import Executor + +from more_executors.map import _MapFuture + +__pdoc__ = {} +__pdoc__['TimeoutExecutor.map'] = None +__pdoc__['TimeoutExecutor.shutdown'] = None +__pdoc__['TimeoutExecutor.submit'] = None + + +class _TimeoutFuture(_MapFuture): + def __init__(self, delegate, timeout): + super(_TimeoutFuture, self).__init__(delegate, lambda x: x) + self._timeout = timeout + + def result(self, timeout=None): + if timeout is None: + timeout = self._timeout + return super(_TimeoutFuture, self).result(timeout) + + def exception(self, timeout=None): + if timeout is None: + timeout = self._timeout + return super(_TimeoutFuture, self).exception(timeout) + + +class TimeoutExecutor(Executor): + """An `Executor` which delegates to another `Executor` while adding + default timeouts to each returned future. + + Note that the default timeouts only apply to the `future.result()` and + `future.exception()` methods. Other methods of waiting on futures, + such as `concurrent.futures.wait()`, will not be affected. + + *Since version 1.6.0* + """ + def __init__(self, delegate, timeout): + """Create a new executor. + + - `delegate`: the delegate executor to which callables are submitted. + - `timeout`: the default timeout applied to any calls to `future.result()` + or `future.exception()`, where a timeout has not been provided. + """ + self._delegate = delegate + self._timeout = timeout + + def submit(self, fn, *args, **kwargs): + future = self._delegate.submit(fn, *args, **kwargs) + return _TimeoutFuture(future, self._timeout) + + def shutdown(self, wait=True): + self._delegate.shutdown(wait) diff --git a/tests/test_executor.py b/tests/test_executor.py index 5c189d9b..90e809ff 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -66,6 +66,11 @@ def retry_map_executor(threadpool_executor): return threadpool_executor.with_map(map_noop).with_retry(RetryPolicy()) +@fixture +def timeout_executor(threadpool_executor): + return threadpool_executor.with_timeout(60.0) + + @fixture def cancel_poll_map_retry_executor(threadpool_executor): return threadpool_executor.\ @@ -121,10 +126,12 @@ def everything_executor(base_executor): with_cancel_on_shutdown().\ with_retry(RetryPolicy()).\ with_retry(RetryPolicy()).\ + with_timeout(120.0).\ with_poll(poll_noop).\ with_poll(poll_noop).\ with_cancel_on_shutdown().\ with_map(map_noop).\ + with_timeout(180.0).\ with_map(map_noop).\ with_retry(RetryPolicy()) @@ -140,7 +147,7 @@ def everything_threadpool_executor(threadpool_executor): @fixture(params=['threadpool', 'retry', 'map', 'retry_map', 'map_retry', 'poll', 'retry_map_poll', - 'sync', 'cancel_poll_map_retry', 'cancel_retry_map_poll', + 'sync', 'timeout', 'cancel_poll_map_retry', 'cancel_retry_map_poll', 'everything_sync', 'everything_threadpool']) def any_executor(request): ex = request.getfixturevalue(request.param + '_executor') diff --git a/tests/test_timeout.py b/tests/test_timeout.py new file mode 100644 index 00000000..d3ff3b6d --- /dev/null +++ b/tests/test_timeout.py @@ -0,0 +1,31 @@ +from concurrent.futures import TimeoutError +from hamcrest import assert_that, equal_to, is_, calling, raises +from pytest import fixture +import time + +from more_executors._executors import Executors + + +@fixture +def executor(): + return Executors.thread_pool().with_timeout(0.01) + + +def test_basic_timeout(executor): + def fn(sleep_time, retval): + time.sleep(sleep_time) + return retval + + f1 = executor.submit(fn, 1.0, 'abc') + f2 = executor.submit(fn, 1.0, 'def') + + # Default should time out + assert_that(calling(f1.result), raises(TimeoutError)) + assert_that(calling(f2.exception), raises(TimeoutError)) + + # But specifying a value should make it work + assert_that(f1.result(2.0), equal_to('abc')) + assert_that(f1.exception(), is_(None)) + + assert_that(f2.exception(2.0), is_(None)) + assert_that(f2.result(), equal_to('def'))