diff --git a/.github/workflows/rolling-binary-build-win.yml b/.github/workflows/rolling-binary-build-win.yml index 836da241..5d0f8b4e 100644 --- a/.github/workflows/rolling-binary-build-win.yml +++ b/.github/workflows/rolling-binary-build-win.yml @@ -32,3 +32,4 @@ jobs: ros_distro: jazzy ref_for_scheduled_build: master os_name: windows-2019 + install_boost: true diff --git a/CMakeLists.txt b/CMakeLists.txt index 8540aa63..9ccbe4c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,14 @@ foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS}) find_package(${Dependency} REQUIRED) endforeach() +if(POLICY CMP0167) + cmake_policy(SET CMP0167 NEW) +endif() +find_package(Boost COMPONENTS headers) +if(NOT Boost_headers_FOUND) + find_package(Boost REQUIRED) +endif() + add_library(realtime_tools SHARED src/realtime_clock.cpp src/realtime_helpers.cpp @@ -74,6 +82,15 @@ if(BUILD_TESTING) ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp) target_link_libraries(realtime_buffer_tests realtime_tools) + ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) + if(WIN32) + # atomic is not found on Windows, but also not needed + target_link_libraries(lock_free_queue_tests realtime_tools Boost::boost) + else() + # without adding atomic, clang throws a linker error + target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) + endif() + ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp new file mode 100644 index 00000000..4131577c --- /dev/null +++ b/include/realtime_tools/lock_free_queue.hpp @@ -0,0 +1,333 @@ +// Copyright 2025 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// \author Sai Kishor Kothakota + +#ifndef REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ +#define REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ + +#include +#include +#include +#include + +#include +#include +#include + +namespace +{ +// Trait to check if the capacity is set +template +struct has_capacity : std::false_type +{ +}; + +template +struct has_capacity>> +: std::true_type +{ +}; + +template +struct has_capacity>> : std::true_type +{ +}; + +template +struct has_capacity, boost::lockfree::fixed_sized>> : std::true_type +{ +}; + +// Traits to check if the queue is spsc_queue +template +struct is_spsc_queue : std::false_type +{ +}; + +template +struct is_spsc_queue> : std::true_type +{ +}; + +template +struct is_spsc_queue>> +: std::true_type +{ +}; + +// Traits to get the capacity of the queue +// Default case: no capacity +template +struct get_boost_lockfree_queue_capacity +{ + static constexpr std::size_t value = 0; // Default to 0 if capacity is not defined +}; + +// Specialization for queues with capacity +template +struct get_boost_lockfree_queue_capacity< + boost::lockfree::spsc_queue>> +{ + static constexpr std::size_t value = Capacity; +}; + +// Specialization for queues with capacity +template +struct get_boost_lockfree_queue_capacity< + boost::lockfree::queue>> +{ + static constexpr std::size_t value = Capacity; +}; + +// Specialization for queues with capacity +template +struct get_boost_lockfree_queue_capacity, boost::lockfree::fixed_sized>> +{ + static constexpr std::size_t value = Capacity; +}; + +} // namespace + +namespace realtime_tools +{ +template +class LockFreeQueueBase +/** + * @brief A base class for lock-free queues. + * + * This class provides a base implementation for lock-free queues with various functionalities + * such as pushing, popping, and checking the state of the queue. It supports both single-producer + * single-consumer (SPSC) and multiple-producer multiple-consumer (MPMC) queues. + * + * @tparam DataType The type of data to be stored in the queue. + * @tparam LockFreeContainer The underlying lock-free container type - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters + */ +{ +public: + using T = DataType; + + /** + * @brief Construct a new LockFreeQueueBase object + * @note This constructor is enabled only if the LockFreeContainer has a capacity set + */ + template < + bool HasCapacity = has_capacity::value, + typename std::enable_if_t = 0> + LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity::value) + { + } + + /** + * @brief Construct a new LockFreeQueueBase object + * @param capacity Capacity of the queue + * @note This constructor is enabled only if the LockFreeContainer has no capacity set + */ + template < + bool HasCapacity = has_capacity::value, + typename std::enable_if_t = 1> + explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity) + { + } + + virtual ~LockFreeQueueBase() = default; + + /** + * @brief Pop the data from the queue + * @param data Data to be popped + * @return true If the data is popped successfully + * @return false If the queue is empty or the data could not be popped + */ + [[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); } + + /** + * @brief Pop the data from the queue + * @param data Data to be popped + * @return true If the data is popped successfully + * @return false If the queue is empty or the data could not be popped + * @note This function is enabled only if the data type is convertible to the template type of the queue + */ + template + [[nodiscard]] std::enable_if_t, bool> pop(U & data) + { + return data_queue_.pop(data); + } + + /** + * @brief Push the data into the queue + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the queue is full or the data could not be pushed + */ + + [[nodiscard]] bool push(const T & data) + { + if constexpr (is_spsc_queue::value) { + return data_queue_.push(data); + } else { + return data_queue_.bounded_push(data); + } + } + + /** + * @brief Push the data into the queue + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the queue is full or the data could not be pushed + * @note This function is enabled only if the data type is convertible to the template type of the queue + */ + template + [[nodiscard]] std::enable_if_t, bool> push(const U & data) + { + if constexpr (is_spsc_queue::value) { + return data_queue_.push(data); + } else { + return data_queue_.bounded_push(data); + } + } + + /** + * @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the data could not be pushed + * @note This function is enabled only if the queue is a spsc_queue and only if the data type + * is convertible to the template type of the queue + * @note For a SPSC Queue, to be used in single-threaded applications + * @warning For a SPSC Queue, this method might not work as expected when used in multi-threaded applications + * if it is used with two different threads, one doing bounded_push and the other doing pop. In this case, the + * queue is no longer a single producer single consumer queue. So, the behavior might not be as expected. + */ + template + [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) + { + if (!push(data)) { + T dummy; + data_queue_.pop(dummy); + return push(data); + } + return true; + } + + /** + * @brief Get the latest data from the queue + * @param data Data to be filled with the latest data + * @return true If the data is filled with the latest data, false otherwise + * @return false If the queue is empty + * @note This function consumes all the data in the queue until the last data and returns the last element of the queue + */ + [[nodiscard]] bool get_latest(T & data) + { + return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0; + } + + /** + * @brief Check if the queue is empty + * @return true If the queue is empty + * @return false If the queue is not empty + * @note The result is only accurate, if no other thread modifies the queue. Therefore, it is rarely practical to use this value in program logic. + * @note For a SPSC Queue, it should only be called from the consumer thread where pop is called. If need to be called from producer thread, use write_available() instead. + */ + bool empty() const + { + if constexpr (is_spsc_queue::value) { + return data_queue_.read_available() == 0; + } else { + return data_queue_.empty(); + } + } + + /** + * @brief Get the capacity of the queue + * @return std::size_t Capacity of the queue + */ + size_t capacity() const { return capacity_; } + + /** + * @brief Get the size of the queue + * @return std::size_t Size of the queue + * @note This function is enabled only if the queue is a spsc_queue + */ + template < + bool IsSPSCQueue = is_spsc_queue::value, + typename std::enable_if_t = 0> + std::size_t size() const + { + return data_queue_.read_available(); + } + + /** + * @brief The method to check if the queue is lock free + * @return true If the queue is lock free, false otherwise + * @warning It only checks, if the queue head and tail nodes and the freelist can + * be modified in a lock-free manner. On most platforms, the whole implementation + * is lock-free, if this is true. Using c++0x-style atomics, there is no possibility + * to provide a completely accurate implementation, because one would need to test + * every internal node, which is impossible if further nodes will be allocated from + * the operating system. + * @link https://www.boost.org/doc/libs/1_74_0/doc/html/boost/lockfree/queue.html + */ + bool is_lock_free() const + { + return (is_spsc_queue::value) || data_queue_.is_lock_free(); + } + + /** + * @brief Get the lockfree container + * @return const LockFreeContainer& Reference to the lockfree container + */ + const LockFreeContainer & get_lockfree_container() const { return data_queue_; } + + /** + * @brief Get the lockfree container + * @return LockFreeContainer& Reference to the lockfree container + */ + LockFreeContainer & get_lockfree_container() { return data_queue_; } + +private: + LockFreeContainer data_queue_; + std::size_t capacity_; +}; // class + +/** + * @brief Lock-free Single Producer Single Consumer Queue + * @tparam DataType Type of the data to be stored in the queue + * @tparam Capacity Capacity of the queue + */ +template +using LockFreeSPSCQueue = std::conditional_t< + Capacity == 0, LockFreeQueueBase>, + LockFreeQueueBase< + DataType, boost::lockfree::spsc_queue>>>; + +/** + * @brief Lock-free Multiple Producer Multiple Consumer Queue + * @tparam DataType Type of the data to be stored in the queue + * @tparam Capacity Capacity of the queue + * @tparam FixedSize Fixed size of the queue + */ +template +using LockFreeMPMCQueue = std::conditional_t< + Capacity == 0, + LockFreeQueueBase< + DataType, boost::lockfree::queue>>, + LockFreeQueueBase< + DataType, + boost::lockfree::queue< + DataType, boost::lockfree::capacity, boost::lockfree::fixed_sized>>>; + +} // namespace realtime_tools +#endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ diff --git a/package.xml b/package.xml index 599a1768..50829129 100644 --- a/package.xml +++ b/package.xml @@ -19,6 +19,7 @@ ament_cmake + libboost-dev rclcpp rclcpp_action libcap-dev diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp new file mode 100644 index 00000000..a4c26dcd --- /dev/null +++ b/test/lock_free_queue_tests.cpp @@ -0,0 +1,453 @@ +// Copyright 2025 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +using realtime_tools::LockFreeMPMCQueue; +using realtime_tools::LockFreeSPSCQueue; + +class DefaultConstructable +{ +public: + DefaultConstructable() : number_(42) {} + int number_; +}; + +TEST(LockFreeSPSCQueue, default_construct) +{ + const auto default_construct_test_lambda = [](auto & queue) { + DefaultConstructable obj1; + ASSERT_EQ(10, queue.capacity()); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, queue.capacity()); + ASSERT_EQ(1, queue.size()) << "Buffer should have one element"; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_TRUE(queue.is_lock_free()); + ASSERT_EQ(10, queue.capacity()); + ASSERT_EQ(42, obj1.number_); + }; + + LockFreeSPSCQueue queue_1(10); + LockFreeSPSCQueue queue_2; + default_construct_test_lambda(queue_1); + default_construct_test_lambda(queue_2); +} + +TEST(LockFreeSPSCQueue, initialize_value) +{ + const auto initialize_value_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(3.14)) << "Buffer should have space for one element"; + ASSERT_EQ(1, queue.size()) << "Buffer should have one element"; + ASSERT_TRUE(queue.is_lock_free()); + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_DOUBLE_EQ(3.14, obj1); + ASSERT_EQ(0, queue.size()) << "Buffer should be empty"; + ASSERT_TRUE(queue.empty()); + ASSERT_TRUE(queue.push(2.71)) << "Buffer should have space for one element"; + ASSERT_EQ(1, queue.size()) << "Buffer should have one element"; + int obj2; + ASSERT_TRUE(queue.pop(obj2)); + ASSERT_EQ(2, obj2); + ASSERT_EQ(0, queue.size()) << "Buffer should be empty"; + ASSERT_TRUE(queue.empty()); + ASSERT_TRUE(queue.push(6)); + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(6, obj1); + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + initialize_value_test(queue_1); + initialize_value_test(queue_2); +} + +TEST(LockFreeSPSCQueue, test_push) +{ + const auto push_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, queue.size()); + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_FALSE(queue.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_EQ(10, queue.size()); + ASSERT_FALSE(queue.empty()); + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + push_test_lambda(queue_1); + push_test_lambda(queue_2); +} + +TEST(LockFreeSPSCQueue, test_pop) +{ + const auto pop_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, queue.size()); + } + for (auto i = 1; i <= 10; i++) { + double obj1 = -10; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i, obj1); + ASSERT_EQ(10 - i, queue.size()); + } + double obj1; + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + pop_test_lambda(queue_1); + pop_test_lambda(queue_2); +} + +TEST(LockFreeSPSCQueue, test_get_latest) +{ + const auto get_latest_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, queue.size()); + } + double obj1 = -10; + ASSERT_TRUE(queue.get_latest(obj1)); + ASSERT_EQ(10, obj1); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_EQ(0, queue.size()); + ASSERT_FALSE(queue.get_latest(obj1)); + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + get_latest_test(queue_1); + get_latest_test(queue_2); +} + +TEST(LockFreeSPSCQueue, test_bounded_push) +{ + const auto bounded_push_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(std::min(i, 10), queue.size()); + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_EQ(10, queue.size()); + ASSERT_EQ(10, queue.capacity()); + + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1u; i <= queue.capacity(); i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + ASSERT_EQ(queue.capacity() - i, queue.size()); + } + double obj1; + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + bounded_push_test(queue_1); + bounded_push_test(queue_2); +} + +TEST(LockFreeSPSCQueue, test_lockfree_queue_push) +{ + const auto lockfree_queue_push_test = [](auto & queue) { + int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (int i = 0; i < iterations; ++i) { + while (!queue.push(i)) { + std::this_thread::yield(); + } + ++producer_count; + } + }); + + std::thread consumer([&]() { + int value; + while (!done) { + while (queue.pop(value)) { + ++consumer_count; + } + } + while (queue.pop(value)) { + ++consumer_count; + } + }); + + producer.join(); + done = true; + consumer.join(); + + ASSERT_EQ(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_EQ(consumer_count, iterations); + }; + + LockFreeSPSCQueue queue_1(100); + LockFreeSPSCQueue queue_2; + lockfree_queue_push_test(queue_1); + lockfree_queue_push_test(queue_2); +} + +TEST(LockFreeMPMCQueue, default_construct) +{ + const auto default_construct_test = [](auto & queue) { + DefaultConstructable obj1; + ASSERT_EQ(10, queue.capacity()); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, queue.capacity()); + ASSERT_TRUE(queue.is_lock_free()); + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(10, queue.capacity()); + ASSERT_EQ(42, obj1.number_); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + default_construct_test(queue_1); + default_construct_test(queue_2); +} + +TEST(LockFreeMPMCQueue, initialize_value) +{ + const auto initialize_value_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(3.14)) << "Buffer should have space for one element"; + ASSERT_TRUE(queue.is_lock_free()); + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_DOUBLE_EQ(3.14, obj1); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(2.71)) << "Buffer should have space for one element"; + int obj2; + ASSERT_TRUE(queue.pop(obj2)); + ASSERT_EQ(2, obj2); + ASSERT_TRUE(queue.empty()); + ASSERT_TRUE(queue.push(6)); + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(6, obj1); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + initialize_value_test(queue_1); + initialize_value_test(queue_2); +} + +TEST(LockFreeMPMCQueue, test_push) +{ + const auto push_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_FALSE(queue.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_FALSE(queue.empty()); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + push_test_lambda(queue_1); + push_test_lambda(queue_2); +} + +TEST(LockFreeMPMCQueue, test_pop) +{ + const auto pop_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + } + for (auto i = 1; i <= 10; i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i, obj1); + } + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + double obj1; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + pop_test_lambda(queue_1); + pop_test_lambda(queue_2); +} + +TEST(LockFreeMPMCQueue, test_get_latest) +{ + const auto get_latest_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + } + ASSERT_FALSE(queue.empty()) << "Buffer should not be empty"; + double obj1; + ASSERT_TRUE(queue.get_latest(obj1)); + ASSERT_EQ(10, obj1); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.get_latest(obj1)); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + get_latest_test_lambda(queue_1); + get_latest_test_lambda(queue_2); +} + +TEST(LockFreeMPMCQueue, test_bounded_push) +{ + const auto bounded_push_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_EQ(10, queue.capacity()); + + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1u; i <= queue.capacity(); i++) { + double obj1 = -10.0; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + } + double obj1; + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + bounded_push_test(queue_1); + bounded_push_test(queue_2); +} + +TEST(LockFreeMPMCQueue, test_lockfree_queue_push) +{ + const auto test_queue = [](auto & queue) { + int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (int i = 0; i < iterations; ++i) { + while (!queue.push(i)) { + std::this_thread::yield(); + } + ++producer_count; + } + }); + + std::thread consumer([&]() { + int value; + while (!done) { + while (queue.pop(value)) { + ++consumer_count; + } + } + while (queue.pop(value)) { + ++consumer_count; + } + }); + + producer.join(); + done = true; + consumer.join(); + + ASSERT_EQ(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_EQ(consumer_count, iterations); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(100); + test_queue(queue_1); + test_queue(queue_2); +} + +TEST(LockFreeMPMCQueue, test_lockfree_queue_bounded_push) +{ + const auto bounded_push_test = [](auto & queue) { + std::atomic_int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (auto j = 0; j < iterations; ++j) { + ASSERT_TRUE(queue.bounded_push(j)); + ASSERT_EQ(100, queue.capacity()); + ++producer_count; + } + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + std::cerr << "producer_count: " << producer_count << std::endl; + + std::thread consumer([&]() { + int value; + while (!done && queue.pop(value)) { + ++consumer_count; + std::this_thread::yield(); + } + while (queue.pop(value)) { + ++consumer_count; + } + }); + + producer.join(); + done = true; + consumer.join(); + + std::cerr << "producer_count: " << producer_count << std::endl; + std::cerr << "consumer_count: " << consumer_count << std::endl; + std::cerr << "iterations: " << iterations << std::endl; + ASSERT_GT(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_GT(iterations, consumer_count); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(100); + bounded_push_test(queue_1); + bounded_push_test(queue_2); +}