Skip to content

Commit

Permalink
ML-493: fix in 2 consecutive QueryByKey steps with non existing column (
Browse files Browse the repository at this point in the history
  • Loading branch information
katyakats authored May 4, 2021
1 parent 1b264af commit b7a0c88
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
5 changes: 3 additions & 2 deletions storey/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,9 @@ async def _do(self, event):
element = event.body
key = event.key
if self.key_extractor:
key = self.key_extractor(element)
if key is None:
if element:
key = self.key_extractor(element)
if key is None or element is None:
event.body = None
await self._do_downstream(event)
return
Expand Down
3 changes: 2 additions & 1 deletion tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2392,7 +2392,7 @@ def test_complete_in_error_flow():


def test_non_existing_key_query_by_key():
df = pd.DataFrame([['katya', 'green'], ['dina', 'blue']], columns=['name', 'color'])
df = pd.DataFrame([['katya', 'green', 'hod hasharon'], ['dina', 'blue', 'ramat gan']], columns=['name', 'color', 'city'])
table = Table('table', NoopDriver())
controller = build_flow([
DataframeSource(df, key_field='name'),
Expand All @@ -2403,6 +2403,7 @@ def test_non_existing_key_query_by_key():
controller = build_flow([
SyncEmitSource(),
QueryByKey(["color"], table, key="name"),
QueryByKey(["city"], table, key="name"),
]).run()

controller.emit({'nameeeee': 'katya'}, 'katya')
Expand Down

0 comments on commit b7a0c88

Please sign in to comment.