Skip to content

Commit

Permalink
fix message timestamp handling
Browse files Browse the repository at this point in the history
  • Loading branch information
beasteers committed Oct 30, 2023
1 parent 5aad417 commit a2edc7c
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 13 deletions.
7 changes: 4 additions & 3 deletions redis_record/replay/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ def replay(
if seek:
player.seek(seek)

try:
stream_id, ts, data = player.next_message()
except (StopIteration, queue.Empty): # fixme

msg = player.next_message()
if msg is None:
log.info(f"Finished replaying recording: {record_name}")
record_name = None
r.xadd(replay_key, {b'd': b''}, '*')
continue
stream_id, ts, data = msg

if realtime:
sync.sync(ts)
Expand Down
14 changes: 14 additions & 0 deletions redis_record/storage/recorder/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@ class BaseRecorder:
def __init__(self, out_dir='.', schema=None):
self.out_dir = out_dir
self.schema = schema
self.last_timestamp = None
self.index = 0

def _fix_timestamp(self, timestamp):
if isinstance(timestamp, str):
return timestamp

timestamp = int(timestamp*1000)
if self.last_timestamp == timestamp:
self.index += 1
else:
self.index = 0
self.last_timestamp = timestamp
return f'{timestamp}-{self.index}'

def __enter__(self):
return self
Expand Down
5 changes: 3 additions & 2 deletions redis_record/storage/recorder/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, out_dir='.', max_len=1000, max_size=9.5*MB):
def write(self, stream_id, timestamp, data):
assert set(data) == {b'd'}, f"zip recorder can only record a single field in a stream. got {set(data)}"
self.ensure_channel(stream_id)
timestamp = self._fix_timestamp(timestamp)
self.writer[stream_id].write(data[b'd'], timestamp)

def close(self):
Expand Down Expand Up @@ -60,8 +61,8 @@ def __exit__(self, e, t, tb):

def write(self, data, ts):
self.size += len(data)
if not isinstance(timestamp, str):
timestamp = format_epoch_time(timestamp)
if not isinstance(ts, str):
ts = format_epoch_time(ts)
self.buffer.append([data, ts])
if len(self.buffer) >= self.max_len or self.size >= self.max_size:
self._dump(self.buffer)
Expand Down
5 changes: 4 additions & 1 deletion redis_record/storage/replay/mcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def iter_messages(self):
def next_message(self):
if self.it is None:
self.it = iter(self.reader.iter_messages())
schema, channel, message = next(self.it)
try:
schema, channel, message = next(self.it)
except StopIteration:
return
return self._parse_message(message)

def _parse_message(self, message):
Expand Down
18 changes: 11 additions & 7 deletions redis_record/storage/replay/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


class ZipPlayer:
def __init__(self, path, recording_dir=RECORDING_DIR, subset=None):
def __init__(self, path, recording_dir=RECORDING_DIR, subset=None, raw_timestamp=None):
self.recording_dir = path if recording_dir is None else os.path.join(recording_dir, path)
self.subset = subset if isinstance(subset, (list, tuple, set)) else [subset] if subset else []
self.file_index = {}
Expand All @@ -23,6 +23,7 @@ def __init__(self, path, recording_dir=RECORDING_DIR, subset=None):
self.zipfh = {}
self.file_end_timestamps = {}
self.queue = queue.PriorityQueue()
self.raw_timestamp = raw_timestamp

self._load_file_index()
for stream_id in self.file_index:
Expand All @@ -49,7 +50,10 @@ def seek(self, timestamp):
def next_message(self):
# get next message
while True:
_, (stream_id, ts) = self.queue.get(block=False)
try:
_, (stream_id, ts) = self.queue.get(block=False)
except queue.Empty:
return
tx = parse_epoch_time(ts)
if tx >= self.time_cursor or 0:
# log.debug("using timestamp: %s", ts)
Expand All @@ -63,14 +67,14 @@ def next_message(self):
# possibly load next file
if ts >= self.file_end_timestamps[stream_id]:
self._queue_next_file(stream_id)
tx = ts if self.raw_timestamp else tx
return stream_id, tx, {'d': data}

def iter_messages(self):
try:
while True:
yield self.next_message()
except queue.Empty:
pass
msg = self.next_message()
while msg:
yield msg
msg = self.next_message()

def close(self):
self.time_cursor = 0
Expand Down

0 comments on commit a2edc7c

Please sign in to comment.