diff --git a/storey/aggregations.py b/storey/aggregations.py index e53cc53b..c6d4ff05 100644 --- a/storey/aggregations.py +++ b/storey/aggregations.py @@ -266,8 +266,9 @@ async def _do(self, event): element = event.body key = event.key if self.key_extractor: - key = self.key_extractor(element) - if key is None: + if element: + key = self.key_extractor(element) + if key is None or element is None: event.body = None await self._do_downstream(event) return diff --git a/tests/test_flow.py b/tests/test_flow.py index 9bd98005..16e67a35 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -2392,7 +2392,7 @@ def test_complete_in_error_flow(): def test_non_existing_key_query_by_key(): - df = pd.DataFrame([['katya', 'green'], ['dina', 'blue']], columns=['name', 'color']) + df = pd.DataFrame([['katya', 'green', 'hod hasharon'], ['dina', 'blue', 'ramat gan']], columns=['name', 'color', 'city']) table = Table('table', NoopDriver()) controller = build_flow([ DataframeSource(df, key_field='name'), @@ -2403,6 +2403,7 @@ def test_non_existing_key_query_by_key(): controller = build_flow([ SyncEmitSource(), QueryByKey(["color"], table, key="name"), + QueryByKey(["city"], table, key="name"), ]).run() controller.emit({'nameeeee': 'katya'}, 'katya')