Skip to content

Commit

Permalink
bug fix in reading non existing column from v3io when keys are list (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
katyakats authored Jun 8, 2021
1 parent b0f1eb9 commit 2325b3a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
24 changes: 22 additions & 2 deletions integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ def test_aggregate_multiple_keys(setup_teardown_test):
controller = build_flow([
SyncEmitSource(),
QueryByKey(['number_of_stuff_sum_1h'],
other_table, keys=['first_name', 'last_name']),
other_table, key=['first_name', 'last_name']),
Reduce([], lambda acc, x: append_return(acc, x)),
]).run()

Expand Down Expand Up @@ -1299,7 +1299,7 @@ def map_multiply(x):
controller = build_flow([
SyncEmitSource(),
QueryByKey(['number_of_stuff_avg_1h', 'number_of_stuff2_sum_2h'],
other_table, keys=['first_name']),
other_table, key=['first_name']),
Reduce([], lambda acc, x: append_return(acc, x)),
]).run()

Expand Down Expand Up @@ -1344,3 +1344,23 @@ def test_write_read_first_last(setup_teardown_test):

assert result == [{'mykey': 'onekey', 'attr_first_1h': 0.0, 'attr_last_1h': 9.0},
{'mykey': 'onekey', 'attr_first_1h': 0.0, 'attr_last_1h': 90.0}]


def test_non_existing_key_query_by_key_from_v3io_key_is_list(setup_teardown_test):
table = Table(setup_teardown_test, V3ioDriver())
df = pd.DataFrame([['katya', 'green', 'hod hasharon'], ['dina', 'blue', 'ramat gan']], columns=['name', 'color', 'city'])
controller = build_flow([
DataframeSource(df, key_field='name'),
NoSqlTarget(table),
]).run()
controller.await_termination()

controller = build_flow([
SyncEmitSource(),
QueryByKey(["color"], table, key=["name"]),
QueryByKey(["city"], table, key="name"),
]).run()

controller.emit({'nameeeee': 'katya'}, 'katya')
controller.terminate()
controller.await_termination()
2 changes: 1 addition & 1 deletion storey/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ async def _do(self, event):
if self.key_extractor:
if element:
key = self.key_extractor(element)
if key is None or element is None:
if key is None or key == [None] or element is None:
event.body = None
await self._do_downstream(event)
return
Expand Down

0 comments on commit 2325b3a

Please sign in to comment.