From e4b2cf44f5ef025eb6e100b186280582041b9c54 Mon Sep 17 00:00:00 2001 From: David Haim <59602013+David-Haim@users.noreply.github.com> Date: Sun, 21 Apr 2024 17:45:38 +0300 Subject: [PATCH] atomic_wait improvements (#163) --- CMakeLists.txt | 7 +- .../concurrencpp/results/impl/result_state.h | 6 +- .../results/impl/shared_result_state.h | 1 - .../results/result_fwd_declarations.h | 2 +- include/concurrencpp/threads/atomic_wait.h | 167 ++++++++++---- include/concurrencpp/utils/dlist.h | 62 +++++ include/concurrencpp/utils/math_helper.h | 15 ++ source/threads/atomic_wait.cpp | 214 +++++++++++++++++- source/utils/math_helper.cpp | 37 +++ test/CMakeLists.txt | 10 +- test/include/utils/wait_context.h | 2 +- .../tests/thread_tests/atomic_wait_tests.cpp | 81 +++++-- .../tests/util_tests/math_helper_tests.cpp | 67 ++++++ test/source/utils/wait_context.cpp | 5 +- 14 files changed, 587 insertions(+), 89 deletions(-) create mode 100644 include/concurrencpp/utils/dlist.h create mode 100644 include/concurrencpp/utils/math_helper.h create mode 100644 source/utils/math_helper.cpp create mode 100644 test/source/tests/util_tests/math_helper_tests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c98057dc..201d386c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,7 +32,8 @@ set(concurrencpp_sources source/threads/async_condition_variable.cpp source/threads/thread.cpp source/timers/timer.cpp - source/timers/timer_queue.cpp) + source/timers/timer_queue.cpp + source/utils/math_helper.cpp) set(concurrencpp_headers include/concurrencpp/concurrencpp.h @@ -81,7 +82,9 @@ set(concurrencpp_headers include/concurrencpp/timers/timer.h include/concurrencpp/timers/timer_queue.h include/concurrencpp/utils/bind.h - include/concurrencpp/utils/slist.h) + include/concurrencpp/utils/math_helper.h + include/concurrencpp/utils/slist.h + include/concurrencpp/utils/dlist.h) add_library(concurrencpp ${concurrencpp_headers} ${concurrencpp_sources}) add_library(concurrencpp::concurrencpp ALIAS concurrencpp) diff --git a/include/concurrencpp/results/impl/result_state.h b/include/concurrencpp/results/impl/result_state.h index 5fefa0e1..b31cc3cf 100644 --- a/include/concurrencpp/results/impl/result_state.h +++ b/include/concurrencpp/results/impl/result_state.h @@ -15,7 +15,7 @@ namespace concurrencpp::details { class CRCPP_API result_state_base { public: - enum class pc_status : int32_t { idle, consumer_set, consumer_waiting, consumer_done, producer_done }; + enum class pc_status : uint32_t { idle, consumer_set, consumer_waiting, consumer_done, producer_done }; protected: std::atomic m_pc_status {pc_status::idle}; @@ -42,6 +42,8 @@ namespace concurrencpp::details { producer_context m_producer; static void delete_self(result_state* state) noexcept { + assert(state != nullptr); + auto done_handle = state->m_done_handle; if (static_cast(done_handle)) { assert(done_handle.done()); @@ -151,7 +153,7 @@ namespace concurrencpp::details { } case pc_status::consumer_waiting: { - return atomic_notify_all(m_pc_status); + return atomic_notify_one(m_pc_status); } case pc_status::consumer_done: { diff --git a/include/concurrencpp/results/impl/shared_result_state.h b/include/concurrencpp/results/impl/shared_result_state.h index 52421191..8d4b1a1e 100644 --- a/include/concurrencpp/results/impl/shared_result_state.h +++ b/include/concurrencpp/results/impl/shared_result_state.h @@ -5,7 +5,6 @@ #include "concurrencpp/results/impl/result_state.h" #include -#include #include diff --git a/include/concurrencpp/results/result_fwd_declarations.h b/include/concurrencpp/results/result_fwd_declarations.h index 5deb6382..719ad0e3 100644 --- a/include/concurrencpp/results/result_fwd_declarations.h +++ b/include/concurrencpp/results/result_fwd_declarations.h @@ -30,7 +30,7 @@ namespace concurrencpp { struct null_result {}; - enum class result_status : int32_t { idle, value, exception }; + enum class result_status : uint32_t { idle, value, exception }; } // namespace concurrencpp namespace concurrencpp::details { diff --git a/include/concurrencpp/threads/atomic_wait.h b/include/concurrencpp/threads/atomic_wait.h index c557739c..69dfdd45 100644 --- a/include/concurrencpp/threads/atomic_wait.h +++ b/include/concurrencpp/threads/atomic_wait.h @@ -3,23 +3,53 @@ #include "concurrencpp/platform_defs.h" -#include #include #include +#include #include namespace concurrencpp::details { - void CRCPP_API atomic_wait_native(void* atom, int32_t old) noexcept; - void CRCPP_API atomic_wait_for_native(void* atom, int32_t old, std::chrono::milliseconds ms) noexcept; - void CRCPP_API atomic_notify_all_native(void* atom) noexcept; enum class atomic_wait_status { ok, timeout }; + template + void atomic_wait(std::atomic& atom, type old, std::memory_order order) noexcept; + + template + atomic_wait_status atomic_wait_for(std::atomic& atom, + type old, + std::chrono::milliseconds ms, + std::memory_order order) noexcept; + + template + void atomic_notify_one(std::atomic& atom) noexcept; + + template + void atomic_notify_all(std::atomic& atom) noexcept; + + template + void assert_atomic_type_waitable() noexcept { + static_assert(std::is_integral_v || std::is_enum_v, + "atomic_wait/atomic_notify - <> must be integeral or enumeration type"); + static_assert(sizeof(type) == sizeof(uint32_t), "atomic_wait/atomic_notify - <> must be 4 bytes."); + static_assert(std::is_standard_layout_v>, + "atomic_wait/atomic_notify - std::atom is not standard-layout"); + static_assert(std::atomic::is_always_lock_free, "atomic_wait/atomic_notify - std::atom is not lock free"); + } +} // namespace concurrencpp::details + +#if !defined(CRCPP_MAC_OS) + +namespace concurrencpp::details { + void CRCPP_API atomic_wait_native(void* atom, uint32_t old) noexcept; + void CRCPP_API atomic_wait_for_native(void* atom, uint32_t old, std::chrono::milliseconds ms) noexcept; + void CRCPP_API atomic_notify_one_native(void* atom) noexcept; + void CRCPP_API atomic_notify_all_native(void* atom) noexcept; + template void atomic_wait(std::atomic& atom, type old, std::memory_order order) noexcept { - static_assert(std::is_standard_layout_v>, "atomic_wait - std::atom is not standard-layout"); - static_assert(sizeof(type) == sizeof(int32_t), "atomic_wait - <> must be 4 bytes."); + assert_atomic_type_waitable(); while (true) { const auto val = atom.load(order); @@ -27,11 +57,7 @@ namespace concurrencpp::details { return; } -#if defined(CRCPP_MAC_OS) - atom.wait(old, order); -#else - atomic_wait_native(&atom, static_cast(old)); -#endif + atomic_wait_native(&atom, static_cast(old)); } } @@ -40,14 +66,10 @@ namespace concurrencpp::details { type old, std::chrono::milliseconds ms, std::memory_order order) noexcept { - static_assert(std::is_standard_layout_v>, "atomic_wait_for - std::atom is not standard-layout"); - static_assert(sizeof(type) == sizeof(int32_t), "atomic_wait_for - <> must be 4 bytes."); + assert_atomic_type_waitable(); const auto deadline = std::chrono::system_clock::now() + ms; -#if defined(CRCPP_MAC_OS) - size_t polling_cycle = 0; -#endif while (true) { if (atom.load(order) != old) { return atomic_wait_status::ok; @@ -62,45 +84,100 @@ namespace concurrencpp::details { return atomic_wait_status::timeout; } -#if defined(CRCPP_MAC_OS) - if (polling_cycle < 64) { - std::this_thread::yield(); - ++polling_cycle; - continue; - } - - if (polling_cycle < 5'000) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - ++polling_cycle; - continue; - } - - if (polling_cycle < 10'000) { - std::this_thread::sleep_for(std::chrono::milliseconds(2)); - ++polling_cycle; - continue; - } - - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - ++polling_cycle; - -#else const auto time_diff = std::chrono::duration_cast(deadline - now); assert(time_diff.count() >= 0); - atomic_wait_for_native(&atom, static_cast(old), time_diff); -#endif + atomic_wait_for_native(&atom, static_cast(old), time_diff); } } + template + void atomic_notify_one(std::atomic& atom) noexcept { + atomic_notify_one_native(&atom); + } + template void atomic_notify_all(std::atomic& atom) noexcept { -#if defined(CRCPP_MAC_OS) + atomic_notify_all_native(&atom); + } +} // namespace concurrencpp::details - atom.notify_all(); #else - atomic_notify_all_native(&atom); -#endif + +#include + +namespace concurrencpp::details { + class atomic_wait_bucket; + + using atomic_comp_fn = bool (*)(void*, const uint32_t, std::memory_order) noexcept; + + class CRCPP_API atomic_wait_table { + + private: + std::unique_ptr m_buckets; + const size_t m_size; + + static size_t calc_table_size() noexcept; + size_t index_for(const void* atom) const noexcept; + + public: + atomic_wait_table(); + + void wait(void* atom, const uint32_t old, std::memory_order order, atomic_comp_fn comp); + atomic_wait_status wait_for(void* atom, + const uint32_t old, + std::chrono::milliseconds ms, + std::memory_order order, + atomic_comp_fn comp); + + void notify_one(const void* atom) noexcept; + void notify_all(const void* atom) noexcept; + + static atomic_wait_table& instance(); + }; + + template + void atomic_wait(std::atomic& atom, type old, std::memory_order order) noexcept { + assert_atomic_type_waitable(); + + auto comp = [](void* atom_, const uint32_t old_, std::memory_order order_) noexcept -> bool { + auto& original_atom = *static_cast*>(atom_); + const auto original_old = static_cast(old_); + + return original_atom.load(order_) == original_old; + }; + + atomic_wait_table::instance().wait(&atom, static_cast(old), order, comp); + } + + template + atomic_wait_status atomic_wait_for(std::atomic& atom, + type old, + std::chrono::milliseconds ms, + std::memory_order order) noexcept { + assert_atomic_type_waitable(); + + auto comp = [](void* atom_, const uint32_t old_, std::memory_order order_) noexcept -> bool { + auto& original_atom = *static_cast*>(atom_); + const auto original_old = static_cast(old_); + + return original_atom.load(order_) == original_old; + }; + + return atomic_wait_table::instance().wait_for(&atom, static_cast(old), ms, order, comp); } + + template + void atomic_notify_one(std::atomic& atom) noexcept { + atomic_wait_table::instance().notify_one(&atom); + } + + template + void atomic_notify_all(std::atomic& atom) noexcept { + atomic_wait_table::instance().notify_all(&atom); + } + } // namespace concurrencpp::details #endif + +#endif diff --git a/include/concurrencpp/utils/dlist.h b/include/concurrencpp/utils/dlist.h new file mode 100644 index 00000000..8a7f96fe --- /dev/null +++ b/include/concurrencpp/utils/dlist.h @@ -0,0 +1,62 @@ +#ifndef CONCURRENCPP_DLIST_H +#define CONCURRENCPP_DLIST_H + +#include "concurrencpp/platform_defs.h" + +#include + +namespace concurrencpp::details { + template + class dlist { + + private: + node_type* m_head = nullptr; + + public: + void push_front(node_type& new_node) noexcept { + assert(new_node.next == nullptr); + assert(new_node.prev == nullptr); + + if (m_head != nullptr) { + m_head->prev = &new_node; + new_node.next = m_head; + } + + m_head = &new_node; + } + + void remove_node(node_type& old_node) noexcept { + assert(m_head != nullptr); + assert(old_node.prev != nullptr || old_node.next != nullptr); + + if (old_node.prev != nullptr) { + old_node.prev->next = old_node.next; + } else { + m_head = old_node.next; + } + + if (old_node.next != nullptr) { + old_node.next->prev = old_node.prev; + } + } + + bool empty() const noexcept { + return m_head != nullptr; + } + + template + void for_each(functor_type f) { + auto cursor = m_head; + while (cursor != nullptr) { + const bool should_continue_iteration = f(*cursor); + if (!should_continue_iteration) { + return; + } + + cursor = cursor->next; + } + } + }; +} // namespace concurrencpp::details + +#endif \ No newline at end of file diff --git a/include/concurrencpp/utils/math_helper.h b/include/concurrencpp/utils/math_helper.h new file mode 100644 index 00000000..73f491c8 --- /dev/null +++ b/include/concurrencpp/utils/math_helper.h @@ -0,0 +1,15 @@ +#ifndef CONCURRENCPP_MATH_H +#define CONCURRENCPP_MATH_H + +#include "concurrencpp/platform_defs.h" + +#include + +namespace concurrencpp::details { + struct CRCPP_API math_helper { + static bool is_prime(size_t n) noexcept; + static size_t next_prime(size_t n) noexcept; + }; +} // namespace concurrencpp::details + +#endif \ No newline at end of file diff --git a/source/threads/atomic_wait.cpp b/source/threads/atomic_wait.cpp index 1e7a5c8b..d086e5bd 100644 --- a/source/threads/atomic_wait.cpp +++ b/source/threads/atomic_wait.cpp @@ -11,14 +11,18 @@ # pragma comment(lib, "Synchronization.lib") namespace concurrencpp::details { - void atomic_wait_native(void* atom, int32_t old) noexcept { + void atomic_wait_native(void* atom, uint32_t old) noexcept { ::WaitOnAddress(atom, &old, sizeof(old), INFINITE); } - void atomic_wait_for_native(void* atom, int32_t old, std::chrono::milliseconds ms) noexcept { + void atomic_wait_for_native(void* atom, uint32_t old, std::chrono::milliseconds ms) noexcept { ::WaitOnAddress(atom, &old, sizeof(old), static_cast(ms.count())); } + void atomic_notify_one_native(void* atom) noexcept { + ::WakeByAddressSingle(atom); + } + void atomic_notify_all_native(void* atom) noexcept { ::WakeByAddressAll(atom); } @@ -33,8 +37,8 @@ namespace concurrencpp::details { # include namespace concurrencpp::details { - int futex(void* addr, int32_t op, int32_t old, const timespec* ts) noexcept { - return ::syscall(SYS_futex, addr, op, old, ts, nullptr, 0); + int futex(void* addr, int32_t op, uint32_t val, const timespec* ts) noexcept { + return ::syscall(SYS_futex, addr, op, val, ts, nullptr, 0); } timespec ms_to_time_spec(size_t ms) noexcept { @@ -44,15 +48,19 @@ namespace concurrencpp::details { return req; } - void atomic_wait_native(void* atom, int32_t old) noexcept { + void atomic_wait_native(void* atom, uint32_t old) noexcept { futex(atom, FUTEX_WAIT_PRIVATE, old, nullptr); } - void atomic_wait_for_native(void* atom, int32_t old, std::chrono::milliseconds ms) noexcept { + void atomic_wait_for_native(void* atom, uint32_t old, std::chrono::milliseconds ms) noexcept { auto spec = ms_to_time_spec(ms.count()); futex(atom, FUTEX_WAIT_PRIVATE, old, &spec); } + void atomic_notify_one_native(void* atom) noexcept { + futex(atom, FUTEX_WAKE_PRIVATE, 1, nullptr); + } + void atomic_notify_all_native(void* atom) noexcept { futex(atom, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr); } @@ -60,12 +68,200 @@ namespace concurrencpp::details { #else +# include +# include +# include +# include + +# include "concurrencpp/utils/dlist.h" +# include "concurrencpp/utils/math_helper.h" + namespace concurrencpp::details { - void atomic_wait_native(void* atom, int32_t old) noexcept {} - void atomic_wait_for_native(void* atom, int32_t old, std::chrono::milliseconds ms, size_t* polling_cycle_ptr) noexcept {} + struct waiting_node { + waiting_node* next = nullptr; + waiting_node* prev = nullptr; + + private: + const void* const m_address; + std::condition_variable m_cv; + bool m_notified = false; + + public: + waiting_node(const void* address) noexcept : m_address(address) {} + + void notify_one(std::unique_lock& lock) noexcept { + assert(lock.owns_lock()); + m_notified = true; + + m_cv.notify_one(); + } + + void wait(std::unique_lock& lock) { + assert(lock.owns_lock()); + + m_cv.wait(lock, [this] { + return m_notified; + }); + + assert(m_notified); + } + + void wait_until(std::unique_lock& lock, std::chrono::system_clock::time_point tp) { + assert(lock.owns_lock()); + + m_cv.wait_until(lock, tp, [this] { + return m_notified; + }); + } + + const void* address() const noexcept { + return m_address; + } + }; + + class atomic_wait_bucket { + + private: + std::mutex m_lock; + dlist m_nodes; + + public: + void wait(void* atom, const uint32_t old, std::memory_order order, atomic_comp_fn comp) { + while (true) { + if (!comp(atom, old, order)) { + return; + } + + std::unique_lock lock(m_lock); + if (!comp(atom, old, order)) { + return; + } + + waiting_node node(atom); + m_nodes.push_front(node); + node.wait(lock); + + assert(lock.owns_lock()); + m_nodes.remove_node(node); + } + } + + atomic_wait_status wait_for(void* atom, + const uint32_t old, + std::chrono::milliseconds ms, + std::memory_order order, + atomic_comp_fn comp) { + + const auto later = std::chrono::system_clock::now() + ms; + + while (true) { + if (!comp(atom, old, order)) { + return atomic_wait_status::ok; + } + + if (std::chrono::system_clock::now() >= later) { + if (!comp(atom, old, order)) { + return atomic_wait_status::ok; + } + + return atomic_wait_status::timeout; + } - void atomic_notify_all_native(void* atom) noexcept {} + std::unique_lock lock(m_lock); + if (!comp(atom, old, order)) { + return atomic_wait_status::ok; + } + + waiting_node node(atom); + m_nodes.push_front(node); + node.wait_until(lock, later); + + assert(lock.owns_lock()); + m_nodes.remove_node(node); + } + + return atomic_wait_status::timeout; + } + + void notify_one(const void* atom) noexcept { + std::unique_lock lock(m_lock); + + m_nodes.for_each([&lock, atom](waiting_node& node) noexcept -> bool { + if (node.address() == atom) { + node.notify_one(lock); + return false; + } + + return true; + }); + } + + void notify_all(const void* atom) noexcept { + std::unique_lock lock(m_lock); + + m_nodes.for_each([&lock, atom](waiting_node& node) noexcept -> bool { + if (node.address() == atom) { + node.notify_one(lock); + } + + return true; + }); + } + }; + + /* + atomic_wait_table + */ + + size_t atomic_wait_table::calc_table_size() noexcept { + const auto hc = std::thread::hardware_concurrency(); + if (hc == 0) { + return 37; // heuristic. most modern CPUs have less than 64 cores, and 37 is a prime number + } + + const auto padded_hc = hc * 2; + return math_helper::next_prime(padded_hc); + } + + size_t atomic_wait_table::index_for(const void* atom) const noexcept { + return std::hash()(atom) % m_size; + } + + atomic_wait_table::atomic_wait_table() : m_size(calc_table_size()) { + assert(m_size != 0); + m_buckets = std::make_unique(m_size); + } + + void atomic_wait_table::wait(void* atom, const uint32_t old, std::memory_order order, atomic_comp_fn comp) { + const auto index = index_for(atom); + m_buckets[index].wait(atom, old, order, comp); + } + + atomic_wait_status atomic_wait_table::wait_for(void* atom, + const uint32_t old, + std::chrono::milliseconds ms, + std::memory_order order, + atomic_comp_fn comp) { + + const auto index = index_for(atom); + return m_buckets[index].wait_for(atom, old, ms, order, comp); + } + + void atomic_wait_table::notify_one(const void* atom) noexcept { + const auto index = index_for(atom); + m_buckets[index].notify_one(atom); + } + + void atomic_wait_table::notify_all(const void* atom) noexcept { + const auto index = index_for(atom); + m_buckets[index].notify_all(atom); + } + + atomic_wait_table& atomic_wait_table::instance() { + static atomic_wait_table s_wait_table; + return s_wait_table; + } } // namespace concurrencpp::details #endif diff --git a/source/utils/math_helper.cpp b/source/utils/math_helper.cpp new file mode 100644 index 00000000..f3ec4360 --- /dev/null +++ b/source/utils/math_helper.cpp @@ -0,0 +1,37 @@ +#include "concurrencpp/utils/math_helper.h" + +using concurrencpp::details::math_helper; + +bool math_helper::is_prime(size_t n) noexcept { + if (n <= 1) { + return false; + } + + if (n <= 3) { + return true; + } + if (n % 2 == 0 || n % 3 == 0) { + return false; + } + + for (size_t i = 5; i * i <= n; i += 6) { + if (n % i == 0 || n % (i + 2) == 0) { + return false; + } + } + + return true; +} + +size_t math_helper::next_prime(size_t n) noexcept { + if (n <= 1) { + return 2; + } + + while (!is_prime(n)) { + n++; + } + + return n; +} + diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e7d0c9eb..08faa4e7 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -104,9 +104,12 @@ function(add_test) ${TEST_PROPERTIES}) endfunction() -add_test(NAME task_tests PATH source/tests/task_tests.cpp) -add_test(NAME runtime_tests PATH source/tests/runtime_tests.cpp) +add_test(NAME math_helper_tests PATH source/tests/util_tests/math_helper_tests.cpp) + +add_test(NAME atomic_wait_tests PATH source/tests/thread_tests/atomic_wait_tests.cpp) + +add_test(NAME task_tests PATH source/tests/task_tests.cpp) add_test(NAME inline_executor_tests PATH source/tests/executor_tests/inline_executor_tests.cpp) add_test(NAME manual_executor_tests PATH source/tests/executor_tests/manual_executor_tests.cpp) add_test(NAME thread_executor_tests PATH source/tests/executor_tests/thread_executor_tests.cpp) @@ -133,7 +136,6 @@ add_test(NAME generator_tests PATH source/tests/result_tests/generator_tests.cpp add_test(NAME coroutine_promise_tests PATH source/tests/coroutine_tests/coroutine_promise_tests.cpp) add_test(NAME coroutine_tests PATH source/tests/coroutine_tests/coroutine_tests.cpp) -add_test(NAME atomic_wait_tests PATH source/tests/thread_tests/atomic_wait_tests.cpp) add_test(NAME async_lock_tests PATH source/tests/thread_tests/async_lock_tests.cpp) add_test(NAME scoped_async_lock_tests PATH source/tests/thread_tests/scoped_async_lock_tests.cpp) add_test(NAME async_condition_variable_tests PATH source/tests/thread_tests/async_condition_variable_tests.cpp) @@ -141,6 +143,8 @@ add_test(NAME async_condition_variable_tests PATH source/tests/thread_tests/asyn add_test(NAME timer_queue_tests PATH source/tests/timer_tests/timer_queue_tests.cpp) add_test(NAME timer_tests PATH source/tests/timer_tests/timer_tests.cpp) +add_test(NAME runtime_tests PATH source/tests/runtime_tests.cpp) + if(NOT ENABLE_THREAD_SANITIZER) return() endif() diff --git a/test/include/utils/wait_context.h b/test/include/utils/wait_context.h index 1e0b984b..87baf4e6 100644 --- a/test/include/utils/wait_context.h +++ b/test/include/utils/wait_context.h @@ -7,7 +7,7 @@ namespace concurrencpp::tests { class wait_context { private: - std::atomic_int m_ready {0}; + std::atomic_uint32_t m_ready {0}; public: void wait(); diff --git a/test/source/tests/thread_tests/atomic_wait_tests.cpp b/test/source/tests/thread_tests/atomic_wait_tests.cpp index 7c18e288..401d4512 100644 --- a/test/source/tests/thread_tests/atomic_wait_tests.cpp +++ b/test/source/tests/thread_tests/atomic_wait_tests.cpp @@ -3,8 +3,6 @@ #include "infra/tester.h" #include "infra/assertions.h" -#include - namespace concurrencpp::tests { void test_atomic_wait(); @@ -13,6 +11,7 @@ namespace concurrencpp::tests { void test_atomic_wait_for_success(); void test_atomic_wait_for(); + void test_atomic_notify_one(); void test_atomic_notify_all(); void test_atomic_mini_load_test(); @@ -21,11 +20,11 @@ namespace concurrencpp::tests { using namespace concurrencpp::tests; void concurrencpp::tests::test_atomic_wait() { - std::atomic_int flag {0}; + std::atomic_uint32_t flag {0}; std::atomic_bool woken {false}; std::thread waiter([&] { - concurrencpp::details::atomic_wait(flag, 0, std::memory_order_acquire); + concurrencpp::details::atomic_wait(flag, uint32_t(0), std::memory_order_acquire); woken = true; }); @@ -52,12 +51,12 @@ void concurrencpp::tests::test_atomic_wait() { void concurrencpp::tests::test_atomic_wait_for_timeout_1() { // timeout has reached - std::atomic_int flag {0}; + std::atomic_uint32_t flag {0}; constexpr auto timeout_ms = 350; const auto before = std::chrono::high_resolution_clock::now(); const auto result = - concurrencpp::details::atomic_wait_for(flag, 0, std::chrono::milliseconds(timeout_ms), std::memory_order_acquire); + concurrencpp::details::atomic_wait_for(flag, uint32_t(0), std::chrono::milliseconds(timeout_ms), std::memory_order_acquire); const auto after = std::chrono::high_resolution_clock::now(); const auto time_diff = std::chrono::duration_cast(after - before).count(); @@ -67,7 +66,7 @@ void concurrencpp::tests::test_atomic_wait_for_timeout_1() { void concurrencpp::tests::test_atomic_wait_for_timeout_2() { // notify was called, value hasn't changed - std::atomic_int flag {0}; + std::atomic_uint32_t flag {0}; constexpr auto timeout_ms = 200; std::thread modifier([&] { @@ -77,7 +76,7 @@ void concurrencpp::tests::test_atomic_wait_for_timeout_2() { const auto before = std::chrono::high_resolution_clock::now(); const auto result = - concurrencpp::details::atomic_wait_for(flag, 0, std::chrono::milliseconds(timeout_ms), std::memory_order_acquire); + concurrencpp::details::atomic_wait_for(flag, uint32_t(0), std::chrono::milliseconds(timeout_ms), std::memory_order_acquire); const auto after = std::chrono::high_resolution_clock::now(); const auto time_diff = std::chrono::duration_cast(after - before).count(); @@ -89,7 +88,7 @@ void concurrencpp::tests::test_atomic_wait_for_timeout_2() { } void concurrencpp::tests::test_atomic_wait_for_success() { - std::atomic_int flag {0}; + std::atomic_uint32_t flag {0}; std::atomic> modification_tp { std::chrono::high_resolution_clock::now() }; std::thread modifier([&] { @@ -99,7 +98,7 @@ void concurrencpp::tests::test_atomic_wait_for_success() { modification_tp.store(std::chrono::high_resolution_clock::now()); }); - const auto result = concurrencpp::details::atomic_wait_for(flag, 0, std::chrono::seconds(10), std::memory_order_acquire); + const auto result = concurrencpp::details::atomic_wait_for(flag, uint32_t(0), std::chrono::seconds(10), std::memory_order_acquire); const auto after = std::chrono::high_resolution_clock::now(); const auto time_diff = std::chrono::duration_cast(after - modification_tp.load()).count(); @@ -115,26 +114,56 @@ void concurrencpp::tests::test_atomic_wait_for() { test_atomic_wait_for_success(); } +void concurrencpp::tests::test_atomic_notify_one() { + std::thread waiters[15]; + std::atomic_size_t woken = 0; + std::atomic_uint32_t flag = 0; + + for (auto& waiter : waiters) { + waiter = std::thread([&] { + concurrencpp::details::atomic_wait(flag, uint32_t(0), std::memory_order_relaxed); + woken.fetch_add(1, std::memory_order_acq_rel); + }); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + flag = 1; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + assert_equal(woken.load(), 0); + + for (size_t i = 0; i < std::size(waiters); i++) { + concurrencpp::details::atomic_notify_one(flag); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + assert_equal(woken.load(), i + 1); + + } + + for (auto& waiter : waiters) { + waiter.join(); + } +} + void concurrencpp::tests::test_atomic_notify_all() { - std::thread waiters[5]; + std::thread waiters[15]; std::atomic_size_t woken = 0; - std::atomic_int flag = 0; + std::atomic_uint32_t flag = 0; for (auto& waiter : waiters) { waiter = std::thread([&] { - concurrencpp::details::atomic_wait(flag, 0, std::memory_order_relaxed); + concurrencpp::details::atomic_wait(flag, uint32_t(0), std::memory_order_relaxed); woken.fetch_add(1, std::memory_order_acq_rel); }); } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); flag = 1; - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); assert_equal(woken.load(), 0); concurrencpp::details::atomic_notify_all(flag); - std::this_thread::sleep_for(std::chrono::milliseconds(15)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); assert_equal(woken.load(), std::size(waiters)); for (auto& waiter : waiters) { @@ -143,17 +172,17 @@ void concurrencpp::tests::test_atomic_notify_all() { } void concurrencpp::tests::test_atomic_mini_load_test() { - std::thread waiters[20]; + std::thread waiters[30]; std::thread waiters_for[20]; std::thread wakers[20]; - std::atomic_int32_t atom {0}; + std::atomic_uint32_t atom {0}; - const auto test_timeout = std::chrono::system_clock::now() + std::chrono::seconds(15); + const auto test_timeout = std::chrono::system_clock::now() + std::chrono::seconds(20); for (auto& waiter : waiters) { waiter = std::thread([&] { while (std::chrono::system_clock::now() < test_timeout) { - concurrencpp::details::atomic_wait(atom, 0, std::memory_order_acquire); + concurrencpp::details::atomic_wait(atom, uint32_t(0), std::memory_order_acquire); } }); } @@ -161,7 +190,7 @@ void concurrencpp::tests::test_atomic_mini_load_test() { for (auto& waiter_for : waiters_for) { waiter_for = std::thread([&] { while (std::chrono::system_clock::now() < test_timeout) { - concurrencpp::details::atomic_wait_for(atom, 0, std::chrono::milliseconds(2), std::memory_order_acquire); + concurrencpp::details::atomic_wait_for(atom, uint32_t(0), std::chrono::milliseconds(2), std::memory_order_acquire); } }); } @@ -177,8 +206,13 @@ void concurrencpp::tests::test_atomic_mini_load_test() { } else { atom.store(0, std::memory_order_release); } - concurrencpp::details::atomic_notify_all(atom); - + + if (counter % 3 == 0) { + concurrencpp::details::atomic_notify_all(atom); + } else { + concurrencpp::details::atomic_notify_one(atom); + } + counter++; } }); @@ -205,6 +239,7 @@ int main() { tester.add_step("wait", test_atomic_wait); tester.add_step("wait_for", test_atomic_wait_for); + tester.add_step("notify_one", test_atomic_notify_one); tester.add_step("notify_all", test_atomic_notify_all); tester.add_step("mini load test", test_atomic_mini_load_test); diff --git a/test/source/tests/util_tests/math_helper_tests.cpp b/test/source/tests/util_tests/math_helper_tests.cpp new file mode 100644 index 00000000..9a37b55e --- /dev/null +++ b/test/source/tests/util_tests/math_helper_tests.cpp @@ -0,0 +1,67 @@ +#include "concurrencpp/utils/math_helper.h" + +#include "infra/tester.h" +#include "infra/assertions.h" +#include + +namespace concurrencpp::tests { + void test_math_helper_is_prime(); + void test_math_helper_next_prime(); + + constexpr size_t prime_numbers[] = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, + 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, + 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, + 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, + 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499}; + + constexpr size_t non_prime_numbers[] = { + 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25, 26, 27, 28, 30, 32, 33, + 34, 35, 36, 38, 39, 40, 42, 44, 45, 46, 48, 49, 50, + 51, 52, 54, 55, 56, 57, 58, 60, 62, 63, 64, 65, 66, 68, 69, 70, 72, 74, 75, 76, 77, + 78, 80, 81, 82, 84, 85, 86, 87, 88, 90, 91, 92, 93, 94, 95, 96, 98, 99, 100, + 102, 104, 105, 106, 108, 110, 111, 112, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, + 128, 129, 130, 132, 133, 134, 135, 136, 138, 140, 141, 142, 143, 144, 145, 146, 147, 148, 150, + 152, 153, 154, 155, 156, 158, 159, 160, 161, 162, 164, 165, 166, 168, 169, 170, 171, 172, 174, 175, 176, + 177, 178, 180, 182, 183, 184, 185, 186, 187, 188, 189, 190, 192, 194, 195, 196, 198, 200, + 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, + 224, 225, 226, 228, 230, 231, 232, 234, 235, 236, 237, 238, 240, 242, 243, 244, 245, 246, 247, 248, 249, + 250, 252, 253, 254, 255, 256, 258, 259, 260, 261, 262, 264, 265, 266, 267, 268, 270, 272, 273, 274, 275, + 276, 278, 279, 280, 282, 284, 285, 286, 287, 288, 289, 290, 291, 292, 294, 295, 296, 297, 298, 299}; +} // namespace concurrencpp::tests + +using namespace concurrencpp::tests; +using concurrencpp::details::math_helper; + + +void concurrencpp::tests::test_math_helper_is_prime() { + for (const auto i : prime_numbers) { + assert_true(math_helper::is_prime(i)); + } + + for (const auto i : non_prime_numbers) { + assert_false(math_helper::is_prime(i)); + } +} + +void concurrencpp::tests::test_math_helper_next_prime() { + size_t last_prime_number = 0; + + for (const auto prime_number : prime_numbers) { + for (auto start = last_prime_number + 1; start < prime_number; start++) { + const auto next_prime = math_helper::next_prime(start); + assert_equal(next_prime, prime_number); + } + + last_prime_number = prime_number; + } +} + +int main() { + tester tester("math_helper test"); + + tester.add_step("is_prime", test_math_helper_is_prime); + tester.add_step("next_prime", test_math_helper_next_prime); + + tester.launch_test(); + return 0; +} diff --git a/test/source/utils/wait_context.cpp b/test/source/utils/wait_context.cpp index d258b5a8..6dda1a77 100644 --- a/test/source/utils/wait_context.cpp +++ b/test/source/utils/wait_context.cpp @@ -5,12 +5,13 @@ using concurrencpp::tests::wait_context; void wait_context::wait() { - details::atomic_wait(m_ready, 0, std::memory_order_relaxed); + details::atomic_wait(m_ready, uint32_t(0), std::memory_order_relaxed); assert(m_ready.load(std::memory_order_relaxed)); } bool wait_context::wait_for(size_t milliseconds) { - const auto res = details::atomic_wait_for(m_ready, 0, std::chrono::milliseconds(milliseconds), std::memory_order_relaxed); + const auto res = + details::atomic_wait_for(m_ready, uint32_t(0), std::chrono::milliseconds(milliseconds), std::memory_order_relaxed); return res == details::atomic_wait_status::ok; }