diff --git a/app/mon/mon_plugins/raw_data_reflection/src/plugin_widget.cpp b/app/mon/mon_plugins/raw_data_reflection/src/plugin_widget.cpp index 523db09232..9b843a490b 100644 --- a/app/mon/mon_plugins/raw_data_reflection/src/plugin_widget.cpp +++ b/app/mon/mon_plugins/raw_data_reflection/src/plugin_widget.cpp @@ -75,9 +75,9 @@ PluginWidget::~PluginWidget() void PluginWidget::ecalMessageReceivedCallback(const eCAL::SReceiveCallbackData& callback_data) { std::lock_guard message_lock(message_mutex_); - last_message_ = QByteArray(static_cast(callback_data.buf), callback_data.size); + last_message_ = QByteArray(static_cast(callback_data.buffer), callback_data.buffer_size); - last_message_publish_timestamp_ = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(callback_data.time)); + last_message_publish_timestamp_ = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(callback_data.send_timestamp)); received_message_counter_++; new_msg_available_ = true; diff --git a/app/mon/mon_tui/src/tui/viewmodel/message_visualization/raw.hpp b/app/mon/mon_tui/src/tui/viewmodel/message_visualization/raw.hpp index c6879e27b5..3b1b5adcd1 100644 --- a/app/mon/mon_tui/src/tui/viewmodel/message_visualization/raw.hpp +++ b/app/mon/mon_tui/src/tui/viewmodel/message_visualization/raw.hpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2024 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,8 +40,8 @@ class RawMessageVisualizationViewModel : public MessageVisualizationViewModel { { std::lock_guard lock{message_mtx}; - latest_message = std::string(static_cast(callback_data.buf), callback_data.size); - message_timestamp = callback_data.time; + latest_message = std::string(static_cast(callback_data.buffer), callback_data.buffer_size); + message_timestamp = callback_data.send_timestamp; } NotifyDataUpdated(); diff --git a/app/rec/rec_client_core/src/frame.h b/app/rec/rec_client_core/src/frame.h index 28dac992d7..d7f1862722 100644 --- a/app/rec/rec_client_core/src/frame.h +++ b/app/rec/rec_client_core/src/frame.h @@ -33,15 +33,15 @@ namespace eCAL { public: Frame(const eCAL::SReceiveCallbackData* const callback_data, const std::string& topic_name, const eCAL::Time::ecal_clock::time_point receive_time, std::chrono::steady_clock::time_point system_receive_time) - : ecal_publish_time_(std::chrono::duration_cast(std::chrono::microseconds(callback_data->time))) + : ecal_publish_time_(std::chrono::duration_cast(std::chrono::microseconds(callback_data->send_timestamp))) , ecal_receive_time_(receive_time) , system_receive_time_(system_receive_time) , topic_name_(topic_name) - , clock_(callback_data->clock) - , id_(callback_data->id) + , clock_(callback_data->send_clock) + , id_(0) // TODO: We don't receive ids any more. We shoud probably adapt the frame class here. { - data_.reserve(callback_data->size); - data_.assign((char*)callback_data->buf, (char*)callback_data->buf + callback_data->size); + data_.reserve(callback_data->buffer_size); + data_.assign((char*)callback_data->buffer, (char*)callback_data->buffer + callback_data->buffer_size); } Frame() diff --git a/ecal/core/include/ecal/pubsub/types.h b/ecal/core/include/ecal/pubsub/types.h index b735671d22..d0d0061b99 100644 --- a/ecal/core/include/ecal/pubsub/types.h +++ b/ecal/core/include/ecal/pubsub/types.h @@ -33,10 +33,13 @@ namespace eCAL { + /** + * @brief A struct which uniquely identifies anybody producing or consuming topics, e.g. a CPublisher or a CSubscriber. + **/ struct STopicId { - SEntityId topic_id; - std::string topic_name; + SEntityId topic_id; //!< The unique id of the topic + std::string topic_name; //!< The topics name (on which matching is performed in the pub/sub case) bool operator==(const STopicId& other) const { @@ -61,64 +64,63 @@ namespace eCAL **/ struct SReceiveCallbackData { - void* buf = nullptr; //!< payload buffer - long size = 0; //!< payload buffer size - long long id = 0; //!< publisher id (SetId()) - long long time = 0; //!< publisher send time in µs - long long clock = 0; //!< publisher send clock + const void* buffer = nullptr; //!< payload buffer, containing the sent data + size_t buffer_size = 0; //!< payload buffer size + int64_t send_timestamp = 0; //!< publisher send timestamp in µs + int64_t send_clock = 0; //!< publisher send clock. Each publisher increases the counter by one, every time a message is sent. It can be used to detect message drops. }; /** - * @brief eCAL subscriber event callback type. -**/ - enum class eSubscriberEvent + * @brief eCAL publisher event callback type. + **/ + enum class ePublisherEvent { none = 0, - connected = 1, - disconnected = 2, - dropped = 3 + connected = 1, //!< a new subscriber has been connected to the publisher + disconnected = 2, //!< a previously connected subscriber has been disconnected from this publisher + dropped = 3 //!< some subscriber has missed a message that was sent by this publisher }; - inline std::string to_string(eSubscriberEvent event_) { + inline std::string to_string(ePublisherEvent event_) { switch (event_) { - case eSubscriberEvent::none: return "NONE"; - case eSubscriberEvent::connected: return "CONNECTED"; - case eSubscriberEvent::disconnected: return "DISCONNECTED"; - case eSubscriberEvent::dropped: return "DROPPED"; - default: return "Unknown"; + case ePublisherEvent::none: return "NONE"; + case ePublisherEvent::connected: return "CONNECTED"; + case ePublisherEvent::disconnected: return "DISCONNECTED"; + case ePublisherEvent::dropped: return "DROPPED"; + default: return "Unknown"; } } /** - * @brief eCAL publisher event callback type. - **/ - enum class ePublisherEvent + * @brief eCAL subscriber event callback type. +**/ + enum class eSubscriberEvent { none = 0, - connected = 1, - disconnected = 2, - dropped = 3 + connected = 1, //!< a new publisher has been connected to the subscriber + disconnected = 2, //!< a previously connected publisher has been disconnected from this subscriber + dropped = 3 //!< a message coming from a publisher has been dropped, e.g. the subscriber has missed it }; - inline std::string to_string(ePublisherEvent event_) { + inline std::string to_string(eSubscriberEvent event_) { switch (event_) { - case ePublisherEvent::none: return "NONE"; - case ePublisherEvent::connected: return "CONNECTED"; - case ePublisherEvent::disconnected: return "DISCONNECTED"; - case ePublisherEvent::dropped: return "DROPPED"; - default: return "Unknown"; + case eSubscriberEvent::none: return "NONE"; + case eSubscriberEvent::connected: return "CONNECTED"; + case eSubscriberEvent::disconnected: return "DISCONNECTED"; + case eSubscriberEvent::dropped: return "DROPPED"; + default: return "Unknown"; } } /** - * @brief Receive callback function type with topic id and data struct. The topic id contains the topic name, the process - * name, the host name and a uniques topic identifier. + * @brief Receive callback function type. A user can register this callback type with a subscriber, and this callback will be triggered when the user receives any data. * - * @param topic_id_ The topic id struct of the received message. - * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param publisher_id_ The topic id of the publisher that has sent the data which is now being received. + * @param data_type_info_ Topic metadata, as set by the publisher (encoding, type, descriptor). + * This can be used to validate that the received data can be properly interpreted by the subscriber. * @param data_ Data struct containing payload, timestamp and publication clock. **/ - using ReceiveCallbackT = std::function; + using ReceiveCallbackT = std::function; /** * @brief eCAL publisher event callback struct. diff --git a/ecal/core/include/ecal/v5/ecal_callback.h b/ecal/core/include/ecal/v5/ecal_callback.h index fb39a97fcc..36bcccfa43 100644 --- a/ecal/core/include/ecal/v5/ecal_callback.h +++ b/ecal/core/include/ecal/v5/ecal_callback.h @@ -37,6 +37,19 @@ namespace eCAL { ECAL_CORE_NAMESPACE_V5 { + /** + * @brief eCAL subscriber receive callback struct. + **/ + struct SReceiveCallbackData + { + void* buf = nullptr; //!< payload buffer + long size = 0; //!< payload buffer size + long long id = 0; //!< publisher id (SetId()) + long long time = 0; //!< publisher send time in µs + long long clock = 0; //!< publisher send clock + }; + + /** * @brief eCAL publisher event callback struct. **/ @@ -162,7 +175,7 @@ namespace eCAL * @param topic_name_ The topic name of the received message. * @param data_ Data struct containing payload, timestamp and publication clock. **/ - using ReceiveCallbackT = std::function; + using ReceiveCallbackT = std::function; /** * @brief Publisher event callback function type. (deprecated) diff --git a/ecal/core/src/pubsub/ecal_subscriber_impl.cpp b/ecal/core/src/pubsub/ecal_subscriber_impl.cpp index cec030a06d..931332af30 100644 --- a/ecal/core/src/pubsub/ecal_subscriber_impl.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber_impl.cpp @@ -479,6 +479,7 @@ namespace eCAL while (m_sample_hash_queue.size() > hash_queue_size) m_sample_hash_queue.pop_front(); // check id + // TODO: not sure if this is needed / necessary. if (!m_id_set.empty()) { if (m_id_set.find(id_) == m_id_set.end()) return(0); @@ -532,11 +533,10 @@ namespace eCAL #endif // prepare data struct SReceiveCallbackData cb_data; - cb_data.buf = const_cast(payload_); - cb_data.size = long(size_); - cb_data.id = id_; - cb_data.time = time_; - cb_data.clock = clock_; + cb_data.buffer = static_cast(payload_); + cb_data.buffer_size = size_; + cb_data.send_timestamp = time_; + cb_data.send_clock = clock_; STopicId topic_id; topic_id.topic_name = topic_info_.topic_name; diff --git a/ecal/core/src/v5/pubsub/ecal_subscriber.cpp b/ecal/core/src/v5/pubsub/ecal_subscriber.cpp index c489453aca..c2586c0881 100644 --- a/ecal/core/src/v5/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/v5/pubsub/ecal_subscriber.cpp @@ -150,9 +150,16 @@ namespace eCAL bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_) { - auto v6_callback = [callback_](const STopicId& topic_id_, const SDataTypeInformation&, const eCAL::SReceiveCallbackData& data_) + auto v6_callback = [callback_](const STopicId& topic_id_, const SDataTypeInformation&, const eCAL::SReceiveCallbackData& v6_callback_data) { - callback_(topic_id_.topic_name.c_str(), &data_); + eCAL::v5::SReceiveCallbackData v5_callback_data; + // we all know the const_cast is evil, however, the old api didn't define the buffer as const. + v5_callback_data.buf = const_cast(v6_callback_data.buffer); + v5_callback_data.size = static_cast(v6_callback_data.buffer_size); + v5_callback_data.id = 0; // v6 callbacks do not communicate this data any more, hence it is always set to 0. + v5_callback_data.time = v6_callback_data.send_timestamp; + v5_callback_data.clock = v6_callback_data.send_clock; + callback_(topic_id_.topic_name.c_str(), &v5_callback_data); }; return AddReceiveCallback(v6_callback); } diff --git a/ecal/samples/cpp/benchmarks/counter_rec/src/counter_rec.cpp b/ecal/samples/cpp/benchmarks/counter_rec/src/counter_rec.cpp index 5094110f47..7023841699 100644 --- a/ecal/samples/cpp/benchmarks/counter_rec/src/counter_rec.cpp +++ b/ecal/samples/cpp/benchmarks/counter_rec/src/counter_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2024 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ int main() // add callback auto on_receive = [&](const struct eCAL::SReceiveCallbackData& data_) { - long long const clock = reinterpret_cast(data_.buf)[0]; + long long const clock = static_cast(data_.buffer)[0]; if(g_first_clock < 0) { g_first_clock = clock; diff --git a/ecal/samples/cpp/benchmarks/datarate_rec/src/datarate_rec.cpp b/ecal/samples/cpp/benchmarks/datarate_rec/src/datarate_rec.cpp index b44464b679..dcca10143f 100644 --- a/ecal/samples/cpp/benchmarks/datarate_rec/src/datarate_rec.cpp +++ b/ecal/samples/cpp/benchmarks/datarate_rec/src/datarate_rec.cpp @@ -52,8 +52,8 @@ int main(int argc, char** argv) std::vector rec_buffer; auto on_receive = [&](const struct eCAL::SReceiveCallbackData& data_) { // make a memory copy to emulate user action - rec_buffer.reserve(data_.size); - std::memcpy(rec_buffer.data(), data_.buf, data_.size); + rec_buffer.reserve(data_.buffer_size); + std::memcpy(rec_buffer.data(), data_.buffer, data_.buffer_size); }; sub.SetReceiveCallback(std::bind(on_receive, std::placeholders::_3)); diff --git a/ecal/samples/cpp/benchmarks/latency_rec/src/latency_rec.cpp b/ecal/samples/cpp/benchmarks/latency_rec/src/latency_rec.cpp index 57417e28c1..3343972f31 100644 --- a/ecal/samples/cpp/benchmarks/latency_rec/src/latency_rec.cpp +++ b/ecal/samples/cpp/benchmarks/latency_rec/src/latency_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2024 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,8 +48,8 @@ void on_receive(const struct eCAL::SReceiveCallbackData& data_, SCallbackPar* pa // update latency, size and msg number const std::lock_guard lock(par_->mtx); - par_->latency_array.push_back(rec_time - data_.time); - par_->rec_size = data_.size; + par_->latency_array.push_back(rec_time - data_.send_timestamp); + par_->rec_size = data_.buffer_size; par_->msg_num++; // delay callback if(delay_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(delay_)); diff --git a/ecal/samples/cpp/benchmarks/performance_rec/src/performance_rec.cpp b/ecal/samples/cpp/benchmarks/performance_rec/src/performance_rec.cpp index 6e28025a4d..c1d2acfea5 100644 --- a/ecal/samples/cpp/benchmarks/performance_rec/src/performance_rec.cpp +++ b/ecal/samples/cpp/benchmarks/performance_rec/src/performance_rec.cpp @@ -30,10 +30,10 @@ void PrintStatistic(const std::string& topic_name_, const std::chrono::duration< { std::stringstream out; out << "Topic Name: " << topic_name_ << std::endl; - if (data_.size > 15) + if (data_.buffer_size > 15) { out << "Message [0 - 15]: "; - for (auto i = 0; i < 16; ++i) out << (static_cast(data_.buf))[i] << " "; + for (auto i = 0; i < 16; ++i) out << (static_cast(data_.buffer))[i] << " "; out << std::endl; } out << "Message size (kByte): " << (unsigned int)(size_ / 1024.0) << std::endl; @@ -63,7 +63,7 @@ int main() // add callback auto on_receive = [&](const eCAL::STopicId& topic_id_, const eCAL::SReceiveCallbackData & data_) { - auto size = data_.size; + auto size = data_.buffer_size; msgs++; bytes += size; diff --git a/ecal/samples/cpp/benchmarks/perftool/src/subscriber.cpp b/ecal/samples/cpp/benchmarks/perftool/src/subscriber.cpp index 8606285480..80f9c5298e 100644 --- a/ecal/samples/cpp/benchmarks/perftool/src/subscriber.cpp +++ b/ecal/samples/cpp/benchmarks/perftool/src/subscriber.cpp @@ -81,9 +81,9 @@ void Subscriber::callback(const eCAL::SReceiveCallbackData& data_) SubscribedMessage message_info; message_info.local_receive_time = std::chrono::steady_clock::now(); message_info.ecal_receive_time = eCAL::Time::ecal_clock::now(); - message_info.ecal_send_time = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(data_.time)); - message_info.ecal_counter = data_.clock; - message_info.size_bytes = data_.size; + message_info.ecal_send_time = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(data_.send_timestamp)); + message_info.ecal_counter = data_.send_clock; + message_info.size_bytes = data_.buffer_size; std::chrono::steady_clock::duration time_to_waste_this_iteration(time_to_waste_); diff --git a/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp b/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp index 32f0ff97e6..5214f70816 100644 --- a/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp +++ b/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2024 Continental Corporation + * Copyright (C) 2016 - 2025 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,7 +69,7 @@ void throughput_test(int snd_size, int snd_loops, eCAL::TransportLayer::eType la // add callback std::atomic received_bytes; auto on_receive = [&](const struct eCAL::SReceiveCallbackData& data_) { - received_bytes += data_.size; + received_bytes += data_.buffer_size; }; sub.SetReceiveCallback(std::bind(on_receive, std::placeholders::_3)); diff --git a/ecal/samples/cpp/pubsub/binary/binary_rec/src/binary_rec.cpp b/ecal/samples/cpp/pubsub/binary/binary_rec/src/binary_rec.cpp index 44e8347570..0e772dbc4f 100644 --- a/ecal/samples/cpp/pubsub/binary/binary_rec/src/binary_rec.cpp +++ b/ecal/samples/cpp/pubsub/binary/binary_rec/src/binary_rec.cpp @@ -27,16 +27,15 @@ // subscriber callback function void OnReceive(const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { - if (data_.size < 1) return; + if (data_.buffer_size < 1) return; - int content(static_cast(static_cast(data_.buf)[0])); + int content(static_cast(static_cast(data_.buffer)[0])); std::cout << "----------------------------------------------" << std::endl; std::cout << " Received binary buffer " << content << std::endl; std::cout << "----------------------------------------------" << std::endl; - std::cout << " Size : " << data_.size << std::endl; - std::cout << " Id : " << data_.id << std::endl; - std::cout << " Time : " << data_.time << std::endl; - std::cout << " Clock : " << data_.clock << std::endl; + std::cout << " Size : " << data_.buffer_size << std::endl; + std::cout << " Time : " << data_.send_timestamp << std::endl; + std::cout << " Clock : " << data_.send_clock << std::endl; std::cout << std::endl; } diff --git a/ecal/samples/cpp/pubsub/binary/binary_zero_copy_rec/src/binary_zero_copy_rec.cpp b/ecal/samples/cpp/pubsub/binary/binary_zero_copy_rec/src/binary_zero_copy_rec.cpp index 9dda53b8ca..9d53676e44 100644 --- a/ecal/samples/cpp/pubsub/binary/binary_zero_copy_rec/src/binary_zero_copy_rec.cpp +++ b/ecal/samples/cpp/pubsub/binary/binary_zero_copy_rec/src/binary_zero_copy_rec.cpp @@ -56,20 +56,19 @@ std::ostream& operator<<(std::ostream& os, const SSimpleStruct& s) // subscriber callback function void OnReceive(const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { - if (data_.size < 1) return; + if (data_.buffer_size < 1) return; std::cout << "------------------------------------" << std::endl; std::cout << "Binary buffer header :" << std::endl; std::cout << "------------------------------------" << std::endl; - std::cout << " Size : " << data_.size << std::endl; - std::cout << " Id : " << data_.id << std::endl; - std::cout << " Time : " << data_.time << std::endl; - std::cout << " Clock : " << data_.clock << std::endl; + std::cout << " Size : " << data_.buffer_size << std::endl; + std::cout << " Time : " << data_.send_timestamp << std::endl; + std::cout << " Clock : " << data_.send_clock << std::endl; std::cout << std::endl; std::cout << "------------------------------------" << std::endl; std::cout << "SSimpleStruct :" << std::endl; std::cout << "------------------------------------" << std::endl; - std::cout << *static_cast(data_.buf) << std::endl; + std::cout << *static_cast(data_.buffer) << std::endl; } int main() diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp index cdeed6c1de..17d507a953 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp @@ -63,7 +63,7 @@ TEST(core_cpp_pubsub, TimeoutAcknowledgment) auto sub1 = std::make_shared< eCAL::CSubscriber>("topic"); auto sleeper_variable_time = [](const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { - std::string const sleep_time((const char*)data_.buf, data_.size); + std::string const sleep_time((const char*)data_.buffer, data_.buffer_size); int const sleep = std::stoi(sleep_time); std::this_thread::sleep_for(std::chrono::milliseconds(sleep)); }; diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp index 45a9157b41..aee9edcef8 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp @@ -77,7 +77,7 @@ TEST(core_cpp_pubsub, TestSubscriberIsPublishedTiming) if (sub_count == 0) { publisher_seen_at_subscription_start = sub.GetPublisherCount() > 0; - first_received_sample = std::string(static_cast(data_.buf), data_.size); + first_received_sample = std::string(static_cast(data_.buffer), data_.buffer_size); } if (sub_count < max_sub_count) @@ -176,7 +176,7 @@ TEST(core_cpp_pubsub, TestPublisherIsSubscribedTiming) if (sub_count == 0) { publisher_seen_at_subscription_start = sub.GetPublisherCount() > 0; - first_received_sample = std::string(static_cast(data_.buf), data_.size); + first_received_sample = std::string(static_cast(data_.buffer), data_.buffer_size); } if (sub_count < max_sub_count) @@ -255,7 +255,7 @@ TEST(core_cpp_pubsub, TestChainedPublisherSubscriberCallback) eCAL::CSubscriber sub1("topic1"); auto subscriber1_callback = [&pub2](const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { // On receiving data from Publisher1, Publisher2 sends the same data - const std::string received_data(static_cast(data_.buf), data_.size); + const std::string received_data(static_cast(data_.buffer), data_.buffer_size); pub2.Send(received_data); }; sub1.SetReceiveCallback(subscriber1_callback); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp index 23fbc0a375..0b1e34111c 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp @@ -99,11 +99,11 @@ std::vector multibuffer_pub_sub_test(int buffer_count, bool zero_copy, int // add callback auto lambda = [&](const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { - received_bytes += data_.size; + received_bytes += data_.buffer_size; ++received_count; for (auto i = 0; i < bytes_to_read; ++i) { - const char rec_char(static_cast(data_.buf)[i]); + const char rec_char(static_cast(data_.buffer)[i]); received_content.push_back(rec_char); std::cout << std::setw(2) << std::setfill('0') << static_cast(rec_char) << " "; } diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp index 741dfee0d0..e4a4e24fca 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp @@ -45,7 +45,7 @@ namespace std::atomic g_callback_received_count; void OnReceive(const struct eCAL::SReceiveCallbackData& data_) { - g_callback_received_bytes += data_.size; + g_callback_received_bytes += data_.buffer_size; g_callback_received_count++; } diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp index 0bbab1f18e..bb229101ef 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp @@ -42,7 +42,7 @@ namespace std::atomic g_callback_received_count; void OnReceive(const struct eCAL::SReceiveCallbackData& data_) { - g_callback_received_bytes += data_.size; + g_callback_received_bytes += data_.buffer_size; g_callback_received_count++; } } @@ -130,8 +130,8 @@ TEST(core_cpp_pubsub, MultipleSendsSHM) // add callback auto save_data = [&last_received_msg, &last_received_timestamp](const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { - last_received_msg = std::string{ (const char*)data_.buf, (size_t)data_.size}; - last_received_timestamp = data_.time; + last_received_msg = std::string{ (const char*)data_.buffer, (size_t)data_.buffer_size}; + last_received_timestamp = data_.send_timestamp; }; EXPECT_TRUE(sub.SetReceiveCallback(save_data)); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp index 6724dacef1..1918e7b65d 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp @@ -41,7 +41,7 @@ namespace std::atomic g_callback_received_count; void OnReceive(const eCAL::SReceiveCallbackData& data_) { - g_callback_received_bytes += data_.size; + g_callback_received_bytes += data_.buffer_size; g_callback_received_count++; } } @@ -116,8 +116,8 @@ TEST(core_cpp_pubsub, MultipleSendsUDP) // add callback auto save_data = [&last_received_msg, &last_received_timestamp](const eCAL::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& data_) { - last_received_msg = std::string{ (const char*)data_.buf, (size_t)data_.size }; - last_received_timestamp = data_.time; + last_received_msg = std::string{ (const char*)data_.buffer, (size_t)data_.buffer_size }; + last_received_timestamp = data_.send_timestamp; }; EXPECT_TRUE(sub.SetReceiveCallback(save_data)); diff --git a/ecal/tests/cpp/pubsub_v5_test/src/pubsub_connection_test.cpp b/ecal/tests/cpp/pubsub_v5_test/src/pubsub_connection_test.cpp index 01b60d5f08..42482de333 100644 --- a/ecal/tests/cpp/pubsub_v5_test/src/pubsub_connection_test.cpp +++ b/ecal/tests/cpp/pubsub_v5_test/src/pubsub_connection_test.cpp @@ -73,7 +73,7 @@ TEST(core_cpp_pubsub_v5, TestSubscriberIsPublishedTiming) eCAL::v5::CSubscriber sub("blob"); const auto max_sub_count(10); auto sub_count(0); - auto receive_lambda = [&max_sub_count, &sub_count, &publisher_seen_at_subscription_start, &first_received_sample, &sub](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) { + auto receive_lambda = [&max_sub_count, &sub_count, &publisher_seen_at_subscription_start, &first_received_sample, &sub](const char* /*topic_name_*/, const struct eCAL::v5::SReceiveCallbackData* data_) { if (sub_count == 0) { publisher_seen_at_subscription_start = sub.IsPublished(); @@ -172,7 +172,7 @@ TEST(core_cpp_pubsub_v5, TestPublisherIsSubscribedTiming) eCAL::v5::CSubscriber sub("blob"); const auto max_sub_count(10); auto sub_count(0); - auto receive_lambda = [&max_sub_count, &sub_count, &publisher_seen_at_subscription_start, &first_received_sample, &sub](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) { + auto receive_lambda = [&max_sub_count, &sub_count, &publisher_seen_at_subscription_start, &first_received_sample, &sub](const char* /*topic_name_*/, const struct eCAL::v5::SReceiveCallbackData* data_) { if (sub_count == 0) { publisher_seen_at_subscription_start = sub.IsPublished(); @@ -253,7 +253,7 @@ TEST(core_cpp_pubsub_v5, TestChainedPublisherSubscriberCallback) // Subscriber1 with callback that triggers Publisher2 eCAL::v5::CSubscriber sub1("topic1"); - auto subscriber1_callback = [&pub2](const char* /*topic_name*/, const eCAL::SReceiveCallbackData* data) { + auto subscriber1_callback = [&pub2](const char* /*topic_name*/, const eCAL::v5::SReceiveCallbackData* data) { // On receiving data from Publisher1, Publisher2 sends the same data const std::string received_data(static_cast(data->buf), data->size); pub2.Send(received_data); @@ -262,7 +262,7 @@ TEST(core_cpp_pubsub_v5, TestChainedPublisherSubscriberCallback) // Subscriber2 that receives data from Publisher2 eCAL::v5::CSubscriber sub2("topic2"); - auto subscriber2_callback = [&subscriber2_received_count](const char* /*topic_name*/, const eCAL::SReceiveCallbackData* /*data*/) { + auto subscriber2_callback = [&subscriber2_received_count](const char* /*topic_name*/, const eCAL::v5::SReceiveCallbackData* /*data*/) { // Count each received message from Publisher2 subscriber2_received_count++; //std::cout << "Subscriber2 Receiving " << std::string(static_cast(data->buf), data->size) << std::endl; diff --git a/lang/c/core/src/subscriber.cpp b/lang/c/core/src/subscriber.cpp index a261689b58..3590cde8f4 100644 --- a/lang/c/core/src/subscriber.cpp +++ b/lang/c/core/src/subscriber.cpp @@ -54,7 +54,7 @@ namespace } std::recursive_mutex g_sub_receive_callback_mtx; // NOLINT(*-avoid-non-const-global-variables) - void g_sub_receive_callback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_, const ReceiveCallbackCT callback_, void* par_) + void g_sub_receive_callback(const char* topic_name_, const struct eCAL::v5::SReceiveCallbackData* data_, const ReceiveCallbackCT callback_, void* par_) { const std::lock_guard lock(g_sub_receive_callback_mtx); SReceiveCallbackDataC data{}; diff --git a/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp b/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp index 74b3bf2172..1e7315a635 100644 --- a/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp +++ b/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp @@ -353,7 +353,7 @@ System::String^ Subscriber::Dump() return(StlStringToString(m_sub->Dump())); } -void Subscriber::OnReceive(const char* topic_name_, const ::eCAL::SReceiveCallbackData* data_) +void Subscriber::OnReceive(const char* topic_name_, const ::eCAL::v5::SReceiveCallbackData* data_) { std::string received_bytes = std::string(static_cast(data_->buf), static_cast(data_->size)); ReceiveCallbackData^ data = gcnew ReceiveCallbackData(); @@ -364,7 +364,7 @@ void Subscriber::OnReceive(const char* topic_name_, const ::eCAL::SReceiveCallba std::string topic_name = std::string(topic_name_); m_callbacks(StlStringToString(topic_name), data); } -void Subscriber::OnReceiveUnsafe(const char* topic_name_, const ::eCAL::SReceiveCallbackData* data_) +void Subscriber::OnReceiveUnsafe(const char* topic_name_, const ::eCAL::v5::SReceiveCallbackData* data_) { ReceiveCallbackDataUnsafe^ data = gcnew ReceiveCallbackDataUnsafe(); data->data = data_->buf; diff --git a/lang/csharp/Continental.eCAL.Core/ecal_clr.h b/lang/csharp/Continental.eCAL.Core/ecal_clr.h index c512b23955..9a830fd9b5 100644 --- a/lang/csharp/Continental.eCAL.Core/ecal_clr.h +++ b/lang/csharp/Continental.eCAL.Core/ecal_clr.h @@ -365,14 +365,14 @@ namespace Continental /** * @brief The callback of the subscriber, that is registered with the unmanaged code **/ - delegate void subCallback(const char* topic_name_, const ::eCAL::SReceiveCallbackData* data_); + delegate void subCallback(const char* topic_name_, const ::eCAL::v5::SReceiveCallbackData* data_); subCallback^ m_sub_callback; - void OnReceive(const char* topic_name_, const ::eCAL::SReceiveCallbackData* data_); - void OnReceiveUnsafe(const char* topic_name_, const ::eCAL::SReceiveCallbackData* data_); + void OnReceive(const char* topic_name_, const ::eCAL::v5::SReceiveCallbackData* data_); + void OnReceiveUnsafe(const char* topic_name_, const ::eCAL::v5::SReceiveCallbackData* data_); /** * @brief stdcall function pointer definition of eCAL::ReceiveCallbackT **/ - typedef void(__stdcall * stdcall_eCAL_ReceiveCallbackT)(const char*, const ::eCAL::SReceiveCallbackData*); + typedef void(__stdcall * stdcall_eCAL_ReceiveCallbackT)(const char*, const ::eCAL::v5::SReceiveCallbackData*); }; diff --git a/lang/python/core/src/ecal_clang.cpp b/lang/python/core/src/ecal_clang.cpp index 0b6ce7e296..81c2a5ee24 100644 --- a/lang/python/core/src/ecal_clang.cpp +++ b/lang/python/core/src/ecal_clang.cpp @@ -528,7 +528,7 @@ bool sub_receive_buffer(ECAL_HANDLE handle_, const char** rcv_buf_, int* rcv_buf /* sub_add_receive_callback */ /****************************************/ static std::mutex g_sub_receive_callback_mtx; -static void g_sub_receive_callback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_, const ReceiveCallbackCT callback_, void* par_) +static void g_sub_receive_callback(const char* topic_name_, const struct eCAL::v5::SReceiveCallbackData* data_, const ReceiveCallbackCT callback_, void* par_) { const std::lock_guard lock(g_sub_receive_callback_mtx); SReceiveCallbackDataC data{}; diff --git a/lang/python/core/src/ecal_wrap.cxx b/lang/python/core/src/ecal_wrap.cxx index 0e37e814bd..26236d927d 100644 --- a/lang/python/core/src/ecal_wrap.cxx +++ b/lang/python/core/src/ecal_wrap.cxx @@ -368,7 +368,7 @@ PyObject* sub_receive(PyObject* /*self*/, PyObject* args) /****************************************/ /* sub_set_callback */ /****************************************/ -static void c_subscriber_callback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_, ECAL_HANDLE handle_, const std::string& /*python_formatter*/) +static void c_subscriber_callback(const char* topic_name_, const struct eCAL::v5::SReceiveCallbackData* data_, ECAL_HANDLE handle_, const std::string& /*python_formatter*/) { #if ECAL_PY_INIT_THREADS_NEEDED if (!g_pygil_init) diff --git a/serialization/capnproto/samples/pubsub/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp b/serialization/capnproto/samples/pubsub/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp index ef257068e8..8a0b5acdec 100644 --- a/serialization/capnproto/samples/pubsub/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp +++ b/serialization/capnproto/samples/pubsub/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp @@ -115,7 +115,7 @@ int main() // create a subscriber (topic name "addressbook") eCAL::capnproto::CDynamicSubscriber sub("addressbook"); - auto lambda = [](const eCAL::STopicId& /*topic_id_*/, const capnp::DynamicValue::Reader& msg_, long long /*time_*/, long long /*clock_*/, long long /*id_*/) -> void { + auto lambda = [](const eCAL::STopicId& /*topic_id_*/, const capnp::DynamicValue::Reader& msg_, long long /*time_*/, long long /*clock_*/) -> void { dynamicPrintValue(msg_); }; sub.SetReceiveCallback(lambda); diff --git a/serialization/common/core/include/ecal/msg/dynamic.h b/serialization/common/core/include/ecal/msg/dynamic.h index ed1b6dba19..392be47754 100644 --- a/serialization/common/core/include/ecal/msg/dynamic.h +++ b/serialization/common/core/include/ecal/msg/dynamic.h @@ -134,9 +134,8 @@ namespace eCAL * @param msg_ Message content. * @param time_ Message time stamp. * @param clock_ Message writer clock. - * @param id_ Message id. **/ - using MsgReceiveCallbackT = std::function; + using MsgReceiveCallbackT = std::function; /** * @brief Set receive callback for incoming messages. @@ -221,8 +220,8 @@ namespace eCAL try { - auto msg = m_deserializer.Deserialize(data_.buf, data_.size, topic_info_); - fn_callback(topic_id_, msg, data_.time, data_.clock, data_.id); + auto msg = m_deserializer.Deserialize(data_.buffer, data_.buffer_size, topic_info_); + fn_callback(topic_id_, msg, data_.send_timestamp, data_.send_clock); } catch (const DynamicReflectionException& e) { diff --git a/serialization/common/core/include/ecal/msg/subscriber.h b/serialization/common/core/include/ecal/msg/subscriber.h index f16f32f227..bb4764d678 100644 --- a/serialization/common/core/include/ecal/msg/subscriber.h +++ b/serialization/common/core/include/ecal/msg/subscriber.h @@ -129,9 +129,8 @@ namespace eCAL * @param msg_ Message content. * @param time_ Message time stamp. * @param clock_ Message writer clock. - * @param id_ Message id. **/ - using MsgReceiveCallbackT = std::function; + using MsgReceiveCallbackT = std::function; /** * @brief Add receive callback for incoming messages. @@ -180,9 +179,9 @@ namespace eCAL T msg; // In the future, I would like to get m_datatype_info from the ReceiveBuffer function! - if (m_deserializer.Deserialize(msg, data_.buf, data_.size)) + if (m_deserializer.Deserialize(msg, data_.buffer, data_.buffer_size)) { - (fn_callback)(topic_id_, msg, data_.time, data_.clock, data_.id); + (fn_callback)(topic_id_, msg, data_.send_timestamp, data_.send_clock); } } diff --git a/serialization/flatbuffers/samples/pubsub/monster_rec/monster_rec.cpp b/serialization/flatbuffers/samples/pubsub/monster_rec/monster_rec.cpp index 16be14945e..583d346e38 100644 --- a/serialization/flatbuffers/samples/pubsub/monster_rec/monster_rec.cpp +++ b/serialization/flatbuffers/samples/pubsub/monster_rec/monster_rec.cpp @@ -32,7 +32,7 @@ #include -void OnMonster(const eCAL::STopicId& topic_id_, const flatbuffers::FlatBufferBuilder& msg_, long long time_, long long /*clock_*/, long long /*id_*/) +void OnMonster(const eCAL::STopicId& topic_id_, const flatbuffers::FlatBufferBuilder& msg_, long long time_, long long /*clock_*/) { // create monster auto monster(Game::Sample::GetMonster(msg_.GetBufferPointer())); diff --git a/serialization/protobuf/samples/pubsub/person_loopback/src/person_loopback.cpp b/serialization/protobuf/samples/pubsub/person_loopback/src/person_loopback.cpp index af08c44566..4cc3dd5a98 100644 --- a/serialization/protobuf/samples/pubsub/person_loopback/src/person_loopback.cpp +++ b/serialization/protobuf/samples/pubsub/person_loopback/src/person_loopback.cpp @@ -40,7 +40,7 @@ int main() pb::People::Person person; eCAL::protobuf::CSubscriber sub("person"); - auto receive_lambda = [](const eCAL::STopicId& /*topic_id_*/, const pb::People::Person& person_, const long long /*time_*/, const long long /*clock_*/, const long long /*id_*/){ + auto receive_lambda = [](const eCAL::STopicId& /*topic_id_*/, const pb::People::Person& person_, const long long /*time_*/, const long long /*clock_*/){ std::cout << "------------------------------------------" << std::endl; std::cout << " RECEIVED " << std::endl; std::cout << "------------------------------------------" << std::endl;