Skip to content

Concurrent Patterns — Kiến trúc đồng thời Production

Bạn đã biết asyncio.gather()Pool.map() — nhưng khi xây dựng hệ thống thật, bạn cần nhiều hơn primitives. Một data ingestion pipeline cần producer đọc từ Kafka, consumers xử lý song song, rate limiter bảo vệ downstream services, và dead letter queue cho failures. Đây là lúc patterns trở nên quan trọng hơn tools.

Concurrent patterns là những kiến trúc đã được chứng minh hiệu quả qua hàng thập kỷ — từ Unix pipes (1973) đến Erlang actor model (1986) đến Go channels (2009). Python implement chúng qua asyncio.Queue, concurrent.futures, và multiprocessing. Pattern đúng biến hệ thống phức tạp thành tổ hợp các thành phần đơn giản, testable, có thể scale độc lập.

Bài này cover 4 patterns cốt lõi: Producer-Consumer (backbone của mọi message queue), Thread/Worker Pool (resource management), Actor Model (isolated state), và Fan-out/Fan-in (parallel aggregation). Mỗi pattern kèm production code, không phải toy example.

Bức tranh tư duy

Hãy tưởng tượng một nhà máy sản xuất bánh mì ở Đà Nẵng:

Producer-Consumer: Lò nướng (producer) cho bánh ra băng chuyền (queue). Thợ đóng gói (consumers) lấy bánh từ băng chuyền, mỗi người lấy cái nào đến trước. Băng chuyền có giới hạn — nếu đầy, lò phải chờ (backpressure).

Worker Pool: Thay vì mỗi đơn hàng thuê một thợ mới, nhà máy giữ 10 thợ cố định. Đơn hàng xếp hàng, thợ nào rảnh nhận đơn tiếp. Giới hạn số thợ = giới hạn tài nguyên.

Fan-out/Fan-in: Một đơn hàng lớn (1000 bánh) được chia cho 10 thợ (fan-out), mỗi người làm 100 cái. Khi tất cả xong, gom lại thành một lô (fan-in) để giao hàng.

Actor Model: Mỗi thợ có bàn làm việc riêng (private state), nhận chỉ thị qua giấy nhắn (messages), không ai được sờ vào bàn người khác. Không cần khóa, không race condition.

Cốt lõi kỹ thuật

Producer-Consumer Pattern

Pattern cốt lõi nhất — tách biệt việc tạo work khỏi việc xử lý work bằng queue trung gian.

python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkItem:
    id: int
    payload: Any

async def producer(
    queue: asyncio.Queue[WorkItem | None],
    num_items: int,
    producer_id: int,
):
    """Tạo work items và đẩy vào queue."""
    for i in range(num_items):
        item = WorkItem(id=i, payload=f"data-{producer_id}-{i}")
        await queue.put(item)  # Block nếu queue đầy (backpressure)
    print(f"Producer {producer_id}: hoàn thành {num_items} items")

async def consumer(
    queue: asyncio.Queue[WorkItem | None],
    consumer_id: int,
) -> int:
    """Lấy và xử lý work items từ queue."""
    processed = 0
    while True:
        item = await queue.get()
        if item is None:
            break
        # Xử lý item
        await asyncio.sleep(0.01)  # Giả lập processing
        processed += 1
        queue.task_done()
    return processed

async def run_pipeline(
    num_producers: int = 2,
    num_consumers: int = 5,
    items_per_producer: int = 100,
):
    queue: asyncio.Queue[WorkItem | None] = asyncio.Queue(maxsize=50)

    async with asyncio.TaskGroup() as tg:
        # Start producers
        for i in range(num_producers):
            tg.create_task(producer(queue, items_per_producer, i))

        # Start consumers
        consumer_tasks = [
            tg.create_task(consumer(queue, i))
            for i in range(num_consumers)
        ]

        # Đợi producers xong, rồi gửi sentinels
        async def send_sentinels():
            # Chờ đủ thời gian cho producers hoàn thành
            await asyncio.sleep(items_per_producer * 0.02 + 0.5)
            for _ in range(num_consumers):
                await queue.put(None)

        tg.create_task(send_sentinels())

    total = sum(t.result() for t in consumer_tasks)
    print(f"Tổng: {total} items processed bởi {num_consumers} consumers")

asyncio.run(run_pipeline())

Worker Pool Pattern

Giới hạn số lượng concurrent workers — kiểm soát tài nguyên, tránh overload.

python
import asyncio
from typing import Callable, Any
from dataclasses import dataclass, field
from enum import Enum

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    func: Callable
    args: tuple = ()
    status: JobStatus = JobStatus.PENDING
    result: Any = None
    error: str | None = None

class WorkerPool:
    """Async worker pool với job tracking."""

    def __init__(self, num_workers: int, queue_size: int = 100):
        self._num_workers = num_workers
        self._queue: asyncio.Queue[Job | None] = asyncio.Queue(maxsize=queue_size)
        self._jobs: dict[str, Job] = {}
        self._running = False

    async def _worker(self, worker_id: int):
        while self._running:
            try:
                job = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if job is None:
                break

            job.status = JobStatus.RUNNING
            try:
                job.result = await job.func(*job.args)
                job.status = JobStatus.DONE
            except Exception as e:
                job.error = str(e)
                job.status = JobStatus.FAILED
            finally:
                self._queue.task_done()

    async def start(self):
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self._num_workers)
        ]

    async def submit(self, job_id: str, func: Callable, *args) -> Job:
        job = Job(id=job_id, func=func, args=args)
        self._jobs[job_id] = job
        await self._queue.put(job)
        return job

    async def shutdown(self, wait: bool = True):
        if wait:
            await self._queue.join()
        self._running = False
        for _ in range(self._num_workers):
            await self._queue.put(None)
        await asyncio.gather(*self._workers, return_exceptions=True)

    def get_job(self, job_id: str) -> Job | None:
        return self._jobs.get(job_id)

# Sử dụng
async def process_order(order_id: int) -> dict:
    await asyncio.sleep(0.1)
    return {"order_id": order_id, "status": "processed"}

async def main():
    pool = WorkerPool(num_workers=5, queue_size=50)
    await pool.start()

    for i in range(20):
        await pool.submit(f"order-{i}", process_order, i)

    await pool.shutdown(wait=True)

    for i in range(20):
        job = pool.get_job(f"order-{i}")
        assert job is not None and job.status == JobStatus.DONE

    print("20 orders processed successfully")

asyncio.run(main())

Actor Model Pattern

Mỗi actor có state riêng, nhận messages qua mailbox (queue), xử lý tuần tự — không cần lock.

python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class Message:
    type: str
    payload: Any
    reply_to: asyncio.Queue | None = None

class Actor:
    """Base actor class — isolated state, message-driven."""

    def __init__(self, name: str):
        self.name = name
        self._mailbox: asyncio.Queue[Message | None] = asyncio.Queue()
        self._running = False

    async def start(self):
        self._running = True
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        while self._running:
            try:
                msg = await asyncio.wait_for(self._mailbox.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if msg is None:
                break

            result = await self.handle(msg)
            if msg.reply_to is not None:
                await msg.reply_to.put(result)

    async def handle(self, msg: Message) -> Any:
        """Override trong subclass."""
        raise NotImplementedError

    async def send(self, msg: Message):
        await self._mailbox.put(msg)

    async def ask(self, msg: Message, timeout: float = 5.0) -> Any:
        """Gửi message và đợi reply."""
        reply_queue: asyncio.Queue[Any] = asyncio.Queue()
        msg.reply_to = reply_queue
        await self._mailbox.put(msg)
        return await asyncio.wait_for(reply_queue.get(), timeout=timeout)

    async def stop(self):
        self._running = False
        await self._mailbox.put(None)
        await self._task

class BankAccountActor(Actor):
    """Actor quản lý tài khoản — không cần Lock."""

    def __init__(self, account_id: str, initial_balance: float = 0):
        super().__init__(f"account-{account_id}")
        self._balance = initial_balance

    async def handle(self, msg: Message) -> Any:
        if msg.type == "deposit":
            self._balance += msg.payload
            return {"balance": self._balance}

        elif msg.type == "withdraw":
            if self._balance >= msg.payload:
                self._balance -= msg.payload
                return {"balance": self._balance, "success": True}
            return {"balance": self._balance, "success": False}

        elif msg.type == "get_balance":
            return {"balance": self._balance}

async def main():
    account = BankAccountActor("001", initial_balance=1000)
    await account.start()

    # Deposit
    result = await account.ask(Message("deposit", 500))
    print(f"Deposit: {result}")  # balance: 1500

    # Concurrent withdrawals — xử lý tuần tự qua mailbox
    tasks = [
        account.ask(Message("withdraw", 300))
        for _ in range(5)
    ]
    results = await asyncio.gather(*tasks)
    for r in results:
        print(f"Withdraw: {r}")

    balance = await account.ask(Message("get_balance", None))
    print(f"Final: {balance}")

    await account.stop()

asyncio.run(main())

Fan-out/Fan-in Pattern

Chia task lớn thành nhiều subtasks (fan-out), xử lý song song, gom kết quả (fan-in).

python
import asyncio
import time
from typing import TypeVar, Callable

T = TypeVar("T")
R = TypeVar("R")

async def fan_out_fan_in(
    items: list[T],
    processor: Callable[[T], R],
    max_concurrent: int = 10,
    aggregator: Callable[[list[R]], Any] | None = None,
) -> Any:
    """Generic fan-out/fan-in pattern."""
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded_process(item: T) -> R:
        async with sem:
            return await processor(item)

    # Fan-out: chạy song song
    results = await asyncio.gather(*[bounded_process(item) for item in items])

    # Fan-in: aggregate kết quả
    if aggregator:
        return aggregator(list(results))
    return list(results)

# Ứng dụng: Map-Reduce style
async def fetch_page_word_count(url: str) -> dict:
    """Fan-out: đếm từ trên mỗi page."""
    await asyncio.sleep(0.1)  # Giả lập fetch
    word_count = len(url) * 10  # Giả lập
    return {"url": url, "words": word_count}

def aggregate_counts(results: list[dict]) -> dict:
    """Fan-in: gom kết quả."""
    total = sum(r["words"] for r in results)
    return {"total_pages": len(results), "total_words": total}

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(100)]

    start = time.perf_counter()
    summary = await fan_out_fan_in(
        items=urls,
        processor=fetch_page_word_count,
        max_concurrent=20,
        aggregator=aggregate_counts,
    )
    elapsed = time.perf_counter() - start

    print(f"Summary: {summary}")
    print(f"Time: {elapsed:.2f}s")

asyncio.run(main())

Pipeline Pattern (Multi-stage)

Nối nhiều stage xử lý, mỗi stage là producer-consumer — dữ liệu chảy qua pipeline.

python
import asyncio
from typing import Any

async def pipeline_stage(
    name: str,
    input_queue: asyncio.Queue,
    output_queue: asyncio.Queue | None,
    processor: Any,
    num_workers: int = 1,
):
    """Một stage trong pipeline."""
    async def worker():
        while True:
            item = await input_queue.get()
            if item is None:
                if output_queue:
                    await output_queue.put(None)
                break
            result = await processor(item)
            if output_queue and result is not None:
                await output_queue.put(result)
            input_queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await asyncio.gather(*workers)

async def run_etl_pipeline():
    """ETL pipeline: Extract → Transform → Load."""
    extract_q: asyncio.Queue = asyncio.Queue(maxsize=100)
    transform_q: asyncio.Queue = asyncio.Queue(maxsize=100)
    load_q: asyncio.Queue = asyncio.Queue(maxsize=100)

    async def extract(raw): return {"raw": raw, "extracted": True}
    async def transform(data): data["transformed"] = True; return data
    async def load(data): pass
    async def feeder():
        for i in range(50):
            await extract_q.put(f"record-{i}")
        await extract_q.put(None)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(feeder())
        tg.create_task(pipeline_stage("extract", extract_q, transform_q, extract, 2))
        tg.create_task(pipeline_stage("transform", transform_q, load_q, transform, 3))
        tg.create_task(pipeline_stage("load", load_q, None, load, 2))

asyncio.run(run_etl_pipeline())

Thực chiến

Tình huống: Task Queue cho Background Processing

Bối cảnh: Web app cần xử lý background tasks: gửi email, resize ảnh, generate report. Mỗi task có priority và retry logic. Không dùng Celery — in-process solution.

Mục tiêu: Task queue với priority, retry, status tracking, graceful shutdown.

python
import asyncio
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Callable, Any

class Priority(IntEnum):
    HIGH = 0
    MEDIUM = 1
    LOW = 2

@dataclass(order=True)
class PriorityTask:
    priority: Priority
    task_id: str = field(compare=False)
    func: Callable = field(compare=False)
    args: tuple = field(compare=False, default=())
    max_retries: int = field(compare=False, default=3)
    attempt: int = field(compare=False, default=0)

class TaskQueue:
    def __init__(self, num_workers: int = 3):
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._num_workers = num_workers
        self._results: dict[str, Any] = {}
        self._running = False

    async def start(self):
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self._num_workers)
        ]

    async def _worker(self, worker_id: int):
        while self._running:
            try:
                task = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if task is None:
                break

            task.attempt += 1
            try:
                result = await task.func(*task.args)
                self._results[task.task_id] = {"status": "done", "result": result}
            except Exception as e:
                if task.attempt < task.max_retries:
                    # Retry với backoff
                    await asyncio.sleep(2 ** task.attempt)
                    await self._queue.put(task)
                else:
                    self._results[task.task_id] = {
                        "status": "failed",
                        "error": str(e),
                        "attempts": task.attempt,
                    }
            finally:
                self._queue.task_done()

    async def enqueue(
        self, task_id: str, func: Callable, *args,
        priority: Priority = Priority.MEDIUM,
        max_retries: int = 3,
    ):
        task = PriorityTask(priority, task_id, func, args, max_retries)
        await self._queue.put(task)

    async def shutdown(self):
        await self._queue.join()
        self._running = False
        for _ in range(self._num_workers):
            await self._queue.put(None)
        await asyncio.gather(*self._workers, return_exceptions=True)

    def get_result(self, task_id: str) -> dict | None:
        return self._results.get(task_id)

async def send_email(to: str) -> str:
    await asyncio.sleep(0.1)
    return f"Email sent to {to}"

async def resize_image(path: str) -> str:
    await asyncio.sleep(0.2)
    return f"Resized {path}"

async def main():
    tq = TaskQueue(num_workers=3)
    await tq.start()

    await tq.enqueue("email-1", send_email, "user@example.com", priority=Priority.HIGH)
    await tq.enqueue("img-1", resize_image, "/img/photo.jpg", priority=Priority.LOW)
    await tq.enqueue("email-2", send_email, "admin@example.com", priority=Priority.HIGH)

    await tq.shutdown()

    for tid in ["email-1", "img-1", "email-2"]:
        print(f"{tid}: {tq.get_result(tid)}")

asyncio.run(main())

Phân tích:

  • PriorityQueue đảm bảo HIGH tasks xử lý trước LOW
  • Retry với exponential backoff — transient failures recover tự động
  • task_done() cho phép join() biết khi nào tất cả tasks xong
  • Graceful shutdown: drain queue trước khi stop workers

Sai lầm điển hình

Sai lầm 1: Unbounded Queue — Memory explosion

Vấn đề: Queue không giới hạn size, producer nhanh hơn consumer.

python
# SAI: Queue vô hạn — producer nhanh, consumer chậm → OOM
queue = asyncio.Queue()  # Không giới hạn!

async def fast_producer():
    while True:
        await queue.put(generate_data())  # Tích tụ vô hạn!

Tại sao sai: Producer tạo 10,000 items/s, consumer xử lý 100 items/s → queue tăng 9,900 items/s → OOM trong vài phút.

python
# ĐÚNG: Bounded queue — backpressure tự nhiên
queue = asyncio.Queue(maxsize=1000)

async def controlled_producer():
    while True:
        await queue.put(generate_data())  # Block khi queue đầy
        # Producer tự động chậm lại = backpressure

Sai lầm 2: Worker crash im lặng

Vấn đề: Worker exception không được handle — worker chết, queue tích tụ.

python
# SAI: Exception giết worker — không ai xử lý queue nữa
async def fragile_worker(queue):
    while True:
        item = await queue.get()
        process(item)  # Exception → worker chết!
        queue.task_done()

Tại sao sai: Một item lỗi giết worker. Items còn lại trong queue không bao giờ được xử lý. queue.join() block vĩnh viễn.

python
# ĐÚNG: Try-except bao bọc processing, log error, tiếp tục
import logging

logger = logging.getLogger(__name__)

async def resilient_worker(queue):
    while True:
        item = await queue.get()
        try:
            await process(item)
        except Exception:
            logger.exception(f"Worker error processing {item}")
        finally:
            queue.task_done()  # LUÔN gọi task_done()

Sai lầm 3: Fan-out không giới hạn

Vấn đề: Fan-out 10,000 tasks cùng lúc — exhaust file descriptors, crash server.

python
# SAI: 10,000 concurrent connections
async def bad_fan_out(urls):
    tasks = [fetch(url) for url in urls]  # 10,000 tasks!
    return await asyncio.gather(*tasks)
python
# ĐÚNG: Semaphore giới hạn fan-out
async def good_fan_out(urls, max_concurrent=50):
    sem = asyncio.Semaphore(max_concurrent)
    async def limited(url):
        async with sem:
            return await fetch(url)
    return await asyncio.gather(*[limited(url) for url in urls])

Sai lầm 4: Resource leak — quên cleanup

Vấn đề: Session, connection không được đóng khi shutdown.

python
# SAI: Session không bao giờ close
async def leaky_worker():
    session = aiohttp.ClientSession()
    while True:
        item = await queue.get()
        await session.get(item["url"])
    # session.close() không bao giờ được gọi!
python
# ĐÚNG: Context manager đảm bảo cleanup
async def clean_worker():
    async with aiohttp.ClientSession() as session:
        while True:
            item = await queue.get()
            if item is None:
                break
            await session.get(item["url"])
    # session tự động close

Under the Hood

Khi nào dùng pattern nào

PatternUse CaseThroughputComplexityVí dụ thực tế
Producer-ConsumerDecouple generation từ processingCaoThấpMessage queue, log processing
Worker PoolGiới hạn tài nguyên, batch processingTrung bình-CaoTrung bìnhAPI calls, image processing
Actor ModelIsolated state, concurrent accessTrung bìnhCaoBank accounts, game entities
Fan-out/Fan-inParallel aggregationCaoThấpMap-reduce, scatter-gather
PipelineMulti-stage processingCaoTrung bìnhETL, data enrichment

Queue Performance

Queue TypeThread-safeProcess-safeBoundedUse Case
asyncio.QueueN/A (single-thread)KhôngCó (maxsize)Async tasks
queue.QueueKhôngCó (maxsize)Threading
multiprocessing.QueueCross-process
asyncio.PriorityQueueN/AKhôngPriority scheduling

Backpressure Strategies

Khi producer nhanh hơn consumer, có 3 strategies:

  1. Block producer (bounded queue) — đơn giản nhất, đúng nhất cho hầu hết cases
  2. Drop messages — acceptable cho metrics, logging (lossy)
  3. Buffer to disk — khi data quan trọng nhưng memory hạn chế

Trade-offs

Producer-Consumer tốt khi: Cần decouple, rate khác nhau giữa stages, cần buffer/backpressure. Yếu khi: Latency-sensitive (queue thêm latency), debugging khó (messages "biến mất" trong queue).

Actor Model tốt khi: Concurrent access vào shared state, cần isolation. Yếu khi: Communication overhead (message passing chậm hơn direct call), reasoning about message ordering phức tạp.

Checklist ghi nhớ

✅ Checklist triển khai

Producer-Consumer

  • [ ] Luôn dùng bounded queue (maxsize) — tránh OOM
  • [ ] Sentinel value (None) để signal completion
  • [ ] task_done() sau mỗi item — cho join() hoạt động đúng
  • [ ] Handle consumer exceptions — không để worker chết im lặng

Worker Pool

  • [ ] Giới hạn workers hợp lý — không quá số cores (CPU) hoặc FD limit (I/O)
  • [ ] Implement graceful shutdown — drain queue trước khi stop
  • [ ] Track job status — cho phép query kết quả

Fan-out/Fan-in

  • [ ] Semaphore giới hạn concurrent fan-out
  • [ ] Handle partial failures — một task fail không nên block tất cả
  • [ ] Aggregator function cho fan-in — rõ ràng cách gom kết quả

Pipeline

  • [ ] Mỗi stage là unit độc lập — testable riêng
  • [ ] Bounded queues giữa stages — backpressure cascade
  • [ ] Monitor queue sizes — phát hiện bottleneck stage
  • [ ] Error handling per-stage — bad record không block pipeline

Bài tập luyện tập

Bài 1: Rate-limited API Client — Advanced

Đề bài: Implement RateLimitedClient dùng token bucket algorithm. Giới hạn 10 requests/giây, burst tối đa 20. Test với 100 requests.

🧠 Quiz

Câu hỏi: Trong producer-consumer pattern, tại sao nên dùng bounded queue thay vì unbounded?

  • [ ] A. Bounded queue nhanh hơn
  • [ ] B. Unbounded queue không thread-safe
  • [x] C. Bounded queue tạo backpressure tự nhiên — producer chậm lại khi consumer không kịp xử lý
  • [ ] D. Python không hỗ trợ unbounded queue Giải thích: Bounded queue block producer khi đầy (maxsize), tạo backpressure — cơ chế tự nhiên cân bằng tốc độ producer-consumer. Không có backpressure, producer nhanh sẽ fill memory. A sai vì performance gần như giống nhau. B sai vì cả hai đều thread-safe. D sai vì asyncio.Queue() mặc định unbounded.
💡 Gợi ý
  • Dùng token bucket: mỗi giây thêm rate tokens, tối đa capacity
  • asyncio.Lock bảo vệ token state
  • Khi hết tokens → asyncio.sleep đủ thời gian
✅ Lời giải
python
import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: float):
        self._rate = rate
        self._capacity = capacity
        self._tokens = capacity
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
            self._last_refill = now

            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return

            wait = (1.0 - self._tokens) / self._rate
            await asyncio.sleep(wait)
            self._tokens = 0

async def api_call(limiter: TokenBucketRateLimiter, request_id: int) -> dict:
    await limiter.acquire()
    await asyncio.sleep(0.01)  # Giả lập API latency
    return {"id": request_id, "ts": time.monotonic()}

async def main():
    limiter = TokenBucketRateLimiter(rate=10.0, capacity=20.0)

    start = time.perf_counter()
    tasks = [api_call(limiter, i) for i in range(100)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"100 requests trong {elapsed:.1f}s")
    print(f"Effective rate: {100 / elapsed:.1f} req/s")
    # Kỳ vọng: ~10s (burst 20 ngay, còn 80 ở rate 10/s)

asyncio.run(main())

Phân tích: Burst 20 requests đầu tiên ngay lập tức (capacity=20). 80 requests còn lại ở rate 10/s = 8s. Tổng ~8-9s.

Bài 2: Multi-stage Pipeline — Advanced

Đề bài: Xây ETL pipeline 3 stages: Extract (đọc raw data), Transform (parse + validate), Load (ghi output). Mỗi stage có multiple workers. Dùng asyncio.Queue giữa các stages.

💡 Gợi ý
  • 3 bounded queues nối 3 stages
  • Sentinel propagation: khi extract xong → gửi None vào transform queue
  • Mỗi stage: while True, get(), process(), put()
✅ Lời giải
python
import asyncio

async def extract_worker(raw_q, transform_q, worker_id):
    while True:
        item = await raw_q.get()
        if item is None:
            await transform_q.put(None)
            break
        # Extract: parse raw string
        await asyncio.sleep(0.005)
        await transform_q.put({"raw": item, "extracted": True})
        raw_q.task_done()

async def transform_worker(transform_q, load_q, worker_id):
    while True:
        item = await transform_q.get()
        if item is None:
            await load_q.put(None)
            break
        # Transform: validate + enrich
        await asyncio.sleep(0.01)
        item["transformed"] = True
        await load_q.put(item)
        transform_q.task_done()

async def load_worker(load_q, results, worker_id):
    while True:
        item = await load_q.get()
        if item is None:
            break
        # Load: write to storage
        await asyncio.sleep(0.005)
        results.append(item)
        load_q.task_done()

async def main():
    raw_q = asyncio.Queue(maxsize=50)
    transform_q = asyncio.Queue(maxsize=50)
    load_q = asyncio.Queue(maxsize=50)
    results = []

    # Feed raw data
    for i in range(100):
        await raw_q.put(f"record-{i}")
    await raw_q.put(None)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(extract_worker(raw_q, transform_q, 0))
        tg.create_task(transform_worker(transform_q, load_q, 0))
        tg.create_task(load_worker(load_q, results, 0))

    print(f"Pipeline: {len(results)} records processed")
    assert all(r["extracted"] and r["transformed"] for r in results)

asyncio.run(main())

Phân tích: Data chảy qua 3 stages tuần tự. Bounded queues tạo backpressure — nếu transform chậm, extract tự động chờ. Mở rộng: tăng workers per stage cho stage chậm nhất.

Liên kết học tiếp

Từ khóa glossary: producer-consumer, worker pool, actor model, fan-out/fan-in, pipeline, backpressure, rate limiting, task queue

Tìm kiếm liên quan: python concurrent patterns, producer consumer asyncio, actor model python, fan out fan in