Skip to content

Commit

Permalink
[core] Rework SReceiveCallbackData. (#1982)
Browse files Browse the repository at this point in the history
split v5 and v6 SReceiveCallbackData.
Rename v6 SReceiveCallbackData fields / adapt types.
  • Loading branch information
KerstinKeller authored Jan 31, 2025
1 parent 8aca1e2 commit d9faad3
Show file tree
Hide file tree
Showing 32 changed files with 136 additions and 118 deletions.
4 changes: 2 additions & 2 deletions app/mon/mon_plugins/raw_data_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ PluginWidget::~PluginWidget()
void PluginWidget::ecalMessageReceivedCallback(const eCAL::SReceiveCallbackData& callback_data)
{
std::lock_guard<std::mutex> message_lock(message_mutex_);
last_message_ = QByteArray(static_cast<char*>(callback_data.buf), callback_data.size);
last_message_ = QByteArray(static_cast<const char*>(callback_data.buffer), callback_data.buffer_size);

last_message_publish_timestamp_ = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(callback_data.time));
last_message_publish_timestamp_ = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(callback_data.send_timestamp));

received_message_counter_++;
new_msg_available_ = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,8 +40,8 @@ class RawMessageVisualizationViewModel : public MessageVisualizationViewModel
{
{
std::lock_guard<std::mutex> lock{message_mtx};
latest_message = std::string(static_cast<char *>(callback_data.buf), callback_data.size);
message_timestamp = callback_data.time;
latest_message = std::string(static_cast<const char *>(callback_data.buffer), callback_data.buffer_size);
message_timestamp = callback_data.send_timestamp;
}

NotifyDataUpdated();
Expand Down
10 changes: 5 additions & 5 deletions app/rec/rec_client_core/src/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ namespace eCAL
{
public:
Frame(const eCAL::SReceiveCallbackData* const callback_data, const std::string& topic_name, const eCAL::Time::ecal_clock::time_point receive_time, std::chrono::steady_clock::time_point system_receive_time)
: ecal_publish_time_(std::chrono::duration_cast<eCAL::Time::ecal_clock::duration>(std::chrono::microseconds(callback_data->time)))
: ecal_publish_time_(std::chrono::duration_cast<eCAL::Time::ecal_clock::duration>(std::chrono::microseconds(callback_data->send_timestamp)))
, ecal_receive_time_(receive_time)
, system_receive_time_(system_receive_time)
, topic_name_(topic_name)
, clock_(callback_data->clock)
, id_(callback_data->id)
, clock_(callback_data->send_clock)
, id_(0) // TODO: We don't receive ids any more. We shoud probably adapt the frame class here.
{
data_.reserve(callback_data->size);
data_.assign((char*)callback_data->buf, (char*)callback_data->buf + callback_data->size);
data_.reserve(callback_data->buffer_size);
data_.assign((char*)callback_data->buffer, (char*)callback_data->buffer + callback_data->buffer_size);
}

Frame()
Expand Down
74 changes: 38 additions & 36 deletions ecal/core/include/ecal/pubsub/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@

namespace eCAL
{
/**
* @brief A struct which uniquely identifies anybody producing or consuming topics, e.g. a CPublisher or a CSubscriber.
**/
struct STopicId
{
SEntityId topic_id;
std::string topic_name;
SEntityId topic_id; //!< The unique id of the topic
std::string topic_name; //!< The topics name (on which matching is performed in the pub/sub case)

bool operator==(const STopicId& other) const
{
Expand All @@ -61,64 +64,63 @@ namespace eCAL
**/
struct SReceiveCallbackData
{
void* buf = nullptr; //!< payload buffer
long size = 0; //!< payload buffer size
long long id = 0; //!< publisher id (SetId())
long long time = 0; //!< publisher send time in µs
long long clock = 0; //!< publisher send clock
const void* buffer = nullptr; //!< payload buffer, containing the sent data
size_t buffer_size = 0; //!< payload buffer size
int64_t send_timestamp = 0; //!< publisher send timestamp in µs
int64_t send_clock = 0; //!< publisher send clock. Each publisher increases the counter by one, every time a message is sent. It can be used to detect message drops.
};

/**
* @brief eCAL subscriber event callback type.
**/
enum class eSubscriberEvent
* @brief eCAL publisher event callback type.
**/
enum class ePublisherEvent
{
none = 0,
connected = 1,
disconnected = 2,
dropped = 3
connected = 1, //!< a new subscriber has been connected to the publisher
disconnected = 2, //!< a previously connected subscriber has been disconnected from this publisher
dropped = 3 //!< some subscriber has missed a message that was sent by this publisher
};

inline std::string to_string(eSubscriberEvent event_) {
inline std::string to_string(ePublisherEvent event_) {
switch (event_) {
case eSubscriberEvent::none: return "NONE";
case eSubscriberEvent::connected: return "CONNECTED";
case eSubscriberEvent::disconnected: return "DISCONNECTED";
case eSubscriberEvent::dropped: return "DROPPED";
default: return "Unknown";
case ePublisherEvent::none: return "NONE";
case ePublisherEvent::connected: return "CONNECTED";
case ePublisherEvent::disconnected: return "DISCONNECTED";
case ePublisherEvent::dropped: return "DROPPED";
default: return "Unknown";
}
}

/**
* @brief eCAL publisher event callback type.
**/
enum class ePublisherEvent
* @brief eCAL subscriber event callback type.
**/
enum class eSubscriberEvent
{
none = 0,
connected = 1,
disconnected = 2,
dropped = 3
connected = 1, //!< a new publisher has been connected to the subscriber
disconnected = 2, //!< a previously connected publisher has been disconnected from this subscriber
dropped = 3 //!< a message coming from a publisher has been dropped, e.g. the subscriber has missed it
};

inline std::string to_string(ePublisherEvent event_) {
inline std::string to_string(eSubscriberEvent event_) {
switch (event_) {
case ePublisherEvent::none: return "NONE";
case ePublisherEvent::connected: return "CONNECTED";
case ePublisherEvent::disconnected: return "DISCONNECTED";
case ePublisherEvent::dropped: return "DROPPED";
default: return "Unknown";
case eSubscriberEvent::none: return "NONE";
case eSubscriberEvent::connected: return "CONNECTED";
case eSubscriberEvent::disconnected: return "DISCONNECTED";
case eSubscriberEvent::dropped: return "DROPPED";
default: return "Unknown";
}
}

/**
* @brief Receive callback function type with topic id and data struct. The topic id contains the topic name, the process
* name, the host name and a uniques topic identifier.
* @brief Receive callback function type. A user can register this callback type with a subscriber, and this callback will be triggered when the user receives any data.
*
* @param topic_id_ The topic id struct of the received message.
* @param data_type_info_ Topic data type information (encoding, type, descriptor).
* @param publisher_id_ The topic id of the publisher that has sent the data which is now being received.
* @param data_type_info_ Topic metadata, as set by the publisher (encoding, type, descriptor).
* This can be used to validate that the received data can be properly interpreted by the subscriber.
* @param data_ Data struct containing payload, timestamp and publication clock.
**/
using ReceiveCallbackT = std::function<void(const STopicId& topic_id_, const SDataTypeInformation& data_type_info_, const SReceiveCallbackData& data_)>;
using ReceiveCallbackT = std::function<void(const STopicId& publisher_id_, const SDataTypeInformation& data_type_info_, const SReceiveCallbackData& data_)>;

/**
* @brief eCAL publisher event callback struct.
Expand Down
15 changes: 14 additions & 1 deletion ecal/core/include/ecal/v5/ecal_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ namespace eCAL
{
ECAL_CORE_NAMESPACE_V5
{
/**
* @brief eCAL subscriber receive callback struct.
**/
struct SReceiveCallbackData
{
void* buf = nullptr; //!< payload buffer
long size = 0; //!< payload buffer size
long long id = 0; //!< publisher id (SetId())
long long time = 0; //!< publisher send time in µs
long long clock = 0; //!< publisher send clock
};


/**
* @brief eCAL publisher event callback struct.
**/
Expand Down Expand Up @@ -162,7 +175,7 @@ namespace eCAL
* @param topic_name_ The topic name of the received message.
* @param data_ Data struct containing payload, timestamp and publication clock.
**/
using ReceiveCallbackT = std::function<void(const char* topic_name_, const struct SReceiveCallbackData* data_)>;
using ReceiveCallbackT = std::function<void(const char* topic_name_, const struct v5::SReceiveCallbackData* data_)>;

/**
* @brief Publisher event callback function type. (deprecated)
Expand Down
10 changes: 5 additions & 5 deletions ecal/core/src/pubsub/ecal_subscriber_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ namespace eCAL
while (m_sample_hash_queue.size() > hash_queue_size) m_sample_hash_queue.pop_front();

// check id
// TODO: not sure if this is needed / necessary.
if (!m_id_set.empty())
{
if (m_id_set.find(id_) == m_id_set.end()) return(0);
Expand Down Expand Up @@ -532,11 +533,10 @@ namespace eCAL
#endif
// prepare data struct
SReceiveCallbackData cb_data;
cb_data.buf = const_cast<char*>(payload_);
cb_data.size = long(size_);
cb_data.id = id_;
cb_data.time = time_;
cb_data.clock = clock_;
cb_data.buffer = static_cast<const void*>(payload_);
cb_data.buffer_size = size_;
cb_data.send_timestamp = time_;
cb_data.send_clock = clock_;

STopicId topic_id;
topic_id.topic_name = topic_info_.topic_name;
Expand Down
11 changes: 9 additions & 2 deletions ecal/core/src/v5/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,16 @@ namespace eCAL

bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_)
{
auto v6_callback = [callback_](const STopicId& topic_id_, const SDataTypeInformation&, const eCAL::SReceiveCallbackData& data_)
auto v6_callback = [callback_](const STopicId& topic_id_, const SDataTypeInformation&, const eCAL::SReceiveCallbackData& v6_callback_data)
{
callback_(topic_id_.topic_name.c_str(), &data_);
eCAL::v5::SReceiveCallbackData v5_callback_data;
// we all know the const_cast is evil, however, the old api didn't define the buffer as const.
v5_callback_data.buf = const_cast<void*>(v6_callback_data.buffer);
v5_callback_data.size = static_cast<long>(v6_callback_data.buffer_size);
v5_callback_data.id = 0; // v6 callbacks do not communicate this data any more, hence it is always set to 0.
v5_callback_data.time = v6_callback_data.send_timestamp;
v5_callback_data.clock = v6_callback_data.send_clock;
callback_(topic_id_.topic_name.c_str(), &v5_callback_data);
};
return AddReceiveCallback(v6_callback);
}
Expand Down
4 changes: 2 additions & 2 deletions ecal/samples/cpp/benchmarks/counter_rec/src/counter_rec.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,7 @@ int main()

// add callback
auto on_receive = [&](const struct eCAL::SReceiveCallbackData& data_) {
long long const clock = reinterpret_cast<long long*>(data_.buf)[0];
long long const clock = static_cast<const long long*>(data_.buffer)[0];
if(g_first_clock < 0)
{
g_first_clock = clock;
Expand Down
4 changes: 2 additions & 2 deletions ecal/samples/cpp/benchmarks/datarate_rec/src/datarate_rec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ int main(int argc, char** argv)
std::vector<char> rec_buffer;
auto on_receive = [&](const struct eCAL::SReceiveCallbackData& data_) {
// make a memory copy to emulate user action
rec_buffer.reserve(data_.size);
std::memcpy(rec_buffer.data(), data_.buf, data_.size);
rec_buffer.reserve(data_.buffer_size);
std::memcpy(rec_buffer.data(), data_.buffer, data_.buffer_size);
};
sub.SetReceiveCallback(std::bind(on_receive, std::placeholders::_3));

Expand Down
6 changes: 3 additions & 3 deletions ecal/samples/cpp/benchmarks/latency_rec/src/latency_rec.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,8 +48,8 @@ void on_receive(const struct eCAL::SReceiveCallbackData& data_, SCallbackPar* pa

// update latency, size and msg number
const std::lock_guard<std::mutex> lock(par_->mtx);
par_->latency_array.push_back(rec_time - data_.time);
par_->rec_size = data_.size;
par_->latency_array.push_back(rec_time - data_.send_timestamp);
par_->rec_size = data_.buffer_size;
par_->msg_num++;
// delay callback
if(delay_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ void PrintStatistic(const std::string& topic_name_, const std::chrono::duration<
{
std::stringstream out;
out << "Topic Name: " << topic_name_ << std::endl;
if (data_.size > 15)
if (data_.buffer_size > 15)
{
out << "Message [0 - 15]: ";
for (auto i = 0; i < 16; ++i) out << (static_cast<char*>(data_.buf))[i] << " ";
for (auto i = 0; i < 16; ++i) out << (static_cast<const char*>(data_.buffer))[i] << " ";
out << std::endl;
}
out << "Message size (kByte): " << (unsigned int)(size_ / 1024.0) << std::endl;
Expand Down Expand Up @@ -63,7 +63,7 @@ int main()

// add callback
auto on_receive = [&](const eCAL::STopicId& topic_id_, const eCAL::SReceiveCallbackData & data_) {
auto size = data_.size;
auto size = data_.buffer_size;

msgs++;
bytes += size;
Expand Down
6 changes: 3 additions & 3 deletions ecal/samples/cpp/benchmarks/perftool/src/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ void Subscriber::callback(const eCAL::SReceiveCallbackData& data_)
SubscribedMessage message_info;
message_info.local_receive_time = std::chrono::steady_clock::now();
message_info.ecal_receive_time = eCAL::Time::ecal_clock::now();
message_info.ecal_send_time = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(data_.time));
message_info.ecal_counter = data_.clock;
message_info.size_bytes = data_.size;
message_info.ecal_send_time = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(data_.send_timestamp));
message_info.ecal_counter = data_.send_clock;
message_info.size_bytes = data_.buffer_size;

std::chrono::steady_clock::duration time_to_waste_this_iteration(time_to_waste_);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2024 Continental Corporation
* Copyright (C) 2016 - 2025 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,7 +69,7 @@ void throughput_test(int snd_size, int snd_loops, eCAL::TransportLayer::eType la
// add callback
std::atomic<size_t> received_bytes;
auto on_receive = [&](const struct eCAL::SReceiveCallbackData& data_) {
received_bytes += data_.size;
received_bytes += data_.buffer_size;
};
sub.SetReceiveCallback(std::bind(on_receive, std::placeholders::_3));

Expand Down
11 changes: 5 additions & 6 deletions ecal/samples/cpp/pubsub/binary/binary_rec/src/binary_rec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
// subscriber callback function
void OnReceive(const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_)
{
if (data_.size < 1) return;
if (data_.buffer_size < 1) return;

int content(static_cast<int>(static_cast<unsigned char*>(data_.buf)[0]));
int content(static_cast<int>(static_cast<const unsigned char*>(data_.buffer)[0]));
std::cout << "----------------------------------------------" << std::endl;
std::cout << " Received binary buffer " << content << std::endl;
std::cout << "----------------------------------------------" << std::endl;
std::cout << " Size : " << data_.size << std::endl;
std::cout << " Id : " << data_.id << std::endl;
std::cout << " Time : " << data_.time << std::endl;
std::cout << " Clock : " << data_.clock << std::endl;
std::cout << " Size : " << data_.buffer_size << std::endl;
std::cout << " Time : " << data_.send_timestamp << std::endl;
std::cout << " Clock : " << data_.send_clock << std::endl;
std::cout << std::endl;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,19 @@ std::ostream& operator<<(std::ostream& os, const SSimpleStruct& s)
// subscriber callback function
void OnReceive(const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_)
{
if (data_.size < 1) return;
if (data_.buffer_size < 1) return;

std::cout << "------------------------------------" << std::endl;
std::cout << "Binary buffer header :" << std::endl;
std::cout << "------------------------------------" << std::endl;
std::cout << " Size : " << data_.size << std::endl;
std::cout << " Id : " << data_.id << std::endl;
std::cout << " Time : " << data_.time << std::endl;
std::cout << " Clock : " << data_.clock << std::endl;
std::cout << " Size : " << data_.buffer_size << std::endl;
std::cout << " Time : " << data_.send_timestamp << std::endl;
std::cout << " Clock : " << data_.send_clock << std::endl;
std::cout << std::endl;
std::cout << "------------------------------------" << std::endl;
std::cout << "SSimpleStruct :" << std::endl;
std::cout << "------------------------------------" << std::endl;
std::cout << *static_cast<SSimpleStruct*>(data_.buf) << std::endl;
std::cout << *static_cast<const SSimpleStruct*>(data_.buffer) << std::endl;
}

int main()
Expand Down
2 changes: 1 addition & 1 deletion ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST(core_cpp_pubsub, TimeoutAcknowledgment)
auto sub1 = std::make_shared< eCAL::CSubscriber>("topic");
auto sleeper_variable_time = [](const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_)
{
std::string const sleep_time((const char*)data_.buf, data_.size);
std::string const sleep_time((const char*)data_.buffer, data_.buffer_size);
int const sleep = std::stoi(sleep_time);
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
};
Expand Down
Loading

0 comments on commit d9faad3

Please sign in to comment.