Python中的多进程编程与multiprocessing模块
字数 1284 2025-11-05 23:47:39
Python中的多进程编程与multiprocessing模块
知识点描述
Python的multiprocessing模块用于创建并行进程,绕过GIL限制实现真正的并行计算。与多线程不同,每个进程拥有独立的Python解释器和内存空间,适合CPU密集型任务。
进程与线程的区别
- 进程:操作系统资源分配的基本单位,有独立内存空间,创建开销大
- 线程:CPU调度的基本单位,共享进程内存,创建开销小
- Python中多进程可充分利用多核CPU,多线程受GIL限制
multiprocessing核心组件
1. Process类的基本使用
from multiprocessing import Process
import os
def worker(name):
print(f'子进程 {name} 运行中,PID: {os.getpid()}')
if __name__ == '__main__':
# 创建进程实例
p = Process(target=worker, args=('worker1',))
# 启动进程
p.start()
# 等待进程结束
p.join()
print('主进程结束')
详细步骤解析:
Process()创建进程对象,target指定要执行的函数,args传递参数start()方法启动进程,操作系统会创建新的Python解释器进程join()阻塞主进程,直到子进程执行完成if __name__ == '__main__':在Windows系统下必须使用,避免子进程重复执行代码
2. 进程间通信(IPC)
由于进程内存独立,需要特殊机制进行数据交换:
2.1 Queue队列通信
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(5):
q.put(f'消息 {i}')
time.sleep(0.1)
def consumer(q):
while True:
item = q.get()
if item is None: # 终止信号
break
print(f'消费: {item}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join() # 等待生产者结束
q.put(None) # 发送终止信号
p2.join()
Queue特性:
- 线程和进程安全,自动处理同步问题
put()和get()方法默认阻塞,直到操作完成- 可设置最大容量防止内存溢出
2.2 Pipe管道通信
from multiprocessing import Process, Pipe
def worker(conn):
conn.send(['hello', 'world']) # 发送数据
data = conn.recv() # 接收数据
print(f'子进程收到: {data}')
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 创建管道两端
p = Process(target=worker, args=(child_conn,))
p.start()
print(f'主进程收到: {parent_conn.recv()}')
parent_conn.send('确认收到') # 发送回复
p.join()
Pipe与Queue对比:
- Pipe性能更高,但只能两个进程间通信
- Queue支持多生产者和消费者模式
- Pipe返回两个连接对象,Queue是单一端点
3. 进程池Pool
当任务数量固定时,使用进程池避免频繁创建销毁进程的开销:
from multiprocessing import Pool
import time
def square(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool: # 创建4个进程的池
# apply方法:同步执行,阻塞直到返回结果
result = pool.apply(square, (10,))
print(f'同步结果: {result}')
# apply_async方法:异步执行,立即返回AsyncResult对象
async_result = pool.apply_async(square, (20,))
print(f'异步结果: {async_result.get()}') # get()阻塞等待结果
# map方法:并行处理可迭代对象
results = pool.map(square, range(10))
print(f'map结果: {results}')
# imap方法:惰性迭代器,节省内存
for result in pool.imap(square, range(5)):
print(f'实时结果: {result}')
进程池方法比较:
apply:同步阻塞,适合顺序执行apply_async:异步非阻塞,可回调处理map:并行映射,收集所有结果返回列表imap:惰性求值,适合大数据集处理
4. 进程间共享状态
虽然进程内存独立,但multiprocessing提供了共享内存机制:
4.1 Value和Array共享内存
from multiprocessing import Process, Value, Array
def worker(n, arr):
n.value += 1
for i in range(len(arr)):
arr[i] *= 2
if __name__ == '__main__':
# Value创建共享数值,'i'表示整数类型
num = Value('i', 0)
# Array创建共享数组
arr = Array('d', [1.0, 2.0, 3.0]) # 'd'表示双精度浮点
processes = []
for _ in range(3):
p = Process(target=worker, args=(num, arr))
p.start()
processes.append(p)
for p in processes:
p.join()
print(f'最终数值: {num.value}') # 结果可能因竞态条件不确定
print(f'最终数组: {list(arr)}')
注意事项:
- 共享变量需要加锁避免竞态条件
- 支持的数据类型有限(i:整型,d:浮点,c:字符等)
- 修改操作不是原子性的,需要手动同步
4.2 Manager管理的共享对象
from multiprocessing import Process, Manager
def worker(d, l):
d['count'] += 1
l.append(len(l))
if __name__ == '__main__':
with Manager() as manager:
# 创建Manager管理的共享对象
shared_dict = manager.dict({'count': 0})
shared_list = manager.list()
processes = []
for _ in range(3):
p = Process(target=worker, args=(shared_dict, shared_list))
p.start()
processes.append(p)
for p in processes:
p.join()
print(f'共享字典: {dict(shared_dict)}')
print(f'共享列表: {list(shared_list)}')
Manager与共享内存对比:
- Manager通过代理进程管理对象,性能较低但更灵活
- 支持复杂数据结构(字典、列表等)
- 所有操作都经过网络序列化,开销较大
最佳实践建议
- 进程数量:通常设置为CPU核心数,避免过度切换
- 数据序列化:进程间传递的数据必须可序列化(pickle)
- 资源清理:及时join()进程,使用with语句自动管理
- 避免共享状态:尽量使用消息传递而非共享内存
- 错误处理:子进程异常不会自动传播到主进程,需要手动处理
适用场景总结
- CPU密集型任务:数学计算、图像处理等
- 需要真正并行执行的场景
- 任务相互独立,通信需求较少
- 需要绕过GIL限制的多核利用
通过合理使用multiprocessing模块,可以显著提升Python程序的并行处理能力,特别是在多核CPU环境下。