From 5f545722fd3490d71439912b70faa7e0156f1a38 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Wed, 6 Oct 2021 15:19:31 +0300 Subject: [PATCH] ML-1142: Fix completion on error and reject emit after termination. (#296) --- storey/flow.py | 4 ++++ storey/sources.py | 14 ++++++++++++++ tests/test_flow.py | 2 +- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/storey/flow.py b/storey/flow.py index 86ac5111..2e1e6223 100644 --- a/storey/flow.py +++ b/storey/flow.py @@ -145,6 +145,10 @@ async def _do_and_recover(self, event): if recovery_step is None: if self.context and hasattr(self.context, 'push_error'): message = traceback.format_exc() + if event._awaitable_result: + none_or_coroutine = event._awaitable_result._set_error(ex) + if none_or_coroutine: + await none_or_coroutine if self.logger: self.logger.error(f'Pushing error to error stream: {ex}\n{message}') self.context.push_error(event, f"{ex}\n{message}", source=self.name) diff --git a/storey/sources.py b/storey/sources.py index a7e09e94..b553d90c 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -197,6 +197,9 @@ def __init__(self, buffer_size: Optional[int] = None, key_field: Union[list, str self._ex = None self._closeables = [] + def _init(self): + self._is_terminated = False + async def _run_loop(self): loop = asyncio.get_running_loop() self._termination_future = asyncio.get_running_loop().create_future() @@ -239,6 +242,10 @@ def _raise_on_error(self, ex): def _emit(self, event): if event is not _termination_obj: self._raise_on_error(self._ex) + if self._is_terminated: + raise ValueError('Cannot emit to a terminated flow') + else: + self._is_terminated = True self._q.put(event) if event is not _termination_obj: self._raise_on_error(self._ex) @@ -368,6 +375,9 @@ def __init__(self, buffer_size: int = 1024, key_field: Union[list, str, None] = self._ex = None self._closeables = [] + def _init(self): + self._is_terminated = False + async def _run_loop(self): while True: event = await self._q.get() @@ -401,6 +411,10 @@ def _raise_on_error(self): async def _emit(self, event): if event is not _termination_obj: self._raise_on_error() + if self._is_terminated: + raise ValueError('Cannot emit to a terminated flow') + else: + self._is_terminated = True await self._q.put(event) if event is not _termination_obj: self._raise_on_error() diff --git a/tests/test_flow.py b/tests/test_flow.py index 01dec6f1..3f00a3bc 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -944,7 +944,7 @@ def boom(_): for i in range(2): try: awaitable_results.append(controller.emit(0)) - except ValueError as ex: + except ValueError: pass last_trace_size = None