Skip to content

Commit

Permalink
Don't copy contiguous bytes on reception (#343)
Browse files Browse the repository at this point in the history
* Don't copy contiguous bytes on reception

This uses the slices iterator API of zenoh-cpp to avoid unecessarily
copying bytes into a vecotr, if and only if the bytes is made up of
exactly one slice.

* Don't use auto type specifiers

* Remove unused `<vector>` includes

* Explain lifetime of `Contiguous::slice`

* Move `Payload` into `zenoh_utils`

Signed-off-by: Mahmoud Mazouz <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
(cherry picked from commit cebb972)

# Conflicts:
#	rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
#	rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
  • Loading branch information
fuzzypixelz authored and mergify[bot] committed Jan 2, 2025
1 parent 52c681a commit 8e50d08
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 9 deletions.
10 changes: 4 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "attachment_helpers.hpp"
#include "cdr.hpp"
Expand All @@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
SubscriptionData::Message::Message(
std::vector<uint8_t> && p,
const zenoh::Bytes & p,
uint64_t recv_ts,
AttachmentData && attachment_)
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
}

Expand Down Expand Up @@ -225,7 +224,7 @@ bool SubscriptionData::init()

sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
sample.get_payload().as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down Expand Up @@ -303,13 +302,12 @@ bool SubscriptionData::init()
"Unable to obtain attachment")
return;
}
auto payload = sample.get_payload().clone();
auto attachment_value = attachment.value();

AttachmentData attachment_data(attachment_value);
sub_data->add_new_message(
std::make_unique<SubscriptionData::Message>(
payload.as_vector(),
sample.get_payload(),
std::chrono::system_clock::now().time_since_epoch().count(),
std::move(attachment_data)),
std::string(sample.get_keyexpr().as_string_view()));
Expand Down
5 changes: 2 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

#include <zenoh.hxx>

Expand All @@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
struct Message
{
explicit Message(
std::vector<uint8_t> && p,
const zenoh::Bytes & bytes,
uint64_t recv_ts,
AttachmentData && attachment);

~Message();

std::vector<uint8_t> payload;
Payload payload;
uint64_t recv_timestamp;
AttachmentData attachment;
};
Expand Down
61 changes: 61 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,65 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
{
return received_timestamp_;
}
<<<<<<< HEAD
=======

int64_t get_system_time_in_ns()
{
auto now = std::chrono::system_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
}

///=============================================================================
Payload::Payload(const zenoh::Bytes & bytes)
{
// NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of
// buffers contains exactly one element, it is not necessary to concatenate the list of buffers.
// In this case, we store a clone of the bytes object to maintain a non-zero reference-count on
// the buffer. This ensures that the slice into said buffer stays valid until we drop our copy
// of the bytes object (at the very least). This case corresponds to the `Contiguous`
// alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local"
// communication.

zenoh::Bytes::SliceIterator slices = bytes.slice_iter();
std::optional<zenoh::Slice> slice = slices.next();
if (!slice.has_value()) {
bytes_ = nullptr;
} else {
if (!slices.next().has_value()) {
bytes_ = Contiguous {slice.value(), bytes.clone()};
} else {
bytes_ = bytes.as_vector();
}
}
}

const uint8_t * Payload::data() const
{
if (std::holds_alternative<Empty>(bytes_)) {
return nullptr;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).data();
} else {
return std::get<Contiguous>(bytes_).slice.data;
}
}

size_t Payload::size() const
{
if (std::holds_alternative<Empty>(bytes_)) {
return 0;
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
return std::get<NonContiguous>(bytes_).size();
} else {
return std::get<Contiguous>(bytes_).slice.len;
}
}

bool Payload::empty() const
{
return std::holds_alternative<Empty>(bytes_);
}

>>>>>>> cebb972 (Don't copy contiguous bytes on reception (#343))
} // namespace rmw_zenoh_cpp
34 changes: 34 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <chrono>
#include <functional>
#include <optional>
#include <utility>
#include <variant>
#include <vector>

#include "rmw/types.h"

Expand Down Expand Up @@ -65,6 +68,37 @@ class ZenohQuery final
zenoh::Query query_;
std::chrono::nanoseconds::rep received_timestamp_;
};
<<<<<<< HEAD
=======

int64_t get_system_time_in_ns();

class Payload
{
public:
explicit Payload(const zenoh::Bytes & bytes);

~Payload() = default;

const uint8_t * data() const;

size_t size() const;

bool empty() const;

private:
struct Contiguous
{
zenoh::Slice slice;
zenoh::Bytes bytes;
};
using NonContiguous = std::vector<uint8_t>;
using Empty = std::nullptr_t;
// Is `std::vector<uint8_t>` in case of a non-contiguous payload
// and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
std::variant<NonContiguous, Contiguous, Empty> bytes_;
};
>>>>>>> cebb972 (Don't copy contiguous bytes on reception (#343))
} // namespace rmw_zenoh_cpp
#endif // DETAIL__ZENOH_UTILS_HPP_

0 comments on commit 8e50d08

Please sign in to comment.