Skip to content

Multiprocessing — Song song thực sự cho CPU-bound

Một data team cần xử lý 50GB log files hàng ngày — parse, aggregate, và tính statistics. Script Python single-threaded mất 4 giờ. Họ thử threading — vẫn 4 giờ vì GIL. Chuyển sang multiprocessing.Pool với 8 workers trên máy 8 cores — thời gian giảm xuống 32 phút. Gần 8x speedup, đúng như lý thuyết.

Multiprocessing là giải pháp chính thống của Python cho CPU-bound parallelism. Mỗi process có interpreter và GIL riêng, chạy trên CPU core riêng, đạt true parallelism mà threading không thể. Trade-off rõ ràng: overhead khởi tạo process cao hơn thread, data phải serialize qua IPC (Inter-Process Communication), và memory không chia sẻ trực tiếp.

Bài này đi từ Process cơ bản đến Pool patterns production, shared memory cho zero-copy data access, và Queue/Pipe cho IPC — đủ để bạn xây dựng data processing pipeline thật sự parallel.

Bức tranh tư duy

Hãy tưởng tượng một xưởng may ở Bình Dương. Threading giống như có 8 thợ may nhưng chỉ một máy may (GIL) — mỗi lần chỉ một thợ dùng được. Multiprocessing giống như mua 8 máy may riêng cho 8 thợ — mỗi người có máy riêng, may song song thật sự.

Chi phí? Mỗi máy may tốn tiền mua (process startup overhead), tốn diện tích (memory riêng biệt), và thợ muốn chia sẻ vải thì phải đi qua kho trung gian (IPC serialization) thay vì đưa tay qua bàn bên cạnh (shared memory trong threading).

Khi analogy breakdown: Trong thực tế, processes có thể chia sẻ memory trực tiếp qua SharedMemory (Python 3.8+) — giống như đặt cuộn vải lên bàn chung mà ai cũng với tới. Nhưng phải tự đảm bảo không ai cắt cùng một mảnh vải cùng lúc (synchronization).

Cốt lõi kỹ thuật

Process — Đơn vị song song cơ bản

python
from multiprocessing import Process
import os
import time

def cpu_work(worker_id: int, n: int) -> None:
    """CPU-bound task — chạy trong process riêng."""
    print(f"Worker {worker_id}, PID={os.getpid()}: bắt đầu")
    total = sum(i * i for i in range(n))
    print(f"Worker {worker_id}: hoàn thành, total={total}")

if __name__ == "__main__":
    start = time.perf_counter()

    # Tạo 4 processes
    processes = [
        Process(target=cpu_work, args=(i, 10_000_000))
        for i in range(4)
    ]

    for p in processes:
        p.start()  # Khởi chạy process mới (fork/spawn)

    for p in processes:
        p.join()  # Đợi process hoàn thành

    print(f"Tổng: {time.perf_counter() - start:.2f}s")
    # Trên 4 cores: ~gần 4x nhanh hơn sequential

Pool — Worker Pool cho batch processing

Pool quản lý một nhóm worker processes, tự động phân phối tasks.

python
from multiprocessing import Pool
import time

def process_item(x: int) -> int:
    """CPU-intensive processing."""
    return sum(i * i for i in range(x * 1000))

if __name__ == "__main__":
    items = list(range(200))

    # Sequential
    start = time.perf_counter()
    results_seq = [process_item(x) for x in items]
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # Parallel với Pool
    start = time.perf_counter()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, items)
    print(f"Parallel (4 workers): {time.perf_counter() - start:.2f}s")

    assert results_seq == results_par  # Kết quả giống nhau

Các phương thức Pool

python
from multiprocessing import Pool

def square(x: int) -> int:
    return x ** 2

def add(a: int, b: int) -> int:
    return a + b

if __name__ == "__main__":
    with Pool(4) as pool:
        # map() — blocking, kết quả theo thứ tự
        results = pool.map(square, range(100))

        # imap_unordered() — lazy iterator, không theo thứ tự (nhanh nhất)
        for result in pool.imap_unordered(square, range(100)):
            pass  # Xử lý ngay khi có kết quả

        # starmap() — nhiều arguments
        pairs = [(1, 2), (3, 4), (5, 6)]
        results = pool.starmap(add, pairs)  # [3, 7, 11]

        # map_async() — non-blocking
        async_result = pool.map_async(square, range(100))
        results = async_result.get(timeout=30)

ProcessPoolExecutor — High-level API

python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def heavy_computation(n: int) -> tuple[int, int]:
    result = sum(i * i for i in range(n * 10000))
    return n, result

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit() → Future objects
        futures = {
            executor.submit(heavy_computation, n): n
            for n in range(20)
        }

        # as_completed() — xử lý kết quả theo thứ tự hoàn thành
        for future in as_completed(futures):
            n, result = future.result()
            print(f"Task {n}: {result}")

        # map() — ordered results, simpler API
        results = list(executor.map(heavy_computation, range(20)))

SharedMemory — Chia sẻ data không cần copy

SharedMemory (Python 3.8+) cho phép nhiều processes truy cập cùng vùng nhớ — zero-copy, hiệu năng cao cho data lớn.

python
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def worker(shm_name: str, shape: tuple, dtype: str):
    """Worker truy cập shared memory — không cần copy data."""
    existing_shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

    # Nhân đôi tất cả phần tử (visible cho process khác)
    arr[:] = arr * 2
    existing_shm.close()

if __name__ == "__main__":
    # Tạo array và shared memory
    original = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    shm = SharedMemory(create=True, size=original.nbytes)
    shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
    shared_arr[:] = original[:]

    print(f"Trước: {shared_arr}")  # [1. 2. 3. 4. 5.]

    p = Process(target=worker, args=(shm.name, original.shape, str(original.dtype)))
    p.start()
    p.join()

    print(f"Sau: {shared_arr}")  # [2. 4. 6. 8. 10.]

    shm.close()
    shm.unlink()  # Giải phóng shared memory

Queue và Pipe — Inter-Process Communication

python
from multiprocessing import Process, Queue
import time

def producer(queue: Queue, num_items: int):
    for i in range(num_items):
        queue.put({"id": i, "data": f"item-{i}"})
        time.sleep(0.01)
    queue.put(None)  # Sentinel signal

def consumer(queue: Queue, consumer_id: int):
    processed = 0
    while True:
        item = queue.get()
        if item is None:
            queue.put(None)  # Truyền sentinel cho consumer khác
            break
        processed += 1
    print(f"Consumer {consumer_id}: xử lý {processed} items")

if __name__ == "__main__":
    queue = Queue(maxsize=50)

    prod = Process(target=producer, args=(queue, 100))
    consumers = [Process(target=consumer, args=(queue, i)) for i in range(3)]

    prod.start()
    for c in consumers: c.start()

    prod.join()
    for c in consumers: c.join()
python
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection

def worker(conn: Connection):
    """Worker giao tiếp qua Pipe — hai chiều."""
    while True:
        msg = conn.recv()
        if msg == "STOP":
            break
        conn.send(msg.upper())  # Xử lý và gửi lại
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()

    for word in ["hello", "python", "world"]:
        parent_conn.send(word)
        result = parent_conn.recv()
        print(f"{word}{result}")

    parent_conn.send("STOP")
    p.join()
    # hello → HELLO, python → PYTHON, world → WORLD

Thực chiến

Tình huống: Data Processing Pipeline cho ML Training

Bối cảnh: Cần preprocess 1 triệu ảnh cho ML training — resize, normalize, augment. Mỗi ảnh mất ~50ms xử lý. Single-threaded: 14 giờ. Budget: < 2 giờ trên máy 8 cores.

Mục tiêu: Pipeline parallel hóa preprocessing, output ghi vào shared buffer.

python
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import os

def preprocess_image(image_path: str) -> dict:
    """Giả lập image preprocessing — CPU-bound."""
    time.sleep(0.005)  # ~5ms thay vì 50ms để demo nhanh

    filename = os.path.basename(image_path)
    return {
        "path": image_path,
        "size": (224, 224),
        "normalized": True,
        "augmented": True,
    }

def process_batch(batch: list[str]) -> list[dict]:
    """Xử lý một batch ảnh — giảm IPC overhead."""
    return [preprocess_image(path) for path in batch]

def chunk_list(lst: list, chunk_size: int) -> list[list]:
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

if __name__ == "__main__":
    # Giả lập 10,000 image paths
    image_paths = [f"/data/images/img_{i:06d}.jpg" for i in range(10_000)]

    num_workers = min(os.cpu_count() or 4, 8)
    batch_size = len(image_paths) // (num_workers * 4)

    # Chia thành batches để giảm IPC overhead
    batches = chunk_list(image_paths, batch_size)

    start = time.perf_counter()
    with Pool(processes=num_workers) as pool:
        batch_results = pool.map(process_batch, batches)

    # Flatten results
    all_results = [r for batch in batch_results for r in batch]
    elapsed = time.perf_counter() - start

    print(f"Processed {len(all_results)} images")
    print(f"Time: {elapsed:.1f}s with {num_workers} workers")
    print(f"Throughput: {len(all_results) / elapsed:.0f} images/s")

Phân tích:

  • Batch processing giảm IPC overhead — thay vì gửi 10,000 items qua pipe, gửi ~32 batches
  • os.cpu_count() tự động detect số cores
  • Pool tái sử dụng processes — không tốn startup cost cho mỗi task
  • Trade-off: batch lớn = ít IPC overhead, nhưng load balancing kém hơn nếu tasks không đều

Sai lầm điển hình

Sai lầm 1: Quên if __name__ == "__main__"

Vấn đề: Trên Windows và macOS (spawn start method), thiếu guard gây recursive process creation.

python
# SAI: Recursive spawn trên Windows!
from multiprocessing import Pool

def worker(x):
    return x * 2

pool = Pool(4)  # Module-level → mỗi child process cũng chạy dòng này!
results = pool.map(worker, range(10))

Tại sao sai: spawn start method import lại module trong child process. Code ở module level chạy lại → tạo Pool mới → tạo child processes → vòng lặp vô hạn.

python
# ĐÚNG: Guard bằng __name__ check
from multiprocessing import Pool

def worker(x):
    return x * 2

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(worker, range(10))

Sai lầm 2: Truyền object không pickle được

Vấn đề: Lambda, nested function, hoặc object chứa file handle không thể serialize.

python
# SAI: Lambda không pickle được
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(lambda x: x * 2, range(10))  # PickleError!

Tại sao sai: Multiprocessing serialize arguments bằng pickle để gửi qua IPC. Lambda và closures không thể pickle.

python
# ĐÚNG: Dùng named function ở module level
def double(x: int) -> int:
    return x * 2

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(double, range(10))

Sai lầm 3: Tạo quá nhiều processes

Vấn đề: 100 processes trên máy 4 cores — overhead vượt benefit.

python
# SAI: 100 processes cho 4 cores
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool(100) as pool:  # Context switching overhead khổng lồ!
        results = pool.map(work, items)

Tại sao sai: Mỗi process tiêu ~30-50MB RAM (Python interpreter + dependencies). Context switching giữa 100 processes trên 4 cores tốn hơn computation. Hệ thống swap → chậm cực kỳ.

python
# ĐÚNG: Số workers = số CPU cores (hoặc cores - 1)
import os
from multiprocessing import Pool

if __name__ == "__main__":
    num_workers = os.cpu_count() or 4
    with Pool(num_workers) as pool:
        results = pool.map(work, items)

Sai lầm 4: Copy data lớn cho mỗi process

Vấn đề: Truyền array 1GB làm argument → copy 1GB cho mỗi worker.

python
# SAI: Copy 1GB data cho mỗi process
import numpy as np
from multiprocessing import Pool

large_array = np.random.rand(100_000_000)  # ~800MB

def process_chunk(indices):
    return large_array[indices].sum()  # large_array bị copy!

Tại sao sai: Mỗi process nhận bản copy riêng qua pickle → 4 workers = 4GB RAM chỉ cho data.

python
# ĐÚNG: SharedMemory để chia sẻ data
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np

if __name__ == "__main__":
    arr = np.random.rand(1_000_000)
    shm = SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]

    def process_chunk(args):
        shm_name, start, end, shape, dtype = args
        existing = SharedMemory(name=shm_name)
        data = np.ndarray(shape, dtype=dtype, buffer=existing.buf)
        result = data[start:end].sum()
        existing.close()
        return result

    chunk_size = len(arr) // 4
    chunks = [
        (shm.name, i * chunk_size, (i + 1) * chunk_size, arr.shape, str(arr.dtype))
        for i in range(4)
    ]

    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)

    print(f"Total: {sum(results):.4f}")
    shm.close()
    shm.unlink()

Under the Hood

Process Start Methods

Python hỗ trợ 3 start methods, mỗi cái có trade-off riêng:

MethodOSCơ chếTốc độAn toàn
forkLinux, macOSCopy process (COW)NhanhRủi ro với threads
spawnTất cả (mặc định Windows)Tạo interpreter mớiChậmAn toàn nhất
forkserverLinuxFork từ server processTrung bìnhAn toàn
python
import multiprocessing

# Xem start method hiện tại
print(multiprocessing.get_start_method())

# Đặt start method (phải gọi trước khi tạo Process)
multiprocessing.set_start_method("spawn")

Chunksize Optimization

Khi dùng Pool.map(), chunksize quyết định bao nhiêu items gửi cho mỗi worker trong một lần IPC.

python
from multiprocessing import Pool
import time

def light_task(x):
    return x * 2

if __name__ == "__main__":
    items = list(range(1_000_000))

    with Pool(4) as pool:
        # Default chunksize — nhiều IPC round-trips
        start = time.perf_counter()
        pool.map(light_task, items)
        print(f"Default: {time.perf_counter() - start:.2f}s")

        # Optimized chunksize
        start = time.perf_counter()
        pool.map(light_task, items, chunksize=10000)
        print(f"Chunksize=10000: {time.perf_counter() - start:.2f}s")
Task weightChunksizeLý do
Nhẹ (< 1ms)Lớn (1000-10000)Giảm IPC overhead
Nặng (> 100ms)Nhỏ (1-10)Load balancing tốt hơn
Công thức chunglen(items) // (workers × 4)Cân bằng overhead vs balance

So sánh multiprocessing vs threading vs asyncio

Metricmultiprocessingthreadingasyncio
GIL ảnh hưởngKhôngKhông (single-thread)
Memory/worker~30-50MB~8MB~1KB
Startup time~100ms~1ms~0.01ms
Max workers thực tế= CPU cores~1000~100,000
Shared stateIPC/SharedMemoryTrực tiếp (cần Lock)Trực tiếp (single-thread)
Best forCPU-boundI/O-bound (ít)I/O-bound (nhiều)

Trade-offs

Multiprocessing mạnh khi: CPU-bound tasks thuần Python, data processing, ML preprocessing, compression, scientific computing. Linear speedup với số cores.

Multiprocessing yếu khi: Tasks nhẹ (overhead IPC > computation), cần shared state phức tạp, I/O-bound (overkill — dùng threading/asyncio), hoặc khi serialization cost (pickle) cao hơn benefit.

Checklist ghi nhớ

✅ Checklist triển khai

Cơ bản

  • [ ] Luôn dùng if __name__ == "__main__" guard
  • [ ] Số workers = os.cpu_count() cho CPU-bound
  • [ ] Dùng with Pool() as pool — context manager tự động cleanup

Data Transfer

  • [ ] Dùng SharedMemory cho data lớn — tránh pickle overhead
  • [ ] Arguments và return values phải picklable — không dùng lambda, closure
  • [ ] Chọn chunksize phù hợp: task nhẹ → chunk lớn, task nặng → chunk nhỏ

IPC

  • [ ] Queue cho producer-consumer pattern — thread/process safe
  • [ ] Pipe cho two-way communication giữa 2 processes
  • [ ] Queue(maxsize=N) tạo backpressure — tránh memory explosion

Production

  • [ ] Dùng ProcessPoolExecutor thay Pool khi cần tích hợp với asyncio (run_in_executor)
  • [ ] Handle BrokenProcessPool — worker crash không nên crash toàn bộ system
  • [ ] Monitor memory usage — mỗi process tiêu ~30-50MB base
  • [ ] Dùng maxtasksperchild trong Pool để tránh memory leak dài hạn

Bài tập luyện tập

Bài 1: Parallel Word Count — Intermediate

Đề bài: Cho list 100 "files" (giả lập bằng strings), đếm tổng số từ dùng Pool.map(). So sánh thời gian với sequential.

🧠 Quiz

Câu hỏi: Khi dùng Pool(8) trên máy 4 cores cho CPU-bound tasks, hiệu năng thế nào?

  • [ ] A. 8x speedup vì 8 workers
  • [ ] B. 4x speedup vì 4 cores, 4 workers idle
  • [x] C. Gần 4x speedup nhưng kèm overhead context switching từ 8 processes trên 4 cores
  • [ ] D. Chậm hơn vì quá nhiều processes Giải thích: 4 cores chỉ chạy song song 4 processes tại một thời điểm. 4 processes còn lại chờ. Overhead context switching giữa 8 processes trên 4 cores làm giảm nhẹ hiệu năng so với dùng đúng 4 workers. B gần đúng nhưng thiếu yếu tố overhead. D chưa chính xác vì 8 vẫn nhanh hơn sequential nhiều.
💡 Gợi ý
  • Tạo list strings giả lập file content
  • Function đếm từ: len(text.split())
  • So sánh pool.map() vs list comprehension
✅ Lời giải
python
from multiprocessing import Pool
import time
import os

def count_words(text: str) -> int:
    # Giả lập xử lý nặng
    words = text.split()
    total = len(words)
    _ = sum(len(w) for w in words)  # Thêm CPU work
    return total

def generate_files(n: int) -> list[str]:
    return [f"word{i} " * 1000 for i in range(n)]

if __name__ == "__main__":
    files = generate_files(1000)

    # Sequential
    start = time.perf_counter()
    seq_counts = [count_words(f) for f in files]
    seq_time = time.perf_counter() - start

    # Parallel
    start = time.perf_counter()
    with Pool(os.cpu_count()) as pool:
        par_counts = pool.map(count_words, files, chunksize=50)
    par_time = time.perf_counter() - start

    assert sum(seq_counts) == sum(par_counts)
    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel:   {par_time:.2f}s")
    print(f"Speedup:    {seq_time / par_time:.1f}x")

Phân tích: Speedup phụ thuộc task weight. Nếu count_words quá nhẹ, IPC overhead vượt benefit. Tăng chunksize giúp giảm overhead cho light tasks.

Bài 2: SharedMemory Array Processing — Intermediate

Đề bài: Tạo một numpy array 1 triệu phần tử trong shared memory. 4 worker processes, mỗi worker xử lý 1/4 array (nhân với 2). Verify kết quả.

💡 Gợi ý
  • Tạo SharedMemory với create=True
  • Worker nhận shm.name, start_idx, end_idx
  • Worker mở SharedMemory(name=...) để attach
✅ Lời giải
python
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np

def worker(shm_name: str, start: int, end: int, shape: tuple, dtype: str):
    shm = SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    arr[start:end] *= 2
    shm.close()

if __name__ == "__main__":
    n = 1_000_000
    original = np.arange(n, dtype=np.float64)

    shm = SharedMemory(create=True, size=original.nbytes)
    shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
    shared_arr[:] = original[:]

    chunk = n // 4
    processes = [
        Process(
            target=worker,
            args=(shm.name, i * chunk, (i + 1) * chunk,
                  original.shape, str(original.dtype))
        )
        for i in range(4)
    ]

    for p in processes: p.start()
    for p in processes: p.join()

    expected = original * 2
    assert np.allclose(shared_arr, expected), "Kết quả sai!"
    print("OK — SharedMemory array processed correctly")

    shm.close()
    shm.unlink()

Phân tích: Zero-copy — workers trực tiếp modify shared buffer. Mỗi worker xử lý vùng riêng nên không cần Lock. Nếu vùng overlap → phải dùng multiprocessing.Lock.

Liên kết học tiếp

Từ khóa glossary: multiprocessing, Process, Pool, SharedMemory, Queue, Pipe, IPC, pickle, fork, spawn

Tìm kiếm liên quan: python multiprocessing tutorial, pool map, shared memory python, ipc python