Skip to content

Commit

Permalink
Port zenoh_c to zenoh_cpp (#327)
Browse files Browse the repository at this point in the history
* zenoh_c to zenoh_cpp

removed zenoh_router_check

removed rmw_init_options_impl.hpp

Publisher

Added subscribers

Removed zid_to_str

use zenoh::KeyExpr

service data

client data

make linters happy

wrap the zenoh session with a shared pointer

* Removed shm and small fix

Fix the high latency on the subscriber due to zenoh bytes conversion

* Fixed windows

* Switch to storing GIDs with a std::array.

They are a fixed size, so should use the fixed-size data
structure.

* Remove unnecessary ClientData init() method.

Since we aren't calling shared_from_this() in it anymore,
we can do all of the work in the constructor.

* Rewrite connecting to the router to only print when it fails.

This simplifies the code, and makes it less noisy.

Signed-off-by: Alejandro Hernández Cordero <[email protected]>
Signed-off-by: Yadunund <[email protected]>
Signed-off-by: Chris Lalancette <[email protected]>
Co-authored-by: Yuyuan Yuan <[email protected]>
  • Loading branch information
ahcorde and YuanYuYuan authored Dec 17, 2024
1 parent dd82e84 commit 271da8d
Show file tree
Hide file tree
Showing 36 changed files with 881 additions and 1,116 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ jobs:
with:
package-name: |
rmw_zenoh_cpp
zenoh_c_vendor
zenoh_cpp_vendor
target-ros2-distro: ${{ matrix.ROS_DISTRO }}
vcs-repo-file-url: ${{ matrix.BUILD_TYPE == 'source' && env.ROS2_REPOS_FILE_URL || '' }}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Build `rmw_zenoh_cpp`

>Note: By default, we vendor and compile `zenoh-c` with a subset of `zenoh` features.
The `ZENOHC_CARGO_FLAGS` CMake argument may be overwritten with other features included if required.
See [zenoh_c_vendor/CMakeLists.txt](./zenoh_c_vendor/CMakeLists.txt) for more details.
See [zenoh_cpp_vendor/CMakeLists.txt](./zenoh_cpp_vendor/CMakeLists.txt) for more details.

```bash
mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src
Expand Down
7 changes: 3 additions & 4 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(zenoh_c_vendor REQUIRED)
find_package(zenoh_cpp_vendor REQUIRED)

add_library(rmw_zenoh_cpp SHARED
src/detail/attachment_helpers.cpp
Expand All @@ -45,7 +45,6 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
Expand All @@ -68,7 +67,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
zenohc::lib
zenohcxx::zenohc
)

configure_rmw_library(rmw_zenoh_cpp)
Expand Down Expand Up @@ -130,7 +129,7 @@ target_link_libraries(rmw_zenohd
rcutils::rcutils
rcpputils::rcpputils
rmw::rmw
zenohc::lib
zenohcxx::zenohc
)

install(
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

<buildtool_depend>ament_cmake</buildtool_depend>

<build_depend>zenoh_c_vendor</build_depend>
<build_export_depend>zenoh_c_vendor</build_export_depend>
<build_depend>zenoh_cpp_vendor</build_depend>
<build_export_depend>zenoh_cpp_vendor</build_export_depend>

<depend>ament_index_cpp</depend>
<depend>fastcdr</depend>
Expand Down
123 changes: 47 additions & 76 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,90 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <zenoh.h>

#include <array>
#include <cstdlib>
#include <cstring>
#include <stdexcept>
#include <string_view>
#include <string>
#include <utility>

#include <zenoh.hxx>

#include "rmw/types.h"

#include "attachment_helpers.hpp"
#include "liveliness_utils.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================

AttachmentData::AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE])
const std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid)
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp)
source_timestamp_(source_timestamp),
source_gid_(source_gid),
gid_hash_(hash_gid(source_gid))
{
memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE);
gid_hash_ = hash_gid(source_gid_);
}

///=============================================================================
AttachmentData::AttachmentData(AttachmentData && data)
: sequence_number_(std::move(data.sequence_number_)),
source_timestamp_(std::move(data.source_timestamp_)),
gid_hash_(std::move(data.gid_hash_))
{
memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE);
}

///=============================================================================
AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment)
{
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;

// Deserialize the sequence_number
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "sequence_number")
{
throw std::runtime_error("sequence_number is not found in the attachment.");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) {
throw std::runtime_error("Failed to deserialize the sequence_number.");
}

// Deserialize the source_timestamp
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(
z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "source_timestamp")
{
throw std::runtime_error("source_timestamp is not found in the attachment");
}
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) {
throw std::runtime_error("Failed to deserialize the source_timestamp.");
}

// Deserialize the source_gid
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") {
throw std::runtime_error("Invalid attachment: the key source_gid is not found");
}
z_drop(z_move(key));
z_owned_slice_t slice;
if (ze_deserializer_deserialize_slice(&deserializer, &slice)) {
throw std::runtime_error("Failed to deserialize the source_gid.");
}
if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
throw std::runtime_error("The length of source_gid mismatched.");
}
memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
z_drop(z_move(slice));
gid_hash_ = hash_gid(this->source_gid_);
gid_hash_ = std::move(data.gid_hash_);
sequence_number_ = std::move(data.sequence_number_);
source_timestamp_ = std::move(data.source_timestamp_);
source_gid_ = data.source_gid_;
}

///=============================================================================
Expand All @@ -111,9 +61,9 @@ int64_t AttachmentData::source_timestamp() const
}

///=============================================================================
void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const
std::array<uint8_t, RMW_GID_STORAGE_SIZE> AttachmentData::copy_gid() const
{
memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE);
return source_gid_;
}

///=============================================================================
Expand All @@ -122,17 +72,38 @@ size_t AttachmentData::gid_hash() const
return gid_hash_;
}

///=============================================================================
void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment)
zenoh::Bytes AttachmentData::serialize_to_zbytes()
{
auto serializer = zenoh::ext::Serializer();
serializer.serialize(std::string("sequence_number"));
serializer.serialize(this->sequence_number_);
serializer.serialize(std::string("source_timestamp"));
serializer.serialize(this->source_timestamp_);
serializer.serialize(std::string("source_gid"));
serializer.serialize(this->source_gid_);
return std::move(serializer).finish();
}

AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
{
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
zenoh::ext::Deserializer deserializer(bytes);
const auto sequence_number_str = deserializer.deserialize<std::string>();
if (sequence_number_str != "sequence_number") {
throw std::runtime_error("sequence_number is not found in the attachment.");
}
this->sequence_number_ = deserializer.deserialize<int64_t>();

const auto source_timestamp_str = deserializer.deserialize<std::string>();
if (source_timestamp_str != "source_timestamp") {
throw std::runtime_error("source_timestamp is not found in the attachment.");
}
this->source_timestamp_ = deserializer.deserialize<int64_t>();

const auto source_gid_str = deserializer.deserialize<std::string>();
if (source_gid_str != "source_gid") {
throw std::runtime_error("source_gid is not found in the attachment.");
}
this->source_gid_ = deserializer.deserialize<std::array<uint8_t, RMW_GID_STORAGE_SIZE>>();
gid_hash_ = hash_gid(this->source_gid_);
}
} // namespace rmw_zenoh_cpp
16 changes: 10 additions & 6 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_

#include <zenoh.h>
#include <array>
#include <cstdint>

#include <zenoh.hxx>

#include "rmw/types.h"

Expand All @@ -28,21 +31,22 @@ class AttachmentData final
AttachmentData(
const int64_t sequence_number,
const int64_t source_timestamp,
const uint8_t source_gid[RMW_GID_STORAGE_SIZE]);
explicit AttachmentData(const z_loaned_bytes_t *);
const std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid);

explicit AttachmentData(const zenoh::Bytes & bytes);
explicit AttachmentData(AttachmentData && data);

int64_t sequence_number() const;
int64_t source_timestamp() const;
void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> copy_gid() const;
size_t gid_hash() const;

void serialize_to_zbytes(z_owned_bytes_t *);
zenoh::Bytes serialize_to_zbytes();

private:
int64_t sequence_number_;
int64_t source_timestamp_;
uint8_t source_gid_[RMW_GID_STORAGE_SIZE];
std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid_;
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp
Expand Down
9 changes: 4 additions & 5 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ TopicData::TopicData(ConstEntityPtr entity)
}

///=============================================================================
GraphCache::GraphCache(const z_id_t & zid)
: zid_str_(liveliness::zid_to_str(zid))
GraphCache::GraphCache(const zenoh::Id & zid)
: zid_str_(zid.to_string())
{
// Do nothing.
}
Expand Down Expand Up @@ -1168,8 +1168,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
}
}

memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE);
entity->copy_gid(ep.endpoint_gid);
memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE);

endpoints.push_back(ep);
}
Expand Down Expand Up @@ -1202,7 +1201,7 @@ rmw_ret_t GraphCache::service_server_is_available(
service_it->second.find(client_topic_info.type_);
if (type_it != service_it->second.end()) {
for (const auto & [_, topic_data] : type_it->second) {
if (topic_data->subs_.size() > 0) {
if (!topic_data->subs_.empty()) {
*is_available = true;
return RMW_RET_OK;
}
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/names_and_types.h"

#include <zenoh/api/id.hxx>

namespace rmw_zenoh_cpp
{
Expand Down Expand Up @@ -110,7 +111,7 @@ class GraphCache final
/// @param id The id of the zenoh session that is building the graph cache.
/// This is used to infer which entities originated from the current session
/// so that appropriate event callbacks may be triggered.
explicit GraphCache(const z_id_t & zid);
explicit GraphCache(const zenoh::Id & zid);

// Parse a PUT message over a token's key-expression and update the graph.
void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false);
Expand Down
Loading

0 comments on commit 271da8d

Please sign in to comment.