From a2edc7c7562ec11d6a1da1ee4224395d546623c5 Mon Sep 17 00:00:00 2001 From: Bea Steers Date: Mon, 30 Oct 2023 18:25:23 -0400 Subject: [PATCH] fix message timestamp handling --- redis_record/replay/streams.py | 7 ++++--- redis_record/storage/recorder/base.py | 14 ++++++++++++++ redis_record/storage/recorder/zip.py | 5 +++-- redis_record/storage/replay/mcap.py | 5 ++++- redis_record/storage/replay/zip.py | 18 +++++++++++------- 5 files changed, 36 insertions(+), 13 deletions(-) diff --git a/redis_record/replay/streams.py b/redis_record/replay/streams.py index 1ff6a51..9eee7ba 100644 --- a/redis_record/replay/streams.py +++ b/redis_record/replay/streams.py @@ -86,13 +86,14 @@ def replay( if seek: player.seek(seek) - try: - stream_id, ts, data = player.next_message() - except (StopIteration, queue.Empty): # fixme + + msg = player.next_message() + if msg is None: log.info(f"Finished replaying recording: {record_name}") record_name = None r.xadd(replay_key, {b'd': b''}, '*') continue + stream_id, ts, data = msg if realtime: sync.sync(ts) diff --git a/redis_record/storage/recorder/base.py b/redis_record/storage/recorder/base.py index 709794b..faa453a 100644 --- a/redis_record/storage/recorder/base.py +++ b/redis_record/storage/recorder/base.py @@ -4,6 +4,20 @@ class BaseRecorder: def __init__(self, out_dir='.', schema=None): self.out_dir = out_dir self.schema = schema + self.last_timestamp = None + self.index = 0 + + def _fix_timestamp(self, timestamp): + if isinstance(timestamp, str): + return timestamp + + timestamp = int(timestamp*1000) + if self.last_timestamp == timestamp: + self.index += 1 + else: + self.index = 0 + self.last_timestamp = timestamp + return f'{timestamp}-{self.index}' def __enter__(self): return self diff --git a/redis_record/storage/recorder/zip.py b/redis_record/storage/recorder/zip.py index 10a3472..c2759c2 100644 --- a/redis_record/storage/recorder/zip.py +++ b/redis_record/storage/recorder/zip.py @@ -17,6 +17,7 @@ def __init__(self, out_dir='.', max_len=1000, max_size=9.5*MB): def write(self, stream_id, timestamp, data): assert set(data) == {b'd'}, f"zip recorder can only record a single field in a stream. got {set(data)}" self.ensure_channel(stream_id) + timestamp = self._fix_timestamp(timestamp) self.writer[stream_id].write(data[b'd'], timestamp) def close(self): @@ -60,8 +61,8 @@ def __exit__(self, e, t, tb): def write(self, data, ts): self.size += len(data) - if not isinstance(timestamp, str): - timestamp = format_epoch_time(timestamp) + if not isinstance(ts, str): + ts = format_epoch_time(ts) self.buffer.append([data, ts]) if len(self.buffer) >= self.max_len or self.size >= self.max_size: self._dump(self.buffer) diff --git a/redis_record/storage/replay/mcap.py b/redis_record/storage/replay/mcap.py index d4639b2..a1eac23 100644 --- a/redis_record/storage/replay/mcap.py +++ b/redis_record/storage/replay/mcap.py @@ -32,7 +32,10 @@ def iter_messages(self): def next_message(self): if self.it is None: self.it = iter(self.reader.iter_messages()) - schema, channel, message = next(self.it) + try: + schema, channel, message = next(self.it) + except StopIteration: + return return self._parse_message(message) def _parse_message(self, message): diff --git a/redis_record/storage/replay/zip.py b/redis_record/storage/replay/zip.py index 5a37e73..199c62e 100644 --- a/redis_record/storage/replay/zip.py +++ b/redis_record/storage/replay/zip.py @@ -14,7 +14,7 @@ class ZipPlayer: - def __init__(self, path, recording_dir=RECORDING_DIR, subset=None): + def __init__(self, path, recording_dir=RECORDING_DIR, subset=None, raw_timestamp=None): self.recording_dir = path if recording_dir is None else os.path.join(recording_dir, path) self.subset = subset if isinstance(subset, (list, tuple, set)) else [subset] if subset else [] self.file_index = {} @@ -23,6 +23,7 @@ def __init__(self, path, recording_dir=RECORDING_DIR, subset=None): self.zipfh = {} self.file_end_timestamps = {} self.queue = queue.PriorityQueue() + self.raw_timestamp = raw_timestamp self._load_file_index() for stream_id in self.file_index: @@ -49,7 +50,10 @@ def seek(self, timestamp): def next_message(self): # get next message while True: - _, (stream_id, ts) = self.queue.get(block=False) + try: + _, (stream_id, ts) = self.queue.get(block=False) + except queue.Empty: + return tx = parse_epoch_time(ts) if tx >= self.time_cursor or 0: # log.debug("using timestamp: %s", ts) @@ -63,14 +67,14 @@ def next_message(self): # possibly load next file if ts >= self.file_end_timestamps[stream_id]: self._queue_next_file(stream_id) + tx = ts if self.raw_timestamp else tx return stream_id, tx, {'d': data} def iter_messages(self): - try: - while True: - yield self.next_message() - except queue.Empty: - pass + msg = self.next_message() + while msg: + yield msg + msg = self.next_message() def close(self): self.time_cursor = 0