diff --git a/storey/targets.py b/storey/targets.py
index 85b18e9a..ceb5c6e5 100644
--- a/storey/targets.py
+++ b/storey/targets.py
@@ -481,7 +481,9 @@ async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
         self._last_written_event = last_event_time
 
     async def _terminate(self):
-        if self._mlrun_callback:
+        # Invoke the callback only when a last-written event time is set (it is
+        # assigned in _emit), so a flow that wrote no events does not report one.
+        if self._mlrun_callback and self._last_written_event:
             self._mlrun_callback(self._full_path, self._last_written_event)
 
 
diff --git a/tests/test_flow.py b/tests/test_flow.py
index ea7a627e..cddcb65c 100644
--- a/tests/test_flow.py
+++ b/tests/test_flow.py
@@ -2750,3 +2750,34 @@ def test_reduce_to_df_multiple_indexes():
     termination_result = controller.await_termination()
 
     assert_frame_equal(expected, termination_result)
+
+
+@pytest.mark.parametrize("empty", [True, False])
+def test_func_parquet_target_terminate(tmpdir, empty):
+    """Check that ParquetTarget calls update_last_written only when the source had rows."""
+    out_file = f'{tmpdir}/test_func_parquet_target_terminate_{uuid.uuid4().hex}/'
+
+    callback_calls = {}
+
+    def record_last_written(path, last_written):
+        callback_calls[path] = last_written
+
+    if empty:
+        data = None
+    else:
+        data = [['dina', pd.Timestamp('2019-07-01 00:00:00'), 'tel aviv'],
+                ['uri', pd.Timestamp('2018-12-30 09:00:00'), 'tel aviv'],
+                ['katya', pd.Timestamp('2020-12-31 14:00:00'), 'hod hasharon']]
+
+    df = pd.DataFrame(data, columns=['my_string', 'my_time', 'my_city'])
+
+    controller = build_flow([
+        DataframeSource(df),
+        ParquetTarget(path=out_file, update_last_written=record_last_written)
+    ]).run()
+
+    controller.await_termination()
+
+    # Calls are keyed by path: none expected for an empty source, one path otherwise.
+    expected_paths = 0 if empty else 1
+    assert len(callback_calls) == expected_paths