Skip to content

Commit

Permalink
[test: shm] Add testcase for fast reconnection (#1916)
Browse files Browse the repository at this point in the history
  • Loading branch information
DownerCase authored Jan 27, 2025
1 parent 7a34ebd commit f36ac33
Showing 1 changed file with 112 additions and 0 deletions.
112 changes: 112 additions & 0 deletions ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <atomic>
#include <functional>
#include <string>
#include <thread>

#include <gtest/gtest.h>
#include <vector>
Expand Down Expand Up @@ -149,3 +150,114 @@ TEST(core_cpp_pubsub, MultipleSendsSHM)
// finalize eCAL API
eCAL::Finalize();
}

TEST(core_cpp_pubsub, SubscriberFastReconnectionSHM) {
/* Test setup :
* publisher runs permanently in a thread
* subscriber A start reading topic
* subscriber A gets out of scope (destruction)
* Small delay less than the registration refresh
* subscriber B start reading topic again
* subscriber B gets out of scope (destruction)
* Test ensures that subscriber is reconnecting and all sync mechanism are
* working properly again. Previously the test suite was not catching a case
* where a delay between the destruction of the topic A subscriber and its
* recreation would create a subscriber that was not receiving messages.
*/

constexpr auto REGISTRATION_REFRESH_MS = 100;
constexpr auto REGISTRATION_TIMEOUT_MS = 10 * REGISTRATION_REFRESH_MS;

constexpr auto MESSAGE_SEND_DELAY =
std::chrono::milliseconds(REGISTRATION_REFRESH_MS);

// The delay between the destruction of the subscriber and its recreation
// must be between the registration refresh and the registration timeout
constexpr auto RESUBSCRIBE_DELAY =
std::chrono::milliseconds(3 * REGISTRATION_REFRESH_MS);

// Ensure we wait at least one timeout period
constexpr auto RECEIVE_TIMEOUT =
std::chrono::milliseconds(2 * REGISTRATION_TIMEOUT_MS);

constexpr auto TOPIC = "shm_reconnect_test";

// Prepare config
auto config = eCAL::Init::Configuration();
config.publisher.layer.shm.enable = true;
config.publisher.layer.tcp.enable = false;
config.publisher.layer.udp.enable = false;
config.subscriber.layer.shm.enable = true;
config.subscriber.layer.tcp.enable = false;
config.subscriber.layer.udp.enable = false;
config.registration.registration_refresh = REGISTRATION_REFRESH_MS;
config.registration.registration_timeout = REGISTRATION_TIMEOUT_MS;

eCAL::Initialize(config, "SubscriberReconnectionSHM");

std::atomic<bool> stop_publishing(false);
const auto send_messages = [&](eCAL::CPublisher &pub) {
while (!stop_publishing) {
pub.Send("Hello World");
std::this_thread::sleep_for(MESSAGE_SEND_DELAY);
}
std::cerr << "Stopped publishing\n";
};

eCAL::CPublisher publisher(TOPIC);
std::thread message_publishing_thread(send_messages, std::ref(publisher));

std::condition_variable cv;
std::mutex cv_m;
bool data_received(false);
std::unique_lock<std::mutex> cv_lk(cv_m);

const auto create_subscriber = [&]() -> eCAL::CSubscriber {
eCAL::CSubscriber subscriber(TOPIC);

const auto receive_lambda =
[&](const eCAL::STopicId & /*topic_id_*/,
const eCAL::SDataTypeInformation & /*data_type_info_*/,
const eCAL::SReceiveCallbackData & /*data_*/) {
{
std::lock_guard<std::mutex> lk(cv_m);
data_received = true;
}
// Mutex should be unlocked before notifying the condition variable
cv.notify_all();
};
subscriber.SetReceiveCallback(receive_lambda);
return subscriber;
};

const auto receive_one_message = [&]() {
// We should receive something within the timeout period
cv.wait_for(cv_lk, RECEIVE_TIMEOUT,
[&data_received]() { return data_received; });

EXPECT_TRUE(data_received) << "No messages received within the timeout";
};

// 1 - Subscribe to topic and receive a message
{
const auto subscriber = create_subscriber();
receive_one_message();
std::cerr << "Closing first subscriber scope\n";
}

// 2 - Small delay to unlink the SHM file but not timeout the observer
std::this_thread::sleep_for(RESUBSCRIBE_DELAY);
data_received = false; // Reset for next scope

// 3 - Subscribe to topic again and try to receive a message
{
const auto subscriber = create_subscriber();
receive_one_message();
std::cerr << "Closing second subscriber scope\n";
}

stop_publishing = true;
message_publishing_thread.join();

eCAL::Finalize();
}

0 comments on commit f36ac33

Please sign in to comment.