Python中的协程任务取消与超时处理机制
字数 676 2025-12-06 04:17:05
Python中的协程任务取消与超时处理机制
描述:
在Python的asyncio异步编程中,协程任务的取消和超时处理是重要的错误处理机制。这涉及到如何优雅地终止正在执行的异步操作,以及如何为异步操作设置时间限制,防止无限期等待。
解题过程:
1. 任务取消的基本机制
协程任务取消通过cancel()方法实现,这会向任务发送取消请求:
import asyncio
async def my_task():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("任务被取消了")
raise # 必须重新抛出异常
async def main():
task = asyncio.create_task(my_task())
await asyncio.sleep(1) # 等待1秒
task.cancel() # 发送取消请求
try:
await task
except asyncio.CancelledError:
print("任务已确认取消")
asyncio.run(main())
关键点:
cancel()方法不会立即终止任务,而是发送一个取消请求- 任务在下一个
await点会收到CancelledError异常 - 捕获
CancelledError后可以选择清理资源,但必须重新抛出异常
2. 超时处理的实现方式
asyncio提供了几种超时处理机制:
2.1 使用asyncio.wait_for()
import asyncio
async def slow_operation():
await asyncio.sleep(5)
return "操作完成"
async def main():
try:
# 设置3秒超时
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(f"结果: {result}")
except asyncio.TimeoutError:
print("操作超时")
asyncio.run(main())
工作原理:
- 为协程创建包装任务
- 设置超时计时器
- 超时时取消原任务
- 原任务收到
CancelledError - 包装任务抛出
TimeoutError
2.2 使用asyncio.shield()保护任务不被取消
import asyncio
async def critical_operation():
try:
await asyncio.sleep(2)
return "关键操作完成"
except asyncio.CancelledError:
# 即使被取消也完成操作
await asyncio.sleep(1) # 模拟清理操作
return "取消后完成"
async def main():
task = asyncio.create_task(critical_operation())
try:
# 使用shield保护任务
result = await asyncio.wait_for(
asyncio.shield(task),
timeout=1.0
)
print(f"结果: {result}")
except asyncio.TimeoutError:
print("超时,但关键操作仍在运行")
# 等待关键操作完成
result = await task
print(f"最终结果: {result}")
asyncio.run(main())
3. 超时处理的底层实现
wait_for的内部简化逻辑:
async def wait_for(fut, timeout):
# 创建事件循环
loop = asyncio.get_running_loop()
# 将future包装为任务
task = asyncio.ensure_future(fut)
if timeout is None:
return await task
# 创建超时处理器
timeout_handle = loop.call_later(
timeout,
lambda: task.cancel() if not task.done() else None
)
try:
return await task
except asyncio.CancelledError:
# 检查是否是超时导致的取消
if not task.done():
# 是超时取消
raise asyncio.TimeoutError()
else:
# 是其他原因取消
raise
finally:
timeout_handle.cancel() # 清理超时计时器
4. 复杂的超时控制模式
4.1 分组超时控制
import asyncio
from typing import List
async def batch_operations_with_timeout(
tasks: List[asyncio.Task],
timeout: float
):
"""批量操作,整体超时"""
done, pending = await asyncio.wait(
tasks,
timeout=timeout
)
# 处理已完成的
for task in done:
if task.exception():
print(f"任务异常: {task.exception()}")
else:
print(f"任务结果: {task.result()}")
# 取消未完成的
for task in pending:
task.cancel()
async def operation(name, duration):
await asyncio.sleep(duration)
return f"{name}完成"
async def main():
tasks = [
asyncio.create_task(operation("任务1", 2)),
asyncio.create_task(operation("任务2", 4)),
asyncio.create_task(operation("任务3", 6)),
]
await batch_operations_with_timeout(tasks, timeout=3.0)
asyncio.run(main())
4.2 可恢复的超时处理
import asyncio
import contextlib
@contextlib.asynccontextmanager
async def timeout_context(timeout: float):
"""可恢复的超时上下文管理器"""
task = asyncio.current_task()
loop = asyncio.get_running_loop()
if timeout is None:
yield
return
timeout_handle = None
def cancel_task():
if not task.done():
task.cancel()
timeout_handle = loop.call_later(timeout, cancel_task)
try:
yield
except asyncio.CancelledError:
# 检查是否是超时导致的取消
if timeout_handle and not timeout_handle.cancelled():
# 是超时
timeout_handle = None
raise asyncio.TimeoutError()
raise
finally:
if timeout_handle:
timeout_handle.cancel()
async def resumable_operation():
"""可分阶段恢复的操作"""
async with timeout_context(2.0):
print("阶段1")
await asyncio.sleep(1)
# 第一阶段完成,重置超时
async with timeout_context(2.0):
print("阶段2")
await asyncio.sleep(3) # 这会超时
asyncio.run(resumable_operation())
5. 最佳实践和陷阱
5.1 资源清理
async def task_with_cleanup():
resource = acquire_resource()
try:
while True:
await asyncio.sleep(1)
# 正常操作
except asyncio.CancelledError:
print("收到取消请求,开始清理...")
await cleanup_resource(resource) # 清理资源
print("清理完成")
raise # 必须重新抛出
finally:
# 无论如何都要释放资源
release_resource(resource)
5.2 超时传播控制
async def operation_with_timeout_propagation():
"""控制超时是否传播到内部操作"""
try:
# 内部操作有自己的超时控制
inner_task = asyncio.create_task(inner_operation())
return await asyncio.wait_for(inner_task, timeout=2.0)
except asyncio.TimeoutError:
# 不取消内部任务,让其继续运行
print("外部超时,但内部操作继续")
return None
关键要点:
- 任务取消是协作式的,需要任务在
await点检查取消请求 - 超时本质上是定时取消任务
- 使用
shield()可以保护关键操作不被取消 - 必须妥善处理
CancelledError,进行资源清理 - 复杂的超时控制需要考虑任务分组和超时传播策略