Skip to content

Commit

Permalink
[core] zero-copy-zero-payload-shm-transfer (#1683)
Browse files Browse the repository at this point in the history
* zero length payload was not transferred over shm in zero copy mode
  • Loading branch information
rex-schilasky authored Jul 30, 2024
1 parent 9b4c0d3 commit 63d1cc2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
26 changes: 19 additions & 7 deletions ecal/core/src/io/shm/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,26 @@ namespace eCAL
// -------------------------------------------------------------------------
if (zero_copy_allowed)
{
// acquire memory file payload pointer (no copying here)
const void* buf(nullptr);
if (m_memfile.GetReadAddress(buf, mfile_hdr.data_size) > 0)
if (m_data_callback)
{
// calculate data buffer offset
const char* data_buf = static_cast<const char*>(buf) + mfile_hdr.hdr_size;
// add sample to data reader (and call user callback function)
if (m_data_callback) m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
const char* data_buf = nullptr;
if (mfile_hdr.data_size > 0)
{
// acquire memory file payload pointer (no copying here)
const void* buf(nullptr);
if (m_memfile.GetReadAddress(buf, mfile_hdr.data_size) > 0)
{
// calculate user payload address
data_buf = static_cast<const char*>(buf) + mfile_hdr.hdr_size;
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
else
{
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
}
// -------------------------------------------------------------------------
Expand Down
28 changes: 21 additions & 7 deletions ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,8 +64,13 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageSHM)
pub_config.layer.udp.enable = false;
pub_config.layer.tcp.enable = false;

// create publisher for topic "A"
eCAL::CPublisher pub("A", pub_config);
// create publisher for topic "A" (no zero copy)
eCAL::CPublisher pub1("A", pub_config);

// switch on zero copy
pub_config.layer.shm.zero_copy_mode = true;
eCAL::CPublisher pub2("A", pub_config);


// add callback
EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2)));
Expand All @@ -76,21 +81,30 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageSHM)
g_callback_received_bytes = 0;
g_callback_received_count = 0;

EXPECT_EQ(send_s.size(), pub.Send(send_s));
// send without zero copy
EXPECT_EQ(send_s.size(), pub1.Send(send_s));
eCAL::Process::SleepMS(DATA_FLOW_TIME_MS);

EXPECT_EQ(send_s.size(), pub1.Send(nullptr, 0));
eCAL::Process::SleepMS(DATA_FLOW_TIME_MS);

// send with zero copy
EXPECT_EQ(send_s.size(), pub2.Send(send_s));
eCAL::Process::SleepMS(DATA_FLOW_TIME_MS);

EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0));
EXPECT_EQ(send_s.size(), pub2.Send(nullptr, 0));
eCAL::Process::SleepMS(DATA_FLOW_TIME_MS);

// check callback receive
EXPECT_EQ(send_s.size(), g_callback_received_bytes);
EXPECT_EQ(2, g_callback_received_count);
EXPECT_EQ(4, g_callback_received_count);

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();
pub1.Destroy();
pub2.Destroy();

// finalize eCAL API
eCAL::Finalize();
Expand Down

0 comments on commit 63d1cc2

Please sign in to comment.