Implementing the Producer-Consumer Model
Problem Description
The producer-consumer model is a classic multithreading synchronization problem. The task is to design a program containing two types of threads: producer threads and consumer threads. They share a fixed-size buffer. Producers put data into the buffer, and consumers take data from the buffer. The following synchronization issues need to be addressed:
- When the buffer is full, producers must wait.
- When the buffer is empty, consumers must wait.
- Ensure mutually exclusive access to the buffer (only one thread operates on the buffer at a time).
Solution Process
We will implement this model with one mutex and two condition variables, which is the standard approach in C++.
Step 1: Define Shared Data Structures and Synchronization Tools
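#include <chrono> // std::chrono::milliseconds
#include <cstddef> // std::size_t
#include <iostream> // std::cout
#include <queue>
#include <thread> // std::this_thread::sleep_for
#include <mutex>
#include <condition_variable>
const std::size_t BUFFER_SIZE = 5; // Buffer capacity (unsigned, to match buffer.size())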
std::queue<int> buffer; // Shared buffer
std::mutex mtx; // Mutex lock
std::condition_variable not_full; // Condition variable for buffer not full
std::condition_variable not_empty; // Condition variable for buffer not empty
Step 2: Producer Thread Logic
void producer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// Wait until the buffer is not full (standard usage of condition variable)
not_full.wait(lock, []{
return buffer.size() < BUFFER_SIZE;
});
// Produce data
int item = id * 100 + i;
buffer.push(item);
std::cout << "Producer " << id << " produced " << item << std::endl;
lock.unlock();
// Notify consumers that the buffer is not empty
not_empty.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
Key Points Explanation:
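- The second parameter of wait() is a predicate; wait() returns only once the predicate evaluates to true.
- Unlocking before notifying can improve performance, because a woken thread does not immediately block on a mutex that the notifier still holds.
- notify_all() wakes up all threads waiting on this condition variable.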
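To make the first key point concrete, the predicate overload of wait() behaves like a hand-written while loop around the plain wait(lock) call. The sketch below is only an illustration: the names demo_queue, demo_mtx, demo_not_empty, take_with_manual_loop, and run_manual_loop_demo are hypothetical and not part of the example above, and notify_one() is used only because there is a single waiter.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical names, used only for this illustration.
std::queue<int> demo_queue;
std::mutex demo_mtx;
std::condition_variable demo_not_empty;

// Blocks until demo_queue holds an item, then removes and returns it.
int take_with_manual_loop() {
    std::unique_lock<std::mutex> lock(demo_mtx);
    // Same effect as:
    //   demo_not_empty.wait(lock, []{ return !demo_queue.empty(); });
    while (demo_queue.empty()) {
        // Releases the mutex while blocked and reacquires it before returning.
        demo_not_empty.wait(lock);
    }
    int item = demo_queue.front();
    demo_queue.pop();
    return item;
}

// Starts one waiting consumer, publishes a single value,
// and returns what the consumer received.
int run_manual_loop_demo(int value) {
    int received = 0;
    std::thread consumer([&received] { received = take_with_manual_loop(); });
    {
        std::lock_guard<std::mutex> lock(demo_mtx);
        demo_queue.push(value);
    } // The mutex is released here, before notifying.
    demo_not_empty.notify_one(); // A single waiter, so notify_one() is enough.
    consumer.join();
    assert(demo_queue.empty());
    return received;
}
```

Calling run_manual_loop_demo(42) from main() should return 42 whether the consumer starts waiting before or after the value is pushed, because the loop rechecks the condition while holding the mutex.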
Step 3: Consumer Thread Logic
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// Wait until the buffer is not empty
not_empty.wait(lock, []{
return !buffer.empty();
});
// Consume data
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed " << item << std::endl;
lock.unlock();
// Notify producers that the buffer is not full
not_full.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}
Step 4: Complete Runnable Example
#include <chrono>
#include <cstddef>
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
const std::size_t BUFFER_SIZE = 5; // Unsigned, to match buffer.size()
std::queue<int> buffer;
std::mutex mtx;
std::condition_variable not_full, not_empty;
void producer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
not_full.wait(lock, []{ return buffer.size() < BUFFER_SIZE; });
int item = id * 100 + i;
buffer.push(item);
std::cout << "Producer " << id << " produced " << item << std::endl;
lock.unlock();
not_empty.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
not_empty.wait(lock, []{ return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed " << item << std::endl;
lock.unlock();
not_full.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}
int main() {
std::thread p1(producer, 1);
std::thread p2(producer, 2);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p1.join(); p2.join();
c1.join(); c2.join();
return 0;
}
Step 5: In-Depth Analysis of How Condition Variables Work
- When a thread calls wait(), it atomically releases the mutex and enters a waiting state.
- When awakened by notify_all(), the thread reacquires the mutex and checks the condition.
- If the condition is met, it proceeds; otherwise, it waits again, which guards against spurious wake-ups.
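The points above can be checked with a small experiment. In this sketch, which uses hypothetical names (mutex_is_free_during_wait, waiter_started, ready), a second thread manages to lock the mutex while the waiter is still inside wait(). That is only possible because wait() released the mutex. The waiter then resumes only after the condition has become true.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true if the waiter resumed with the condition satisfied.
// If wait() did not release the mutex, this function would never return.
bool mutex_is_free_during_wait() {
    std::mutex m;
    std::condition_variable cv;
    bool waiter_started = false;     // set by the waiter before it calls wait()
    bool ready = false;              // the condition the waiter waits for
    bool resumed_with_ready = false; // recorded by the waiter after wait() returns

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        waiter_started = true;
        cv.wait(lock, [&] { return ready; }); // releases m while blocked
        resumed_with_ready = ready;           // m is held again at this point
    });

    bool done = false;
    while (!done) {
        {
            std::lock_guard<std::mutex> lock(m);
            if (waiter_started) {
                // We hold m and the waiter has not finished, so the waiter
                // must currently be inside wait() with m released.
                ready = true;
                done = true;
            }
        }
        if (!done) {
            std::this_thread::yield(); // the waiter has not started yet; retry
        }
    }
    cv.notify_all(); // notify after unlocking
    waiter.join();
    return resumed_with_ready;
}
```

Calling mutex_is_free_during_wait() from main() is expected to return true. A version of wait() that kept the mutex locked would leave the polling loop blocked forever.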
Common Interview Variations:
- Implementation using semaphores.
- Encapsulation using a blocking queue.
- Producer-consumer in a distributed environment (message queue).
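As an illustration of the blocking-queue variation, the same mutex and condition variables can be hidden behind a small class so that callers only see push() and pop(). This is a minimal sketch under stated assumptions, not a production design: the class name BlockingQueue, its interface, and the helper run_blocking_queue_demo are hypothetical, and it uses notify_one() because each operation frees or fills exactly one slot.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical wrapper around the queue, mutex, and both condition variables.
template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the queue is full, then appends the item.
    void push(T item) {
        {
            std::unique_lock<std::mutex> lock(mtx_);
            not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
            queue_.push(std::move(item));
        } // unlock before notifying
        not_empty_.notify_one();
    }

    // Blocks while the queue is empty, then removes and returns the oldest item.
    T pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        lock.unlock();
        not_full_.notify_one();
        return item;
    }

private:
    std::size_t capacity_;
    std::queue<T> queue_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// One producer pushes 1..100 through a queue of capacity 2;
// one consumer pops 100 items and returns their sum.
long long run_blocking_queue_demo() {
    BlockingQueue<int> queue(2);
    long long sum = 0;
    std::thread producer([&queue] {
        for (int i = 1; i <= 100; ++i) queue.push(i);
    });
    std::thread consumer([&queue, &sum] {
        for (int i = 0; i < 100; ++i) sum += queue.pop();
    });
    producer.join();
    consumer.join();
    return sum;
}
```

Calling run_blocking_queue_demo() from main() should return 5050, the sum of 1 through 100. The small capacity forces the producer to block repeatedly, which exercises both condition variables.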
This model embodies the core concepts of multithreaded programming: mutual exclusion + conditional synchronization. Understanding this model is essential for mastering more complex concurrent programming designs.