diff --git a/storey/sources.py b/storey/sources.py index 8ed28b56..49f14ab9 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -79,6 +79,8 @@ def _build_event(self, element, key, event_time): element_is_event = hasattr(element, 'id') if element_is_event: body = element.body + if not hasattr(element, 'time') and hasattr(element, 'timestamp'): + element.time = element.timestamp if not key and self._key_field: if isinstance(self._key_field, str) or isinstance(self._key_field, int): diff --git a/tests/test_flow.py b/tests/test_flow.py index dbb7d84c..c0e24a60 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -93,6 +93,29 @@ def increment_maybe_boom(x): assert termination_result == 54 +# ML-777 +def test_emit_timeless_event(): + class TimelessEvent: + pass + + controller = build_flow([ + SyncEmitSource(), + ReduceToDataFrame(insert_time_column_as='mytime') + ]).run() + + event = TimelessEvent() + event.id = 'myevent' + event.body = {'salutation': 'hello'} + t = datetime(2020, 2, 15, 2, 0) + event.timestamp = t + + controller.emit(event) + controller.terminate() + termination_result = controller.await_termination() + expected = pd.DataFrame([['hello', t]], columns=['salutation', 'mytime']) + assert termination_result.equals(expected) + + def test_csv_reader(): controller = build_flow([ CSVSource('tests/test.csv', header=True),