Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++17 -ify and add maximum task queue size #75

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
ThreadPool
==========

A simple C++11 Thread Pool implementation.
A simple C++20 Thread Pool implementation.

Basic usage:
```c++
// create thread pool with 4 worker threads
ThreadPool pool(4);
ThreadPool pool{4};

// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);
Expand Down
126 changes: 57 additions & 69 deletions ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,89 +10,77 @@
#include <future>
#include <functional>
#include <stdexcept>
constexpr size_t maxQueueSize = 100;

class ThreadPool {
class ThreadPool
{
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
ThreadPool(size_t threads = std::thread::hardware_concurrency() - 1)
{
workers.reserve(threads);
for (size_t i{}; i < threads; ++i)
makeThread();
}

template <class F, class... Args>
auto enqueue(F &&f, Args &&... args)
{
using return_type = std::invoke_result_t<F, Args...>;
//...v This need C++20!
std::packaged_task<return_type()> task{[f = std::forward<F>(f), ... args = std::forward<Args>(args)]() mutable { return f(std::forward<Args>(args)...); }};
auto result = task.get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

if (tasks.size() >= maxQueueSize)
queueStatus.wait(lock, [&]() { return tasks.size() < maxQueueSize; });

tasks.emplace(std::move(task));//No need for std::shared_ptr anymore!
}
hasNewTask.notify_one();
return result;
}

~ThreadPool()
{
stop = true;
hasNewTask.notify_all();
for (auto &worker : workers)
worker.join();
}

private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;

// synchronization
std::vector<std::thread> workers;
std::queue<std::packaged_task<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
std::condition_variable queueStatus;
std::condition_variable hasNewTask;
std::atomic_bool stop{false};

void makeThread()
{
workers.emplace_back(
[this]
{
for(;;)
[this] {
while (true)
{
std::function<void()> task;

std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
this->hasNewTask.wait(lock, [&] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
queueStatus.notify_one();
}

task();
}
}
);
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
});
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
};

#endif