CTPL Thread Pool Notes
The original project ships two queue implementations: one built on Boost's concurrent queue, and one concurrent queue hand-rolled with the STL and a lock. I chose to follow the STL version.
```cpp
#include <mutex>
#include <queue>
#include <shared_mutex>

namespace ctpl {
namespace detail {

template <typename T>
class Queue {
public:
    bool push(T const & value) {
        std::unique_lock<std::shared_mutex> writeLock(this->m_mutex);
        this->q.push(value);
        return true;
    }

    bool pop(T & v) {
        std::unique_lock<std::shared_mutex> writeLock(this->m_mutex);
        if (this->q.empty())
            return false;
        v = this->q.front();
        this->q.pop();
        return true;
    }

    bool empty() {
        std::shared_lock<std::shared_mutex> readLock(this->m_mutex);
        return this->q.empty();
    }

private:
    std::queue<T> q;
    mutable std::shared_mutex m_mutex;
};

}  // namespace detail
}  // namespace ctpl
```
Compared with the original, I changed the plain mutex into a read–write lock: reads take a std::shared_lock, while write operations take a std::unique_lock.
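A minimal usage sketch of this Queue (my own example; it assumes the class above is saved in a hypothetical header `ctpl_queue.h`). Threads that only call empty() share the lock and never block one another; push()/pop() take it exclusively.

```cpp
#include <iostream>
#include <thread>
// Hypothetical header name; assumes the ctpl::detail::Queue shown above is saved here.
#include "ctpl_queue.h"

int main() {
    ctpl::detail::Queue<int> q;

    // push() and pop() take the exclusive (write) lock.
    std::thread writer([&q] { for (int i = 0; i < 5; ++i) q.push(i); });

    // empty() only takes the shared (read) lock, so any number of threads
    // can call it at the same time without blocking each other.
    std::thread reader([&q] { while (q.empty()) { /* wait for the writer */ } });

    writer.join();
    reader.join();

    int v = 0;
    while (q.pop(v)) std::cout << v << ' ';
    std::cout << '\n';  // 0 1 2 3 4
}
```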
```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

namespace ctpl {

class thread_pool {
public:
    thread_pool() { this->init(); }
    thread_pool(int nThreads) {
        this->init();
        this->resize(nThreads);
    }

    // Waits for the queued tasks to finish before destroying the pool.
    ~thread_pool() noexcept {
        this->stop(true);
    }

    int size() const noexcept {
        std::unique_lock<std::mutex> lock(m_mutex);
        return static_cast<int>(this->threads.size());
    }

    int idle() const noexcept {
        std::unique_lock<std::mutex> lock(m_mutex);
        return this->nWaiting;
    }

    std::thread & get_thread(int i) noexcept {
        return *this->threads[i];
    }

    // Grow or shrink the number of worker threads.
    void resize(int nThreads) {
        if (!this->isStop && !this->isDone) {
            int oldNThreads = static_cast<int>(this->threads.size());
            if (oldNThreads < nThreads) {  // grow: start the additional workers
                this->threads.resize(nThreads);
                this->flags.resize(nThreads);
                for (int i = oldNThreads; i < nThreads; i++) {
                    flags[i] = std::make_shared<std::atomic<bool>>(false);
                    this->set_thread(i);
                }
            } else {  // shrink: flag the surplus workers to stop, then detach them
                for (int i = oldNThreads - 1; i >= nThreads; i--) {
                    (*flags[i]).store(true);
                    threads[i]->detach();
                }
                {
                    std::unique_lock<std::mutex> lock(m_mutex);
                    cv.notify_all();  // wake the waiting workers so they can exit
                }
                this->threads.resize(nThreads);
                this->flags.resize(nThreads);
            }
        }
    }

    // Delete every task still waiting in the queue.
    void clear_queue() noexcept {
        std::function<void(int id)> * _f;
        while (this->q.pop(_f)) {
            delete _f;
        }
    }

    // Take one task out of the queue without running it.
    std::function<void(int)> pop() {
        std::function<void(int id)> * _f = nullptr;
        this->q.pop(_f);
        std::unique_ptr<std::function<void(int id)>> func(_f);  // frees _f on return
        std::function<void(int)> f;
        if (_f) {
            f = *_f;
        }
        return f;
    }

    // isWait == false: discard the queued tasks; isWait == true: finish them first.
    void stop(bool isWait = false) {
        if (!isWait) {
            if (this->isStop)
                return;
            this->isStop = true;
            for (int i = 0, n = this->size(); i < n; i++) {
                (*flags[i]).store(true);
            }
            this->clear_queue();
        } else {
            if (this->isDone || this->isStop)
                return;
            this->isDone = true;
        }
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            cv.notify_all();  // wake all waiting workers
        }
        for (int i = 0; i < static_cast<int>(threads.size()); ++i) {
            if (threads[i]->joinable())
                threads[i]->join();
        }
        clear_queue();
        threads.clear();
        flags.clear();
    }

    // Submit a task with extra arguments; the worker id is passed as the first argument.
    template<typename F, typename... Rest>
    auto push(F && f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
        auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
            std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
        );
        auto _f = new std::function<void(int id)>([pck](int id) {
            (*pck)(id);
        });
        this->q.push(_f);
        std::unique_lock<std::mutex> lock(this->m_mutex);
        this->cv.notify_one();
        return pck->get_future();
    }

    // Submit a task that only takes the worker id.
    template<typename F>
    auto push(F && f) -> std::future<decltype(f(0))> {
        auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
        auto _f = new std::function<void(int id)>([pck](int id) {
            (*pck)(id);
        });
        this->q.push(std::move(_f));
        std::unique_lock<std::mutex> lock(this->m_mutex);
        this->cv.notify_one();
        return pck->get_future();
    }

private:
    thread_pool(const thread_pool &) = delete;
    thread_pool(thread_pool &&) = delete;
    thread_pool & operator=(const thread_pool &) = delete;
    thread_pool & operator=(thread_pool &&) = delete;

    // Worker loop: run tasks while any are queued, otherwise wait on the condition variable.
    void set_thread(int i) {
        std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);
        auto f = [this, i, flag]() {
            std::atomic<bool> & _flag = *flag;
            std::function<void(int id)> * _f;
            bool isPop = this->q.pop(_f);
            while (true) {
                while (isPop) {
                    std::unique_ptr<std::function<void(int id)>> func(_f);  // deletes the task
                    (*_f)(i);
                    if (_flag.load()) {  // this worker was asked to stop
                        return;
                    } else {
                        isPop = this->q.pop(_f);
                    }
                }
                std::unique_lock<std::mutex> lock(m_mutex);
                ++nWaiting;
                cv.wait(lock, [this, &_f, &isPop, &_flag]() {
                    isPop = q.pop(_f);
                    return isPop || isDone.load() || _flag.load();
                });
                --nWaiting;
                if (!isPop) {
                    return;
                }
            }
        };
        this->threads[i].reset(new std::thread(f));
    }

    void init() {
        this->nWaiting = 0;
        this->isStop = false;
        this->isDone = false;
    }

    std::vector<std::unique_ptr<std::thread>> threads;
    std::vector<std::shared_ptr<std::atomic<bool>>> flags;
    detail::Queue<std::function<void(int id)> *> q;  // the Queue defined above
    std::atomic<bool> isDone;
    std::atomic<bool> isStop;
    std::atomic<int> nWaiting;
    mutable std::mutex m_mutex;
    std::condition_variable cv;
};

}  // namespace ctpl
```
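A quick usage sketch of the pool (my own example, assuming the code above is saved as `ctpl_stl.h`, the header name the upstream project uses): every task receives the id of the worker thread that runs it as its first argument, and push() returns a std::future for the result.

```cpp
#include <iostream>
#include "ctpl_stl.h"

int main() {
    ctpl::thread_pool pool(4);  // 4 worker threads

    // The first parameter of every task is the id of the worker thread running it.
    auto square = pool.push([](int id, int x) { return x * x; }, 7);

    auto hello = pool.push([](int id) {
        std::cout << "hello from worker " << id << "\n";
    });

    std::cout << "7*7 = " << square.get() << "\n";  // blocks until the task finishes
    hello.get();

    pool.stop(true);  // finish the remaining queued tasks (the destructor does this too)
}
```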
std::thread::join()
Blocks the current thread until the target thread finishes executing (exits).
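A minimal illustration:

```cpp
#include <iostream>
#include <thread>

int main() {
    std::thread worker([] { std::cout << "working...\n"; });
    worker.join();  // main blocks here until `worker` has exited
    std::cout << "worker finished\n";
}
```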
template<typename F, typename... Rest>
Here, Rest is a template parameter pack: it stands for zero or more template parameters. Common ways to expand a pack include:
- expansion inside a function call
- recursive expansion, and so on (a sketch follows the code block below)

Combined with perfect forwarding:
```cpp
template <typename F, typename... Rest>
void push(F && f, Rest &&... rest) {
    // Forward the callable and the whole pack into the closure, then expand
    // the stored arguments again with std::apply when the task runs.
    auto task = [f = std::forward<F>(f),
                 args = std::make_tuple(std::forward<Rest>(rest)...)]() mutable {
        std::apply(f, args);
    };
}
```
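And a sketch of the recursive expansion mentioned in the list above (my own example): each call peels one argument off the pack and recurses on the rest until the non-template base case is reached.

```cpp
#include <iostream>
#include <utility>

// Base case: no arguments left.
void print_all() { std::cout << '\n'; }

// Recursive case: handle the first argument, recurse on the remaining pack.
template <typename First, typename... Rest>
void print_all(First && first, Rest &&... rest) {
    std::cout << first << ' ';
    print_all(std::forward<Rest>(rest)...);
}

int main() {
    print_all(1, 2.5, "three");  // prints: 1 2.5 three
}
```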
decltype
A type-deduction keyword used to obtain the type of an expression at compile time.
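For example (the operand of decltype is never evaluated, so f below only needs a declaration):

```cpp
#include <type_traits>
#include <vector>

int f(int);  // declaration only; f(0) inside decltype is not actually called

int main() {
    int x = 0;
    decltype(x) y = 1;                    // y is int
    decltype(f(0)) z = 2;                 // type of the expression f(0): int
    std::vector<decltype(z)> v = {y, z};  // std::vector<int>
    static_assert(std::is_same<decltype(z), int>::value, "deduced at compile time");
}
```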
packaged_task
Wraps a callable object so that it can be run asynchronously and its result retrieved through an associated std::future.
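A minimal sketch of the typical pattern: wrap the callable, grab the future, run the task somewhere else, then collect the result.

```cpp
#include <future>
#include <iostream>
#include <thread>

int main() {
    // Wrap a callable; its result becomes available through the associated future.
    std::packaged_task<int(int)> task([](int x) { return x * 2; });
    std::future<int> result = task.get_future();

    std::thread t(std::move(task), 21);  // run the task on another thread
    std::cout << result.get() << "\n";   // 42; blocks until the task has run
    t.join();
}
```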
std::bind
Binds a function together with (some of) its arguments, producing a new callable.
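For example, mirroring how push() above binds everything except the thread id via std::placeholders::_1:

```cpp
#include <functional>
#include <iostream>

int add(int a, int b) { return a + b; }

int main() {
    // Bind the second argument now, leave the first as a placeholder.
    auto add_to_10 = std::bind(add, std::placeholders::_1, 10);
    std::cout << add_to_10(5) << "\n";  // 15
}
```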
auto function() -> returnType
This is a trailing return type; it is especially useful when the return type depends on template parameters or has to be deduced from a complex expression.
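A minimal sketch in the same spirit as push() above: the parameters are already in scope after the parameter list, so decltype can use them.

```cpp
#include <utility>

// The return type depends on the template parameters, so it is written after ->.
template <typename A, typename B>
auto add(A && a, B && b) -> decltype(std::forward<A>(a) + std::forward<B>(b)) {
    return std::forward<A>(a) + std::forward<B>(b);
}

int main() {
    auto x = add(1, 2.5);  // deduced as double
    (void)x;
}
```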
std::condition_variable
A condition variable: a waiting thread sleeps until notify_one() (or a similar call) wakes it, and wait() requires a std::unique_lock.
```cpp
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return !task_queue.empty(); });
```
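Putting the waiting and notifying sides together in one runnable sketch (my own example; mtx, cv and task_queue are local variables here, mirroring the names in the snippet above):

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<int> task_queue;

    // Consumer: waits until the predicate holds; wait() releases the lock while sleeping.
    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&] { return !task_queue.empty(); });
        std::cout << "got task " << task_queue.front() << "\n";
        task_queue.pop();
    });

    // Producer: modify the shared state under the same mutex, then notify.
    {
        std::lock_guard<std::mutex> lock(mtx);
        task_queue.push(42);
    }
    cv.notify_one();  // the woken waiter re-checks its predicate before wait() returns

    consumer.join();
}
```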
std::function
A polymorphic function wrapper introduced in C++11.
```cpp
std::function<ReturnType(ArgTypes...)> func = some_callable;
func(arg_values...);
```
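A concrete example: the same std::function object can hold a plain function, a lambda, or any other callable with a matching signature.

```cpp
#include <functional>
#include <iostream>

int triple(int x) { return 3 * x; }

int main() {
    std::function<int(int)> func = triple;      // function pointer
    std::cout << func(2) << "\n";               // 6

    func = [](int x) { return x + 100; };       // lambda with the same signature
    std::cout << func(2) << "\n";               // 102
}
```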