线程池
以下是调用单队列BlockingQueue的代码,使用双队列时将所有BlockingQueue改为BlockingQueuePro
接口
构造:用智能指针 unique_ptr 初始化阻塞队列,创建threads_num个线程并给每个绑定Worker函数开始执行task。(此时全部Worker阻塞在Pop()里)
Post: 发布任务到线程池。
析构:通知唤醒所有消费者,没有任务就自己下班。与生产者没关系。
#pragma once
#include <thread>
#include <functional>
#include <vector>
template <typename T>
class BlockingQueue;
class ThreadPool {
public:
// 初始化线程池
explicit ThreadPool(int threads_num);
// 停止线程池
~ThreadPool();
// 发布任务到线程池
void Post(std::function<void()> task);
private:
// 每个线程运行的循环函数
void Worker();
// 任务队列
std::unique_ptr<BlockingQueue<std::function<void()>>> task_queue_;
// 存放所有的线程对象。每个线程都会绑定 Worker() 执行函数
std::vector<std::thread> workers_;
};
#include "blockingqueue.h"
#include <memory>
#include "threadpool.h"
// 创建一个阻塞队列,开threads_num个线程,每个线程绑定执行 Worker() 方法。
ThreadPool::ThreadPool(int threads_num) {
task_queue_ = std::make_unique<BlockingQueue<std::function<void()>>>();
for (size_t i = 0; i < threads_num; ++i) {
workers_.emplace_back([this] {Worker();});
// 使用 lambda 捕获 this [this] { Worker(); },所以每个线程都能调用当前对象的 Worker 函数。
}
}
// 停止线程池
// 取消队列并唤醒所有阻塞线程
ThreadPool::~ThreadPool() {
task_queue_->Cancel();
for(auto &worker : workers_) {
if (worker.joinable())
worker.join();
}
}
// 发布任务
void ThreadPool::Post(std::function<void()> task) {
task_queue_->Push(task);
}
// 从task_queue_中取出任务
void ThreadPool::Worker() {
while (true) {
std::function<void()> task;
// 阻塞在Pop里实现
if (!task_queue_->Pop(task)) {
break;
}
task();
}
}
阻塞队列(单队列版)
单队列维护:
- nonblock_(bool): 未阻塞标记。为true时,队列不会阻塞。初始(构造时)默认为阻塞状态。
- queue_(std::queue): 底层存储容器。
- mutex_(std::mutex): 保证多线程操作安全的互斥锁。
- not_empty_(std::condition_variable):用于线程前同步。
template <typename T>
class BlockingQueue {
public:
BlockingQueue(bool nonblock = false) : nonblock_(nonblock) { }
// 入队操作
void Push(const T &value) {
// lock_guard自动加锁解锁,与声明周期一致(实现了构造加锁和析构解锁)
std::lock_guard<std::mutex> lock(mutex_);
// 元素入队
queue_.push(value);
// 通知一个正在 wait 的线程: 队列不空,可以取任务。
not_empty_.notify_one();
}
// 正常 pop 弹出元素
// 异常 pop 没有弹出元素
bool Pop(T &value) {
// 加锁
// 如果该线程已被加锁则进不来
// condition_variable::wait需要这个unique_lock
std::unique_lock<std::mutex> lock(mutex_);
// 这一行的主要作用是 维护取操作的安全性(在队列非空的时候才能继续往下走。
// 但是如果只判断队列是否为空来决定是否往下走,Cancel之后怎么办?
// 即我想清空队列后结束所有线程,此时消费者线程依旧会被阻塞在这一行,因为它不知道是否要结束。
// 所以我们设置标志nonblock_来告诉消费者线程是否结束)
// 当 队列非空 或者 队列执行了Cancel 时 -> predicate谓词为true->正常往下走
// 即队列里还有任务,或者队列里没任务了,但是消费者知道任务结束了,可以继续下班了。都会继续往下走。
// 当 队列为空 并且 队列未执行Cancel 时 -> 谓词为false -> 此线程自动解锁mutex,让出CPU,阻塞在这一行
// 即队列空了,并且消费者知道还没下班,则阻塞等待。
not_empty_.wait(lock, [this]{ return !queue_.empty() || nonblock_; });
if (queue_.empty()) return false; // 该消费者线程下班
value = queue_.front();
queue_.pop();
return true;
}
// 解除阻塞在当前队列的线程
void Cancel() {
// 自动加锁解锁
std::lock_guard<std::mutex> lock(mutex_);
// 告诉消费者下班
nonblock_ = true;
// 主动唤醒所有阻塞在wait的消费者(在wait的消费者被唤醒后还会走一遍wait的谓词判断。false则继续阻塞)
not_empty_.notify_all();
}
private:
bool nonblock_;
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable not_empty_;
};
阻塞队列(双队列版)
单队列中, 生产者和消费者都要竞争同一把锁。
双队列:
prod_queue_
:生产者写入队列(保护锁prod_mutex_
)。cons_queue_
:消费者读取队列(保护锁cons_mutex_
)。- 当消费者队列空时,通过
SwapQueue_()
将生产队列和消费队列 交换,实现“批量搬运”。
好处是:
- 减少锁竞争:生产者和消费者基本不抢同一把锁。
- 吞吐更高:一次交换,消费者能批量取走数据,减少频繁加锁。
template <typename T>
class BlockingQueuePro {
public:
BlockingQueuePro(bool nonblock = false) : nonblock_(nonblock) {}
void Push(const T &value) {
std::lock_guard<std::mutex> lock(prod_mutex_);
prod_queue_.push(value);
not_empty_.notify_one();
}
bool Pop(T &value) {
std::unique_lock<std::mutex> lock(cons_mutex_);
// 消费者队列为空时自动触发swap,如果swap后仍为空,则消费失败
// 此时就不需要.wait了
if (cons_queue_.empty() && SwapQueue_() == 0) {
return false;
}
value = cons_queue_.front();
cons_queue_.pop();
return true;
}
void Cancel() {
std::lock_guard<std::mutex> lock(prod_mutex_);
nonblock_ = true;
not_empty_.notify_all();
}
private:
int SwapQueue_() {
std::unique_lock<std::mutex> lock(prod_mutex_);
// 当生产者队列为空,并且未告知下班时 -> 两个false -> 阻塞
// 当Cancel了,不管队列是否为空,都会往下走
not_empty_.wait(lock, [this] {return !prod_queue_.empty() || nonblock_; });
std::swap(prod_queue_, cons_queue_);
// 返回交换后消费者队列中的任务数量
// =0只有一种情况,生产者队列为空时被告知下班
return cons_queue_.size();
}
bool nonblock_;
std::queue<T> prod_queue_;
std::queue<T> cons_queue_;
std::mutex prod_mutex_;
std::mutex cons_mutex_;
std::condition_variable not_empty_;
};
Test
理论上双队列比单队列快,因为锁的竞争/碰撞少了。下面的实验结果也是如此:
写一个实验:设置4个生产者线程,一个生产者给25000个任务,任务是1000次循环。消费者数量指定当前cpu最适合线程数(std::thread::hardware_concurrency())。我这个WSL环境是16个。
分别跑single(单队列)和double(双队列),输出总耗时和QPS。
std::atomic<int> task_counter{0};
int main() {
const int num_producers = 4;
const int num_tasks_per_producer = 25000; // 总共 100,000 个任务
const int num_threads_in_pool = std::thread::hardware_concurrency();
// 为了测试,single版本和double版本改为了继承自ThreadPoolBase的两个派生类
// 想看具体实现可见附录
ThreadPoolSingle pool(num_threads_in_pool);
auto start = std::chrono::high_resolution_clock::now();
std::vector<std::thread> producers;
// 4个生产者开始干活
for (int i = 0; i < num_producers; ++i) {
producers.emplace_back(Producer, std::ref(pool), i, num_tasks_per_producer);
}
for (auto& p : producers) {
p.join();
}
// 这里主线程忙等,用std::condition_variable,等待唤醒可能更高效
while (task_counter < num_producers * num_tasks_per_producer) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
auto end = std::chrono::high_resolution_clock::now();
double elapsed = std::chrono::duration<double>(end - start).count();
int total_tasks = num_producers * num_tasks_per_producer;
std::cout << "[Single Queue] Total: " << total_tasks
<< " tasks. Time: " << elapsed
<< " seconds. QPS = " << total_tasks / elapsed << std::endl;
}
void Task(int id) {
volatile long sum = 0;
for (int i = 0; i < 1000; ++i) {
sum += i;
}
task_counter++;
}
void Producer(ThreadPoolSingle& pool, int producer_id, int num_tasks) {
for (int i = 0; i < num_tasks; ++i) {
int task_id = producer_id * 100000 + i;
pool.Post([task_id]() { Task(task_id); });
}
}
结果:
将任务数和主线程轮询时间各减一个0后(任务数改为2500,主线程轮询等待5)(因为完成的太快了,主线程等待时间过长影响结果)
用平均数更准,但是这么大差距也不会被误差所影响。
附录
修改版单队列和双队列
#pragma once
#include <thread>
#include <functional>
#include <vector>
#include "blockingqueue.h"
#include "blockingqueuepro.h"
// 前置声明
// blockingqueue 仅仅只能用作指针或引用
// template <typename T>
// class BlockingQueue;
// template <typename T>
// class BlockingQueuePro;
// class ThreadPool {
// public:
// // 初始化线程池
// explicit ThreadPool(int threads_num);
// // 停止线程池
// ~ThreadPool();
// // 发布任务到线程池
// void Post(std::function<void()> task);
// private:
// void Worker();
// std::unique_ptr<BlockingQueue<std::function<void()>>> task_queue_;
// std::vector<std::thread> workers_;
// };
class ThreadPoolBase {
public:
explicit ThreadPoolBase(int threads_num) : threads_num_(threads_num) {}
virtual ~ThreadPoolBase() = default;
virtual void Post(std::function<void()> task) = 0;
protected:
int threads_num_;
std::vector<std::thread> workers_;
};
class ThreadPoolSingle : public ThreadPoolBase {
public:
explicit ThreadPoolSingle(int threads_num)
: ThreadPoolBase(threads_num),
task_queue_(std::make_unique<BlockingQueue<std::function<void()>>>()) {
for (int i = 0; i < threads_num_; ++i) {
workers_.emplace_back([this] { Worker(); });
}
}
~ThreadPoolSingle() {
task_queue_->Cancel();
for (auto &w : workers_) {
if (w.joinable()) w.join();
}
}
void Post(std::function<void()> task) override {
task_queue_->Push(task);
}
private:
void Worker() {
while (true) {
std::function<void()> task;
if (!task_queue_->Pop(task)) break;
task();
}
}
std::unique_ptr<BlockingQueue<std::function<void()>>> task_queue_;
};
class ThreadPoolDouble : public ThreadPoolBase {
public:
explicit ThreadPoolDouble(int threads_num)
: ThreadPoolBase(threads_num),
task_queue_(std::make_unique<BlockingQueuePro<std::function<void()>>>()) {
for (int i = 0; i < threads_num_; ++i) {
workers_.emplace_back([this] { Worker(); });
}
}
~ThreadPoolDouble() {
task_queue_->Cancel();
for (auto &w : workers_) {
if (w.joinable()) w.join();
}
}
void Post(std::function<void()> task) override {
task_queue_->Push(task);
}
private:
void Worker() {
while (true) {
std::function<void()> task;
if (!task_queue_->Pop(task)) break;
task();
}
}
std::unique_ptr<BlockingQueuePro<std::function<void()>>> task_queue_;
};