Skip to content

Commit

Permalink
Introduce TimeoutExecutor
Browse files Browse the repository at this point in the history
Applies a default timeout on all returned futures.

Fixes #43
  • Loading branch information
rohanpm committed Mar 22, 2018
1 parent f102701 commit 0109a66
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extend the behavior of `Future` objects.
- Futures with implicit retry
- Futures with implicit cancel on executor shutdown
- Futures with transformed output values
- Futures with a default timeout
- Futures resolved by a caller-provided polling function
- Synchronous executor
- Convenience API for creating executors
Expand Down
2 changes: 1 addition & 1 deletion more_executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
This documentation was built from an unknown revision.
"""

__all__ = ['map', 'retry', 'poll', 'sync', 'cancel_on_shutdown', 'Executors']
__all__ = ['map', 'retry', 'poll', 'sync', 'timeout', 'cancel_on_shutdown', 'Executors']

from more_executors._executors import Executors
15 changes: 15 additions & 0 deletions more_executors/_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from more_executors.map import MapExecutor
from more_executors.retry import RetryExecutor, ExceptionRetryPolicy
from more_executors.poll import PollExecutor
from more_executors.timeout import TimeoutExecutor
from more_executors.cancel_on_shutdown import CancelOnShutdownExecutor
from more_executors.sync import SyncExecutor

Expand All @@ -25,6 +26,7 @@ class Executors(object):
'with_retry',
'with_map',
'with_poll',
'with_timeout',
'with_cancel_on_shutdown',
]

Expand Down Expand Up @@ -91,6 +93,19 @@ def with_poll(cls, executor, fn, cancel_fn=None, default_interval=5.0):
- `default_interval`: default interval between polls, in seconds."""
return cls.wrap(PollExecutor(executor, fn, cancel_fn, default_interval))

@classmethod
def with_timeout(cls, executor, timeout):
"""Wrap an executor in a `more_executors.timeout.TimeoutExecutor`.
Returned futures will have a default timeout applied on calls to
`future.result()` and `future.exception()`.
- `timeout`: default timeout, in seconds (float)
*Since version 1.6.0*
"""
return cls.wrap(TimeoutExecutor(executor, timeout))

@classmethod
def with_cancel_on_shutdown(cls, executor):
"""Wrap an executor in a `more_executors.cancel_on_shutdown.CancelOnShutdownExecutor`.
Expand Down
53 changes: 53 additions & 0 deletions more_executors/timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Create futures with a default timeout."""
from concurrent.futures import Executor

from more_executors.map import _MapFuture

__pdoc__ = {}
__pdoc__['TimeoutExecutor.map'] = None
__pdoc__['TimeoutExecutor.shutdown'] = None
__pdoc__['TimeoutExecutor.submit'] = None


class _TimeoutFuture(_MapFuture):
def __init__(self, delegate, timeout):
super(_TimeoutFuture, self).__init__(delegate, lambda x: x)
self._timeout = timeout

def result(self, timeout=None):
if timeout is None:
timeout = self._timeout
return super(_TimeoutFuture, self).result(timeout)

def exception(self, timeout=None):
if timeout is None:
timeout = self._timeout
return super(_TimeoutFuture, self).exception(timeout)


class TimeoutExecutor(Executor):
"""An `Executor` which delegates to another `Executor` while adding
default timeouts to each returned future.
Note that the default timeouts only apply to the `future.result()` and
`future.exception()` methods. Other methods of waiting on futures,
such as `concurrent.futures.wait()`, will not be affected.
*Since version 1.6.0*
"""
def __init__(self, delegate, timeout):
"""Create a new executor.
- `delegate`: the delegate executor to which callables are submitted.
- `timeout`: the default timeout applied to any calls to `future.result()`
or `future.exception()`, where a timeout has not been provided.
"""
self._delegate = delegate
self._timeout = timeout

def submit(self, fn, *args, **kwargs):
future = self._delegate.submit(fn, *args, **kwargs)
return _TimeoutFuture(future, self._timeout)

def shutdown(self, wait=True):
self._delegate.shutdown(wait)
9 changes: 8 additions & 1 deletion tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def retry_map_executor(threadpool_executor):
return threadpool_executor.with_map(map_noop).with_retry(RetryPolicy())


@fixture
def timeout_executor(threadpool_executor):
return threadpool_executor.with_timeout(60.0)


@fixture
def cancel_poll_map_retry_executor(threadpool_executor):
return threadpool_executor.\
Expand Down Expand Up @@ -121,10 +126,12 @@ def everything_executor(base_executor):
with_cancel_on_shutdown().\
with_retry(RetryPolicy()).\
with_retry(RetryPolicy()).\
with_timeout(120.0).\
with_poll(poll_noop).\
with_poll(poll_noop).\
with_cancel_on_shutdown().\
with_map(map_noop).\
with_timeout(180.0).\
with_map(map_noop).\
with_retry(RetryPolicy())

Expand All @@ -140,7 +147,7 @@ def everything_threadpool_executor(threadpool_executor):


@fixture(params=['threadpool', 'retry', 'map', 'retry_map', 'map_retry', 'poll', 'retry_map_poll',
'sync', 'cancel_poll_map_retry', 'cancel_retry_map_poll',
'sync', 'timeout', 'cancel_poll_map_retry', 'cancel_retry_map_poll',
'everything_sync', 'everything_threadpool'])
def any_executor(request):
ex = request.getfixturevalue(request.param + '_executor')
Expand Down
31 changes: 31 additions & 0 deletions tests/test_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from concurrent.futures import TimeoutError
from hamcrest import assert_that, equal_to, is_, calling, raises
from pytest import fixture
import time

from more_executors._executors import Executors


@fixture
def executor():
return Executors.thread_pool().with_timeout(0.01)


def test_basic_timeout(executor):
def fn(sleep_time, retval):
time.sleep(sleep_time)
return retval

f1 = executor.submit(fn, 1.0, 'abc')
f2 = executor.submit(fn, 1.0, 'def')

# Default should time out
assert_that(calling(f1.result), raises(TimeoutError))
assert_that(calling(f2.exception), raises(TimeoutError))

# But specifying a value should make it work
assert_that(f1.result(2.0), equal_to('abc'))
assert_that(f1.exception(), is_(None))

assert_that(f2.exception(2.0), is_(None))
assert_that(f2.result(), equal_to('def'))

0 comments on commit 0109a66

Please sign in to comment.