Skip to content

Commit

Permalink
test: Enhance reconnection test case to trigger failure
Browse files Browse the repository at this point in the history
  • Loading branch information
DownerCase committed Jan 17, 2025
1 parent a77ae1f commit c6db50f
Showing 1 changed file with 65 additions and 29 deletions.
94 changes: 65 additions & 29 deletions ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,65 +440,101 @@ TEST(core_cpp_pubsub /*unused*/, DISABLED_DestroyInCallback /*unused*/)
TEST(core_cpp_pubsub, SubscriberReconnection)
{
/* Test setup :
* publisher runs permanently in a thread
* subscriber start reading
* subscriber gets out of scope (destruction)
* subscriber starts again in a new scope
* publishers runs permanently in a thread
* subscriber A start reading topic A
* subscriber A gets out of scope (destruction)
* subscriber B start reading topic B
* subscriber B gets out of scope (destruction)
* subscriber A starts again in a new scope
* Test ensures that subscriber is reconnecting and all sync mechanism are working properly again.
* Previously this test was not catching a case where a delay between the destruction of the
* topic A subscriber and its recreation would create a subscriber that was not receiving messages.
*/

// initialize eCAL API
eCAL::Initialize("SubscriberReconnection");

constexpr auto RECEIVE_TIMEOUT = std::chrono::seconds(5);

// start publishing thread
std::atomic<bool> stop_publishing(false);
eCAL::CPublisher pub_foo("foo");
std::thread pub_foo_t([&pub_foo, &stop_publishing]() {
while (!stop_publishing)
{
pub_foo.Send("Hello World");
const auto publish_messages = [&stop_publishing](eCAL::CPublisher &pub) {
while (!stop_publishing) {
pub.Send("Hello World");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "Stopped publishing" << std::endl;
});
};

// scope 1
eCAL::CPublisher pub_foo("foo");
std::thread pub_foo_t(publish_messages, std::ref(pub_foo));

eCAL::CPublisher pub_bar("bar");
std::thread pub_bar_t(publish_messages, std::ref(pub_bar));

std::condition_variable cv;
std::mutex cv_m;
bool data_received(false);

const auto receive_lambda =
[&cv_m, &cv,
&data_received](const eCAL::Registration::STopicId & /*topic_id_*/,
const eCAL::SDataTypeInformation & /*data_type_info_*/,
const eCAL::SReceiveCallbackData & /*data_*/) {
{
std::cout << "Callback received message" << std::endl;
std::lock_guard<std::mutex> lk(cv_m);
data_received = true;
}
cv.notify_all();
};
std::unique_lock<std::mutex> cv_lk(cv_m);

// 1 - Subscribe to topic A and receive a message
{
size_t callback_received_count(0);

eCAL::CSubscriber sub_foo("foo");
auto receive_lambda = [&callback_received_count](const eCAL::Registration::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& /*data_*/) {
std::cout << "Receiving in scope 1" << std::endl;
callback_received_count++;
};
sub_foo.SetReceiveCallback(receive_lambda);

// sleep for 3 seconds, we should receive something
std::this_thread::sleep_for(std::chrono::seconds(3));
// We should receive something within the timeout period
cv.wait_for(cv_lk,RECEIVE_TIMEOUT, [&data_received]() { return data_received; });

EXPECT_TRUE(callback_received_count > 0);
EXPECT_TRUE(data_received);
std::cout << "Closing first subscriber scope (foo)" << std::endl;
}

// scope 2
data_received = false; // Reset for next scope

// 2 - Subscribe to topic B and receive a message
{
size_t callback_received_count(0);
eCAL::CSubscriber sub_foo("bar");
sub_foo.SetReceiveCallback(receive_lambda);

// We should receive something within the timeout period
cv.wait_for(cv_lk,RECEIVE_TIMEOUT, [&data_received]() { return data_received; });

EXPECT_TRUE(data_received);
std::cout << "Closing second subscriber scope (bar)" << std::endl;
}

data_received = false; // Reset for next scope

// 3 - Subscribe to topic A again and receive a message
// TODO: Figure out why this now fails
{
eCAL::CSubscriber sub_foo("foo");
auto receive_lambda = [&callback_received_count](const eCAL::Registration::STopicId& /*topic_id_*/, const eCAL::SDataTypeInformation& /*data_type_info_*/, const eCAL::SReceiveCallbackData& /*data_*/) {
std::cout << "Receiving in scope 2" << std::endl;
callback_received_count++;
};
sub_foo.SetReceiveCallback(receive_lambda);

// sleep for 3 seconds, we should receive something
std::this_thread::sleep_for(std::chrono::seconds(3));
// We should receive something within the timeout period
cv.wait_for(cv_lk,RECEIVE_TIMEOUT, [&data_received]() { return data_received; });

EXPECT_TRUE(callback_received_count > 0);
EXPECT_TRUE(data_received);
std::cout << "Closing third subscriber scope (foo again)" << std::endl;
}

// stop publishing and join thread
stop_publishing = true;
pub_foo_t.join();
pub_bar_t.join();

// finalize eCAL API
// without destroying any pub / sub
Expand Down

0 comments on commit c6db50f

Please sign in to comment.