Skip to content

Commit

Permalink
Fix completion on error in concurrent execution step. (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gal Topper authored Nov 4, 2021
1 parent 8cdccef commit b970e81
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
5 changes: 5 additions & 0 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ def _init(self):
self._q = None

async def _worker(self):
event = None
try:
while True:
job = await self._q.get()
Expand All @@ -620,6 +621,10 @@ async def _worker(self):
completed = await job[1]
await self._handle_completed(event, completed)
except BaseException as ex:
if event:
none_or_coroutine = event._awaitable_result._set_error(ex)
if none_or_coroutine:
await none_or_coroutine
if not self._q.empty():
await self._q.get()
raise ex
Expand Down
24 changes: 24 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Event, Batch, Table, CSVTarget, DataframeSource, MapClass, JoinWithTable, ReduceToDataFrame, ToDataFrame, \
ParquetTarget, QueryByKey, \
TSDBTarget, Extend, SendToHttp, HttpRequest, NoSqlTarget, NoopDriver, Driver, Recover, V3ioDriver, ParquetSource
from storey.flow import _ConcurrentJobExecution


class ATestException(Exception):
Expand Down Expand Up @@ -2775,3 +2776,26 @@ def my_func(param1, param2):
controller.await_termination()

assert len(dictionary) == 1


class _ErrorInConcurrentExecution(_ConcurrentJobExecution):
async def _process_event(self, event):
pass

async def _handle_completed(self, event, response):
raise ATestException()


def test_completion_on_error_in_concurrent_execution_step():
controller = build_flow([
SyncEmitSource(),
_ErrorInConcurrentExecution(),
Complete()
]).run()

awaitable_result = controller.emit(1)
try:
with pytest.raises(ATestException):
awaitable_result.await_result()
finally:
controller.terminate()

0 comments on commit b970e81

Please sign in to comment.