diff --git a/storey/sources.py b/storey/sources.py
index 9856a472..787156d5 100644
--- a/storey/sources.py
+++ b/storey/sources.py
@@ -763,7 +763,7 @@ async def _run_loop(self):
                     if isinstance(self._key_field, list):
                         key = []
                         for key_field in self._key_field:
-                            if key_field not in body or body[key_field] is None:
+                            if key_field not in body or pandas.isna(body[key_field]):  # None, NaN or NaT
                                 create_event = False
                                 break
                             key.append(body[key_field])
diff --git a/tests/test_flow.py b/tests/test_flow.py
index 1352b413..7b0f1f82 100644
--- a/tests/test_flow.py
+++ b/tests/test_flow.py
@@ -2670,6 +2670,39 @@ def test_none_key_is_not_written():
     assert result == expected
 
 
+def test_none_key_num_is_not_written():
+    data = pd.DataFrame({'index': [10, None, 20], 'some_data': [1, 2, 3]})
+    data.set_index(keys=['index'], inplace=True)
+    # None in a numeric column is held by pandas as NaN; that row must not reach the output.
+    controller = build_flow([
+        DataframeSource(data, key_field=['index']),
+        Reduce([], append_and_return),
+    ]).run()
+    result = controller.await_termination()
+    expected = [{'index': 10, 'some_data': 1}, {'index': 20, 'some_data': 3}]
+
+    assert result == expected
+
+
+def test_none_key_date_is_not_written():
+    """A row whose datetime key is None must be missing from the flow's result."""
+    data = pd.DataFrame({'index': [datetime(2020, 6, 27, 10, 23, 8, 420581),
+                                   None,
+                                   datetime(2020, 6, 28, 10, 23, 8, 420581)],
+                         'some_data': [1, 2, 3]})
+    data.set_index(keys=['index'], inplace=True)
+
+    controller = build_flow([
+        DataframeSource(data, key_field=['index']),
+        Reduce([], append_and_return),
+    ]).run()
+    result = controller.await_termination()
+    expected = [{'index': datetime(2020, 6, 27, 10, 23, 8, 420581), 'some_data': 1},
+                {'index': datetime(2020, 6, 28, 10, 23, 8, 420581), 'some_data': 3}]
+
+    assert result == expected
+
+
 def test_csv_none_value_first_row(tmpdir):
     out_file_par = f'{tmpdir}/test_csv_none_value_first_row_{uuid.uuid4().hex}.parquet'
     out_file_csv = f'{tmpdir}/test_csv_none_value_first_row_{uuid.uuid4().hex}.csv'