Giao diện
Multiprocessing — Song song thực sự cho CPU-bound
Một data team cần xử lý 50GB log files hàng ngày — parse, aggregate, và tính statistics. Script Python single-threaded mất 4 giờ. Họ thử threading — vẫn 4 giờ vì GIL. Chuyển sang multiprocessing.Pool với 8 workers trên máy 8 cores — thời gian giảm xuống 32 phút. Gần 8x speedup, đúng như lý thuyết.
Multiprocessing là giải pháp chính thống của Python cho CPU-bound parallelism. Mỗi process có interpreter và GIL riêng, chạy trên CPU core riêng, đạt true parallelism mà threading không thể. Trade-off rõ ràng: overhead khởi tạo process cao hơn thread, data phải serialize qua IPC (Inter-Process Communication), và memory không chia sẻ trực tiếp.
Bài này đi từ Process cơ bản đến Pool patterns production, shared memory cho zero-copy data access, và Queue/Pipe cho IPC — đủ để bạn xây dựng data processing pipeline thật sự parallel.
Bức tranh tư duy
Hãy tưởng tượng một xưởng may ở Bình Dương. Threading giống như có 8 thợ may nhưng chỉ một máy may (GIL) — mỗi lần chỉ một thợ dùng được. Multiprocessing giống như mua 8 máy may riêng cho 8 thợ — mỗi người có máy riêng, may song song thật sự.
Chi phí? Mỗi máy may tốn tiền mua (process startup overhead), tốn diện tích (memory riêng biệt), và thợ muốn chia sẻ vải thì phải đi qua kho trung gian (IPC serialization) thay vì đưa tay qua bàn bên cạnh (shared memory trong threading).
Khi analogy breakdown: Trong thực tế, processes có thể chia sẻ memory trực tiếp qua SharedMemory (Python 3.8+) — giống như đặt cuộn vải lên bàn chung mà ai cũng với tới. Nhưng phải tự đảm bảo không ai cắt cùng một mảnh vải cùng lúc (synchronization).
Cốt lõi kỹ thuật
Process — Đơn vị song song cơ bản
python
from multiprocessing import Process
import os
import time
def cpu_work(worker_id: int, n: int) -> None:
"""CPU-bound task — chạy trong process riêng."""
print(f"Worker {worker_id}, PID={os.getpid()}: bắt đầu")
total = sum(i * i for i in range(n))
print(f"Worker {worker_id}: hoàn thành, total={total}")
if __name__ == "__main__":
start = time.perf_counter()
# Tạo 4 processes
processes = [
Process(target=cpu_work, args=(i, 10_000_000))
for i in range(4)
]
for p in processes:
p.start() # Khởi chạy process mới (fork/spawn)
for p in processes:
p.join() # Đợi process hoàn thành
print(f"Tổng: {time.perf_counter() - start:.2f}s")
# Trên 4 cores: ~gần 4x nhanh hơn sequentialPool — Worker Pool cho batch processing
Pool quản lý một nhóm worker processes, tự động phân phối tasks.
python
from multiprocessing import Pool
import time
def process_item(x: int) -> int:
"""CPU-intensive processing."""
return sum(i * i for i in range(x * 1000))
if __name__ == "__main__":
items = list(range(200))
# Sequential
start = time.perf_counter()
results_seq = [process_item(x) for x in items]
print(f"Sequential: {time.perf_counter() - start:.2f}s")
# Parallel với Pool
start = time.perf_counter()
with Pool(processes=4) as pool:
results_par = pool.map(process_item, items)
print(f"Parallel (4 workers): {time.perf_counter() - start:.2f}s")
assert results_seq == results_par # Kết quả giống nhauCác phương thức Pool
python
from multiprocessing import Pool
def square(x: int) -> int:
return x ** 2
def add(a: int, b: int) -> int:
return a + b
if __name__ == "__main__":
with Pool(4) as pool:
# map() — blocking, kết quả theo thứ tự
results = pool.map(square, range(100))
# imap_unordered() — lazy iterator, không theo thứ tự (nhanh nhất)
for result in pool.imap_unordered(square, range(100)):
pass # Xử lý ngay khi có kết quả
# starmap() — nhiều arguments
pairs = [(1, 2), (3, 4), (5, 6)]
results = pool.starmap(add, pairs) # [3, 7, 11]
# map_async() — non-blocking
async_result = pool.map_async(square, range(100))
results = async_result.get(timeout=30)ProcessPoolExecutor — High-level API
python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def heavy_computation(n: int) -> tuple[int, int]:
result = sum(i * i for i in range(n * 10000))
return n, result
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=4) as executor:
# submit() → Future objects
futures = {
executor.submit(heavy_computation, n): n
for n in range(20)
}
# as_completed() — xử lý kết quả theo thứ tự hoàn thành
for future in as_completed(futures):
n, result = future.result()
print(f"Task {n}: {result}")
# map() — ordered results, simpler API
results = list(executor.map(heavy_computation, range(20)))SharedMemory — Chia sẻ data không cần copy
SharedMemory (Python 3.8+) cho phép nhiều processes truy cập cùng vùng nhớ — zero-copy, hiệu năng cao cho data lớn.
python
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def worker(shm_name: str, shape: tuple, dtype: str):
"""Worker truy cập shared memory — không cần copy data."""
existing_shm = SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# Nhân đôi tất cả phần tử (visible cho process khác)
arr[:] = arr * 2
existing_shm.close()
if __name__ == "__main__":
# Tạo array và shared memory
original = np.array([1, 2, 3, 4, 5], dtype=np.float64)
shm = SharedMemory(create=True, size=original.nbytes)
shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
shared_arr[:] = original[:]
print(f"Trước: {shared_arr}") # [1. 2. 3. 4. 5.]
p = Process(target=worker, args=(shm.name, original.shape, str(original.dtype)))
p.start()
p.join()
print(f"Sau: {shared_arr}") # [2. 4. 6. 8. 10.]
shm.close()
shm.unlink() # Giải phóng shared memoryQueue và Pipe — Inter-Process Communication
python
from multiprocessing import Process, Queue
import time
def producer(queue: Queue, num_items: int):
for i in range(num_items):
queue.put({"id": i, "data": f"item-{i}"})
time.sleep(0.01)
queue.put(None) # Sentinel signal
def consumer(queue: Queue, consumer_id: int):
processed = 0
while True:
item = queue.get()
if item is None:
queue.put(None) # Truyền sentinel cho consumer khác
break
processed += 1
print(f"Consumer {consumer_id}: xử lý {processed} items")
if __name__ == "__main__":
queue = Queue(maxsize=50)
prod = Process(target=producer, args=(queue, 100))
consumers = [Process(target=consumer, args=(queue, i)) for i in range(3)]
prod.start()
for c in consumers: c.start()
prod.join()
for c in consumers: c.join()python
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
def worker(conn: Connection):
"""Worker giao tiếp qua Pipe — hai chiều."""
while True:
msg = conn.recv()
if msg == "STOP":
break
conn.send(msg.upper()) # Xử lý và gửi lại
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
for word in ["hello", "python", "world"]:
parent_conn.send(word)
result = parent_conn.recv()
print(f"{word} → {result}")
parent_conn.send("STOP")
p.join()
# hello → HELLO, python → PYTHON, world → WORLDThực chiến
Tình huống: Data Processing Pipeline cho ML Training
Bối cảnh: Cần preprocess 1 triệu ảnh cho ML training — resize, normalize, augment. Mỗi ảnh mất ~50ms xử lý. Single-threaded: 14 giờ. Budget: < 2 giờ trên máy 8 cores.
Mục tiêu: Pipeline parallel hóa preprocessing, output ghi vào shared buffer.
python
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import os
def preprocess_image(image_path: str) -> dict:
"""Giả lập image preprocessing — CPU-bound."""
time.sleep(0.005) # ~5ms thay vì 50ms để demo nhanh
filename = os.path.basename(image_path)
return {
"path": image_path,
"size": (224, 224),
"normalized": True,
"augmented": True,
}
def process_batch(batch: list[str]) -> list[dict]:
"""Xử lý một batch ảnh — giảm IPC overhead."""
return [preprocess_image(path) for path in batch]
def chunk_list(lst: list, chunk_size: int) -> list[list]:
return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
if __name__ == "__main__":
# Giả lập 10,000 image paths
image_paths = [f"/data/images/img_{i:06d}.jpg" for i in range(10_000)]
num_workers = min(os.cpu_count() or 4, 8)
batch_size = len(image_paths) // (num_workers * 4)
# Chia thành batches để giảm IPC overhead
batches = chunk_list(image_paths, batch_size)
start = time.perf_counter()
with Pool(processes=num_workers) as pool:
batch_results = pool.map(process_batch, batches)
# Flatten results
all_results = [r for batch in batch_results for r in batch]
elapsed = time.perf_counter() - start
print(f"Processed {len(all_results)} images")
print(f"Time: {elapsed:.1f}s with {num_workers} workers")
print(f"Throughput: {len(all_results) / elapsed:.0f} images/s")Phân tích:
- Batch processing giảm IPC overhead — thay vì gửi 10,000 items qua pipe, gửi ~32 batches
os.cpu_count()tự động detect số cores- Pool tái sử dụng processes — không tốn startup cost cho mỗi task
- Trade-off: batch lớn = ít IPC overhead, nhưng load balancing kém hơn nếu tasks không đều
Sai lầm điển hình
❌ Sai lầm 1: Quên if __name__ == "__main__"
Vấn đề: Trên Windows và macOS (spawn start method), thiếu guard gây recursive process creation.
python
# SAI: Recursive spawn trên Windows!
from multiprocessing import Pool
def worker(x):
return x * 2
pool = Pool(4) # Module-level → mỗi child process cũng chạy dòng này!
results = pool.map(worker, range(10))Tại sao sai: spawn start method import lại module trong child process. Code ở module level chạy lại → tạo Pool mới → tạo child processes → vòng lặp vô hạn.
python
# ĐÚNG: Guard bằng __name__ check
from multiprocessing import Pool
def worker(x):
return x * 2
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(worker, range(10))❌ Sai lầm 2: Truyền object không pickle được
Vấn đề: Lambda, nested function, hoặc object chứa file handle không thể serialize.
python
# SAI: Lambda không pickle được
from multiprocessing import Pool
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(lambda x: x * 2, range(10)) # PickleError!Tại sao sai: Multiprocessing serialize arguments bằng pickle để gửi qua IPC. Lambda và closures không thể pickle.
python
# ĐÚNG: Dùng named function ở module level
def double(x: int) -> int:
return x * 2
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(double, range(10))❌ Sai lầm 3: Tạo quá nhiều processes
Vấn đề: 100 processes trên máy 4 cores — overhead vượt benefit.
python
# SAI: 100 processes cho 4 cores
from multiprocessing import Pool
if __name__ == "__main__":
with Pool(100) as pool: # Context switching overhead khổng lồ!
results = pool.map(work, items)Tại sao sai: Mỗi process tiêu ~30-50MB RAM (Python interpreter + dependencies). Context switching giữa 100 processes trên 4 cores tốn hơn computation. Hệ thống swap → chậm cực kỳ.
python
# ĐÚNG: Số workers = số CPU cores (hoặc cores - 1)
import os
from multiprocessing import Pool
if __name__ == "__main__":
num_workers = os.cpu_count() or 4
with Pool(num_workers) as pool:
results = pool.map(work, items)❌ Sai lầm 4: Copy data lớn cho mỗi process
Vấn đề: Truyền array 1GB làm argument → copy 1GB cho mỗi worker.
python
# SAI: Copy 1GB data cho mỗi process
import numpy as np
from multiprocessing import Pool
large_array = np.random.rand(100_000_000) # ~800MB
def process_chunk(indices):
return large_array[indices].sum() # large_array bị copy!Tại sao sai: Mỗi process nhận bản copy riêng qua pickle → 4 workers = 4GB RAM chỉ cho data.
python
# ĐÚNG: SharedMemory để chia sẻ data
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np
if __name__ == "__main__":
arr = np.random.rand(1_000_000)
shm = SharedMemory(create=True, size=arr.nbytes)
shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
shared_arr[:] = arr[:]
def process_chunk(args):
shm_name, start, end, shape, dtype = args
existing = SharedMemory(name=shm_name)
data = np.ndarray(shape, dtype=dtype, buffer=existing.buf)
result = data[start:end].sum()
existing.close()
return result
chunk_size = len(arr) // 4
chunks = [
(shm.name, i * chunk_size, (i + 1) * chunk_size, arr.shape, str(arr.dtype))
for i in range(4)
]
with Pool(4) as pool:
results = pool.map(process_chunk, chunks)
print(f"Total: {sum(results):.4f}")
shm.close()
shm.unlink()Under the Hood
Process Start Methods
Python hỗ trợ 3 start methods, mỗi cái có trade-off riêng:
| Method | OS | Cơ chế | Tốc độ | An toàn |
|---|---|---|---|---|
fork | Linux, macOS | Copy process (COW) | Nhanh | Rủi ro với threads |
spawn | Tất cả (mặc định Windows) | Tạo interpreter mới | Chậm | An toàn nhất |
forkserver | Linux | Fork từ server process | Trung bình | An toàn |
python
import multiprocessing
# Xem start method hiện tại
print(multiprocessing.get_start_method())
# Đặt start method (phải gọi trước khi tạo Process)
multiprocessing.set_start_method("spawn")Chunksize Optimization
Khi dùng Pool.map(), chunksize quyết định bao nhiêu items gửi cho mỗi worker trong một lần IPC.
python
from multiprocessing import Pool
import time
def light_task(x):
return x * 2
if __name__ == "__main__":
items = list(range(1_000_000))
with Pool(4) as pool:
# Default chunksize — nhiều IPC round-trips
start = time.perf_counter()
pool.map(light_task, items)
print(f"Default: {time.perf_counter() - start:.2f}s")
# Optimized chunksize
start = time.perf_counter()
pool.map(light_task, items, chunksize=10000)
print(f"Chunksize=10000: {time.perf_counter() - start:.2f}s")| Task weight | Chunksize | Lý do |
|---|---|---|
| Nhẹ (< 1ms) | Lớn (1000-10000) | Giảm IPC overhead |
| Nặng (> 100ms) | Nhỏ (1-10) | Load balancing tốt hơn |
| Công thức chung | len(items) // (workers × 4) | Cân bằng overhead vs balance |
So sánh multiprocessing vs threading vs asyncio
| Metric | multiprocessing | threading | asyncio |
|---|---|---|---|
| GIL ảnh hưởng | Không | Có | Không (single-thread) |
| Memory/worker | ~30-50MB | ~8MB | ~1KB |
| Startup time | ~100ms | ~1ms | ~0.01ms |
| Max workers thực tế | = CPU cores | ~1000 | ~100,000 |
| Shared state | IPC/SharedMemory | Trực tiếp (cần Lock) | Trực tiếp (single-thread) |
| Best for | CPU-bound | I/O-bound (ít) | I/O-bound (nhiều) |
Trade-offs
Multiprocessing mạnh khi: CPU-bound tasks thuần Python, data processing, ML preprocessing, compression, scientific computing. Linear speedup với số cores.
Multiprocessing yếu khi: Tasks nhẹ (overhead IPC > computation), cần shared state phức tạp, I/O-bound (overkill — dùng threading/asyncio), hoặc khi serialization cost (pickle) cao hơn benefit.
Checklist ghi nhớ
✅ Checklist triển khai
Cơ bản
- [ ] Luôn dùng
if __name__ == "__main__"guard - [ ] Số workers =
os.cpu_count()cho CPU-bound - [ ] Dùng
with Pool() as pool— context manager tự động cleanup
Data Transfer
- [ ] Dùng
SharedMemorycho data lớn — tránh pickle overhead - [ ] Arguments và return values phải picklable — không dùng lambda, closure
- [ ] Chọn
chunksizephù hợp: task nhẹ → chunk lớn, task nặng → chunk nhỏ
IPC
- [ ]
Queuecho producer-consumer pattern — thread/process safe - [ ]
Pipecho two-way communication giữa 2 processes - [ ]
Queue(maxsize=N)tạo backpressure — tránh memory explosion
Production
- [ ] Dùng
ProcessPoolExecutorthayPoolkhi cần tích hợp với asyncio (run_in_executor) - [ ] Handle
BrokenProcessPool— worker crash không nên crash toàn bộ system - [ ] Monitor memory usage — mỗi process tiêu ~30-50MB base
- [ ] Dùng
maxtasksperchildtrong Pool để tránh memory leak dài hạn
Bài tập luyện tập
Bài 1: Parallel Word Count — Intermediate
Đề bài: Cho list 100 "files" (giả lập bằng strings), đếm tổng số từ dùng Pool.map(). So sánh thời gian với sequential.
🧠 Quiz
Câu hỏi: Khi dùng Pool(8) trên máy 4 cores cho CPU-bound tasks, hiệu năng thế nào?
- [ ] A. 8x speedup vì 8 workers
- [ ] B. 4x speedup vì 4 cores, 4 workers idle
- [x] C. Gần 4x speedup nhưng kèm overhead context switching từ 8 processes trên 4 cores
- [ ] D. Chậm hơn vì quá nhiều processes Giải thích: 4 cores chỉ chạy song song 4 processes tại một thời điểm. 4 processes còn lại chờ. Overhead context switching giữa 8 processes trên 4 cores làm giảm nhẹ hiệu năng so với dùng đúng 4 workers. B gần đúng nhưng thiếu yếu tố overhead. D chưa chính xác vì 8 vẫn nhanh hơn sequential nhiều.
💡 Gợi ý
- Tạo list strings giả lập file content
- Function đếm từ:
len(text.split()) - So sánh
pool.map()vs list comprehension
✅ Lời giải
python
from multiprocessing import Pool
import time
import os
def count_words(text: str) -> int:
# Giả lập xử lý nặng
words = text.split()
total = len(words)
_ = sum(len(w) for w in words) # Thêm CPU work
return total
def generate_files(n: int) -> list[str]:
return [f"word{i} " * 1000 for i in range(n)]
if __name__ == "__main__":
files = generate_files(1000)
# Sequential
start = time.perf_counter()
seq_counts = [count_words(f) for f in files]
seq_time = time.perf_counter() - start
# Parallel
start = time.perf_counter()
with Pool(os.cpu_count()) as pool:
par_counts = pool.map(count_words, files, chunksize=50)
par_time = time.perf_counter() - start
assert sum(seq_counts) == sum(par_counts)
print(f"Sequential: {seq_time:.2f}s")
print(f"Parallel: {par_time:.2f}s")
print(f"Speedup: {seq_time / par_time:.1f}x")Phân tích: Speedup phụ thuộc task weight. Nếu count_words quá nhẹ, IPC overhead vượt benefit. Tăng chunksize giúp giảm overhead cho light tasks.
Bài 2: SharedMemory Array Processing — Intermediate
Đề bài: Tạo một numpy array 1 triệu phần tử trong shared memory. 4 worker processes, mỗi worker xử lý 1/4 array (nhân với 2). Verify kết quả.
💡 Gợi ý
- Tạo
SharedMemoryvớicreate=True - Worker nhận
shm.name,start_idx,end_idx - Worker mở
SharedMemory(name=...)để attach
✅ Lời giải
python
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def worker(shm_name: str, start: int, end: int, shape: tuple, dtype: str):
shm = SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
arr[start:end] *= 2
shm.close()
if __name__ == "__main__":
n = 1_000_000
original = np.arange(n, dtype=np.float64)
shm = SharedMemory(create=True, size=original.nbytes)
shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
shared_arr[:] = original[:]
chunk = n // 4
processes = [
Process(
target=worker,
args=(shm.name, i * chunk, (i + 1) * chunk,
original.shape, str(original.dtype))
)
for i in range(4)
]
for p in processes: p.start()
for p in processes: p.join()
expected = original * 2
assert np.allclose(shared_arr, expected), "Kết quả sai!"
print("OK — SharedMemory array processed correctly")
shm.close()
shm.unlink()Phân tích: Zero-copy — workers trực tiếp modify shared buffer. Mỗi worker xử lý vùng riêng nên không cần Lock. Nếu vùng overlap → phải dùng multiprocessing.Lock.
Liên kết học tiếp
Từ khóa glossary: multiprocessing, Process, Pool, SharedMemory, Queue, Pipe, IPC, pickle, fork, spawn
Tìm kiếm liên quan: python multiprocessing tutorial, pool map, shared memory python, ipc python