Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHM mutation #328

Merged
merged 11 commits into from
Dec 18, 2024
4 changes: 1 addition & 3 deletions examples/zenohc/z_queryable_shm.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ int _main(int argc, char **argv) {

const char *payload_type = "";
if (query_payload.has_value()) {
ZResult result;
query_payload->get().as_shm(&result);
if (result == Z_OK) {
if (query_payload->get().as_shm().has_value()) {
payload_type = "SHM";
} else {
payload_type = "RAW";
Expand Down
12 changes: 7 additions & 5 deletions examples/zenohc/z_sub_shm.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const char *kind_to_str(SampleKind kind) {
}
}

void data_handler(const Sample &sample) {
void data_handler(Sample &sample) {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
#if !defined(Z_FEATURE_SHARED_MEMORY)
const char *payload_type = "RAW";
Expand All @@ -50,10 +50,12 @@ void data_handler(const Sample &sample) {
#if defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API)
const char *payload_type = "RAW";
{
ZResult result;
sample.get_payload().as_shm(&result);
if (result == Z_OK) {
payload_type = "SHM";
// try to convert sample payload into SHM buffer. The conversion will succeed
// only if payload is carrying underlying SHM buffer
auto shm = sample.get_payload().as_shm();
if (shm.has_value()) {
// try to get mutable access to SHM buffer
payload_type = ZShm::try_mutate(shm.value()).has_value() ? "SHM (MUT)" : "SHM (IMMUT)";
}
}
#endif
Expand Down
20 changes: 15 additions & 5 deletions include/zenoh/api/bytes.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,21 @@ class Bytes : public Owned<::z_owned_bytes_t> {
#if (defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API))
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
ZShm as_shm(ZResult* err = nullptr) const {
ZShm shm = interop::detail::null<ZShm>();
__ZENOH_RESULT_CHECK(::z_bytes_to_owned_shm(interop::as_loaned_c_ptr(*this), interop::as_owned_c_ptr(shm)), err,
"Failed to deserialize into ZShm!");
return shm;
std::optional<std::reference_wrapper<const ZShm>> as_shm() const {
const z_loaned_shm_t* shm;
if (::z_bytes_as_loaned_shm(interop::as_loaned_c_ptr(*this), &shm) != Z_OK) {
return std::nullopt;
}
return std::cref(interop::as_owned_cpp_ref<const ZShm>(shm));
}
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
std::optional<std::reference_wrapper<ZShm>> as_shm() {
z_loaned_shm_t* shm;
if (::z_bytes_as_mut_loaned_shm(interop::as_loaned_c_ptr(*this), &shm) != Z_OK) {
return std::nullopt;
}
return std::ref(interop::as_owned_cpp_ref<ZShm>(shm));
}
#endif

Expand Down
9 changes: 9 additions & 0 deletions include/zenoh/api/query.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class Query : public Owned<::z_owned_query_t> {
if (payload == nullptr) return {};
return std::cref(interop::as_owned_cpp_ref<Bytes>(payload));
}
#if defined(ZENOHCXX_ZENOHC)
/// @brief Get the payload of the query.
/// @return payload of the query.
std::optional<std::reference_wrapper<Bytes>> get_payload() {
auto payload = ::z_query_payload_mut(interop::as_loaned_c_ptr(*this));
if (payload == nullptr) return {};
return std::ref(interop::as_owned_cpp_ref<Bytes>(payload));
}
#endif

/// @brief Get the encoding of the query.
/// @return encoding of the query.
Expand Down
7 changes: 7 additions & 0 deletions include/zenoh/api/reply.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class ReplyError : public Owned<::z_owned_reply_err_t> {
const Bytes& get_payload() const {
return interop::as_owned_cpp_ref<Bytes>(::z_reply_err_payload(interop::as_loaned_c_ptr(*this)));
}
#if defined(ZENOHCXX_ZENOHC)
/// @brief The payload of this error.
/// @return error payload.
Bytes& get_payload() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

today there is no way to get mutable ReplyErr or Sample from reply, so this method is useless as it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should add this. SHM buffer may reside literally everywhere where ZBytes is

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the problem. We can not add these methods, because this would allow user to apply std::move to mutable Sample or ReplyError which would lead to their destruction. Since on zenoh-c level calling destructors is only possible on owned objects and sample/error are stored as loaned inside reply this would lead to UB.

So it is ok to merge this pr as is, but adding mutable access to sample/reply_err from Reply would require first resolving eclipse-zenoh/zenoh-c#718.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made mutable access to sample/reply_err from Reply private for a while.

return interop::as_owned_cpp_ref<Bytes>(::z_reply_err_payload_mut(interop::as_loaned_c_ptr(*this)));
}
#endif

/// @brief The encoding of this error.
/// @return error encoding.
Expand Down
7 changes: 7 additions & 0 deletions include/zenoh/api/sample.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ class Sample : public Owned<::z_owned_sample_t> {
const Bytes& get_payload() const {
return interop::as_owned_cpp_ref<Bytes>(::z_sample_payload(interop::as_loaned_c_ptr(*this)));
}
#if defined(ZENOHCXX_ZENOHC)
/// @brief Get the data of this sample.
/// @return ``Bytes`` object representing the sample payload.
Bytes& get_payload() {
return interop::as_owned_cpp_ref<Bytes>(::z_sample_payload_mut(interop::as_loaned_c_ptr(*this)));
}
#endif

/// @brief Get the encoding of the data of this sample.
/// @return ``Encoding`` object.
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh/api/session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,14 @@ class Session : public Owned<::z_owned_session_t> {
SubscriberOptions&& options = SubscriberOptions::create_default(),
ZResult* err = nullptr) const {
static_assert(
std::is_invocable_r<void, C, const Sample&>::value,
std::is_invocable_r<void, C, Sample&>::value,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar fixes need to be done for every subscriber/queryable and get function :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"on_sample should be callable with the following signature: void on_sample(zenoh::Sample& sample)");
static_assert(std::is_invocable_r<void, D>::value,
"on_drop should be callable with the following signature: void on_drop()");
::z_owned_closure_sample_t c_closure;
using Cval = std::remove_reference_t<C>;
using Dval = std::remove_reference_t<D>;
using ClosureType = typename detail::closures::Closure<Cval, Dval, void, const Sample&>;
using ClosureType = typename detail::closures::Closure<Cval, Dval, void, Sample&>;
auto closure = ClosureType::into_context(std::forward<C>(on_sample), std::forward<D>(on_drop));
::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure);
::z_subscriber_options_t opts;
Expand Down
10 changes: 10 additions & 0 deletions include/zenoh/api/shm/buffer/zshm.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ class ZShm : public Owned<::z_owned_shm_t> {
}
return std::nullopt;
}

/// @brief Create a new ZShmMut& from ZShm&.
/// @param immut immutable buffer, NOTE: the value will not be moved if nullopt returned.
/// @return mutable buffer or empty option if buffer mutation is impossible.
static std::optional<std::reference_wrapper<ZShmMut>> try_mutate(ZShm& immut) {
if (z_loaned_shm_mut_t* shm_mut = ::z_shm_try_reloan_mut(z_loan_mut(immut._0))) {
return std::ref(interop::as_owned_cpp_ref<ZShmMut>(shm_mut));
}
return std::nullopt;
}
};

} // end of namespace zenoh
Loading