From a57137ab8ed3c4c51e4b631dd9fb9c263844880a Mon Sep 17 00:00:00 2001 From: Rohan McGovern Date: Mon, 6 Jun 2022 09:15:14 +1000 Subject: [PATCH] Fix InvalidStateError log spam in some cancellation scenarios If a future has already been cancelled, we can't set the result or exception. It is not an error if this happens to a future cancelled by the caller, so don't generate ERROR logs in that case. Fixes #337 --- more_executors/_impl/common.py | 36 ++++++++++++++++++++++++---- more_executors/_impl/futures/bool.py | 4 ++-- more_executors/_impl/futures/zip.py | 7 ++++-- more_executors/_impl/map.py | 9 +++++-- more_executors/_impl/poll.py | 11 ++++++--- 5 files changed, 53 insertions(+), 14 deletions(-) diff --git a/more_executors/_impl/common.py b/more_executors/_impl/common.py index 5f1fdc96..df134313 100644 --- a/more_executors/_impl/common.py +++ b/more_executors/_impl/common.py @@ -5,6 +5,15 @@ from .logwrap import LogWrapper +try: + from concurrent.futures import InvalidStateError +except ImportError: # pragma: no cover + # InvalidStateError doesn't exist on older versions. + # Keep code simple by declaring a class of same name (which will never + # have any instances created). + class InvalidStateError(RuntimeError): + pass + LOG = LogWrapper(logging.getLogger("more_executors._Future")) @@ -93,9 +102,26 @@ def copy_exception(future, exception=None, traceback=None): traceback = exc_info[2] try: - future.set_exception_info(exception, traceback) - return - except AttributeError: - pass + try: + future.set_exception_info(exception, traceback) + return + except AttributeError: + pass + + future.set_exception(exception) - future.set_exception(exception) + except InvalidStateError: + # See commentary on try_set_result below. + LOG.debug("%s: can't set exception %s", future, exception, exc_info=True) + + +def try_set_result(future, result): + # Try to set a result on a future, but tolerate an InvalidStateError. + # + # The intended usage of this function is to set results on futures we've + # already handed out to the user and which therefore could potentially + # have been cancelled by them at any time.. + try: + future.set_result(result) + except InvalidStateError: + LOG.debug("%s: can't set result %s", future, result, exc_info=True) diff --git a/more_executors/_impl/futures/bool.py b/more_executors/_impl/futures/bool.py index 422d38b6..13056e61 100644 --- a/more_executors/_impl/futures/bool.py +++ b/more_executors/_impl/futures/bool.py @@ -5,7 +5,7 @@ from concurrent.futures import Future from .base import chain_cancel, weak_callback -from ..common import copy_future_exception +from ..common import copy_future_exception, try_set_result from .check import ensure_futures from ..logwrap import LogWrapper from ..metrics import track_future @@ -44,7 +44,7 @@ def handle_done(self, f): (set_result, set_exception, cancel_futures) = self.get_state_update(f) if set_result: - self.out.set_result(f.result()) + try_set_result(self.out, f.result()) if set_exception: copy_future_exception(f, self.out) diff --git a/more_executors/_impl/futures/zip.py b/more_executors/_impl/futures/zip.py index 21e98125..a4e35711 100644 --- a/more_executors/_impl/futures/zip.py +++ b/more_executors/_impl/futures/zip.py @@ -4,7 +4,10 @@ from functools import partial from collections import namedtuple -from more_executors._impl.common import copy_future_exception +from more_executors._impl.common import ( + copy_future_exception, + try_set_result, +) from .base import f_return, chain_cancel, weak_callback from .check import ensure_futures from ..metrics import track_future @@ -67,7 +70,7 @@ def handle_done(self, index, f): if cancel: self.out.cancel() if set_result: - self.out.set_result(maketuple(self.fs)) + try_set_result(self.out, maketuple(self.fs)) if set_exception: copy_future_exception(f, self.out) diff --git a/more_executors/_impl/map.py b/more_executors/_impl/map.py index f3ca5a3e..fc5e8688 100644 --- a/more_executors/_impl/map.py +++ b/more_executors/_impl/map.py @@ -1,6 +1,11 @@ from concurrent.futures import Executor -from .common import _Future, copy_exception, copy_future_exception +from .common import ( + _Future, + copy_exception, + copy_future_exception, + try_set_result, +) from .wrap import CanCustomizeBind from .metrics import metrics, track_future from .helpers import ShutdownHelper @@ -71,7 +76,7 @@ def _delegate_resolved(self, delegate): copy_exception(self) def _on_mapped(self, result): - self.set_result(result) + try_set_result(self, result) def set_result(self, result): with self._me_lock: diff --git a/more_executors/_impl/poll.py b/more_executors/_impl/poll.py index 511911b8..0288bf02 100644 --- a/more_executors/_impl/poll.py +++ b/more_executors/_impl/poll.py @@ -5,7 +5,13 @@ from monotonic import monotonic -from .common import _Future, MAX_TIMEOUT, copy_exception, copy_future_exception +from .common import ( + _Future, + MAX_TIMEOUT, + copy_exception, + copy_future_exception, + try_set_result, +) from .wrap import CanCustomizeBind from .helpers import executor_loop from .event import get_event, is_shutdown @@ -108,8 +114,7 @@ def yield_result(self, result): result (object): a result to be returned by the future associated with this descriptor """ - - self.__future.set_result(result) + try_set_result(self.__future, result) def yield_exception(self, exception, traceback=None): """The poll function can call this function to make the future raise the given exception.