From 519d694959e2c757918f299a51407d58e80bc6ed Mon Sep 17 00:00:00 2001 From: David-Haim Date: Mon, 22 Apr 2024 13:59:50 +0300 Subject: [PATCH] thread_pool_improvements --- source/executors/thread_pool_executor.cpp | 68 +++++++++++------------ 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/source/executors/thread_pool_executor.cpp b/source/executors/thread_pool_executor.cpp index ac004460..6a569621 100644 --- a/source/executors/thread_pool_executor.cpp +++ b/source/executors/thread_pool_executor.cpp @@ -44,10 +44,11 @@ namespace concurrencpp::details { const std::function m_thread_terminated_callback; void balance_work(); - bool drain_queue_impl(); + bool drain_private_queue(); void work_loop(); + void notify_task_exists() noexcept; void ensure_worker_active(std::unique_lock& lock); void mark_worker_stale(); @@ -86,14 +87,7 @@ namespace concurrencpp::details { ensure_worker_active(lock); } - auto expected = details::executor_status::idle; - const auto swapped = m_status.compare_exchange_strong(expected, - details::executor_status::task_available, - std::memory_order_relaxed, - std::memory_order_relaxed); - if (swapped) { - details::atomic_notify_one(m_status); - } + notify_task_exists(); } }; } // namespace concurrencpp::details @@ -196,9 +190,9 @@ thread_pool_worker::thread_pool_worker(thread_pool_executor& parent_pool, } thread_pool_worker::thread_pool_worker(thread_pool_worker&& rhs) noexcept : - m_parent_pool(rhs.m_parent_pool), m_index(rhs.m_index), m_pool_size(rhs.m_pool_size), m_max_idle_time(rhs.m_max_idle_time), + m_parent_pool(rhs.m_parent_pool), m_index(0), m_pool_size(0), m_max_idle_time(std::chrono::seconds(0)), m_status(executor_status::idle), m_worker_stale(true) { - std::abort(); // shouldn't be called + std::abort(); // shouldn't be called, this ctor is only so vector::emplace_back won't conpmail } thread_pool_worker::~thread_pool_worker() noexcept { @@ -274,12 +268,11 @@ void thread_pool_worker::balance_work() { m_idle_worker_list.clear(); } -bool thread_pool_worker::drain_queue_impl() { +bool thread_pool_worker::drain_private_queue() { while (!m_private_queue.empty()) { balance_work(); if (m_atomic_abort.load(std::memory_order_relaxed)) { - mark_worker_stale(); return false; } @@ -303,7 +296,7 @@ void thread_pool_worker::work_loop() { auto status = m_status.load(std::memory_order_relaxed); if (status == details::executor_status::abort) { - return mark_worker_stale(); + break; } if (status == details::executor_status::task_available) { @@ -313,7 +306,7 @@ void thread_pool_worker::work_loop() { std::memory_order_relaxed); if (!swapped) { - continue; // abort + continue; // m_status is either "abort" or "task_available" } { @@ -321,8 +314,8 @@ void thread_pool_worker::work_loop() { std::swap(m_private_queue, m_public_queue); // reuse underlying allocations. } - if (!drain_queue_impl()) { - return; + if (!drain_private_queue()) { + break; } continue; @@ -332,13 +325,14 @@ void thread_pool_worker::work_loop() { const auto wait_status = atomic_wait_for(m_status, details::executor_status::idle, m_max_idle_time, std::memory_order_relaxed); - if (wait_status == atomic_wait_status::timeout) { - return mark_worker_stale(); + if (wait_status == atomic_wait_status::timeout) { + break; } } } catch (const errors::runtime_shutdown&) { - mark_worker_stale(); } + + mark_worker_stale(); } void thread_pool_worker::ensure_worker_active(std::unique_lock& lock) { @@ -371,6 +365,17 @@ void concurrencpp::details::thread_pool_worker::mark_worker_stale() { m_worker_stale = true; } +void concurrencpp::details::thread_pool_worker::notify_task_exists() noexcept { + auto expected = details::executor_status::idle; + const auto swapped = m_status.compare_exchange_strong(expected, + details::executor_status::task_available, + std::memory_order_relaxed, + std::memory_order_relaxed); + if (swapped) { + details::atomic_notify_one(m_status); + } +} + void thread_pool_worker::enqueue_foreign(concurrencpp::task& task) { if (m_status.load(std::memory_order_relaxed) == details::executor_status::abort) { details::throw_runtime_shutdown_exception(m_parent_pool.name); @@ -382,14 +387,7 @@ void thread_pool_worker::enqueue_foreign(concurrencpp::task& task) { ensure_worker_active(lock); } - auto expected = details::executor_status::idle; - const auto swapped = m_status.compare_exchange_strong(expected, - details::executor_status::task_available, - std::memory_order_relaxed, - std::memory_order_relaxed); - if (swapped) { - details::atomic_notify_one(m_status); - } + notify_task_exists(); } void thread_pool_worker::enqueue_foreign(std::span tasks) { @@ -413,24 +411,26 @@ void thread_pool_worker::enqueue_local(std::span tasks) { } void thread_pool_worker::shutdown() { - assert(!m_atomic_abort.load(std::memory_order_relaxed)); + assert(!m_atomic_abort.load(std::memory_order_relaxed)); m_atomic_abort.store(true, std::memory_order_relaxed); m_status.store(executor_status::abort, std::memory_order_relaxed); atomic_notify_one(m_status); - if (m_thread.joinable()) { - m_thread.join(); - } - + decltype(m_thread) worker; decltype(m_public_queue) public_queue; decltype(m_private_queue) private_queue; { std::unique_lock lock(m_lock); + worker = std::move(m_thread); public_queue = std::move(m_public_queue); private_queue = std::move(m_private_queue); } + if (worker.joinable()) { + worker.join(); + } + public_queue.clear(); private_queue.clear(); } @@ -512,7 +512,7 @@ void thread_pool_executor::enqueue(std::span tasks) { return this_worker->enqueue_local(tasks); } - if (tasks.size() < m_workers.size()) { + if (tasks.size() <= m_workers.size()) { for (auto& task : tasks) { enqueue(std::move(task)); }