CTPL线程池笔记

原文有一个基于boost并发队列的实现,也有一个自己使用STL和锁实现的并发队列,我选择参考STL的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
namespace ctpl{
namespace detail{
template <typename T>
class Queue{
public:
bool push(T const & value){
std::unique_lock<std::shared_mutex> writeLock(this->m_mutex);
this->q.push(value);
return true;
}

bool pop(T& v){
std::unique_lock<std::shared_mutex> writeLock(this->m_mutex);
if (this->q.empty())
return false;
v = this->q.front();
this->q.pop();
return true;
}

bool empty(){
std::shared_lock<std::shared_mutex> readLock(this->m_mutex);
return this->q.empty();
}
private:
std::queue<T> q;
// mutable 允许在 const 成员函数中修改 m_mutex
mutable std::shared_mutex m_mutex;
};
}
}

在原有基础上修改为读写锁,在读取时使用 shared_lock,在写入操作时使用unique_lock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
namespace ctpl{
class thread_pool{
public:
thread_pool(){this->init();}
thread_pool(int nThreads){
this->init();
this->resize(nThreads);
}

~thread_pool() noexcept{
this->stop(true); // 等待所有线程完成
}

// 返回线程池大小
int size() const noexcept {
std::unique_lock<std::mutex> lock(m_mutex);
return static_cast<int>(this->threads.size());
}

// 返回空闲线程数量
int idle() const noexcept {
std::unique_lock<std::mutex> lock(m_mutex);
return this->nWaiting;
}

std::thread& get_thread(int i) noexcept {
return *this->threads[i];
}

// 重置线程池大小
// 如果新大小小于当前大小,则停止多余的线程
// 如果新大小大于当前大小,则创建新的线程
void resize(int nThreads){
if(!this->isStop && !this->isDone){
int oldNThreads = static_cast<int>(this->threads.size());
if(oldNThreads < nThreads){
this->threads.resize(nThreads);
this->flags.resize(nThreads);

for(int i =oldNThreads;i<nThreads;i++){
flags[i] = std::make_shared<std::atomic<bool>>(false);
this->set_thread(i); // 设置新线程
}
}else{
for(int i=oldNThreads-1;i>=nThreads;i--){
(*flags[i]).store(true); // 设置线程结束标志
threads[i]->detach(); // 分离线程
}
{
std::unique_lock<std::mutex> lock(m_mutex);
cv.notify_all();

}

this->threads.resize(nThreads); // 调整线程池大小
this->flags.resize(nThreads); // 调整标志大小
}
}
}

// 清空工作队列
void clear_queue() noexcept {
std::function<void(int id)> *_f;
while(this->q.pop(_f)){
delete _f; // 清空队列
}
}

std::function<void(int)>pop(){
std::function<void(int id)> *_f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f);
std::function<void(int)> f;
// 若 f = *_f 抛出异常(例如 std::function 拷贝构造失败)
// unique_ptr 会在作用域结束时自动释放 _f,避免泄漏。
if(_f){
f = *_f; // 如果队列不为空,返回函数
}
return f;
}

void stop(bool isWait = false){
if(!isWait){
if(this->isStop) return; // 如果线程池已经停止,则直接返回
this->isStop = true; // 设置停止标志
for(int i=0,n=this->size();i<n;i++){
(*flags[i]).store(true); // 设置每个线程的停止标志
}
this->clear_queue(); // 清空工作队列
}
else{
if(this->isDone || this->isStop) return; // 如果线程池已经完成或停止,则直接返回
this->isDone = true; // 设置完成标志
}
{
std::unique_lock<std::mutex> lock(m_mutex);
cv.notify_all(); // 通知所有线程
}
for (int i = 0; i < static_cast<int>(threads.size()); ++i) { // wait for the computing threads to finish
if (threads[i]->joinable())
threads[i]->join();
}
clear_queue(); // 清空工作队列
threads.clear(); // 清空线程池
flags.clear(); // 清空线程标志
}

// F是函数类型,Rest是可变参数模板,用于接收函数的参数。
// 使用尾置返回类型,通过decltype(f(0, rest...))推断函数返回值类型
// 并包装在std::future中。

// 原代码使用的后置返回类型 -> decltype(f(0, rest...)) 可以直接通过 f 和参数推断返回类型
// 但前置返回类型语法要求在函数名之前明确指定类型
// 此时,由于模板参数 F 和 Rest... 尚未实例化,无法直接调用 f(0, rest...)
// 因此需要借助 std::invoke_result 这种编译期类型推导工具。
template<typename F,typename... Rest>
auto push(F &&f ,Rest&&... rest) ->std::future<decltype(f(0,rest...))>{
// std::packaged_task
// 用于将可调用对象(如函数、lambda 表达式、函数对象等)封装为一个异步任务
// 并允许获取该任务的异步结果
auto pck = std::make_shared<std::packaged_task<decltype(f(0,rest...))(int)>>(
std::bind(std::forward<F>(f),std::placeholders::_1,std::forward<Rest>(rest)...)
);
auto _f = new std::function<void(int id)>([pck](int id){
(*pck)(id);
});
this->q.push(_f); // 将任务添加到队列中
std::unique_lock<std::mutex> lock(this->m_mutex);
this->cv.notify_one(); // 通知一个等待的线程
return pck->get_future(); // 返回任务的 future 对象
}

template<typename F>
auto push(F &&f) ->std::future<decltype(f(0))>{
// pck(std::packaged_task 的智能指针)使用 std::shared_ptr 是合理的:
// 多个对象需要共享生命周期:任务函数(std::function)和返回的 future 都需要引用同一个 packaged_task。
// 异步执行的特性:任务可能在线程池中的某个线程执行,而 future 可能在主线程被访问。
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
// 1. 这里可以直接写 std::forward<F>(f)
// 2. 也可以使用 std::bind 来绑定函数和参数
// std::bind(std::forward<F>(f),std::placeholders::_1)
std::forward<F>(f)
);
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(std::move(_f)); // 将任务添加到队列中
std::unique_lock<std::mutex> lock(this->m_mutex);
this->cv.notify_one(); // 通知一个等待的线程
return pck->get_future(); // 返回任务的 future 对象
}

private:
// 禁用了线程池类的拷贝构造函数
// 移动构造函数、拷贝赋值运算符和移动赋值运算符
thread_pool(const thread_pool &) =delete;
thread_pool(thread_pool &&) =delete;
thread_pool & operator=(const thread_pool &) =delete;
thread_pool & operator=(thread_pool &&) =delete;

// 设置线程函数
void set_thread(int i){
// 创建一个共享指针,指向线程的flag标志
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);

auto f = [this,i,flag](){
// 获取共享指针的引用,确保线程可以访问到正确的标志
std::atomic<bool> & _flag = *flag;
// 定义一个函数指针,用于执行任务
std::function<void(int id)> * _f;
bool isPop = this->q.pop(_f);
while(true){
while(isPop){
// 确保函数指针在返回时被删除
std::unique_ptr<std::function<void(int id)>> func(_f);
(*_f)(i); // 执行任务
if(_flag.load()){
return; // 如果线程被要求停止,则返回
} else {
isPop = this->q.pop(_f); // 继续从队列中获取任务
}
}
// 如果队列为空,等待新的任务
std::unique_lock<std::mutex> lock(m_mutex);
++nWaiting; // 增加等待线程计数
// 工作线程通过 cv.wait() 进入休眠状态,释放锁 lock
// 等到调用notify_one() 或 notify_all() 时,线程会被唤醒
cv.wait(lock,[this,&_f,&isPop,&_flag](){
isPop = q.pop(_f); // 尝试从队列中获取任务
return isPop || isDone.load() || _flag.load(); // 等待条件满足
});
--nWaiting; // 减少等待线程计数
if(!isPop) {
return; // 如果队列仍然为空且线程池已完成或线程被要求停止,则返回
}
}
};

this->threads[i].reset(new std::thread(f)); // 使用 make_unique 创建线程
}
void init(){
this->nWaiting = 0;
this->isStop = false;
this->isDone = false;
}
std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
// 线程池的任务队列
detail::Queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // 等待线程的数量
// 线程池的读写互斥锁
mutable std::mutex m_mutex;
// 条件变量,用于线程间的同步
std::condition_variable cv;
};
}
  1. std::thread::join() 会阻塞当前线程,直到目标线程执行完毕(退出)。

  2. template<typename F, typename... Rest> 这里Rest是一个模板参数包,代表零个或多个模板参数,常见展开方式有:

    1. 函数调用展开

      1
      func(rest...);
    2. 递归展开等

    3. 与完美转发结合

      1
      2
      3
      4
      5
      6
      7
      void push(F &&f, Rest&&... rest) {
      // 使用 std::forward 完美转发参数
      auto task = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Rest>(rest)...)]() mutable {
      std::apply(f, args); // 展开参数包并调用
      };
      // ...
      }
  3. decltype 类型推导关键字,用于在编译时 获取表达式的类型

  4. packaged_task将可调用对象封装成一个异步对象

  5. std::bind 将函数与参数绑定

  6. auto function() -> returnType是后置返回类型,特别适用于 返回类型依赖于模板参数需要推导复杂表达式类型 的场景。

  7. std::condition_variable 条件变量,等待 notify_one等函数唤醒,wait()要求传入 std::unique_lock

    1
    2
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return !task_queue.empty(); });
  8. std::function是 C++11 引入的一个 多态函数包装器

    1
    2
    std::function<返回类型(参数类型列表)> func = 可调用对象;
    func(参数值...); // 直接调用,等价于 func.operator()(参数值...)