From d37d6e5ba3105e87800d3d21304e2196e78a0c63 Mon Sep 17 00:00:00 2001 From: ignaciotb Date: Tue, 5 Dec 2023 17:53:05 +0100 Subject: [PATCH] Checking that topics exists before subscribing --- .../config/emergency_config.yaml | 5 +- sam_action_servers/src/emergency_manager.cpp | 53 ++++++++++++++----- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/sam_action_servers/config/emergency_config.yaml b/sam_action_servers/config/emergency_config.yaml index 29765ab..eaee694 100644 --- a/sam_action_servers/config/emergency_config.yaml +++ b/sam_action_servers/config/emergency_config.yaml @@ -12,9 +12,6 @@ monitor: - topic: name: core/heartbeat freq: 3. - - topic: - name: core/leak - freq: 4. - topic: name: core/vbs_fb freq: 5. @@ -28,5 +25,5 @@ monitor: name: core/depth20_pressure freq: 10. - topic: - name: uwcomms/heartbeat + name: uwcomms/uw_heartbeat freq: 0.1 \ No newline at end of file diff --git a/sam_action_servers/src/emergency_manager.cpp b/sam_action_servers/src/emergency_manager.cpp index b4788fb..b626778 100644 --- a/sam_action_servers/src/emergency_manager.cpp +++ b/sam_action_servers/src/emergency_manager.cpp @@ -92,7 +92,7 @@ class GenericSensorMonitor public: - std::string topic_name_; + std::string topic_name_, topic_end_; double rate_; ros::CallbackQueue queue; ros::Subscriber subscriber_; @@ -100,10 +100,14 @@ class GenericSensorMonitor actionlib::SimpleActionClient *ac_; ros::NodeHandle *nh_; ros::Publisher thrust_1_cmd_pub_, thrust_2_cmd_pub_, vbs_cmd_pub_; + bool queue_init_, subs_init_; + ros::master::V_TopicInfo master_topics_; - GenericSensorMonitor(std::string &topic, double rate): topic_name_(topic), rate_(rate) + GenericSensorMonitor(std::string &topic, double rate) : topic_name_(topic), rate_(rate) { nh_ = new ros::NodeHandle("~"); + nh_->setCallbackQueue(&queue); + std::string abort_top, emergency_as; nh_->param(("abort_top"), abort_top, "/sam/core/abort"); abort_pub_ = nh_->advertise(abort_top, 10); @@ -116,41 +120,60 @@ class GenericSensorMonitor thrust_2_cmd_pub_ = nh_->advertise(thruster_2_top, 1); vbs_cmd_pub_ = nh_->advertise(vbs_cmd_top, 1); - nh_->setCallbackQueue(&queue); - subscriber_ = nh_->subscribe(topic_name_, 10, &GenericSensorMonitor::topicCB, this); - // Get last word of the topic name. It'll be used to find the heartbeat one std::string delimiter = "/"; size_t pos = 0; std::string token; - while ((pos = topic_name_.find(delimiter)) != std::string::npos) + topic_end_ = topic_name_; + while ((pos = topic_end_.find(delimiter)) != std::string::npos) { - token = topic_name_.substr(0, pos); - topic_name_.erase(0, pos + delimiter.length()); + token = topic_end_.substr(0, pos); + topic_end_.erase(0, pos + delimiter.length()); } + // Flags to monitor topics + queue_init_ = false; + subs_init_ = false; std::thread(&GenericSensorMonitor::CheckQueue, this).detach(); } void CheckQueue() { ros::Rate r(rate_); - bool queue_init = false; while(ros::ok()) { + // Check if topic exists, then subscribe + if(!subs_init_) + { + ros::master::getTopics(master_topics_); + + for (ros::master::V_TopicInfo::iterator it = master_topics_.begin(); it != master_topics_.end(); it++) + { + const ros::master::TopicInfo &info = *it; + if(topic_name_ == info.name) + { + ROS_INFO_STREAM_NAMED("Emergency manager: ", "listening to " << topic_name_); + subscriber_ = nh_->subscribe(topic_name_, 10, &GenericSensorMonitor::topicCB, this); + subs_init_ = true; + } + } + } + // If msg in queue if(!queue.isEmpty()) { queue.callOne(ros::WallDuration(0.01)); - queue_init = true; + queue_init_ = true; } + // If no msg else { - if(queue_init) + // And the queue has already been initialized, error + if(queue_init_) { ROS_ERROR_STREAM_NAMED("Emergency manager: ", "data not coming in " << topic_name_ + " aborting mission"); std_msgs::Empty abort; abort_pub_.publish(abort); - if (topic_name_ == "heartbeat") + if (topic_end_ == "heartbeat") { ROS_ERROR_STREAM_NAMED("Emergency manager ", "BT is down, aborting mission"); this->emergency_no_bt(); @@ -158,6 +181,11 @@ class GenericSensorMonitor break; } } + // If data flow has not started, throw warning + if (!queue_init_) + { + ROS_WARN_STREAM_THROTTLE_NAMED(int(rate_), "Emergency manager: ", "data stream not initialized " << topic_name_); + } r.sleep(); } } @@ -233,6 +261,7 @@ int main(int argn, char* args[]) std::string topic_name = value["name"].as(); double freq = value["freq"].as(); std::string topic = "/" + robot_name + "/" + topic_name; + ros::Duration(0.01).sleep(); monitors.emplace_back(new GenericSensorMonitor(topic, freq * 0.9)); // Decrease slightly freq to account for slow transmission at times } }