Skip to content

Commit

Permalink
Addresses Yadu's comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Franco Cipollone <[email protected]>
  • Loading branch information
francocipollone committed Dec 22, 2023
1 parent 81b4f70 commit 0a42aca
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
22 changes: 22 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
#include "rmw_data_types.hpp"

///==============================================================================

saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}

void sub_data_handler(
const z_sample_t * sample,
void * data)
Expand Down Expand Up @@ -71,6 +78,21 @@ void sub_data_handler(
z_drop(z_move(keystr));
}

saved_queryable_data::saved_queryable_data(z_owned_query_t query)
: query(query)
{}

saved_queryable_data::~saved_queryable_data()
{
z_query_drop(&query);
}


unsigned int rmw_service_data_t::get_new_uid()
{
return client_count++;
}

void service_data_handler(const z_query_t * query, void * service_data)
{
RCUTILS_LOG_INFO_NAMED(
Expand Down
19 changes: 5 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ void sub_data_handler(const z_sample_t * sample, void * sub_data);

struct saved_msg_data
{
explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16])
: payload(p), recv_timestamp(recv_ts)
{
memcpy(publisher_gid, pub_gid, 16);
}
explicit saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]);

zc_owned_payload_t payload;
uint64_t recv_timestamp;
Expand Down Expand Up @@ -144,22 +140,17 @@ void client_data_handler(z_owned_reply_t * reply, void * client_data);

struct saved_queryable_data
{
explicit saved_queryable_data(z_owned_query_t query)
: query(query)
{
}
explicit saved_queryable_data(z_owned_query_t query);
~saved_queryable_data();

const z_owned_query_t query;
z_owned_query_t query;
};

///==============================================================================

struct rmw_service_data_t
{
unsigned int get_new_uid()
{
return client_count++;
}
unsigned int get_new_uid();

const char * zn_queryable_key;
z_owned_queryable_t zn_queryable;
Expand Down
47 changes: 37 additions & 10 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1874,15 +1874,23 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
rcutils_allocator_t * allocator = &node->context->options.allocator;

auto client_data = static_cast<rmw_client_data_t *>(client->data);
RMW_CHECK_FOR_NULL_WITH_MSG(
client_data,
"client implementation pointer is null",
return RMW_RET_INVALID_ARGUMENT);

// CLEANUP ===================================================================
z_drop(z_move(client_data->zn_closure_reply));

allocator->deallocate(client_data->request_type_support, allocator->state);
allocator->deallocate(client_data->response_type_support, allocator->state);
allocator->deallocate(client->data, allocator->state);

allocator->deallocate(const_cast<char *>(client->service_name), allocator->state);
allocator->deallocate(client, allocator->state);

RCUTILS_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "[rmw_destroy_client] %s FINISHED", client->service_name);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -1957,13 +1965,11 @@ rmw_send_request(
// TODO(francocipollone): Do I really need the sequency number here?
*sequence_id = 0;


// Send request
z_get_options_t opts = z_get_options_default();
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL);


z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key(
client->service_name, client_data->context->actual_domain_id, allocator);
auto always_free_ros_keyexpr = rcpputils::make_scope_exit(
Expand Down Expand Up @@ -2264,7 +2270,6 @@ rmw_create_service(
allocator->deallocate(const_cast<char *>(rmw_service->service_name), allocator->state);
});


// Zenoh implementation for the service

// TODO(francocipollone): Replace ros_topic_name_to_zenoh_key by service related function.
Expand Down Expand Up @@ -2302,7 +2307,6 @@ rmw_create_service(
z_undeclare_queryable(z_move(service_data->zn_queryable));
});


// TODO(francocipollone): Update graph cache.

free_rmw_service.cancel();
Expand All @@ -2322,11 +2326,35 @@ rmw_create_service(
rmw_ret_t
rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
{
// Interim implementation to suppress type_description service that spins up
// with a node by default.
if (node == nullptr || service == nullptr) {
return RMW_RET_ERROR;
}
// ASSERTIONS ================================================================
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
service,
service->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

rcutils_allocator_t * allocator = &node->context->options.allocator;

auto service_data = static_cast<rmw_service_data_t *>(service->data);
RMW_CHECK_FOR_NULL_WITH_MSG(
service_data,
"service implementation pointer is null",
return RMW_RET_INVALID_ARGUMENT);

// CLEANUP ================================================================
z_drop(z_move(service_data->zn_queryable));

allocator->deallocate(service_data->request_type_support, allocator->state);
allocator->deallocate(service_data->response_type_support, allocator->state);
allocator->deallocate(service->data, allocator->state);

rmw_service_free(service);

// TODO(francocipollone): Update graph cache.
Expand Down Expand Up @@ -2360,7 +2388,6 @@ rmw_take_request(
RMW_CHECK_FOR_NULL_WITH_MSG(
service->data, "service implementation pointer is null", RMW_RET_INVALID_ARGUMENT);


auto * service_data = static_cast<rmw_service_data_t *>(service->data);

std::unique_lock<std::mutex> lock(service_data->query_queue_mutex);
Expand Down

0 comments on commit 0a42aca

Please sign in to comment.