Python中的异步编程陷阱与解决方案:任务取消、异常处理与资源管理
这是一个在异步编程实践中至关重要的知识点。让我们来详细、循序渐进地学习。
1. 问题背景与核心挑战
在异步编程中,我们处理的是协作式多任务。任务(Task)可以随时被取消(比如用户中断操作、超时等)。与同步编程不同,异步任务一旦被取消,会抛出一个特殊的asyncio.CancelledError异常。这给我们的程序带来了几个核心挑战:
- 取消传播:一个任务被取消,如何确保其内部创建的“子任务”也能被正确取消,避免资源泄漏?
- 异常隔离:
CancelledError是一种特殊的BaseException,它不应该被当作普通的Exception来处理,否则会破坏取消机制。但其他“业务逻辑”异常又需要被正常捕获和处理。 - 资源清理:在任务被取消的瞬间,如何确保文件、网络连接、数据库会话等资源能被正确、及时地释放?
2. 任务取消的机制与陷阱
2.1 取消是如何工作的?
当你在一个asyncio.Task对象上调用.cancel()方法时,事件循环并不会立即停止任务执行。它只是在任务内部标记一个取消状态,并在任务下一次主动让出控制权(例如,遇到await)时,在该await点抛出CancelledError。
import asyncio
async def simple_task():
print("开始执行")
try:
await asyncio.sleep(1) # 取消会在这个await点生效
except asyncio.CancelledError:
print("任务被取消!")
raise # 必须重新抛出
async def main():
task = asyncio.create_task(simple_task())
await asyncio.sleep(0.1) # 确保任务开始
task.cancel() # 请求取消
try:
await task
except asyncio.CancelledError:
print("主函数捕获到任务被取消")
asyncio.run(main())
# 输出:
# 开始执行
# 任务被取消!
# 主函数捕获到任务被取消
陷阱1:屏蔽取消
如果你在except asyncio.CancelledError块中,没有重新抛出(raise)这个异常,那么取消请求就被“屏蔽”了。任务会继续运行,就像没被取消一样。这通常是个错误。
async def bad_task():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("取消了,但我就是不退出!")
# 这里没有 raise,任务会继续往下执行
await asyncio.sleep(2) # 任务依然在运行!
print("任务意外地完成了")
async def main():
task = asyncio.create_task(bad_task())
await asyncio.sleep(0.1)
task.cancel()
await asyncio.sleep(3) # 等待足够长时间
print(f"任务状态: {task.done()}") # 可能为False
asyncio.run(main())
2.2 正确处理取消与清理
正确的模式是:捕获CancelledError,执行必要的清理操作,然后重新抛出。
async def proper_task():
resource_acquired = False
try:
# 模拟获取资源
resource_acquired = True
print("资源已获取")
await asyncio.sleep(10) # 长时间操作
except asyncio.CancelledError:
print("收到取消请求,开始清理...")
if resource_acquired:
# 释放资源,例如关闭文件、网络连接
print("资源已释放")
raise # 必须重新抛出,让任务真正结束
finally:
# finally块在重新抛出CancelledError后依然会执行
print("finally块执行")
asyncio.run(proper_task())
3. 异常处理的层级与隔离
陷阱2:混淆CancelledError和普通异常
CancelledError继承自BaseException,而不是Exception。这意味着except Exception:不会捕获到它。这是有意设计的,防止取消机制被意外抑制。
async def task_with_exception():
try:
await asyncio.sleep(1)
1 / 0 # 这里会抛出 ZeroDivisionError
except Exception as e:
print(f"捕获到普通异常: {e}")
return "处理了错误"
async def task_to_be_cancelled():
try:
await asyncio.sleep(10)
except Exception as e: # 这里不会捕获CancelledError!
print(f"这行不会打印")
finally:
print("finally块会执行")
async def main():
task1 = asyncio.create_task(task_with_exception())
task2 = asyncio.create_task(task_to_be_cancelled())
await asyncio.sleep(0.1)
task2.cancel() # 这会直接触发CancelledError,跳过except Exception
await asyncio.gather(task1, task2, return_exceptions=True)
asyncio.run(main())
正确做法:如果需要同时处理取消和业务异常,应该分别捕获。
async def robust_task():
try:
await some_async_operation()
except asyncio.CancelledError:
# 处理取消逻辑
print("任务被取消")
raise
except (CustomError, IOError) as e:
# 处理特定的业务异常
print(f"业务异常: {e}")
return None
except Exception as e:
# 兜底的异常捕获
print(f"未知异常: {e}")
raise
4. 资源管理的最佳实践
陷阱3:取消导致的资源泄漏
在异步操作中,如果在await获取资源(如数据库连接)和释放资源的代码之间发生取消,资源可能无法释放。
解决方案1:使用异步上下文管理器(async with)
async with语句确保了即使在块内发生取消,__aexit__方法也会被调用,从而进行清理。
import asyncio
class AsyncResource:
async def __aenter__(self):
print("获取资源")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
return False # 不抑制异常
async def use_resource():
async with AsyncResource():
print("使用资源中...")
await asyncio.sleep(2) # 如果在此处被取消,__aexit__依然会被调用
print("资源使用完毕")
async def main():
task = asyncio.create_task(use_resource())
await asyncio.sleep(0.1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务被取消,但资源已释放")
asyncio.run(main())
解决方案2:使用asyncio.shield保护关键代码段(谨慎使用!)
asyncio.shield可以暂时防止一个awaitable对象被取消。但不能滥用,因为它会破坏取消的传播。
async def critical_section():
print("开始关键操作(不可取消)")
await asyncio.sleep(1) # 模拟关键IO
print("关键操作完成")
async def shielded_task():
try:
# shield保护期间,任务取消请求会被推迟
await asyncio.shield(critical_section())
print("关键部分已完成,现在可被取消")
await asyncio.sleep(2) # 这里可以被取消
except asyncio.CancelledError:
print("在非关键部分被取消")
raise
async def main():
task = asyncio.create_task(shielded_task())
await asyncio.sleep(0.1)
task.cancel()
await asyncio.sleep(3) # 观察输出顺序
5. 子任务管理与传播取消
陷阱4:忘记取消子任务
当一个父任务被取消时,它创建的、尚未完成的子任务不会自动被取消,可能导致“孤儿任务”和资源泄漏。
解决方案:使用asyncio.create_task创建子任务时保存引用,并在父任务取消时显式取消它们,或使用asyncio.gather或asyncio.wait。
import asyncio
async def child_task(name, delay):
try:
print(f"{name}: 开始")
await asyncio.sleep(delay)
print(f"{name}: 完成")
except asyncio.CancelledError:
print(f"{name}: 被取消")
raise
async def parent_with_proper_cleanup():
tasks = []
try:
# 创建子任务并保存引用
tasks.append(asyncio.create_task(child_task("子任务1", 5)))
tasks.append(asyncio.create_task(child_task("子任务2", 10)))
await asyncio.sleep(1)
print("父任务完成")
except asyncio.CancelledError:
print("父任务被取消,正在取消子任务...")
# 取消所有子任务
for t in tasks:
if not t.done():
t.cancel()
# 等待所有子任务完成(被取消)
await asyncio.gather(*tasks, return_exceptions=True)
raise # 重新抛出取消异常
async def main():
task = asyncio.create_task(parent_with_proper_cleanup())
await asyncio.sleep(0.5)
task.cancel() # 这会触发父任务取消,进而取消所有子任务
await asyncio.sleep(0.1)
print("主程序结束")
asyncio.run(main())
6. 超时与取消结合
asyncio.wait_for和asyncio.as_completed都内置了超时机制,超时本质上是取消任务。
async def long_operation():
await asyncio.sleep(10)
return "结果"
async def main():
try:
# wait_for在超时后会取消内部任务
result = await asyncio.wait_for(long_operation(), timeout=1.0)
except asyncio.TimeoutError:
print("操作超时")
except asyncio.CancelledError:
# 注意:被wait_for取消的任务抛出的是CancelledError
# 但wait_for本身会将其转换为TimeoutError抛出
print("这行不会执行")
总结与最佳实践:
- 永远不要吞掉
asyncio.CancelledError:捕获后必须进行清理,然后重新抛出。 - 区分取消和业务异常:
CancelledError是BaseException,用单独的except块处理。 - 使用异步上下文管理器:确保资源在任何情况下(包括取消)都能被释放。
- 管理子任务生命周期:父任务取消时,应显式取消其创建的所有子任务。
- 谨慎使用
asyncio.shield:只用于保护绝对不能中断的关键操作,并尽量缩短保护范围。 - 编写可取消的代码:在长时间循环或计算中,定期使用
await asyncio.sleep(0)来插入“取消检查点”,让任务能够响应取消请求。
掌握这些陷阱和解决方案,将使你编写的异步代码更加健壮、可靠,并能有效地管理资源和处理并发异常。