From 561d370b9af1ead2327850f1d488fafd072ded2e Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 19 Dec 2024 22:59:48 +0100 Subject: [PATCH 01/32] add libboost-dev dependency --- package.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/package.xml b/package.xml index 599a1768..3e179b6a 100644 --- a/package.xml +++ b/package.xml @@ -19,6 +19,7 @@ ament_cmake + libboost-dev rclcpp rclcpp_action libcap-dev From 2455b16ab2e614aff64f3f9595337bca925b80ea Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Fri, 20 Dec 2024 00:19:17 +0100 Subject: [PATCH 02/32] Add LockFreeBuffer class --- include/realtime_tools/lock_free_buffer.hpp | 74 +++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 include/realtime_tools/lock_free_buffer.hpp diff --git a/include/realtime_tools/lock_free_buffer.hpp b/include/realtime_tools/lock_free_buffer.hpp new file mode 100644 index 00000000..8b69dbf0 --- /dev/null +++ b/include/realtime_tools/lock_free_buffer.hpp @@ -0,0 +1,74 @@ +// Copyright 2024 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// \author Sai Kishor Kothakota + +#ifndef REALTIME_TOOLS__LOCK_FREE_BUFFER_HPP_ +#define REALTIME_TOOLS__LOCK_FREE_BUFFER_HPP_ + +#include +#include +#include +#include +#include + +namespace realtime_tools +{ +template +class LockFreeBufferBase +{ +public: + LockFreeBufferBase() : new_data_available_(false) + { + } + + /** + * @brief Constructor for objects that don't have + * a default constructor + * @param data The object to use as default value + */ + explicit LockFreeBufferBase(const T & data) + { + } + + virtual ~LockFreeBufferBase() = default; + + T readFromRT() + { + return realtime_data_.pop(); + } + + T readFromNonRT() const + { + } + + void writeFromNonRT(const T & data) + { + } + + void initialize(const T & data) + { + data_buffer_.push(data); + } + + void reset() + { + } + +private: +LockFreeContainer data_buffer_; +}; // class + +} // namespace realtime_tools +#endif // REALTIME_TOOLS__LOCK_FREE_BUFFER_HPP_ From 02db30d62c7816ef9bb8d5abbd4ce04fdf235f99 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Sat, 28 Dec 2024 22:13:45 +0100 Subject: [PATCH 03/32] Add LockFreeSPSCQueueBase class --- include/realtime_tools/lock_free_buffer.hpp | 74 -------------- include/realtime_tools/lock_free_queue.hpp | 108 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 74 deletions(-) delete mode 100644 include/realtime_tools/lock_free_buffer.hpp create mode 100644 include/realtime_tools/lock_free_queue.hpp diff --git a/include/realtime_tools/lock_free_buffer.hpp b/include/realtime_tools/lock_free_buffer.hpp deleted file mode 100644 index 8b69dbf0..00000000 --- a/include/realtime_tools/lock_free_buffer.hpp +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2024 PAL Robotics S.L. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// \author Sai Kishor Kothakota - -#ifndef REALTIME_TOOLS__LOCK_FREE_BUFFER_HPP_ -#define REALTIME_TOOLS__LOCK_FREE_BUFFER_HPP_ - -#include -#include -#include -#include -#include - -namespace realtime_tools -{ -template -class LockFreeBufferBase -{ -public: - LockFreeBufferBase() : new_data_available_(false) - { - } - - /** - * @brief Constructor for objects that don't have - * a default constructor - * @param data The object to use as default value - */ - explicit LockFreeBufferBase(const T & data) - { - } - - virtual ~LockFreeBufferBase() = default; - - T readFromRT() - { - return realtime_data_.pop(); - } - - T readFromNonRT() const - { - } - - void writeFromNonRT(const T & data) - { - } - - void initialize(const T & data) - { - data_buffer_.push(data); - } - - void reset() - { - } - -private: -LockFreeContainer data_buffer_; -}; // class - -} // namespace realtime_tools -#endif // REALTIME_TOOLS__LOCK_FREE_BUFFER_HPP_ diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp new file mode 100644 index 00000000..87609d67 --- /dev/null +++ b/include/realtime_tools/lock_free_queue.hpp @@ -0,0 +1,108 @@ +// Copyright 2025 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// \author Sai Kishor Kothakota + +#ifndef REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ +#define REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ + +#include +#include +#include +#include +#include +#include + +namespace +{ + // Trait to check if the capacity is set +template +struct has_capacity : std::false_type {}; + +template +struct has_capacity>> : std::true_type {}; + +template +struct has_capacity>> : std::true_type {}; + +template +struct has_capacity, boost::lockfree::fixed_sized>> : std::true_type {}; +} + +namespace realtime_tools +{ +template +class LockFreeSPSCQueueBase +{ +public: + using T = DataType; + + // enable this constructor only if the queue has capacity set + template ::value, typename std::enable_if_t = 0> + LockFreeSPSCQueueBase() + { + } + + // enable this constructor only if the queue has no capacity set + template ::value, typename std::enable_if_t = 1> + LockFreeSPSCQueueBase(std::size_t capacity) + : data_queue_(capacity) + { + } + + /** + * @brief Constructor for objects that don't have + * a default constructor + * @param data The object to use as default value + */ + explicit LockFreeSPSCQueueBase(const T & data) + { + } + + virtual ~LockFreeSPSCQueueBase() = default; + + [[nodiscard]] bool pop(T & data) + { + return data_queue_.pop(data); + } + + [[nodiscard]] bool push(const T & data) + { + return data_queue_.push(data); + } + + bool empty() const + { + return data_queue_.empty(); + } + + const LockFreeSPSCContainer& get_lockfree_container() const + { + return data_queue_; + } + + LockFreeSPSCContainer& get_lockfree_container() + { + return data_queue_; + } + +private: +LockFreeSPSCContainer data_queue_; +}; // class + +template +using LockFreeBuffer = LockFreeSPSCQueueBase>>; + +} // namespace realtime_tools +#endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ From 1468fe8efba4d29e82da74f1232af3d5ea7ae672 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Sun, 29 Dec 2024 01:40:19 +0100 Subject: [PATCH 04/32] Add more functionality and tests --- include/realtime_tools/lock_free_queue.hpp | 132 +++++++++++++----- test/lock_free_queue_tests.cpp | 153 +++++++++++++++++++++ 2 files changed, 247 insertions(+), 38 deletions(-) create mode 100644 test/lock_free_queue_tests.cpp diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 87609d67..fd3a0dfa 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -19,26 +19,78 @@ #include #include +#include +#include + #include #include #include -#include namespace { - // Trait to check if the capacity is set +// Trait to check if the capacity is set template -struct has_capacity : std::false_type {}; +struct has_capacity : std::false_type +{ +}; template -struct has_capacity>> : std::true_type {}; +struct has_capacity>> +: std::true_type +{ +}; template -struct has_capacity>> : std::true_type {}; +struct has_capacity>> : std::true_type +{ +}; template -struct has_capacity, boost::lockfree::fixed_sized>> : std::true_type {}; -} +struct has_capacity, boost::lockfree::fixed_sized>> : std::true_type +{ +}; + +template +struct is_spsc_queue : std::false_type +{ +}; + +template +struct is_spsc_queue> : std::true_type +{ +}; + +template +struct is_spsc_queue>> +: std::true_type +{ +}; + +// Default case: no capacity +template +struct get_boost_lockfree_queue_capacity +{ + static constexpr std::size_t value = 0; // Default to 0 if capacity is not defined +}; + +// Specialization for queues with capacity +template +struct get_boost_lockfree_queue_capacity< + boost::lockfree::spsc_queue>> +{ + static constexpr std::size_t value = Capacity; +}; + +// Specialization for queues with capacity +template +struct get_boost_lockfree_queue_capacity< + boost::lockfree::queue>> +{ + static constexpr std::size_t value = Capacity; +}; + +} // namespace namespace realtime_tools { @@ -49,60 +101,64 @@ class LockFreeSPSCQueueBase using T = DataType; // enable this constructor only if the queue has capacity set - template ::value, typename std::enable_if_t = 0> + template < + bool HasCapacity = has_capacity::value, + typename std::enable_if_t = 0> LockFreeSPSCQueueBase() + : capacity_(get_boost_lockfree_queue_capacity::value) { } // enable this constructor only if the queue has no capacity set - template ::value, typename std::enable_if_t = 1> - LockFreeSPSCQueueBase(std::size_t capacity) - : data_queue_(capacity) - { - } - - /** - * @brief Constructor for objects that don't have - * a default constructor - * @param data The object to use as default value - */ - explicit LockFreeSPSCQueueBase(const T & data) + template < + bool HasCapacity = has_capacity::value, + typename std::enable_if_t = 1> + explicit LockFreeSPSCQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity) { } virtual ~LockFreeSPSCQueueBase() = default; - [[nodiscard]] bool pop(T & data) - { - return data_queue_.pop(data); - } + [[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); } - [[nodiscard]] bool push(const T & data) + template + std::enable_if_t, bool> push(const U & data) { return data_queue_.push(data); } - bool empty() const + template + std::enable_if_t, bool> bounded_push(const U & data) { - return data_queue_.empty(); + if (!data_queue_.push(data)) { + data_queue_.pop(); + return data_queue_.push(data); + } + return true; } - const LockFreeSPSCContainer& get_lockfree_container() const - { - return data_queue_; - } + [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } - LockFreeSPSCContainer& get_lockfree_container() - { - return data_queue_; - } + bool empty() const { return data_queue_.read_available() == 0u; } + + size_t capacity() const { return capacity_; } + + std::size_t size() const { return data_queue_.read_available(); } + + const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; } + + LockFreeSPSCContainer & get_lockfree_container() { return data_queue_; } private: -LockFreeSPSCContainer data_queue_; + LockFreeSPSCContainer data_queue_; + std::size_t capacity_; }; // class -template -using LockFreeBuffer = LockFreeSPSCQueueBase>>; +template +using LockFreeQueue = std::conditional_t< + Capacity == 0, LockFreeSPSCQueueBase>, + LockFreeSPSCQueueBase< + DataType, boost::lockfree::spsc_queue>>>; } // namespace realtime_tools #endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp new file mode 100644 index 00000000..5537e4c8 --- /dev/null +++ b/test/lock_free_queue_tests.cpp @@ -0,0 +1,153 @@ +// Copyright (c) 2019, Open Source Robotics Foundation, Inc. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the Open Source Robotics Foundation, Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#include +#include + +using realtime_tools::LockFreeQueue; + +class DefaultConstructable +{ +public: + DefaultConstructable() : number_(42) {} + ~DefaultConstructable() {} + int number_; +}; + +TEST(LockFreeQueue, default_construct) +{ + { + LockFreeQueue buffer(10); + DefaultConstructable obj1; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(42, obj1.number_); + } + { + LockFreeQueue buffer; + DefaultConstructable obj1; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(42, obj1.number_); + } +} + +TEST(LockFreeQueue, initialize_value) +{ + LockFreeQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + double obj1; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_DOUBLE_EQ(3.14, obj1); + ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; +} + +TEST(LockFreeQueue, test_push) +{ + LockFreeQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, buffer.size()); + ASSERT_EQ(10, buffer.capacity()); + } + ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_EQ(10, buffer.size()); +} + +TEST(LockFreeQueue, test_pop) +{ + LockFreeQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, buffer.size()); + } + for (auto i = 1; i <= 10; i++) { + double obj1; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(i, obj1); + ASSERT_EQ(10 - i, buffer.size()); + } + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + double obj1; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; +} + +TEST(LockFreeQueue, test_bounded_push) +{ + LockFreeQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(buffer.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(std::min(i, 10), buffer.size()); + ASSERT_EQ(10, buffer.capacity()); + } + ASSERT_EQ(10, buffer.size()); + ASSERT_EQ(10, buffer.capacity()); + + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1; i <= buffer.capacity(); i++) { + double obj1; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + ASSERT_EQ(buffer.capacity() - i, buffer.size()); + } + double obj1; + ASSERT_FALSE(buffer.pop(obj1)); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; +} + +// TEST(LockFreeQueue, write_read_non_rt) +// { +// LockFreeQueue buffer(42); + +// buffer.writeFromNonRT(28); +// EXPECT_EQ(28, *buffer.readFromNonRT()); +// } + +// TEST(LockFreeQueue, initRT) +// { +// LockFreeQueue buffer(42); +// buffer.initRT(28); +// EXPECT_EQ(28, *buffer.readFromRT()); +// } From 2ef8020f6536026b09cefd2a430bde60343ded51 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Mon, 30 Dec 2024 16:46:39 +0100 Subject: [PATCH 05/32] Add tests to CMakeLists --- CMakeLists.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8540aa63..5127302c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -74,6 +74,9 @@ if(BUILD_TESTING) ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp) target_link_libraries(realtime_buffer_tests realtime_tools) + ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) + target_link_libraries(lock_free_queue_tests realtime_tools) + ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) From 268138b0fe02152fee0436f1442d8592f8244350 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Mon, 30 Dec 2024 16:52:32 +0100 Subject: [PATCH 06/32] Draft: functional lockfree queue (not spsc_queue) --- include/realtime_tools/lock_free_queue.hpp | 28 +++- test/lock_free_queue_tests.cpp | 153 ++++++++++++++++----- 2 files changed, 142 insertions(+), 39 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index fd3a0dfa..a1a716b0 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -90,6 +90,14 @@ struct get_boost_lockfree_queue_capacity< static constexpr std::size_t value = Capacity; }; +// Specialization for queues with capacity +template +struct get_boost_lockfree_queue_capacity< + boost::lockfree::queue, boost::lockfree::fixed_sized>> +{ + static constexpr std::size_t value = Capacity; +}; + } // namespace namespace realtime_tools @@ -131,19 +139,27 @@ class LockFreeSPSCQueueBase std::enable_if_t, bool> bounded_push(const U & data) { if (!data_queue_.push(data)) { - data_queue_.pop(); - return data_queue_.push(data); + // data_queue_.pop(); + // return data_queue_.push(data); + T dummy; + if (!data_queue_.pop(dummy)) { + std::cerr << "LockFreeSPSCQueueBase::bounded_push: queue is full and pop succeeded\n"; + return data_queue_.push(data); + } else { + // std::cerr << "LockFreeSPSCQueueBase::bounded_push: queue is full and pop failed\n"; + return data_queue_.push(data); + } } return true; } [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } - bool empty() const { return data_queue_.read_available() == 0u; } + bool empty() const { return data_queue_.empty(); } size_t capacity() const { return capacity_; } - std::size_t size() const { return data_queue_.read_available(); } + std::size_t size() const { return 10; } const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; } @@ -156,9 +172,9 @@ class LockFreeSPSCQueueBase template using LockFreeQueue = std::conditional_t< - Capacity == 0, LockFreeSPSCQueueBase>, + Capacity == 0, LockFreeSPSCQueueBase>, LockFreeSPSCQueueBase< - DataType, boost::lockfree::spsc_queue>>>; + DataType, boost::lockfree::queue, boost::lockfree::fixed_sized>>>; } // namespace realtime_tools #endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 5537e4c8..dcd2e8db 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -27,6 +27,8 @@ // POSSIBILITY OF SUCH DAMAGE. #include +#include + #include using realtime_tools::LockFreeQueue; @@ -35,25 +37,24 @@ class DefaultConstructable { public: DefaultConstructable() : number_(42) {} - ~DefaultConstructable() {} int number_; }; TEST(LockFreeQueue, default_construct) { - { - LockFreeQueue buffer(10); - DefaultConstructable obj1; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(42, obj1.number_); - } + // { + // LockFreeQueue buffer(10); + // DefaultConstructable obj1; + // ASSERT_EQ(10, buffer.capacity()); + // ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + // ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + // ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; + // ASSERT_EQ(10, buffer.capacity()); + // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + // ASSERT_TRUE(buffer.pop(obj1)); + // ASSERT_EQ(10, buffer.capacity()); + // ASSERT_EQ(42, obj1.number_); + // } { LockFreeQueue buffer; DefaultConstructable obj1; @@ -62,7 +63,7 @@ TEST(LockFreeQueue, default_construct) ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); @@ -74,11 +75,11 @@ TEST(LockFreeQueue, initialize_value) LockFreeQueue buffer; ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; double obj1; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_DOUBLE_EQ(3.14, obj1); - ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; + // ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; } TEST(LockFreeQueue, test_push) @@ -87,11 +88,11 @@ TEST(LockFreeQueue, test_push) ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 10; i++) { ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(i, buffer.size()); + // ASSERT_EQ(i, buffer.size()); ASSERT_EQ(10, buffer.capacity()); } ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; - ASSERT_EQ(10, buffer.size()); + // ASSERT_EQ(10, buffer.size()); } TEST(LockFreeQueue, test_pop) @@ -100,13 +101,13 @@ TEST(LockFreeQueue, test_pop) ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 10; i++) { ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(i, buffer.size()); + // ASSERT_EQ(i, buffer.size()); } for (auto i = 1; i <= 10; i++) { double obj1; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_EQ(i, obj1); - ASSERT_EQ(10 - i, buffer.size()); + // ASSERT_EQ(10 - i, buffer.size()); } ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; double obj1; @@ -115,26 +116,112 @@ TEST(LockFreeQueue, test_pop) TEST(LockFreeQueue, test_bounded_push) { - LockFreeQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + LockFreeQueue queue; + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 25; i++) { - ASSERT_TRUE(buffer.bounded_push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(std::min(i, 10), buffer.size()); - ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + // ASSERT_EQ(std::min(i, 10), queue.size()); + ASSERT_EQ(10, queue.capacity()); } - ASSERT_EQ(10, buffer.size()); - ASSERT_EQ(10, buffer.capacity()); + // ASSERT_EQ(10, queue.size()); + ASSERT_EQ(10, queue.capacity()); // when we start popping, the pop-ed elements should start from 16 - for (auto i = 1; i <= buffer.capacity(); i++) { + for (auto i = 1u; i <= queue.capacity(); i++) { double obj1; - ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_TRUE(queue.pop(obj1)); ASSERT_EQ(i + 15, obj1); - ASSERT_EQ(buffer.capacity() - i, buffer.size()); + // ASSERT_EQ(queue.capacity() - i, queue.size()); } double obj1; - ASSERT_FALSE(buffer.pop(obj1)); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; +} + +TEST(LockFreeQueue, test_lockfree_spsc_queue_push) +{ + LockFreeQueue spsc_queue(100); + int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (int i = 0; i < iterations; ++i) { + while (!spsc_queue.push(i)) { + std::this_thread::yield(); + } + ++producer_count; + } + }); + + std::thread consumer([&]() { + int value; + while (!done) { + while (spsc_queue.pop(value)) { + ++consumer_count; + } + } + while (spsc_queue.pop(value)) { + ++consumer_count; + } + }); + + producer.join(); + done = true; + consumer.join(); + + ASSERT_EQ(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_EQ(consumer_count, iterations); +} + +TEST(LockFreeQueue, test_lockfree_spsc_queue_bounded_push) +{ + LockFreeQueue spsc_queue; + std::atomic_int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (auto j = 0; j < iterations; ++j) { + ASSERT_TRUE(spsc_queue.bounded_push(j)); + ASSERT_EQ(100, spsc_queue.capacity()); + ++producer_count; + } + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + std::cerr << "producer_count: " << producer_count << std::endl; + + std::thread consumer([&]() { + int value; + while (!done) { + while (spsc_queue.pop(value)) { + ++consumer_count; + std::this_thread::yield(); + } + } + while (spsc_queue.pop(value)) { + ++consumer_count; + } + }); + + + producer.join(); + done = true; + consumer.join(); + + std::cerr << "producer_count: " << producer_count << std::endl; + std::cerr << "consumer_count: " << consumer_count << std::endl; + std::cerr << "iterations: " << iterations << std::endl; + ASSERT_GT(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_GT(iterations, consumer_count); } // TEST(LockFreeQueue, write_read_non_rt) From 5e94cc95b8fb90d6e297b7f02c8dedae0bbea624 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Tue, 31 Dec 2024 11:46:14 +0100 Subject: [PATCH 07/32] Add more template enabled methods --- include/realtime_tools/lock_free_queue.hpp | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index a1a716b0..0ef7ff61 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -142,24 +142,30 @@ class LockFreeSPSCQueueBase // data_queue_.pop(); // return data_queue_.push(data); T dummy; - if (!data_queue_.pop(dummy)) { - std::cerr << "LockFreeSPSCQueueBase::bounded_push: queue is full and pop succeeded\n"; - return data_queue_.push(data); - } else { - // std::cerr << "LockFreeSPSCQueueBase::bounded_push: queue is full and pop failed\n"; - return data_queue_.push(data); - } + data_queue_.pop(dummy); + return data_queue_.push(data); } return true; } [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } + template < + bool IsSPSCQueue = is_spsc_queue::value, + typename std::enable_if_t = 0> + bool empty() const { return data_queue_.read_available() == 0; } + + template < + bool IsSPSCQueue = is_spsc_queue::value, + typename std::enable_if_t = 1> bool empty() const { return data_queue_.empty(); } size_t capacity() const { return capacity_; } - std::size_t size() const { return 10; } + template < + bool IsSPSCQueue = is_spsc_queue::value, + typename std::enable_if_t = 0> + std::size_t size() const { return data_queue_.read_available(); } const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; } From 34d03b594ebf9d35f1a4218d6f9677fee0dc93ee Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Tue, 31 Dec 2024 11:50:37 +0100 Subject: [PATCH 08/32] update tests --- test/lock_free_queue_tests.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index dcd2e8db..b3bed511 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -42,19 +42,19 @@ class DefaultConstructable TEST(LockFreeQueue, default_construct) { - // { - // LockFreeQueue buffer(10); - // DefaultConstructable obj1; - // ASSERT_EQ(10, buffer.capacity()); - // ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - // ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; - // ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; - // ASSERT_EQ(10, buffer.capacity()); - // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; - // ASSERT_TRUE(buffer.pop(obj1)); - // ASSERT_EQ(10, buffer.capacity()); - // ASSERT_EQ(42, obj1.number_); - // } + { + LockFreeQueue buffer(10); + DefaultConstructable obj1; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, buffer.capacity()); + // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(42, obj1.number_); + } { LockFreeQueue buffer; DefaultConstructable obj1; From 6b629b13b1a8e481aa943fc5f086d8b35ae9366d Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Tue, 31 Dec 2024 18:47:08 +0100 Subject: [PATCH 09/32] Update the LockFreeQueue to completely support SPSC and MPMC queues + update tests --- include/realtime_tools/lock_free_queue.hpp | 68 ++++- test/lock_free_queue_tests.cpp | 317 +++++++++++++++++---- 2 files changed, 322 insertions(+), 63 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 0ef7ff61..36f1e974 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -92,8 +92,8 @@ struct get_boost_lockfree_queue_capacity< // Specialization for queues with capacity template -struct get_boost_lockfree_queue_capacity< - boost::lockfree::queue, boost::lockfree::fixed_sized>> +struct get_boost_lockfree_queue_capacity, boost::lockfree::fixed_sized>> { static constexpr std::size_t value = Capacity; }; @@ -103,7 +103,7 @@ struct get_boost_lockfree_queue_capacity< namespace realtime_tools { template -class LockFreeSPSCQueueBase +class LockFreeQueueBase { public: using T = DataType; @@ -112,8 +112,7 @@ class LockFreeSPSCQueueBase template < bool HasCapacity = has_capacity::value, typename std::enable_if_t = 0> - LockFreeSPSCQueueBase() - : capacity_(get_boost_lockfree_queue_capacity::value) + LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity::value) { } @@ -121,11 +120,11 @@ class LockFreeSPSCQueueBase template < bool HasCapacity = has_capacity::value, typename std::enable_if_t = 1> - explicit LockFreeSPSCQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity) + explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity) { } - virtual ~LockFreeSPSCQueueBase() = default; + virtual ~LockFreeQueueBase() = default; [[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); } @@ -135,7 +134,9 @@ class LockFreeSPSCQueueBase return data_queue_.push(data); } - template + template < + typename U, bool IsSPSCQueue = is_spsc_queue::value, + typename std::enable_if_t = 0> std::enable_if_t, bool> bounded_push(const U & data) { if (!data_queue_.push(data)) { @@ -148,24 +149,53 @@ class LockFreeSPSCQueueBase return true; } + template < + typename U, bool IsSPSCQueue = is_spsc_queue::value, + typename std::enable_if_t = 1> + std::enable_if_t, bool> bounded_push(const U & data) + { + if (!data_queue_.bounded_push(data)) { + // data_queue_.pop(); + // return data_queue_.push(data); + T dummy; + data_queue_.pop(dummy); + return data_queue_.bounded_push(data); + } + return true; + } + [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } template < bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> - bool empty() const { return data_queue_.read_available() == 0; } + bool empty() const + { + return data_queue_.read_available() == 0; + } template < bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 1> - bool empty() const { return data_queue_.empty(); } + bool empty() const + { + return data_queue_.empty(); + } size_t capacity() const { return capacity_; } template < bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> - std::size_t size() const { return data_queue_.read_available(); } + std::size_t size() const + { + return data_queue_.read_available(); + } + + // template < + // bool IsSPSCQueue = is_spsc_queue::value, + // typename std::enable_if_t = 1> + // std::size_t size() const { return 10; } const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; } @@ -177,10 +207,18 @@ class LockFreeSPSCQueueBase }; // class template -using LockFreeQueue = std::conditional_t< - Capacity == 0, LockFreeSPSCQueueBase>, - LockFreeSPSCQueueBase< - DataType, boost::lockfree::queue, boost::lockfree::fixed_sized>>>; +using LockFreeSPSCQueue = std::conditional_t< + Capacity == 0, LockFreeQueueBase>, + LockFreeQueueBase< + DataType, boost::lockfree::spsc_queue>>>; + +template +using LockFreeMPMCQueue = std::conditional_t< + Capacity == 0, LockFreeQueueBase>, + LockFreeQueueBase< + DataType, + boost::lockfree::queue< + DataType, boost::lockfree::capacity, boost::lockfree::fixed_sized>>>; } // namespace realtime_tools #endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index b3bed511..2933dfaf 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -31,7 +31,8 @@ #include -using realtime_tools::LockFreeQueue; +using realtime_tools::LockFreeMPMCQueue; +using realtime_tools::LockFreeSPSCQueue; class DefaultConstructable { @@ -40,107 +41,328 @@ class DefaultConstructable int number_; }; -TEST(LockFreeQueue, default_construct) +TEST(LockFreeSPSCQueue, default_construct) { { - LockFreeQueue buffer(10); + LockFreeSPSCQueue buffer(10); DefaultConstructable obj1; ASSERT_EQ(10, buffer.capacity()); ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; ASSERT_EQ(10, buffer.capacity()); - // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); } { - LockFreeQueue buffer; + LockFreeSPSCQueue buffer; DefaultConstructable obj1; ASSERT_EQ(10, buffer.capacity()); ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; ASSERT_EQ(10, buffer.capacity()); - // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); } } -TEST(LockFreeQueue, initialize_value) +TEST(LockFreeSPSCQueue, initialize_value) { - LockFreeQueue buffer; + LockFreeSPSCQueue buffer; ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; - // ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; double obj1; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_DOUBLE_EQ(3.14, obj1); - // ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; + ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; + ASSERT_TRUE(buffer.empty()); } -TEST(LockFreeQueue, test_push) +TEST(LockFreeSPSCQueue, test_push) { - LockFreeQueue buffer; + LockFreeSPSCQueue buffer; ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 10; i++) { ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - // ASSERT_EQ(i, buffer.size()); + ASSERT_EQ(i, buffer.size()); ASSERT_EQ(10, buffer.capacity()); } ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; - // ASSERT_EQ(10, buffer.size()); + ASSERT_EQ(10, buffer.size()); + ASSERT_FALSE(buffer.empty()); } -TEST(LockFreeQueue, test_pop) +TEST(LockFreeSPSCQueue, test_pop) { - LockFreeQueue buffer; + LockFreeSPSCQueue buffer; ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 10; i++) { ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - // ASSERT_EQ(i, buffer.size()); + ASSERT_EQ(i, buffer.size()); } for (auto i = 1; i <= 10; i++) { double obj1; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_EQ(i, obj1); - // ASSERT_EQ(10 - i, buffer.size()); + ASSERT_EQ(10 - i, buffer.size()); } ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; double obj1; ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; } -TEST(LockFreeQueue, test_bounded_push) +TEST(LockFreeSPSCQueue, test_bounded_push) { - LockFreeQueue queue; - ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 25; i++) { - ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; - // ASSERT_EQ(std::min(i, 10), queue.size()); + { + LockFreeSPSCQueue queue; + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(std::min(i, 10), queue.size()); + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_EQ(10, queue.size()); ASSERT_EQ(10, queue.capacity()); + + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1u; i <= queue.capacity(); i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + ASSERT_EQ(queue.capacity() - i, queue.size()); + } + double obj1; + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; } - // ASSERT_EQ(10, queue.size()); - ASSERT_EQ(10, queue.capacity()); + { + LockFreeSPSCQueue queue(10); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(std::min(i, 10), queue.size()); + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_EQ(10, queue.size()); + ASSERT_EQ(10, queue.capacity()); - // when we start popping, the pop-ed elements should start from 16 - for (auto i = 1u; i <= queue.capacity(); i++) { + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1u; i <= queue.capacity(); i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + ASSERT_EQ(queue.capacity() - i, queue.size()); + } double obj1; - ASSERT_TRUE(queue.pop(obj1)); - ASSERT_EQ(i + 15, obj1); - // ASSERT_EQ(queue.capacity() - i, queue.size()); + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; } +} + +TEST(LockFreeSPSCQueue, test_lockfree_queue_push) +{ + LockFreeSPSCQueue queue(100); + int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (int i = 0; i < iterations; ++i) { + while (!queue.push(i)) { + std::this_thread::yield(); + } + ++producer_count; + } + }); + + std::thread consumer([&]() { + int value; + while (!done) { + while (queue.pop(value)) { + ++consumer_count; + } + } + while (queue.pop(value)) { + ++consumer_count; + } + }); + + producer.join(); + done = true; + consumer.join(); + + ASSERT_EQ(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_EQ(consumer_count, iterations); +} + +// TEST(LockFreeSPSCQueue, test_lockfree_queue_bounded_push) +// { +// LockFreeSPSCQueue queue; +// std::atomic_int producer_count = 0; +// std::atomic_int consumer_count(0); +// std::atomic_bool done(false); + +// const int iterations = 1000000; + +// std::thread producer([&]() { +// for (auto j = 0; j < iterations; ++j) { +// ASSERT_TRUE(queue.bounded_push(j)); +// ASSERT_EQ(100, queue.capacity()); +// ++producer_count; +// } +// }); + +// std::this_thread::sleep_for(std::chrono::milliseconds(100)); + +// std::cerr << "producer_count: " << producer_count << std::endl; + +// std::thread consumer([&]() { +// int value; +// while (!done) { +// while (queue.pop(value)) { +// ++consumer_count; +// std::this_thread::yield(); +// } +// } +// while (queue.pop(value)) { +// ++consumer_count; +// } +// }); + +// producer.join(); +// done = true; +// consumer.join(); + +// std::cerr << "producer_count: " << producer_count << std::endl; +// std::cerr << "consumer_count: " << consumer_count << std::endl; +// std::cerr << "iterations: " << iterations << std::endl; +// ASSERT_GT(producer_count, consumer_count); +// ASSERT_EQ(producer_count, iterations); +// ASSERT_GT(iterations, consumer_count); +// } + +TEST(LockFreeMPMCQueue, default_construct) +{ + { + LockFreeMPMCQueue buffer(10); + DefaultConstructable obj1; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(42, obj1.number_); + } + { + LockFreeMPMCQueue buffer; + DefaultConstructable obj1; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(42, obj1.number_); + } +} + +TEST(LockFreeMPMCQueue, initialize_value) +{ + LockFreeMPMCQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; double obj1; - ASSERT_FALSE(queue.pop(obj1)); - ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_DOUBLE_EQ(3.14, obj1); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; +} + +TEST(LockFreeMPMCQueue, test_push) +{ + LockFreeMPMCQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, buffer.capacity()); + } + ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_FALSE(buffer.empty()); } -TEST(LockFreeQueue, test_lockfree_spsc_queue_push) +TEST(LockFreeMPMCQueue, test_pop) { - LockFreeQueue spsc_queue(100); + LockFreeMPMCQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + } + for (auto i = 1; i <= 10; i++) { + double obj1; + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(i, obj1); + } + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + double obj1; + ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; +} + +TEST(LockFreeMPMCQueue, test_bounded_push) +{ + { + LockFreeMPMCQueue queue; + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_EQ(10, queue.capacity()); + + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1u; i <= queue.capacity(); i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + } + double obj1; + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + } + { + LockFreeMPMCQueue queue(10); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 25; i++) { + ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_EQ(10, queue.capacity()); + + // when we start popping, the pop-ed elements should start from 16 + for (auto i = 1u; i <= queue.capacity(); i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i + 15, obj1); + } + double obj1; + ASSERT_FALSE(queue.pop(obj1)); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + } +} + +TEST(LockFreeMPMCQueue, test_lockfree_queue_push) +{ + LockFreeMPMCQueue queue; int producer_count = 0; std::atomic_int consumer_count(0); std::atomic_bool done(false); @@ -149,7 +371,7 @@ TEST(LockFreeQueue, test_lockfree_spsc_queue_push) std::thread producer([&]() { for (int i = 0; i < iterations; ++i) { - while (!spsc_queue.push(i)) { + while (!queue.push(i)) { std::this_thread::yield(); } ++producer_count; @@ -159,11 +381,11 @@ TEST(LockFreeQueue, test_lockfree_spsc_queue_push) std::thread consumer([&]() { int value; while (!done) { - while (spsc_queue.pop(value)) { + while (queue.pop(value)) { ++consumer_count; } } - while (spsc_queue.pop(value)) { + while (queue.pop(value)) { ++consumer_count; } }); @@ -177,9 +399,9 @@ TEST(LockFreeQueue, test_lockfree_spsc_queue_push) ASSERT_EQ(consumer_count, iterations); } -TEST(LockFreeQueue, test_lockfree_spsc_queue_bounded_push) +TEST(LockFreeMPMCQueue, test_lockfree_queue_bounded_push) { - LockFreeQueue spsc_queue; + LockFreeMPMCQueue queue; std::atomic_int producer_count = 0; std::atomic_int consumer_count(0); std::atomic_bool done(false); @@ -187,11 +409,11 @@ TEST(LockFreeQueue, test_lockfree_spsc_queue_bounded_push) const int iterations = 1000000; std::thread producer([&]() { - for (auto j = 0; j < iterations; ++j) { - ASSERT_TRUE(spsc_queue.bounded_push(j)); - ASSERT_EQ(100, spsc_queue.capacity()); - ++producer_count; - } + for (auto j = 0; j < iterations; ++j) { + ASSERT_TRUE(queue.bounded_push(j)); + ASSERT_EQ(100, queue.capacity()); + ++producer_count; + } }); std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -201,17 +423,16 @@ TEST(LockFreeQueue, test_lockfree_spsc_queue_bounded_push) std::thread consumer([&]() { int value; while (!done) { - while (spsc_queue.pop(value)) { + while (queue.pop(value)) { ++consumer_count; std::this_thread::yield(); } } - while (spsc_queue.pop(value)) { + while (queue.pop(value)) { ++consumer_count; } }); - producer.join(); done = true; consumer.join(); From 9a9951dad9783cfb658471d001eb6072219ac595 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Wed, 1 Jan 2025 18:45:47 +0100 Subject: [PATCH 10/32] cleanup and change license --- test/lock_free_queue_tests.cpp | 49 +++++++--------------------------- 1 file changed, 10 insertions(+), 39 deletions(-) diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 2933dfaf..e229b131 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -1,30 +1,16 @@ -// Copyright (c) 2019, Open Source Robotics Foundation, Inc. +// Copyright 2025 PAL Robotics S.L. // -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. +// http://www.apache.org/licenses/LICENSE-2.0 // -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// -// * Neither the name of the Open Source Robotics Foundation, Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. #include #include @@ -444,18 +430,3 @@ TEST(LockFreeMPMCQueue, test_lockfree_queue_bounded_push) ASSERT_EQ(producer_count, iterations); ASSERT_GT(iterations, consumer_count); } - -// TEST(LockFreeQueue, write_read_non_rt) -// { -// LockFreeQueue buffer(42); - -// buffer.writeFromNonRT(28); -// EXPECT_EQ(28, *buffer.readFromNonRT()); -// } - -// TEST(LockFreeQueue, initRT) -// { -// LockFreeQueue buffer(42); -// buffer.initRT(28); -// EXPECT_EQ(28, *buffer.readFromRT()); -// } From 8c2c974902b23cf0d8ca2b5c3c16eb11f134bd06 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Wed, 1 Jan 2025 23:55:17 +0100 Subject: [PATCH 11/32] Update the documentation and add some minor tests --- include/realtime_tools/lock_free_queue.hpp | 127 ++++++++++++++++++--- test/lock_free_queue_tests.cpp | 18 +++ 2 files changed, 131 insertions(+), 14 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 36f1e974..231da29f 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -51,6 +51,7 @@ struct has_capacity struct is_spsc_queue : std::false_type { @@ -67,6 +68,7 @@ struct is_spsc_queue struct get_boost_lockfree_queue_capacity @@ -102,13 +104,21 @@ struct get_boost_lockfree_queue_capacity class LockFreeQueueBase { public: using T = DataType; - // enable this constructor only if the queue has capacity set + /** + * @brief Construct a new LockFreeQueueBase object + * @note This constructor is enabled only if the LockFreeSPSCContainer has a capacity set + */ template < bool HasCapacity = has_capacity::value, typename std::enable_if_t = 0> @@ -116,7 +126,11 @@ class LockFreeQueueBase { } - // enable this constructor only if the queue has no capacity set + /** + * @brief Construct a new LockFreeQueueBase object + * @param capacity Capacity of the queue + * @note This constructor is enabled only if the LockFreeSPSCContainer has no capacity set + */ template < bool HasCapacity = has_capacity::value, typename std::enable_if_t = 1> @@ -126,18 +140,62 @@ class LockFreeQueueBase virtual ~LockFreeQueueBase() = default; + /** + * @brief Pop the data from the queue + * @param data Data to be popped + * @return true If the data is popped successfully + * @return false If the queue is empty or the data could not be popped + */ [[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); } + /** + * @brief Pop the data from the queue + * @param data Data to be popped + * @return true If the data is popped successfully + * @return false If the queue is empty or the data could not be popped + * @note This function is enabled only if the data type is convertible to the template type of the queue + */ template - std::enable_if_t, bool> push(const U & data) + [[nodiscard]] std::enable_if_t, bool> pop(U & data) + { + return data_queue_.pop(data); + } + + /** + * @brief Push the data into the queue + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the queue is full or the data could not be pushed + */ + + [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } + + /** + * @brief Push the data into the queue + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the queue is full or the data could not be pushed + * @note This function is enabled only if the data type is convertible to the template type of the queue + */ + template + [[nodiscard]] std::enable_if_t, bool> push(const U & data) { return data_queue_.push(data); } + /** + * @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the data could not be pushed + * @note This function is enabled only if the queue is a spsc_queue and only if the data type is convertible to the template type of the queue + * @note To be used in a single threaded applications + * @warning This method might not work as expected if it is used with 2 different threads one doing bounded_push and the other doing pop. In this case, the queue is no more a single producer single consumer queue. So, the behaviour might not be as expected. + */ template < typename U, bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> - std::enable_if_t, bool> bounded_push(const U & data) + [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { if (!data_queue_.push(data)) { // data_queue_.pop(); @@ -149,10 +207,18 @@ class LockFreeQueueBase return true; } + /** + * @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full + * @param data Data to be pushed + * @return true If the data is pushed successfully + * @return false If the data could not be pushed + * @note This function is enabled only if the queue is of multiple producer and multiple consumer type and only if the data type is convertible to the template type of the queue + * @note Can be used in a multi threaded applications + */ template < typename U, bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 1> - std::enable_if_t, bool> bounded_push(const U & data) + [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { if (!data_queue_.bounded_push(data)) { // data_queue_.pop(); @@ -164,8 +230,12 @@ class LockFreeQueueBase return true; } - [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } - + /** + * @brief Check if the queue is empty + * @return true If the queue is empty + * @return false If the queue is not empty + * @note This function is enabled only if the queue is a spsc_queue + */ template < bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> @@ -174,6 +244,12 @@ class LockFreeQueueBase return data_queue_.read_available() == 0; } + /** + * @brief Check if the queue is empty + * @return true If the queue is empty + * @return false If the queue is not empty + * @note This function is enabled only if the queue is of multiple producer and multiple consumer type + */ template < bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 1> @@ -182,8 +258,17 @@ class LockFreeQueueBase return data_queue_.empty(); } + /** + * @brief Get the capacity of the queue + * @return std::size_t Capacity of the queue + */ size_t capacity() const { return capacity_; } + /** + * @brief Get the size of the queue + * @return std::size_t Size of the queue + * @note This function is enabled only if the queue is a spsc_queue + */ template < bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> @@ -192,13 +277,16 @@ class LockFreeQueueBase return data_queue_.read_available(); } - // template < - // bool IsSPSCQueue = is_spsc_queue::value, - // typename std::enable_if_t = 1> - // std::size_t size() const { return 10; } - + /** + * @brief Get the lockfree container + * @return const LockFreeSPSCContainer& Reference to the lockfree container + */ const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; } + /** + * @brief Get the lockfree container + * @return LockFreeSPSCContainer& Reference to the lockfree container + */ LockFreeSPSCContainer & get_lockfree_container() { return data_queue_; } private: @@ -206,19 +294,30 @@ class LockFreeQueueBase std::size_t capacity_; }; // class +/** + * @brief Lock-free Single Producer Single Consumer Queue + * @tparam DataType Type of the data to be stored in the queue + * @tparam Capacity Capacity of the queue + */ template using LockFreeSPSCQueue = std::conditional_t< Capacity == 0, LockFreeQueueBase>, LockFreeQueueBase< DataType, boost::lockfree::spsc_queue>>>; -template +/** + * @brief Lock-free Multiple Producer Multiple Consumer Queue + * @tparam DataType Type of the data to be stored in the queue + * @tparam Capacity Capacity of the queue + * @tparam FixedSize Fixed size of the queue + */ +template using LockFreeMPMCQueue = std::conditional_t< Capacity == 0, LockFreeQueueBase>, LockFreeQueueBase< DataType, boost::lockfree::queue< - DataType, boost::lockfree::capacity, boost::lockfree::fixed_sized>>>; + DataType, boost::lockfree::capacity, boost::lockfree::fixed_sized>>>; } // namespace realtime_tools #endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_ diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index e229b131..7c75839a 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -68,6 +68,16 @@ TEST(LockFreeSPSCQueue, initialize_value) ASSERT_DOUBLE_EQ(3.14, obj1); ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; ASSERT_TRUE(buffer.empty()); + ASSERT_TRUE(buffer.push(2.71)) << "Buffer should have space for one element"; + ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + int obj2; + ASSERT_TRUE(buffer.pop(obj2)); + ASSERT_EQ(2, obj2); + ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; + ASSERT_TRUE(buffer.empty()); + ASSERT_TRUE(buffer.push(6)); + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(6, obj1); } TEST(LockFreeSPSCQueue, test_push) @@ -273,6 +283,14 @@ TEST(LockFreeMPMCQueue, initialize_value) ASSERT_TRUE(buffer.pop(obj1)); ASSERT_DOUBLE_EQ(3.14, obj1); ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_TRUE(buffer.push(2.71)) << "Buffer should have space for one element"; + int obj2; + ASSERT_TRUE(buffer.pop(obj2)); + ASSERT_EQ(2, obj2); + ASSERT_TRUE(buffer.empty()); + ASSERT_TRUE(buffer.push(6)); + ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_EQ(6, obj1); } TEST(LockFreeMPMCQueue, test_push) From 860029ee3e8d3d6213107be21f341b32a79ec5a2 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 00:23:00 +0100 Subject: [PATCH 12/32] Add is_lock_free method --- include/realtime_tools/lock_free_queue.hpp | 20 ++++++++++++++++++++ test/lock_free_queue_tests.cpp | 6 ++++++ 2 files changed, 26 insertions(+) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 231da29f..2718a45a 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -277,6 +277,26 @@ class LockFreeQueueBase return data_queue_.read_available(); } + /** + * @brief The method to check if the queue is lock free + * @return true If the queue is lock free, false otherwise + * @warning It only checks, if the queue head and tail nodes and the freelist can + * be modified in a lock-free manner. On most platforms, the whole implementation + * is lock-free, if this is true. Using c++0x-style atomics, there is no possibility + * to provide a completely accurate implementation, because one would need to test + * every internal node, which is impossible if further nodes will be allocated from + * the operating system. + * @link https://www.boost.org/doc/libs/1_74_0/doc/html/boost/lockfree/queue.html + */ + bool is_lock_free() const + { + if constexpr (is_spsc_queue::value) { + return true; + } else { + return data_queue_.is_lock_free(); + } + } + /** * @brief Get the lockfree container * @return const LockFreeSPSCContainer& Reference to the lockfree container diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 7c75839a..93efab43 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -39,6 +39,7 @@ TEST(LockFreeSPSCQueue, default_construct) ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_TRUE(buffer.is_lock_free()); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); } @@ -52,6 +53,7 @@ TEST(LockFreeSPSCQueue, default_construct) ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_TRUE(buffer.is_lock_free()); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); } @@ -63,6 +65,7 @@ TEST(LockFreeSPSCQueue, initialize_value) ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; + ASSERT_TRUE(buffer.is_lock_free()); double obj1; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_DOUBLE_EQ(3.14, obj1); @@ -256,6 +259,7 @@ TEST(LockFreeMPMCQueue, default_construct) ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(buffer.is_lock_free()); ASSERT_TRUE(buffer.pop(obj1)); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); @@ -269,6 +273,7 @@ TEST(LockFreeMPMCQueue, default_construct) ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; ASSERT_EQ(10, buffer.capacity()); ASSERT_TRUE(buffer.pop(obj1)); + ASSERT_TRUE(buffer.is_lock_free()); ASSERT_EQ(10, buffer.capacity()); ASSERT_EQ(42, obj1.number_); } @@ -279,6 +284,7 @@ TEST(LockFreeMPMCQueue, initialize_value) LockFreeMPMCQueue buffer; ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; + ASSERT_TRUE(buffer.is_lock_free()); double obj1; ASSERT_TRUE(buffer.pop(obj1)); ASSERT_DOUBLE_EQ(3.14, obj1); From fe75f45296e50e17ff462ed7024b06565d965191 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 00:23:09 +0100 Subject: [PATCH 13/32] Update documentation --- include/realtime_tools/lock_free_queue.hpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 2718a45a..34eff51c 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -188,9 +188,12 @@ class LockFreeQueueBase * @param data Data to be pushed * @return true If the data is pushed successfully * @return false If the data could not be pushed - * @note This function is enabled only if the queue is a spsc_queue and only if the data type is convertible to the template type of the queue + * @note This function is enabled only if the queue is a spsc_queue and only if the data type + * is convertible to the template type of the queue * @note To be used in a single threaded applications - * @warning This method might not work as expected if it is used with 2 different threads one doing bounded_push and the other doing pop. In this case, the queue is no more a single producer single consumer queue. So, the behaviour might not be as expected. + * @warning This method might not work as expected if it is used with 2 different threads one + * doing bounded_push and the other doing pop. In this case, the queue is no more a single producer + * single consumer queue. So, the behaviour might not be as expected. */ template < typename U, bool IsSPSCQueue = is_spsc_queue::value, @@ -212,7 +215,8 @@ class LockFreeQueueBase * @param data Data to be pushed * @return true If the data is pushed successfully * @return false If the data could not be pushed - * @note This function is enabled only if the queue is of multiple producer and multiple consumer type and only if the data type is convertible to the template type of the queue + * @note This function is enabled only if the queue is of multiple producer and multiple consumer + * type and only if the data type is convertible to the template type of the queue * @note Can be used in a multi threaded applications */ template < @@ -235,6 +239,7 @@ class LockFreeQueueBase * @return true If the queue is empty * @return false If the queue is not empty * @note This function is enabled only if the queue is a spsc_queue + * @note Should only be called from the consumer thread where pop is called */ template < bool IsSPSCQueue = is_spsc_queue::value, @@ -248,6 +253,8 @@ class LockFreeQueueBase * @brief Check if the queue is empty * @return true If the queue is empty * @return false If the queue is not empty + * @note The result is only accurate, if no other thread modifies the queue. + * Therefore it is rarely practical to use this value in program logic. * @note This function is enabled only if the queue is of multiple producer and multiple consumer type */ template < From 52f53b1d13297ba06ed9d4c86df69d1d7efbae02 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 00:36:37 +0100 Subject: [PATCH 14/32] Add get_latest method to the LockFreeQueue --- include/realtime_tools/lock_free_queue.hpp | 12 +++++++++ test/lock_free_queue_tests.cpp | 31 ++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 34eff51c..7e438a23 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -210,6 +210,18 @@ class LockFreeQueueBase return true; } + /** + * @brief Get the latest data from the queue + * @param data Data to be filled with the latest data + * @return true If the data is filled with the latest data, false otherwise + * @return false If the queue is empty + * @note This function consumes all the data in the queue until the last data and returns the last element of the queue + */ + [[nodiscard]] bool get_latest(T & data) + { + return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0; + } + /** * @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full * @param data Data to be pushed diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 93efab43..54327a3f 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -116,6 +116,22 @@ TEST(LockFreeSPSCQueue, test_pop) ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; } +TEST(LockFreeSPSCQueue, test_get_latest) +{ + LockFreeSPSCQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, buffer.size()); + } + double obj1; + ASSERT_TRUE(buffer.get_latest(obj1)); + ASSERT_EQ(10, obj1); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_EQ(0, buffer.size()); + ASSERT_FALSE(buffer.get_latest(obj1)); +} + TEST(LockFreeSPSCQueue, test_bounded_push) { { @@ -328,6 +344,21 @@ TEST(LockFreeMPMCQueue, test_pop) ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; } +TEST(LockFreeMPMCQueue, test_get_latest) +{ + LockFreeMPMCQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + } + ASSERT_FALSE(buffer.empty()) << "Buffer should not be empty"; + double obj1; + ASSERT_TRUE(buffer.get_latest(obj1)); + ASSERT_EQ(10, obj1); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + ASSERT_FALSE(buffer.get_latest(obj1)); +} + TEST(LockFreeMPMCQueue, test_bounded_push) { { From 356d87ef562a4c1f7021f2132b6be122e1cd38b2 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 11:11:05 +0100 Subject: [PATCH 15/32] update CMakeLists.txt --- CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5127302c..821e5665 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,7 @@ find_package(ament_cmake REQUIRED) foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS}) find_package(${Dependency} REQUIRED) endforeach() +find_package(Boost REQUIRED) add_library(realtime_tools SHARED src/realtime_clock.cpp @@ -75,7 +76,7 @@ if(BUILD_TESTING) target_link_libraries(realtime_buffer_tests realtime_tools) ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) - target_link_libraries(lock_free_queue_tests realtime_tools) + target_link_libraries(lock_free_queue_tests realtime_tools atomic) ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) From ac7ae53e7523a7bec57c90d461afa929ba77774d Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 21:56:10 +0100 Subject: [PATCH 16/32] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christoph Fröhlich --- include/realtime_tools/lock_free_queue.hpp | 4 ---- package.xml | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 7e438a23..8cecb5ec 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -201,8 +201,6 @@ class LockFreeQueueBase [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { if (!data_queue_.push(data)) { - // data_queue_.pop(); - // return data_queue_.push(data); T dummy; data_queue_.pop(dummy); return data_queue_.push(data); @@ -237,8 +235,6 @@ class LockFreeQueueBase [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { if (!data_queue_.bounded_push(data)) { - // data_queue_.pop(); - // return data_queue_.push(data); T dummy; data_queue_.pop(dummy); return data_queue_.bounded_push(data); diff --git a/package.xml b/package.xml index 3e179b6a..50829129 100644 --- a/package.xml +++ b/package.xml @@ -19,7 +19,7 @@ ament_cmake - libboost-dev + libboost-dev rclcpp rclcpp_action libcap-dev From 66dcdfdc50605449e2285c9a719b10ebada5b147 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 21:57:23 +0100 Subject: [PATCH 17/32] Change template arg LockFreeSPSCContainer to LockFreeContainer --- include/realtime_tools/lock_free_queue.hpp | 36 +++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 8cecb5ec..7665d047 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -107,9 +107,9 @@ namespace realtime_tools /** * @brief Base class for lock-free queues * @tparam DataType Type of the data to be stored in the queue - * @tparam LockFreeSPSCContainer Type of the lock-free container - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters + * @tparam LockFreeContainer Type of the lock-free container - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters */ -template +template class LockFreeQueueBase { public: @@ -117,22 +117,22 @@ class LockFreeQueueBase /** * @brief Construct a new LockFreeQueueBase object - * @note This constructor is enabled only if the LockFreeSPSCContainer has a capacity set + * @note This constructor is enabled only if the LockFreeContainer has a capacity set */ template < - bool HasCapacity = has_capacity::value, + bool HasCapacity = has_capacity::value, typename std::enable_if_t = 0> - LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity::value) + LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity::value) { } /** * @brief Construct a new LockFreeQueueBase object * @param capacity Capacity of the queue - * @note This constructor is enabled only if the LockFreeSPSCContainer has no capacity set + * @note This constructor is enabled only if the LockFreeContainer has no capacity set */ template < - bool HasCapacity = has_capacity::value, + bool HasCapacity = has_capacity::value, typename std::enable_if_t = 1> explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity) { @@ -196,7 +196,7 @@ class LockFreeQueueBase * single consumer queue. So, the behaviour might not be as expected. */ template < - typename U, bool IsSPSCQueue = is_spsc_queue::value, + typename U, bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { @@ -230,7 +230,7 @@ class LockFreeQueueBase * @note Can be used in a multi threaded applications */ template < - typename U, bool IsSPSCQueue = is_spsc_queue::value, + typename U, bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 1> [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { @@ -250,7 +250,7 @@ class LockFreeQueueBase * @note Should only be called from the consumer thread where pop is called */ template < - bool IsSPSCQueue = is_spsc_queue::value, + bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> bool empty() const { @@ -266,7 +266,7 @@ class LockFreeQueueBase * @note This function is enabled only if the queue is of multiple producer and multiple consumer type */ template < - bool IsSPSCQueue = is_spsc_queue::value, + bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 1> bool empty() const { @@ -285,7 +285,7 @@ class LockFreeQueueBase * @note This function is enabled only if the queue is a spsc_queue */ template < - bool IsSPSCQueue = is_spsc_queue::value, + bool IsSPSCQueue = is_spsc_queue::value, typename std::enable_if_t = 0> std::size_t size() const { @@ -305,7 +305,7 @@ class LockFreeQueueBase */ bool is_lock_free() const { - if constexpr (is_spsc_queue::value) { + if constexpr (is_spsc_queue::value) { return true; } else { return data_queue_.is_lock_free(); @@ -314,18 +314,18 @@ class LockFreeQueueBase /** * @brief Get the lockfree container - * @return const LockFreeSPSCContainer& Reference to the lockfree container + * @return const LockFreeContainer& Reference to the lockfree container */ - const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; } + const LockFreeContainer & get_lockfree_container() const { return data_queue_; } /** * @brief Get the lockfree container - * @return LockFreeSPSCContainer& Reference to the lockfree container + * @return LockFreeContainer& Reference to the lockfree container */ - LockFreeSPSCContainer & get_lockfree_container() { return data_queue_; } + LockFreeContainer & get_lockfree_container() { return data_queue_; } private: - LockFreeSPSCContainer data_queue_; + LockFreeContainer data_queue_; std::size_t capacity_; }; // class From 5d990c067377ee744494a6a612d7866571e067b6 Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Thu, 2 Jan 2025 20:58:35 +0000 Subject: [PATCH 18/32] Install boost --- .github/workflows/rolling-binary-build-win.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rolling-binary-build-win.yml b/.github/workflows/rolling-binary-build-win.yml index 836da241..e4fab850 100644 --- a/.github/workflows/rolling-binary-build-win.yml +++ b/.github/workflows/rolling-binary-build-win.yml @@ -27,8 +27,9 @@ jobs: # (github.event_name == 'issue_comment' && contains(github.event.comment.body, '/check-windows')) || # (github.event_name == 'pull_request' && contains(github.event.label.name, 'check-windows')) || # (github.event_name == 'workflow_dispatch') - uses: ros-controls/ros2_control_ci/.github/workflows/reusable-ros-tooling-win-build.yml@master + uses: ros-controls/ros2_control_ci/.github/workflows/reusable-ros-tooling-win-build.yml@windows_ci_boost with: ros_distro: jazzy ref_for_scheduled_build: master os_name: windows-2019 + install_boost: true From 71321b30e8d80f17a4e3121508ef215bd45a77de Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 2 Jan 2025 22:02:47 +0100 Subject: [PATCH 19/32] cleanup the commented tests --- test/lock_free_queue_tests.cpp | 46 ---------------------------------- 1 file changed, 46 deletions(-) diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 54327a3f..cfef6e2a 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -219,52 +219,6 @@ TEST(LockFreeSPSCQueue, test_lockfree_queue_push) ASSERT_EQ(consumer_count, iterations); } -// TEST(LockFreeSPSCQueue, test_lockfree_queue_bounded_push) -// { -// LockFreeSPSCQueue queue; -// std::atomic_int producer_count = 0; -// std::atomic_int consumer_count(0); -// std::atomic_bool done(false); - -// const int iterations = 1000000; - -// std::thread producer([&]() { -// for (auto j = 0; j < iterations; ++j) { -// ASSERT_TRUE(queue.bounded_push(j)); -// ASSERT_EQ(100, queue.capacity()); -// ++producer_count; -// } -// }); - -// std::this_thread::sleep_for(std::chrono::milliseconds(100)); - -// std::cerr << "producer_count: " << producer_count << std::endl; - -// std::thread consumer([&]() { -// int value; -// while (!done) { -// while (queue.pop(value)) { -// ++consumer_count; -// std::this_thread::yield(); -// } -// } -// while (queue.pop(value)) { -// ++consumer_count; -// } -// }); - -// producer.join(); -// done = true; -// consumer.join(); - -// std::cerr << "producer_count: " << producer_count << std::endl; -// std::cerr << "consumer_count: " << consumer_count << std::endl; -// std::cerr << "iterations: " << iterations << std::endl; -// ASSERT_GT(producer_count, consumer_count); -// ASSERT_EQ(producer_count, iterations); -// ASSERT_GT(iterations, consumer_count); -// } - TEST(LockFreeMPMCQueue, default_construct) { { From 4f6162445c51855d41d9a97b27c8debeb702d2e5 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Fri, 3 Jan 2025 09:22:24 +0100 Subject: [PATCH 20/32] Update include/realtime_tools/lock_free_queue.hpp Co-authored-by: Erick G. Islas-Osuna --- include/realtime_tools/lock_free_queue.hpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 7665d047..e9491d01 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -305,11 +305,7 @@ class LockFreeQueueBase */ bool is_lock_free() const { - if constexpr (is_spsc_queue::value) { - return true; - } else { - return data_queue_.is_lock_free(); - } + return (is_spsc_queue::value) || data_queue_.is_lock_free(); } /** From 9a8acdf68e1ca9daf8ca9ce5a3d352ee8fe5de40 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Fri, 3 Jan 2025 09:28:59 +0100 Subject: [PATCH 21/32] Update tests --- test/lock_free_queue_tests.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index cfef6e2a..19ebb990 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -417,11 +417,9 @@ TEST(LockFreeMPMCQueue, test_lockfree_queue_bounded_push) std::thread consumer([&]() { int value; - while (!done) { - while (queue.pop(value)) { - ++consumer_count; - std::this_thread::yield(); - } + while (!done && queue.pop(value)) { + ++consumer_count; + std::this_thread::yield(); } while (queue.pop(value)) { ++consumer_count; From ddb5ca990753e4a6da1fcbadffd12bb022ea2d4f Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Fri, 3 Jan 2025 10:18:56 +0100 Subject: [PATCH 22/32] Update condition for bounded push and update documentation --- include/realtime_tools/lock_free_queue.hpp | 60 +++++++++------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index e9491d01..14a8a0ab 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -104,13 +104,18 @@ struct get_boost_lockfree_queue_capacity class LockFreeQueueBase +/** + * @brief A base class for lock-free queues. + * + * This class provides a base implementation for lock-free queues with various functionalities + * such as pushing, popping, and checking the state of the queue. It supports both single-producer + * single-consumer (SPSC) and multiple-producer multiple-consumer (MPMC) queues. + * + * @tparam DataType The type of data to be stored in the queue. + * @tparam LockFreeContainer The underlying lock-free container type - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters + */ { public: using T = DataType; @@ -190,20 +195,25 @@ class LockFreeQueueBase * @return false If the data could not be pushed * @note This function is enabled only if the queue is a spsc_queue and only if the data type * is convertible to the template type of the queue - * @note To be used in a single threaded applications - * @warning This method might not work as expected if it is used with 2 different threads one - * doing bounded_push and the other doing pop. In this case, the queue is no more a single producer - * single consumer queue. So, the behaviour might not be as expected. + * @note For a SPSC Queue, to be used in single-threaded applications + * @warning For a SPSC Queue, this method might not work as expected when used in multi-threaded applications + * if it is used with two different threads, one doing bounded_push and the other doing pop. In this case, the + * queue is no longer a single producer single consumer queue. So, the behavior might not be as expected. */ - template < - typename U, bool IsSPSCQueue = is_spsc_queue::value, - typename std::enable_if_t = 0> + template [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { - if (!data_queue_.push(data)) { + const auto bounded_push = [this](const U & info) -> bool { + if constexpr (is_spsc_queue::value) { + return data_queue_.push(info); + } else { + return data_queue_.bounded_push(info); + } + }; + if (!bounded_push(data)) { T dummy; data_queue_.pop(dummy); - return data_queue_.push(data); + return bounded_push(data); } return true; } @@ -220,28 +230,6 @@ class LockFreeQueueBase return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0; } - /** - * @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full - * @param data Data to be pushed - * @return true If the data is pushed successfully - * @return false If the data could not be pushed - * @note This function is enabled only if the queue is of multiple producer and multiple consumer - * type and only if the data type is convertible to the template type of the queue - * @note Can be used in a multi threaded applications - */ - template < - typename U, bool IsSPSCQueue = is_spsc_queue::value, - typename std::enable_if_t = 1> - [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) - { - if (!data_queue_.bounded_push(data)) { - T dummy; - data_queue_.pop(dummy); - return data_queue_.bounded_push(data); - } - return true; - } - /** * @brief Check if the queue is empty * @return true If the queue is empty From 8d1bae4c0f6260c0c7d11b6612540943fcde7af4 Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Fri, 3 Jan 2025 11:21:55 +0100 Subject: [PATCH 23/32] Unify empty method with internal conditioning of SPSC queue or MPMC queue --- include/realtime_tools/lock_free_queue.hpp | 29 ++++++---------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index 14a8a0ab..fa347879 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -234,31 +234,16 @@ class LockFreeQueueBase * @brief Check if the queue is empty * @return true If the queue is empty * @return false If the queue is not empty - * @note This function is enabled only if the queue is a spsc_queue - * @note Should only be called from the consumer thread where pop is called - */ - template < - bool IsSPSCQueue = is_spsc_queue::value, - typename std::enable_if_t = 0> - bool empty() const - { - return data_queue_.read_available() == 0; - } - - /** - * @brief Check if the queue is empty - * @return true If the queue is empty - * @return false If the queue is not empty - * @note The result is only accurate, if no other thread modifies the queue. - * Therefore it is rarely practical to use this value in program logic. - * @note This function is enabled only if the queue is of multiple producer and multiple consumer type + * @note The result is only accurate, if no other thread modifies the queue. Therefore, it is rarely practical to use this value in program logic. + * @note For a SPSC Queue, it should only be called from the consumer thread where pop is called. If need to be called from producer thread, use write_available() instead. */ - template < - bool IsSPSCQueue = is_spsc_queue::value, - typename std::enable_if_t = 1> bool empty() const { - return data_queue_.empty(); + if constexpr (is_spsc_queue::value) { + return data_queue_.read_available() == 0; + } else { + return data_queue_.empty(); + } } /** From f90d22bd880bc9b0310d9074217638e1ecc688b2 Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Fri, 3 Jan 2025 10:34:44 +0000 Subject: [PATCH 24/32] Try to set CMP0167 for boost on windows --- CMakeLists.txt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 821e5665..99bcd660 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,7 +29,14 @@ find_package(ament_cmake REQUIRED) foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS}) find_package(${Dependency} REQUIRED) endforeach() -find_package(Boost REQUIRED) + +if(POLICY CMP0167) + cmake_policy(SET CMP0167 NEW) +endif() +find_package(Boost QUIET COMPONENTS headers) +if(NOT Boost_headers_FOUND) + find_package(Boost REQUIRED) +endif() add_library(realtime_tools SHARED src/realtime_clock.cpp From 081d2f66d421f22877c3b63a0ab772c91a1177cc Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Fri, 3 Jan 2025 10:47:44 +0000 Subject: [PATCH 25/32] Remove quiet and add Boost::boost --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 99bcd660..4a0a23ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,7 +33,7 @@ endforeach() if(POLICY CMP0167) cmake_policy(SET CMP0167 NEW) endif() -find_package(Boost QUIET COMPONENTS headers) +find_package(Boost COMPONENTS headers) if(NOT Boost_headers_FOUND) find_package(Boost REQUIRED) endif() @@ -83,7 +83,7 @@ if(BUILD_TESTING) target_link_libraries(realtime_buffer_tests realtime_tools) ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) - target_link_libraries(lock_free_queue_tests realtime_tools atomic) + target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) From 2f8d285eb6df401ced5ad0bf2f16a4c3e44c763c Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Fri, 3 Jan 2025 11:01:03 +0000 Subject: [PATCH 26/32] Remove atomic link libraries --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4a0a23ac..22937a50 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -83,7 +83,7 @@ if(BUILD_TESTING) target_link_libraries(realtime_buffer_tests realtime_tools) ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) - target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) + target_link_libraries(lock_free_queue_tests realtime_tools Boost::boost) ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) From fff6ee350e352647c49bd04c6bf2fa6c136d8371 Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Fri, 3 Jan 2025 12:18:59 +0000 Subject: [PATCH 27/32] Try to fix msvc atomic --- CMakeLists.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 22937a50..ce6f289c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,8 @@ if(WIN32) ) # set the same behavior for windows as it is on linux set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON) + + set(FOUND_LIBATOMIC TRUE) endif() set(THIS_PACKAGE_INCLUDE_DEPENDS @@ -83,7 +85,7 @@ if(BUILD_TESTING) target_link_libraries(realtime_buffer_tests realtime_tools) ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) - target_link_libraries(lock_free_queue_tests realtime_tools Boost::boost) + target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) From 2bb651934f9fb05d17251e60acf2e28b9a00074e Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Fri, 3 Jan 2025 12:33:26 +0000 Subject: [PATCH 28/32] Try to fix atomic.lib on msvc https://stackoverflow.com/a/64112384 --- CMakeLists.txt | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ce6f289c..c0c6c438 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,8 +16,6 @@ if(WIN32) ) # set the same behavior for windows as it is on linux set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON) - - set(FOUND_LIBATOMIC TRUE) endif() set(THIS_PACKAGE_INCLUDE_DEPENDS @@ -84,6 +82,11 @@ if(BUILD_TESTING) ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp) target_link_libraries(realtime_buffer_tests realtime_tools) + if(WIN32) + set(HAVE_CXX_ATOMICS64_WITHOUT_LIB True) + set(HAVE_CXX_ATOMICS_WITHOUT_LIB True) + set(FOUND_LIBATOMIC TRUE) + endif() ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) From 551a9fb0809e4db92d1b1d5fdcd782d755d1ae51 Mon Sep 17 00:00:00 2001 From: Christoph Froehlich Date: Fri, 3 Jan 2025 12:42:00 +0000 Subject: [PATCH 29/32] Don't link atomic on windows --- CMakeLists.txt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c0c6c438..9ccbe4c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,13 +82,14 @@ if(BUILD_TESTING) ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp) target_link_libraries(realtime_buffer_tests realtime_tools) + ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) if(WIN32) - set(HAVE_CXX_ATOMICS64_WITHOUT_LIB True) - set(HAVE_CXX_ATOMICS_WITHOUT_LIB True) - set(FOUND_LIBATOMIC TRUE) + # atomic is not found on Windows, but also not needed + target_link_libraries(lock_free_queue_tests realtime_tools Boost::boost) + else() + # without adding atomic, clang throws a linker error + target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) endif() - ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp) - target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost) ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp) target_link_libraries(realtime_clock_tests realtime_tools) From 689f63cec6e0fd7c74a615f76f7e73e027d3d3b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Fr=C3=B6hlich?= Date: Fri, 3 Jan 2025 13:51:44 +0100 Subject: [PATCH 30/32] Use master branch of CI repo --- .github/workflows/rolling-binary-build-win.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rolling-binary-build-win.yml b/.github/workflows/rolling-binary-build-win.yml index e4fab850..5d0f8b4e 100644 --- a/.github/workflows/rolling-binary-build-win.yml +++ b/.github/workflows/rolling-binary-build-win.yml @@ -27,7 +27,7 @@ jobs: # (github.event_name == 'issue_comment' && contains(github.event.comment.body, '/check-windows')) || # (github.event_name == 'pull_request' && contains(github.event.label.name, 'check-windows')) || # (github.event_name == 'workflow_dispatch') - uses: ros-controls/ros2_control_ci/.github/workflows/reusable-ros-tooling-win-build.yml@windows_ci_boost + uses: ros-controls/ros2_control_ci/.github/workflows/reusable-ros-tooling-win-build.yml@master with: ros_distro: jazzy ref_for_scheduled_build: master From b8becada8a7c7473e8ac9e76de2628d63a62890f Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 9 Jan 2025 14:27:26 +0100 Subject: [PATCH 31/32] Simplify push method and also add fixed_size arg to the MPMC queue --- include/realtime_tools/lock_free_queue.hpp | 30 +++++++++++++--------- test/lock_free_queue_tests.cpp | 27 ++++++++++++++----- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/include/realtime_tools/lock_free_queue.hpp b/include/realtime_tools/lock_free_queue.hpp index fa347879..4131577c 100644 --- a/include/realtime_tools/lock_free_queue.hpp +++ b/include/realtime_tools/lock_free_queue.hpp @@ -173,7 +173,14 @@ class LockFreeQueueBase * @return false If the queue is full or the data could not be pushed */ - [[nodiscard]] bool push(const T & data) { return data_queue_.push(data); } + [[nodiscard]] bool push(const T & data) + { + if constexpr (is_spsc_queue::value) { + return data_queue_.push(data); + } else { + return data_queue_.bounded_push(data); + } + } /** * @brief Push the data into the queue @@ -185,7 +192,11 @@ class LockFreeQueueBase template [[nodiscard]] std::enable_if_t, bool> push(const U & data) { - return data_queue_.push(data); + if constexpr (is_spsc_queue::value) { + return data_queue_.push(data); + } else { + return data_queue_.bounded_push(data); + } } /** @@ -203,17 +214,10 @@ class LockFreeQueueBase template [[nodiscard]] std::enable_if_t, bool> bounded_push(const U & data) { - const auto bounded_push = [this](const U & info) -> bool { - if constexpr (is_spsc_queue::value) { - return data_queue_.push(info); - } else { - return data_queue_.bounded_push(info); - } - }; - if (!bounded_push(data)) { + if (!push(data)) { T dummy; data_queue_.pop(dummy); - return bounded_push(data); + return push(data); } return true; } @@ -317,7 +321,9 @@ using LockFreeSPSCQueue = std::conditional_t< */ template using LockFreeMPMCQueue = std::conditional_t< - Capacity == 0, LockFreeQueueBase>, + Capacity == 0, + LockFreeQueueBase< + DataType, boost::lockfree::queue>>, LockFreeQueueBase< DataType, boost::lockfree::queue< diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 19ebb990..909c14fc 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -271,14 +271,27 @@ TEST(LockFreeMPMCQueue, initialize_value) TEST(LockFreeMPMCQueue, test_push) { - LockFreeMPMCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(10, buffer.capacity()); + { + LockFreeMPMCQueue buffer; + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, buffer.capacity()); + } + ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_FALSE(buffer.empty()); + } + { + LockFreeMPMCQueue buffer(10); + ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, buffer.capacity()); + } + ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_FALSE(buffer.push(12)) << "Buffer should not have space for element as size is 10"; + ASSERT_FALSE(buffer.empty()); } - ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; - ASSERT_FALSE(buffer.empty()); } TEST(LockFreeMPMCQueue, test_pop) From bd74a53e491ac0b8ec88bc29e61a29f568f0563c Mon Sep 17 00:00:00 2001 From: Sai Kishor Kothakota Date: Thu, 9 Jan 2025 19:04:21 +0100 Subject: [PATCH 32/32] reuse tests to test all constructable types of LockFreeQueues --- test/lock_free_queue_tests.cpp | 611 +++++++++++++++++---------------- 1 file changed, 306 insertions(+), 305 deletions(-) diff --git a/test/lock_free_queue_tests.cpp b/test/lock_free_queue_tests.cpp index 909c14fc..a4c26dcd 100644 --- a/test/lock_free_queue_tests.cpp +++ b/test/lock_free_queue_tests.cpp @@ -29,113 +29,126 @@ class DefaultConstructable TEST(LockFreeSPSCQueue, default_construct) { - { - LockFreeSPSCQueue buffer(10); + const auto default_construct_test_lambda = [](auto & queue) { DefaultConstructable obj1; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_TRUE(buffer.is_lock_free()); - ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(42, obj1.number_); - } - { - LockFreeSPSCQueue buffer; - DefaultConstructable obj1; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_TRUE(buffer.is_lock_free()); - ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(10, queue.capacity()); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, queue.capacity()); + ASSERT_EQ(1, queue.size()) << "Buffer should have one element"; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_TRUE(queue.is_lock_free()); + ASSERT_EQ(10, queue.capacity()); ASSERT_EQ(42, obj1.number_); - } + }; + + LockFreeSPSCQueue queue_1(10); + LockFreeSPSCQueue queue_2; + default_construct_test_lambda(queue_1); + default_construct_test_lambda(queue_2); } TEST(LockFreeSPSCQueue, initialize_value) { - LockFreeSPSCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; - ASSERT_TRUE(buffer.is_lock_free()); - double obj1; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_DOUBLE_EQ(3.14, obj1); - ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; - ASSERT_TRUE(buffer.empty()); - ASSERT_TRUE(buffer.push(2.71)) << "Buffer should have space for one element"; - ASSERT_EQ(1, buffer.size()) << "Buffer should have one element"; - int obj2; - ASSERT_TRUE(buffer.pop(obj2)); - ASSERT_EQ(2, obj2); - ASSERT_EQ(0, buffer.size()) << "Buffer should be empty"; - ASSERT_TRUE(buffer.empty()); - ASSERT_TRUE(buffer.push(6)); - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_EQ(6, obj1); + const auto initialize_value_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(3.14)) << "Buffer should have space for one element"; + ASSERT_EQ(1, queue.size()) << "Buffer should have one element"; + ASSERT_TRUE(queue.is_lock_free()); + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_DOUBLE_EQ(3.14, obj1); + ASSERT_EQ(0, queue.size()) << "Buffer should be empty"; + ASSERT_TRUE(queue.empty()); + ASSERT_TRUE(queue.push(2.71)) << "Buffer should have space for one element"; + ASSERT_EQ(1, queue.size()) << "Buffer should have one element"; + int obj2; + ASSERT_TRUE(queue.pop(obj2)); + ASSERT_EQ(2, obj2); + ASSERT_EQ(0, queue.size()) << "Buffer should be empty"; + ASSERT_TRUE(queue.empty()); + ASSERT_TRUE(queue.push(6)); + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(6, obj1); + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + initialize_value_test(queue_1); + initialize_value_test(queue_2); } TEST(LockFreeSPSCQueue, test_push) { - LockFreeSPSCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(i, buffer.size()); - ASSERT_EQ(10, buffer.capacity()); - } - ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; - ASSERT_EQ(10, buffer.size()); - ASSERT_FALSE(buffer.empty()); + const auto push_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, queue.size()); + ASSERT_EQ(10, queue.capacity()); + } + ASSERT_FALSE(queue.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_EQ(10, queue.size()); + ASSERT_FALSE(queue.empty()); + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + push_test_lambda(queue_1); + push_test_lambda(queue_2); } TEST(LockFreeSPSCQueue, test_pop) { - LockFreeSPSCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(i, buffer.size()); - } - for (auto i = 1; i <= 10; i++) { + const auto pop_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, queue.size()); + } + for (auto i = 1; i <= 10; i++) { + double obj1 = -10; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i, obj1); + ASSERT_EQ(10 - i, queue.size()); + } double obj1; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_EQ(i, obj1); - ASSERT_EQ(10 - i, buffer.size()); - } - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - double obj1; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + pop_test_lambda(queue_1); + pop_test_lambda(queue_2); } TEST(LockFreeSPSCQueue, test_get_latest) { - LockFreeSPSCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(i, buffer.size()); - } - double obj1; - ASSERT_TRUE(buffer.get_latest(obj1)); - ASSERT_EQ(10, obj1); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_EQ(0, buffer.size()); - ASSERT_FALSE(buffer.get_latest(obj1)); + const auto get_latest_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(i, queue.size()); + } + double obj1 = -10; + ASSERT_TRUE(queue.get_latest(obj1)); + ASSERT_EQ(10, obj1); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_EQ(0, queue.size()); + ASSERT_FALSE(queue.get_latest(obj1)); + }; + + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + get_latest_test(queue_1); + get_latest_test(queue_2); } TEST(LockFreeSPSCQueue, test_bounded_push) { - { - LockFreeSPSCQueue queue; + const auto bounded_push_test = [](auto & queue) { ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 25; i++) { ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; @@ -155,181 +168,171 @@ TEST(LockFreeSPSCQueue, test_bounded_push) double obj1; ASSERT_FALSE(queue.pop(obj1)); ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - } - { - LockFreeSPSCQueue queue(10); - ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 25; i++) { - ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(std::min(i, 10), queue.size()); - ASSERT_EQ(10, queue.capacity()); - } - ASSERT_EQ(10, queue.size()); - ASSERT_EQ(10, queue.capacity()); + }; - // when we start popping, the pop-ed elements should start from 16 - for (auto i = 1u; i <= queue.capacity(); i++) { - double obj1; - ASSERT_TRUE(queue.pop(obj1)); - ASSERT_EQ(i + 15, obj1); - ASSERT_EQ(queue.capacity() - i, queue.size()); - } - double obj1; - ASSERT_FALSE(queue.pop(obj1)); - ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - } + LockFreeSPSCQueue queue_1; + LockFreeSPSCQueue queue_2(10); + bounded_push_test(queue_1); + bounded_push_test(queue_2); } TEST(LockFreeSPSCQueue, test_lockfree_queue_push) { - LockFreeSPSCQueue queue(100); - int producer_count = 0; - std::atomic_int consumer_count(0); - std::atomic_bool done(false); - - const int iterations = 1000000; - - std::thread producer([&]() { - for (int i = 0; i < iterations; ++i) { - while (!queue.push(i)) { - std::this_thread::yield(); + const auto lockfree_queue_push_test = [](auto & queue) { + int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (int i = 0; i < iterations; ++i) { + while (!queue.push(i)) { + std::this_thread::yield(); + } + ++producer_count; + } + }); + + std::thread consumer([&]() { + int value; + while (!done) { + while (queue.pop(value)) { + ++consumer_count; + } } - ++producer_count; - } - }); - - std::thread consumer([&]() { - int value; - while (!done) { while (queue.pop(value)) { ++consumer_count; } - } - while (queue.pop(value)) { - ++consumer_count; - } - }); + }); + + producer.join(); + done = true; + consumer.join(); - producer.join(); - done = true; - consumer.join(); + ASSERT_EQ(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_EQ(consumer_count, iterations); + }; - ASSERT_EQ(producer_count, consumer_count); - ASSERT_EQ(producer_count, iterations); - ASSERT_EQ(consumer_count, iterations); + LockFreeSPSCQueue queue_1(100); + LockFreeSPSCQueue queue_2; + lockfree_queue_push_test(queue_1); + lockfree_queue_push_test(queue_2); } TEST(LockFreeMPMCQueue, default_construct) { - { - LockFreeMPMCQueue buffer(10); - DefaultConstructable obj1; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.is_lock_free()); - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_EQ(10, buffer.capacity()); - ASSERT_EQ(42, obj1.number_); - } - { - LockFreeMPMCQueue buffer; + const auto default_construct_test = [](auto & queue) { DefaultConstructable obj1; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(DefaultConstructable())) << "Buffer should have space for one element"; - ASSERT_EQ(10, buffer.capacity()); - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_TRUE(buffer.is_lock_free()); - ASSERT_EQ(10, buffer.capacity()); + ASSERT_EQ(10, queue.capacity()); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(DefaultConstructable())) << "Buffer should have space for one element"; + ASSERT_EQ(10, queue.capacity()); + ASSERT_TRUE(queue.is_lock_free()); + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(10, queue.capacity()); ASSERT_EQ(42, obj1.number_); - } + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + default_construct_test(queue_1); + default_construct_test(queue_2); } TEST(LockFreeMPMCQueue, initialize_value) { - LockFreeMPMCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(3.14)) << "Buffer should have space for one element"; - ASSERT_TRUE(buffer.is_lock_free()); - double obj1; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_DOUBLE_EQ(3.14, obj1); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_TRUE(buffer.push(2.71)) << "Buffer should have space for one element"; - int obj2; - ASSERT_TRUE(buffer.pop(obj2)); - ASSERT_EQ(2, obj2); - ASSERT_TRUE(buffer.empty()); - ASSERT_TRUE(buffer.push(6)); - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_EQ(6, obj1); + const auto initialize_value_test = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(3.14)) << "Buffer should have space for one element"; + ASSERT_TRUE(queue.is_lock_free()); + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_DOUBLE_EQ(3.14, obj1); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_TRUE(queue.push(2.71)) << "Buffer should have space for one element"; + int obj2; + ASSERT_TRUE(queue.pop(obj2)); + ASSERT_EQ(2, obj2); + ASSERT_TRUE(queue.empty()); + ASSERT_TRUE(queue.push(6)); + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(6, obj1); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + initialize_value_test(queue_1); + initialize_value_test(queue_2); } TEST(LockFreeMPMCQueue, test_push) { - { - LockFreeMPMCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(10, buffer.capacity()); - } - ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; - ASSERT_FALSE(buffer.empty()); - } - { - LockFreeMPMCQueue buffer(10); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; + const auto push_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(10, buffer.capacity()); + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + ASSERT_EQ(10, queue.capacity()); } - ASSERT_FALSE(buffer.push(11)) << "Buffer should not have space for element as size is 10"; - ASSERT_FALSE(buffer.push(12)) << "Buffer should not have space for element as size is 10"; - ASSERT_FALSE(buffer.empty()); - } + ASSERT_FALSE(queue.push(11)) << "Buffer should not have space for element as size is 10"; + ASSERT_FALSE(queue.empty()); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + push_test_lambda(queue_1); + push_test_lambda(queue_2); } TEST(LockFreeMPMCQueue, test_pop) { - LockFreeMPMCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - } - for (auto i = 1; i <= 10; i++) { + const auto pop_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + } + for (auto i = 1; i <= 10; i++) { + double obj1; + ASSERT_TRUE(queue.pop(obj1)); + ASSERT_EQ(i, obj1); + } + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; double obj1; - ASSERT_TRUE(buffer.pop(obj1)); - ASSERT_EQ(i, obj1); - } - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - double obj1; - ASSERT_FALSE(buffer.pop(obj1)) << "Buffer should be empty"; + ASSERT_FALSE(queue.pop(obj1)) << "Buffer should be empty"; + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + pop_test_lambda(queue_1); + pop_test_lambda(queue_2); } TEST(LockFreeMPMCQueue, test_get_latest) { - LockFreeMPMCQueue buffer; - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 10; i++) { - ASSERT_TRUE(buffer.push(i)) << "Buffer should have space for element as size is 10"; - } - ASSERT_FALSE(buffer.empty()) << "Buffer should not be empty"; - double obj1; - ASSERT_TRUE(buffer.get_latest(obj1)); - ASSERT_EQ(10, obj1); - ASSERT_TRUE(buffer.empty()) << "Buffer should be empty"; - ASSERT_FALSE(buffer.get_latest(obj1)); + const auto get_latest_test_lambda = [](auto & queue) { + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + for (auto i = 1; i <= 10; i++) { + ASSERT_TRUE(queue.push(i)) << "Buffer should have space for element as size is 10"; + } + ASSERT_FALSE(queue.empty()) << "Buffer should not be empty"; + double obj1; + ASSERT_TRUE(queue.get_latest(obj1)); + ASSERT_EQ(10, obj1); + ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; + ASSERT_FALSE(queue.get_latest(obj1)); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + get_latest_test_lambda(queue_1); + get_latest_test_lambda(queue_2); } TEST(LockFreeMPMCQueue, test_bounded_push) { - { - LockFreeMPMCQueue queue; + const auto bounded_push_test = [](auto & queue) { ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; for (auto i = 1; i <= 25; i++) { ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; @@ -339,114 +342,112 @@ TEST(LockFreeMPMCQueue, test_bounded_push) // when we start popping, the pop-ed elements should start from 16 for (auto i = 1u; i <= queue.capacity(); i++) { - double obj1; + double obj1 = -10.0; ASSERT_TRUE(queue.pop(obj1)); ASSERT_EQ(i + 15, obj1); } double obj1; ASSERT_FALSE(queue.pop(obj1)); ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - } - { - LockFreeMPMCQueue queue(10); - ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - for (auto i = 1; i <= 25; i++) { - ASSERT_TRUE(queue.bounded_push(i)) << "Buffer should have space for element as size is 10"; - ASSERT_EQ(10, queue.capacity()); - } - ASSERT_EQ(10, queue.capacity()); + }; - // when we start popping, the pop-ed elements should start from 16 - for (auto i = 1u; i <= queue.capacity(); i++) { - double obj1; - ASSERT_TRUE(queue.pop(obj1)); - ASSERT_EQ(i + 15, obj1); - } - double obj1; - ASSERT_FALSE(queue.pop(obj1)); - ASSERT_TRUE(queue.empty()) << "Buffer should be empty"; - } + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(10); + bounded_push_test(queue_1); + bounded_push_test(queue_2); } TEST(LockFreeMPMCQueue, test_lockfree_queue_push) { - LockFreeMPMCQueue queue; - int producer_count = 0; - std::atomic_int consumer_count(0); - std::atomic_bool done(false); - - const int iterations = 1000000; - - std::thread producer([&]() { - for (int i = 0; i < iterations; ++i) { - while (!queue.push(i)) { - std::this_thread::yield(); + const auto test_queue = [](auto & queue) { + int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (int i = 0; i < iterations; ++i) { + while (!queue.push(i)) { + std::this_thread::yield(); + } + ++producer_count; + } + }); + + std::thread consumer([&]() { + int value; + while (!done) { + while (queue.pop(value)) { + ++consumer_count; + } } - ++producer_count; - } - }); - - std::thread consumer([&]() { - int value; - while (!done) { while (queue.pop(value)) { ++consumer_count; } - } - while (queue.pop(value)) { - ++consumer_count; - } - }); + }); + + producer.join(); + done = true; + consumer.join(); - producer.join(); - done = true; - consumer.join(); + ASSERT_EQ(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_EQ(consumer_count, iterations); + }; - ASSERT_EQ(producer_count, consumer_count); - ASSERT_EQ(producer_count, iterations); - ASSERT_EQ(consumer_count, iterations); + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(100); + test_queue(queue_1); + test_queue(queue_2); } TEST(LockFreeMPMCQueue, test_lockfree_queue_bounded_push) { - LockFreeMPMCQueue queue; - std::atomic_int producer_count = 0; - std::atomic_int consumer_count(0); - std::atomic_bool done(false); - - const int iterations = 1000000; - - std::thread producer([&]() { - for (auto j = 0; j < iterations; ++j) { - ASSERT_TRUE(queue.bounded_push(j)); - ASSERT_EQ(100, queue.capacity()); - ++producer_count; - } - }); + const auto bounded_push_test = [](auto & queue) { + std::atomic_int producer_count = 0; + std::atomic_int consumer_count(0); + std::atomic_bool done(false); + + const int iterations = 1000000; + + std::thread producer([&]() { + for (auto j = 0; j < iterations; ++j) { + ASSERT_TRUE(queue.bounded_push(j)); + ASSERT_EQ(100, queue.capacity()); + ++producer_count; + } + }); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); - std::cerr << "producer_count: " << producer_count << std::endl; + std::cerr << "producer_count: " << producer_count << std::endl; - std::thread consumer([&]() { - int value; - while (!done && queue.pop(value)) { - ++consumer_count; - std::this_thread::yield(); - } - while (queue.pop(value)) { - ++consumer_count; - } - }); - - producer.join(); - done = true; - consumer.join(); - - std::cerr << "producer_count: " << producer_count << std::endl; - std::cerr << "consumer_count: " << consumer_count << std::endl; - std::cerr << "iterations: " << iterations << std::endl; - ASSERT_GT(producer_count, consumer_count); - ASSERT_EQ(producer_count, iterations); - ASSERT_GT(iterations, consumer_count); + std::thread consumer([&]() { + int value; + while (!done && queue.pop(value)) { + ++consumer_count; + std::this_thread::yield(); + } + while (queue.pop(value)) { + ++consumer_count; + } + }); + + producer.join(); + done = true; + consumer.join(); + + std::cerr << "producer_count: " << producer_count << std::endl; + std::cerr << "consumer_count: " << consumer_count << std::endl; + std::cerr << "iterations: " << iterations << std::endl; + ASSERT_GT(producer_count, consumer_count); + ASSERT_EQ(producer_count, iterations); + ASSERT_GT(iterations, consumer_count); + }; + + LockFreeMPMCQueue queue_1; + LockFreeMPMCQueue queue_2(100); + bounded_push_test(queue_1); + bounded_push_test(queue_2); }