Skip to content

Commit

Permalink
replay: improve segment loading and event handling (commaai#34490)
Browse files Browse the repository at this point in the history
improve segment Loading and Event Handling
deanlee authored Jan 28, 2025
1 parent 2eb3585 commit 227bb68
Showing 5 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion tools/cabana/streams/abstractstream.cc
Original file line number Diff line number Diff line change
@@ -226,7 +226,7 @@ std::pair<CanEventIter, CanEventIter> AbstractStream::eventsInRange(const Messag
if (!time_range) return {events.begin(), events.end()};

auto first = std::lower_bound(events.begin(), events.end(), can->toMonoTime(time_range->first), CompareCanEvent());
auto last = std::upper_bound(events.begin(), events.end(), can->toMonoTime(time_range->second), CompareCanEvent());
auto last = std::upper_bound(first, events.end(), can->toMonoTime(time_range->second), CompareCanEvent());
return {first, last};
}

12 changes: 6 additions & 6 deletions tools/replay/replay.cc
Original file line number Diff line number Diff line change
@@ -109,6 +109,7 @@ void Replay::seekTo(double seconds, bool relative) {
interruptStream([&]() {
current_segment_.store(target_segment);
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
cur_which_ = cereal::Event::Which::INIT_DATA;
seeking_to_.store(target_time, std::memory_order_relaxed);
return false;
});
@@ -250,7 +251,6 @@ void Replay::publishFrame(const Event *e) {

void Replay::streamThread() {
stream_thread_id = pthread_self();
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
std::unique_lock lk(stream_lock_);

while (true) {
@@ -259,7 +259,7 @@ void Replay::streamThread() {

event_data_ = seg_mgr_->getEventData();
const auto &events = event_data_->events;
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {}));
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which_, cur_mono_time_, {}));
if (first == events.cend()) {
rInfo("waiting for events...");
events_ready_ = false;
@@ -273,9 +273,7 @@ void Replay::streamThread() {
camera_server_->waitForSent();
}

if (it != events.cend()) {
cur_which = it->which;
} else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
int last_segment = seg_mgr_->route_.segments().rbegin()->first;
if (event_data_->isSegmentLoaded(last_segment)) {
rInfo("reaches the end of route, restart from beginning");
@@ -302,10 +300,12 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
seg_mgr_->setCurrentSegment(segment);
}

cur_mono_time_ = evt.mono_time;
cur_which_ = evt.which;

// Skip events if socket is not present
if (!sockets_[evt.which]) continue;

cur_mono_time_ = evt.mono_time;
const uint64_t current_nanos = nanos_since_boot();
const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);

1 change: 1 addition & 0 deletions tools/replay/replay.h
Original file line number Diff line number Diff line change
@@ -93,6 +93,7 @@ class Replay {
std::time_t route_date_time_;
uint64_t route_start_ts_ = 0;
std::atomic<uint64_t> cur_mono_time_ = 0;
cereal::Event::Which cur_which_ = cereal::Event::Which::INIT_DATA;
double min_seconds_ = 0;
double max_seconds_ = 0;
SubMaster *sm_ = nullptr;
13 changes: 9 additions & 4 deletions tools/replay/seg_mgr.cc
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ SegmentManager::~SegmentManager() {
{
std::unique_lock lock(mutex_);
exit_ = true;
onSegmentMergedCallback_ = nullptr;
}
cv_.notify_one();
if (thread_.joinable()) thread_.join();
@@ -37,6 +36,8 @@ bool SegmentManager::load() {
void SegmentManager::setCurrentSegment(int seg_num) {
{
std::unique_lock lock(mutex_);
if (cur_seg_num_ == seg_num) return;

cur_seg_num_ = seg_num;
needs_update_ = true;
}
@@ -58,15 +59,15 @@ void SegmentManager::manageSegmentCache() {
auto end = std::next(begin, std::min<int>(segment_cache_limit_, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit_, std::distance(segments_.begin(), end)));

lock.unlock();

loadSegmentsInRange(begin, cur, end);
bool merged = mergeSegments(begin, end);

// Free segments outside the current range
std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); });
std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); });

lock.unlock();

if (merged && onSegmentMergedCallback_) {
onSegmentMergedCallback_(); // Notify listener that segments have been merged
}
@@ -118,7 +119,11 @@ void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap:
if (!segment_ptr) {
segment_ptr = std::make_shared<Segment>(
it->first, route_.at(it->first), flags_, filters_,
[this](int seg_num, bool success) { setCurrentSegment(cur_seg_num_); });
[this](int seg_num, bool success) {
std::unique_lock lock(mutex_);
needs_update_ = true;
cv_.notify_one();
});
}

if (segment_ptr->getState() == Segment::LoadState::Loading) {
2 changes: 1 addition & 1 deletion tools/replay/seg_mgr.h
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ class SegmentManager {
std::mutex mutex_;
std::condition_variable cv_;
std::thread thread_;
std::atomic<int> cur_seg_num_ = -1;
int cur_seg_num_ = -1;
bool needs_update_ = false;
bool exit_ = false;

0 comments on commit 227bb68

Please sign in to comment.