Hassan Agmir

Learn Python Multiprocessing

Using multicore processors efficiently is essential for performance-sensitive programs. Python's multiprocessing module provides an interface for parallel execution in separate processes, enabling developers to sidestep the Global Interpreter Lock (GIL) and use multiple CPU cores. This guide walks through Python's multiprocessing capabilities, with code examples and best practices to help you get the most out of parallel processing.

Introduction to Multiprocessing

Multiprocessing refers to the ability of a system to run multiple processes simultaneously. In Python, the multiprocessing module allows the creation of multiple processes, each with its own Python interpreter and memory space, enabling true parallelism. This is particularly beneficial for CPU-bound tasks that require significant computational resources.

Understanding Processes and Threads

A process is an independent program in execution, with its own memory space. A thread, on the other hand, is a lightweight unit of execution that runs inside a process and shares that process's memory with its other threads. While threads are suitable for I/O-bound tasks, they are limited in Python due to the GIL, making processes a better choice for CPU-bound operations.
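
The difference in memory sharing can be seen in a small sketch. The names shared_items, append_item, and run_isolation_demo are made up for this illustration: a thread and a child process each append to the same module-level list, and only the thread's change is visible to the parent.

```python
import multiprocessing
import threading

# Hypothetical module-level state, used only for this illustration.
shared_items = []

def append_item(value):
    """Append to the module-level list in whichever thread or process runs this."""
    shared_items.append(value)

def run_isolation_demo():
    """Return the list as the parent sees it after a thread and a process each append."""
    shared_items.clear()

    t = threading.Thread(target=append_item, args=("from thread",))
    t.start()
    t.join()

    p = multiprocessing.Process(target=append_item, args=("from process",))
    p.start()
    p.join()

    return list(shared_items)

if __name__ == "__main__":
    # The thread shares the parent's memory; the child process changed its own copy.
    print(run_isolation_demo())  # ['from thread']
```

The child's append happens in the child's own memory space, so data must be sent back explicitly (for example through a Queue or Pipe) if the parent needs it.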

The Global Interpreter Lock (GIL)

The GIL is a mutex in CPython that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time. This means that even on multi-core systems, threads in a standard (GIL-enabled) CPython build cannot achieve true parallelism for CPU-bound Python code. The multiprocessing module circumvents this limitation by using separate processes, each with its own Python interpreter and memory space.
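
One way to observe this effect is to run the same CPU-bound function through a thread pool and a process pool and compare wall-clock times. The sketch below is only an illustration: count_primes and timed_run are hypothetical helpers, and the actual timings depend on your machine, core count, and Python build, so no numbers are claimed here.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def count_primes(limit):
    """CPU-bound toy task: count primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

def timed_run(executor_cls, limits):
    """Run count_primes over limits with the given executor; return (results, seconds)."""
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        results = list(executor.map(count_primes, limits))
    return results, time.perf_counter() - start

if __name__ == "__main__":
    limits = [20_000] * 4
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, seconds = timed_run(cls, limits)
        print(f"{cls.__name__}: {seconds:.2f}s, results={results}")
```

On a GIL-enabled interpreter with several cores, the process pool would typically finish sooner for this kind of work, although process startup cost can dominate for very small inputs.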

Getting Started with the multiprocessing Module

Python's multiprocessing module provides a simple interface for spawning processes using an API similar to the threading module. Here's a basic example:

import multiprocessing

def worker(num):
    """Worker function"""
    print(f'Worker: {num}')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
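        p.start()

    # Wait for every worker to finish before the main process exits
    for p in jobs:
        p.join()

In this example, five separate processes are created, each executing the worker function with a different argument. Because the processes run concurrently, the order of the printed lines may vary. (dev.to)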

Creating and Managing Processes

Creating a Process

To create a new process, instantiate the Process class with the target function and its arguments:

from multiprocessing import Process

def print_func(continent='Asia'):
    print(f'The name of continent is: {continent}')

if __name__ == "__main__":
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)
    procs.append(proc)
    proc.start()

    for name in names:
        # Start one process per continent name
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

Starting and Joining Processes

  • start(): Begins the process's activity.
  • join(): Waits for the process to finish execution. (superfastpython.com)

Using join() ensures that the main program waits for all processes to complete before proceeding.
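
Beyond start() and join(), a Process object also exposes is_alive(), exitcode, and terminate(), and join() accepts a timeout. The following sketch (with hypothetical quick and slow worker functions) shows how these might be combined to stop a child that runs too long.

```python
import time
from multiprocessing import Process

def quick():
    """Finish immediately."""
    pass

def slow():
    """Stand-in for a task that takes far too long."""
    time.sleep(60)

def run_lifecycle_demo():
    """Return (exit code of the quick child, whether the slow child outlived a
    short join, whether the slow child is still alive after terminate)."""
    fast = Process(target=quick)
    fast.start()
    fast.join()                  # blocks until the child exits

    stuck = Process(target=slow)
    stuck.start()
    stuck.join(timeout=0.5)      # returns after 0.5 s even if the child is still running
    still_running = stuck.is_alive()
    if still_running:
        stuck.terminate()        # forcibly stop the child
        stuck.join()             # reap it so it does not linger as a zombie
    return fast.exitcode, still_running, stuck.is_alive()

if __name__ == "__main__":
    print(run_lifecycle_demo())  # (0, True, False)
```

terminate() gives the child no chance to clean up, so it is best kept as a last resort for work that holds no locks or half-written files.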

Using Process Pools

For scenarios where you need to run many tasks, the Pool class maintains a fixed set of worker processes and offers a convenient way to parallelize the execution of a function across multiple input values.

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == "__main__":
    with Pool(5) as p:
        result = p.map(square, [1, 2, 3, 4, 5])
    print(result)

In this example, a pool of five worker processes is created to compute the square of numbers in parallel.
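
Pool offers other mapping methods besides map(). As a brief sketch, starmap() handles functions that take several arguments, and imap_unordered() yields results as soon as workers finish them. The functions power and square and the helper run_pool_variants are illustrative names only.

```python
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

def square(n):
    return n * n

def run_pool_variants():
    """Return results from starmap() and (sorted) imap_unordered()."""
    with Pool(2) as pool:
        # starmap() unpacks each tuple into positional arguments
        powers = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])
        # imap_unordered() yields results in completion order, so sort for a stable view
        squares = sorted(pool.imap_unordered(square, range(5)))
    return powers, squares

if __name__ == "__main__":
    print(run_pool_variants())  # ([8, 9, 1], [0, 1, 4, 9, 16])
```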

Inter-Process Communication (IPC)

Processes have separate memory spaces, so sharing data between them requires explicit communication mechanisms. The multiprocessing module provides several IPC tools: (dev.to)

Queues

Queues are thread- and process-safe FIFO data structures. (digitalocean.com)

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Pipes

Pipe() returns a pair of connection objects for two processes. By default the channel is two-way (duplex), so either end can both send and receive.

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
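
The example above sends data in one direction only. A minimal sketch of two-way use might look like the following, where the parent sends numbers and the child replies with each number doubled. The doubler function and the None sentinel are choices made for this illustration.

```python
from multiprocessing import Pipe, Process

def doubler(conn):
    """Receive numbers until None arrives, replying with each number doubled."""
    while True:
        value = conn.recv()
        if value is None:        # sentinel chosen for this sketch
            break
        conn.send(value * 2)
    conn.close()

def run_duplex_demo(values):
    """Send each value to the child and collect its replies."""
    parent_conn, child_conn = Pipe()     # duplex=True by default
    p = Process(target=doubler, args=(child_conn,))
    p.start()
    replies = []
    for v in values:
        parent_conn.send(v)
        replies.append(parent_conn.recv())
    parent_conn.send(None)               # tell the child to stop
    p.join()
    return replies

if __name__ == "__main__":
    print(run_duplex_demo([1, 2, 3]))    # [2, 4, 6]
```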

Synchronization Primitives

When multiple processes access shared resources, synchronization is crucial to prevent race conditions. The multiprocessing module offers several synchronization primitives:

Locks

Locks ensure that only one process can access a resource at a time.

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Semaphores, Events, Conditions

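These primitives cover more complex coordination between processes: a Semaphore limits how many processes may hold it at once, an Event lets one process signal others that something has happened, and a Condition lets processes wait until some shared state changes.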
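
As a rough sketch of two of these primitives, the workers below wait on an Event before starting, and a Semaphore allows at most two of them into the guarded section at a time. The worker function, the slot count of two, and the short sleep are all assumptions made for the example.

```python
import time
from multiprocessing import Event, Process, Queue, Semaphore

def worker(start_event, slots, results, ident):
    start_event.wait()       # block until the parent gives the go signal
    with slots:              # at most two workers hold the semaphore at once
        time.sleep(0.05)     # stand-in for work on a limited resource
        results.put(ident)

def run_sync_demo(n_workers=4):
    """Start workers, release them together, and return the sorted worker ids."""
    start_event = Event()
    slots = Semaphore(2)
    results = Queue()
    procs = [
        Process(target=worker, args=(start_event, slots, results, i))
        for i in range(n_workers)
    ]
    for p in procs:
        p.start()
    start_event.set()        # release all waiting workers
    finished = sorted(results.get() for _ in procs)   # drain the queue before joining
    for p in procs:
        p.join()
    return finished

if __name__ == "__main__":
    print(run_sync_demo())   # [0, 1, 2, 3]
```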

Error Handling in Multiprocessing

Proper error handling ensures that exceptions in child processes are managed gracefully. One approach is to use a Queue to capture exceptions:

import multiprocessing
import traceback

def worker(q):
    try:
        # Simulate an error
        raise ValueError("An error occurred")
    except Exception:
        # Send the formatted traceback to the parent
        q.put(traceback.format_exc())
    else:
        q.put(None)  # signal success so the parent never waits forever

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    error = q.get()  # read before join(); q.empty() is not reliable
    p.join()
    if error is not None:
        print(error)

This setup captures the traceback of an exception in the child process and prints it in the main process. (fabio.pierazzi.com)

Best Practices and Common Pitfalls

Best Practices

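  • Use if __name__ == '__main__':: Required whenever child processes re-import the main module, as they do with the spawn and forkserver start methods (spawn is the default on Windows and macOS). Without the guard, each child would try to start children of its own.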
  • Avoid shared state: Design processes to operate independently to minimize synchronization overhead.
  • Use Pool for simple parallel tasks: Provides a high-level interface for parallel execution.
  • Handle exceptions gracefully: Implement error handling to manage exceptions in child processes.

Common Pitfalls

  • Modifying shared objects without synchronization: Can lead to race conditions.
  • Not joining processes: May result in zombie processes.
  • Overusing processes: Creating too many processes can lead to resource exhaustion.
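
To illustrate the first pitfall, here is a minimal sketch of a shared counter built on multiprocessing.Value. The increment counter.value += 1 is a read-modify-write, so each update is wrapped in the lock that comes with the Value. The names add_many and run_counter_demo are hypothetical.

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    """Increment the shared counter n times, holding its lock for each update."""
    for _ in range(n):
        with counter.get_lock():   # makes the read-modify-write atomic
            counter.value += 1

def run_counter_demo(n_procs=4, n=1000):
    """Return the final counter value after all processes have finished."""
    counter = Value('i', 0)        # shared C int, created with an associated lock
    procs = [Process(target=add_many, args=(counter, n)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()                   # joining also avoids leaving zombie processes
    return counter.value

if __name__ == "__main__":
    print(run_counter_demo())      # 4000
```

Without the get_lock() block, increments from different processes can overwrite each other and the final total may come out lower than expected.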

Real-World Applications

Multiprocessing is widely used in various domains:

  • Data Processing: Parallel processing of large datasets.
  • Web Scraping: Fetching multiple web pages concurrently.
  • Scientific Computing: Running simulations or computations in parallel.
  • Machine Learning: Training models using multiple cores. (analyticsvidhya.com)

Advanced Topics

Shared Memory

The multiprocessing.shared_memory module allows the sharing of data between processes without serialization.

from multiprocessing import shared_memory
import numpy as np

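a = np.array([1, 2, 3, 4])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]  # copy the data into the shared block

# Release the array view, then close and free the block when finished
del b
shm.close()
shm.unlink()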

Manager Objects

Managers provide a way to create shared objects that multiple processes can access.

from multiprocessing import Manager

if __name__ == '__main__':
    # Manager() starts a server process, so it needs the main guard too
    with Manager() as manager:
        shared_list = manager.list()
        shared_list.append('Hello')
        print(shared_list)
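
The snippet above uses the managed list from a single process. A sketch of the multi-process case could look like this, where several children write into one managed dictionary. The function names record_square and run_manager_demo are made up for the example.

```python
from multiprocessing import Manager, Process

def record_square(shared, n):
    """Store n squared under key n; the proxy forwards this to the manager process."""
    shared[n] = n * n

def run_manager_demo():
    """Let four child processes fill one managed dict and return a plain copy."""
    with Manager() as manager:
        shared = manager.dict()
        procs = [Process(target=record_square, args=(shared, n)) for n in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(shared)   # copy out before the manager shuts down

if __name__ == "__main__":
    print(run_manager_demo())  # {0: 0, 1: 1, 2: 4, 3: 9}
```

Every access goes through the manager's server process, so managed objects are convenient but noticeably slower than shared memory or local data.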

14. Multiprocessing Start Methods

Depending on your operating system and requirements, multiprocessing supports three start methods:

  1. fork
    • Default on Linux and most other Unix systems through Python 3.13; not available on Windows.
    • The child process inherits the parent’s memory space (copy-on-write).
    • Fast startup, but beware of inherited resources (e.g., open file descriptors) and of forking a process that already runs multiple threads.
  2. spawn
    • Default on Windows and, since Python 3.8, on macOS; available on Unix.
    • A fresh Python interpreter is launched, and only picklable objects are transferred.
    • Safer when interacting with threads or GPU contexts; slower startup.
  3. forkserver
    • Available on Unix platforms that support passing file descriptors over pipes; from Python 3.14 it replaces fork as the default on Linux.
    • A separate server process is started; new processes are forked from it.
    • Avoids some fork-related issues; moderate startup cost.

How to set the start method

import multiprocessing as mp

if __name__ == '__main__':
    mp.set_start_method('spawn')  # or 'fork', 'forkserver'; call at most once per program
    # ... your code ...

Use mp.get_all_start_methods() to list available methods, and mp.get_start_method() to inspect the current one.
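
When a library or a single component needs a particular start method, mp.get_context() returns a context object with the same API as the module, without changing the global default. The sketch below uses a hypothetical helper run_with_context and passes the platform default. Any name from mp.get_all_start_methods(), such as 'spawn', could be passed instead.

```python
import multiprocessing as mp

def square(n):
    return n * n

def run_with_context(method):
    """Run a tiny pool with the given start method, leaving the global default untouched."""
    ctx = mp.get_context(method)
    with ctx.Pool(2) as pool:
        return pool.map(square, [1, 2, 3])

if __name__ == "__main__":
    print("available:", mp.get_all_start_methods())
    default_method = mp.get_start_method()
    print(default_method, run_with_context(default_method))
```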

15. Using concurrent.futures.ProcessPoolExecutor

While the multiprocessing.Pool API is powerful, many developers prefer the concurrent.futures interface for its consistency with threads and futures:

from concurrent.futures import ProcessPoolExecutor, as_completed

def compute(x):
    return x * x

if __name__ == '__main__':
    inputs = range(20)
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(compute, i): i for i in inputs}
        for future in as_completed(futures):
            i = futures[future]
            try:
                result = future.result()
            except Exception as exc:
                print(f'Input {i} generated an exception: {exc}')
            else:
                print(f'{i} squared is {result}')

  • submit() schedules a callable and returns a Future.
  • as_completed() yields futures as they complete, ideal for streaming results.
  • Futures support timeouts via result(timeout=...), propagate exceptions raised in the worker, and can be cancelled if the task has not started yet.

16. Chunking Workloads for Efficiency

When mapping large iterables to worker processes, it’s important to tune the chunk size to balance task overhead and load-balancing:

# func and inputs are assumed to be defined elsewhere; Pool comes from multiprocessing
with Pool(8) as p:
    # If chunksize is omitted, map() picks roughly len(inputs) / (processes * 4)
    result = p.map(func, inputs, chunksize=10)

  • Smaller chunks yield finer load-balancing but higher overhead.
  • Larger chunks reduce scheduling overhead but risk one slow chunk delaying all others.
  • Experiment with chunksize (e.g., between 1 and len(inputs)//(nproc*2)).
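
A simple way to experiment is to time the same map() call with a few chunk sizes. The following sketch assumes a deliberately cheap task (the hypothetical cheap function) so that per-task overhead dominates. The measured time also includes pool startup, and the numbers will differ between machines.

```python
import time
from multiprocessing import Pool

def cheap(x):
    """Very small task, so dispatch overhead is easy to see."""
    return x + 1

def time_chunksize(inputs, chunksize, processes=4):
    """Return (results, seconds) for one map() call with the given chunksize."""
    start = time.perf_counter()
    with Pool(processes) as pool:
        results = pool.map(cheap, inputs, chunksize=chunksize)
    return results, time.perf_counter() - start

if __name__ == "__main__":
    inputs = list(range(20_000))
    for chunksize in (1, 100, 5_000):
        _, seconds = time_chunksize(inputs, chunksize)
        print(f"chunksize={chunksize:>5}: {seconds:.3f}s")
```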

17. Profiling and Benchmarking Multiprocessing Code

Using timeit for micro-benchmarks

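import timeit
from multiprocessing import Pool

def f(x):
    return x * x

def run():
    with Pool(4) as p:
        p.map(f, range(10000))

if __name__ == '__main__':
    # Pass a callable instead of code strings: a function defined inside
    # timeit's setup string is local to timeit and cannot be pickled for workers.
    print(timeit.timeit(run, number=5))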

Using cProfile and pstats

import cProfile, pstats
from multiprocessing import Pool

def heavy_compute(x):
    # simulate expensive work
    for _ in range(100000):
        x += 1
    return x

def run():
    with Pool(4) as p:
        p.map(heavy_compute, range(100))

if __name__ == '__main__':
    profiler = cProfile.Profile()
    profiler.enable()
    run()
    profiler.disable()
    stats = pstats.Stats(profiler).sort_stats('cumtime')
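    stats.print_stats(10)

  • cumtime highlights functions where most time is consumed.
  • Profiling this way covers only the main process: time spent inside the workers appears as waiting on the pool, so profile the worker function separately to find bottlenecks there.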
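
One possible way to see inside the workers is to run the profiler within the worker function itself and send the report back with the result. This is a sketch under that assumption, with profiled_compute as a hypothetical wrapper, not the only approach. Writing one stats file per process is another option.

```python
import cProfile
import io
import pstats
from multiprocessing import Pool

def heavy_compute(x):
    # simulate expensive work
    for _ in range(100_000):
        x += 1
    return x

def profiled_compute(x):
    """Run heavy_compute under cProfile and return (result, report text)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(heavy_compute, x)
    buffer = io.StringIO()
    pstats.Stats(profiler, stream=buffer).sort_stats("cumtime").print_stats(5)
    return result, buffer.getvalue()

if __name__ == "__main__":
    with Pool(2) as pool:
        for result, report in pool.map(profiled_compute, range(2)):
            print(result)
            print(report)
```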

18. Handling Large Data: Shared Memory vs Serialization

18.1 Pickle Overhead

Every argument sent to a worker, and every result sent back, is pickled and unpickled, which can become a bottleneck when passing large objects (e.g., big NumPy arrays). Strategies to mitigate:

  • Move large data into shared memory (see next section).
  • Use memory-mapped files via numpy.memmap.
  • Split data into smaller chunks to reduce individual pickle sizes.
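
Before choosing a strategy, it can help to estimate how much data is actually being transferred. The sketch below approximates this with the standard pickle module. pickled_size is a hypothetical helper, and the exact byte counts depend on the pickle protocol in use.

```python
import pickle

def pickled_size(obj):
    """Approximate number of bytes needed to send obj to a worker."""
    return len(pickle.dumps(obj))

if __name__ == "__main__":
    small = list(range(10))
    large = list(range(1_000_000))
    print(f"small list: {pickled_size(small)} bytes")
    print(f"large list: {pickled_size(large)} bytes")
```

If the size of an argument is large compared with the work done on it, the transfer cost may outweigh the benefit of running in parallel.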

18.2 multiprocessing.shared_memory

Introduced in Python 3.8, this API lets you create a block of memory accessible from multiple processes without pickling:

from multiprocessing import shared_memory
import numpy as np

# Parent creates a shared memory block
a = np.arange(1000000, dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]  # copy data

# Worker processes can connect by name
# In child:
existing_shm = shared_memory.SharedMemory(name=shm.name)
arr = np.ndarray(a.shape, dtype=a.dtype, buffer=existing_shm.buf)
print(arr.sum())
del arr  # drop the array view first, otherwise close() can raise BufferError
existing_shm.close()
# After all children exit:
del b
shm.close()
shm.unlink()

  • .unlink() deletes the shared block when no longer needed.
  • Best for large, read-heavy data you don’t want to pickle repeatedly.
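
For a version that actually crosses a process boundary without needing NumPy, the same module provides ShareableList. In this sketch the parent creates the list, a child attaches to it by name and doubles each element in place, and the parent then reads the result. The function names are illustrative.

```python
from multiprocessing import Process
from multiprocessing.shared_memory import ShareableList

def double_in_place(name):
    """Attach to an existing ShareableList by name and double every element."""
    shared = ShareableList(name=name)
    for i in range(len(shared)):
        shared[i] = shared[i] * 2
    shared.shm.close()               # detach only; the parent owns the block

def run_shared_list_demo(values):
    """Create a shared list, let a child modify it, and return the final contents."""
    shared = ShareableList(values)   # parent creates the shared block
    try:
        p = Process(target=double_in_place, args=(shared.shm.name,))
        p.start()
        p.join()
        return list(shared)
    finally:
        shared.shm.close()
        shared.shm.unlink()          # free the block once nobody needs it

if __name__ == "__main__":
    print(run_shared_list_demo([1, 2, 3]))  # [2, 4, 6]
```

Only the short block name is pickled and sent to the child, and the data itself stays in place.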

19. Combining Multiprocessing with AsyncIO

While multiprocessing is for CPU-bound tasks, asyncio excels at I/O-bound concurrency. You can combine them:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(x):
    # expensive work
    return sum(i*i for i in range(10**6)) + x

async def main():
    # Use the loop that asyncio.run() created, not a separately fetched one
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Schedule CPU-bound work without blocking the event loop
        tasks = [loop.run_in_executor(pool, cpu_bound, i) for i in range(4)]
        results = await asyncio.gather(*tasks)
    print(results)

if __name__ == '__main__':
    asyncio.run(main())

  • run_in_executor lets you offload CPU work to processes while keeping your async loop responsive for I/O.

20. Robust Logging Across Processes

Standard logging isn’t process-safe out of the box. To collect logs from workers:

  1. Attach a QueueHandler in each worker and run a QueueListener in the main process:

    import logging
    from multiprocessing import Queue, Process, current_process
    from logging.handlers import QueueHandler, QueueListener

    def worker_configurer(log_queue):
        # Route every log record from this worker into the shared queue
        h = QueueHandler(log_queue)
        root = logging.getLogger()
        root.addHandler(h)
        root.setLevel(logging.DEBUG)

    def worker_process(log_queue):
        worker_configurer(log_queue)
        logger = logging.getLogger(current_process().name)
        logger.info('Worker started')

    if __name__ == '__main__':
        log_queue = Queue()
        # The listener pulls records off the queue and writes them in one place
        listener = QueueListener(log_queue, logging.StreamHandler())
        listener.start()

        jobs = [Process(target=worker_process, args=(log_queue,)) for _ in range(3)]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()

        listener.stop()

  2. Alternatively, write logs to separate files per process by including the process name or PID in each log file's name.

21. Error Recovery and Timeouts

21.1 Using apply_async with Callbacks

from multiprocessing import Pool

def task(x):
    if x == 3:
        raise ValueError("Oops on 3")
    return x*x

def on_success(result):
    print(f"Result: {result}")

def on_error(exc):
    print(f"Caught exception: {exc}")

if __name__ == '__main__':
    with Pool(4) as p:
        for i in range(6):
            p.apply_async(task, args=(i,), callback=on_success, error_callback=on_error)
        p.close()
        p.join()

  • error_callback lets you handle exceptions per job; both callbacks run in the main process, so keep them short.

21.2 Applying Timeouts

import time
from multiprocessing import Pool, TimeoutError

def long_task(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    with Pool(2) as p:
        async_result = p.apply_async(long_task, (5,))
        try:
            print(async_result.get(timeout=2))
        except TimeoutError:
            # get() only stops waiting; the task keeps running until the pool is terminated
            print("Task timed out!")

22. Deadlocks and How to Avoid Them

Deadlocks occur when processes wait indefinitely for resources held by each other. Common causes:

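  • A process that exits or is terminated while holding a lock, so the lock is never released.
  • Joining a process that has put items on a Queue before those items are consumed, or writing to a Pipe that nobody reads until it fills up.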
  • Circular dependencies between locks or semaphores.

Tips to prevent deadlocks

  1. Always consume all items put into a Queue.
  2. Use timeouts when acquiring locks.
  3. Design hierarchies for lock acquisition (e.g., always acquire Lock A before Lock B).
  4. Monitor child processes and terminate those hung beyond expected runtimes.
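
Tips 2 and 3 can be combined in a small helper. The sketch below is one possible shape, with acquire_in_order as a hypothetical function: it takes two locks in a fixed order, uses a timeout on each, and backs off if the second lock cannot be obtained.

```python
from multiprocessing import Lock

def acquire_in_order(first, second, timeout=1.0):
    """Try to acquire two locks in a fixed order, each with a timeout.

    Returns True only if both locks were obtained; the caller must then
    release both. On failure, nothing is left held.
    """
    if not first.acquire(timeout=timeout):
        return False
    if not second.acquire(timeout=timeout):
        first.release()   # back off instead of holding one lock while waiting
        return False
    return True

if __name__ == "__main__":
    lock_a, lock_b = Lock(), Lock()
    if acquire_in_order(lock_a, lock_b):
        try:
            print("both locks held")
        finally:
            lock_b.release()
            lock_a.release()
    else:
        print("could not get both locks; try again later")
```

As long as every process calls the helper with the locks in the same order, a circular wait cannot form, and the timeouts turn an unexpected hang into a failure that can be handled.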

23. Packaging Multiprocessing Code

When distributing your code as a package:

  • Ensure that entry points guard child-spawning logic under if __name__ == '__main__':.
  • Avoid global side effects at import time—child interpreters re-import modules.
  • Test on both Unix and Windows to confirm start-method compatibility.

24. Scaling Beyond a Single Machine

For workloads exceeding a single host’s capacity, consider:

  1. Distributed task queues like Celery with a message broker (RabbitMQ, Redis).
  2. Dask for scaling NumPy/Pandas workloads across clusters with high-level collections that mirror the NumPy and pandas APIs.
  3. Ray for building distributed Python applications with an actor/task model.

While beyond the scope of pure multiprocessing, these frameworks build on similar concepts—parallel workers, serialization, and IPC—at cluster scale.

25. Real-World Case Studies

25.1 Image Thumbnail Generation

from multiprocessing import Pool
from PIL import Image
import glob, os

def create_thumbnail(path):
    img = Image.open(path)
    img.thumbnail((128, 128))
    base, ext = os.path.splitext(path)
    thumb_path = f'{base}_thumb{ext}'
    img.save(thumb_path)

if __name__ == '__main__':
    files = glob.glob('images/*.jpg')
    with Pool() as p:
        p.map(create_thumbnail, files)

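Because each image is handled independently, thousands of images can be processed in parallel. The achievable speedup is roughly bounded by the number of CPU cores and by disk throughput.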

25.2 Data Analysis on CSV Chunks

import pandas as pd
from multiprocessing import Pool

def process_chunk(df_chunk):
    # e.g., compute some statistics
    return df_chunk['value'].mean()

if __name__ == '__main__':
    reader = pd.read_csv('large.csv', chunksize=100000)
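    with Pool(4) as p:
        # imap() pulls chunks lazily; map() would first load every chunk into a list
        results = list(p.imap(process_chunk, reader))
    print("Averages per chunk:", results)

By chunking and parallelizing, no single DataFrame has to hold the whole file. Each chunk is still pickled and sent to a worker, so very large chunks add transfer overhead.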

26. Summary of Best Practices

  1. Guard entry point with if __name__ == '__main__':.
  2. Choose the right start method for your platform and use case.
  3. Minimize pickling overhead by sharing large read-only data or using memory maps.
  4. Tune chunk sizes to balance overhead and load.
  5. Use robust logging patterns to collect diagnostics.
  6. Handle exceptions and timeouts per task so that one failing or hung task does not stall the whole job, and join or terminate workers to avoid zombie processes.
  7. Profile and benchmark to identify real bottlenecks.
  8. Design for fault tolerance: avoid deadlocks, plan for worker failures.

27. Further Reading and Resources

  • Official Python documentation for multiprocessing.
  • Python Cookbook recipes for advanced multiprocessing patterns.
  • “High Performance Python” by Micha Gorelick & Ian Ozsvald (see the chapter on the multiprocessing module).
  • Dask and Ray documentation for distributed parallelism.

With this extensive exploration—covering basics, advanced topics, real-world examples, troubleshooting, and best practices—you should now be equipped to design, implement, and optimize Python multiprocessing solutions for virtually any CPU-bound workload. Happy parallelizing!

© Copyright 2025 by Hassan Agmir