Skip to content

Commit

Permalink
ML-1142: Fix completion on error and reject emit after termination. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gal Topper authored Oct 6, 2021
1 parent c3ab2d0 commit 5f54572
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
4 changes: 4 additions & 0 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ async def _do_and_recover(self, event):
if recovery_step is None:
if self.context and hasattr(self.context, 'push_error'):
message = traceback.format_exc()
if event._awaitable_result:
none_or_coroutine = event._awaitable_result._set_error(ex)
if none_or_coroutine:
await none_or_coroutine
if self.logger:
self.logger.error(f'Pushing error to error stream: {ex}\n{message}')
self.context.push_error(event, f"{ex}\n{message}", source=self.name)
Expand Down
14 changes: 14 additions & 0 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ def __init__(self, buffer_size: Optional[int] = None, key_field: Union[list, str
self._ex = None
self._closeables = []

def _init(self):
self._is_terminated = False

async def _run_loop(self):
loop = asyncio.get_running_loop()
self._termination_future = asyncio.get_running_loop().create_future()
Expand Down Expand Up @@ -239,6 +242,10 @@ def _raise_on_error(self, ex):
def _emit(self, event):
if event is not _termination_obj:
self._raise_on_error(self._ex)
if self._is_terminated:
raise ValueError('Cannot emit to a terminated flow')
else:
self._is_terminated = True
self._q.put(event)
if event is not _termination_obj:
self._raise_on_error(self._ex)
Expand Down Expand Up @@ -368,6 +375,9 @@ def __init__(self, buffer_size: int = 1024, key_field: Union[list, str, None] =
self._ex = None
self._closeables = []

def _init(self):
self._is_terminated = False

async def _run_loop(self):
while True:
event = await self._q.get()
Expand Down Expand Up @@ -401,6 +411,10 @@ def _raise_on_error(self):
async def _emit(self, event):
if event is not _termination_obj:
self._raise_on_error()
if self._is_terminated:
raise ValueError('Cannot emit to a terminated flow')
else:
self._is_terminated = True
await self._q.put(event)
if event is not _termination_obj:
self._raise_on_error()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ def boom(_):
for i in range(2):
try:
awaitable_results.append(controller.emit(0))
except ValueError as ex:
except ValueError:
pass

last_trace_size = None
Expand Down

0 comments on commit 5f54572

Please sign in to comment.