Multi-process Programming in Python and the multiprocessing Module

Knowledge Point Description
Python's multiprocessing module runs work in separate processes. Each process has its own Python interpreter, its own GIL, and its own memory space, so CPU-bound code can execute in parallel on multiple cores instead of being serialized by a single GIL, as threads are. This makes it well suited to CPU-intensive tasks.

Differences Between Processes and Threads

  • Process: The basic unit of operating system resource allocation, with independent memory space and high creation overhead.
  • Thread: The basic unit of CPU scheduling, sharing process memory, with low creation overhead.
  • In CPython, multiprocessing can use multiple CPU cores in parallel, whereas threads running CPU-bound Python code are serialized by the GIL (threads remain effective for I/O-bound work).
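
The memory-isolation point can be checked directly. The following is a minimal sketch; the names counter, bump_in_child, and compare_parent_and_child are made up for this illustration. The child increments its own copy of a module-level variable and reports it through a Queue, while the parent's copy stays unchanged.

```python
from multiprocessing import Process, Queue

counter = 0  # module-level state; every process works on its own copy


def bump_in_child(results):
    # Runs in the child: only the child's copy of counter changes.
    global counter
    counter += 1
    results.put(counter)


def compare_parent_and_child():
    results = Queue()
    p = Process(target=bump_in_child, args=(results,))
    p.start()
    child_value = results.get()  # read the message before joining
    p.join()
    # counter here is the parent's copy, untouched by the child.
    return counter, child_value


if __name__ == '__main__':
    parent_value, child_value = compare_parent_and_child()
    print(f'Parent sees {parent_value}, child saw {child_value}')
```

A thread performing the same increment would change the value the main thread sees, because threads share the memory of their process.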

Core Components of multiprocessing

1. Basic Usage of the Process Class

from multiprocessing import Process
import os

def worker(name):
    print(f'Child process {name} running, PID: {os.getpid()}')

if __name__ == '__main__':
    # Create a process instance
    p = Process(target=worker, args=('worker1',))
    
    # Start the process
    p.start()
    
    # Wait for the process to finish
    p.join()
    
    print('Main process ended')

Detailed Step-by-Step Analysis:

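  • Process() creates a process object; target specifies the function to run, and args supplies its positional arguments as a tuple.
  • start() launches the child process, which runs target in its own Python interpreter process.
  • join() blocks the calling process until the child exits.
  • The if __name__ == '__main__': guard is required whenever child processes are created with the spawn or forkserver start method (spawn is the default on Windows and macOS), because the child imports the main module and unguarded code would run again there. Including it everywhere keeps the script portable.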
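
Whether the guard is strictly needed depends on the start method in use, which varies by platform and Python version. As a small sketch (the helper name describe_start_methods is an assumption for this example), the standard functions get_all_start_methods() and get_start_method() report what the current interpreter offers and which method it uses by default:

```python
import multiprocessing as mp


def describe_start_methods():
    # Start methods available on this platform, e.g. 'fork', 'spawn', 'forkserver'.
    available = mp.get_all_start_methods()
    # The method Process() uses unless another one is chosen explicitly.
    current = mp.get_start_method()
    return available, current


if __name__ == '__main__':
    available, current = describe_start_methods()
    print(f'Available start methods: {available}')
    print(f'Default start method: {current}')
```

Because spawn is available on every platform, code that keeps the guard in place should run unchanged wherever it is deployed.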

2. Inter-Process Communication (IPC)
Because processes do not share memory, exchanging data requires explicit mechanisms:

2.1 Queue Communication

from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(5):
        q.put(f'Message {i}')
        time.sleep(0.1)

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Termination signal
            break
        print(f'Consumed: {item}')

if __name__ == '__main__':
    q = Queue()
    
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    
    p1.join()  # Wait for producer to finish
    q.put(None)  # Send termination signal
    p2.join()

Queue Characteristics:

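  • Safe for use by multiple threads and processes; synchronization is handled internally.
  • get() blocks by default until an item is available; put() blocks only when a bounded queue is full. Both accept block and timeout arguments.
  • A maximum size (maxsize) can be set so that a fast producer cannot grow the queue without bound.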
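
The blocking behavior and the size limit can be observed without starting any child process. The sketch below is illustrative only (bounded_queue_demo is a made-up name); it relies on the fact that multiprocessing.Queue raises the standard queue.Full and queue.Empty exceptions.

```python
from multiprocessing import Queue
from queue import Empty, Full


def bounded_queue_demo():
    q = Queue(maxsize=2)  # at most two items may be pending
    q.put('a')
    q.put('b')

    try:
        q.put('c', block=False)  # queue is full, so this raises Full
        overflowed = False
    except Full:
        overflowed = True

    # A timeout keeps get() from blocking forever if something goes wrong.
    drained = [q.get(timeout=1), q.get(timeout=1)]

    try:
        q.get(timeout=0.1)  # nothing left, so this raises Empty
        ran_dry = False
    except Empty:
        ran_dry = True

    return overflowed, drained, ran_dry


if __name__ == '__main__':
    print(bounded_queue_demo())
```

In a consumer loop, get(timeout=...) combined with an Empty handler is one way to avoid hanging when a producer dies before sending its termination signal.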

2.2 Pipe Communication

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send(['hello', 'world'])  # Send data
    data = conn.recv()  # Receive data
    print(f'Child process received: {data}')
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # Create both ends of the pipe
    
    p = Process(target=worker, args=(child_conn,))
    p.start()
    
    print(f'Main process received: {parent_conn.recv()}')
    parent_conn.send('Acknowledgement received')  # Send a reply
    
    p.join()

Pipe vs. Queue:

  • Pipe generally has lower overhead, but it connects exactly two endpoints.
  • Queue supports multiple producers and consumers.
  • Pipe() returns two connection objects, one for each end, while a Queue is a single object shared by every participant.
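
When data only flows in one direction, Pipe(duplex=False) returns a receive-only end and a send-only end. The sketch below (send_squares and collect_squares are hypothetical names) streams values from a child and stops when recv() raises EOFError, which happens once every copy of the sending end has been closed.

```python
from multiprocessing import Pipe, Process


def send_squares(conn, count):
    for i in range(count):
        conn.send(i * i)
    conn.close()  # closing the sending end signals "no more data"


def collect_squares(count):
    # With duplex=False the first connection only receives, the second only sends.
    recv_conn, send_conn = Pipe(duplex=False)
    p = Process(target=send_squares, args=(send_conn, count))
    p.start()
    send_conn.close()  # the parent must close its own copy of the sending end

    received = []
    while True:
        try:
            received.append(recv_conn.recv())
        except EOFError:  # all sending ends are closed
            break
    p.join()
    return received


if __name__ == '__main__':
    print(collect_squares(5))
```

Using end-of-stream as the stop condition removes the need for a sentinel value such as None.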

3. Process Pool (Pool)
When many tasks need to be distributed across a fixed number of worker processes, use a process pool to avoid the overhead of repeatedly creating and destroying processes:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:  # Create a pool of 4 processes
        
        # apply method: synchronous execution, blocks until result is returned
        result = pool.apply(square, (10,))
        print(f'Synchronous result: {result}')
        
        # apply_async method: asynchronous execution, immediately returns an AsyncResult object
        async_result = pool.apply_async(square, (20,))
        print(f'Asynchronous result: {async_result.get()}')  # get() blocks waiting for result
        
        # map method: parallel processing of iterable objects
        results = pool.map(square, range(10))
        print(f'map results: {results}')
        
        # imap method: lazy iterator, saves memory
        for result in pool.imap(square, range(5)):
            print(f'Real-time result: {result}')

Process Pool Method Comparison:

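  • apply: Runs a single call in a worker and blocks until the result is returned, so calls execute one at a time.
  • apply_async: Returns an AsyncResult immediately; the result is retrieved with get(), and optional callback and error_callback functions can handle it.
  • map: Splits the iterable across workers, blocks until everything is done, and returns the results as a list in input order.
  • imap: Returns an iterator that yields results in input order as they become available, suitable for large datasets.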
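
Two of these options can be sketched together: the callback argument of apply_async and the chunksize argument of imap. The function names cube and run_pool_demo are assumptions for this example. Callbacks run in the parent process, so they can append to an ordinary list.

```python
from multiprocessing import Pool


def cube(x):
    return x ** 3


def run_pool_demo():
    collected = []
    with Pool(processes=2) as pool:
        # The callback runs in the parent as each result arrives.
        pending = [pool.apply_async(cube, (n,), callback=collected.append)
                   for n in range(4)]
        for result in pending:
            result.wait()  # returns once the result (and callback) is done

        # chunksize batches inputs to reduce per-item messaging overhead;
        # results are still yielded in input order.
        lazy_results = list(pool.imap(cube, range(4), chunksize=2))

    # Callbacks may fire in any order, so sort before comparing.
    return sorted(collected), lazy_results


if __name__ == '__main__':
    print(run_pool_demo())
```

Callbacks should return quickly, since they execute on the thread that handles incoming results for the whole pool.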

4. Shared State Between Processes
Although process memory is independent, multiprocessing provides shared memory mechanisms:

4.1 Value and Array Shared Memory

from multiprocessing import Process, Value, Array

def worker(n, arr):
    n.value += 1
    for i in range(len(arr)):
        arr[i] *= 2

if __name__ == '__main__':
    # Value creates a shared numerical value, 'i' indicates integer type
    num = Value('i', 0)
    
    # Array creates a shared array
    arr = Array('d', [1.0, 2.0, 3.0])  # 'd' indicates double-precision floating point
    
    processes = []
    for _ in range(3):
        p = Process(target=worker, args=(num, arr))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print(f'Final value: {num.value}')  # Result may be uncertain due to race conditions
    print(f'Final array: {list(arr)}')

Notes:

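  • Operations such as n.value += 1 are read-modify-write sequences and are not atomic, so concurrent updates can be lost.
  • Value and Array carry a lock by default; hold it explicitly (for example, with n.get_lock():) around any compound update.
  • Only ctypes-compatible element types are supported, given as type codes ('i': signed int, 'd': double-precision float, 'c': char, etc.).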
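
A synchronized version of the counter might look like the sketch below. It is one possible approach, not the only one; add_many and run_locked_counter are hypothetical names. Holding the lock returned by get_lock() makes each increment a single uninterrupted step.

```python
from multiprocessing import Process, Value


def add_many(counter, times):
    for _ in range(times):
        # Hold the Value's own lock across the read and the write.
        with counter.get_lock():
            counter.value += 1


def run_locked_counter(workers=4, times=1000):
    counter = Value('i', 0)  # created with a lock by default
    procs = [Process(target=add_many, args=(counter, times))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(f'Final value: {run_locked_counter()}')
```

With the lock in place the final value equals workers * times on every run; without it, some increments can be lost.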

4.2 Manager-Managed Shared Objects

from multiprocessing import Process, Manager

def worker(d, l):
    d['count'] += 1
    l.append(len(l))

if __name__ == '__main__':
    with Manager() as manager:
        # Create Manager-managed shared objects
        shared_dict = manager.dict({'count': 0})
        shared_list = manager.list()
        
        processes = []
        for _ in range(3):
            p = Process(target=worker, args=(shared_dict, shared_list))
            p.start()
            processes.append(p)
        
        for p in processes:
            p.join()
        
        print(f'Shared dictionary: {dict(shared_dict)}')
        print(f'Shared list: {list(shared_list)}')

Manager vs. Shared Memory:

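  • A Manager keeps the objects in a separate server process, and other processes work with them through proxies; this is slower than shared memory but more flexible.
  • Supports complex data structures (dictionaries, lists, etc.).
  • Every operation on a proxy is pickled and sent to the server process over a connection, resulting in higher overhead. Compound updates such as d['count'] += 1 are still not atomic.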
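
An update like d['count'] += 1 is a read followed by a write, each a separate request to the manager, so two workers can overwrite each other's result. One way to guard it, sketched here with the hypothetical names bump and run_manager_counter, is a lock created by the same manager:

```python
from multiprocessing import Manager, Process


def bump(shared, lock, times):
    for _ in range(times):
        # Guard the read and the write so no other worker runs between them.
        with lock:
            shared['count'] += 1


def run_manager_counter(workers=3, times=50):
    with Manager() as manager:
        shared = manager.dict({'count': 0})
        lock = manager.Lock()  # a lock proxy that can be passed to children
        procs = [Process(target=bump, args=(shared, lock, times))
                 for _ in range(workers)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Read the value while the manager is still running.
        return shared['count']


if __name__ == '__main__':
    print(f'Final count: {run_manager_counter()}')
```

Each locked update costs several round trips to the manager process, so this pattern suits low-frequency coordination better than hot loops.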

Best Practice Recommendations

  1. Number of Processes: For CPU-bound work, start with the number of CPU cores (os.cpu_count()); more processes than cores mostly adds context switching.
  2. Data Serialization: Arguments, return values, and messages passed between processes must be picklable.
  3. Resource Cleanup: Always join() the processes you start. A with block around a Pool terminates the pool on exit, so collect all results inside the block.
  4. Avoid Shared State: Prefer message passing over shared memory when possible.
  5. Error Handling: An exception in a bare Process is not propagated to the parent, which only sees a non-zero exitcode, so report errors explicitly. Pool methods re-raise a worker's exception in the parent when the result is retrieved.
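
For a bare Process, one common way to handle errors manually is to catch them in the child and send a status message back. The sketch below assumes a simple ('ok' | 'error', payload) message convention; guarded_sqrt and run_guarded are illustrative names, not part of the multiprocessing API.

```python
from multiprocessing import Process, Queue


def guarded_sqrt(value, results):
    # Catch everything in the child and report the outcome to the parent.
    try:
        if value < 0:
            raise ValueError(f'negative input: {value}')
        results.put(('ok', value ** 0.5))
    except Exception as exc:
        results.put(('error', repr(exc)))


def run_guarded(value):
    results = Queue()
    p = Process(target=guarded_sqrt, args=(value, results))
    p.start()
    outcome = results.get()  # the child always sends exactly one message
    p.join()
    return outcome


if __name__ == '__main__':
    print(run_guarded(9))
    print(run_guarded(-1))
```

Sending repr(exc) rather than the exception object sidesteps exceptions that cannot be pickled, at the cost of losing the original type in the parent.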

Applicable Scenarios Summary

  • CPU-intensive tasks: mathematical calculations, image processing, etc.
  • Scenarios requiring true parallel execution.
  • Tasks are independent with minimal communication needs.
  • Situations needing to bypass GIL limitations for multi-core utilization.
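
As an illustration of a task that fits these criteria, the sketch below counts primes by splitting a range into independent chunks and mapping them across a pool. The function names, the naive primality test, and the default of four chunks are all assumptions chosen for brevity, not tuned recommendations.

```python
from multiprocessing import Pool


def count_primes(bounds):
    # CPU-bound work on one independent chunk: primes in [start, stop).
    start, stop = bounds
    total = 0
    for n in range(max(start, 2), stop):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            total += 1
    return total


def count_primes_parallel(limit, workers=2, chunks=4):
    # Assumes limit > 0. Split [0, limit) into roughly equal chunks.
    step = -(-limit // chunks)  # ceiling division
    ranges = [(lo, min(lo + step, limit)) for lo in range(0, limit, step)]
    with Pool(processes=workers) as pool:
        # Only two small integers go in and one integer comes back per task.
        return sum(pool.map(count_primes, ranges))


if __name__ == '__main__':
    print(count_primes_parallel(100_000))
```

Each task receives a pair of integers and returns a single integer, so communication cost stays negligible compared with the computation.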

Used appropriately, the multiprocessing module lets CPU-bound Python programs scale across multiple cores, with the largest gains when tasks are independent and exchange little data.