Python中的异步编程错误处理与取消机制详解
字数 597 2025-12-06 01:14:13
Python中的异步编程错误处理与取消机制详解
一、异步错误处理的特殊性
异步编程的错误处理与传统同步代码有显著差异,因为:
- 执行流程被挂起:协程在
await处挂起,错误可能在不同时间点发生 - 错误传播路径复杂:错误需要在任务链中正确传播
- 取消是异步特有操作:需要优雅处理任务取消请求
二、异步错误处理机制详解
1. 基本异常捕获
import asyncio
async def faulty_task():
await asyncio.sleep(1)
raise ValueError("任务出错!")
async def main():
try:
await faulty_task() # 直接await可以捕获异常
except ValueError as e:
print(f"捕获到异常: {e}")
# 执行
asyncio.run(main())
2. 任务包装与异常处理
当任务被创建但没有立即await时,异常不会立即抛出:
async def main():
task = asyncio.create_task(faulty_task())
await asyncio.sleep(2) # 等待足够时间让任务执行
# 此时异常存储在任务对象中
if task.done() and not task.cancelled():
try:
await task # 重新await会抛出异常
except ValueError as e:
print(f"任务异常: {e}")
三、任务取消机制详解
1. 基本的任务取消
async def long_running_task():
try:
await asyncio.sleep(10)
print("任务完成")
except asyncio.CancelledError:
print("任务被取消!")
raise # 必须重新抛出,否则取消不会生效
async def main():
task = asyncio.create_task(long_running_task())
await asyncio.sleep(1)
task.cancel() # 请求取消任务
try:
await task # 等待任务结束(会抛出CancelledError)
except asyncio.CancelledError:
print("主程序处理了取消异常")
2. 带清理的取消处理
async def task_with_cleanup():
resource = acquire_resource()
try:
while True:
await asyncio.sleep(1)
print("工作中...")
except asyncio.CancelledError:
print("开始清理...")
cleanup_resource(resource) # 执行清理操作
print("清理完成")
raise # 重新抛出取消异常
finally:
print("finally块执行")
def acquire_resource():
print("获取资源")
return "资源句柄"
def cleanup_resource(resource):
print(f"清理资源: {resource}")
四、shield保护任务不被取消
1. asyncio.shield的基本使用
async def critical_operation():
print("关键操作开始")
await asyncio.sleep(2)
print("关键操作完成")
return "结果"
async def main():
# 创建任务
task = asyncio.create_task(critical_operation())
# 用shield保护
shielded = asyncio.shield(task)
# 取消shielded任务
shielded.cancel()
try:
result = await shielded
except asyncio.CancelledError:
print("shielded任务被取消")
# 原始任务仍在运行
try:
result = await asyncio.wait_for(task, timeout=3)
print(f"原始任务结果: {result}")
except asyncio.TimeoutError:
print("任务超时")
五、超时处理机制
1. asyncio.wait_for的使用
async def slow_operation():
await asyncio.sleep(5)
return "结果"
async def main():
try:
# 设置3秒超时
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(f"结果: {result}")
except asyncio.TimeoutError:
print("操作超时!")
# 超时后取消任务
task = asyncio.create_task(slow_operation())
try:
result = await asyncio.wait_for(task, timeout=2.0)
except asyncio.TimeoutError:
print("任务超时,尝试取消...")
task.cancel() # 取消任务
2. asyncio.wait与超时
async def main():
tasks = [
asyncio.create_task(asyncio.sleep(i, result=f"任务{i}"))
for i in range(1, 4)
]
# 等待所有任务完成,最多等2秒
done, pending = await asyncio.wait(tasks, timeout=2.0)
print(f"完成的任务数: {len(done)}")
print(f"未完成的任务数: {len(pending)}")
# 取消未完成的任务
for task in pending:
task.cancel()
# 等待所有任务结束
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"所有结果: {results}")
六、gather的异常处理
1. return_exceptions参数
async def task1():
await asyncio.sleep(1)
return "任务1完成"
async def task2():
await asyncio.sleep(2)
raise ValueError("任务2出错")
async def task3():
await asyncio.sleep(3)
return "任务3完成"
async def main():
tasks = [task1(), task2(), task3()]
# 方式1:返回异常对象
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
print(f"任务{i}异常: {result}")
else:
print(f"任务{i}结果: {result}")
print("\n" + "="*30 + "\n")
# 方式2:快速失败(默认行为)
try:
results = await asyncio.gather(*tasks, return_exceptions=False)
except ValueError as e:
print(f"捕获到异常: {e}")
2. 部分任务的异常隔离
async def main():
# 分别gather,隔离异常
try:
result1 = await asyncio.gather(task1())
result2 = await asyncio.gather(task2())
except ValueError as e:
print(f"任务2异常: {e}")
# 任务1和任务3可以正常执行
try:
result13 = await asyncio.gather(task1(), task3())
print(f"任务1和3的结果: {result13}")
except Exception as e:
print(f"其他异常: {e}")
七、异步上下文管理器的错误处理
class AsyncResource:
async def __aenter__(self):
print("获取资源")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"释放资源,异常类型: {exc_type}")
if exc_type is not None:
print(f"异常信息: {exc_val}")
return False # 不抑制异常
async def operation(self):
print("执行操作")
raise RuntimeError("操作失败")
async def main():
try:
async with AsyncResource() as resource:
await resource.operation()
except RuntimeError as e:
print(f"外部捕获: {e}")
八、最佳实践与模式
1. 统一错误处理装饰器
def async_error_handler(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except asyncio.CancelledError:
print(f"{func.__name__}被取消")
raise
except Exception as e:
print(f"{func.__name__}出错: {e}")
return None
return wrapper
@async_error_handler
async def risky_operation():
await asyncio.sleep(1)
raise ValueError("操作风险")
async def main():
task = asyncio.create_task(risky_operation())
await task
2. 任务组模式
async def task_group_handler(tasks, timeout=None):
"""
统一处理任务组,确保所有任务都被正确处理
"""
task_objects = [asyncio.create_task(t) for t in tasks]
try:
if timeout:
# 带超时的等待
done, pending = await asyncio.wait(
task_objects,
timeout=timeout,
return_when=asyncio.ALL_COMPLETED
)
else:
# 无限等待
done, pending = await asyncio.wait(
task_objects,
return_when=asyncio.ALL_COMPLETED
)
finally:
# 取消所有未完成的任务
for task in pending:
task.cancel()
# 等待所有任务结束
await asyncio.gather(*task_objects, return_exceptions=True)
# 收集结果
results = []
for task in task_objects:
if task.done() and not task.cancelled():
try:
results.append(task.result())
except Exception as e:
results.append(e)
else:
results.append(asyncio.CancelledError())
return results
九、常见陷阱与解决方案
- 忘记重新抛出CancelledError
# 错误做法
async def bad_task():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("被取消") # 忘记重新抛出,取消无效!
# 正确做法
async def good_task():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("被取消")
raise # 必须重新抛出
- 未处理的异常导致静默失败
async def main():
# 错误:异常被忽略
task = asyncio.create_task(faulty_task())
# 正确:始终检查任务状态
task = asyncio.create_task(faulty_task())
try:
await asyncio.wait_for(task, timeout=5)
except Exception as e:
print(f"任务异常: {e}")
十、调试技巧
- 设置异常回调
def handle_exception(task):
if task.exception():
print(f"任务异常: {task.exception()}")
async def main():
loop = asyncio.get_running_loop()
task = asyncio.create_task(faulty_task())
task.add_done_callback(handle_exception)
await task
- 使用调试模式
import sys
async def main():
# 启用调试
asyncio.get_event_loop().set_debug(True)
# 慢回调警告阈值
asyncio.get_event_loop().slow_callback_duration = 0.1
task = asyncio.create_task(faulty_task())
await task
异步错误处理的核心是理解:异步操作的状态变化是离散的,异常可能在任何await点发生,需要在整个任务生命周期中妥善处理各种状态转换。正确的错误处理能确保程序的健壮性和资源的正确释放。