Skip to content

Commit

Permalink
Simplify test with new understanding
Browse files Browse the repository at this point in the history
  • Loading branch information
DownerCase committed Jan 18, 2025
1 parent 703401f commit a1a2b1b
Showing 1 changed file with 25 additions and 33 deletions.
58 changes: 25 additions & 33 deletions ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,30 @@ TEST(core_cpp_pubsub, MultipleSendsSHM)
eCAL::Finalize();
}

TEST(core_cpp_pubsub, SubscriberReconnectionSHM) {
TEST(core_cpp_pubsub, SubscriberFastReconnectionSHM) {
/* Test setup :
* publishers runs permanently in a thread
* subscriber A start reading topic A
* publisher runs permanently in a thread
* subscriber A start reading topic
* subscriber A gets out of scope (destruction)
* subscriber B start reading topic B
* Small delay less than the registration refresh
* subscriber B start reading topic again
* subscriber B gets out of scope (destruction)
* subscriber A starts again in a new scope
* Test ensures that subscriber is reconnecting and all sync mechanism are
* working properly again. Previously the test suite was not catching a case
* where a delay between the destruction of the topic A subscriber and its
* recreation would create a subscriber that was not receiving messages.
*/

constexpr auto REGISTRATION_REFRESH_MS = 100;
constexpr auto REGISTRATION_TIMEOUT_MS = 10 * REGISTRATION_REFRESH_MS;
// The delay between the destruction of the subscriber and its recreation
// must be between the registration refresh and the registration timeout
constexpr auto RESUBSCRIBE_DELAY =
std::chrono::milliseconds(2 * REGISTRATION_REFRESH_MS);
// Ensure we wait at least one timeout period
constexpr auto RECEIVE_TIMEOUT =
std::chrono::milliseconds(2 * REGISTRATION_TIMEOUT_MS);

// Prepare config
auto config = eCAL::Init::Configuration();
config.publisher.layer.shm.enable = true;
Expand All @@ -173,13 +183,13 @@ TEST(core_cpp_pubsub, SubscriberReconnectionSHM) {
config.subscriber.layer.shm.enable = true;
config.subscriber.layer.tcp.enable = false;
config.subscriber.layer.udp.enable = false;
config.registration.registration_refresh = REGISTRATION_REFRESH_MS;
config.registration.registration_timeout = REGISTRATION_TIMEOUT_MS;

// initialize eCAL API
eCAL::Initialize(config, "SubscriberReconnectionSHM");

constexpr auto RECEIVE_TIMEOUT = std::chrono::seconds(5);
constexpr auto TOPIC_A = "shm_reconnect_test_A";
constexpr auto TOPIC_B = "shm_reconnect_test_B";
constexpr auto TOPIC = "shm_reconnect_test";

// start publishing thread
std::atomic<bool> stop_publishing(false);
Expand All @@ -191,12 +201,9 @@ TEST(core_cpp_pubsub, SubscriberReconnectionSHM) {
std::cout << "Stopped publishing" << std::endl;
};

eCAL::CPublisher pub_foo(TOPIC_A);
eCAL::CPublisher pub_foo(TOPIC);
std::thread pub_foo_t(publish_messages, std::ref(pub_foo));

eCAL::CPublisher pub_bar(TOPIC_B);
std::thread pub_bar_t(publish_messages, std::ref(pub_bar));

std::condition_variable cv;
std::mutex cv_m;
bool data_received(false);
Expand All @@ -215,9 +222,9 @@ TEST(core_cpp_pubsub, SubscriberReconnectionSHM) {
};
std::unique_lock<std::mutex> cv_lk(cv_m);

// 1 - Subscribe to topic A and receive a message
// 1 - Subscribe to topic and receive a message
{
eCAL::CSubscriber sub_foo(TOPIC_A);
eCAL::CSubscriber sub_foo(TOPIC);
sub_foo.SetReceiveCallback(receive_lambda);

// We should receive something within the timeout period
Expand All @@ -227,28 +234,14 @@ TEST(core_cpp_pubsub, SubscriberReconnectionSHM) {
EXPECT_TRUE(data_received);
std::cout << "Closing first subscriber scope (A)" << std::endl;
}

data_received = false; // Reset for next scope

// 2 - Subscribe to topic B and receive a message
{
eCAL::CSubscriber sub_foo(TOPIC_B);
sub_foo.SetReceiveCallback(receive_lambda);

// We should receive something within the timeout period
cv.wait_for(cv_lk, RECEIVE_TIMEOUT,
[&data_received]() { return data_received; });

EXPECT_TRUE(data_received);
std::cout << "Closing second subscriber scope (B)" << std::endl;
}

data_received = false; // Reset for next scope
// 2 - Small delay to unlink the SHM file but not timeout the observer
std::this_thread::sleep_for(RESUBSCRIBE_DELAY);

// 3 - Subscribe to topic A again and receive a message
// TODO: Figure out why this now fails
// 3 - Subscribe to topic again and try to receive a message
{
eCAL::CSubscriber sub_foo(TOPIC_A);
eCAL::CSubscriber sub_foo(TOPIC);
sub_foo.SetReceiveCallback(receive_lambda);

// We should receive something within the timeout period
Expand All @@ -262,7 +255,6 @@ TEST(core_cpp_pubsub, SubscriberReconnectionSHM) {
// stop publishing and join thread
stop_publishing = true;
pub_foo_t.join();
pub_bar_t.join();

// finalize eCAL API
// without destroying any pub / sub
Expand Down

0 comments on commit a1a2b1b

Please sign in to comment.