Python中的协程与异步编程中的异步队列(asyncio.Queue)原理与应用
字数 1126 2025-11-30 00:59:43
Python中的协程与异步编程中的异步队列(asyncio.Queue)原理与应用
1. 异步队列的应用场景
在异步编程中,多个协程可能需要安全地交换数据。例如:
- 生产者-消费者模型:生产者协程生成数据,消费者协程处理数据。
- 限流控制:通过队列长度限制并发任务数量。
- 任务调度:协程通过队列传递任务,实现负载均衡。
异步队列(asyncio.Queue)的核心需求是:在协程间高效传递数据,且避免阻塞事件循环。
2. 异步队列的基本用法
(1)创建队列
import asyncio
queue = asyncio.Queue(maxsize=3) # 最大容量为3,默认为0(无界队列)
(2)生产者协程:放入数据
async def producer():
for i in range(5):
await queue.put(i) # 如果队列已满,则等待直到有空位
print(f"Produced: {i}")
(3)消费者协程:取出数据
async def consumer():
while True:
item = await queue.get() # 如果队列为空,则等待直到有数据
print(f"Consumed: {item}")
queue.task_done() # 标记任务完成,用于同步
(4)启动多个协程
async def main():
# 启动消费者任务
consumer_task = asyncio.create_task(consumer())
# 生产者生产数据
await producer()
# 等待队列中所有任务被处理完毕
await queue.join()
consumer_task.cancel() # 取消消费者循环任务
asyncio.run(main())
3. 异步队列的底层原理
(1)队列的组成部分
- 底层数据结构:使用
collections.deque存储数据。 - 同步原语:通过
asyncio.Lock或类似机制保证线程安全(协程安全)。 - 等待机制:
- 当队列已满时,
put操作会阻塞,并记录一个Future对象,等待消费者通过get唤醒。 - 当队列为空时,
get操作会阻塞,并记录一个Future对象,等待生产者通过put唤醒。
- 当队列已满时,
(2)put 方法的伪代码逻辑
async def put(self, item):
if 队列已满:
# 创建Future对象,并添加到等待队列(_putters)
future = Future()
self._putters.append(future)
await future # 等待消费者唤醒
# 将数据放入队列
self._queue.append(item)
# 如果有消费者在等待,唤醒最早的一个
if self._getters:
waiter = self._getters.popleft()
waiter.set_result(None) # 唤醒消费者
(3)get 方法的伪代码逻辑
async def get(self):
if 队列为空:
# 创建Future对象,并添加到等待队列(_getters)
future = Future()
self._getters.append(future)
await future # 等待生产者唤醒
# 从队列取出数据
item = self._queue.popleft()
# 如果有生产者在等待,唤醒最早的一个
if self._putters:
waiter = self._putters.popleft()
waiter.set_result(None) # 唤醒生产者
return item
4. 异步队列的进阶特性
(1)任务同步:queue.task_done() 与 queue.join()
task_done():消费者每处理完一个任务后调用,减少内部计数器。join():阻塞直到队列中所有任务被处理(内部计数器归零)。
async def consumer():
while True:
item = await queue.get()
# 处理item...
queue.task_done() # 标记任务完成
# 在主协程中等待所有任务完成
await queue.join()
(2)非阻塞方法:put_nowait() 与 get_nowait()
- 如果队列已满,
put_nowait()抛出asyncio.QueueFull。 - 如果队列为空,
get_nowait()抛出asyncio.QueueEmpty。
(3)优先级队列:asyncio.PriorityQueue
继承自 asyncio.Queue,但按优先级顺序取出数据(优先级小的先取出):
pq = asyncio.PriorityQueue()
await pq.put((2, "低优先级"))
await pq.put((1, "高优先级"))
print(await pq.get()) # (1, "高优先级")
(4)LIFO队列:asyncio.LifoQueue
继承自 asyncio.Queue,后进先出(栈结构)。
5. 实际应用:限制并发任务数量
通过队列限制同时运行的协程数量(例如最多3个并发):
async def worker(queue):
while True:
task = await queue.get()
await process_task(task)
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=3)
# 启动3个消费者 worker
workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
# 添加任务
for task in tasks:
await queue.put(task)
await queue.join()
# 取消worker任务
for w in workers:
w.cancel()
6. 总结
- 异步队列的核心价值:在协程间安全传递数据,避免阻塞事件循环。
- 关键机制:通过
Future对象实现等待与唤醒,保证协程协作。 - 适用场景:生产者-消费者模型、任务调度、限流控制等。
- 扩展变体:优先级队列(
PriorityQueue)、LIFO队列(LifoQueue)满足不同需求。