Skip to content

Commit

Permalink
[Storey] Using table string in QueryByKey fails (#276)
Browse files Browse the repository at this point in the history
Signed-off-by: Eyal Salomon <[email protected]>
  • Loading branch information
Eyal Salomon authored Aug 1, 2021
1 parent 3835002 commit 9cc5bef
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
11 changes: 9 additions & 2 deletions integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,25 @@ def test_aggregate_and_query_with_different_fixed_windows(setup_teardown_test, p
assert actual == expected_results, \
f'actual did not match expected. \n actual: {actual} \n expected: {expected_results}'

tables = [table, Table(setup_teardown_test, V3ioDriver())] # test on previous table and on new table
table_name = setup_teardown_test
tables = [table_name, table, Table(setup_teardown_test, V3ioDriver())] # test on previous table and on new table
expected_results = [
{'col1': 10, 'number_of_stuff_sum_1h': 17.0, 'number_of_stuff_min_1h': 8.0,
'number_of_stuff_max_1h': 9.0, 'number_of_stuff_avg_1h': 8.5},
{'col1': 10, 'number_of_stuff_sum_1h': 0.0, 'number_of_stuff_min_1h': math.inf,
'number_of_stuff_max_1h': -math.inf, 'number_of_stuff_avg_1h': math.nan},
]
for table in tables:
if isinstance(table, str):
tmp_table = Table(table_name, V3ioDriver())
context = Context(initial_tables={table_name: tmp_table})
else:
context = None

controller = build_flow([
SyncEmitSource(),
QueryByKey(['number_of_stuff_sum_1h', 'number_of_stuff_avg_1h', 'number_of_stuff_min_1h', 'number_of_stuff_max_1h'],
table),
table, context=context),
Reduce([], lambda acc, x: append_return(acc, x)),
]).run()

Expand Down
5 changes: 4 additions & 1 deletion storey/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ def __init__(self, features: List[str], table: Union[Table, str], key: Union[str
feature, aggr = name.rsplit('_', 1)
# temporarily set as SlidingWindows, until the actual window type is read from the schema
self._aggrs.append(FieldAggregator(name=feature, field=None, aggr=[aggr], windows=SlidingWindows(windows, '10m')))
other_table = table._clone() if table._aggregates is not None else table
if isinstance(table, Table):
other_table = table._clone() if table._aggregates is not None else table
else:
other_table = table # table is a str (table name) - pass it along unchanged, together with the context object
AggregateByKey.__init__(self, self._aggrs, other_table, key, augmentation_fn=augmentation_fn,
enrich_with=self._enrich_cols, aliases=aliases, use_windows_from_schema=True, **kwargs)
self._table._aggregations_read_only = True
Expand Down

0 comments on commit 9cc5bef

Please sign in to comment.