Skip to content

Commit

Permalink
ML-443: saving last written event time to pass to mlrun (#268)
Browse files Browse the repository at this point in the history
* ML-443: saving last written event

* some bug fixes

* work on rainy days scenario

* frames version

* debug print

* debug

* debug prints

* some cleanup

* cleanup

* req

* req2

* remove unused imports

* fix

* fix
  • Loading branch information
katyakats authored Jul 27, 2021
1 parent a021fb5 commit 3835002
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
11 changes: 8 additions & 3 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ def __init__(
def _init(self):
    """Reset per-run batching state; called when the flow is (re)started."""
    # Pending events grouped by batch key; defaultdict so a new key starts with an empty list.
    self._batch: Dict[Optional[str], List[Any]] = defaultdict(list)
    # Event time of the first event in each open batch — popped and used as the
    # emitted batch's event time when the batch is flushed.
    self._batch_first_event_time: Dict[Optional[str], datetime.datetime] = {}
    # Latest event time seen in each open batch — passed along on emit so targets
    # (e.g. ParquetTarget) can report the last written event time to mlrun.
    self._batch_last_event_time: Dict[Optional[str], datetime.datetime] = {}
    # Monotonic clock reading taken when each batch was opened — presumably used
    # for timeout-based flushing via _timeout_task; TODO confirm against full file.
    self._batch_start_time: Dict[Optional[str], float] = {}
    self._timeout_task: Optional[Task] = None
    # Exception raised inside the timeout task, re-raised on the next _do call.
    self._timeout_task_ex: Optional[Exception] = None
Expand All @@ -732,7 +733,7 @@ def _create_key_extractor(key) -> Callable:
else:
raise ValueError(f'Unsupported key type {type(key)}')

async def _emit(self, batch, batch_key, batch_time):
async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
raise NotImplementedError

async def _terminate(self):
Expand All @@ -749,6 +750,9 @@ async def _do(self, event):
if len(self._batch[key]) == 0:
self._batch_first_event_time[key] = event.time
self._batch_start_time[key] = time.monotonic()
self._batch_last_event_time[key] = event.time
elif self._batch_last_event_time[key] < event.time:
self._batch_last_event_time[key] = event.time

if self._timeout_task_ex:
raise self._timeout_task_ex
Expand Down Expand Up @@ -785,8 +789,9 @@ async def _emit_batch(self, batch_key: Optional[str] = None):
if batch_to_emit is None:
return
batch_time = self._batch_first_event_time.pop(batch_key)
last_event_time = self._batch_last_event_time.pop(batch_key)
del self._batch_start_time[batch_key]
await self._emit(batch_to_emit, batch_key, batch_time)
await self._emit(batch_to_emit, batch_key, batch_time, last_event_time)

async def _emit_all(self):
for key in list(self._batch.keys()):
Expand All @@ -809,7 +814,7 @@ class Batch(_Batching):
"""
_do_downstream_per_event = False

async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
    """Wrap the accumulated batch in a single Event and pass it downstream.

    :param batch: list of accumulated event bodies, emitted as one Event body.
    :param batch_key: key the batch was grouped by (unused here).
    :param batch_time: event time of the first event in the batch; stamped
        onto the emitted Event.
    :param last_event_time: accepted for interface compatibility with the
        _Batching base class; not used by Batch.
    """
    # NOTE: the diff extraction left the pre-change signature line
    # (without last_event_time) duplicated above this one; removed here.
    event = Event(batch, time=batch_time)
    return await self._do_downstream(event)

Expand Down
15 changes: 12 additions & 3 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ async def _terminate(self):
asyncio.get_running_loop().run_in_executor(None, lambda: self._data_buffer.put(_termination_obj))
await self._blocking_io_loop_future

async def _emit(self, batch, batch_key, batch_time):
async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
if not self._blocking_io_loop_future:
self._blocking_io_loop_future = asyncio.get_running_loop().run_in_executor(None, self._blocking_io_loop)

Expand Down Expand Up @@ -428,6 +428,7 @@ def __init__(self, path: str, index_cols: Union[str, List[str], None] = None,

storage_options = kwargs.get('storage_options')
self._file_system, self._path = url_to_file_system(path, storage_options)
self._full_path = path

path_from_event = self._path_from_event if partition_cols else None

Expand All @@ -436,6 +437,8 @@ def __init__(self, path: str, index_cols: Union[str, List[str], None] = None,

self._field_extractor = lambda event_body, field_name: event_body.get(field_name)
self._write_missing_fields = True
self._mlrun_callback = kwargs.get('update_last_written')
self._last_written_event = None

def _init(self):
_Batching._init(self)
Expand All @@ -444,7 +447,7 @@ def _init(self):
def _event_to_batch_entry(self, event):
return self._event_to_writer_entry(event)

async def _emit(self, batch, batch_key, batch_time):
async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
df_columns = []
if self._index_cols:
df_columns.extend(self._index_cols)
Expand Down Expand Up @@ -474,6 +477,12 @@ async def _emit(self, batch, batch_key, batch_time):
if self._schema is not None:
kwargs['schema'] = self._schema
df.to_parquet(path=file, index=bool(self._index_cols), **kwargs)
if not self._last_written_event or last_event_time > self._last_written_event:
self._last_written_event = last_event_time

async def _terminate(self):
if self._mlrun_callback:
self._mlrun_callback(self._full_path, self._last_written_event)


class TSDBTarget(_Batching, _Writer):
Expand Down Expand Up @@ -551,7 +560,7 @@ def _init(self):
def _event_to_batch_entry(self, event):
return self._event_to_writer_entry(event)

async def _emit(self, batch, batch_key, batch_time):
async def _emit(self, batch, batch_key, batch_time, last_event_time=None):
df_columns = []
if self._index_cols:
df_columns.extend(self._index_cols)
Expand Down

0 comments on commit 3835002

Please sign in to comment.