Python中的异步I/O原语:Event、Lock、Semaphore、Condition的原理与使用
字数 873 2025-11-20 22:15:42
Python中的异步I/O原语:Event、Lock、Semaphore、Condition的原理与使用
题目描述
在Python异步编程中,asyncio提供了多种同步原语来处理并发场景下的资源竞争和协调问题。这些原语包括Event、Lock、Semaphore、Condition等,它们的设计原理和使用场景与多线程编程中的同步原语类似,但基于异步机制实现。
核心概念:异步同步原语
- 同步原语用于控制多个协程对共享资源的访问顺序
- 异步版本不会阻塞整个事件循环,而是让出执行权
- 基于asyncio.Future和协程机制实现
Event(事件)的原理与使用
基本概念
Event是最简单的同步原语,用于协程间的信号通知:
- 初始状态为未设置(False)
- 协程可以等待事件被设置
- 设置事件后,所有等待的协程都会被唤醒
实现原理
# 简化版Event实现逻辑
class SimpleEvent:
def __init__(self):
self._set = False
self._waiters = set() # 存储等待的Future对象
async def wait(self):
if self._set:
return
fut = asyncio.Future()
self._waiters.add(fut)
await fut # 等待事件被设置
def set(self):
self._set = True
for fut in self._waiters:
if not fut.done():
fut.set_result(True) # 唤醒所有等待者
self._waiters.clear()
使用示例
import asyncio
async def waiter(event, name):
print(f'{name} 等待事件触发')
await event.wait() # 等待事件被设置
print(f'{name} 检测到事件,继续执行')
async def setter(event):
print('设置器休眠2秒后触发事件')
await asyncio.sleep(2)
event.set() # 设置事件,唤醒所有等待者
async def main():
event = asyncio.Event()
# 创建多个等待任务
tasks = [
asyncio.create_task(waiter(event, f'等待者-{i}'))
for i in range(3)
]
# 创建触发任务
setter_task = asyncio.create_task(setter(event))
await asyncio.gather(*tasks, setter_task)
# 运行结果:
# 等待者-0 等待事件触发
# 等待者-1 等待事件触发
# 等待者-2 等待事件触发
# 设置器休眠2秒后触发事件
# (2秒后所有等待者同时被唤醒)
# 等待者-0 检测到事件,继续执行
# 等待者-1 检测到事件,继续执行
# 等待者-2 检测到事件,继续执行
Lock(锁)的原理与使用
基本概念
Lock用于保护共享资源,确保同一时间只有一个协程可以访问:
- 互斥锁,支持异步上下文管理器
- 可重入:同一个协程可以多次获取锁
- 公平性:按照请求顺序获取锁
实现原理
class SimpleLock:
def __init__(self):
self._locked = False
self._waiters = collections.deque() # 等待队列
async def acquire(self):
if not self._locked:
self._locked = True
return True
fut = asyncio.Future()
self._waiters.append(fut)
await fut # 进入等待队列
return True
def release(self):
if not self._locked:
raise RuntimeError('锁未被持有')
if self._waiters:
# 唤醒队列中的第一个等待者
fut = self._waiters.popleft()
fut.set_result(True)
else:
self._locked = False
使用示例
import asyncio
class SharedResource:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def safe_increment(self):
async with self.lock: # 自动获取和释放锁
current = self.value
await asyncio.sleep(0.1) # 模拟耗时操作
self.value = current + 1
async def worker(resource, name, count):
for i in range(count):
await resource.safe_increment()
print(f'{name} 完成第{i+1}次递增')
async def main():
resource = SharedResource()
# 创建多个工作协程同时修改共享资源
tasks = [
asyncio.create_task(worker(resource, 'Worker-A', 3)),
asyncio.create_task(worker(resource, 'Worker-B', 3)),
asyncio.create_task(worker(resource, 'Worker-C', 3))
]
await asyncio.gather(*tasks)
print(f'最终结果: {resource.value}') # 正确输出: 9
# 如果不加锁,由于竞态条件,结果可能小于9
Semaphore(信号量)的原理与使用
基本概念
Semaphore用于限制同时访问资源的协程数量:
- 维护一个计数器,表示可用许可数
- 获取许可时计数器减1,释放时加1
- 计数器为0时,新的获取操作需要等待
实现原理
class SimpleSemaphore:
def __init__(self, value=1):
self._value = value
self._waiters = collections.deque()
async def acquire(self):
if self._value > 0:
self._value -= 1
return True
fut = asyncio.Future()
self._waiters.append(fut)
await fut
return True
def release(self):
if self._waiters:
# 优先唤醒等待者
fut = self._waiters.popleft()
fut.set_result(True)
else:
self._value += 1
使用示例
import asyncio
async def limited_resource(semaphore, name, duration):
async with semaphore: # 限制同时只有2个协程访问
print(f'{name} 获取资源访问权')
await asyncio.sleep(duration)
print(f'{name} 释放资源')
async def main():
semaphore = asyncio.Semaphore(2) # 最多允许2个并发
tasks = [
asyncio.create_task(limited_resource(semaphore, f'任务-{i}', i))
for i in range(5)
]
await asyncio.gather(*tasks)
# 运行结果:同时只有2个任务在执行
# 任务-0 获取资源访问权
# 任务-1 获取资源访问权
# (任务-0立即完成)
# 任务-0 释放资源
# 任务-2 获取资源访问权
# (任务-1完成)
# 任务-1 释放资源
# 任务-3 获取资源访问权
Condition(条件变量)的原理与使用
基本概念
Condition用于复杂的同步场景,允许协程等待特定条件成立:
- 结合了Lock和等待/通知机制
- 可以等待多个不同的条件
- 支持通知单个或所有等待者
使用示例
import asyncio
class BoundedBuffer:
def __init__(self, capacity):
self.capacity = capacity
self.buffer = []
self.condition = asyncio.Condition()
async def put(self, item):
async with self.condition:
# 等待缓冲区有空间
while len(self.buffer) >= self.capacity:
await self.condition.wait()
self.buffer.append(item)
print(f'生产: {item}, 缓冲区: {self.buffer}')
self.condition.notify_all() # 通知所有消费者
async def get(self):
async with self.condition:
# 等待缓冲区有数据
while len(self.buffer) == 0:
await self.condition.wait()
item = self.buffer.pop(0)
print(f'消费: {item}, 缓冲区: {self.buffer}')
self.condition.notify_all() # 通知所有生产者
return item
async def producer(buffer, items):
for item in items:
await buffer.put(item)
await asyncio.sleep(0.1)
async def consumer(buffer, count):
for _ in range(count):
await buffer.get()
await asyncio.sleep(0.15)
async def main():
buffer = BoundedBuffer(3)
producer_task = asyncio.create_task(
producer(buffer, [1, 2, 3, 4, 5])
)
consumer_task = asyncio.create_task(
consumer(buffer, 5)
)
await asyncio.gather(producer_task, consumer_task)
原语选择指南
- Event:简单的信号通知,一对多通信
- Lock:保护临界区,互斥访问
- Semaphore:限制并发数量,资源池管理
- Condition:复杂条件等待,生产者-消费者模式
注意事项
- 异步原语不会阻塞事件循环,但可能引起协程间的竞态条件
- 使用async with语句确保正确获取和释放资源
- 注意避免死锁,确保获取和释放的对称性