实现生产者-消费者模型
字数 611 2025-11-04 20:48:20
实现生产者-消费者模型
题目描述
生产者-消费者模型是一种经典的多线程同步问题。要求设计一个程序,包含两类线程:生产者线程和消费者线程。它们共享一个固定大小的缓冲区。生产者向缓冲区放入数据,消费者从缓冲区取出数据。需要解决以下同步问题:
- 当缓冲区满时,生产者必须等待
- 当缓冲区空时,消费者必须等待
- 保证对缓冲区的互斥访问(同一时刻只有一个线程操作缓冲区)
解题过程
我们将使用互斥锁(mutex)和条件变量(condition variable)实现这个模型,这是最标准的解法。
步骤1:定义共享数据结构和同步工具
#include <queue>
#include <mutex>
#include <condition_variable>
const int BUFFER_SIZE = 5; // 缓冲区容量
std::queue<int> buffer; // 共享缓冲区
std::mutex mtx; // 互斥锁
std::condition_variable not_full; // 缓冲区不满的条件变量
std::condition_variable not_empty; // 缓冲区不空的条件变量
步骤2:生产者线程逻辑
void producer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// 等待缓冲区不满(条件变量的标准用法)
not_full.wait(lock, []{
return buffer.size() < BUFFER_SIZE;
});
// 生产数据
int item = id * 100 + i;
buffer.push(item);
std::cout << "Producer " << id << " produced " << item << std::endl;
lock.unlock();
// 通知消费者缓冲区不空
not_empty.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
关键点解释:
wait()的第二个参数是判断条件,条件满足时才继续执行- 先解锁再通知可以提高性能
notify_all()唤醒所有等待该条件的线程
步骤3:消费者线程逻辑
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// 等待缓冲区不空
not_empty.wait(lock, []{
return !buffer.empty();
});
// 消费数据
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed " << item << std::endl;
lock.unlock();
// 通知生产者缓冲区不满
not_full.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}
步骤4:完整的可运行示例
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
const int BUFFER_SIZE = 5;
std::queue<int> buffer;
std::mutex mtx;
std::condition_variable not_full, not_empty;
void producer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
not_full.wait(lock, []{ return buffer.size() < BUFFER_SIZE; });
int item = id * 100 + i;
buffer.push(item);
std::cout << "Producer " << id << " produced " << item << std::endl;
lock.unlock();
not_empty.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
not_empty.wait(lock, []{ return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed " << item << std::endl;
lock.unlock();
not_full.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}
int main() {
std::thread p1(producer, 1);
std::thread p2(producer, 2);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p1.join(); p2.join();
c1.join(); c2.join();
return 0;
}
步骤5:条件变量的工作原理深度解析
- 当线程调用
wait()时,会自动释放互斥锁并进入等待状态 - 被
notify_all()唤醒后,线程会重新获取互斥锁并检查条件 - 如果条件满足,继续执行;否则再次进入等待(避免虚假唤醒)
常见面试变体:
- 使用信号量(Semaphore)实现
- 使用阻塞队列(BlockingQueue)封装
- 分布式环境下的生产者-消费者(消息队列)
这个模型体现了多线程编程的核心思想:互斥访问 + 条件同步。理解这个模型有助于掌握更复杂的并发程序设计。