Giao diện
Multiprocessing CPU-bound
Multiprocessing = Mỗi process có GIL riêng = True parallelism
Learning Outcomes
Sau bài này, bạn sẽ:
- Hiểu khi nào cần multiprocessing thay vì threading
- Sử dụng Process pools hiệu quả
- Chia sẻ data giữa processes với SharedMemory và Manager
- Implement IPC (Inter-Process Communication) patterns
- Quyết định CPU-bound vs I/O-bound cho workload của bạn
Tại sao cần Multiprocessing?
GIL Problem Recap
python
import threading
import time


def cpu_work(n: int = 50_000_000) -> int:
    """CPU-bound task: sum the integers 0..n-1 in pure Python.

    `n` defaults to the original 50M iterations; pass a smaller value
    for quick experiments.
    """
    total = 0
    for i in range(n):
        total += i
    return total


if __name__ == "__main__":
    # Threading does NOT help CPU-bound work: the GIL lets only one
    # thread execute Python bytecode at a time.
    start = time.perf_counter()
    t1 = threading.Thread(target=cpu_work)
    t2 = threading.Thread(target=cpu_work)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"Threading: {time.perf_counter() - start:.2f}s")  # ~4.5s

    # Sequential takes about the same time.
    start = time.perf_counter()
    cpu_work()
    cpu_work()
    print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~4.2s

Multiprocessing Solution
python
from multiprocessing import Process
import time


def cpu_work(n: int = 50_000_000) -> int:
    """CPU-bound task: sum the integers 0..n-1 (defaults to 50M)."""
    total = 0
    for i in range(n):
        total += i
    return total


if __name__ == "__main__":
    # Each process has its own interpreter and its own GIL, so the two
    # workers really run in parallel on two cores.
    start = time.perf_counter()
    p1 = Process(target=cpu_work)
    p2 = Process(target=cpu_work)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Multiprocessing: {time.perf_counter() - start:.2f}s")  # ~2.3s
    # Gần 2x speedup trên 2 cores!

Process Pools
Pool.map() - Parallel map
python
from multiprocessing import Pool
import time


def process_item(x):
    """CPU-intensive processing: sum of squares below x * 10000."""
    return sum(i * i for i in range(x * 10000))


if __name__ == "__main__":
    items = list(range(100))

    # Baseline: one item after another in a single process.
    start = time.perf_counter()
    results_seq = [process_item(x) for x in items]
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # Same work fanned out across a pool of 4 worker processes.
    start = time.perf_counter()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, items)
    print(f"Parallel (4 workers): {time.perf_counter() - start:.2f}s")
    assert results_seq == results_par

Pool Methods Comparison
python
from multiprocessing import Pool


def square(x):
    """Return x squared."""
    return x ** 2


def add(a, b):
    """Return a + b.

    Defined at module level (not inside the __main__ guard) so spawned
    worker processes can import and unpickle it: under the 'spawn'
    start method (Windows/macOS default) a function defined inside the
    guard does not exist in the children, and pool.starmap would fail.
    """
    return a + b


if __name__ == "__main__":
    with Pool(4) as pool:
        # map() - blocking, ordered results
        results = pool.map(square, range(10))
        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

        # map_async() - non-blocking
        async_result = pool.map_async(square, range(10))
        results = async_result.get(timeout=10)  # wait with timeout

        # imap() - lazy iterator, ordered
        for result in pool.imap(square, range(10)):
            print(result)  # process as they complete (in order)

        # imap_unordered() - lazy iterator, unordered (faster)
        for result in pool.imap_unordered(square, range(10)):
            print(result)  # process as they complete (any order)

        # starmap() - multiple arguments
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
        # [3, 7, 11]

        # apply() - single call, blocking
        result = pool.apply(square, (5,))  # 25

        # apply_async() - single call, non-blocking
        async_result = pool.apply_async(square, (5,))
        result = async_result.get()  # 25

ProcessPoolExecutor (High-level API)
python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time


def heavy_computation(n):
    """Simulate CPU-intensive work, then return n squared."""
    time.sleep(0.1)
    return n ** 2


if __name__ == "__main__":
    numbers = list(range(20))
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit() hands back one Future per task.
        futures = [executor.submit(heavy_computation, n) for n in numbers]

        # as_completed() yields futures in the order they finish.
        for future in as_completed(futures):
            result = future.result()
            print(f"Got result: {result}")

        # map() is the simpler API and preserves input order.
        results = list(executor.map(heavy_computation, numbers))
        print(f"All results: {results}")

Chunksize Optimization
python
from multiprocessing import Pool
import time


def light_task(x):
    """Trivial task: double x (cost is dominated by IPC, not compute)."""
    return x * 2


if __name__ == "__main__":
    items = list(range(1_000_000))
    with Pool(4) as pool:
        # Default chunksize - lots of IPC round-trips.
        start = time.perf_counter()
        results = pool.map(light_task, items)
        print(f"Default chunksize: {time.perf_counter() - start:.2f}s")

        # Tuned chunksize - far fewer IPC round-trips.
        start = time.perf_counter()
        results = pool.map(light_task, items, chunksize=10000)
        print(f"Chunksize=10000: {time.perf_counter() - start:.2f}s")

💡 CHUNKSIZE RULE OF THUMB
- Light tasks: Large chunksize (10000+)
- Heavy tasks: Small chunksize (1-100)
- Formula:
chunksize = len(items) // (num_workers * 4)
Shared Memory
SharedMemory (Python 3.8+)
python
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np


def worker(shm_name, shape, dtype):
    """Attach to an existing shared-memory block and double it in place."""
    existing_shm = SharedMemory(name=shm_name)
    # View the raw shared buffer as a numpy array (no copy).
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # In-place update: every process attached to this block sees it.
    arr[:] = arr * 2
    existing_shm.close()


if __name__ == "__main__":
    source = np.array([1, 2, 3, 4, 5], dtype=np.int64)

    # Allocate a shared block and copy the data into it.
    shm = SharedMemory(create=True, size=source.nbytes)
    shared_arr = np.ndarray(source.shape, dtype=source.dtype, buffer=shm.buf)
    shared_arr[:] = source[:]
    print(f"Before: {shared_arr}")  # [1 2 3 4 5]

    # Let a child process mutate the shared block.
    p = Process(target=worker, args=(shm.name, source.shape, source.dtype))
    p.start()
    p.join()
    print(f"After: {shared_arr}")  # [2 4 6 8 10]

    # Cleanup
    shm.close()
    shm.unlink()  # Delete shared memory

Manager - High-level Shared State
python
from multiprocessing import Process, Manager


def worker(shared_dict, shared_list, worker_id):
    """Record this worker's result in the shared structures."""
    shared_dict[f"worker_{worker_id}"] = f"result_{worker_id}"
    shared_list.append(worker_id)


if __name__ == "__main__":
    with Manager() as manager:
        # Manager proxies: the dict/list live in a server process and
        # every access goes through IPC.
        shared_dict = manager.dict()
        shared_list = manager.list()

        procs = []
        for i in range(4):
            proc = Process(target=worker, args=(shared_dict, shared_list, i))
            procs.append(proc)
            proc.start()
        for proc in procs:
            proc.join()

        print(f"Dict: {dict(shared_dict)}")
        # {'worker_0': 'result_0', 'worker_1': 'result_1', ...}
        print(f"List: {list(shared_list)}")
        # [0, 1, 2, 3] (order may vary)

Value và Array - Simple Shared Types
python
from multiprocessing import Process, Value, Array, Lock
import ctypes


def increment_counter(counter, lock):
    """Add 1 to the shared counter 10000 times, holding the lock.

    `counter.value += 1` is a read-modify-write and is NOT atomic
    across processes, so every update must happen under the lock.
    """
    for _ in range(10000):
        with lock:
            counter.value += 1


def modify_array(arr):
    """Double every element of the shared array in place."""
    for i in range(len(arr)):
        arr[i] = arr[i] * 2


if __name__ == "__main__":
    # Shared counter guarded by an explicit Lock
    # (Lock imported at the top with the other names, not inside the guard).
    counter = Value(ctypes.c_int, 0)
    lock = Lock()
    processes = [
        Process(target=increment_counter, args=(counter, lock))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 40000

    # Shared array of C doubles.
    arr = Array(ctypes.c_double, [1.0, 2.0, 3.0, 4.0, 5.0])
    p = Process(target=modify_array, args=(arr,))
    p.start()
    p.join()
    print(f"Array: {list(arr)}")  # [2.0, 4.0, 6.0, 8.0, 10.0]

IPC Patterns
Queue - Producer/Consumer
python
from multiprocessing import Process, Queue
import time


def producer(queue, items):
    """Push each item onto the queue, then a None sentinel."""
    for item in items:
        print(f"Producing: {item}")
        queue.put(item)
        time.sleep(0.1)
    queue.put(None)  # Sentinel to signal done


def consumer(queue, consumer_id):
    """Pull and process items until the None sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:
            # Re-queue the sentinel so the remaining consumers stop too.
            queue.put(None)
            break
        print(f"Consumer {consumer_id} processing: {item}")
        time.sleep(0.2)


if __name__ == "__main__":
    queue = Queue()
    items = list(range(10))

    # One producer feeding three competing consumers.
    prod = Process(target=producer, args=(queue, items))
    consumers = [Process(target=consumer, args=(queue, i)) for i in range(3)]

    prod.start()
    for c in consumers:
        c.start()

    prod.join()
    for c in consumers:
        c.join()

Pipe - Two-way Communication
python
from multiprocessing import Process, Pipe


def worker(conn):
    """Echo every message back uppercased until 'STOP' arrives."""
    while True:
        msg = conn.recv()
        if msg == "STOP":
            break
        conn.send(msg.upper())
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()

    # Round-trip a few messages through the child.
    for msg in ["hello", "world", "python"]:
        parent_conn.send(msg)
        result = parent_conn.recv()
        print(f"Sent: {msg}, Received: {result}")

    parent_conn.send("STOP")
    p.join()

Event - Signaling
python
from multiprocessing import Process, Event
import time


def waiter(event, name):
    """Block until the event is set, logging before and after."""
    print(f"{name}: Waiting for event...")
    event.wait()
    print(f"{name}: Event received!")


def setter(event, delay=2):
    """Set the event after `delay` seconds (default keeps the original 2s)."""
    time.sleep(delay)
    print("Setter: Setting event!")
    event.set()


if __name__ == "__main__":
    event = Event()
    waiters = [
        Process(target=waiter, args=(event, f"Waiter-{i}"))
        for i in range(3)
    ]
    setter_proc = Process(target=setter, args=(event,))

    for w in waiters:
        w.start()
    setter_proc.start()
    for w in waiters:
        w.join()
    setter_proc.join()

CPU-bound vs I/O-bound Decision
Decision Flowchart
Benchmark Template
python
import time
import threading
import asyncio
from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor


def benchmark_task(n):
    """Your task here - modify to test."""
    # CPU-bound example:
    return sum(i * i for i in range(n))
    # I/O-bound example:
    # import requests
    # return requests.get(f"https://api.example.com/{n}").json()


def benchmark():
    """Time the task sequentially, with threads, and with processes."""
    workload = list(range(100))

    # Single-process baseline.
    start = time.perf_counter()
    out = [benchmark_task(x) for x in workload]
    seq_time = time.perf_counter() - start
    print(f"Sequential: {seq_time:.2f}s")

    # Thread pool: only helps if the task releases the GIL (I/O).
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        out = list(executor.map(benchmark_task, workload))
    thread_time = time.perf_counter() - start
    print(f"Threading: {thread_time:.2f}s")

    # Process pool: true parallelism for CPU-bound work.
    start = time.perf_counter()
    with Pool(4) as pool:
        out = pool.map(benchmark_task, workload)
    mp_time = time.perf_counter() - start
    print(f"Multiprocessing: {mp_time:.2f}s")

    # Whichever strategy wins tells you what kind of workload this is.
    print("\n--- Analysis ---")
    if mp_time < thread_time * 0.7:
        print("✅ CPU-bound: Use multiprocessing")
    elif thread_time < seq_time * 0.7:
        print("✅ I/O-bound: Use threading or asyncio")
    else:
        print("⚠️ Task too light or not parallelizable")
if __name__ == "__main__":
    benchmark()

Common Workloads
| Workload | Type | Best Solution |
|---|---|---|
| Image processing | CPU-bound | multiprocessing.Pool |
| Video encoding | CPU-bound | multiprocessing.Pool |
| ML training | CPU-bound | multiprocessing + NumPy |
| Data compression | CPU-bound | multiprocessing.Pool |
| Web scraping | I/O-bound | asyncio + aiohttp |
| API calls | I/O-bound | asyncio or threading |
| Database queries | I/O-bound | asyncio + async driver |
| File reading | I/O-bound | threading |
| Mixed workload | Both | multiprocessing + asyncio |
Production Pitfalls
⚠️ NHỮNG LỖI THƯỜNG GẶP
1. Quên if __name__ == "__main__"
python
# ❌ WRONG - crashes on Windows! Creating the Pool at import time makes
# every spawned child re-execute the module and spawn again (shown as
# comments so this combined snippet stays importable):
#
#   from multiprocessing import Pool
#
#   def worker(x):
#       return x * 2
#
#   pool = Pool(4)  # Recursive spawn!
#   results = pool.map(worker, range(10))

# ✅ CORRECT - guard the entry point
from multiprocessing import Pool

def worker(x):
    return x * 2
if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(worker, range(10))

2. Passing unpicklable objects
python
# ❌ WRONG - a lambda cannot be pickled, so the pool cannot ship it
# to its worker processes
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(lambda x: x * 2, range(10))  # Error!

# ✅ CORRECT - use a named, module-level function
def double(x):
    return x * 2
if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(double, range(10))

3. Quá nhiều processes
python
# ❌ WRONG - 100 processes on a 4-core machine
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(100) as pool:  # Overhead > benefit!
        results = pool.map(work, items)

# ✅ CORRECT - match the number of cores
import os
from multiprocessing import Pool

if __name__ == "__main__":
    num_workers = os.cpu_count()  # or os.cpu_count() - 1
    with Pool(num_workers) as pool:
        results = pool.map(work, items)

4. Shared state race conditions
python
# ❌ WRONG - race condition: += on a shared Value is a read-modify-write
# and is not atomic across processes
from multiprocessing import Process, Value

counter = Value('i', 0)

def increment():
    for _ in range(10000):
        counter.value += 1  # NOT atomic!

# ✅ CORRECT - serialize updates with a Lock
from multiprocessing import Process, Value, Lock

counter = Value('i', 0)
lock = Lock()
def increment():
    for _ in range(10000):
        with lock:
            counter.value += 1

5. Memory explosion với large data
python
# ❌ WRONG - every worker gets a copy of the full array (shown as
# comments: executing this would allocate 8 GB at import time!):
#
#   from multiprocessing import Pool
#   import numpy as np
#
#   large_array = np.random.rand(1_000_000_000)  # 8GB!
#
#   def process_chunk(indices):
#       return large_array[indices].sum()  # copies 8GB into each process!

# ✅ CORRECT - use SharedMemory
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np


def process_chunk(args):
    """Sum arr[start:end] out of a shared-memory backed array.

    Defined at module level (not inside the __main__ guard) so that
    spawn-started pool workers can unpickle it.
    """
    shm_name, start, end, shape, dtype = args
    existing_shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    result = arr[start:end].sum()
    existing_shm.close()
    return result


if __name__ == "__main__":
    # Create shared memory and copy the data in once.
    arr = np.random.rand(1_000_000)
    shm = SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]

    # Each worker receives only the block name + slice bounds, not the data.
    chunk_size = len(arr) // 4
    chunks = [
        (shm.name, i * chunk_size, (i + 1) * chunk_size, arr.shape, arr.dtype)
        for i in range(4)
    ]
    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)
    shm.close()
    shm.unlink()

Bảng Tóm tắt
python
# Cheat sheet: the executable fragments below reference placeholder
# names (func, items, arr, msg, ...), so everything that runs is kept
# inside __main__ guards - the module stays safe to import.

# === BASIC PROCESS ===
from multiprocessing import Process

def worker(arg):
    return arg * 2

if __name__ == "__main__":
    p = Process(target=worker, args=(5,))
    p.start()
    p.join()

# === POOL ===
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(4) as pool:
        # map - ordered, blocking
        results = pool.map(func, items)
        # map with chunksize
        results = pool.map(func, items, chunksize=100)
        # imap_unordered - fastest for independent tasks
        for result in pool.imap_unordered(func, items):
            process(result)
        # starmap - multiple arguments
        results = pool.starmap(func, [(a1, b1), (a2, b2)])

# === PROCESSPOOL EXECUTOR ===
from concurrent.futures import ProcessPoolExecutor, as_completed

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(func, arg) for arg in args]
        for future in as_completed(futures):
            result = future.result()

# === SHARED MEMORY ===
from multiprocessing.shared_memory import SharedMemory
import numpy as np

if __name__ == "__main__":
    shm = SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    # ... use shared_arr ...
    shm.close()
    shm.unlink()

# === MANAGER ===
from multiprocessing import Manager

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

# === IPC ===
from multiprocessing import Queue, Pipe

if __name__ == "__main__":
    # Queue
    queue = Queue()
    queue.put(item)
    item = queue.get()
    # Pipe
    parent_conn, child_conn = Pipe()
    parent_conn.send(msg)
    msg = child_conn.recv()

# === SYNCHRONIZATION ===
from multiprocessing import Lock, Event, Semaphore

if __name__ == "__main__":
    lock = Lock()
    with lock:
        # critical section
        pass
    event = Event()
    event.set()
    event.wait()
    event.clear()