Skip to content

Commit

Permalink
thread_pool_improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Haim committed Apr 22, 2024
1 parent ff9b565 commit 519d694
Showing 1 changed file with 34 additions and 34 deletions.
68 changes: 34 additions & 34 deletions source/executors/thread_pool_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ namespace concurrencpp::details {
const std::function<void(std::string_view thread_name)> m_thread_terminated_callback;

void balance_work();
bool drain_queue_impl();
bool drain_private_queue();

void work_loop();

void notify_task_exists() noexcept;
void ensure_worker_active(std::unique_lock<std::mutex>& lock);
void mark_worker_stale();

Expand Down Expand Up @@ -86,14 +87,7 @@ namespace concurrencpp::details {
ensure_worker_active(lock);
}

auto expected = details::executor_status::idle;
const auto swapped = m_status.compare_exchange_strong(expected,
details::executor_status::task_available,
std::memory_order_relaxed,
std::memory_order_relaxed);
if (swapped) {
details::atomic_notify_one(m_status);
}
notify_task_exists();
}
};
} // namespace concurrencpp::details
Expand Down Expand Up @@ -196,9 +190,9 @@ thread_pool_worker::thread_pool_worker(thread_pool_executor& parent_pool,
}

thread_pool_worker::thread_pool_worker(thread_pool_worker&& rhs) noexcept :
m_parent_pool(rhs.m_parent_pool), m_index(rhs.m_index), m_pool_size(rhs.m_pool_size), m_max_idle_time(rhs.m_max_idle_time),
m_parent_pool(rhs.m_parent_pool), m_index(0), m_pool_size(0), m_max_idle_time(std::chrono::seconds(0)),
m_status(executor_status::idle), m_worker_stale(true) {
std::abort(); // shouldn't be called
std::abort(); // shouldn't be called, this ctor is only so vector::emplace_back won't conpmail
}

thread_pool_worker::~thread_pool_worker() noexcept {
Expand Down Expand Up @@ -274,12 +268,11 @@ void thread_pool_worker::balance_work() {
m_idle_worker_list.clear();
}

bool thread_pool_worker::drain_queue_impl() {
bool thread_pool_worker::drain_private_queue() {
while (!m_private_queue.empty()) {
balance_work();

if (m_atomic_abort.load(std::memory_order_relaxed)) {
mark_worker_stale();
return false;
}

Expand All @@ -303,7 +296,7 @@ void thread_pool_worker::work_loop() {

auto status = m_status.load(std::memory_order_relaxed);
if (status == details::executor_status::abort) {
return mark_worker_stale();
break;
}

if (status == details::executor_status::task_available) {
Expand All @@ -313,16 +306,16 @@ void thread_pool_worker::work_loop() {
std::memory_order_relaxed);

if (!swapped) {
continue; // abort
continue; // m_status is either "abort" or "task_available"
}

{
std::unique_lock<std::mutex> lock(m_lock);
std::swap(m_private_queue, m_public_queue); // reuse underlying allocations.
}

if (!drain_queue_impl()) {
return;
if (!drain_private_queue()) {
break;
}

continue;
Expand All @@ -332,13 +325,14 @@ void thread_pool_worker::work_loop() {
const auto wait_status =
atomic_wait_for(m_status, details::executor_status::idle, m_max_idle_time, std::memory_order_relaxed);

if (wait_status == atomic_wait_status::timeout) {
return mark_worker_stale();
if (wait_status == atomic_wait_status::timeout) {
break;
}
}
} catch (const errors::runtime_shutdown&) {
mark_worker_stale();
}

mark_worker_stale();
}

void thread_pool_worker::ensure_worker_active(std::unique_lock<std::mutex>& lock) {
Expand Down Expand Up @@ -371,6 +365,17 @@ void concurrencpp::details::thread_pool_worker::mark_worker_stale() {
m_worker_stale = true;
}

void concurrencpp::details::thread_pool_worker::notify_task_exists() noexcept {
auto expected = details::executor_status::idle;
const auto swapped = m_status.compare_exchange_strong(expected,
details::executor_status::task_available,
std::memory_order_relaxed,
std::memory_order_relaxed);
if (swapped) {
details::atomic_notify_one(m_status);
}
}

void thread_pool_worker::enqueue_foreign(concurrencpp::task& task) {
if (m_status.load(std::memory_order_relaxed) == details::executor_status::abort) {
details::throw_runtime_shutdown_exception(m_parent_pool.name);
Expand All @@ -382,14 +387,7 @@ void thread_pool_worker::enqueue_foreign(concurrencpp::task& task) {
ensure_worker_active(lock);
}

auto expected = details::executor_status::idle;
const auto swapped = m_status.compare_exchange_strong(expected,
details::executor_status::task_available,
std::memory_order_relaxed,
std::memory_order_relaxed);
if (swapped) {
details::atomic_notify_one(m_status);
}
notify_task_exists();
}

void thread_pool_worker::enqueue_foreign(std::span<concurrencpp::task> tasks) {
Expand All @@ -413,24 +411,26 @@ void thread_pool_worker::enqueue_local(std::span<concurrencpp::task> tasks) {
}

void thread_pool_worker::shutdown() {
assert(!m_atomic_abort.load(std::memory_order_relaxed));
assert(!m_atomic_abort.load(std::memory_order_relaxed));
m_atomic_abort.store(true, std::memory_order_relaxed);
m_status.store(executor_status::abort, std::memory_order_relaxed);
atomic_notify_one(m_status);

if (m_thread.joinable()) {
m_thread.join();
}

decltype(m_thread) worker;
decltype(m_public_queue) public_queue;
decltype(m_private_queue) private_queue;

{
std::unique_lock<std::mutex> lock(m_lock);
worker = std::move(m_thread);
public_queue = std::move(m_public_queue);
private_queue = std::move(m_private_queue);
}

if (worker.joinable()) {
worker.join();
}

public_queue.clear();
private_queue.clear();
}
Expand Down Expand Up @@ -512,7 +512,7 @@ void thread_pool_executor::enqueue(std::span<concurrencpp::task> tasks) {
return this_worker->enqueue_local(tasks);
}

if (tasks.size() < m_workers.size()) {
if (tasks.size() <= m_workers.size()) {
for (auto& task : tasks) {
enqueue(std::move(task));
}
Expand Down

0 comments on commit 519d694

Please sign in to comment.