Skip to content

Commit

Permalink
fix threadpool scheduling bug where tasks are not scheduled on anothe…
Browse files Browse the repository at this point in the history
…r tp if the current executor is already a tp
  • Loading branch information
David-Haim committed Nov 13, 2023
1 parent 2d1e2a7 commit a0c2c67
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
23 changes: 13 additions & 10 deletions source/executors/thread_pool_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ using concurrencpp::details::thread_pool_worker;
namespace concurrencpp::details {
namespace {
struct thread_pool_per_thread_data {
thread_pool_worker* this_worker;
size_t this_thread_index;
const size_t this_thread_hashed_id;
thread_pool_worker* this_worker = nullptr;
const thread_pool_executor* parent_pool = nullptr;
size_t this_thread_index = static_cast<size_t>(-1);
const size_t this_thread_hashed_id = calculate_hashed_id();

static size_t calculate_hashed_id() noexcept {
const auto this_thread_id = thread::get_current_virtual_id();
const std::hash<size_t> hash;
return hash(this_thread_id);
}

thread_pool_per_thread_data() noexcept :
this_worker(nullptr), this_thread_index(static_cast<size_t>(-1)), this_thread_hashed_id(calculate_hashed_id()) {}
};

thread_local thread_pool_per_thread_data s_tl_thread_pool_data;
Expand Down Expand Up @@ -365,6 +363,7 @@ bool thread_pool_worker::drain_queue() {

void thread_pool_worker::work_loop() {
s_tl_thread_pool_data.this_worker = this;
s_tl_thread_pool_data.parent_pool = &m_parent_pool;
s_tl_thread_pool_data.this_thread_index = m_index;

try {
Expand Down Expand Up @@ -556,9 +555,10 @@ void thread_pool_executor::mark_worker_active(size_t index) noexcept {

void thread_pool_executor::enqueue(concurrencpp::task task) {
const auto this_worker = details::s_tl_thread_pool_data.this_worker;
const auto parent_pool = details::s_tl_thread_pool_data.parent_pool;
const auto this_worker_index = details::s_tl_thread_pool_data.this_thread_index;

if (this_worker != nullptr && this_worker->appears_empty()) {
if ((this_worker != nullptr) && (this == parent_pool) && (this_worker->appears_empty())) {
return this_worker->enqueue_local(task);
}

Expand All @@ -567,7 +567,7 @@ void thread_pool_executor::enqueue(concurrencpp::task task) {
return m_workers[idle_worker_pos].enqueue_foreign(task);
}

if (this_worker != nullptr) {
if ((this_worker != nullptr) && (this == parent_pool)) {
return this_worker->enqueue_local(task);
}

Expand All @@ -576,8 +576,11 @@ void thread_pool_executor::enqueue(concurrencpp::task task) {
}

void thread_pool_executor::enqueue(std::span<concurrencpp::task> tasks) {
if (details::s_tl_thread_pool_data.this_worker != nullptr) {
return details::s_tl_thread_pool_data.this_worker->enqueue_local(tasks);
const auto this_worker = details::s_tl_thread_pool_data.this_worker;
const auto parent_pool = details::s_tl_thread_pool_data.parent_pool;

if ((this_worker != nullptr) && (this == parent_pool)) {
return this_worker->enqueue_local(tasks);
}

if (tasks.size() < m_workers.size()) {
Expand Down
5 changes: 3 additions & 2 deletions test/source/tests/result_tests/resume_on_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ void concurrencpp::tests::test_resume_on_shutdown_executor_delayed() {
void concurrencpp::tests::test_resume_on_shared_ptr() {

concurrencpp::runtime runtime;
std::shared_ptr<concurrencpp::executor> executors[4];
std::shared_ptr<concurrencpp::executor> executors[5];

executors[0] = runtime.thread_executor();
executors[1] = runtime.thread_pool_executor();
executors[2] = runtime.make_worker_thread_executor();
executors[2] = runtime.background_executor();
executors[3] = runtime.make_worker_thread_executor();
executors[4] = runtime.make_worker_thread_executor();

std::unordered_set<size_t> set;
resume_on_many_executors_shared(executors, set).get();
Expand Down

0 comments on commit a0c2c67

Please sign in to comment.