Implementing a Blocking Queue
Problem Description
A blocking queue is a special type of queue. When the queue is empty, operations that retrieve elements from the queue are blocked until new elements become available. When the queue is full, operations that add elements to the queue are blocked until space becomes available. Please implement a thread-safe blocking queue.
Background Knowledge
Blocking queues are important synchronization tools in multithreaded programming, commonly used in producer-consumer models. They coordinate threads through internal condition variables and mutex locks.
Implementation Steps
- Basic Structure Design
We need a queue container (such as an array or linked list), a mutex lock (to ensure thread safety), and two condition variables (corresponding to the conditions "queue not empty" and "queue not full").
- Core Operations Analysis
put(element): Adds an element to the queue. If the queue is full, blocks the calling thread until space is available.
take(): Retrieves an element from the queue. If the queue is empty, blocks the calling thread until a new element is available.
- Detailed Implementation Process
Step 1: Define Class Members
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

template<typename T>
class BlockingQueue {
private:
    std::queue<T> queue_;               // Underlying queue container
    mutable std::mutex mutex_;          // Mutex; mutable so that const methods can lock it
    std::condition_variable not_empty_; // Signaled when the queue becomes non-empty
    std::condition_variable not_full_;  // Signaled when the queue gains free space
    size_t capacity_;                   // Maximum number of elements
Here, the standard library's queue is used as the underlying container. Mutual exclusion is achieved via a mutex, and two condition_variables are used for waiting on the "not empty" and "not full" conditions, respectively.
Step 2: Constructor Initialization
public:
    explicit BlockingQueue(size_t capacity) : capacity_(capacity) {}
The constructor takes a capacity parameter to specify the maximum length of the queue.
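One case the constructor does not guard against is a capacity of zero: the condition queue_.size() < capacity_ could then never become true, so every put would block forever. A minimal sketch of one possible guard follows. The helper name checked_capacity and the choice to throw std::invalid_argument are assumptions made for illustration, not part of the original class.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical helper (not part of the original class): rejects a zero
// capacity, which would otherwise make every put() block forever.
inline std::size_t checked_capacity(std::size_t capacity) {
    if (capacity == 0) {
        throw std::invalid_argument("BlockingQueue capacity must be > 0");
    }
    return capacity;
}

// Possible use in the constructor's initializer list:
//   explicit BlockingQueue(size_t capacity)
//       : capacity_(checked_capacity(capacity)) {}
```

Validating in the initializer list keeps capacity_ from ever holding an unusable value. Whether to throw, assert, or clamp to 1 is a design choice left to the caller's needs.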
Step 3: Implement the put Method
    void put(const T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        // Wait until the queue is not full. The predicate overload of wait
        // re-checks the condition after every wakeup, which guards against
        // spurious wakeups.
        not_full_.wait(lock, [this]() {
            return queue_.size() < capacity_;
        });
        queue_.push(item);       // Add the element to the queue
        not_empty_.notify_one(); // Notify a potentially waiting consumer
    }
- After acquiring the mutex lock, check if the queue is full.
- If full, release the lock via wait and block until woken up by another thread.
- After adding the element, wake up one potentially waiting consumer thread via notify_one.
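The predicate overload of wait used in put behaves like a while loop around the plain wait(lock) call. To make that loop visible, here is a minimal sketch written with explicit loops. IntBuffer is a hypothetical, non-template stand-in introduced only for this illustration. It follows the same locking scheme as the steps in this article.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical bounded buffer of ints, written with explicit while loops
// instead of the predicate overload of wait().
class IntBuffer {
public:
    explicit IntBuffer(std::size_t capacity) : capacity_(capacity) {}

    void put(int value) {
        std::unique_lock<std::mutex> lock(mutex_);
        // Re-check after every wakeup: the wakeup may be spurious, or
        // another producer may have taken the free slot first.
        while (queue_.size() >= capacity_) {
            not_full_.wait(lock);  // releases the lock while blocked
        }
        queue_.push(value);
        not_empty_.notify_one();
    }

    int take() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.empty()) {
            not_empty_.wait(lock);  // releases the lock while blocked
        }
        int value = queue_.front();
        queue_.pop();
        not_full_.notify_one();
        return value;
    }

private:
    std::queue<int> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    std::size_t capacity_;
};
```

Replacing either while with an if would compile, but a thread could then proceed even though the condition no longer holds, which is why the loop (or the predicate overload) is the usual form.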
Step 4: Implement the take Method
    T take() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Wait for the queue to not be empty
        not_empty_.wait(lock, [this]() {
            return !queue_.empty();
        });
        T item = queue_.front(); // Get the front element
        queue_.pop();            // Remove the front element
        not_full_.notify_one();  // Notify a potentially waiting producer
        return item;
    }
- Symmetric to the put method, first wait for the queue to not be empty.
- After retrieving the element, notify a potentially blocked producer thread.
Step 5: Optional Helper Methods
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }
}; // end of class BlockingQueue
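These helper methods also require locking to ensure thread safety. The values they return are only snapshots: another thread may change the queue immediately afterwards, so they should not be used to decide whether a later put or take will block.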
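One detail is easy to miss with const helpers like these: locking a std::mutex modifies it, so a const member function can only lock a mutex member that is declared mutable. The sketch below isolates that point. SharedCounter is a hypothetical class used only for illustration and is unrelated to the queue itself.

```cpp
#include <mutex>

// Hypothetical class, used only to illustrate locking inside const methods.
class SharedCounter {
public:
    void increment() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++value_;
    }

    // const method: locking mutates the mutex, so mutex_ must be mutable.
    int get() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return value_;
    }

private:
    mutable std::mutex mutex_;  // without `mutable`, get() would not compile
    int value_ = 0;
};
```

The same reasoning applies to empty() and size() in the queue: they are logically read-only, yet they still have to take the lock.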
Key Points
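- Condition Variable Usage Pattern: Always check the condition in a while loop, or pass it as a predicate (lambda) to wait, which loops internally. Either form guards against spurious wakeups.
- RAII Lock Management: Use unique_lock (required by condition_variable::wait) or lock_guard to automatically manage lock acquisition and release.
- Notification Timing: Notify waiting threads immediately after changing the condition state.
- Exception Safety: Because the locks are RAII objects, the mutex is released in their destructors even if an exception is thrown, for example by the copy constructor of T.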
This implementation provides the core functionality of a blocking queue, which can be safely used in a multithreaded environment. It is a classic case for understanding thread synchronization.
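To show the queue in the producer-consumer setting mentioned in the background section, here is a minimal sketch with one producer and one consumer. The class is restated compactly so the example compiles on its own. The function produce_and_sum and its parameters are hypothetical names chosen for this demo.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Compact restatement of the queue from the steps above, so that this
// example compiles on its own.
template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

    void put(const T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(item);
        not_empty_.notify_one();
    }

    T take() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T item = queue_.front();
        queue_.pop();
        not_full_.notify_one();
        return item;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    std::size_t capacity_;
};

// Hypothetical demo: one producer sends 1..count, one consumer sums them.
long long produce_and_sum(int count, std::size_t capacity) {
    BlockingQueue<int> queue(capacity);
    long long sum = 0;

    std::thread producer([&queue, count] {
        for (int i = 1; i <= count; ++i) {
            queue.put(i);  // blocks whenever the queue is full
        }
    });
    std::thread consumer([&queue, &sum, count] {
        for (int i = 0; i < count; ++i) {
            sum += queue.take();  // blocks whenever the queue is empty
        }
    });

    producer.join();
    consumer.join();
    return sum;  // safe to read: both threads have finished
}
```

With a small capacity such as 4, the producer is repeatedly forced to wait for the consumer, so both blocking paths are exercised. The consumer knows in advance how many items to expect. A real pipeline would need some shutdown signal, which this sketch leaves out.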