Implementing the Producer-Consumer Model

Problem Description
The producer-consumer model is a classic multithreading synchronization problem. The task is to design a program containing two types of threads: producer threads and consumer threads. They share a fixed-size buffer. Producers put data into the buffer, and consumers take data from the buffer. The following synchronization issues need to be addressed:

  1. When the buffer is full, producers must wait.
  2. When the buffer is empty, consumers must wait.
  3. Ensure mutually exclusive access to the buffer (only one thread operates on the buffer at a time).

Solution Process
We implement this model with one mutex and two condition variables, which is the standard approach in C++11 and later.

Step 1: Define Shared Data Structures and Synchronization Tools

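#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

const std::size_t BUFFER_SIZE = 5;  // Buffer capacity (size_t matches buffer.size())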

std::queue<int> buffer;     // Shared buffer
std::mutex mtx;             // Mutex lock
std::condition_variable not_full;   // Condition variable for buffer not full
std::condition_variable not_empty;  // Condition variable for buffer not empty

Step 2: Producer Thread Logic

void producer(int id) {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until the buffer is not full (standard usage of condition variable)
        not_full.wait(lock, []{ 
            return buffer.size() < BUFFER_SIZE; 
        });
        
        // Produce data
        int item = id * 100 + i;
        buffer.push(item);
        std::cout << "Producer " << id << " produced " << item << std::endl;
        
        lock.unlock();
        // Notify consumers that the buffer is not empty
        not_empty.notify_all();
        
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

Key Points Explanation:

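  • The second argument of wait() is a predicate. wait() returns only once the predicate evaluates to true, and it re-evaluates the predicate after every wake-up.
  • Unlocking before notifying avoids waking a thread that would immediately block again on the mutex the notifier still holds.
  • notify_all() wakes every thread waiting on that condition variable. Because each push or pop lets only one waiting thread proceed, notify_one() would also suffice here and wakes fewer threads.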
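
To make the first point concrete, the predicate form of wait() can be written out as an explicit loop. The following is a minimal sketch, not part of the program above: the helper push_when_not_full and its parameter names are hypothetical, and the shared state is passed in as arguments only so that the snippet stands on its own.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical helper for illustration: blocks until `q` holds fewer than
// `capacity` items, then pushes `item` and signals one waiting consumer.
void push_when_not_full(std::queue<int>& q, std::mutex& m,
                        std::condition_variable& not_full,
                        std::condition_variable& not_empty,
                        std::size_t capacity, int item) {
    std::unique_lock<std::mutex> lock(m);

    // Hand-written equivalent of:
    //   not_full.wait(lock, [&]{ return q.size() < capacity; });
    while (!(q.size() < capacity)) {
        // Releases m while blocked and reacquires it before returning.
        not_full.wait(lock);
    }

    q.push(item);
    lock.unlock();           // unlock first, then notify
    not_empty.notify_one();
}
```

The loop is what makes the wait robust: a thread that wakes up while the buffer is still full simply goes back to waiting.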

Step 3: Consumer Thread Logic

void consumer(int id) {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until the buffer is not empty
        not_empty.wait(lock, []{ 
            return !buffer.empty(); 
        });
        
        // Consume data
        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumer " << id << " consumed " << item << std::endl;
        
        lock.unlock();
        // Notify producers that the buffer is not full
        not_full.notify_all();
        
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
}

Step 4: Complete Runnable Example

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

const std::size_t BUFFER_SIZE = 5;  // size_t matches the type of buffer.size()
std::queue<int> buffer;
std::mutex mtx;
std::condition_variable not_full, not_empty;

void producer(int id) {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        not_full.wait(lock, []{ return buffer.size() < BUFFER_SIZE; });
        
        int item = id * 100 + i;
        buffer.push(item);
        std::cout << "Producer " << id << " produced " << item << std::endl;
        
        lock.unlock();
        not_empty.notify_all();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(int id) {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        not_empty.wait(lock, []{ return !buffer.empty(); });
        
        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumer " << id << " consumed " << item << std::endl;
        
        lock.unlock();
        not_full.notify_all();
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
}

int main() {
    std::thread p1(producer, 1);
    std::thread p2(producer, 2);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);
    
    p1.join(); p2.join();
    c1.join(); c2.join();
    return 0;
}

Step 5: In-Depth Analysis of How Condition Variables Work

  1. When a thread calls wait() and the predicate is false, wait() atomically releases the mutex and blocks the thread.
  2. When the thread is awakened, whether by notify_one(), by notify_all(), or spuriously, it reacquires the mutex and re-evaluates the predicate.
  3. If the predicate is true, wait() returns with the mutex held; otherwise the thread waits again. This re-check is what guards against spurious wake-ups.
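
The release of the mutex inside wait() can be observed directly. The following is a minimal sketch under stated assumptions, not part of the original program: the function wait_releases_mutex and all of its variable names are hypothetical. A waiter thread sets a flag while holding the mutex and then calls wait() without ever unlocking explicitly, so if another thread can lock the mutex and see the flag, wait() must have released it.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demonstration: returns true if the calling thread locked the
// mutex while the waiter thread was blocked inside wait().
bool wait_releases_mutex() {
    std::mutex m;
    std::condition_variable cv;
    bool about_to_wait = false;        // set by the waiter while it holds m
    bool go = false;                   // the condition the waiter waits for
    bool acquired_during_wait = false;

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        about_to_wait = true;
        // m is never unlocked explicitly; only wait() can release it.
        cv.wait(lock, [&] { return go; });
    });

    for (;;) {
        std::unique_lock<std::mutex> lock(m);
        if (about_to_wait) {
            // We hold m and the waiter has not returned from wait(),
            // so wait() must have released the mutex.
            acquired_during_wait = true;
            go = true;
            break;                     // lock is released on scope exit
        }
        lock.unlock();
        std::this_thread::yield();     // waiter has not started yet; retry
    }

    cv.notify_one();                   // waiter reacquires m and rechecks go
    waiter.join();
    return acquired_during_wait;
}
```

Calling wait_releases_mutex() from main() should return true once the waiter has been released and joined.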

Common Interview Variations:

  1. Implementation using semaphores.
  2. Encapsulation using a blocking queue.
  3. Producer-consumer in a distributed environment (message queue).
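
The second variation, encapsulation in a blocking queue, can be sketched by moving the mutex, the two condition variables, and the buffer into one class. This is one possible design, not a definitive implementation: the class name BlockingQueue and its members push, pop, and size are assumptions made for illustration, and it uses notify_one() instead of notify_all() because each operation lets only one waiting thread proceed.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded blocking queue built from the same primitives
// as the example above.
template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the queue is full.
    void push(T value) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(std::move(value));
        lock.unlock();
        not_empty_.notify_one();
    }

    // Blocks while the queue is empty.
    T pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        lock.unlock();
        not_full_.notify_one();
        return value;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return queue_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mtx_;           // mutable so size() can lock it
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> queue_;
};
```

With such a class, a producer reduces to calling push() and a consumer to calling pop(); neither touches the mutex or the condition variables directly.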

This model embodies the core concepts of multithreaded programming: mutual exclusion + conditional synchronization. Understanding this model is essential for mastering more complex concurrent programming designs.