From 968ce0a03fdc92bc3c3b1b360421675ba3bbc191 Mon Sep 17 00:00:00 2001 From: Brian Date: Tue, 28 Feb 2023 13:43:39 -0500 Subject: [PATCH] Service introspection (#1985) * Implementation of service introspection. To do this, we add a new method on the Client and Service classes that allows the user to change the introspection method at runtime. These end up calling into the rcl layer to do the actual configuration, at which point service introspection messages will be sent as configured. Signed-off-by: Chris Lalancette Signed-off-by: Brian Chen --- rclcpp/include/rclcpp/client.hpp | 43 ++- rclcpp/include/rclcpp/service.hpp | 47 ++- rclcpp/src/rclcpp/client.cpp | 3 +- rclcpp/src/rclcpp/service.cpp | 5 +- rclcpp/test/rclcpp/CMakeLists.txt | 11 + .../rclcpp/test_service_introspection.cpp | 339 ++++++++++++++++++ 6 files changed, 430 insertions(+), 18 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_service_introspection.cpp diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index c3a2210a71..f3346a067c 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -16,14 +16,15 @@ #define RCLCPP__CLIENT_HPP_ #include +#include #include -#include #include #include #include // NOLINT, cpplint doesn't think this is a cpp std header #include #include #include +#include #include #include // NOLINT #include @@ -31,8 +32,10 @@ #include "rcl/client.h" #include "rcl/error_handling.h" #include "rcl/event_callback.h" +#include "rcl/service_introspection.h" #include "rcl/wait.h" +#include "rclcpp/clock.hpp" #include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" @@ -467,15 +470,13 @@ class Client : public ClientBase rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string & service_name, rcl_client_options_t & client_options) - : ClientBase(node_base, node_graph) + : ClientBase(node_base, node_graph), + srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { - using rosidl_typesupport_cpp::get_service_type_support_handle; - auto service_type_support_handle = - get_service_type_support_handle(); rcl_ret_t ret = rcl_client_init( this->get_client_handle().get(), this->get_rcl_node_handle(), - service_type_support_handle, + srv_type_support_handle_, service_name.c_str(), &client_options); if (ret != RCL_RET_OK) { @@ -781,6 +782,33 @@ class Client : public ClientBase return old_size - pending_requests_.size(); } + /// Configure client introspection. + /** + * \param[in] clock clock to use to generate introspection timestamps + * \param[in] qos_service_event_pub QoS settings to use when creating the introspection publisher + * \param[in] introspection_state the state to set introspection to + */ + void + configure_introspection( + Clock::SharedPtr clock, const QoS & qos_service_event_pub, + rcl_service_introspection_state_t introspection_state) + { + rcl_publisher_options_t pub_opts = rcl_publisher_get_default_options(); + pub_opts.qos = qos_service_event_pub.get_rmw_qos_profile(); + + rcl_ret_t ret = rcl_client_configure_service_introspection( + client_handle_.get(), + node_handle_.get(), + clock->get_clock_handle(), + srv_type_support_handle_, + pub_opts, + introspection_state); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to configure client introspection"); + } + } + protected: using CallbackTypeValueVariant = std::tuple; using CallbackWithRequestTypeValueVariant = std::tuple< @@ -831,6 +859,9 @@ class Client : public ClientBase CallbackInfoVariant>> pending_requests_; std::mutex pending_requests_mutex_; + +private: + const rosidl_service_type_support_t * srv_type_support_handle_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 9e258a0223..3cfc11e9ca 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -26,6 +26,7 @@ #include "rcl/error_handling.h" #include "rcl/event_callback.h" #include "rcl/service.h" +#include "rcl/service_introspection.h" #include "rmw/error_handling.h" #include "rmw/impl/cpp/demangle.hpp" @@ -34,6 +35,7 @@ #include "tracetools/tracetools.h" #include "rclcpp/any_service_callback.hpp" +#include "rclcpp/clock.hpp" #include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" @@ -308,11 +310,9 @@ class Service const std::string & service_name, AnyServiceCallback any_callback, rcl_service_options_t & service_options) - : ServiceBase(node_handle), any_callback_(any_callback) + : ServiceBase(node_handle), any_callback_(any_callback), + srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { - using rosidl_typesupport_cpp::get_service_type_support_handle; - auto service_type_support_handle = get_service_type_support_handle(); - // rcl does the static memory allocation here service_handle_ = std::shared_ptr( new rcl_service_t, [handle = node_handle_, service_name](rcl_service_t * service) @@ -331,7 +331,7 @@ class Service rcl_ret_t ret = rcl_service_init( service_handle_.get(), node_handle.get(), - service_type_support_handle, + srv_type_support_handle_, service_name.c_str(), &service_options); if (ret != RCL_RET_OK) { @@ -371,8 +371,8 @@ class Service std::shared_ptr node_handle, std::shared_ptr service_handle, AnyServiceCallback any_callback) - : ServiceBase(node_handle), - any_callback_(any_callback) + : ServiceBase(node_handle), any_callback_(any_callback), + srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { // check if service handle was initialized if (!rcl_service_is_valid(service_handle.get())) { @@ -406,8 +406,8 @@ class Service std::shared_ptr node_handle, rcl_service_t * service_handle, AnyServiceCallback any_callback) - : ServiceBase(node_handle), - any_callback_(any_callback) + : ServiceBase(node_handle), any_callback_(any_callback), + srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { // check if service handle was initialized if (!rcl_service_is_valid(service_handle)) { @@ -487,10 +487,39 @@ class Service } } + /// Configure client introspection. + /** + * \param[in] clock clock to use to generate introspection timestamps + * \param[in] qos_service_event_pub QoS settings to use when creating the introspection publisher + * \param[in] introspection_state the state to set introspection to + */ + void + configure_introspection( + Clock::SharedPtr clock, const QoS & qos_service_event_pub, + rcl_service_introspection_state_t introspection_state) + { + rcl_publisher_options_t pub_opts = rcl_publisher_get_default_options(); + pub_opts.qos = qos_service_event_pub.get_rmw_qos_profile(); + + rcl_ret_t ret = rcl_service_configure_service_introspection( + service_handle_.get(), + node_handle_.get(), + clock->get_clock_handle(), + srv_type_support_handle_, + pub_opts, + introspection_state); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to configure service introspection"); + } + } + private: RCLCPP_DISABLE_COPY(Service) AnyServiceCallback any_callback_; + + const rosidl_service_type_support_t * srv_type_support_handle_; }; } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index 4adc6d4a96..e33db71ce2 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -23,9 +23,11 @@ #include "rcl/graph.h" #include "rcl/node.h" #include "rcl/wait.h" + #include "rclcpp/exceptions.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_graph_interface.hpp" +#include "rclcpp/qos.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/logging.hpp" @@ -241,7 +243,6 @@ ClientBase::set_on_new_response_callback(rcl_event_callback_t callback, const vo user_data); if (RCL_RET_OK != ret) { - using rclcpp::exceptions::throw_from_rcl_error; throw_from_rcl_error(ret, "failed to set the on new response callback for client"); } } diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index ea1a83a2a6..9c246e4b6b 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -22,6 +22,7 @@ #include "rclcpp/any_service_callback.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/qos.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -131,7 +132,7 @@ ServiceBase::set_on_new_request_callback(rcl_event_callback_t callback, const vo user_data); if (RCL_RET_OK != ret) { - using rclcpp::exceptions::throw_from_rcl_error; - throw_from_rcl_error(ret, "failed to set the on new request callback for service"); + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to set the on new request callback for service"); } } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 1e3b7a65ca..0399f3ae11 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -508,6 +508,17 @@ if(TARGET test_service) ) target_link_libraries(test_service ${PROJECT_NAME} mimick) endif() +ament_add_gmock(test_service_introspection test_service_introspection.cpp) +if(TARGET test_service) + ament_target_dependencies(test_service_introspection + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_service_introspection ${PROJECT_NAME} mimick) +endif() # Creating and destroying nodes is slow with Connext, so this needs larger timeout. ament_add_gtest(test_subscription test_subscription.cpp TIMEOUT 120) if(TARGET test_subscription) diff --git a/rclcpp/test/rclcpp/test_service_introspection.cpp b/rclcpp/test/rclcpp/test_service_introspection.cpp new file mode 100644 index 0000000000..94c94a58ce --- /dev/null +++ b/rclcpp/test/rclcpp/test_service_introspection.cpp @@ -0,0 +1,339 @@ +// Copyright 2022 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include + +#include "gmock/gmock.h" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/exceptions.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/node_options.hpp" +#include "rclcpp/parameter.hpp" + +#include "../mocking_utils/patch.hpp" +#include "../utils/rclcpp_gtest_macros.hpp" + +#include "test_msgs/srv/basic_types.hpp" +#include "service_msgs/msg/service_event_info.hpp" + +using namespace std::chrono_literals; +using test_msgs::srv::BasicTypes; +using service_msgs::msg::ServiceEventInfo; + + +class TestServiceIntrospection : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared( + "my_node", "/ns"); + + auto srv_callback = + [](const BasicTypes::Request::SharedPtr & req, const BasicTypes::Response::SharedPtr & resp) { + resp->set__bool_value(!req->bool_value); + resp->set__int64_value(req->int64_value); + return resp; + }; + + auto callback = [this](const std::shared_ptr & msg) { + events.push_back(msg); + (void)msg; + }; + + client = node->create_client("service"); + service = node->create_service("service", srv_callback); + sub = node->create_subscription("service/_service_event", 10, callback); + events.clear(); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; + rclcpp::Client::SharedPtr client; + rclcpp::Service::SharedPtr service; + rclcpp::Subscription::SharedPtr sub; + std::vector> events; + std::chrono::milliseconds timeout = std::chrono::milliseconds(1000); +}; + +TEST_F(TestServiceIntrospection, service_introspection_nominal) +{ + auto request = std::make_shared(); + request->set__bool_value(true); + request->set__int64_value(42); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + + auto future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + + BasicTypes::Response::SharedPtr response = future.get(); + ASSERT_EQ(response->bool_value, false); + ASSERT_EQ(response->int64_value, 42); + + // wrap up work to get all the service_event messages + auto start = std::chrono::steady_clock::now(); + while (events.size() < 4 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + + std::map> event_map; + for (auto & event : events) { + event_map[event->info.event_type] = event; + } + ASSERT_EQ(event_map.size(), 4U); + + rmw_gid_t client_gid; + rmw_get_gid_for_client(rcl_client_get_rmw_handle(client->get_client_handle().get()), &client_gid); + std::array client_gid_arr; + std::move(std::begin(client_gid.data), std::end(client_gid.data), client_gid_arr.begin()); + ASSERT_THAT( + client_gid_arr, + testing::Eq(event_map[ServiceEventInfo::REQUEST_SENT]->info.client_gid)); + + ASSERT_EQ( + event_map[ServiceEventInfo::REQUEST_SENT]->info.sequence_number, + event_map[ServiceEventInfo::REQUEST_RECEIVED]->info.sequence_number); + ASSERT_EQ( + event_map[ServiceEventInfo::RESPONSE_SENT]->info.sequence_number, + event_map[ServiceEventInfo::RESPONSE_RECEIVED]->info.sequence_number); + ASSERT_EQ( + event_map[ServiceEventInfo::REQUEST_SENT]->info.sequence_number, + event_map[ServiceEventInfo::RESPONSE_SENT]->info.sequence_number); + ASSERT_EQ( + event_map[ServiceEventInfo::REQUEST_RECEIVED]->info.sequence_number, + event_map[ServiceEventInfo::RESPONSE_RECEIVED]->info.sequence_number); + + ASSERT_EQ(event_map[ServiceEventInfo::REQUEST_SENT]->request[0].int64_value, 42); + ASSERT_EQ(event_map[ServiceEventInfo::REQUEST_SENT]->request[0].bool_value, true); + ASSERT_EQ(event_map[ServiceEventInfo::RESPONSE_SENT]->response[0].int64_value, 42); + ASSERT_EQ(event_map[ServiceEventInfo::RESPONSE_SENT]->response[0].bool_value, false); + ASSERT_EQ(event_map[ServiceEventInfo::RESPONSE_SENT]->request.size(), 0U); + ASSERT_EQ(event_map[ServiceEventInfo::REQUEST_RECEIVED]->response.size(), 0U); +} + +TEST_F(TestServiceIntrospection, service_introspection_enable_disable_events) +{ + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + + auto request = std::make_shared(); + request->set__bool_value(true); + request->set__int64_value(42); + auto future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + auto start = std::chrono::steady_clock::now(); + while ((std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 0U); + + events.clear(); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + + future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + start = std::chrono::steady_clock::now(); + while (events.size() < 2 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 2U); + + events.clear(); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_OFF); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + start = std::chrono::steady_clock::now(); + while (events.size() < 2 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 2U); + + events.clear(); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + start = std::chrono::steady_clock::now(); + while (events.size() < 4 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 4U); +} + +TEST_F(TestServiceIntrospection, service_introspection_enable_disable_event_content) +{ + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + auto request = std::make_shared(); + request->set__bool_value(true); + request->set__int64_value(42); + auto future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + auto start = std::chrono::steady_clock::now(); + while (events.size() < 4 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 4U); + for (const auto & event : events) { + EXPECT_EQ(event->request.size(), 0U); + EXPECT_EQ(event->response.size(), 0U); + } + + events.clear(); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + + future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + start = std::chrono::steady_clock::now(); + while (events.size() < 4 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 4U); + for (const auto & event : events) { + switch (event->info.event_type) { + case ServiceEventInfo::REQUEST_SENT: + EXPECT_EQ(event->request.size(), 1U); + break; + case ServiceEventInfo::REQUEST_RECEIVED: + EXPECT_EQ(event->request.size(), 0U); + break; + case ServiceEventInfo::RESPONSE_SENT: + EXPECT_EQ(event->response.size(), 0U); + break; + case ServiceEventInfo::RESPONSE_RECEIVED: + EXPECT_EQ(event->response.size(), 1U); + break; + } + } + + events.clear(); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_METADATA); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + + future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + start = std::chrono::steady_clock::now(); + while (events.size() < 4 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 4U); + for (const auto & event : events) { + switch (event->info.event_type) { + case ServiceEventInfo::REQUEST_SENT: + EXPECT_EQ(event->request.size(), 0U); + break; + case ServiceEventInfo::REQUEST_RECEIVED: + EXPECT_EQ(event->request.size(), 1U); + break; + case ServiceEventInfo::RESPONSE_SENT: + EXPECT_EQ(event->response.size(), 1U); + break; + case ServiceEventInfo::RESPONSE_RECEIVED: + EXPECT_EQ(event->response.size(), 0U); + break; + } + } + + events.clear(); + + client->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + service->configure_introspection( + node->get_clock(), rclcpp::SystemDefaultsQoS(), RCL_SERVICE_INTROSPECTION_CONTENTS); + + future = client->async_send_request(request); + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + rclcpp::spin_until_future_complete(node, future, timeout)); + start = std::chrono::steady_clock::now(); + while (events.size() < 4 && (std::chrono::steady_clock::now() - start) < timeout) { + rclcpp::spin_some(node); + } + EXPECT_EQ(events.size(), 4U); + for (const auto & event : events) { + switch (event->info.event_type) { + case ServiceEventInfo::REQUEST_SENT: + case ServiceEventInfo::REQUEST_RECEIVED: + EXPECT_EQ(event->request.size(), 1U); + break; + case ServiceEventInfo::RESPONSE_SENT: + case ServiceEventInfo::RESPONSE_RECEIVED: + EXPECT_EQ(event->response.size(), 1U); + break; + } + } +}