Python中的协程与异步I/O中的异步生成器任务调度与并发控制
这是一个关于异步生成器在并发任务中如何被调度与控制的高级主题,我会从异步生成器的基本概念开始,逐步深入到任务调度和并发控制的机制。
1. 核心概念:什么是异步生成器?
异步生成器是结合了异步编程和生成器特性的Python结构。它允许你使用async def定义一个包含yield表达式的协程函数。与普通生成器不同,它可以在yield点暂停并让出控制权,以便事件循环(event loop)在同一线程中执行其他任务,而yield的值可以被异步地获取。
async def async_generator():
for i in range(3):
# 模拟一个异步操作,比如网络请求
await asyncio.sleep(0.1) # 异步暂停
yield i # 产生一个值,并在此处暂停
2. 如何消费异步生成器:async for 和 anext()
你不能用普通的for循环来迭代异步生成器,因为每次迭代都可能涉及await。必须使用async for循环,它在每次迭代时都会调用生成器的__anext__()方法(返回一个可等待对象)。
async def main():
ag = async_generator() # 创建异步生成器对象
async for value in ag: # 异步迭代
print(value)
async for内部的工作原理大致如下:
- 调用
ag.__anext__(),它返回一个awaitable(通常是一个协程对象)。 - 使用
await等待这个awaitable。这会驱动异步生成器执行,直到下一个yield表达式。 await表达式的结果就是yield产生的值。- 如果没有更多值(生成器结束),
ag.__anext__()会抛出StopAsyncIteration异常,async for循环捕获此异常并终止。
3. 任务调度的核心:异步生成器是惰性的
关键点:异步生成器本身并不会自动并发执行。仅仅定义或创建一个异步生成器对象,并不会启动任何任务。它只有在被消费(迭代)时,才会被驱动。
看这个例子:
async def slow_async_gen(name, n):
for i in range(n):
await asyncio.sleep(0.2) # 模拟慢速I/O
yield f"{name}: {i}"
async def main():
gen1 = slow_async_gen("Gen1", 3)
gen2 = slow_async_gen("Gen2", 3)
# 顺序消费,总时间约1.2秒
async for val in gen1:
print(val)
async for val in gen2:
print(val)
这里,gen1完全消费完后,才开始消费gen2,没有并发。
4. 实现并发控制:将异步生成器包装成任务
为了让多个异步生成器并发产生值,你需要为每个生成器的消费过程创建一个独立的任务(Task)。asyncio.create_task()或asyncio.gather()可以帮助你。
async def consume_one_gen(agen):
"""消费一个异步生成器的协程函数"""
async for val in agen:
print(val)
# 在这里可以对产生的值进行即时处理
async def main_concurrent():
gen1 = slow_async_gen("Gen1", 3)
gen2 = slow_async_gen("Gen2", 3)
# 创建两个任务,它们会并发执行
task1 = asyncio.create_task(consume_one_gen(gen1))
task2 = asyncio.create_task(consume_one_gen(gen2))
# 等待两个任务都完成
await task1
await task2
运行这个版本,你会发现Gen1和Gen2的输出是交替出现的,总时间大约只有0.6秒。因为当gen1在await asyncio.sleep时,事件循环会切换到task2去驱动gen2。
5. 更精细的并发控制:使用异步队列(asyncio.Queue)
当你有多个异步生成器并发生产数据,并且有一个或多个消费者处理这些数据时,直接打印可能不够。你需要一个协调机制来防止生产者压垮消费者,或者反之。这时可以使用asyncio.Queue。
async def producer(name, n, queue):
async for i in range(n):
await asyncio.sleep(0.2)
item = f"{name}: {i}"
await queue.put(item) # 将数据放入队列,如果队列满则等待
print(f"Produced: {item}")
async def consumer(queue):
while True:
item = await queue.get() # 从队列获取数据,如果队列空则等待
print(f"Consumed: {item}")
queue.task_done() # 通知队列该项已被处理
async def main_queue():
queue = asyncio.Queue(maxsize=2) # 限制队列大小,实现背压(Backpressure)
# 启动消费者任务
consumer_task = asyncio.create_task(consumer(queue))
# 启动多个生产者任务
producer_tasks = [
asyncio.create_task(producer("P1", 3, queue)),
asyncio.create_task(producer("P2", 3, queue)),
]
# 等待所有生产者完成
await asyncio.gather(*producer_tasks)
# 等待队列清空
await queue.join()
# 取消消费者任务
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
pass
maxsize参数是关键。如果生产者快于消费者,队列会满,queue.put()会阻塞(await),从而减缓生产者的速度。这就是一种简单的背压(Backpressure)机制。
6. 使用asyncio.as_completed()处理最先可用的结果
如果你不关心数据来自哪个生成器,只希望尽快处理每个生成器产生的值,可以使用asyncio.as_completed()。这需要稍微改变模式,为每个生成器的“获取下一个值”操作创建可等待对象。
async def main_as_completed():
gens = [slow_async_gen(f"Gen{i}", 3) for i in range(3)]
# 为每个生成器创建一个“获取下一个值”的任务列表
next_tasks = {asyncio.create_task(gen.__anext__()): gen for gen in gens}
while next_tasks:
# 等待最先完成的任务
done, _ = await asyncio.wait(next_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
gen = next_tasks.pop(task)
try:
value = task.result() # 获取yield的值
print(f"Received: {value}")
# 为同一个生成器安排获取下一个值的任务
new_task = asyncio.create_task(gen.__anext__())
next_tasks[new_task] = gen
except StopAsyncIteration:
# 这个生成器结束了
print(f"Generator {gen} finished")
print("All generators exhausted")
这个模式更复杂,但它确保了事件循环总是在处理当前能够产生值的生成器,实现了高效的并发。
总结:
- 异步生成器本身是惰性的,需要被消费(迭代)才能执行。
- 实现并发的关键在于将消费过程包装成多个并发的
Task。 - 对于生产-消费者模式,使用
asyncio.Queue可以方便地实现流量控制(背压)。 - 使用
asyncio.as_completed()或asyncio.wait(..., return_when=FIRST_COMPLETED)可以处理多个流中“最先可用”的结果,最大化并发效率。