Python中的并发编程模式:生产者-消费者问题与Queue实现
字数 717 2025-11-17 02:56:57

Python中的并发编程模式:生产者-消费者问题与Queue实现

问题描述
生产者-消费者问题是并发编程中的经典同步问题,涉及两类线程:生产者(生成数据)和消费者(处理数据)。它们通过共享的有限容量缓冲区(队列)进行通信。核心挑战在于:

  1. 当缓冲区满时,生产者必须等待;
  2. 当缓冲区空时,消费者必须等待;
  3. 需保证对缓冲区的操作是线程安全的。

循序渐进讲解

1. 基础概念:为什么需要缓冲区?

  • 生产者与消费者速度不匹配(如生产者生成数据快,消费者处理慢),缓冲区起到解耦和平衡负载的作用。
  • 在Python中,通常使用queue.Queue作为线程安全的缓冲区。

2. Queue的核心方法

  • put(item, block=True, timeout=None): 添加数据,若队列满则阻塞。
  • get(block=True, timeout=None): 取出数据,若队列空则阻塞。
  • task_done(): 标记当前任务处理完成,与join()配合使用。
  • join(): 阻塞直到所有任务被处理完。

3. 实现基础生产者-消费者模型

import threading
import queue
import time

def producer(q, items):
    for item in items:
        q.put(item)  # 队列满时自动阻塞
        print(f"生产: {item}")
        time.sleep(0.1)  # 模拟生产耗时

def consumer(q):
    while True:
        item = q.get()  # 队列空时自动阻塞
        print(f"消费: {item}")
        q.task_done()  # 通知队列任务完成

q = queue.Queue()
threading.Thread(target=consumer, args=(q,), daemon=True).start()
producer(q, range(5))
q.join()  # 等待所有任务被消费
print("所有任务完成")

4. 处理多个生产者和消费者

  • 创建多个线程分别执行生产者和消费者函数:
q = queue.Queue(maxsize=3)  # 限制缓冲区大小
producers = [threading.Thread(target=producer, args=(q, [1, 2, 3])) for _ in range(2)]
consumers = [threading.Thread(target=consumer, args=(q,)) for _ in range(3)]

for p in producers:
    p.start()
for c in consumers:
    c.daemon = True  # 设为守护线程,主线程退出时自动结束
    c.start()
q.join()  # 等待所有任务处理完毕

5. 优雅终止消费者

  • 通过发送特殊信号(如None)通知消费者退出:
def consumer(q):
    while True:
        item = q.get()
        if item is None:  # 终止信号
            q.task_done()
            break
        print(f"消费: {item}")
        q.task_done()

# 生产者结束后发送终止信号
for _ in consumers:
    q.put(None)

6. 使用条件变量(Condition)手动实现队列
(进阶)理解底层同步机制:

import threading

class ManualQueue:
    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.queue = []
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)  # 条件变量
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:
            while len(self.queue) >= self.maxsize:
                self.not_full.wait()  # 等待"非满"信号
            self.queue.append(item)
            self.not_empty.notify()  # 通知消费者有数据

    def get(self):
        with self.not_empty:
            while len(self.queue) == 0:
                self.not_empty.wait()  # 等待"非空"信号
            item = self.queue.pop(0)
            self.not_full.notify()  # 通知生产者有空间
            return item

关键要点总结

  • 线程安全Queue内部使用锁和条件变量保证操作的原子性。
  • 阻塞控制:通过条件变量自动管理线程的等待和唤醒。
  • 资源管理:设置缓冲区大小防止内存溢出,join()确保任务完整性。
  • 扩展性:模型可轻松扩展为多生产者和多消费者场景。
Python中的并发编程模式:生产者-消费者问题与Queue实现 问题描述 生产者-消费者问题是并发编程中的经典同步问题,涉及两类线程:生产者(生成数据)和消费者(处理数据)。它们通过共享的有限容量缓冲区(队列)进行通信。核心挑战在于: 当缓冲区满时,生产者必须等待; 当缓冲区空时,消费者必须等待; 需保证对缓冲区的操作是线程安全的。 循序渐进讲解 1. 基础概念:为什么需要缓冲区? 生产者与消费者速度不匹配(如生产者生成数据快,消费者处理慢),缓冲区起到解耦和平衡负载的作用。 在Python中,通常使用 queue.Queue 作为线程安全的缓冲区。 2. Queue的核心方法 put(item, block=True, timeout=None) : 添加数据,若队列满则阻塞。 get(block=True, timeout=None) : 取出数据,若队列空则阻塞。 task_done() : 标记当前任务处理完成,与 join() 配合使用。 join() : 阻塞直到所有任务被处理完。 3. 实现基础生产者-消费者模型 4. 处理多个生产者和消费者 创建多个线程分别执行生产者和消费者函数: 5. 优雅终止消费者 通过发送特殊信号(如 None )通知消费者退出: 6. 使用条件变量(Condition)手动实现队列 (进阶)理解底层同步机制: 关键要点总结 线程安全 : Queue 内部使用锁和条件变量保证操作的原子性。 阻塞控制 :通过条件变量自动管理线程的等待和唤醒。 资源管理 :设置缓冲区大小防止内存溢出, join() 确保任务完整性。 扩展性 :模型可轻松扩展为多生产者和多消费者场景。