Python中的异步I/O原语:Event、Lock、Semaphore、Condition的原理与使用
字数 1618 2025-11-23 03:05:17
Python中的异步I/O原语:Event、Lock、Semaphore、Condition的原理与使用
1. 异步I/O原语概述
在异步编程中,多个协程可能并发访问共享资源或需要协调执行顺序。为了避免竞争条件(Race Condition)或实现复杂的协作逻辑,asyncio提供了以下同步原语:
- Event:用于协程间的通知机制,一个协程等待事件触发,另一个协程触发事件。
- Lock:互斥锁,确保同一时间只有一个协程访问共享资源。
- Semaphore:信号量,限制同时访问资源的协程数量。
- Condition:条件变量,允许协程等待某个条件成立后再执行。
这些原语的异步版本与线程同步原语的关键区别:
- 异步原语在等待时主动让出事件循环(await),而不是阻塞线程,从而避免浪费CPU资源。
2. Event(异步事件)
核心机制
- Event内部维护一个布尔状态
_is_set(默认为False)和一个等待队列(协程列表)。 - 当协程调用
await event.wait()时,如果_is_set为False,则协程进入等待队列。 - 当某个协程调用
event.set()时,_is_set变为True,并唤醒所有等待的协程。
使用示例
import asyncio
async def waiter(event):
print("等待事件触发...")
await event.wait() # 让出控制权,等待事件被set
print("事件已触发,继续执行")
async def setter(event):
await asyncio.sleep(2)
print("触发事件")
event.set() # 唤醒所有等待的协程
async def main():
event = asyncio.Event()
await asyncio.gather(waiter(event), setter(event))
asyncio.run(main())
输出:
等待事件触发...
(等待2秒)
触发事件
事件已触发,继续执行
关键点
event.clear()可将状态重置为False,后续调用wait()的协程会重新等待。- 如果事件已触发(
_is_set为True),wait()会立即返回。
3. Lock(异步互斥锁)
解决的问题
当多个协程修改同一数据时,可能因交替执行导致数据不一致。例如:
async def unsafe_counter():
global count
temp = count
await asyncio.sleep(0.01) # 模拟I/O操作,此时可能被其他协程抢占
count = temp + 1
两个协程可能同时读取count=0,最终结果变为1(而非预期的2)。
使用方法
import asyncio
count = 0
lock = asyncio.Lock()
async def safe_counter():
global count
async with lock: # 自动获取和释放锁
temp = count
await asyncio.sleep(0.01)
count = temp + 1
async def main():
await asyncio.gather(*[safe_counter() for _ in range(100)])
print(count) # 正确输出100
asyncio.run(main())
底层原理
- Lock内部维护一个队列。协程调用
await lock.acquire()时,如果锁未被占用则立即获取;否则加入队列等待。 lock.release()会唤醒队列中的第一个协程。- 必须使用
async with或try/finally确保释放锁,避免死锁。
4. Semaphore(异步信号量)
核心功能
限制同时访问某资源的协程数量(例如限制并发HTTP请求数)。
示例:限制并发数为2
import asyncio
semaphore = asyncio.Semaphore(2)
async def limited_task(id):
async with semaphore:
print(f"任务{id} 获取信号量")
await asyncio.sleep(1)
print(f"任务{id} 释放信号量")
async def main():
await asyncio.gather(*[limited_task(i) for i in range(5)])
asyncio.run(main())
输出:
任务0 获取信号量
任务1 获取信号量
(等待1秒)
任务0 释放信号量
任务1 释放信号量
任务2 获取信号量
任务3 获取信号量
...
原理
- 信号量维护一个计数器
_value。 await semaphore.acquire():如果_value>0则减1并立即返回;否则等待。semaphore.release():将_value加1,并唤醒一个等待协程。
5. Condition(异步条件变量)
应用场景
协程需要等待某个条件成立(如共享数据达到特定状态)后再执行。
示例:生产者-消费者模型
import asyncio
queue = []
condition = asyncio.Condition()
async def producer():
async with condition:
queue.append("数据")
print("生产数据,通知消费者")
condition.notify_all() # 唤醒所有等待的消费者
async def consumer(id):
async with condition:
while len(queue) == 0:
print(f"消费者{id} 等待数据")
await condition.wait() # 释放锁并等待通知
data = queue.pop()
print(f"消费者{id} 消费 {data}")
async def main():
await asyncio.gather(consumer(1), consumer(2), producer())
asyncio.run(main())
输出:
消费者1 等待数据
消费者2 等待数据
生产数据,通知消费者
消费者1 消费 数据
消费者2 消费 数据
关键细节
- 必须使用
async with condition:因为wait()会释放锁,允许其他协程修改条件。 - 总是用循环检查条件:被唤醒后条件可能已被其他协程改变(例如多个消费者抢一个数据)。
notify(n)可唤醒指定数量的等待协程。
6. 总结与注意事项
- 异步原语不会阻塞事件循环:等待时通过await让出控制权,与线程同步原语有本质区别。
- 避免死锁:确保锁总是被释放,例如用
async with替代手动acquire/release。 - 性能考量:在高频场景中,同步原语可能成为瓶颈,需考虑无锁设计或更高级并发模型(如Actor模式)。
- 与线程原语的对比:
特性 线程同步原语 异步同步原语 阻塞行为 阻塞线程 让出事件循环 适用场景 CPU密集型/阻塞I/O 异步I/O 资源消耗 线程上下文切换成本高 协程切换成本低
通过合理使用这些原语,可以安全地协调异步任务,构建高效的并发程序。