diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc index 2278104c269171..1582fcd34dd7ea 100644 --- a/tools/cabana/streams/abstractstream.cc +++ b/tools/cabana/streams/abstractstream.cc @@ -226,7 +226,7 @@ std::pair AbstractStream::eventsInRange(const Messag if (!time_range) return {events.begin(), events.end()}; auto first = std::lower_bound(events.begin(), events.end(), can->toMonoTime(time_range->first), CompareCanEvent()); - auto last = std::upper_bound(events.begin(), events.end(), can->toMonoTime(time_range->second), CompareCanEvent()); + auto last = std::upper_bound(first, events.end(), can->toMonoTime(time_range->second), CompareCanEvent()); return {first, last}; } diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index a940b6d04e81a8..80f586daa634d4 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -109,6 +109,7 @@ void Replay::seekTo(double seconds, bool relative) { interruptStream([&]() { current_segment_.store(target_segment); cur_mono_time_ = route_start_ts_ + target_time * 1e9; + cur_which_ = cereal::Event::Which::INIT_DATA; seeking_to_.store(target_time, std::memory_order_relaxed); return false; }); @@ -250,7 +251,6 @@ void Replay::publishFrame(const Event *e) { void Replay::streamThread() { stream_thread_id = pthread_self(); - cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; std::unique_lock lk(stream_lock_); while (true) { @@ -259,7 +259,7 @@ void Replay::streamThread() { event_data_ = seg_mgr_->getEventData(); const auto &events = event_data_->events; - auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {})); + auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which_, cur_mono_time_, {})); if (first == events.cend()) { rInfo("waiting for events..."); events_ready_ = false; @@ -273,9 +273,7 @@ void Replay::streamThread() { camera_server_->waitForSent(); } - if (it != events.cend()) { - cur_which = it->which; - } else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) { + if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) { int last_segment = seg_mgr_->route_.segments().rbegin()->first; if (event_data_->isSegmentLoaded(last_segment)) { rInfo("reaches the end of route, restart from beginning"); @@ -302,10 +300,12 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con seg_mgr_->setCurrentSegment(segment); } + cur_mono_time_ = evt.mono_time; + cur_which_ = evt.which; + // Skip events if socket is not present if (!sockets_[evt.which]) continue; - cur_mono_time_ = evt.mono_time; const uint64_t current_nanos = nanos_since_boot(); const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts); diff --git a/tools/replay/replay.h b/tools/replay/replay.h index 8525a532a1766c..6a2c86ff0218b6 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -93,6 +93,7 @@ class Replay { std::time_t route_date_time_; uint64_t route_start_ts_ = 0; std::atomic cur_mono_time_ = 0; + cereal::Event::Which cur_which_ = cereal::Event::Which::INIT_DATA; double min_seconds_ = 0; double max_seconds_ = 0; SubMaster *sm_ = nullptr; diff --git a/tools/replay/seg_mgr.cc b/tools/replay/seg_mgr.cc index 8a00d426b1a804..ee034fb083ae3d 100644 --- a/tools/replay/seg_mgr.cc +++ b/tools/replay/seg_mgr.cc @@ -6,7 +6,6 @@ SegmentManager::~SegmentManager() { { std::unique_lock lock(mutex_); exit_ = true; - onSegmentMergedCallback_ = nullptr; } cv_.notify_one(); if (thread_.joinable()) thread_.join(); @@ -37,6 +36,8 @@ bool SegmentManager::load() { void SegmentManager::setCurrentSegment(int seg_num) { { std::unique_lock lock(mutex_); + if (cur_seg_num_ == seg_num) return; + cur_seg_num_ = seg_num; needs_update_ = true; } @@ -58,6 +59,8 @@ void SegmentManager::manageSegmentCache() { auto end = std::next(begin, std::min(segment_cache_limit_, std::distance(begin, segments_.end()))); begin = std::prev(end, std::min(segment_cache_limit_, std::distance(segments_.begin(), end))); + lock.unlock(); + loadSegmentsInRange(begin, cur, end); bool merged = mergeSegments(begin, end); @@ -65,8 +68,6 @@ void SegmentManager::manageSegmentCache() { std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); }); std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); }); - lock.unlock(); - if (merged && onSegmentMergedCallback_) { onSegmentMergedCallback_(); // Notify listener that segments have been merged } @@ -118,7 +119,11 @@ void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap: if (!segment_ptr) { segment_ptr = std::make_shared( it->first, route_.at(it->first), flags_, filters_, - [this](int seg_num, bool success) { setCurrentSegment(cur_seg_num_); }); + [this](int seg_num, bool success) { + std::unique_lock lock(mutex_); + needs_update_ = true; + cv_.notify_one(); + }); } if (segment_ptr->getState() == Segment::LoadState::Loading) { diff --git a/tools/replay/seg_mgr.h b/tools/replay/seg_mgr.h index 40bdcd51f06d1d..9158e416183c54 100644 --- a/tools/replay/seg_mgr.h +++ b/tools/replay/seg_mgr.h @@ -45,7 +45,7 @@ class SegmentManager { std::mutex mutex_; std::condition_variable cv_; std::thread thread_; - std::atomic cur_seg_num_ = -1; + int cur_seg_num_ = -1; bool needs_update_ = false; bool exit_ = false;