Skip to content

Commit

Permalink
handling emitAfterMaxEvent in case of multiple keys (#213)
Browse files Browse the repository at this point in the history
* handling emitAfterMaxEvent in case of multiple keys

* pr comment

* fix
  • Loading branch information
katyakats authored Apr 29, 2021
1 parent 79eb8f2 commit fb391a2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from storey import build_flow, SyncEmitSource, Reduce, Table, V3ioDriver, MapWithState, AggregateByKey, FieldAggregator, \
QueryByKey, NoSqlTarget, Context, DataframeSource, Map

from storey.dtypes import SlidingWindows, FixedWindows
from storey.dtypes import SlidingWindows, FixedWindows, EmitAfterMaxEvent
from storey.utils import _split_path

from .integration_test_utils import setup_teardown_test, append_return, test_base_time, V3ioHeaders
Expand Down Expand Up @@ -1156,7 +1156,7 @@ def test_aggregate_multiple_keys(setup_teardown_test):
DataframeSource(data, key_field=keys, time_field='time'),
AggregateByKey([FieldAggregator('number_of_stuff', 'some_data', ['sum'],
SlidingWindows(['1h'], '10m'))],
table),
table, emit_policy=EmitAfterMaxEvent(1)),
NoSqlTarget(table),
]).run()

Expand Down
12 changes: 6 additions & 6 deletions storey/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ async def _do(self, event):
if isinstance(self._emit_policy, EmitEveryEvent):
await self._emit_event(key, event)
elif isinstance(self._emit_policy, EmitAfterMaxEvent):
if key in self._events_in_batch:
self._events_in_batch[key]['counter'] += 1
if safe_key in self._events_in_batch:
self._events_in_batch[safe_key]['counter'] += 1
else:
event_dict = {'counter': 1, 'time': time.monotonic()}
self._events_in_batch[key] = event_dict
self._events_in_batch[key]['event'] = event
self._events_in_batch[safe_key] = event_dict
self._events_in_batch[safe_key]['event'] = event
if self._emit_policy.timeout_secs and self._timeout_task is None:
self._timeout_task = asyncio.get_running_loop().create_task(self._sleep_and_emit())
if self._events_in_batch[key]['counter'] == self._emit_policy.max_events:
event_from_batch = self._events_in_batch.pop(key, None)
if self._events_in_batch[safe_key]['counter'] == self._emit_policy.max_events:
event_from_batch = self._events_in_batch.pop(safe_key, None)
if event_from_batch is not None:
await self._emit_event(key, event_from_batch['event'])
except Exception as ex:
Expand Down

0 comments on commit fb391a2

Please sign in to comment.