标签搜索

目 录CONTENT

文章目录

基于Cpp11的线程池

小小城
2021-12-17 / 0 评论 / 0 点赞 / 7 阅读 / 5,993 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-02,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

基于Cpp11的线程池

[toc]

一、代码

#pragma once
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

#define DEBUG_THREADPOOL
#ifdef DEBUG_THREADPOOL
#define LOGDBG(fmt, ...) \
do { \
    printf( fmt "\n", ##__VA_ARGS__); \
} while (0)
#else
#define LOGDBG(fmt, ...) ((void *)(0))
#endif

//namespace {
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    void wait_all_finished();
    ~ThreadPool();

private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > _workers;
    // the task queue
    std::queue< std::function<void()> > _tasks;

    // synchronization
    std::mutex _queue_mutex;
    std::condition_variable _condition;
    bool _stop;

    // supervisor
    std::mutex _worker_running_mutex;
    std::condition_variable _worker_running_condition;
    std::atomic<std::size_t> _worker_running_count;

    // for waiting all workers finished.
    struct WorkerSupervisor {
        WorkerSupervisor(ThreadPool &tp)
            : _tp(tp)
        { }

        ~WorkerSupervisor() {
            std::size_t prev = std::atomic_fetch_sub_explicit(&_tp._worker_running_count,
                                                     std::size_t(1),
                                                     std::memory_order_acq_rel);
            if (prev == 1) {
                std::unique_lock<std::mutex> lock(_tp._worker_running_mutex);
                _tp._worker_running_condition.notify_all();
            }
        }

        ThreadPool &_tp;
    };
};
 
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   _stop(false)
    , _worker_running_count(0) {
    LOGDBG("ThreadPool constructor with %d threads.", threads);
    for (size_t i = 0;i < threads; ++i) {
        _workers.emplace_back(
            [this]
            {
                for (;;) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->_queue_mutex);
                        // template <class Predicate>
                        // void wait (unique_lock<mutex>& lck, Predicate pred);
                        // 只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞,即先让 pred 为 true 再进行 notice
                        // while (!pred()) wait(lck);
                        this->_condition.wait(lock,
                            [this]{ 
                                return this->_stop || !this->_tasks.empty(); 
                            }
                        );
                        
                        if (this->_stop && this->_tasks.empty()) {
                            LOGDBG("ThreadPool constructor stop and empty");
                            return;
                        }
                        
                        LOGDBG("ThreadPool constructor run task.");
                        task = std::move(this->_tasks.front());
                        this->_tasks.pop();
                    }
                    WorkerSupervisor supervisor(*this);
                    task();
                }
            }
        );
    }
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type> {
    LOGDBG("ThreadPool enqueue a task.");
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);

        // don't allow enqueueing after stopping the pool
        if (_stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }

        _tasks.emplace([task](){ (*task)(); });
        std::atomic_fetch_add_explicit(&_worker_running_count,
                                       std::size_t(1),
                                       std::memory_order_relaxed);
    }
    _condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
    LOGDBG("ThreadPool destructor.");
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        _stop = true;
    }
    _condition.notify_all();
    LOGDBG("ThreadPool join threads start.");
    for (std::thread &worker: _workers) {
        worker.join();
    }
    assert(_worker_running_count == 0);
    LOGDBG("ThreadPool join threads end.");
}

inline void ThreadPool::wait_all_finished() {
    std::unique_lock<std::mutex> lock(this->_worker_running_mutex);
    this->_worker_running_condition.wait(lock, [this]{ return this->_worker_running_count == 0; });
}
//}//namespace

二、备注

  1. 测试
ThreadPool pool(10);
std::vector< std::future<int> > results;
results.emplace_back(pool.enqueue([=]() {
    printf("thread pool\n");
    return 1 * 1;
});
pool.wait_all_finished();

for(auto && result: results) {
    std::cout << "synth ret:" << result.get() << std::endl;
}
  1. 可变模版参数
  • 可变参数模板和普通模板的语义是一样的,只是写法上稍有区别
  • 声明可变参数模板时需要在typename或class后面带上省略号“...”。
  • 比如我们常常这样声明一个可变模版参数:template<typename...>或者template<class...>
  • 一个典型的可变模版参数的定义是这样的:
template<class... T> 
void fun(T... args) { }

上面的可变模版参数的定义当中,省略号的作用有两个:

  • 声明一个参数包T... args,这个参数包中可以包含0到任意个模板参数;
  • 在模板定义的右边,可以将参数包展开成一个一个独立的参数。
  • 上面的参数args前面有省略号,所以它就是一个可变模版参数,我们把带省略号的参数称为“参数包”,它里面包含了0到N(N>=0)个模版参数。
  • 我们无法直接获取参数包args中的每个参数的,只能通过展开参数包的方式来获取参数包中的每个参数,这是使用可变模版参数的一个主要特点,也是最大的难点,即如何展开可变模版参数
  • 可变模版参数和普通的模版参数语义是一致的,所以可以应用于函数和类,即可变模版参数函数和可变模版参数类
  • 然而,模版函数不支持偏特化,所以可变模版参数函数和可变模版参数类展开可变模版参数的方法还不尽相同,包括递归函数方式展开参数包、逗号表达式展开参数包
  1. std::result_of
  • 原型
template <class Fn, class... ArgTypes>
class result_of<Fn(ArgTypes...)>
  • 作用:用于在编译的时候推导出一个可调用对象(函数,std::funciton或者重载了operator()操作的对象等)的返回值类型.主要用于模板编写中.

  • 解释:模板参数Fn:可调用对象;class... ArgTypes模板参数列表;成员:
    type可调用对象的返回类型.(主要用该成员来获得结果)

  • 实例

#include <iostream>
#include <type_traits>
 
int fn(int) {return int();}                            // function
typedef int(&fn_ref)(int);                             // function reference
typedef int(*fn_ptr)(int);                             // function pointer
struct fn_class { int operator()(int i){return i;} };  // function-like class
 
int main() {
  typedef std::result_of<decltype(fn)&(int)>::type A;  // int
  typedef std::result_of<fn_ref(int)>::type B;         // int
  typedef std::result_of<fn_ptr(int)>::type C;         // int
  typedef std::result_of<fn_class(int)>::type D;       // int
 
  std::cout << std::boolalpha;
  std::cout << "typedefs of int:" << std::endl;
 
  std::cout << "A: " << std::is_same<int,A>::value << std::endl;
  std::cout << "B: " << std::is_same<int,B>::value << std::endl;
  std::cout << "C: " << std::is_same<int,C>::value << std::endl;
  std::cout << "D: " << std::is_same<int,D>::value << std::endl;
 
  return 0;
}
  1. typename关键字
  • 用在模板定义里, 标明其后的模板参数是类型参数
  1. std::future
  • 我们想要从线程中返回异步任务结果,一般需要依靠全局变量;从安全角度看,有些不妥;为此C++11提供了std::future类模板,future对象提供访问异步操作结果的机制,很轻松解决从异步任务中返回结果
    image.png
  1. 返回值类型后置
0

评论区