diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index ca004342..2e757742 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -49,62 +49,77 @@ namespace realtime_tools { -template -class RealtimePublisher -{ -private: - using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; +// Allow to select the way the RT Publisher is implemented +enum class RealtimePublisherPollingMode { Polling, NonPolling }; + +// Provide backwards compatibility for NON_POLLING define +#ifndef NON_POLLING +#define NON_POLLING RealtimePublisherPollingMode::Polling +#else +#undef NON_POLLING +#define NON_POLLING RealtimePublisherPollingMode::NonPolling + +// Deprecation notice +#ifdef _WIN32 +#pragma message( \ + "NON_POLLING define is deprecated. Please update your code to use RealtimePublisher") //NOLINT +#else +#warning \ + "NON_POLLING define is deprecated. Please update your code to use RealtimePublisher" //NOLINT +#endif +#endif + +/** + * @brief RealtimePublisherBase base class for both RealtimePublisher implementation + * We try to reduce the amount of code duplication by moving some parts into this base class. + */ +template +class RealtimePublisherBase +{ public: - /// The msg_ variable contains the data that will get published on the ROS topic. - Msg msg_; + /// Provide various typedefs to resemble the rclcpp::Publisher type + using PublisherType = rclcpp::Publisher; + using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + using PollingMode = RealtimePublisherPollingMode; - /** \brief Constructor for the realtime publisher - * - * \param publisher the publisher to wrap + // The msg_ variable contains the data that will get published on the ROS topic. + // TODO(firesurfer) this should not be public but we keep it at + // the moment for backward compatibility + MessageT msg_; + + /** + * \brief Constructor for the realtime publisher + * \param publisher SharedPtr to the publisher we ant to wrap */ - explicit RealtimePublisher(PublisherSharedPtr publisher) - : publisher_(publisher), is_running_(false), keep_running_(true), turn_(LOOP_NOT_STARTED) + explicit RealtimePublisherBase(const PublisherSharedPtr & publisher) + : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED) { - thread_ = std::thread(&RealtimePublisher::publishingLoop, this); } - RealtimePublisher() : is_running_(false), keep_running_(false), turn_(LOOP_NOT_STARTED) {} - - /// Destructor - ~RealtimePublisher() + [[deprecated( + "Use constructor with rclcpp::Publisher::SharedPtr instead - this class does not make sense " + "without a real publisher")]] + RealtimePublisherBase() + : is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED) { - stop(); - while (is_running()) { - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - if (thread_.joinable()) { - thread_.join(); - } } - /// Stop the realtime publisher from sending out more ROS messages - void stop() - { - keep_running_ = false; -#ifdef NON_POLLING - updated_cond_.notify_one(); // So the publishing loop can exit -#endif - } - - /** \brief Try to get the data lock from realtime - * + /** + * \brief Try to get the data lock from realtime * To publish data from the realtime loop, you need to run trylock to * attempt to get unique access to the msg_ variable. Trylock returns * true if the lock was acquired, and false if it failed to get the lock. */ bool trylock() { - if (msg_mutex_.try_lock()) { - if (turn_ == REALTIME) { + if (this->msg_mutex_.try_lock()) { + if (this->turn_ == State::REALTIME) { return true; } else { - msg_mutex_.unlock(); + this->msg_mutex_.unlock(); return false; } } else { @@ -112,110 +127,260 @@ class RealtimePublisher } } - /** \brief Unlock the msg_ variable - * +protected: + bool is_running() const { return is_running_; } + + PublisherSharedPtr publisher_; + std::atomic is_running_; + std::atomic keep_running_; + + std::thread thread_; + + std::mutex msg_mutex_; // Protects msg_ + + enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; + std::atomic turn_; // Who's turn is it to use msg_? +}; + +template +class RealtimePublisher; + +template +class RealtimePublisher +: public RealtimePublisherBase +{ +private: + // For templated base classes we need to bring using statements into our own scope + using Base = RealtimePublisherBase; + using typename Base::PublisherSharedPtr; + using typename Base::State; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) + + explicit RealtimePublisher(PublisherSharedPtr publisher) : Base(publisher) + { + this->thread_ = std::thread(&RealtimePublisher::publishingLoop, this); + } + // Provide an empty constructor for compatibility + RealtimePublisher() : Base() {} + ~RealtimePublisher() + { + stop(); + while (this->is_running()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + if (this->thread_.joinable()) { + this->thread_.join(); + } + } + + // Stop the realtime publisher from sending out more ROS messages + void stop() { this->keep_running_ = false; } + + /** + * \brief Try to get the data lock from realtime and publish the given message + * Tries to gain unique access to msg_ variable. If this succeeds + * update the msg_ variable and call unlockAndPublish + * @return false in case no lock for the realtime variable could be acquired + */ + bool tryPublish(const MessageT & msg) + { + if (!this->trylock()) return false; + + this->msg_ = msg; + unlockAndPublish(); + return true; + } + + /** + * \brief Unlock the msg_ variable * After a successful trylock and after the data is written to the mgs_ * variable, the lock has to be released for the message to get * published on the specified topic. */ void unlockAndPublish() { - turn_ = NON_REALTIME; + this->turn_ = State::NON_REALTIME; unlock(); } - /** \brief Get the data lock form non-realtime - * + /** + * \brief Get the data lock form non-realtime * To publish data from the realtime loop, you need to run trylock to * attempt to get unique access to the msg_ variable. Trylock returns * true if the lock was acquired, and false if it failed to get the lock. */ void lock() { -#ifdef NON_POLLING - msg_mutex_.lock(); -#else // never actually block on the lock - while (!msg_mutex_.try_lock()) { + while (!this->msg_mutex_.try_lock()) { std::this_thread::sleep_for(std::chrono::microseconds(200)); } -#endif } /** \brief Unlocks the data without publishing anything * */ - void unlock() - { - msg_mutex_.unlock(); -#ifdef NON_POLLING - updated_cond_.notify_one(); -#endif - } + void unlock() { this->msg_mutex_.unlock(); } private: // non-copyable RealtimePublisher(const RealtimePublisher &) = delete; RealtimePublisher & operator=(const RealtimePublisher &) = delete; - bool is_running() const { return is_running_; } - void publishingLoop() { - is_running_ = true; - turn_ = REALTIME; + this->is_running_ = true; + this->turn_ = State::REALTIME; - while (keep_running_) { - Msg outgoing; + while (this->keep_running_) { + MessageT outgoing; // Locks msg_ and copies it - -#ifdef NON_POLLING - std::unique_lock lock_(msg_mutex_); -#else lock(); -#endif - while (turn_ != NON_REALTIME && keep_running_) { -#ifdef NON_POLLING - updated_cond_.wait(lock_); -#else + while (this->turn_ != State::NON_REALTIME && this->keep_running_) { unlock(); std::this_thread::sleep_for(std::chrono::microseconds(500)); lock(); -#endif } - outgoing = msg_; - turn_ = REALTIME; + outgoing = this->msg_; + this->turn_ = State::REALTIME; unlock(); // Sends the outgoing message - if (keep_running_) { - publisher_->publish(outgoing); + if (this->keep_running_) { + this->publisher_->publish(outgoing); } } - is_running_ = false; + this->is_running_ = false; } +}; - PublisherSharedPtr publisher_; - std::atomic is_running_; - std::atomic keep_running_; +template +class RealtimePublisher +: public RealtimePublisherBase +{ +private: + using Base = RealtimePublisherBase; + using typename Base::PublisherSharedPtr; + using typename Base::State; - std::thread thread_; +public: + RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) - std::mutex msg_mutex_; // Protects msg_ + /** + * \brief Constructor for the realtime publisher + * \param publisher the publisher to wrap + */ + explicit RealtimePublisher(const PublisherSharedPtr & publisher) : Base(publisher) + { + this->thread_ = std::thread(&RealtimePublisher::publishingLoop, this); + } + // Provide an empty constructor for compatibility + RealtimePublisher() : Base() {} -#ifdef NON_POLLING - std::condition_variable updated_cond_; -#endif + ~RealtimePublisher() + { + stop(); + while (this->is_running()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + if (this->thread_.joinable()) { + this->thread_.join(); + } + } + + /// Stop the realtime publisher from sending out more ROS messages + void stop() + { + this->keep_running_ = false; + updated_cond_.notify_one(); // So the publishing loop can exit + } - enum { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; - std::atomic turn_; // Who's turn is it to use msg_? + /** + * \brief Try to get the data lock from realtime and publish the given message + * Tries to gain unique access to msg_ variable. If this succeeds + * update the msg_ variable and call unlockAndPublish + * @return false in case no lock for the realtime variable could be acquired + */ + bool tryPublish(const MessageT & msg) + { + if (!this->trylock()) return false; + + this->msg_ = msg; + unlockAndPublish(); + return true; + } + + /** + * \brief Unlock the msg_ variable + * After a successful trylock and after the data is written to the mgs_ + * variable, the lock has to be released for the message to get + * published on the specified topic. + */ + void unlockAndPublish() + { + this->turn_ = State::NON_REALTIME; + unlock(); + } + + /** + * \brief Get the data lock form non-realtime + * To publish data from the realtime loop, you need to run trylock to + * attempt to get unique access to the msg_ variable. Trylock returns + * true if the lock was acquired, and false if it failed to get the lock. + */ + void lock() { this->msg_mutex_.lock(); } + + /** + * \brief Unlocks the data without publishing anything + */ + void unlock() + { + this->msg_mutex_.unlock(); + updated_cond_.notify_one(); + } + +private: + // non-copyable + RealtimePublisher(const RealtimePublisher &) = delete; + RealtimePublisher & operator=(const RealtimePublisher &) = delete; + + void publishingLoop() + { + this->is_running_ = true; + this->turn_ = State::REALTIME; + + while (this->keep_running_) { + MessageT outgoing; + + // Locks msg_ and copies it + std::unique_lock lock_(this->msg_mutex_); + + while (this->turn_ != State::NON_REALTIME && this->keep_running_) { + updated_cond_.wait(lock_); + } + outgoing = this->msg_; + this->turn_ = State::REALTIME; + + unlock(); + + // Sends the outgoing message + if (this->keep_running_) { + this->publisher_->publish(outgoing); + } + } + this->is_running_ = false; + } + + std::condition_variable updated_cond_; }; -template -using RealtimePublisherSharedPtr = std::shared_ptr>; +template +using RealtimePublisherSharedPtr = std::shared_ptr>; } // namespace realtime_tools #endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_ diff --git a/test/realtime_publisher_tests.cpp b/test/realtime_publisher_tests.cpp index d560b93c..eebfff18 100644 --- a/test/realtime_publisher_tests.cpp +++ b/test/realtime_publisher_tests.cpp @@ -91,3 +91,44 @@ TEST(RealtimePublisher, rt_publish) EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); rclcpp::shutdown(); } + +TEST(RealtimePublisher, rt_try_publish) +{ + rclcpp::init(0, nullptr); + const size_t ATTEMPTS = 10; + const std::chrono::milliseconds DELAY(250); + + const char * expected_msg = "Hello World"; + auto node = std::make_shared("construct_move_destruct"); + rclcpp::QoS qos(10); + qos.reliable().transient_local(); + auto pub = node->create_publisher("~/rt_publish", qos); + RealtimePublisher rt_pub(pub); + + // try publish a latched message + bool publish_success = false; + for (std::size_t i = 0; i < ATTEMPTS; ++i) { + StringMsg msg; + msg.string_value = expected_msg; + + if (rt_pub.tryPublish(msg)) { + publish_success = true; + break; + } + std::this_thread::sleep_for(DELAY); + } + ASSERT_TRUE(publish_success); + + // make sure subscriber gets it + StringCallback str_callback; + + auto sub = node->create_subscription( + "~/rt_publish", qos, + std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1)); + for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) { + rclcpp::spin_some(node); + std::this_thread::sleep_for(DELAY); + } + EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); + rclcpp::shutdown(); +} diff --git a/test/realtime_publisher_tests_non_polling.cpp b/test/realtime_publisher_tests_non_polling.cpp index 001e796b..f39d1064 100644 --- a/test/realtime_publisher_tests_non_polling.cpp +++ b/test/realtime_publisher_tests_non_polling.cpp @@ -93,3 +93,44 @@ TEST(RealtimePublisherNonPolling, rt_publish) EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); rclcpp::shutdown(); } + +TEST(RealtimePublisher, rt_try_publish) +{ + rclcpp::init(0, nullptr); + const size_t ATTEMPTS = 10; + const std::chrono::milliseconds DELAY(250); + + const char * expected_msg = "Hello World"; + auto node = std::make_shared("construct_move_destruct"); + rclcpp::QoS qos(10); + qos.reliable().transient_local(); + auto pub = node->create_publisher("~/rt_publish", qos); + RealtimePublisher rt_pub(pub); + + // try publish a latched message + bool publish_success = false; + for (std::size_t i = 0; i < ATTEMPTS; ++i) { + StringMsg msg; + msg.string_value = expected_msg; + + if (rt_pub.tryPublish(msg)) { + publish_success = true; + break; + } + std::this_thread::sleep_for(DELAY); + } + ASSERT_TRUE(publish_success); + + // make sure subscriber gets it + StringCallback str_callback; + + auto sub = node->create_subscription( + "~/rt_publish", qos, + std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1)); + for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) { + rclcpp::spin_some(node); + std::this_thread::sleep_for(DELAY); + } + EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); + rclcpp::shutdown(); +}