Skip to content

Commit

Permalink
Add 'Thread_barrier' and 'Thread_pool' std implems.
Browse files Browse the repository at this point in the history
  • Loading branch information
kouchy committed Nov 9, 2024
1 parent 649753d commit 7da9afc
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef THREAD_BARRIER_STANDARD_HPP_
#define THREAD_BARRIER_STANDARD_HPP_

#include <atomic>
#include <cstdint>

namespace spu
{
namespace tools
{

// implementations exist in pthread lib and C++20 stdlib but not in C++11 stdlib :-(
class Thread_barrier_standard
{
private:
const uint32_t n_threads;
std::atomic<uint32_t> count;

public:
inline Thread_barrier_standard(const uint32_t n_threads);
inline Thread_barrier_standard(const Thread_barrier_standard& other);
inline ~Thread_barrier_standard() = default;
inline void arrive();
inline void reset();
inline void wait();
};

}
}

#ifndef DOXYGEN_SHOULD_SKIP_THIS
#include "Tools/Thread/Thread_barrier/Standard/Thread_barrier_standard.hxx"
#endif

#endif /* THREAD_BARRIER_STANDARD_HPP_ */
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <iostream>
#include <sstream>

#include "Tools/Exception/exception.hpp"
#include "Tools/Thread/Thread_barrier/Standard/Thread_barrier_standard.hpp"

namespace spu
{
namespace tools
{

Thread_barrier_standard::Thread_barrier_standard(const uint32_t n_threads)
: n_threads(n_threads)
, count(0)
{
}

Thread_barrier_standard::Thread_barrier_standard(const Thread_barrier_standard& other)
: n_threads(other.n_threads)
, count(0)
{
}

void
Thread_barrier_standard::arrive()
{
this->count++;
if (this->count > this->n_threads)
{
std::stringstream message;
message << "Something went wrong, 'count' cannot be higher than 'n_threads' ('count' = " << this->count
<< " , 'n_threads' = " << n_threads << ").";
throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
}
}

void
Thread_barrier_standard::reset()
{
this->count = 0;
}

void
Thread_barrier_standard::wait()
{
while (this->count != this->n_threads)
std::this_thread::sleep_for(std::chrono::microseconds(1));
this->reset();
}

}
}
42 changes: 42 additions & 0 deletions include/Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef THREAD_POOL_STANDARD_HPP_
#define THREAD_POOL_STANDARD_HPP_

#include <condition_variable>
#include <mutex>
#include <thread>

#include "Tools/Thread/Thread_barrier/Standard/Thread_barrier_standard.hpp"
#include "Tools/Thread/Thread_pool/Thread_pool.hpp"

namespace spu
{
namespace tools
{

class Thread_pool_standard : public Thread_pool
{
private:
std::vector<std::thread> pool;
std::vector<std::mutex> mtx;
std::vector<std::condition_variable> cnd;
Thread_barrier_standard barrier;

public:
Thread_pool_standard(const size_t n_threads, std::function<void(const size_t)>& func_init);
Thread_pool_standard(const size_t n_threads);
Thread_pool_standard(const Thread_pool_standard& other);
virtual ~Thread_pool_standard();

virtual void init(const bool async = false);
virtual void run(const bool async = false);
virtual void run(std::function<void(const size_t)>& func_exec, const bool async = false);
virtual void wait();

protected:
void _start_thread(const size_t tid);
};

}
}

#endif /* THREAD_POOL_STANDARD_HPP_ */
41 changes: 41 additions & 0 deletions include/Tools/Thread/Thread_pool/Thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef THREAD_POOL_HPP_
#define THREAD_POOL_HPP_

#include <cstdint>
#include <functional>

#include <vector>

namespace spu
{
namespace tools
{

class Thread_pool
{
protected:
const size_t n_threads;
bool initialized;
std::function<void(const size_t)> func_init;
std::function<void(const size_t)> func_deinit;
std::function<void(const size_t)> func_exec;
bool stop_threads;

public:
Thread_pool(const size_t n_threads);
Thread_pool(const Thread_pool& other);
virtual ~Thread_pool() = default;
void set_func_init(std::function<void(const size_t)>& func_init);
void set_func_deinit(std::function<void(const size_t)>& func_deinit);
void set_func_exec(std::function<void(const size_t)>& func_exec);
void unset_func_exec();
virtual void run(const bool async = false) = 0;
virtual void run(std::function<void(const size_t)>& func_exec, const bool async = false) = 0;
virtual void wait() = 0;
virtual void init(const bool async = false) = 0;
};

}
}

#endif /* THREAD_POOL_HPP_ */
9 changes: 9 additions & 0 deletions include/streampu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,20 @@
#ifndef SYSTEM_MEMORY_HPP__
#include <Tools/System/memory.hpp>
#endif
#ifndef THREAD_BARRIER_STANDARD_HPP_
#include <Tools/Thread/Thread_barrier/Standard/Thread_barrier_standard.hpp>
#endif
#ifndef THREAD_PINNING_HPP
#include <Tools/Thread/Thread_pinning/Thread_pinning.hpp>
#endif
#ifndef THREAD_PINNING_UTILS_HPP
#include <Tools/Thread/Thread_pinning/Thread_pinning_utils.hpp>
#endif
#ifndef THREAD_POOL_STANDARD_HPP_
#include <Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp>
#endif
#ifndef THREAD_POOL_HPP_
#include <Tools/Thread/Thread_pool/Thread_pool.hpp>
#endif

#endif
113 changes: 113 additions & 0 deletions src/Tools/Thread/Thread_pool/Standard/Thread_pool_standard.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include "Tools/Thread/Thread_pool/Standard/Thread_pool_standard.hpp"

using namespace spu;
using namespace spu::tools;

Thread_pool_standard::Thread_pool_standard(const size_t n_threads)
: Thread_pool(n_threads)
, pool(n_threads)
, mtx(n_threads)
, cnd(n_threads)
, barrier(n_threads)
{
}

Thread_pool_standard::Thread_pool_standard(const size_t n_threads, std::function<void(const size_t)>& func_init)
: Thread_pool_standard(n_threads)
{
this->set_func_init(func_init);
this->init();
}

Thread_pool_standard::Thread_pool_standard(const Thread_pool_standard& other)
: Thread_pool(other)
, pool(other.n_threads)
, mtx(other.n_threads)
, cnd(other.n_threads)
, barrier(other.barrier)
{
}

void
Thread_pool_standard::init(const bool async)
{
if (this->initialized)
{
std::stringstream message;
message << "This pool of threads has already been initialized and cannot be initialized twice.";
throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
}

for (size_t tid = 0; tid < n_threads; tid++)
this->pool[tid] = std::thread(&Thread_pool_standard::_start_thread, this, tid);

if (!async)
{
this->barrier.wait();
this->initialized = true;
}
}

Thread_pool_standard::~Thread_pool_standard()
{
// stop the threads pool
this->stop_threads = true;
for (size_t tid = 0; tid < this->n_threads; tid++)
{
std::lock_guard<std::mutex> lock(this->mtx[tid]);
this->cnd[tid].notify_one();
}

for (size_t tid = 0; tid < n_threads; tid++)
this->pool[tid].join();
}

void
Thread_pool_standard::run(const bool async)
{
if (!this->initialized)
{
std::stringstream message;
message << "This pool of threads cannot be run because it has not been initialized.";
throw tools::runtime_error(__FILE__, __LINE__, __func__, message.str());
}

for (size_t tid = 0; tid < n_threads; tid++)
{
std::lock_guard<std::mutex> lock(this->mtx[tid]);
this->cnd[tid].notify_one();
}

if (!async) this->barrier.wait();
}

void
Thread_pool_standard::run(std::function<void(const size_t)>& set_func_exec, const bool async)
{
this->set_func_exec(set_func_exec);
this->run(async);
}

void
Thread_pool_standard::wait()
{
this->barrier.wait();
this->initialized = true;
}

void
Thread_pool_standard::_start_thread(const size_t tid)
{
this->func_init(tid);

while (!this->stop_threads)
{
std::unique_lock<std::mutex> lock(this->mtx[tid]);
this->barrier.arrive();
this->cnd[tid].wait(lock);

if (!this->stop_threads) this->func_exec(tid);
}

this->func_deinit(tid);
}
51 changes: 51 additions & 0 deletions src/Tools/Thread/Thread_pool/Thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include <iostream>

#include "Tools/Exception/exception.hpp"
#include "Tools/Thread/Thread_pool/Thread_pool.hpp"

using namespace spu;
using namespace spu::tools;

Thread_pool::Thread_pool(const size_t n_threads)
: n_threads(n_threads)
, initialized(false)
, func_init([](const size_t) {})
, func_deinit([](const size_t) {})
, func_exec([](const size_t) { throw tools::unimplemented_error(__FILE__, __LINE__, __func__); })
, stop_threads(false)
{
}

Thread_pool::Thread_pool(const Thread_pool& other)
: n_threads(other.n_threads)
, initialized(false)
, func_init(other.func_init)
, func_deinit(other.func_deinit)
, func_exec(other.func_deinit)
, stop_threads(other.stop_threads)
{
}

void
Thread_pool::set_func_init(std::function<void(const size_t)>& func_init)
{
this->func_init = func_init;
}

void
Thread_pool::set_func_deinit(std::function<void(const size_t)>& func_deinit)
{
this->func_deinit = func_deinit;
}

void
Thread_pool::set_func_exec(std::function<void(const size_t)>& func_exec)
{
this->func_exec = func_exec;
}

void
Thread_pool::unset_func_exec()
{
this->func_exec = [](const size_t) { throw tools::unimplemented_error(__FILE__, __LINE__, __func__); };
}

0 comments on commit 7da9afc

Please sign in to comment.