diff --git a/rcl/include/rcl/macros.h b/rcl/include/rcl/macros.h
index 4df9ff427..973dec116 100644
--- a/rcl/include/rcl/macros.h
+++ b/rcl/include/rcl/macros.h
@@ -27,6 +27,37 @@ extern "C"
#define RCL_UNUSED(x) RCUTILS_UNUSED(x)
+#define RCL_RET_FROM_RCUTIL_RET(rcl_ret_var, rcutils_expr) \
+ { \
+ rcutils_ret_t rcutils_ret = rcutils_expr; \
+ if (RCUTILS_RET_OK != rcutils_ret) { \
+ if (rcutils_error_is_set()) { \
+ RCL_SET_ERROR_MSG(rcutils_get_error_string().str); \
+ } else { \
+ RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("rcutils_ret_t code: %i", rcutils_ret); \
+ } \
+ } \
+ switch (rcutils_ret) { \
+ case RCUTILS_RET_OK: \
+ rcl_ret_var = RCL_RET_OK; \
+ break; \
+ case RCUTILS_RET_ERROR: \
+ rcl_ret_var = RCL_RET_ERROR; \
+ break; \
+ case RCUTILS_RET_BAD_ALLOC: \
+ rcl_ret_var = RCL_RET_BAD_ALLOC; \
+ break; \
+ case RCUTILS_RET_INVALID_ARGUMENT: \
+ rcl_ret_var = RCL_RET_INVALID_ARGUMENT; \
+ break; \
+ case RCUTILS_RET_NOT_INITIALIZED: \
+ rcl_ret_var = RCL_RET_NOT_INIT; \
+ break; \
+ default: \
+ rcl_ret_var = RCUTILS_RET_ERROR; \
+ } \
+ }
+
#ifdef __cplusplus
}
#endif
diff --git a/rcl/include/rcl/subscription.h b/rcl/include/rcl/subscription.h
index 452cb694b..3e08f42ae 100644
--- a/rcl/include/rcl/subscription.h
+++ b/rcl/include/rcl/subscription.h
@@ -27,6 +27,7 @@ extern "C"
#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/visibility_control.h"
+#include "rcutils/types/string_array.h"
#include "rmw/message_sequence.h"
@@ -208,6 +209,110 @@ RCL_WARN_UNUSED
rcl_subscription_options_t
rcl_subscription_get_default_options(void);
+/// Reclaim resources held inside rcl_subscription_options_t structure.
+/**
+ *
+ * Attribute | Adherence
+ * ------------------ | -------------
+ * Allocates Memory | No
+ * Thread-Safe | No
+ * Uses Atomics | No
+ * Lock-Free | Yes
+ *
+ * \param[in] option The structure which its resources have to be deallocated.
+ * \return `RCL_RET_OK` if the memory was successfully freed, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if option is NULL, or
+ * if its allocator is invalid and the structure contains initialized memory.
+ */
+RCL_PUBLIC
+RCL_WARN_UNUSED
+rcl_ret_t
+rcl_subscription_options_fini(rcl_subscription_options_t * option);
+
+/// Check if the content filtered topic feature is supported in the subscription.
+/**
+ * Depending on the middleware and whether cft is supported in the subscription.
+ * this will return true if the middleware can support ContentFilteredTopic in the subscription.
+ */
+RCL_PUBLIC
+RCL_WARN_UNUSED
+bool
+rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription);
+
+/// Set the filter expression and expression parameters for the subscription.
+/**
+ * This function will set a filter expression and an array of expression parameters
+ * for the given subscription, but not to update the original rcl_subscription_options_t
+ * of subscription.
+ *
+ *
+ * Attribute | Adherence
+ * ------------------ | -------------
+ * Allocates Memory | No
+ * Thread-Safe | No
+ * Uses Atomics | Maybe [1]
+ * Lock-Free | Maybe [1]
+ *
+ * \param[in] subscription the subscription object to inspect.
+ * \param[in] filter_expression A filter_expression is a string that specifies the criteria
+ * to select the data samples of interest. It is similar to the WHERE part of an SQL clause.
+ * Using an empty("") string can reset/clean content filtered topic for the subscription.
+ * \param[in] expression_parameters An expression_parameters is an array of strings that
+ * give values to the ‘parameters’ (i.e., "%n" tokens begin from 0) in the filter_expression.
+ * The number of supplied parameters must fit with the requested values in the filter_expression.
+ * It can be NULL if there is no "%n" tokens placeholder in filter_expression.
+ * The maximun size allowance depends on concrete DDS vendor.
+ * (i.e., it cannot be greater than 100 on RTI_Connext.)
+ * \return `RCL_RET_OK` if the query was successful, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or
+ * \return `RCL_RET_UNSUPPORTED` if the implementation does not support content filter topic, or
+ * \return `RCL_RET_ERROR` if an unspecified error occurs.
+ */
+RCL_PUBLIC
+RCL_WARN_UNUSED
+rcl_ret_t
+rcl_subscription_set_cft_expression_parameters(
+ const rcl_subscription_t * subscription,
+ const char * filter_expression,
+ const rcutils_string_array_t * expression_parameters
+);
+
+/// Retrieve the filter expression of the subscription.
+/**
+ * This function will return an filter expression by the given subscription.
+ *
+ *
+ * Attribute | Adherence
+ * ------------------ | -------------
+ * Allocates Memory | Yes
+ * Thread-Safe | No
+ * Uses Atomics | Maybe [1]
+ * Lock-Free | Maybe [1]
+ *
+ * \param[in] subscription the subscription object to inspect.
+ * \param[out] filter_expression an filter expression, populated on success.
+ * It is up to the caller to deallocate the filter expression later on,
+ * using rcutils_get_default_allocator().deallocate().
+ * \param[out] expression_parameters Array of expression parameters, populated on success.
+ * It is up to the caller to finalize this array later on, using rcutils_string_array_fini().
+ * \return `RCL_RET_OK` if the query was successful, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if `expression_parameters` is NULL, or
+ * \return `RCL_RET_BAD_ALLOC` if memory allocation fails, or
+ * \return `RCL_RET_UNSUPPORTED` if the implementation does not support content filter topic, or
+ * \return `RCL_RET_ERROR` if an unspecified error occurs.
+ */
+RCL_PUBLIC
+RCL_WARN_UNUSED
+rcl_ret_t
+rcl_subscription_get_cft_expression_parameters(
+ const rcl_subscription_t * subscription,
+ char ** filter_expression,
+ rcutils_string_array_t * expression_parameters
+);
+
/// Take a ROS message from a topic using a rcl subscription.
/**
* It is the job of the caller to ensure that the type of the ros_message
diff --git a/rcl/src/rcl/logging_rosout.c b/rcl/src/rcl/logging_rosout.c
index 4133a52f6..c1803f72a 100644
--- a/rcl/src/rcl/logging_rosout.c
+++ b/rcl/src/rcl/logging_rosout.c
@@ -15,6 +15,7 @@
#include "rcl/allocator.h"
#include "rcl/error_handling.h"
#include "rcl/logging_rosout.h"
+#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/publisher.h"
#include "rcl/time.h"
@@ -43,37 +44,6 @@ extern "C"
return RCL_RET_OK; \
}
-#define RCL_RET_FROM_RCUTIL_RET(rcl_ret_var, rcutils_expr) \
- { \
- rcutils_ret_t rcutils_ret = rcutils_expr; \
- if (RCUTILS_RET_OK != rcutils_ret) { \
- if (rcutils_error_is_set()) { \
- RCL_SET_ERROR_MSG(rcutils_get_error_string().str); \
- } else { \
- RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("rcutils_ret_t code: %i", rcutils_ret); \
- } \
- } \
- switch (rcutils_ret) { \
- case RCUTILS_RET_OK: \
- rcl_ret_var = RCL_RET_OK; \
- break; \
- case RCUTILS_RET_ERROR: \
- rcl_ret_var = RCL_RET_ERROR; \
- break; \
- case RCUTILS_RET_BAD_ALLOC: \
- rcl_ret_var = RCL_RET_BAD_ALLOC; \
- break; \
- case RCUTILS_RET_INVALID_ARGUMENT: \
- rcl_ret_var = RCL_RET_INVALID_ARGUMENT; \
- break; \
- case RCUTILS_RET_NOT_INITIALIZED: \
- rcl_ret_var = RCL_RET_NOT_INIT; \
- break; \
- default: \
- rcl_ret_var = RCUTILS_RET_ERROR; \
- } \
- }
-
typedef struct rosout_map_entry_t
{
rcl_node_t * node;
diff --git a/rcl/src/rcl/subscription.c b/rcl/src/rcl/subscription.c
index 611994356..dd459b060 100644
--- a/rcl/src/rcl/subscription.c
+++ b/rcl/src/rcl/subscription.c
@@ -24,6 +24,8 @@ extern "C"
#include "rcl/error_handling.h"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
+#include "rcutils/strdup.h"
+#include "rcutils/types/string_array.h"
#include "rmw/error_handling.h"
#include "rmw/validate_full_topic_name.h"
#include "tracetools/tracetools.h"
@@ -92,6 +94,7 @@ rcl_subscription_init(
sizeof(rcl_subscription_impl_t), allocator->state);
RCL_CHECK_FOR_NULL_WITH_MSG(
subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup);
+ subscription->impl->options = rcl_subscription_get_default_options();
// Fill out the implemenation struct.
// rmw_handle
// TODO(wjwwood): pass allocator once supported in rmw api.
@@ -115,8 +118,8 @@ rcl_subscription_init(
}
subscription->impl->actual_qos.avoid_ros_namespace_conventions =
options->qos.avoid_ros_namespace_conventions;
- // options
subscription->impl->options = *options;
+
RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
ret = RCL_RET_OK;
TRACEPOINT(
@@ -138,6 +141,12 @@ rcl_subscription_init(
}
}
+ ret = rcl_subscription_options_fini(&subscription->impl->options);
+ if (RCL_RET_OK != ret) {
+ RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
+ RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
+ }
+
allocator->deallocate(subscription->impl, allocator->state);
subscription->impl = NULL;
}
@@ -174,6 +183,13 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node)
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
result = RCL_RET_ERROR;
}
+ rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options);
+ if (RCL_RET_OK != rcl_ret) {
+ RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
+ RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
+ result = RCL_RET_ERROR;
+ }
+
allocator.deallocate(subscription->impl, allocator.state);
subscription->impl = NULL;
}
@@ -193,6 +209,90 @@ rcl_subscription_get_default_options()
return default_options;
}
+RCL_PUBLIC
+RCL_WARN_UNUSED
+rcl_ret_t
+rcl_subscription_options_fini(rcl_subscription_options_t * option)
+{
+ RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT);
+ // fini rmw_subscription_options_t
+ const rcl_allocator_t * allocator = &option->allocator;
+ RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
+ if (option->rmw_subscription_options.filter_expression) {
+ allocator->deallocate(option->rmw_subscription_options.filter_expression, allocator->state);
+ option->rmw_subscription_options.filter_expression = NULL;
+ }
+
+ if (option->rmw_subscription_options.expression_parameters) {
+ rcutils_ret_t ret = rcutils_string_array_fini(
+ option->rmw_subscription_options.expression_parameters);
+ if (RCUTILS_RET_OK != ret) {
+ RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to fini string array.\n");
+ }
+ allocator->deallocate(option->rmw_subscription_options.expression_parameters, allocator->state);
+ option->rmw_subscription_options.expression_parameters = NULL;
+ }
+ return RCL_RET_OK;
+}
+
+bool
+rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription)
+{
+ if (!rcl_subscription_is_valid(subscription)) {
+ return false;
+ }
+ return subscription->impl->rmw_handle->is_cft_supported;
+}
+
+rcl_ret_t
+rcl_subscription_set_cft_expression_parameters(
+ const rcl_subscription_t * subscription,
+ const char * filter_expression,
+ const rcutils_string_array_t * expression_parameters
+)
+{
+ RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
+ RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
+
+ if (!rcl_subscription_is_valid(subscription)) {
+ return RCL_RET_SUBSCRIPTION_INVALID;
+ }
+ RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
+ rmw_ret_t ret = rmw_subscription_set_cft_expression_parameters(
+ subscription->impl->rmw_handle, filter_expression, expression_parameters);
+
+ if (ret != RMW_RET_OK) {
+ RCL_SET_ERROR_MSG(rmw_get_error_string().str);
+ return rcl_convert_rmw_ret_to_rcl_ret(ret);
+ }
+ return RCL_RET_OK;
+}
+
+rcl_ret_t
+rcl_subscription_get_cft_expression_parameters(
+ const rcl_subscription_t * subscription,
+ char ** filter_expression,
+ rcutils_string_array_t * expression_parameters
+)
+{
+ RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
+ RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
+
+ if (!rcl_subscription_is_valid(subscription)) {
+ return RCL_RET_SUBSCRIPTION_INVALID;
+ }
+ RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
+ RCL_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RCL_RET_INVALID_ARGUMENT);
+ rmw_ret_t ret = rmw_subscription_get_cft_expression_parameters(
+ subscription->impl->rmw_handle, filter_expression, expression_parameters);
+
+ if (ret != RMW_RET_OK) {
+ RCL_SET_ERROR_MSG(rmw_get_error_string().str);
+ return rcl_convert_rmw_ret_to_rcl_ret(ret);
+ }
+ return RCL_RET_OK;
+}
+
rcl_ret_t
rcl_take(
const rcl_subscription_t * subscription,
diff --git a/rcl/test/rcl/test_subscription.cpp b/rcl/test/rcl/test_subscription.cpp
index adedd7bb9..c5e64d383 100644
--- a/rcl/test/rcl/test_subscription.cpp
+++ b/rcl/test/rcl/test_subscription.cpp
@@ -23,6 +23,7 @@
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"
+#include "rcutils/strdup.h"
#include "rcutils/testing/fault_injection.h"
#include "test_msgs/msg/basic_types.h"
#include "test_msgs/msg/strings.h"
@@ -847,6 +848,388 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_bad_return_l
rcl_subscription_fini(&subscription, this->node_ptr)) << rcl_get_error_string().str;
}
+
+/* A subscription with a content filtered topic setting.
+ */
+TEST_F(
+ CLASSNAME(
+ TestSubscriptionFixture,
+ RMW_IMPLEMENTATION), test_subscription_content_filtered) {
+ bool is_vendor_support_cft =
+ (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0);
+
+ rcl_ret_t ret;
+ rcl_publisher_t publisher = rcl_get_zero_initialized_publisher();
+ const rosidl_message_type_support_t * ts =
+ ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
+ constexpr char topic[] = "rcl_test_subscription_content_filtered_chatter";
+ rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
+ ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr);
+ EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ });
+ rcl_subscription_t subscription = rcl_get_zero_initialized_subscription();
+ rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
+ subscription_options.rmw_subscription_options.filter_expression =
+ rcutils_strdup("string_value MATCH 'FilteredData'", subscription_options.allocator);
+ ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr);
+ EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ });
+ if (is_vendor_support_cft) {
+ ASSERT_TRUE(rcl_subscription_is_cft_supported(&subscription));
+ } else {
+ ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription));
+ }
+ ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100));
+ constexpr char test_string[] = "NotFilteredData";
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ if (is_vendor_support_cft) {
+ ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+ } else {
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+
+ constexpr char test_filtered_string[] = "FilteredData";
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_filtered_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+
+ // set filter
+ {
+ rcutils_string_array_t expression_parameters;
+ expression_parameters = rcutils_get_zero_initialized_string_array();
+ rcl_allocator_t allocator = rcl_get_default_allocator();
+ rcutils_ret_t rcutils_ret = rcutils_string_array_init(
+ &expression_parameters, 1, &allocator);
+ ASSERT_EQ(RCUTILS_RET_OK, rcutils_ret) << rcl_get_error_string().str;
+ expression_parameters.data[0] = rcutils_strdup("'FilteredOtherData'", allocator);
+
+ ret = rcl_subscription_set_cft_expression_parameters(
+ &subscription, "string_value MATCH %0", &expression_parameters);
+ if (is_vendor_support_cft) {
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ } else {
+ ASSERT_EQ(RCL_RET_UNSUPPORTED, ret);
+ }
+
+ rcutils_ret = rcutils_string_array_fini(&expression_parameters);
+ ASSERT_EQ(RCUTILS_RET_OK, rcutils_ret) << rcl_get_error_string().str;
+ }
+
+ // publish FilteredData again
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ if (is_vendor_support_cft) {
+ ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+ } else {
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_filtered_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+
+ constexpr char test_filtered_other_string[] = "FilteredOtherData";
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_other_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_filtered_other_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+
+ // get filter
+ char * filter_expression = nullptr;
+ rcutils_string_array_t expression_parameters;
+ expression_parameters = rcutils_get_zero_initialized_string_array();
+ ret = rcl_subscription_get_cft_expression_parameters(
+ &subscription, &filter_expression, &expression_parameters);
+ if (is_vendor_support_cft) {
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string("string_value MATCH %0"),
+ std::string(filter_expression));
+ rcutils_allocator_t allocator = rcutils_get_default_allocator();
+ allocator.deallocate(filter_expression, allocator.state);
+ ASSERT_EQ(static_cast(1), expression_parameters.size);
+ ASSERT_EQ(
+ std::string("'FilteredOtherData'"),
+ std::string(expression_parameters.data[0]));
+ ASSERT_EQ(RCUTILS_RET_OK, rcutils_string_array_fini(&expression_parameters));
+ } else {
+ ASSERT_EQ(RCL_RET_UNSUPPORTED, ret);
+ }
+
+ // reset filter
+ {
+ ret = rcl_subscription_set_cft_expression_parameters(
+ &subscription, "", NULL);
+ if (is_vendor_support_cft) {
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_TRUE(wait_for_established_subscription(&publisher, 50, 100));
+ ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription));
+ } else {
+ ASSERT_EQ(RCL_RET_UNSUPPORTED, ret);
+ }
+ }
+
+ // publish no filtered data again
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+
+ // it might take 'FilteredData' again, if so, contine to take data
+ std::string data = std::string(msg.string_value.data, msg.string_value.size);
+ if (is_vendor_support_cft) {
+ const int try_total_num = 3;
+ int i = 0;
+ while (data != test_string && i++ < try_total_num) {
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ data = std::string(msg.string_value.data, msg.string_value.size);
+ }
+ }
+ ASSERT_EQ(std::string(test_string), data);
+ }
+}
+
+/* A subscription without a content filtered topic setting at beginning.
+ */
+TEST_F(
+ CLASSNAME(
+ TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription_not_begin_content_filtered) {
+ bool is_vendor_support_cft =
+ (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0);
+
+ rcl_ret_t ret;
+ rcl_publisher_t publisher = rcl_get_zero_initialized_publisher();
+ const rosidl_message_type_support_t * ts =
+ ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
+ constexpr char topic[] = "rcl_test_subscription_not_begin_content_filtered_chatter";
+ rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
+ ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr);
+ EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ });
+ rcl_subscription_t subscription = rcl_get_zero_initialized_subscription();
+ // not to set filter expression
+ rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
+ ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr);
+ EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ });
+ ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription));
+ ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100));
+
+ // publish no filtered data
+ constexpr char test_string[] = "NotFilteredData";
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+
+ // set filter
+ {
+ ret = rcl_subscription_set_cft_expression_parameters(
+ &subscription, "string_value MATCH 'FilteredData'", NULL);
+ if (is_vendor_support_cft) {
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ } else {
+ ASSERT_EQ(RCL_RET_UNSUPPORTED, ret);
+ }
+ }
+
+ if (is_vendor_support_cft) {
+ ASSERT_TRUE(rcl_subscription_is_cft_supported(&subscription));
+ } else {
+ ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription));
+ }
+ // publish no filtered data again
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ if (is_vendor_support_cft) {
+ ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+ } else {
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+
+ // publish filtered data
+ constexpr char test_filtered_string[] = "FilteredData";
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string));
+ ret = rcl_publish(&publisher, &msg, nullptr);
+ test_msgs__msg__Strings__fini(&msg);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ }
+
+ ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100));
+
+ {
+ test_msgs__msg__Strings msg;
+ test_msgs__msg__Strings__init(&msg);
+ OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
+ {
+ test_msgs__msg__Strings__fini(&msg);
+ });
+ ret = rcl_take(&subscription, &msg, nullptr, nullptr);
+ ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
+ ASSERT_EQ(
+ std::string(test_filtered_string),
+ std::string(msg.string_value.data, msg.string_value.size));
+ }
+}
+
TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_get_options) {
rcl_ret_t ret;
const rosidl_message_type_support_t * ts =
@@ -1087,6 +1470,8 @@ TEST_F(CLASSNAME(TestSubscriptionFixtureInit, RMW_IMPLEMENTATION), test_subscrip
rcl_reset_error();
EXPECT_EQ(NULL, rcl_subscription_get_options(nullptr));
rcl_reset_error();
+ EXPECT_FALSE(rcl_subscription_is_cft_supported(nullptr));
+ rcl_reset_error();
EXPECT_EQ(NULL, rcl_subscription_get_actual_qos(&subscription_zero_init));
rcl_reset_error();
@@ -1098,6 +1483,111 @@ TEST_F(CLASSNAME(TestSubscriptionFixtureInit, RMW_IMPLEMENTATION), test_subscrip
rcl_reset_error();
EXPECT_EQ(NULL, rcl_subscription_get_options(&subscription_zero_init));
rcl_reset_error();
+ EXPECT_FALSE(rcl_subscription_is_cft_supported(&subscription_zero_init));
+ rcl_reset_error();
+}
+
+/* Test for all failure modes in rcl_subscription_set_cft_expression_parameters function.
+ */
+TEST_F(
+ CLASSNAME(
+ TestSubscriptionFixtureInit,
+ RMW_IMPLEMENTATION), test_bad_rcl_subscription_set_cft_expression_parameters) {
+ EXPECT_EQ(
+ RCL_RET_SUBSCRIPTION_INVALID,
+ rcl_subscription_set_cft_expression_parameters(nullptr, nullptr, nullptr));
+ rcl_reset_error();
+
+ EXPECT_EQ(
+ RCL_RET_SUBSCRIPTION_INVALID,
+ rcl_subscription_set_cft_expression_parameters(&subscription_zero_init, nullptr, nullptr));
+ rcl_reset_error();
+
+ EXPECT_EQ(
+ RCL_RET_INVALID_ARGUMENT,
+ rcl_subscription_set_cft_expression_parameters(&subscription, nullptr, nullptr));
+ rcl_reset_error();
+
+ {
+ auto mock = mocking_utils::patch_and_return(
+ "lib:rcl", rmw_subscription_set_cft_expression_parameters, RMW_RET_UNSUPPORTED);
+ std::string filter_expression = "data MATCH '0'";
+ EXPECT_EQ(
+ RMW_RET_UNSUPPORTED,
+ rcl_subscription_set_cft_expression_parameters(
+ &subscription, filter_expression.c_str(),
+ nullptr));
+ rcl_reset_error();
+ }
+
+ {
+ auto mock = mocking_utils::patch_and_return(
+ "lib:rcl", rmw_subscription_set_cft_expression_parameters, RMW_RET_ERROR);
+ std::string filter_expression = "data MATCH '0'";
+ EXPECT_EQ(
+ RCL_RET_ERROR,
+ rcl_subscription_set_cft_expression_parameters(
+ &subscription, filter_expression.c_str(),
+ nullptr));
+ rcl_reset_error();
+ }
+}
+
+/* Test for all failure modes in rcl_subscription_get_cft_expression_parameters function.
+ */
+TEST_F(
+ CLASSNAME(
+ TestSubscriptionFixtureInit,
+ RMW_IMPLEMENTATION), test_bad_rcl_subscription_get_cft_expression_parameters) {
+ EXPECT_EQ(
+ RCL_RET_SUBSCRIPTION_INVALID,
+ rcl_subscription_get_cft_expression_parameters(nullptr, nullptr, nullptr));
+ rcl_reset_error();
+
+ EXPECT_EQ(
+ RCL_RET_SUBSCRIPTION_INVALID,
+ rcl_subscription_get_cft_expression_parameters(&subscription_zero_init, nullptr, nullptr));
+ rcl_reset_error();
+
+ EXPECT_EQ(
+ RCL_RET_INVALID_ARGUMENT,
+ rcl_subscription_get_cft_expression_parameters(&subscription, nullptr, nullptr));
+ rcl_reset_error();
+
+ char * filter_expression = NULL;
+ rcutils_string_array_t parameters;
+
+ EXPECT_EQ(
+ RCL_RET_INVALID_ARGUMENT,
+ rcl_subscription_get_cft_expression_parameters(&subscription, &filter_expression, nullptr));
+ rcl_reset_error();
+
+ EXPECT_EQ(
+ RCL_RET_INVALID_ARGUMENT,
+ rcl_subscription_get_cft_expression_parameters(&subscription, nullptr, ¶meters));
+ rcl_reset_error();
+
+ {
+ auto mock = mocking_utils::patch_and_return(
+ "lib:rcl", rmw_subscription_get_cft_expression_parameters, RMW_RET_UNSUPPORTED);
+ EXPECT_EQ(
+ RMW_RET_UNSUPPORTED,
+ rcl_subscription_get_cft_expression_parameters(
+ &subscription, &filter_expression,
+ ¶meters));
+ rcl_reset_error();
+ }
+
+ {
+ auto mock = mocking_utils::patch_and_return(
+ "lib:rcl", rmw_subscription_get_cft_expression_parameters, RMW_RET_ERROR);
+ EXPECT_EQ(
+ RCL_RET_ERROR,
+ rcl_subscription_get_cft_expression_parameters(
+ &subscription, &filter_expression,
+ ¶meters));
+ rcl_reset_error();
+ }
}
TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_init_fini_maybe_fail)
diff --git a/rcl_action/include/rcl_action/action_client.h b/rcl_action/include/rcl_action/action_client.h
index 430d05e66..810d9994f 100644
--- a/rcl_action/include/rcl_action/action_client.h
+++ b/rcl_action/include/rcl_action/action_client.h
@@ -24,6 +24,7 @@ extern "C"
#include "rcl_action/visibility_control.h"
#include "rcl/macros.h"
#include "rcl/node.h"
+#include "rcl/subscription.h"
/// Internal action client implementation struct.
@@ -741,6 +742,64 @@ bool
rcl_action_client_is_valid(
const rcl_action_client_t * action_client);
+/// Add a goal uuid.
+/**
+ * This function is to add a goal uuid to the map of rcl_action_client_t
+ * and then try to set content filtered topic if it is supported.
+ *
+ * The caller must provide a lock to call this interface
+ *
+ *
+ * Attribute | Adherence
+ * ------------------ | -------------
+ * Allocates Memory | No
+ * Thread-Safe | No
+ * Uses Atomics | No
+ * Lock-Free | Yes
+ *
+ * \param[in] action_client handle to the client that will take the goal response
+ * \param[in] uuid pointer to a uuid which length is 16
+ * \return `RCL_RET_OK` if success on setting a goal uuid, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
+ * \return `RCL_RET_ACTION_CLIENT_INVALID` if the action client is invalid, or
+ * \return `RCL_RET_UNSUPPORTED` if setting content filtered topic is not supported
+ * in the middleware, or
+ * \return `RCL_RET_ERROR` if an unspecified error occurs.
+ */
+RCL_ACTION_PUBLIC
+rcl_ret_t rcl_action_add_goal_uuid(
+ const rcl_action_client_t * action_client,
+ const uint8_t * uuid);
+
+/// Remove a goal uuid.
+/**
+ * This function is to remove a goal uuid from the map of rcl_action_client_t
+ * and then try to reset content filtered topic if it is supported.
+ *
+ * The caller must provide a lock to call this interface
+ *
+ *
+ * Attribute | Adherence
+ * ------------------ | -------------
+ * Allocates Memory | No
+ * Thread-Safe | No
+ * Uses Atomics | No
+ * Lock-Free | Yes
+ *
+ * \param[in] action_client handle to the client that will take the goal response
+ * \param[in] uuid pointer to a uuid which length is 16
+ * \return `RCL_RET_OK` if success on removing a goal uuid, or
+ * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
+ * \return `RCL_RET_ACTION_CLIENT_INVALID` if the action client is invalid, or
+ * \return `RCL_RET_UNSUPPORTED` if setting content filtered topic is not supported
+ * in the middleware, or
+ * \return `RCL_RET_ERROR` if an unspecified error occurs.
+ */
+RCL_ACTION_PUBLIC
+rcl_ret_t rcl_action_remove_goal_uuid(
+ const rcl_action_client_t * action_client,
+ const uint8_t * uuid);
+
#ifdef __cplusplus
}
#endif
diff --git a/rcl_action/include/rcl_action/types.h b/rcl_action/include/rcl_action/types.h
index d5f60e446..8aa108583 100644
--- a/rcl_action/include/rcl_action/types.h
+++ b/rcl_action/include/rcl_action/types.h
@@ -58,6 +58,25 @@ extern "C"
#define uuidcmp(uuid0, uuid1) (0 == memcmp(uuid0, uuid1, UUID_SIZE))
#define zerouuid (uint8_t[UUID_SIZE]) {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
#define uuidcmpzero(uuid) uuidcmp(uuid, (zerouuid))
+#define UUID_FMT \
+ "%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X"
+#define UUID_FMT_ARGS(uuid) \
+ uuid[0], \
+ uuid[1], \
+ uuid[2], \
+ uuid[3], \
+ uuid[4], \
+ uuid[5], \
+ uuid[6], \
+ uuid[7], \
+ uuid[8], \
+ uuid[9], \
+ uuid[10], \
+ uuid[11], \
+ uuid[12], \
+ uuid[13], \
+ uuid[14], \
+ uuid[15]
// Forward declare
typedef struct rcl_action_server_t rcl_action_server_t;
diff --git a/rcl_action/src/rcl_action/action_client.c b/rcl_action/src/rcl_action/action_client.c
index a03a61ec3..0fe4d329a 100644
--- a/rcl_action/src/rcl_action/action_client.c
+++ b/rcl_action/src/rcl_action/action_client.c
@@ -33,6 +33,7 @@ extern "C"
#include "rcl/wait.h"
#include "rcutils/logging_macros.h"
+#include "rcutils/format_string.h"
#include "rcutils/strdup.h"
#include "rmw/qos_profiles.h"
@@ -63,7 +64,8 @@ _rcl_action_get_zero_initialized_client_impl(void)
0,
0,
0,
- 0
+ 0,
+ rcutils_get_zero_initialized_hash_map()
};
return null_action_client;
}
@@ -92,6 +94,30 @@ _rcl_action_client_fini_impl(
ret = RCL_RET_ERROR;
}
allocator.deallocate(action_client->impl->action_name, allocator.state);
+ if (NULL != action_client->impl->goal_uuids.impl) {
+ uint8_t uuid[UUID_SIZE];
+ char * value = NULL;
+ rcl_allocator_t default_allocator = rcl_get_default_allocator();
+ rcutils_ret_t hashmap_ret = rcutils_hash_map_get_next_key_and_data(
+ &action_client->impl->goal_uuids, NULL, uuid, &value);
+ while (RCUTILS_RET_OK == hashmap_ret) {
+ RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "remove a uuid: %s", value);
+ default_allocator.deallocate(value, default_allocator.state);
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_unset(&action_client->impl->goal_uuids, uuid));
+ if (ret == RCL_RET_OK) {
+ hashmap_ret = rcutils_hash_map_get_next_key_and_data(
+ &action_client->impl->goal_uuids, NULL, uuid, &value);
+ }
+ }
+ if (RCUTILS_RET_HASH_MAP_NO_MORE_ENTRIES != hashmap_ret) {
+ RCL_RET_FROM_RCUTIL_RET(ret, hashmap_ret);
+ }
+ if (RCL_RET_OK == ret) {
+ RCL_RET_FROM_RCUTIL_RET(ret, rcutils_hash_map_fini(&action_client->impl->goal_uuids));
+ }
+ }
+
allocator.deallocate(action_client->impl, allocator.state);
action_client->impl = NULL;
RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Action client finalized");
@@ -171,6 +197,26 @@ _rcl_action_client_fini_impl(
goto fail; \
}
+size_t hash_map_uuid_hash_func(const void * uuid)
+{
+ const uint8_t * ckey_str = (const uint8_t *) uuid;
+ size_t hash = 5381;
+
+ for (size_t i = 0; i < UUID_SIZE; ++i) {
+ const char c = *(ckey_str++);
+ hash = ((hash << 5) + hash) + (size_t)c; /* hash * 33 + c */
+ }
+
+ return hash;
+}
+
+int hash_map_uuid_cmp_func(const void * val1, const void * val2)
+{
+ const uint8_t * cval1 = (const uint8_t *)val1;
+ const uint8_t * cval2 = (const uint8_t *)val2;
+ return memcmp(cval1, cval2, UUID_SIZE);
+}
+
rcl_ret_t
rcl_action_client_init(
rcl_action_client_t * action_client,
@@ -222,6 +268,15 @@ rcl_action_client_init(
SUBSCRIPTION_INIT(feedback);
SUBSCRIPTION_INIT(status);
+ // Initialize goal_uuids map
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_init(
+ &action_client->impl->goal_uuids, 2, sizeof(uint8_t[16]), sizeof(const char **),
+ hash_map_uuid_hash_func, hash_map_uuid_cmp_func, &allocator));
+ if (RCL_RET_OK != ret) {
+ goto fail;
+ }
+
RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Action client initialized");
return ret;
fail:
@@ -649,6 +704,210 @@ rcl_action_client_wait_set_get_entities_ready(
return RCL_RET_OK;
}
+static char *
+to_uuid_string(const uint8_t * uuid, rcl_allocator_t allocator)
+{
+ char * uuid_str = rcutils_format_string(allocator, UUID_FMT, UUID_FMT_ARGS(uuid));
+ return uuid_str;
+}
+
+static
+rcl_ret_t set_content_filtered_topic(
+ const rcl_action_client_t * action_client)
+{
+ rcl_ret_t ret;
+ rcutils_ret_t rcutils_ret;
+ uint8_t uuid[UUID_SIZE];
+ char * uuid_str = NULL;
+ size_t size;
+ char * feedback_filter = NULL;
+ char * feedback_filter_update = NULL;
+ rcl_allocator_t allocator = rcl_get_default_allocator();
+
+ feedback_filter = rcutils_strdup("", allocator);
+ if (NULL == feedback_filter) {
+ RCL_SET_ERROR_MSG("failed to allocate memory for feedback filter string");
+ ret = RCL_RET_BAD_ALLOC;
+ goto clean;
+ }
+
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_get_size(&action_client->impl->goal_uuids, &size));
+ if (RCL_RET_OK != ret) {
+ goto clean;
+ }
+ RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "size: %zu", size);
+ if (0 == size) {
+ // mainly to reset content filtered topic with empty string
+ goto set_cft;
+ }
+
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_get_next_key_and_data(
+ &action_client->impl->goal_uuids, NULL, uuid, &uuid_str));
+ if (RCL_RET_OK != ret) {
+ goto clean;
+ }
+
+ feedback_filter_update =
+ rcutils_format_string(allocator, "goal_id.uuid = &hex(%s)", uuid_str);
+ if (NULL == feedback_filter_update) {
+ RCL_SET_ERROR_MSG("failed to format string for feedback filter");
+ ret = RCL_RET_BAD_ALLOC;
+ goto clean;
+ }
+ allocator.deallocate(feedback_filter, allocator.state);
+ feedback_filter = feedback_filter_update;
+
+ while (RCUTILS_RET_OK == (rcutils_ret = rcutils_hash_map_get_next_key_and_data(
+ &action_client->impl->goal_uuids, uuid, uuid, &uuid_str)))
+ {
+ feedback_filter_update = rcutils_format_string(
+ allocator, "%s or goal_id.uuid = &hex(%s)", feedback_filter, uuid_str);
+ if (NULL == feedback_filter_update) {
+ RCL_SET_ERROR_MSG("failed to format string for feedback filter");
+ ret = RCL_RET_BAD_ALLOC;
+ goto clean;
+ }
+ allocator.deallocate(feedback_filter, allocator.state);
+ feedback_filter = feedback_filter_update;
+ }
+ if (RCUTILS_RET_HASH_MAP_NO_MORE_ENTRIES != rcutils_ret) {
+ RCL_RET_FROM_RCUTIL_RET(ret, rcutils_ret);
+ }
+ if (RCL_RET_OK != ret) {
+ goto clean;
+ }
+
+set_cft:
+
+ RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "feedback_filter: %s", feedback_filter);
+ ret = rcl_subscription_set_cft_expression_parameters(
+ &action_client->impl->feedback_subscription, feedback_filter, NULL);
+ if (RCL_RET_OK != ret) {
+ goto clean;
+ }
+
+ // TODO(iuhilnehc-ynos) set status_subscription with filter expression for the sequence
+ // by wildcard if rti_connext_dds support this feature in the future,
+ // otherwise, it seems it's necessary to update the action message type or something else.
+
+clean:
+
+ allocator.deallocate(feedback_filter, allocator.state);
+ return ret;
+}
+
+rcl_ret_t rcl_action_add_goal_uuid(
+ const rcl_action_client_t * action_client,
+ const uint8_t * uuid)
+{
+ if (!rcl_action_client_is_valid(action_client)) {
+ return RCL_RET_ACTION_CLIENT_INVALID; /* error already set */
+ }
+ RCL_CHECK_ARGUMENT_FOR_NULL(uuid, RCL_RET_INVALID_ARGUMENT);
+
+ rcl_ret_t ret;
+ char * uuid_str = NULL;
+ rcl_allocator_t allocator = rcl_get_default_allocator();
+ uuid_str = to_uuid_string(uuid, allocator);
+ if (NULL == uuid_str) {
+ RCL_SET_ERROR_MSG("failed to allocate memory for uuid value");
+ ret = RCL_RET_BAD_ALLOC;
+ goto end;
+ }
+
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_set(&action_client->impl->goal_uuids, uuid, &uuid_str));
+ if (RCL_RET_OK != ret) {
+ goto clean;
+ }
+
+ // to set content filtered topic
+ RCUTILS_LOG_DEBUG_NAMED(
+ ROS_PACKAGE_NAME, "set content filtered topic after adding a uuid: %s", uuid_str);
+ ret = set_content_filtered_topic(action_client);
+ if (RCL_RET_OK != ret) {
+ char * err = rcutils_strdup(rcl_get_error_string().str, allocator);
+ if (NULL == err) {
+ RCUTILS_LOG_ERROR_NAMED(ROS_PACKAGE_NAME, "failed to allocate memory for error");
+ ret = RCL_RET_BAD_ALLOC;
+ goto clean;
+ }
+ rcl_reset_error();
+ RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("failed to set_content_filtered_topic: %s", err);
+ allocator.deallocate(err, allocator.state);
+ }
+ goto end;
+
+clean:
+ allocator.deallocate(uuid_str, allocator.state);
+
+end:
+ return ret;
+}
+
+rcl_ret_t rcl_action_remove_goal_uuid(
+ const rcl_action_client_t * action_client,
+ const uint8_t * uuid)
+{
+ if (!rcl_action_client_is_valid(action_client)) {
+ return RCL_RET_ACTION_CLIENT_INVALID; /* error already set */
+ }
+ RCL_CHECK_ARGUMENT_FOR_NULL(uuid, RCL_RET_INVALID_ARGUMENT);
+
+ rcl_ret_t ret;
+ char * uuid_str = NULL;
+ char * uuid_value = NULL;
+ rcl_allocator_t allocator = rcl_get_default_allocator();
+ uuid_str = to_uuid_string(uuid, allocator);
+ if (NULL == uuid_str) {
+ RCL_SET_ERROR_MSG("failed to allocate memory for uuid value");
+ ret = RCL_RET_BAD_ALLOC;
+ goto end;
+ }
+ if (!rcutils_hash_map_key_exists(&action_client->impl->goal_uuids, uuid)) {
+ RCL_SET_ERROR_MSG_WITH_FORMAT_STRING(
+ "item key [%s] not found in the map of goal uuids",
+ uuid_str);
+ ret = RCL_RET_ERROR;
+ goto end;
+ }
+
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_get(&action_client->impl->goal_uuids, uuid, &uuid_value));
+ if (RCL_RET_OK != ret) {
+ goto end;
+ }
+ allocator.deallocate(uuid_value, allocator.state);
+
+ RCL_RET_FROM_RCUTIL_RET(
+ ret, rcutils_hash_map_unset(&action_client->impl->goal_uuids, uuid));
+ if (RCL_RET_OK != ret) {
+ goto end;
+ }
+
+ RCUTILS_LOG_DEBUG_NAMED(
+ ROS_PACKAGE_NAME, "set content filtered topic after removing a uuid: %s", uuid_str);
+ ret = set_content_filtered_topic(action_client);
+ if (RCL_RET_OK != ret) {
+ rcl_allocator_t allocator = rcl_get_default_allocator();
+ char * err = rcutils_strdup(rcl_get_error_string().str, allocator);
+ if (NULL == err) {
+ RCUTILS_LOG_ERROR_NAMED(ROS_PACKAGE_NAME, "failed to allocate memory for error");
+ ret = RCL_RET_BAD_ALLOC;
+ goto end;
+ }
+ rcl_reset_error();
+ RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("failed to set_content_filtered_topic: %s", err);
+ allocator.deallocate(err, allocator.state);
+ }
+
+end:
+ allocator.deallocate(uuid_str, allocator.state);
+ return ret;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/rcl_action/src/rcl_action/action_client_impl.h b/rcl_action/src/rcl_action/action_client_impl.h
index 00cdd6125..e4bccdfea 100644
--- a/rcl_action/src/rcl_action/action_client_impl.h
+++ b/rcl_action/src/rcl_action/action_client_impl.h
@@ -18,6 +18,8 @@
#include "rcl_action/types.h"
#include "rcl/rcl.h"
+#include "rcutils/types.h"
+
typedef struct rcl_action_client_impl_t
{
rcl_client_t goal_client;
@@ -33,6 +35,7 @@ typedef struct rcl_action_client_impl_t
size_t wait_set_result_client_index;
size_t wait_set_feedback_subscription_index;
size_t wait_set_status_subscription_index;
+ rcutils_hash_map_t goal_uuids;
} rcl_action_client_impl_t;
diff --git a/rcl_action/test/rcl_action/test_action_communication.cpp b/rcl_action/test/rcl_action/test_action_communication.cpp
index 7f59fc333..bad51bfe6 100644
--- a/rcl_action/test/rcl_action/test_action_communication.cpp
+++ b/rcl_action/test/rcl_action/test_action_communication.cpp
@@ -13,6 +13,8 @@
// limitations under the License.
#include
+#include
+
#include "osrf_testing_tools_cpp/scope_exit.hpp"
#include "rcl_action/action_client.h"
@@ -652,6 +654,105 @@ TEST_F(CLASSNAME(TestActionCommunication, RMW_IMPLEMENTATION), test_valid_feedba
test_msgs__action__Fibonacci_FeedbackMessage__fini(&outgoing_feedback);
}
+TEST_F(CLASSNAME(TestActionCommunication, RMW_IMPLEMENTATION), test_valid_feedback_comm_cft)
+{
+ bool is_vendor_support_cft =
+ (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0);
+
+ #define FEEDBACK_SIZE 2
+ test_msgs__action__Fibonacci_FeedbackMessage outgoing_feedback[FEEDBACK_SIZE];
+ test_msgs__action__Fibonacci_FeedbackMessage incoming_feedback[FEEDBACK_SIZE];
+ for (size_t i = 0; i < FEEDBACK_SIZE; ++i) {
+ test_msgs__action__Fibonacci_FeedbackMessage__init(&outgoing_feedback[i]);
+ test_msgs__action__Fibonacci_FeedbackMessage__init(&incoming_feedback[i]);
+ }
+
+ uint8_t uuid[UUID_SIZE];
+ init_test_uuid0(uuid);
+ rcl_ret_t ret = rcl_action_add_goal_uuid(&this->action_client, uuid);
+ if (is_vendor_support_cft) {
+ EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+ } else {
+ EXPECT_EQ(ret, RCL_RET_UNSUPPORTED) << rcl_get_error_string().str;
+ }
+
+ // Initialize feedback
+ for (size_t i = 0; i < FEEDBACK_SIZE; ++i) {
+ ASSERT_TRUE(
+ rosidl_runtime_c__int32__Sequence__init(
+ &outgoing_feedback[i].feedback.sequence, 3));
+ outgoing_feedback[i].feedback.sequence.data[0] = 0;
+ outgoing_feedback[i].feedback.sequence.data[1] = 1;
+ outgoing_feedback[i].feedback.sequence.data[2] = 2;
+ i == 0 ? init_test_uuid0(outgoing_feedback[i].goal_id.uuid) :
+ init_test_uuid1(outgoing_feedback[i].goal_id.uuid);
+ }
+
+ // Publish feedback with valid arguments
+ for (size_t i = 0; i < FEEDBACK_SIZE; ++i) {
+ ret = rcl_action_publish_feedback(&this->action_server, &outgoing_feedback[i]);
+ EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+
+ ret = rcl_wait_set_clear(&this->wait_set);
+ EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+
+ ret = rcl_action_wait_set_add_action_client(
+ &this->wait_set, &this->action_client, NULL, NULL);
+ ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+
+ ret = rcl_wait(&this->wait_set, RCL_S_TO_NS(10));
+ if (0 == i || !is_vendor_support_cft) {
+ ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+ ret = rcl_action_client_wait_set_get_entities_ready(
+ &this->wait_set,
+ &this->action_client,
+ &this->is_feedback_ready,
+ &this->is_status_ready,
+ &this->is_goal_response_ready,
+ &this->is_cancel_response_ready,
+ &this->is_result_response_ready);
+ EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+
+ EXPECT_TRUE(this->is_feedback_ready);
+ EXPECT_FALSE(this->is_status_ready);
+ EXPECT_FALSE(this->is_result_response_ready);
+ EXPECT_FALSE(this->is_cancel_response_ready);
+ EXPECT_FALSE(this->is_goal_response_ready);
+
+ // Take feedback with valid arguments
+ ret = rcl_action_take_feedback(&this->action_client, &incoming_feedback[i]);
+ EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+
+ // Check that feedback was received correctly
+ EXPECT_TRUE(
+ uuidcmp(
+ outgoing_feedback[i].goal_id.uuid,
+ incoming_feedback[i].goal_id.uuid));
+ ASSERT_EQ(
+ outgoing_feedback[i].feedback.sequence.size, incoming_feedback[i].feedback.sequence.size);
+ EXPECT_TRUE(
+ !memcmp(
+ outgoing_feedback[i].feedback.sequence.data,
+ incoming_feedback[i].feedback.sequence.data,
+ outgoing_feedback[i].feedback.sequence.size));
+ } else {
+ EXPECT_EQ(ret, RCL_RET_TIMEOUT) << rcl_get_error_string().str;
+ }
+ }
+
+ ret = rcl_action_remove_goal_uuid(&this->action_client, uuid);
+ if (is_vendor_support_cft) {
+ EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str;
+ } else {
+ EXPECT_EQ(ret, RCL_RET_UNSUPPORTED) << rcl_get_error_string().str;
+ }
+
+ for (size_t i = 0; i < FEEDBACK_SIZE; ++i) {
+ test_msgs__action__Fibonacci_FeedbackMessage__fini(&outgoing_feedback[i]);
+ test_msgs__action__Fibonacci_FeedbackMessage__fini(&incoming_feedback[i]);
+ }
+}
+
TEST_F(CLASSNAME(TestActionCommunication, RMW_IMPLEMENTATION), test_invalid_goal_request_opts)
{
test_msgs__action__Fibonacci_SendGoal_Request outgoing_goal_request;