Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rec_core] Record ecalhdf5 v6, respecting topic-id. #1884

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion app/rec/rec_client_core/include/rec_client_core/topic_info.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,5 +75,7 @@ namespace eCAL

std::map<std::string, std::set<std::string>> publishers_;
};

using TopicInfoMap = std::map<eCAL::Registration::STopicId, TopicInfo>;
}
}
18 changes: 9 additions & 9 deletions app/rec/rec_client_core/src/ecal_rec_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -695,12 +695,12 @@ namespace eCAL
return subscribed_topics;
}

void EcalRecImpl::EcalMessageReceived(const eCAL::Registration::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_)
void EcalRecImpl::EcalMessageReceived(const Registration::STopicId& topic_id_, const SDataTypeInformation& datatype_info_, const SReceiveCallbackData& callback_data_)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: parameter 'datatype_info_' is unused [misc-unused-parameters]

Suggested change
void EcalRecImpl::EcalMessageReceived(const Registration::STopicId& topic_id_, const SDataTypeInformation& datatype_info_, const SReceiveCallbackData& callback_data_)
void EcalRecImpl::EcalMessageReceived(const Registration::STopicId& topic_id_, const SDataTypeInformation& /*datatype_info_*/, const SReceiveCallbackData& callback_data_)

{
auto ecal_receive_time = eCAL::Time::ecal_clock::now();
auto system_receive_time = std::chrono::steady_clock::now();

std::shared_ptr<Frame> frame = std::make_shared<Frame>(&data_, topic_id_.topic_name, ecal_receive_time, system_receive_time);
std::shared_ptr<Frame> frame = std::make_shared<Frame>(callback_data_, topic_id_, ecal_receive_time, system_receive_time);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'frame' of type 'std::shared_ptr' can be declared 'const' [misc-const-correctness]

Suggested change
std::shared_ptr<Frame> frame = std::make_shared<Frame>(callback_data_, topic_id_, ecal_receive_time, system_receive_time);
std::shared_ptr<Frame> const frame = std::make_shared<Frame>(callback_data_, topic_id_, ecal_receive_time, system_receive_time);


pre_buffer_.push_back(frame);

Expand All @@ -721,7 +721,7 @@ namespace eCAL
pre_buffer_.remove_old_frames();
}

void EcalRecImpl::SetTopicInfo(const std::map<std::string, TopicInfo>& topic_info_map)
void EcalRecImpl::SetTopicInfo(const TopicInfoMap& topic_info_map)
{
// Create subscribers for new topics if necessary
{
Expand Down Expand Up @@ -762,7 +762,7 @@ namespace eCAL
RemoveOldSubscribers_NoLock(filtered_topic_set);
}

std::set<std::string> EcalRecImpl::FilterAvailableTopics_NoLock(const std::map<std::string, TopicInfo>& topic_info_map) const
std::set<std::string> EcalRecImpl::FilterAvailableTopics_NoLock(const TopicInfoMap& topic_info_map) const
{
std::set<std::string> topic_set;

Expand All @@ -776,13 +776,13 @@ namespace eCAL

// Evaluate the record mode (All / Blacklist / Whitelist)
if ((record_mode_ == RecordMode::Blacklist)
&& (listed_topics_.find(topic_info.first) != listed_topics_.end()))
&& (listed_topics_.find(topic_info.first.topic_name) != listed_topics_.end()))
{
// The topic is blacklisted
continue;
}
else if ((record_mode_ == RecordMode::Whitelist)
&& (listed_topics_.find(topic_info.first) == listed_topics_.end()))
&& (listed_topics_.find(topic_info.first.topic_name) == listed_topics_.end()))
{
// The topic is not whitelisted
continue;
Expand All @@ -808,7 +808,7 @@ namespace eCAL
}

// Add the topic to the filtered set if we haven't found any reason not to do that :)
topic_set.emplace(topic_info.first);
topic_set.emplace(topic_info.first.topic_name);
}

return topic_set;
Expand All @@ -828,7 +828,7 @@ namespace eCAL
info_ = { false, "Error creating eCAL subsribers" };
continue;
}
if (!subscriber->SetReceiveCallback(std::bind(&EcalRecImpl::EcalMessageReceived, this, std::placeholders::_1, std::placeholders::_3)))
if (!subscriber->SetReceiveCallback(std::bind(&EcalRecImpl::EcalMessageReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)))
{
EcalRecLogger::Instance()->error("Error adding callback for subscriber on topic " + topic);
info_ = { false, "Error creating eCAL subsribers" };
Expand Down
8 changes: 4 additions & 4 deletions app/rec/rec_client_core/src/ecal_rec_impl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -115,22 +115,22 @@ namespace eCAL

std::set<std::string> GetSubscribedTopics() const;

void EcalMessageReceived(const eCAL::Registration::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_);
void EcalMessageReceived(const Registration::STopicId& topic_id_, const SDataTypeInformation& datatype_info_, const SReceiveCallbackData& callback_data_);

//////////////////////////////////////
//// API for external threads ////
//////////////////////////////////////
void GarbageCollect();

void SetTopicInfo(const std::map<std::string, TopicInfo>& topic_info_map);
void SetTopicInfo(const TopicInfoMap& topic_info_map);

//////////////////////////////////////////////////////////////////////////////
//// Private functions ////
//////////////////////////////////////////////////////////////////////////////
private:
void UpdateAndCleanSubscribers();

std::set<std::string> FilterAvailableTopics_NoLock(const std::map<std::string, TopicInfo>& topic_info_map) const;
std::set<std::string> FilterAvailableTopics_NoLock(const TopicInfoMap& topic_info_map) const;

void CreateNewSubscribers_NoLock(const std::set<std::string>& topic_set);
void RemoveOldSubscribers_NoLock(const std::set<std::string>& topic_set);
Expand Down
18 changes: 9 additions & 9 deletions app/rec/rec_client_core/src/frame.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,16 +32,16 @@ namespace eCAL
class Frame
{
public:
Frame(const eCAL::SReceiveCallbackData* const callback_data, const std::string& topic_name, const eCAL::Time::ecal_clock::time_point receive_time, std::chrono::steady_clock::time_point system_receive_time)
: ecal_publish_time_(std::chrono::duration_cast<eCAL::Time::ecal_clock::duration>(std::chrono::microseconds(callback_data->time)))
Frame(const eCAL::SReceiveCallbackData& callback_data, const Registration::STopicId& topic, const eCAL::Time::ecal_clock::time_point receive_time, std::chrono::steady_clock::time_point system_receive_time)
: ecal_publish_time_(std::chrono::duration_cast<eCAL::Time::ecal_clock::duration>(std::chrono::microseconds(callback_data.time)))
, ecal_receive_time_(receive_time)
, system_receive_time_(system_receive_time)
, topic_name_(topic_name)
, clock_(callback_data->clock)
, id_(callback_data->id)
, topic_(topic)
, clock_(callback_data.clock)
, id_(callback_data.id)
{
data_.reserve(callback_data->size);
data_.assign((char*)callback_data->buf, (char*)callback_data->buf + callback_data->size);
data_.reserve(callback_data.size);
data_.assign((char*)callback_data.buf, (char*)callback_data.buf + callback_data.size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: do not use C-style cast to convert between unrelated types [cppcoreguidelines-pro-type-cstyle-cast]

        data_.assign((char*)callback_data.buf, (char*)callback_data.buf + callback_data.size);
                                               ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: do not use C-style cast to convert between unrelated types [cppcoreguidelines-pro-type-cstyle-cast]

        data_.assign((char*)callback_data.buf, (char*)callback_data.buf + callback_data.size);
                     ^

}

Frame()
Expand All @@ -57,7 +57,7 @@ namespace eCAL
eCAL::Time::ecal_clock::time_point ecal_publish_time_;
eCAL::Time::ecal_clock::time_point ecal_receive_time_;
std::chrono::steady_clock::time_point system_receive_time_;
std::string topic_name_;
Registration::STopicId topic_;
long long clock_;
long long id_;
};
Expand Down
34 changes: 20 additions & 14 deletions app/rec/rec_client_core/src/job/hdf5_writer_thread.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,7 @@ namespace eCAL
// Constructor & Destructor
///////////////////////////////

Hdf5WriterThread::Hdf5WriterThread(const JobConfig& job_config, const std::map<std::string, TopicInfo>& initial_topic_info_map, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer)
Hdf5WriterThread::Hdf5WriterThread(const JobConfig& job_config, const TopicInfoMap& initial_topic_info_map, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer)
: InterruptibleThread ()
, job_config_ (job_config)
, frame_buffer_ (initial_frame_buffer)
Expand All @@ -41,7 +41,7 @@ namespace eCAL
, new_topic_info_map_available_(true)
, flushing_ (false)
{
hdf5_writer_ = std::make_unique<eCAL::eh5::v2::HDF5Meas>();
hdf5_writer_ = std::make_unique<eCAL::eh5::HDF5Meas>();
}

Hdf5WriterThread::~Hdf5WriterThread()
Expand Down Expand Up @@ -82,7 +82,7 @@ namespace eCAL
}
}

void Hdf5WriterThread::SetTopicInfo(std::map<std::string, TopicInfo> topic_info_map)
void Hdf5WriterThread::SetTopicInfo(TopicInfoMap topic_info_map)
{
std::unique_lock<decltype(input_mutex_)> input_lock(input_mutex_);

Expand Down Expand Up @@ -128,7 +128,7 @@ namespace eCAL

// Topic info to write to the HDF5 file
bool set_topic_info_map = false;
std::map<std::string, TopicInfo> topic_info_map_to_set;
TopicInfoMap topic_info_map_to_set;

{
// Lock the input mutex
Expand Down Expand Up @@ -168,8 +168,10 @@ namespace eCAL

for (const auto& topic : topic_info_map_to_set)
{
// convert from eCAL core types to eCAL measurement types before writing to the measurement.
eh5::SChannel channel{ topic.first.topic_name, topic.first.topic_id.entity_id };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'channel' of type 'eh5::SChannel' (aka 'eCAL::experimental::measurement::base::Channel') can be declared 'const' [misc-const-correctness]

Suggested change
eh5::SChannel channel{ topic.first.topic_name, topic.first.topic_id.entity_id };
eh5::SChannel const channel{ topic.first.topic_name, topic.first.topic_id.entity_id };

eCAL::experimental::measurement::base::DataTypeInformation const topic_info{ topic.second.tinfo_.name, topic.second.tinfo_.encoding, topic.second.tinfo_.descriptor };
hdf5_writer_->SetChannelDataTypeInformation(topic.first, topic_info);
hdf5_writer_->SetChannelDataTypeInformation(channel, topic_info);
}
}
else if (frame)
Expand All @@ -180,14 +182,18 @@ namespace eCAL
break;

// Write Frame element to HDF5
eh5::SWriteEntry entry;
// in hdf5, ids are integers, however SEntityIds are not, so we need to convert.
entry.channel = eh5::SChannel(frame->topic_.topic_name, frame->topic_.topic_id.entity_id);
entry.data = frame->data_.data();
entry.size = frame->data_.size();
entry.snd_timestamp = std::chrono::duration_cast<std::chrono::microseconds>(frame->ecal_publish_time_.time_since_epoch()).count();
entry.rcv_timestamp = std::chrono::duration_cast<std::chrono::microseconds>(frame->ecal_receive_time_.time_since_epoch()).count();
entry.sender_id = frame->id_;
entry.clock = frame->clock_;

if (!hdf5_writer_->AddEntryToFile(
frame->data_.data(),
frame->data_.size(),
std::chrono::duration_cast<std::chrono::microseconds>(frame->ecal_publish_time_.time_since_epoch()).count(),
std::chrono::duration_cast<std::chrono::microseconds>(frame->ecal_receive_time_.time_since_epoch()).count(),
frame->topic_name_,
frame->id_,
frame->clock_
entry
))
{
last_status_.info_ = { false, "Error adding frame to measurement" };
Expand Down Expand Up @@ -264,7 +270,7 @@ namespace eCAL
#endif // NDEBUG
std::unique_lock<decltype(hdf5_writer_mutex_)> hdf5_writer_lock(hdf5_writer_mutex_);

if (hdf5_writer_->Open(hdf5_dir, eCAL::eh5::v2::eAccessType::CREATE))
if (hdf5_writer_->Open(hdf5_dir, eCAL::eh5::eAccessType::CREATE))
{
#ifndef NDEBUG
EcalRecLogger::Instance()->debug("Hdf5WriterThread::Open(): Successfully opened HDF5-Writer with path \"" + hdf5_dir + "\"");
Expand Down
12 changes: 6 additions & 6 deletions app/rec/rec_client_core/src/job/hdf5_writer_thread.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,7 @@ namespace eCAL
// Constructor & Destructor
///////////////////////////////
public:
Hdf5WriterThread(const JobConfig& job_config, const std::map<std::string, TopicInfo>& initial_topic_info_map = {}, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer = {});
Hdf5WriterThread(const JobConfig& job_config, const TopicInfoMap& initial_topic_info_map = {}, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer = {});

~Hdf5WriterThread();

Expand All @@ -53,7 +53,7 @@ namespace eCAL

bool AddFrame(const std::shared_ptr<Frame>& frame);

void SetTopicInfo(std::map<std::string, TopicInfo> topic_info_map); // CALL BY VALUE (-> copy) IS INTENDED!
void SetTopicInfo(TopicInfoMap topic_info_map); // CALL BY VALUE (-> copy) IS INTENDED!

void Flush();

Expand Down Expand Up @@ -89,12 +89,12 @@ namespace eCAL
size_t written_frames_;
std::chrono::steady_clock::time_point first_written_frame_timestamp_;
std::chrono::steady_clock::time_point last_written_frame_timestamp_;
std::map<std::string, TopicInfo> new_topic_info_map_; /**< The new topic info map that shall be set to the HDF5 writer */
TopicInfoMap new_topic_info_map_; /**< The new topic info map that shall be set to the HDF5 writer */
bool new_topic_info_map_available_; /**< Telling that a new topic info map has been set from the outside. */
mutable RecHdf5JobStatus last_status_;

mutable std::mutex hdf5_writer_mutex_;
std::unique_ptr<eCAL::eh5::v2::HDF5Meas> hdf5_writer_;
mutable std::mutex hdf5_writer_mutex_;
std::unique_ptr<eCAL::eh5::HDF5Meas> hdf5_writer_;


std::atomic<bool> flushing_;
Expand Down
8 changes: 4 additions & 4 deletions app/rec/rec_client_core/src/job/record_job.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -283,7 +283,7 @@ namespace eCAL
return true;
}

bool RecordJob::StartRecording(const std::map<std::string, TopicInfo>& initial_topic_info_map, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer)
bool RecordJob::StartRecording(const TopicInfoMap& initial_topic_info_map, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer)
{
std::unique_lock<std::shared_timed_mutex> lock(job_mutex_);

Expand Down Expand Up @@ -317,7 +317,7 @@ namespace eCAL
}


bool RecordJob::SaveBuffer(const std::map<std::string, TopicInfo>& topic_info_map, const std::deque<std::shared_ptr<Frame>>& frame_buffer)
bool RecordJob::SaveBuffer(const TopicInfoMap& topic_info_map, const std::deque<std::shared_ptr<Frame>>& frame_buffer)
{
std::unique_lock<std::shared_timed_mutex> lock(job_mutex_);

Expand All @@ -344,7 +344,7 @@ namespace eCAL
return hdf5_writer_thread_->AddFrame(frame);
}

void RecordJob::SetTopicInfo(const std::map<std::string, TopicInfo>& topic_info_map)
void RecordJob::SetTopicInfo(const TopicInfoMap& topic_info_map)
{
std::shared_lock<std::shared_timed_mutex> lock(job_mutex_);
if ((main_recorder_state_ != JobState::Recording) || !hdf5_writer_thread_)
Expand Down
8 changes: 4 additions & 4 deletions app/rec/rec_client_core/src/job/record_job.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,12 +64,12 @@ namespace eCAL
///////////////////////////////////////////////
public:
bool InitializeMeasurementDirectory();
bool StartRecording(const std::map<std::string, TopicInfo>& initial_topic_info_map, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer);
bool StartRecording(const TopicInfoMap& initial_topic_info_map, const std::deque<std::shared_ptr<Frame>>& initial_frame_buffer);
bool StopRecording ();
bool SaveBuffer (const std::map<std::string, TopicInfo>& topic_info_map, const std::deque<std::shared_ptr<Frame>>& frame_buffer);
bool SaveBuffer (const TopicInfoMap& topic_info_map, const std::deque<std::shared_ptr<Frame>>& frame_buffer);

bool AddFrame(const std::shared_ptr<Frame>& frame);
void SetTopicInfo(const std::map<std::string, TopicInfo>& topic_info_map);
void SetTopicInfo(const TopicInfoMap& topic_info_map);

eCAL::rec::Error Upload(const UploadConfig& upload_config);

Expand Down
Loading
Loading