Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shm #41

Open
wants to merge 27 commits into
base: dev/1.0.0
Choose a base branch
from
Open

Shm #41

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1d7280b
Proper SHM subsystem support
yellowhatter Nov 12, 2024
016fdde
add .gitignore
yellowhatter Nov 12, 2024
972bd8e
Merge commit '28d917e0532fed6e9703f044662298f2b3553716'
yellowhatter Nov 12, 2024
0b3b7dc
ament_uncrustify
yellowhatter Nov 12, 2024
48fcef8
ament_cpplint
yellowhatter Nov 12, 2024
f314501
uncrustify
yellowhatter Nov 12, 2024
388d8c6
code format fixes
yellowhatter Nov 12, 2024
5643258
build with SHM by default to align with other rmw impls
yellowhatter Nov 13, 2024
498c211
refactor: disable shared-memory in the config since it hasn't been te…
YuanYuYuan Nov 13, 2024
4a832b1
fix: inlcude shm_context.cpp in the build
yellowhatter Nov 14, 2024
027dce2
fix: use TRUE value to configure the feature flag
yellowhatter Nov 14, 2024
bdf9b21
fix: fix the SHM enabled check
yellowhatter Nov 14, 2024
2de616c
fix ShmContext move behavior
yellowhatter Nov 14, 2024
826b17b
brush-up shm
yellowhatter Nov 15, 2024
74d8761
Update CMakeLists.txt
yellowhatter Nov 15, 2024
3d21e89
Revert "Update CMakeLists.txt"
yellowhatter Nov 18, 2024
a9ef107
Update CMakeLists.txt
yellowhatter Nov 18, 2024
c2c8292
Update CMakeLists.txt
yellowhatter Nov 18, 2024
daf5278
Update CMakeLists.txt
yellowhatter Nov 18, 2024
221b522
Enable SHM in the default config
yellowhatter Nov 18, 2024
3054ba6
- add SHM support to serialized messages
yellowhatter Nov 18, 2024
50e4b69
Merge commit '7450bb26cfbf8e811c9404d5e535dfd5f51402ae'
yellowhatter Nov 20, 2024
74ed769
fix format
yellowhatter Nov 20, 2024
f0cf6a3
Update shm_context.cpp
yellowhatter Nov 20, 2024
c30ad77
Update rmw_publisher_data.cpp
yellowhatter Nov 20, 2024
e2ce9e3
Merge commit 'b485a93ed637162c674a447ac36d70735e0f0870'
yellowhatter Nov 27, 2024
d395c37
Merge commit '435186ad7a867416510f124f9b62224b72e50524'
yellowhatter Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
10 changes: 10 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

set(RMW_ZENOH_BUILD_WITH_SHARED_MEMORY TRUE CACHE BOOL "Compile Zenoh RMW with Shared Memory support")

# find dependencies
find_package(ament_cmake REQUIRED)

Expand Down Expand Up @@ -47,6 +49,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/zenoh_config.cpp
src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/detail/shm_context.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
src/rmw_get_node_info_and_types.cpp
Expand Down Expand Up @@ -80,6 +83,13 @@ target_compile_definitions(rmw_zenoh_cpp
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
)

if(${RMW_ZENOH_BUILD_WITH_SHARED_MEMORY})
target_compile_definitions(rmw_zenoh_cpp
PUBLIC
RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
)
endif()

ament_export_targets(export_rmw_zenoh_cpp)

register_rmw_implementation(
Expand Down
4 changes: 1 addition & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,7 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
4 changes: 1 addition & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,7 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
65 changes: 30 additions & 35 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,31 +164,20 @@ class rmw_context_impl_s::Data final
}
z_drop(z_move(handler));

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
if (strncmp(
z_string_data(z_loan(shm_enabled)),
"true",
z_string_len(z_loan(shm_enabled))) == 0)
{
// TODO(yuyuan): determine the default alignment of SHM
z_alloc_alignment_t alignment = {5};
z_owned_memory_layout_t layout;
z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment);

z_owned_shm_provider_t provider;
if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create an SHM provider.");
throw std::runtime_error("Unable to create an SHM provider.");
}
shm_provider_ = provider;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm subsystem if shared_memory is enabled in the config
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

shm_ = std::make_optional(
rmw_zenoh_cpp::ShmContext(
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold()
));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
auto free_shm_provider = rcpputils::make_scope_exit(
[this]() {
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
});
#endif

graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
Expand Down Expand Up @@ -216,7 +205,6 @@ class rmw_context_impl_s::Data final
}

close_session.cancel();
free_shm_provider.cancel();
undeclare_z_sub.cancel();
}

Expand All @@ -231,9 +219,10 @@ class rmw_context_impl_s::Data final
}

z_undeclare_subscriber(z_move(graph_subscriber_));
if (shm_provider_.has_value()) {
z_drop(z_move(shm_provider_.value()));
}
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// drop SHM subsystem if used
shm_ = std::nullopt;
#endif
is_shutdown_ = true;

// We specifically do *not* hold the mutex_ while tearing down the session; this allows us
Expand All @@ -260,11 +249,13 @@ class rmw_context_impl_s::Data final
return z_loan(session_);
}

std::optional<z_owned_shm_provider_t> & shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & shm()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return shm_provider_;
return shm_;
}
#endif

rmw_guard_condition_t * graph_guard_condition()
{
Expand Down Expand Up @@ -396,9 +387,6 @@ class rmw_context_impl_s::Data final
std::string enclave_;
// An owned session.
z_owned_session_t session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
Expand All @@ -414,6 +402,11 @@ class rmw_context_impl_s::Data final
std::size_t next_entity_id_;
// Nodes created from this context.
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// An optional SHM context that is initialized if SHM is enabled in the
// zenoh session config.
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
#endif
};

///=============================================================================
Expand Down Expand Up @@ -477,10 +470,12 @@ const z_loaned_session_t * rmw_context_impl_s::session() const
}

///=============================================================================
std::optional<z_owned_shm_provider_t> & rmw_context_impl_s::shm_provider()
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
return data_->shm_provider();
return data_->shm();
}
#endif

///=============================================================================
rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ class rmw_context_impl_s final
// Loan the Zenoh session.
const z_loaned_session_t * session() const;

// Get a reference to the shm_provider.
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<z_owned_shm_provider_t> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();
#endif

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down
93 changes: 74 additions & 19 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ PublisherData::PublisherData(

///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider)
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -236,38 +239,52 @@ rmw_ret_t PublisherData::publish(
}

// Serialize data.
size_t max_data_length = type_support_->get_estimated_serialized_size(
const size_t max_data_length = type_support_->get_estimated_serialized_size(
ros_message,
type_support_impl_);

// To store serialized message byte array.
char * msg_bytes = nullptr;

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit(
[&shmbuf]() {
if (shmbuf.has_value()) {
z_drop(z_move(shmbuf.value()));
}
});
#endif

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
[&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
)
{
allocator->deallocate(msg_bytes, allocator->state);
}
});

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm_provider.has_value()) {
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto provider = shm_provider.value();
auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
z_alloc_alignment_t alignment = {0};
z_buf_layout_alloc_result_t alloc;
// TODO(yuyuan): SHM, configure this
z_alloc_alignment_t alignment = {5};
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment);
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), max_data_length, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
shmbuf = std::make_optional(alloc.buf);
Expand All @@ -278,11 +295,14 @@ rmw_ret_t PublisherData::publish(
return RMW_RET_ERROR;
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
#endif
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand Down Expand Up @@ -312,11 +332,16 @@ rmw_ret_t PublisherData::publish(
options.attachment = z_move(attachment);

z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
always_free_shmbuf.cancel();
if (shmbuf.has_value()) {
z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
} else {
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
}
#endif
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
Expand All @@ -335,8 +360,11 @@ rmw_ret_t PublisherData::publish(

///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & /*shm_provider*/)
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand All @@ -363,8 +391,35 @@ rmw_ret_t PublisherData::publish_serialized_message(

options.attachment = z_move(attachment);


z_owned_bytes_t payload;
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (shm.has_value() && data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
z_alloc_alignment_t alignment = {0};
z_buf_layout_alloc_result_t alloc;
z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), data_length, alignment);

if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
auto msg_bytes = reinterpret_cast<char *>(z_shm_mut_data_mut(z_loan_mut(alloc.buf)));
memcpy(msg_bytes, serialized_message->buffer, data_length);
z_bytes_from_shm_mut(&payload, z_move(alloc.buf));
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
#endif
z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length);
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
}
#endif

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
Expand Down
15 changes: 11 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"

#include "rcutils/allocator.h"
Expand All @@ -53,13 +54,19 @@ class PublisherData final

// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
const void * ros_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<z_owned_shm_provider_t> & shm_provider);
const rmw_serialized_message_t * serialized_message
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, std::optional<ShmContext> & shm
#endif
);

// Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
Loading