Skip to content

Commit

Permalink
ML-777: Import timestamp field from Event (for nuclio support). (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gal Topper authored Jun 30, 2021
1 parent 141de4d commit dd91a23
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
2 changes: 2 additions & 0 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def _build_event(self, element, key, event_time):
element_is_event = hasattr(element, 'id')
if element_is_event:
body = element.body
if not hasattr(element, 'time') and hasattr(element, 'timestamp'):
element.time = element.timestamp

if not key and self._key_field:
if isinstance(self._key_field, str) or isinstance(self._key_field, int):
Expand Down
23 changes: 23 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,29 @@ def increment_maybe_boom(x):
assert termination_result == 54


# ML-777
def test_emit_timeless_event():
class TimelessEvent:
pass

controller = build_flow([
SyncEmitSource(),
ReduceToDataFrame(insert_time_column_as='mytime')
]).run()

event = TimelessEvent()
event.id = 'myevent'
event.body = {'salutation': 'hello'}
t = datetime(2020, 2, 15, 2, 0)
event.timestamp = t

controller.emit(event)
controller.terminate()
termination_result = controller.await_termination()
expected = pd.DataFrame([['hello', t]], columns=['salutation', 'mytime'])
assert termination_result.equals(expected)


def test_csv_reader():
controller = build_flow([
CSVSource('tests/test.csv', header=True),
Expand Down

0 comments on commit dd91a23

Please sign in to comment.