Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added doc and test for thread-safety of join/leaveMulticastGroup #11

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 159 additions & 1 deletion tests/udpcap_test/src/udpcap_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ TEST(udpcap, MulticastReceive)
receive_thread2.join();
}

// Create and destroy a bound many Udpcap sockets with a thread waiting for a datagram
TEST(udpcap, ManySockets)
{
constexpr int num_udpcap_socket = 100;
Expand Down Expand Up @@ -805,4 +806,161 @@ TEST(udpcap, ManySockets)

// Join the send thread
send_thread.join();
}
}

// Create many Udpcap multicast sockets and join / leave multicast groups while receiving datagrams
TEST(udpcap, ManyMulticastSockets)
{
constexpr int num_udpcap_socket = 10;
constexpr int num_test_loops = 5;
constexpr char* multicast_group_1 = "225.0.0.1";
constexpr char* multicast_group_2 = "225.0.0.2";
constexpr uint16_t port = 14000;

// Create asio sockets to send datagrams to the multicast groups
asio::io_service io_service;
asio::ip::udp::socket asio_socket1(io_service, asio::ip::udp::v4());
asio::ip::udp::socket asio_socket2(io_service, asio::ip::udp::v4());
asio::ip::udp::endpoint endpoint1(asio::ip::make_address(multicast_group_1), port);
asio::ip::udp::endpoint endpoint2(asio::ip::make_address(multicast_group_2), port);
asio_socket1.set_option(asio::ip::multicast::hops(1));
asio_socket2.set_option(asio::ip::multicast::hops(1));
asio_socket1.set_option(asio::ip::multicast::enable_loopback(true));
asio_socket2.set_option(asio::ip::multicast::enable_loopback(true));

asio_socket1.connect(endpoint1);
asio_socket2.connect(endpoint2);

// Thread that constantly pushes datagrams via the asio sockets
std::thread send_thread1([&asio_socket1]()
{
std::string buffer_string = "Hello World";
while (true)
{
asio::error_code ec;
asio_socket1.send(asio::buffer(buffer_string), 0, ec);
if (ec)
{
break;
}
}
});

std::thread send_thread2([&asio_socket2]()
{
std::string buffer_string = "Hello World";
while (true)
{
asio::error_code ec;
asio_socket2.send(asio::buffer(buffer_string), 0, ec);
if (ec)
{
break;
}
}
});

// Create num_udpcap_socket udpcap sockets
std::vector<Udpcap::UdpcapSocket> udpcap_sockets;
std::vector<std::thread> receive_threads;

// Reserve space for the sockets
udpcap_sockets.reserve(num_udpcap_socket);

for (int i = 0; i < num_udpcap_socket; i++)
{
udpcap_sockets.emplace_back();
ASSERT_TRUE(udpcap_sockets.back().isValid());
const bool success = udpcap_sockets.back().bind(Udpcap::HostAddress::Any(), port);
ASSERT_TRUE(success);
udpcap_sockets.back().setMulticastLoopbackEnabled(true);

// Create a receive thread that constantly receives datagrams
receive_threads.emplace_back([&udpcap_sockets, i, multicast_group_1, multicast_group_2]()
{
while (true)
{
// Initialize variables for the sender's address and port
Udpcap::HostAddress sender_address;
uint16_t sender_port(0);
Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR;

// Allocate buffer with max udp datagram size
std::vector<char> received_datagram;
received_datagram.resize(65536);

// blocking receive
const size_t received_bytes = udpcap_sockets[i].receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error);
received_datagram.resize(received_bytes);

if (error)
{
// Indicates that somebody closed the socket
ASSERT_EQ(error, Udpcap::Error(Udpcap::Error::ErrorCode::SOCKET_CLOSED));

// Check that the socket is closed
ASSERT_TRUE(udpcap_sockets[i].isClosed());

break;
}
else
{
// Check if the received datagram is valid and contains "Hello World"
ASSERT_FALSE(received_datagram.empty());
ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World");
}
}
});
}

for (int i = 0; i < num_test_loops; i++)
{
// Join the multicast group 1
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.joinMulticastGroup(Udpcap::HostAddress(multicast_group_1));
ASSERT_TRUE(success);
}

// Join the multicast group 2
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.joinMulticastGroup(Udpcap::HostAddress(multicast_group_2));
ASSERT_TRUE(success);
}

// Leave the multicast group 1
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.leaveMulticastGroup(Udpcap::HostAddress(multicast_group_1));
ASSERT_TRUE(success);
}

// Leave the multicast group 2
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.leaveMulticastGroup(Udpcap::HostAddress(multicast_group_2));
ASSERT_TRUE(success);
}
}

// Close the sockets
for (auto& udpcap_socket : udpcap_sockets)
{
udpcap_socket.close();
}

// Join the threads
for (auto& receive_thread : receive_threads)
{
receive_thread.join();
}

// Close the asio sockets
asio_socket1.close();
asio_socket2.close();

// Join the send threads
send_thread1.join();
send_thread2.join();
}
23 changes: 18 additions & 5 deletions udpcap/include/udpcap/udpcap_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace Udpcap
*
* Thread safety:
* - There must only be 1 thread calling receiveDatagram() at the same time
* - It is safe to call close() while another thread is calling receiveDatagram()
* - It is safe to call close(), join and leave multicast groups while another thread is calling receiveDatagram()
* - Other modifications to the socket must not be made while another thread is calling receiveDatagram()
*/
class UdpcapSocket
Expand Down Expand Up @@ -152,9 +152,13 @@ namespace Udpcap
*
* Thread safety:
* - This method must not be called from multiple threads at the same time
* - While one thread is calling this method, another thread may call close()
* - While one thread is calling this method, no modifications must be made to the socket (except close())
*
* - While one thread is calling this method, another thread may call one (and only one) of the following functions:
* - close()
* - joinMulticastGroup()
* - leaveMulticastGroup()
* - setMulticastLoopbackEnabled()
* - While one thread is calling this method, no other modifications must be made to the socket
*
* @param data [out]: The destination memory
* @param max_len [in]: The maximum bytes available at the destination
* @param timeout_ms [in]: Maximum time to wait for a datagram in ms. If -1, the method will block until a datagram is available
Expand Down Expand Up @@ -195,6 +199,9 @@ namespace Udpcap
* Joining a multicast group fails, when the Socket is invalid, not bound,
* the given address is not a multicast address or this Socket has already
* joined the group.
*
* Thread safety:
* - This function may be called while another thread is calling receiveDatagram()
*
* @param group_address: The multicast group to join
*
Expand All @@ -208,6 +215,9 @@ namespace Udpcap
* Leaving a multicast group fails, when the Socket is invalid, not bound,
* the given address is not a multicast address or this Socket has not
* joined the group, yet.
*
* Thread safety:
* - This function may be called while another thread is calling receiveDatagram()
*
* @param group_address: The multicast group to leave
*
Expand All @@ -219,6 +229,9 @@ namespace Udpcap
* @brief Sets whether local multicast traffic should be received
*
* If not set, the default value is true.
*
* Thread safety:
* - This function may be called while another thread is calling receiveDatagram()
*
* @param enables whether local multicast traffic should be received
*/
Expand All @@ -233,7 +246,7 @@ namespace Udpcap
* @brief Closes the socket
*
* Thread safety:
* - It is safe to call this method while another thread is calling receiveDatagram()
* - This function may be called while another thread is calling receiveDatagram()
*/
UDPCAP_EXPORT void close();

Expand Down