From 211c719fd1276fe2c6cf37a4f9db195fdf9be980 Mon Sep 17 00:00:00 2001 From: Katya Katsenelenbogen Date: Sun, 31 Oct 2021 11:33:37 +0200 Subject: [PATCH] ML-1101: indicate there were no events by writing datetime.min (#306) * ML-1101: indicate there were no events by writing datetime.min * adding a comment --- storey/targets.py | 8 ++++++-- tests/test_flow.py | 17 +++++------------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/storey/targets.py b/storey/targets.py index ceb5c6e5..11f207fa 100644 --- a/storey/targets.py +++ b/storey/targets.py @@ -481,8 +481,12 @@ async def _emit(self, batch, batch_key, batch_time, last_event_time=None): self._last_written_event = last_event_time async def _terminate(self): - if self._mlrun_callback and self._last_written_event: - self._mlrun_callback(self._full_path, self._last_written_event) + if self._mlrun_callback: + if self._last_written_event: + self._mlrun_callback(self._full_path, self._last_written_event) + else: + # min is a special case that indicates to mlrun that nothing was written + self._mlrun_callback(self._full_path, datetime.datetime.min) class TSDBTarget(_Batching, _Writer): diff --git a/tests/test_flow.py b/tests/test_flow.py index cddcb65c..9b9ab5bc 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -2752,8 +2752,7 @@ def test_reduce_to_df_multiple_indexes(): assert_frame_equal(expected, termination_result) -@pytest.mark.parametrize("empty", [True, False]) -def test_func_parquet_target_terminate(tmpdir, empty): +def test_func_parquet_target_terminate(tmpdir): out_file = f'{tmpdir}/test_func_parquet_target_terminate_{uuid.uuid4().hex}/' dictionary = {} @@ -2761,12 +2760,9 @@ def test_func_parquet_target_terminate(tmpdir, empty): def my_func(param1, param2): dictionary[param1] = param2 - if empty: - data = None - else: - data = [['dina', pd.Timestamp('2019-07-01 00:00:00'), 'tel aviv'], - ['uri', pd.Timestamp('2018-12-30 09:00:00'), 'tel aviv'], - ['katya', pd.Timestamp('2020-12-31 14:00:00'), 'hod hasharon']] + data = [['dina', pd.Timestamp('2019-07-01 00:00:00'), 'tel aviv'], + ['uri', pd.Timestamp('2018-12-30 09:00:00'), 'tel aviv'], + ['katya', pd.Timestamp('2020-12-31 14:00:00'), 'hod hasharon']] df = pd.DataFrame(data, columns=['my_string', 'my_time', 'my_city']) df.set_index('my_string') @@ -2778,7 +2774,4 @@ def my_func(param1, param2): controller.await_termination() - if empty: - assert len(dictionary) == 0 - else: - assert len(dictionary) == 1 + assert len(dictionary) == 1