From 1d5a4672000ee99f666a5f76b4591784411dc0c7 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Mon, 23 Dec 2024 17:03:17 +0100 Subject: [PATCH] Don't copy contiguous bytes on reception (#343) * Don't copy contiguous bytes on reception This uses the slices iterator API of zenoh-cpp to avoid unecessarily copying bytes into a vecotr, if and only if the bytes is made up of exactly one slice. * Don't use auto type specifiers * Remove unused `` includes * Explain lifetime of `Contiguous::slice` * Move `Payload` into `zenoh_utils` Signed-off-by: Mahmoud Mazouz Co-authored-by: Chris Lalancette --- .../src/detail/rmw_subscription_data.cpp | 10 ++-- .../src/detail/rmw_subscription_data.hpp | 5 +- rmw_zenoh_cpp/src/detail/zenoh_utils.cpp | 51 +++++++++++++++++++ rmw_zenoh_cpp/src/detail/zenoh_utils.hpp | 29 +++++++++++ 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 83d06139..b98f0308 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp { ///============================================================================= SubscriptionData::Message::Message( - std::vector && p, + const zenoh::Bytes & p, uint64_t recv_ts, AttachmentData && attachment_) -: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) +: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } @@ -225,7 +224,7 @@ bool SubscriptionData::init() sub_data->add_new_message( std::make_unique( - sample.get_payload().as_vector(), + sample.get_payload(), std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); @@ -303,13 +302,12 @@ bool SubscriptionData::init() "Unable to obtain attachment") return; } - auto payload = sample.get_payload().clone(); auto attachment_value = attachment.value(); AttachmentData attachment_data(attachment_value); sub_data->add_new_message( std::make_unique( - payload.as_vector(), + sample.get_payload(), std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment_data)), std::string(sample.get_keyexpr().as_string_view())); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index a3fab3f9..37ab0dba 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -25,7 +25,6 @@ #include #include #include -#include #include @@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this && p, + const zenoh::Bytes & bytes, uint64_t recv_ts, AttachmentData && attachment); ~Message(); - std::vector payload; + Payload payload; uint64_t recv_timestamp; AttachmentData attachment; }; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 3e6f8ef9..18253fa3 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -82,4 +82,55 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const { return received_timestamp_; } + +Payload::Payload(const zenoh::Bytes & bytes) +{ + // NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of + // buffers contains exactly one element, it is not necessary to concatenate the list of buffers. + // In this case, we store a clone of the bytes object to maintain a non-zero reference-count on + // the buffer. This ensures that the slice into said buffer stays valid until we drop our copy + // of the bytes object (at the very least). This case corresponds to the `Contiguous` + // alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local" + // communication. + + zenoh::Bytes::SliceIterator slices = bytes.slice_iter(); + std::optional slice = slices.next(); + if (!slice.has_value()) { + bytes_ = nullptr; + } else { + if (!slices.next().has_value()) { + bytes_ = Contiguous {slice.value(), bytes.clone()}; + } else { + bytes_ = bytes.as_vector(); + } + } +} + +const uint8_t * Payload::data() const +{ + if (std::holds_alternative(bytes_)) { + return nullptr; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).data(); + } else { + return std::get(bytes_).slice.data; + } +} + +size_t Payload::size() const +{ + if (std::holds_alternative(bytes_)) { + return 0; + } else if (std::holds_alternative(bytes_)) { + return std::get(bytes_).size(); + } else { + return std::get(bytes_).slice.len; + } +} + +bool Payload::empty() const +{ + return std::holds_alternative(bytes_); +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 8da7aae4..734ade92 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include "rmw/types.h" @@ -65,6 +68,32 @@ class ZenohQuery final zenoh::Query query_; std::chrono::nanoseconds::rep received_timestamp_; }; + +class Payload +{ +public: + explicit Payload(const zenoh::Bytes & bytes); + + ~Payload() = default; + + const uint8_t * data() const; + + size_t size() const; + + bool empty() const; + +private: + struct Contiguous + { + zenoh::Slice slice; + zenoh::Bytes bytes; + }; + using NonContiguous = std::vector; + using Empty = std::nullptr_t; + // Is `std::vector` in case of a non-contiguous payload + // and `zenoh::Slice` plus a `zenoh::Bytes` otherwise. + std::variant bytes_; +}; } // namespace rmw_zenoh_cpp #endif // DETAIL__ZENOH_UTILS_HPP_