A few minor upgrades to ThreadPool.h #120

Open · wants to merge 1 commit into master
ThreadPool.h: 51 changes (34 additions, 17 deletions)
@@ -10,31 +10,37 @@
#include <future>
#include <functional>
#include <stdexcept>
+#include <type_traits>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
-        -> std::future<typename std::result_of<F(Args...)>::type>;
+        -> std::future<typename std::invoke_result<F, Args...>::type>;
    ~ThreadPool();
+
+    // get the current size of the task queue
+    size_t getTaskCount();
+    // get the current number of active threads
+    size_t getActiveThreadCount();
+
private:
-    // need to keep track of threads so we can join them
-    std::vector< std::thread > workers;
-    // the task queue
-    std::queue< std::function<void()> > tasks;
+    // keep track of threads so we can join them
+    std::vector<std::thread> workers;
+    // the task queue
+    std::queue<std::function<void()>> tasks;

-    // synchronization
+    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

-// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
-    :   stop(false)
+    : stop(false)
{
-    for(size_t i = 0;i<threads;++i)
+    for(size_t i = 0; i < threads; ++i)
        workers.emplace_back(
            [this]
            {
@@ -52,28 +58,30 @@ inline ThreadPool::ThreadPool(size_t threads)
                        this->tasks.pop();
                    }

-                    task();
+                    try {
+                        task();
+                    } catch (const std::exception& e) {
+                        // handle exceptions from the task; swallowed so the worker thread keeps running
+                    }
                }
            }
        );
}

-// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-    -> std::future<typename std::result_of<F(Args...)>::type>
+    -> std::future<typename std::invoke_result<F, Args...>::type>
{
-    using return_type = typename std::result_of<F(Args...)>::type;
+    using return_type = typename std::invoke_result<F, Args...>::type;

-    auto task = std::make_shared< std::packaged_task<return_type()> >(
+    auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

@@ -83,7 +91,6 @@ auto ThreadPool::enqueue(F&& f, Args&&... args)
    return res;
}

-// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
@@ -95,4 +102,14 @@ inline ThreadPool::~ThreadPool()
        worker.join();
}

+inline size_t ThreadPool::getTaskCount() {
+    std::unique_lock<std::mutex> lock(queue_mutex);
+    return tasks.size();
+}
+
+inline size_t ThreadPool::getActiveThreadCount() {
+    // note: returns the total number of worker threads, not the number currently busy
+    std::unique_lock<std::mutex> lock(queue_mutex);
+    return workers.size();
+}
+
#endif
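
For context, a minimal usage sketch of the updated interface (not part of the PR; it assumes the header above is saved as ThreadPool.h):

#include "ThreadPool.h"
#include <iostream>

int main() {
    ThreadPool pool(4);

    // enqueue forwards the callable and its arguments; the future's type
    // now comes from std::invoke_result, which replaces std::result_of
    auto sum = pool.enqueue([](int a, int b) { return a + b; }, 2, 3);

    // the two accessors added by this PR
    std::cout << "queued tasks: " << pool.getTaskCount() << '\n';
    std::cout << "worker threads: " << pool.getActiveThreadCount() << '\n';

    std::cout << "2 + 3 = " << sum.get() << '\n';
}

Both accessors lock queue_mutex, so they can be called while worker threads are running.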