Python中的并发编程:线程池与进程池(concurrent.futures)详解
字数 1359 2025-12-09 15:54:30
Python中的并发编程:线程池与进程池(concurrent.futures)详解
描述
concurrent.futures模块是Python 3.2+引入的用于简化并发编程的高层接口。它提供了ThreadPoolExecutor(线程池)和ProcessPoolExecutor(进程池)两个核心类,统一了多线程和多进程的使用方式。这个模块通过"Future"对象封装异步操作,使并发任务的提交、管理和结果获取变得更加简洁。掌握线程池和进程池的使用场景、内部机制以及最佳实践,是编写高效Python并发程序的关键。
详细讲解
1. 模块设计思想
concurrent.futures模块基于"执行器-未来对象"模式:
- Executor:执行器抽象基类,提供任务提交的接口
- Future:表示异步计算的结果,封装了任务状态、结果和异常
- 统一接口:无论使用线程还是进程,API完全一致
2. ThreadPoolExecutor(线程池)详解
2.1 基本使用
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 创建线程池(推荐使用with语句自动管理资源)
def task(name, duration):
"""模拟耗时任务"""
time.sleep(duration)
return f"任务{name}完成,耗时{duration}秒"
# 方法1:submit逐个提交
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务,立即返回Future对象
future1 = executor.submit(task, "A", 2)
future2 = executor.submit(task, "B", 1)
# 获取结果(会阻塞直到任务完成)
print(future1.result()) # 输出:任务A完成,耗时2秒
print(future2.result()) # 输出:任务B完成,耗时1秒
# 方法2:map批量提交
with ThreadPoolExecutor(max_workers=3) as executor:
# map按顺序提交,按顺序返回结果
results = executor.map(task, ["C", "D", "E"], [2, 1, 3])
for result in results:
print(result) # 按任务提交顺序输出
2.2 线程池内部机制
- 工作队列:
ThreadPoolExecutor内部维护一个任务队列(queue.SimpleQueue) - 工作线程:启动
max_workers个后台线程,不断从队列获取任务执行 - 任务分发:
submit():将任务包装为_WorkItem放入队列- 空闲工作线程从队列取出任务执行
- 结果处理:任务结果设置到对应的
Future对象
2.3 最佳实践
# 控制并发数:根据任务类型设置合适的max_workers
# I/O密集型:通常设置为CPU核心数×5
# CPU密集型:通常设置为CPU核心数
import os
cpu_count = os.cpu_count() or 1
io_bound_workers = cpu_count * 5
cpu_bound_workers = cpu_count
# 使用as_completed获取完成的任务
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = {executor.submit(task, f"Task{i}", i%3+1): f"Task{i}"
for i in range(5)}
# 按完成顺序处理结果
for future in as_completed(futures):
task_name = futures[future]
try:
result = future.result(timeout=2) # 设置超时
print(f"{task_name}: {result}")
except TimeoutError:
print(f"{task_name}: 超时")
except Exception as e:
print(f"{task_name}: 异常 {e}")
3. ProcessPoolExecutor(进程池)详解
3.1 基本使用
from concurrent.futures import ProcessPoolExecutor
import math
def is_prime(n):
"""CPU密集型任务:判断素数"""
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
# 使用进程池处理CPU密集型任务
with ProcessPoolExecutor(max_workers=4) as executor:
numbers = range(10**6, 10**6 + 1000)
# 使用map并行计算
results = list(executor.map(is_prime, numbers))
prime_count = sum(results)
print(f"素数数量: {prime_count}")
3.2 进程池特性
- 独立进程:每个工作进程是独立的Python解释器实例
- 进程间通信:通过
pickle序列化传递数据和结果 - 全局变量隔离:每个进程有自己的内存空间,修改全局变量不会影响其他进程
- 启动开销:进程创建和销毁的开销比线程大
3.3 注意事项
# 注意1:函数和参数必须可pickle
# 错误示例:lambda函数不可pickle
# executor.submit(lambda x: x*2, 5) # 会报错
# 正确做法:使用普通函数
def double(x):
return x * 2
# 注意2:初始化和资源清理
def init_worker():
"""每个进程初始化时执行"""
print(f"进程{os.getpid()}启动")
# 可以在这里建立数据库连接等
def cleanup():
"""每个进程结束时执行"""
print(f"进程{os.getpid()}结束")
# 注意3:共享数据问题
# 进程间不能直接共享内存,需要使用multiprocessing.Manager
from multiprocessing import Manager
def worker(shared_list, value):
shared_list.append(value)
return shared_list
with ProcessPoolExecutor() as executor, Manager() as manager:
shared_list = manager.list()
futures = [executor.submit(worker, shared_list, i) for i in range(5)]
4. Future对象详解
4.1 Future状态和生命周期
from concurrent.futures import Future
import threading
def set_future_result(future, value):
"""模拟异步设置Future结果"""
time.sleep(1)
future.set_result(value)
# 手动创建和操作Future
future = Future()
print(f"初始状态: {future.running()}") # False
print(f"是否完成: {future.done()}") # False
# 异步设置结果
thread = threading.Thread(target=set_future_result, args=(future, "结果"))
thread.start()
# 等待结果
result = future.result(timeout=2) # 阻塞直到有结果或超时
print(f"结果: {result}") # 输出: 结果
print(f"现在是否完成: {future.done()}") # True
4.2 Future的回调和异常处理
def callback(future):
"""Future完成时的回调函数"""
if future.cancelled():
print("任务被取消")
elif future.exception():
print(f"任务异常: {future.exception()}")
else:
print(f"任务结果: {future.result()}")
with ThreadPoolExecutor() as executor:
future = executor.submit(task, "回调测试", 1)
future.add_done_callback(callback) # 添加回调
5. 高级特性和最佳实践
5.1 上下文管理器与资源清理
# 使用with语句确保资源正确释放
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, f"Task{i}", 1) for i in range(5)]
# with块结束时自动调用executor.shutdown(wait=True)
# 等待所有任务完成,然后关闭工作线程
# 手动控制关闭
executor = ThreadPoolExecutor(max_workers=3)
try:
future = executor.submit(task, "手动控制", 1)
result = future.result()
finally:
executor.shutdown(wait=True) # wait=True等待任务完成
5.2 超时和取消机制
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交长时任务
future1 = executor.submit(time.sleep, 5)
future2 = executor.submit(time.sleep, 1)
# 取消未来开始的任务
if future1.running():
print("任务1运行中,无法取消")
else:
future1.cancel() # 如果还没开始,可以取消
# 设置整体超时
try:
result = future2.result(timeout=0.5) # 0.5秒超时
except TimeoutError:
print("任务2超时")
5.3 性能调优
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
# 任务类型选择指南
def io_bound_task(url):
"""I/O密集型任务:适合线程池"""
time.sleep(0.1) # 模拟网络请求
return f"获取{url}完成"
def cpu_bound_task(n):
"""CPU密集型任务:适合进程池"""
return sum(i*i for i in range(n))
# 基准测试
def benchmark():
n = 10**6
# CPU密集型:比较线程池和进程池
start = time.time()
with ThreadPoolExecutor() as executor:
list(executor.map(cpu_bound_task, [n]*4))
print(f"线程池CPU任务耗时: {time.time()-start:.2f}s")
start = time.time()
with ProcessPoolExecutor() as executor:
list(executor.map(cpu_bound_task, [n]*4))
print(f"进程池CPU任务耗时: {time.time()-start:.2f}s")
# 使用partial预设参数
def power(base, exponent):
return base ** exponent
with ThreadPoolExecutor() as executor:
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
# 只传递base参数
futures = [executor.submit(square, i) for i in range(10)]
6. 常见陷阱和解决方案
6.1 死锁问题
# 错误示例:任务之间相互等待
def worker_a(lock1, lock2):
with lock1:
time.sleep(0.1)
with lock2: # 可能死锁
return "A完成"
def worker_b(lock1, lock2):
with lock2:
time.sleep(0.1)
with lock1: # 可能死锁
return "B完成"
# 解决方案1:统一获取锁的顺序
# 解决方案2:使用timeout
def safe_worker(lock1, lock2, timeout=1):
if lock1.acquire(timeout=timeout):
try:
if lock2.acquire(timeout=timeout):
try:
return "安全完成"
finally:
lock2.release()
finally:
lock1.release()
return "获取锁超时"
6.2 内存泄漏
# 错误示例:Future对象被长期持有
futures = []
with ThreadPoolExecutor() as executor:
for i in range(10000):
future = executor.submit(task, f"Task{i}", 0.1)
futures.append(future) # 内存泄漏!
# 正确做法:及时清理
with ThreadPoolExecutor() as executor:
for i in range(10000):
future = executor.submit(task, f"Task{i}", 0.1)
# 添加回调自动清理
future.add_done_callback(lambda f: f.result())
7. 实际应用案例
7.1 Web请求并发处理
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_url(url, timeout=5):
try:
response = requests.get(url, timeout=timeout)
return url, response.status_code, len(response.text)
except Exception as e:
return url, None, str(e)
def concurrent_fetch(urls, max_workers=10):
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_url = {executor.submit(fetch_url, url): url
for url in urls}
# 按完成顺序处理
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results[url] = result
except Exception as e:
results[url] = (url, None, str(e))
return results
7.2 数据处理流水线
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
def stage1(data):
"""第一阶段:数据预处理"""
return [x * 2 for x in data]
def stage2(data):
"""第二阶段:CPU密集型计算"""
return sum(x * x for x in data)
def stage3(result):
"""第三阶段:结果处理"""
return result / 1000
def pipeline(data_chunks):
"""流水线处理"""
with ProcessPoolExecutor() as executor:
# 第一阶段并行处理
stage1_results = list(executor.map(stage1, data_chunks))
# 第二阶段并行处理
stage2_results = list(executor.map(stage2, stage1_results))
# 第三阶段串行处理
final_result = sum(stage3(r) for r in stage2_results)
return final_result
总结
concurrent.futures模块通过统一的接口简化了Python并发编程:
-
选择策略:
- I/O密集型任务:使用
ThreadPoolExecutor - CPU密集型任务:使用
ProcessPoolExecutor - 混合型任务:根据瓶颈选择或结合使用
- I/O密集型任务:使用
-
关键特性:
- Future对象提供统一的结果处理接口
- 自动资源管理和任务调度
- 支持回调和异常处理
-
最佳实践:
- 总是使用with语句管理执行器
- 合理设置max_workers数量
- 使用as_completed处理完成的任务
- 注意进程间的数据序列化限制
-
性能优化:
- 避免在任务间共享可变状态
- 使用partial预设函数参数
- 及时清理完成的Future对象
- 合理设置超时防止任务阻塞
这个模块平衡了易用性和性能,是Python并发编程的首选方案之一,特别适合需要将串行代码并行化的场景。