From f7bc83acc42cbbfefcc9e6e0c3154ee854aa9e0f Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Thu, 28 Nov 2024 11:20:12 +0100 Subject: [PATCH 1/6] first step into modernizing the rt publisher --- include/realtime_tools/realtime_publisher.hpp | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index ca004342..58b55bd1 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -49,27 +49,39 @@ namespace realtime_tools { -template +template class RealtimePublisher { -private: - using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; - public: + /// Provide various typedefs to resemble the rclcpp::Publisher type + using PublisherType = rclcpp::Publisher; + using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; + + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) + /// The msg_ variable contains the data that will get published on the ROS topic. - Msg msg_; + MessageT msg_; /** \brief Constructor for the realtime publisher * * \param publisher the publisher to wrap */ explicit RealtimePublisher(PublisherSharedPtr publisher) - : publisher_(publisher), is_running_(false), keep_running_(true), turn_(LOOP_NOT_STARTED) + : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED) { thread_ = std::thread(&RealtimePublisher::publishingLoop, this); } - RealtimePublisher() : is_running_(false), keep_running_(false), turn_(LOOP_NOT_STARTED) {} + [[deprecated( + "Use constructor with rclcpp::Publisher::SharedPtr instead - this class does not make sense " + "without a real publisher")]] + RealtimePublisher() + : is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED) + { + } /// Destructor ~RealtimePublisher() @@ -101,7 +113,7 @@ class RealtimePublisher bool trylock() { if (msg_mutex_.try_lock()) { - if (turn_ == REALTIME) { + if (turn_ == State::REALTIME) { return true; } else { msg_mutex_.unlock(); @@ -112,6 +124,20 @@ class RealtimePublisher } } + /** \brief Try to get the data lock from realtime and publish the given message + * + * Tries to gain unique access to msg_ variable. If this succeeds + * update the msg_ variable and call unlockAndPublish + * @return false in case no lock for the realtime variable could be acquired + */ + bool tryPublish(const MessageT & msg) + { + if (!trylock()) return false; + + msg_ = msg; + unlockAndPublish(); + } + /** \brief Unlock the msg_ variable * * After a successful trylock and after the data is written to the mgs_ @@ -120,7 +146,7 @@ class RealtimePublisher */ void unlockAndPublish() { - turn_ = NON_REALTIME; + turn_ = State::NON_REALTIME; unlock(); } @@ -163,10 +189,10 @@ class RealtimePublisher void publishingLoop() { is_running_ = true; - turn_ = REALTIME; + turn_ = State::REALTIME; while (keep_running_) { - Msg outgoing; + MessageT outgoing; // Locks msg_ and copies it @@ -176,7 +202,7 @@ class RealtimePublisher lock(); #endif - while (turn_ != NON_REALTIME && keep_running_) { + while (turn_ != State::NON_REALTIME && keep_running_) { #ifdef NON_POLLING updated_cond_.wait(lock_); #else @@ -186,7 +212,7 @@ class RealtimePublisher #endif } outgoing = msg_; - turn_ = REALTIME; + turn_ = State::REALTIME; unlock(); @@ -210,12 +236,12 @@ class RealtimePublisher std::condition_variable updated_cond_; #endif - enum { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; - std::atomic turn_; // Who's turn is it to use msg_? + enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; + std::atomic turn_; // Who's turn is it to use msg_? }; -template -using RealtimePublisherSharedPtr = std::shared_ptr>; +template +using RealtimePublisherSharedPtr = std::shared_ptr>; } // namespace realtime_tools #endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_ From de78e6819afe7adea7067816f43194a15a4fd215 Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Thu, 28 Nov 2024 13:57:51 +0100 Subject: [PATCH 2/6] refactored RealtimePublisher to not use conditional compilation anymore --- include/realtime_tools/realtime_publisher.hpp | 315 +++++++++++++----- 1 file changed, 226 insertions(+), 89 deletions(-) diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index 58b55bd1..72a52592 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -49,74 +49,77 @@ namespace realtime_tools { -template -class RealtimePublisher + +// Allow to select the way the RT Publisher is implemented +enum class RealtimePublisherPollingMode { Polling, NonPolling }; + +// Provide backwards compatibility for NON_POLLING define +#ifndef NON_POLLING +#define NON_POLLING RealtimePublisherPollingMode::Polling +#else +#undef NON_POLLING +#define NON_POLLING RealtimePublisherPollingMode::NonPolling +// Note this is the only way to somehow deprecate a define as #warning is not portable +namespace +{ +[[deprecated( + "NON_POLLING_DEFINE is deprecated - use RealtimePublisher")]] +static constexpr int NON_POLLING_define_is_deprecated = 0; +static constexpr int trigger_NON_POLLING_define_is_deprecated = NON_POLLING_define_is_deprecated; +} // namespace +#endif + +/** + * @brief RealtimePublisherBase base class for both RealtimePublisher implementation + * We try to reduce the amount of code duplication by moving some parts into this base class. + */ +template +class RealtimePublisherBase { public: /// Provide various typedefs to resemble the rclcpp::Publisher type using PublisherType = rclcpp::Publisher; using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; - using PublishedType = typename rclcpp::TypeAdapter::custom_type; using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + using PollingMode = RealtimePublisherPollingMode; - RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) - - /// The msg_ variable contains the data that will get published on the ROS topic. + // The msg_ variable contains the data that will get published on the ROS topic. + // TODO(firesurfer) this should not be public but we keep it at + // the moment for backward compatibility MessageT msg_; - /** \brief Constructor for the realtime publisher - * - * \param publisher the publisher to wrap + /** + * \brief Constructor for the realtime publisher + * \param publisher SharedPtr to the publisher we ant to wrap */ - explicit RealtimePublisher(PublisherSharedPtr publisher) + explicit RealtimePublisherBase(const PublisherSharedPtr & publisher) : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED) { - thread_ = std::thread(&RealtimePublisher::publishingLoop, this); } [[deprecated( "Use constructor with rclcpp::Publisher::SharedPtr instead - this class does not make sense " "without a real publisher")]] - RealtimePublisher() + RealtimePublisherBase() : is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED) { } - /// Destructor - ~RealtimePublisher() - { - stop(); - while (is_running()) { - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - if (thread_.joinable()) { - thread_.join(); - } - } - - /// Stop the realtime publisher from sending out more ROS messages - void stop() - { - keep_running_ = false; -#ifdef NON_POLLING - updated_cond_.notify_one(); // So the publishing loop can exit -#endif - } - - /** \brief Try to get the data lock from realtime - * + /** + * \brief Try to get the data lock from realtime * To publish data from the realtime loop, you need to run trylock to * attempt to get unique access to the msg_ variable. Trylock returns * true if the lock was acquired, and false if it failed to get the lock. */ bool trylock() { - if (msg_mutex_.try_lock()) { - if (turn_ == State::REALTIME) { + if (this->msg_mutex_.try_lock()) { + if (this->turn_ == State::REALTIME) { return true; } else { - msg_mutex_.unlock(); + this->msg_mutex_.unlock(); return false; } } else { @@ -124,120 +127,254 @@ class RealtimePublisher } } - /** \brief Try to get the data lock from realtime and publish the given message - * +protected: + bool is_running() const { return is_running_; } + + PublisherSharedPtr publisher_; + std::atomic is_running_; + std::atomic keep_running_; + + std::thread thread_; + + std::mutex msg_mutex_; // Protects msg_ + + enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; + std::atomic turn_; // Who's turn is it to use msg_? +}; + +template +class RealtimePublisher; + +template +class RealtimePublisher +: public RealtimePublisherBase +{ +private: + // For templated base classes we need to bring using statements into our own scope + using Base = RealtimePublisherBase; + using typename Base::PublisherSharedPtr; + using typename Base::State; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) + + explicit RealtimePublisher(PublisherSharedPtr publisher) : Base(publisher) + { + this->thread_ = std::thread(&RealtimePublisher::publishingLoop, this); + } + // Provide an empty constructor for compatibility + RealtimePublisher() : Base() {} + ~RealtimePublisher() + { + stop(); + while (this->is_running()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + if (this->thread_.joinable()) { + this->thread_.join(); + } + } + + // Stop the realtime publisher from sending out more ROS messages + void stop() { this->keep_running_ = false; } + + /** + * \brief Try to get the data lock from realtime and publish the given message * Tries to gain unique access to msg_ variable. If this succeeds * update the msg_ variable and call unlockAndPublish * @return false in case no lock for the realtime variable could be acquired */ bool tryPublish(const MessageT & msg) { - if (!trylock()) return false; + if (!this->trylock()) return false; - msg_ = msg; + this->msg_ = msg; unlockAndPublish(); } - /** \brief Unlock the msg_ variable - * + /** + * \brief Unlock the msg_ variable * After a successful trylock and after the data is written to the mgs_ * variable, the lock has to be released for the message to get * published on the specified topic. */ void unlockAndPublish() { - turn_ = State::NON_REALTIME; + this->turn_ = State::NON_REALTIME; unlock(); } - /** \brief Get the data lock form non-realtime - * + /** + * \brief Get the data lock form non-realtime * To publish data from the realtime loop, you need to run trylock to * attempt to get unique access to the msg_ variable. Trylock returns * true if the lock was acquired, and false if it failed to get the lock. */ void lock() { -#ifdef NON_POLLING - msg_mutex_.lock(); -#else // never actually block on the lock - while (!msg_mutex_.try_lock()) { + while (!this->msg_mutex_.try_lock()) { std::this_thread::sleep_for(std::chrono::microseconds(200)); } -#endif } /** \brief Unlocks the data without publishing anything * */ - void unlock() - { - msg_mutex_.unlock(); -#ifdef NON_POLLING - updated_cond_.notify_one(); -#endif - } + void unlock() { this->msg_mutex_.unlock(); } private: // non-copyable RealtimePublisher(const RealtimePublisher &) = delete; RealtimePublisher & operator=(const RealtimePublisher &) = delete; - bool is_running() const { return is_running_; } - void publishingLoop() { - is_running_ = true; - turn_ = State::REALTIME; + this->is_running_ = true; + this->turn_ = State::REALTIME; - while (keep_running_) { + while (this->keep_running_) { MessageT outgoing; // Locks msg_ and copies it - -#ifdef NON_POLLING - std::unique_lock lock_(msg_mutex_); -#else lock(); -#endif - while (turn_ != State::NON_REALTIME && keep_running_) { -#ifdef NON_POLLING - updated_cond_.wait(lock_); -#else + while (this->turn_ != State::NON_REALTIME && this->keep_running_) { unlock(); std::this_thread::sleep_for(std::chrono::microseconds(500)); lock(); -#endif } - outgoing = msg_; - turn_ = State::REALTIME; + outgoing = this->msg_; + this->turn_ = State::REALTIME; unlock(); // Sends the outgoing message - if (keep_running_) { - publisher_->publish(outgoing); + if (this->keep_running_) { + this->publisher_->publish(outgoing); } } - is_running_ = false; + this->is_running_ = false; } +}; - PublisherSharedPtr publisher_; - std::atomic is_running_; - std::atomic keep_running_; +template +class RealtimePublisher +: public RealtimePublisherBase +{ +private: + using Base = RealtimePublisherBase; + using typename Base::PublisherSharedPtr; + using typename Base::State; - std::thread thread_; +public: + RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) - std::mutex msg_mutex_; // Protects msg_ + /** + * \brief Constructor for the realtime publisher + * \param publisher the publisher to wrap + */ + explicit RealtimePublisher(const PublisherSharedPtr & publisher) : Base(publisher) + { + this->thread_ = std::thread(&RealtimePublisher::publishingLoop, this); + } + // Provide an empty constructor for compatibility + RealtimePublisher() : Base() {} -#ifdef NON_POLLING - std::condition_variable updated_cond_; -#endif + ~RealtimePublisher() + { + stop(); + while (this->is_running()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + if (this->thread_.joinable()) { + this->thread_.join(); + } + } - enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; - std::atomic turn_; // Who's turn is it to use msg_? + /// Stop the realtime publisher from sending out more ROS messages + void stop() + { + this->keep_running_ = false; + updated_cond_.notify_one(); // So the publishing loop can exit + } + + /** + * \brief Try to get the data lock from realtime and publish the given message + * Tries to gain unique access to msg_ variable. If this succeeds + * update the msg_ variable and call unlockAndPublish + * @return false in case no lock for the realtime variable could be acquired + */ + bool tryPublish(const MessageT & msg) + { + if (!this->trylock()) return false; + + this->msg_ = msg; + unlockAndPublish(); + } + + /** + * \brief Unlock the msg_ variable + * After a successful trylock and after the data is written to the mgs_ + * variable, the lock has to be released for the message to get + * published on the specified topic. + */ + void unlockAndPublish() + { + this->turn_ = State::NON_REALTIME; + unlock(); + } + + /** + * \brief Get the data lock form non-realtime + * To publish data from the realtime loop, you need to run trylock to + * attempt to get unique access to the msg_ variable. Trylock returns + * true if the lock was acquired, and false if it failed to get the lock. + */ + void lock() { this->msg_mutex_.lock(); } + + /** + * \brief Unlocks the data without publishing anything + */ + void unlock() + { + this->msg_mutex_.unlock(); + updated_cond_.notify_one(); + } + +private: + // non-copyable + RealtimePublisher(const RealtimePublisher &) = delete; + RealtimePublisher & operator=(const RealtimePublisher &) = delete; + + void publishingLoop() + { + this->is_running_ = true; + this->turn_ = State::REALTIME; + + while (this->keep_running_) { + MessageT outgoing; + + // Locks msg_ and copies it + std::unique_lock lock_(this->msg_mutex_); + + while (this->turn_ != State::NON_REALTIME && this->keep_running_) { + updated_cond_.wait(lock_); + } + outgoing = this->msg_; + this->turn_ = State::REALTIME; + + unlock(); + + // Sends the outgoing message + if (this->keep_running_) { + this->publisher_->publish(outgoing); + } + } + this->is_running_ = false; + } + + std::condition_variable updated_cond_; }; template From 4637c73861466bf8a46fbb161f09d41d2f1f04b6 Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Fri, 29 Nov 2024 13:37:57 +0100 Subject: [PATCH 3/6] Fix missing return true --- include/realtime_tools/realtime_publisher.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index 58b55bd1..09d31464 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -136,6 +136,7 @@ class RealtimePublisher msg_ = msg; unlockAndPublish(); + return true; } /** \brief Unlock the msg_ variable From 753123c5094f26968725963fb87e3fd5202b516c Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Mon, 2 Dec 2024 10:43:28 +0100 Subject: [PATCH 4/6] fix missing return --- include/realtime_tools/realtime_publisher.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index 72a52592..15807523 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -190,6 +190,7 @@ class RealtimePublisher this->msg_ = msg; unlockAndPublish(); + return true; } /** @@ -311,6 +312,7 @@ class RealtimePublisher this->msg_ = msg; unlockAndPublish(); + return true; } /** From 7b58ec1c1526a119201afbbadcf86e94056b914a Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Mon, 2 Dec 2024 10:48:59 +0100 Subject: [PATCH 5/6] added tests for try publish --- test/realtime_publisher_tests.cpp | 41 +++++++++++++++++++ test/realtime_publisher_tests_non_polling.cpp | 41 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/test/realtime_publisher_tests.cpp b/test/realtime_publisher_tests.cpp index d560b93c..eebfff18 100644 --- a/test/realtime_publisher_tests.cpp +++ b/test/realtime_publisher_tests.cpp @@ -91,3 +91,44 @@ TEST(RealtimePublisher, rt_publish) EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); rclcpp::shutdown(); } + +TEST(RealtimePublisher, rt_try_publish) +{ + rclcpp::init(0, nullptr); + const size_t ATTEMPTS = 10; + const std::chrono::milliseconds DELAY(250); + + const char * expected_msg = "Hello World"; + auto node = std::make_shared("construct_move_destruct"); + rclcpp::QoS qos(10); + qos.reliable().transient_local(); + auto pub = node->create_publisher("~/rt_publish", qos); + RealtimePublisher rt_pub(pub); + + // try publish a latched message + bool publish_success = false; + for (std::size_t i = 0; i < ATTEMPTS; ++i) { + StringMsg msg; + msg.string_value = expected_msg; + + if (rt_pub.tryPublish(msg)) { + publish_success = true; + break; + } + std::this_thread::sleep_for(DELAY); + } + ASSERT_TRUE(publish_success); + + // make sure subscriber gets it + StringCallback str_callback; + + auto sub = node->create_subscription( + "~/rt_publish", qos, + std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1)); + for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) { + rclcpp::spin_some(node); + std::this_thread::sleep_for(DELAY); + } + EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); + rclcpp::shutdown(); +} diff --git a/test/realtime_publisher_tests_non_polling.cpp b/test/realtime_publisher_tests_non_polling.cpp index 001e796b..f39d1064 100644 --- a/test/realtime_publisher_tests_non_polling.cpp +++ b/test/realtime_publisher_tests_non_polling.cpp @@ -93,3 +93,44 @@ TEST(RealtimePublisherNonPolling, rt_publish) EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); rclcpp::shutdown(); } + +TEST(RealtimePublisher, rt_try_publish) +{ + rclcpp::init(0, nullptr); + const size_t ATTEMPTS = 10; + const std::chrono::milliseconds DELAY(250); + + const char * expected_msg = "Hello World"; + auto node = std::make_shared("construct_move_destruct"); + rclcpp::QoS qos(10); + qos.reliable().transient_local(); + auto pub = node->create_publisher("~/rt_publish", qos); + RealtimePublisher rt_pub(pub); + + // try publish a latched message + bool publish_success = false; + for (std::size_t i = 0; i < ATTEMPTS; ++i) { + StringMsg msg; + msg.string_value = expected_msg; + + if (rt_pub.tryPublish(msg)) { + publish_success = true; + break; + } + std::this_thread::sleep_for(DELAY); + } + ASSERT_TRUE(publish_success); + + // make sure subscriber gets it + StringCallback str_callback; + + auto sub = node->create_subscription( + "~/rt_publish", qos, + std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1)); + for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) { + rclcpp::spin_some(node); + std::this_thread::sleep_for(DELAY); + } + EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); + rclcpp::shutdown(); +} From 94e02980e5d9f4f61936bd3178f6fe59856cfb08 Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Mon, 2 Dec 2024 10:55:38 +0100 Subject: [PATCH 6/6] use different approach for define depracation notice --- include/realtime_tools/realtime_publisher.hpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index 15807523..2e757742 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -59,15 +59,15 @@ enum class RealtimePublisherPollingMode { Polling, NonPolling }; #else #undef NON_POLLING #define NON_POLLING RealtimePublisherPollingMode::NonPolling -// Note this is the only way to somehow deprecate a define as #warning is not portable -namespace -{ -[[deprecated( - "NON_POLLING_DEFINE is deprecated - use RealtimePublisher")]] -static constexpr int NON_POLLING_define_is_deprecated = 0; -static constexpr int trigger_NON_POLLING_define_is_deprecated = NON_POLLING_define_is_deprecated; -} // namespace + +// Deprecation notice +#ifdef _WIN32 +#pragma message( \ + "NON_POLLING define is deprecated. Please update your code to use RealtimePublisher") //NOLINT +#else +#warning \ + "NON_POLLING define is deprecated. Please update your code to use RealtimePublisher" //NOLINT +#endif #endif /**