Skip to content

Multiprocessing CPU-bound

Multiprocessing = Mỗi process có GIL riêng = True parallelism

Learning Outcomes

Sau bài này, bạn sẽ:

  • Hiểu khi nào cần multiprocessing thay vì threading
  • Sử dụng Process pools hiệu quả
  • Chia sẻ data giữa processes với SharedMemory và Manager
  • Implement IPC (Inter-Process Communication) patterns
  • Quyết định CPU-bound vs I/O-bound cho workload của bạn

Tại sao cần Multiprocessing?

GIL Problem Recap

python
import threading
import time

def cpu_work():
    """CPU-bound task: sum the integers below 50 million in a pure Python loop."""
    total = 0
    for i in range(50_000_000):
        total += i
    return total

# Threading does NOT help CPU-bound work: both threads contend for the GIL,
# so only one of them executes Python bytecode at any instant.
start = time.perf_counter()
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Threading: {time.perf_counter() - start:.2f}s")  # ~4.5s

# Plain sequential execution takes about the same time (threads add only overhead here)
start = time.perf_counter()
cpu_work()
cpu_work()
print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~4.2s

Multiprocessing Solution

python
from multiprocessing import Process
import time

def cpu_work():
    """CPU-bound task: sum the integers below 50 million in a pure Python loop."""
    total = 0
    for i in range(50_000_000):
        total += i
    return total

if __name__ == "__main__":
    start = time.perf_counter()
    
    p1 = Process(target=cpu_work)
    p2 = Process(target=cpu_work)
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    print(f"Multiprocessing: {time.perf_counter() - start:.2f}s")  # ~2.3s
    # Nearly 2x speedup on 2 cores: each process has its own interpreter and GIL

Process Pools

Pool.map() - Parallel map

python
from multiprocessing import Pool
import time

def process_item(x):
    """CPU-intensive processing: sum of squares below x * 10000."""
    total = 0
    for value in range(x * 10000):
        total += value * value
    return total

if __name__ == "__main__":
    items = list(range(100))

    # Baseline: handle every item one after another in this process
    t0 = time.perf_counter()
    results_seq = list(map(process_item, items))
    print(f"Sequential: {time.perf_counter() - t0:.2f}s")

    # Fan the same work out across 4 worker processes
    t0 = time.perf_counter()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, items)
    print(f"Parallel (4 workers): {time.perf_counter() - t0:.2f}s")

    # Both strategies must produce identical answers
    assert results_seq == results_par

Pool Methods Comparison

python
from multiprocessing import Pool

def square(x):
    """Return x squared (module-level, so it pickles by reference)."""
    return x ** 2


def add(a, b):
    """Return a + b.

    Defined at module level rather than inside the __main__ guard: under
    the 'spawn' start method (default on Windows and macOS) the pool's
    worker processes re-import this module with __name__ != "__main__",
    so a function defined inside the guard would not exist in the worker
    and Pool would fail to look it up when unpickling the task.
    """
    return a + b


if __name__ == "__main__":
    with Pool(4) as pool:
        # map() - Blocking, ordered results
        results = pool.map(square, range(10))
        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        
        # map_async() - Non-blocking
        async_result = pool.map_async(square, range(10))
        results = async_result.get(timeout=10)  # Wait with timeout
        
        # imap() - Lazy iterator, ordered
        for result in pool.imap(square, range(10)):
            print(result)  # Process as they complete (in order)
        
        # imap_unordered() - Lazy iterator, unordered (faster)
        for result in pool.imap_unordered(square, range(10)):
            print(result)  # Process as they complete (any order)
        
        # starmap() - Multiple arguments
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
        # [3, 7, 11]
        
        # apply() - Single call, blocking
        result = pool.apply(square, (5,))  # 25
        
        # apply_async() - Single call, non-blocking
        async_result = pool.apply_async(square, (5,))
        result = async_result.get()  # 25

ProcessPoolExecutor (High-level API)

python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def heavy_computation(n):
    """Simulate CPU-intensive work, then return n squared."""
    time.sleep(0.1)  # stand-in for a genuinely expensive computation
    return n * n

if __name__ == "__main__":
    numbers = list(range(20))

    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit() hands back one Future per scheduled task
        pending = [executor.submit(heavy_computation, value) for value in numbers]

        # as_completed() yields each Future the moment it finishes
        for done in as_completed(pending):
            print(f"Got result: {done.result()}")

        # map() is the simpler API and preserves the input order
        ordered = list(executor.map(heavy_computation, numbers))
        print(f"All results: {ordered}")

Chunksize Optimization

python
from multiprocessing import Pool
import time

def light_task(x):
    """Trivial per-item work: double the input."""
    return x + x

if __name__ == "__main__":
    items = list(range(1_000_000))

    with Pool(4) as pool:
        # Default chunksize: one IPC round-trip per small batch = heavy overhead
        t0 = time.perf_counter()
        results = pool.map(light_task, items)
        print(f"Default chunksize: {time.perf_counter() - t0:.2f}s")

        # Large chunksize: far fewer pickle/unpickle round-trips
        t0 = time.perf_counter()
        results = pool.map(light_task, items, chunksize=10000)
        print(f"Chunksize=10000: {time.perf_counter() - t0:.2f}s")

💡 CHUNKSIZE RULE OF THUMB

  • Light tasks: Large chunksize (10000+)
  • Heavy tasks: Small chunksize (1-100)
  • Formula: chunksize = len(items) // (num_workers * 4)

Shared Memory

SharedMemory (Python 3.8+)

python
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def worker(shm_name, shape, dtype):
    """Attach to an existing shared-memory block and double its contents in place."""
    shm = SharedMemory(name=shm_name)

    # View the raw shared buffer as a numpy array (no copy is made)
    view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

    # In-place write-through: every process attached to the block sees this
    view[:] = view * 2

    shm.close()

if __name__ == "__main__":
    # Source data, plus a shared-memory segment big enough to hold it
    source = np.array([1, 2, 3, 4, 5], dtype=np.int64)
    shm = SharedMemory(create=True, size=source.nbytes)

    # Wrap the shared buffer as an ndarray and copy the data in
    shared_arr = np.ndarray(source.shape, dtype=source.dtype, buffer=shm.buf)
    shared_arr[:] = source[:]

    print(f"Before: {shared_arr}")  # [1 2 3 4 5]

    # The child attaches by name; no array data is pickled across processes
    child = Process(target=worker, args=(shm.name, source.shape, source.dtype))
    child.start()
    child.join()

    print(f"After: {shared_arr}")  # [2 4 6 8 10]

    # Cleanup: close our handle, then destroy the segment itself
    shm.close()
    shm.unlink()

Manager - High-level Shared State

python
from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, worker_id):
    """Record this worker's result in both shared containers."""
    key = f"worker_{worker_id}"
    shared_dict[key] = f"result_{worker_id}"
    shared_list.append(worker_id)

if __name__ == "__main__":
    with Manager() as manager:
        # Proxy objects: every mutation is forwarded to the manager process
        shared_dict = manager.dict()
        shared_list = manager.list()

        workers = [
            Process(target=worker, args=(shared_dict, shared_list, i))
            for i in range(4)
        ]
        for proc in workers:
            proc.start()
        for proc in workers:
            proc.join()

        print(f"Dict: {dict(shared_dict)}")
        # {'worker_0': 'result_0', 'worker_1': 'result_1', ...}

        print(f"List: {list(shared_list)}")
        # [0, 1, 2, 3] (order may vary)

Value và Array - Simple Shared Types

python
from multiprocessing import Process, Value, Array
import ctypes

def increment_counter(counter, lock):
    """Add 1 to the shared counter 10000 times, taking the lock for each update."""
    for _ in range(10000):
        lock.acquire()
        try:
            counter.value += 1  # read-modify-write: must be serialized
        finally:
            lock.release()

def modify_array(arr):
    """Double every element of the shared array in place."""
    for i in range(len(arr)):
        arr[i] *= 2

if __name__ == "__main__":
    from multiprocessing import Lock

    # Shared counter: a single C int living in shared memory
    counter = Value(ctypes.c_int, 0)
    lock = Lock()

    workers = [
        Process(target=increment_counter, args=(counter, lock))
        for _ in range(4)
    ]

    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()

    print(f"Counter: {counter.value}")  # 40000

    # Shared fixed-size array of C doubles
    arr = Array(ctypes.c_double, [1.0, 2.0, 3.0, 4.0, 5.0])
    child = Process(target=modify_array, args=(arr,))
    child.start()
    child.join()

    print(f"Array: {list(arr)}")  # [2.0, 4.0, 6.0, 8.0, 10.0]

IPC Patterns

Queue - Producer/Consumer

python
from multiprocessing import Process, Queue
import time

def producer(queue, items):
    """Push every item onto the queue, then a None sentinel to signal completion."""
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.1)  # simulate time spent producing
    queue.put(None)  # sentinel: no more work is coming

def consumer(queue, consumer_id):
    """Pull and process items until the None sentinel arrives."""
    while (item := queue.get()) is not None:
        print(f"Consumer {consumer_id} processing: {item}")
        time.sleep(0.2)  # simulate processing time
    queue.put(None)  # re-queue the sentinel so sibling consumers also stop

if __name__ == "__main__":
    queue = Queue()
    items = list(range(10))

    # One producer feeding three competing consumers
    prod = Process(target=producer, args=(queue, items))
    consumers = [
        Process(target=consumer, args=(queue, i))
        for i in range(3)
    ]

    prod.start()
    for c in consumers:
        c.start()

    prod.join()
    for c in consumers:
        c.join()

Pipe - Two-way Communication

python
from multiprocessing import Process, Pipe

def worker(conn):
    """Echo each message back upper-cased until the "STOP" message arrives."""
    msg = conn.recv()
    while msg != "STOP":
        conn.send(msg.upper())  # reply on the same duplex connection
        msg = conn.recv()
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    proc = Process(target=worker, args=(child_conn,))
    proc.start()

    # Round-trip a few messages through the child process
    for msg in ["hello", "world", "python"]:
        parent_conn.send(msg)
        result = parent_conn.recv()
        print(f"Sent: {msg}, Received: {result}")

    parent_conn.send("STOP")
    proc.join()

Event - Signaling

python
from multiprocessing import Process, Event
import time

def waiter(event, name):
    """Block until the shared event is set, logging before and after."""
    print(f"{name}: Waiting for event...")
    event.wait()  # sleeps until some process calls event.set()
    print(f"{name}: Event received!")

def setter(event):
    """Set the event after a 2-second delay, releasing every waiter at once."""
    time.sleep(2)
    print("Setter: Setting event!")
    event.set()

if __name__ == "__main__":
    event = Event()

    # Three processes all blocked on the same event
    waiting = [
        Process(target=waiter, args=(event, f"Waiter-{i}"))
        for i in range(3)
    ]
    signaller = Process(target=setter, args=(event,))

    for w in waiting:
        w.start()
    signaller.start()

    for w in waiting:
        w.join()
    signaller.join()

CPU-bound vs I/O-bound Decision

Decision Flowchart

    Is the task mostly waiting on network/disk/DB (I/O-bound)?
    ├── Yes → thousands of concurrent waits? → asyncio; otherwise → threading
    └── No (CPU-bound) → multiprocessing / ProcessPoolExecutor (or NumPy/C extensions)

Benchmark Template

python
import time
import threading
import asyncio
from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor

def benchmark_task(n):
    """The workload under test - swap in your own task here."""
    # CPU-bound example:
    total = 0
    for i in range(n):
        total += i * i
    return total

    # I/O-bound example (unreachable placeholder):
    # import requests
    # return requests.get(f"https://api.example.com/{n}").json()

def benchmark():
    """Time the task sequentially, with threads, and with processes, then advise."""
    items = list(range(100))

    # Baseline: one item at a time in this process
    t0 = time.perf_counter()
    results = [benchmark_task(x) for x in items]
    seq_time = time.perf_counter() - t0
    print(f"Sequential: {seq_time:.2f}s")

    # Threads help only if the task releases the GIL (i.e. it waits on I/O)
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(benchmark_task, items))
    thread_time = time.perf_counter() - t0
    print(f"Threading: {thread_time:.2f}s")

    # Processes give true parallelism but pay pickling/startup overhead
    t0 = time.perf_counter()
    with Pool(4) as pool:
        results = pool.map(benchmark_task, items)
    mp_time = time.perf_counter() - t0
    print(f"Multiprocessing: {mp_time:.2f}s")

    # Interpret the numbers
    print("\n--- Analysis ---")
    if mp_time < thread_time * 0.7:
        print("✅ CPU-bound: Use multiprocessing")
    elif thread_time < seq_time * 0.7:
        print("✅ I/O-bound: Use threading or asyncio")
    else:
        print("⚠️ Task too light or not parallelizable")

if __name__ == "__main__":
    benchmark()

Common Workloads

| Workload         | Type      | Best Solution              |
|------------------|-----------|----------------------------|
| Image processing | CPU-bound | multiprocessing.Pool       |
| Video encoding   | CPU-bound | multiprocessing.Pool       |
| ML training      | CPU-bound | multiprocessing + NumPy    |
| Data compression | CPU-bound | multiprocessing.Pool       |
| Web scraping     | I/O-bound | asyncio + aiohttp          |
| API calls        | I/O-bound | asyncio or threading       |
| Database queries | I/O-bound | asyncio + async driver     |
| File reading     | I/O-bound | threading                  |
| Mixed workload   | Both      | multiprocessing + asyncio  |

Production Pitfalls

⚠️ NHỮNG LỖI THƯỜNG GẶP

1. Quên if __name__ == "__main__"

python
# ❌ WRONG - crashes on Windows (and any platform using the 'spawn' start method)!
from multiprocessing import Pool

def worker(x):
    return x * 2

pool = Pool(4)  # Runs at import time -> each spawned child re-imports -> recursive spawn!
results = pool.map(worker, range(10))

# ✅ CORRECT - pool creation only runs when executed as a script
from multiprocessing import Pool

def worker(x):
    return x * 2

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(worker, range(10))

2. Passing unpicklable objects

python
# ❌ WRONG - lambdas cannot be pickled, so they cannot be sent to worker processes
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(lambda x: x * 2, range(10))  # Error!

# ✅ CORRECT - use a module-level named function (picklable by reference)
def double(x):
    return x * 2

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(double, range(10))

3. Quá nhiều processes

python
# ❌ WRONG - 100 processes on 4 cores
# (work and items are placeholders for your task function and inputs)
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(100) as pool:  # Context-switch + memory overhead > benefit!
        results = pool.map(work, items)

# ✅ CORRECT - match the number of CPU cores
import os
from multiprocessing import Pool

if __name__ == "__main__":
    num_workers = os.cpu_count()  # or cpu_count() - 1 to keep one core responsive
    with Pool(num_workers) as pool:
        results = pool.map(work, items)

4. Shared state race conditions

python
# ❌ WRONG - race condition
from multiprocessing import Process, Value

counter = Value('i', 0)

def increment():
    for _ in range(10000):
        counter.value += 1  # NOT atomic! read-modify-write steps can interleave

# ✅ CORRECT - guard the update with a Lock
from multiprocessing import Process, Value, Lock

counter = Value('i', 0)
lock = Lock()

def increment():
    for _ in range(10000):
        with lock:
            counter.value += 1

5. Memory explosion với large data

python
# ❌ WRONG - the full dataset is duplicated into every worker process
from multiprocessing import Pool
import numpy as np

large_array = np.random.rand(1_000_000_000)  # 8GB!

def process_chunk(indices):
    return large_array[indices].sum()  # each worker inherits/copies all 8GB!

# ✅ CORRECT - use SharedMemory so the data lives in one place
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def process_chunk(args):
    """Sum one slice of the shared array.

    Defined at module level on purpose: with the 'spawn' start method the
    pool's workers re-import this module, and a function defined inside the
    __main__ guard would not exist there (the same trap as pitfall #1).

    args: (shm_name, start, end, shape, dtype) describing which slice of
    the shared array to reduce.
    """
    shm_name, start, end, shape, dtype = args
    existing_shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    result = arr[start:end].sum()
    existing_shm.close()
    return result

if __name__ == "__main__":
    # Create the shared segment once; workers attach by name instead of
    # receiving a pickled copy of the whole array.
    arr = np.random.rand(1_000_000)
    shm = SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]

    # Describe the 4 slices (len(arr) is divisible by 4 here)
    chunk_size = len(arr) // 4
    chunks = [
        (shm.name, i * chunk_size, (i + 1) * chunk_size, arr.shape, arr.dtype)
        for i in range(4)
    ]

    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)

    shm.close()
    shm.unlink()

Bảng Tóm tắt

python
# === BASIC PROCESS ===
# NOTE: cheat sheet - names like func, items, args, arr, msg, process are placeholders.
from multiprocessing import Process

def worker(arg):
    return arg * 2

if __name__ == "__main__":
    p = Process(target=worker, args=(5,))
    p.start()
    p.join()

# === POOL ===
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(4) as pool:
        # map - ordered, blocking
        results = pool.map(func, items)
        
        # map with chunksize
        results = pool.map(func, items, chunksize=100)
        
        # imap_unordered - fastest for independent tasks
        for result in pool.imap_unordered(func, items):
            process(result)
        
        # starmap - multiple arguments
        results = pool.starmap(func, [(a1, b1), (a2, b2)])

# === PROCESSPOOL EXECUTOR ===
from concurrent.futures import ProcessPoolExecutor, as_completed

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(func, arg) for arg in args]
        
        for future in as_completed(futures):
            result = future.result()

# === SHARED MEMORY ===
from multiprocessing.shared_memory import SharedMemory
import numpy as np

shm = SharedMemory(create=True, size=arr.nbytes)
shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
# ... use shared_arr ...
shm.close()   # detach this process's handle
shm.unlink()  # destroy the segment (exactly once, by the owning process)

# === MANAGER ===
from multiprocessing import Manager

with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list()

# === IPC ===
from multiprocessing import Queue, Pipe

# Queue
queue = Queue()
queue.put(item)
item = queue.get()

# Pipe
parent_conn, child_conn = Pipe()
parent_conn.send(msg)
msg = child_conn.recv()

# === SYNCHRONIZATION ===
from multiprocessing import Lock, Event, Semaphore

lock = Lock()
with lock:
    # critical section
    pass

event = Event()
event.set()    # wake every process blocked in wait()
event.wait()   # block until the event is set
event.clear()  # reset the event to unset