Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing instrumentation using tracetools #294

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(tracetools REQUIRED)
find_package(zenoh_cpp_vendor REQUIRED)

add_library(rmw_zenoh_cpp SHARED
@@ -67,6 +68,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
tracetools::tracetools
zenohcxx::zenohc
)

1 change: 1 addition & 0 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>tracetools</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
13 changes: 11 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
@@ -363,10 +365,17 @@ rmw_ret_t ClientData::send_request(
size_t data_length = ser.get_serialized_data_length();
*sequence_id = sequence_number_++;

TRACETOOLS_TRACEPOINT(
rmw_send_request,
static_cast<const void *>(rmw_client_),
static_cast<const void *>(ros_request),
*sequence_id);

// Send request
zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default();
std::array<uint8_t, RMW_GID_STORAGE_SIZE> local_gid = entity_->copy_gid();
opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid);
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
opts.attachment = rmw_zenoh_cpp::AttachmentData(
*sequence_id, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
// this window, the queryable will return an invalid reply. However, it is common for actions,
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
@@ -118,6 +118,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
mutable std::recursive_mutex mutex_;
// The parent node.
const rmw_node_t * rmw_node_;
// The rmw client.
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
@@ -141,6 +141,7 @@ bool NodeData::create_pub_data(

auto pub_data = PublisherData::make(
session,
publisher,
node_,
entity_->node_info(),
id_,
@@ -276,6 +277,7 @@ bool NodeData::create_service_data(
auto service_data = ServiceData::make(
session,
node_,
service,
entity_->node_info(),
id_,
std::move(id),
22 changes: 17 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
// TODO(yuyuan): SHM, make this configurable
@@ -45,6 +47,7 @@ namespace rmw_zenoh_cpp
///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
std::shared_ptr<zenoh::Session> session,
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
@@ -158,6 +161,7 @@ std::shared_ptr<PublisherData> PublisherData::make(

return std::shared_ptr<PublisherData>(
new PublisherData{
rmw_publisher,
node,
std::move(entity),
std::move(session),
@@ -171,6 +175,7 @@ std::shared_ptr<PublisherData> PublisherData::make(

///=============================================================================
PublisherData::PublisherData(
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> sess,
@@ -179,7 +184,8 @@ PublisherData::PublisherData(
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
: rmw_publisher_(rmw_publisher),
rmw_node_(rmw_node),
entity_(std::move(entity)),
sess_(std::move(sess)),
pub_(std::move(pub)),
@@ -246,17 +252,19 @@ rmw_ret_t PublisherData::publish(
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
zenoh::ZResult result;
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(
sequence_number_++,
entity_->copy_gid());
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
@@ -292,14 +300,18 @@ rmw_ret_t PublisherData::publish_serialized_message(
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
zenoh::ZResult result;
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid());
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(std::move(raw_data));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ class PublisherData final
// Make a shared_ptr of PublisherData.
static std::shared_ptr<PublisherData> make(
std::shared_ptr<zenoh::Session> session,
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
@@ -90,6 +91,7 @@ class PublisherData final
private:
// Constructor.
PublisherData(
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
@@ -101,6 +103,8 @@ class PublisherData final

// Internal mutex.
mutable std::mutex mutex_;
// The rmw publisher
const rmw_publisher_t * rmw_publisher_;
// The parent node.
const rmw_node_t * rmw_node_;
// The Entity generated for the publisher.
19 changes: 16 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
@@ -40,12 +40,15 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ServiceData> ServiceData::make(
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
const rmw_service_t * rmw_service,
liveliness::NodeInfo node_info,
std::size_t node_id,
std::size_t service_id,
@@ -128,6 +131,7 @@ std::shared_ptr<ServiceData> ServiceData::make(
auto service_data = std::shared_ptr<ServiceData>(
new ServiceData{
node,
rmw_service,
std::move(entity),
session,
request_members,
@@ -192,13 +196,15 @@ std::shared_ptr<ServiceData> ServiceData::make(
///=============================================================================
ServiceData::ServiceData(
const rmw_node_t * rmw_node,
const rmw_service_t * rmw_service,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::unique_ptr<RequestTypeSupport> request_type_support,
std::unique_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_service_(rmw_service),
entity_(std::move(entity)),
sess_(std::move(session)),
request_type_support_impl_(request_type_support_impl),
@@ -438,9 +444,9 @@ rmw_ret_t ServiceData::send_response(
zenoh::Query::ReplyOptions options = zenoh::Query::ReplyOptions::create_default();
std::array<uint8_t, RMW_GID_STORAGE_SIZE> writer_gid;
memcpy(writer_gid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE);
options.attachment = create_map_and_set_sequence_num(
request_id->sequence_number,
writer_gid);
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
options.attachment = rmw_zenoh_cpp::AttachmentData(
request_id->sequence_number, source_timestamp, writer_gid).serialize_to_zbytes();

std::vector<uint8_t> raw_bytes(
reinterpret_cast<const uint8_t *>(response_bytes),
@@ -454,6 +460,13 @@ rmw_ret_t ServiceData::send_response(
return RMW_RET_ERROR;
}

TRACETOOLS_TRACEPOINT(
rmw_send_response,
static_cast<const void *>(rmw_service_),
static_cast<const void *>(ros_response),
request_id->writer_guid,
request_id->sequence_number,
source_timestamp);
loaned_query.reply(service_ke, std::move(payload), std::move(options), &result);
if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to reply");
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.hpp
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
static std::shared_ptr<ServiceData> make(
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
const rmw_service_t * rmw_service,
liveliness::NodeInfo node_info,
std::size_t node_id,
std::size_t service_id,
@@ -98,6 +99,7 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
// Constructor.
ServiceData(
const rmw_node_t * rmw_node,
const rmw_service_t * rmw_service,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * request_type_support_impl,
@@ -109,6 +111,8 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
mutable std::mutex mutex_;
// The parent node.
const rmw_node_t * rmw_node_;
// The rmw service.
const rmw_service_t * rmw_service_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;

19 changes: 7 additions & 12 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
@@ -26,18 +26,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid)
{
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
int64_t source_timestamp = now_ns.count();

rmw_zenoh_cpp::AttachmentData data(sequence_number, source_timestamp, gid);
return data.serialize_to_zbytes();
}

///=============================================================================
ZenohQuery::ZenohQuery(
const zenoh::Query & query,
@@ -82,4 +70,11 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
{
return received_timestamp_;
}

int64_t get_system_time_in_ns()
{
auto now = std::chrono::system_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
}

} // namespace rmw_zenoh_cpp
6 changes: 2 additions & 4 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
@@ -26,10 +26,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid);

///=============================================================================
// A class to store the replies to service requests.
class ZenohReply final
@@ -65,6 +61,8 @@ class ZenohQuery final
zenoh::Query query_;
std::chrono::nanoseconds::rep received_timestamp_;
};

int64_t get_system_time_in_ns();
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_UTILS_HPP_
75 changes: 69 additions & 6 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
@@ -56,6 +56,8 @@
#include "rmw/validate_namespace.h"
#include "rmw/validate_node_name.h"

#include "tracetools/tracetools.h"

namespace
{
//==============================================================================
@@ -454,6 +456,14 @@ rmw_create_publisher(
free_topic_name.cancel();
free_rmw_publisher.cancel();

if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_publisher_init)) {
rmw_gid_t gid{};
// Trigger tracepoint even if we cannot get the GID
rmw_ret_t gid_ret = rmw_get_gid_for_publisher(rmw_publisher, &gid);
static_cast<void>(gid_ret);
TRACETOOLS_DO_TRACEPOINT(
rmw_publisher_init, static_cast<const void *>(rmw_publisher), gid.data);
}
return rmw_publisher;
}

@@ -990,6 +1000,12 @@ rmw_create_subscription(
free_topic_name.cancel();
free_rmw_subscription.cancel();

// rmw does not require GIDs for subscriptions, and GIDs in rmw_zenoh are not based on any ID of
// the underlying zenoh objects, so there is no need to collect a GID here
rmw_gid_t gid{};
clalancette marked this conversation as resolved.
Show resolved Hide resolved
static_cast<void>(gid);
TRACETOOLS_TRACEPOINT(
rmw_subscription_init, static_cast<const void *>(rmw_subscription), gid.data);
return rmw_subscription;
}

@@ -1134,7 +1150,18 @@ rmw_take(
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_one_message(ros_message, nullptr, taken);
if (!TRACETOOLS_TRACEPOINT_ENABLED(rmw_take)) {
return sub_data->take_one_message(ros_message, nullptr, taken);
}
rmw_message_info_t message_info{};
rmw_ret_t ret = sub_data->take_one_message(ros_message, &message_info, taken);
TRACETOOLS_DO_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(ros_message),
message_info.source_timestamp,
*taken);
return ret;
}

//==============================================================================
@@ -1163,7 +1190,14 @@ rmw_take_with_info(
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_one_message(ros_message, message_info, taken);
rmw_ret_t ret = sub_data->take_one_message(ros_message, message_info, taken);
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(ros_message),
message_info->source_timestamp,
*taken);
return ret;
}

//==============================================================================
@@ -1269,11 +1303,18 @@ __rmw_take_serialized(
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_serialized_message(
rmw_ret_t ret = sub_data->take_serialized_message(
serialized_message,
taken,
message_info
);
TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
message_info->source_timestamp,
*taken);
return ret;
}
} // namespace

@@ -1452,7 +1493,8 @@ rmw_create_client(
// TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased
// Client handle will be returned in the rmw_clients_t in rmw_wait
// from which we cannot obtain ClientData.
rmw_client->data = static_cast<void *>(node_data->get_client_data(rmw_client).get());
rmw_zenoh_cpp::ClientDataPtr client_data = node_data->get_client_data(rmw_client);
rmw_client->data = static_cast<void *>(client_data.get());
rmw_client->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
rmw_client->service_name = rcutils_strdup(service_name, *allocator);
RMW_CHECK_FOR_NULL_WITH_MSG(
@@ -1467,6 +1509,10 @@ rmw_create_client(
free_rmw_client.cancel();
free_service_name.cancel();

TRACETOOLS_TRACEPOINT(
rmw_client_init,
static_cast<const void *>(rmw_client),
client_data->copy_gid().data());
return rmw_client;
}

@@ -1560,8 +1606,17 @@ rmw_take_response(
RMW_CHECK_FOR_NULL_WITH_MSG(
client->data, "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(ros_response, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT);

return client_data->take_response(request_header, ros_response, taken);
rmw_ret_t ret = client_data->take_response(request_header, ros_response, taken);
TRACETOOLS_TRACEPOINT(
rmw_take_response,
static_cast<const void *>(client),
static_cast<const void *>(ros_response),
request_header->request_id.sequence_number,
request_header->source_timestamp,
*taken);
return ret;
}

//==============================================================================
@@ -1779,10 +1834,18 @@ rmw_take_request(
RMW_CHECK_ARGUMENT_FOR_NULL(request_header, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(ros_request, RMW_RET_INVALID_ARGUMENT);

return service_data->take_request(
rmw_ret_t ret = service_data->take_request(
request_header,
ros_request,
taken);
TRACETOOLS_TRACEPOINT(
rmw_take_request,
static_cast<const void *>(service),
static_cast<const void *>(ros_request),
request_header->request_id.writer_guid,
request_header->request_id.sequence_number,
*taken);
return ret;
}

//==============================================================================