Python中的协程任务调度与优先级队列实现
字数 719 2025-11-23 18:08:13
Python中的协程任务调度与优先级队列实现
知识点描述
协程任务调度是异步编程的核心机制,它决定了多个协程任务如何被安排执行。优先级队列则允许我们根据特定规则(如优先级数值)来决定任务的执行顺序。本知识点将深入探讨asyncio中任务调度的原理,以及如何实现带优先级的协程调度。
1. 基础概念:异步任务调度原理
- asyncio的事件循环维护一个就绪队列(ready queue)来管理可运行的协程
- 当协程遇到await表达式时,会暂停执行并将控制权交还给事件循环
- 事件循环从就绪队列中取出下一个任务继续执行
- 默认采用FIFO(先进先出)调度策略
import asyncio
async def task(name, duration):
print(f"{name} started")
await asyncio.sleep(duration)
print(f"{name} finished")
async def main():
# 默认FIFO调度:task1先执行完毕
await asyncio.gather(
task("task1", 1),
task("task2", 0.5) # 虽然耗时更短,但仍需等待task1开始
)
# asyncio.run(main())
2. 优先级调度的需求分析
- 某些场景需要优先执行高优先级任务(如用户交互、实时数据处理)
- 优先级可以用数值表示,数值越小优先级越高(类UNIX标准)
- 需要修改默认调度策略,让高优先级任务优先进入就绪队列
3. 实现优先级队列的核心组件
3.1 自定义优先级任务类
import heapq
from dataclasses import dataclass, field
from typing import Any
from asyncio import Task
@dataclass(order=True)
class PrioritizedItem:
priority: int # 优先级数值
item: Any = field(compare=False) # 不参与比较的实际任务对象
3.2 优先级队列实现
class PriorityQueue:
def __init__(self):
self._queue = []
self._index = 0 # 处理优先级相同时的顺序
def push(self, item, priority):
# 使用(priority, index, item)元组确保堆排序稳定
heapq.heappush(self._queue, (priority, self._index, item))
self._index += 1
def pop(self):
if self._queue:
return heapq.heappop(self._queue)[-1] # 返回实际任务项
raise IndexError("pop from empty queue")
def __len__(self):
return len(self._queue)
4. 集成到asyncio事件循环
4.1 自定义调度器类
class PriorityScheduler:
def __init__(self):
self._ready = PriorityQueue() # 替换默认的deque
self._scheduled = [] # 定时任务列表
def call_soon(self, callback, *args):
# 默认优先级为0
self.call_soon_threadsafe(callback, *args, priority=0)
def call_soon_threadsafe(self, callback, *args, priority=0):
self._ready.push((callback, args), priority)
def call_later(self, delay, callback, *args, priority=0):
# 定时任务实现(简化版)
async def delayed():
await asyncio.sleep(delay)
callback(*args)
asyncio.create_task(delayed(), priority=priority)
5. 完整优先级任务调度实现
import asyncio
from heapq import heappush, heappop
from time import time
class PriorityTask:
def __init__(self, coro, priority=0):
self.coro = coro
self.priority = priority
self._result = None
self._exception = None
self._done = False
def __await__(self):
return (yield self) # 允许被await
class PriorityEventLoop(asyncio.SelectorEventLoop):
def __init__(self):
super().__init__()
self._ready = [] # 使用堆结构存储就绪任务
def call_soon(self, callback, *args, context=None):
# 默认实现,不处理优先级
return super().call_soon(callback, *args, context=context)
def create_task(self, coro, *, name=None, priority=0):
"""创建带优先级的任务"""
task = asyncio.Task(coro, loop=self, name=name)
# 将优先级信息存储在任务对象中
task.priority = priority
return task
async def worker(name, duration, priority):
print(f"[{priority}] {name} 开始执行")
start = time()
await asyncio.sleep(duration)
end = time()
print(f"[{priority}] {name} 完成,耗时{end-start:.2f}秒")
async def priority_demo():
# 创建不同优先级的任务
tasks = [
asyncio.create_task(worker("高优先级任务", 1, priority=1), priority=1),
asyncio.create_task(worker("中优先级任务", 2, priority=5), priority=5),
asyncio.create_task(worker("低优先级任务", 3, priority=9), priority=9),
asyncio.create_task(worker("紧急任务", 0.5, priority=0), priority=0), # 最高优先级
]
await asyncio.gather(*tasks)
# 使用示例
# asyncio.run(priority_demo())
6. 实际应用场景与优化
6.1 网络请求优先级调度
class PriorityRequestScheduler:
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.pending = PriorityQueue() # 待处理请求
self.active = set() # 活跃请求
async def submit(self, url, priority=0):
"""提交带优先级的请求"""
# 实现请求优先级调度逻辑
pass
6.2 性能优化考虑
- 使用heapq替代list提高队列操作效率
- 避免优先级数值范围过大导致内存浪费
- 考虑使用双向优先级队列处理动态优先级调整
7. 注意事项与最佳实践
- 优先级数值设计要合理,避免过于复杂的优先级逻辑
- 高优先级任务不应长时间阻塞事件循环
- 考虑使用asyncio.BoundedSemaphore限制并发数量
- 监控任务执行时间,避免优先级反转问题
通过这种优先级调度机制,可以确保关键任务得到及时处理,提高系统的响应性和资源利用率。