Python中的并发编程模式:生产者-消费者问题与Queue实现
字数 717 2025-11-17 02:56:57
Python中的并发编程模式:生产者-消费者问题与Queue实现
问题描述
生产者-消费者问题是并发编程中的经典同步问题,涉及两类线程:生产者(生成数据)和消费者(处理数据)。它们通过共享的有限容量缓冲区(队列)进行通信。核心挑战在于:
- 当缓冲区满时,生产者必须等待;
- 当缓冲区空时,消费者必须等待;
- 需保证对缓冲区的操作是线程安全的。
循序渐进讲解
1. 基础概念:为什么需要缓冲区?
- 生产者与消费者速度不匹配(如生产者生成数据快,消费者处理慢),缓冲区起到解耦和平衡负载的作用。
- 在Python中,通常使用
queue.Queue作为线程安全的缓冲区。
2. Queue的核心方法
put(item, block=True, timeout=None): 添加数据,若队列满则阻塞。get(block=True, timeout=None): 取出数据,若队列空则阻塞。task_done(): 标记当前任务处理完成,与join()配合使用。join(): 阻塞直到所有任务被处理完。
3. 实现基础生产者-消费者模型
import threading
import queue
import time
def producer(q, items):
for item in items:
q.put(item) # 队列满时自动阻塞
print(f"生产: {item}")
time.sleep(0.1) # 模拟生产耗时
def consumer(q):
while True:
item = q.get() # 队列空时自动阻塞
print(f"消费: {item}")
q.task_done() # 通知队列任务完成
q = queue.Queue()
threading.Thread(target=consumer, args=(q,), daemon=True).start()
producer(q, range(5))
q.join() # 等待所有任务被消费
print("所有任务完成")
4. 处理多个生产者和消费者
- 创建多个线程分别执行生产者和消费者函数:
q = queue.Queue(maxsize=3) # 限制缓冲区大小
producers = [threading.Thread(target=producer, args=(q, [1, 2, 3])) for _ in range(2)]
consumers = [threading.Thread(target=consumer, args=(q,)) for _ in range(3)]
for p in producers:
p.start()
for c in consumers:
c.daemon = True # 设为守护线程,主线程退出时自动结束
c.start()
q.join() # 等待所有任务处理完毕
5. 优雅终止消费者
- 通过发送特殊信号(如
None)通知消费者退出:
def consumer(q):
while True:
item = q.get()
if item is None: # 终止信号
q.task_done()
break
print(f"消费: {item}")
q.task_done()
# 生产者结束后发送终止信号
for _ in consumers:
q.put(None)
6. 使用条件变量(Condition)手动实现队列
(进阶)理解底层同步机制:
import threading
class ManualQueue:
def __init__(self, maxsize):
self.maxsize = maxsize
self.queue = []
self.mutex = threading.Lock()
self.not_empty = threading.Condition(self.mutex) # 条件变量
self.not_full = threading.Condition(self.mutex)
def put(self, item):
with self.not_full:
while len(self.queue) >= self.maxsize:
self.not_full.wait() # 等待"非满"信号
self.queue.append(item)
self.not_empty.notify() # 通知消费者有数据
def get(self):
with self.not_empty:
while len(self.queue) == 0:
self.not_empty.wait() # 等待"非空"信号
item = self.queue.pop(0)
self.not_full.notify() # 通知生产者有空间
return item
关键要点总结
- 线程安全:
Queue内部使用锁和条件变量保证操作的原子性。 - 阻塞控制:通过条件变量自动管理线程的等待和唤醒。
- 资源管理:设置缓冲区大小防止内存溢出,
join()确保任务完整性。 - 扩展性:模型可轻松扩展为多生产者和多消费者场景。