Giao diện
Concurrent Patterns — Kiến trúc đồng thời Production
Bạn đã biết asyncio.gather() và Pool.map() — nhưng khi xây dựng hệ thống thật, bạn cần nhiều hơn primitives. Một data ingestion pipeline cần producer đọc từ Kafka, consumers xử lý song song, rate limiter bảo vệ downstream services, và dead letter queue cho failures. Đây là lúc patterns trở nên quan trọng hơn tools.
Concurrent patterns là những kiến trúc đã được chứng minh hiệu quả qua hàng thập kỷ — từ Unix pipes (1973) đến Erlang actor model (1986) đến Go channels (2009). Python implement chúng qua asyncio.Queue, concurrent.futures, và multiprocessing. Pattern đúng biến hệ thống phức tạp thành tổ hợp các thành phần đơn giản, testable, có thể scale độc lập.
Bài này cover 4 patterns cốt lõi: Producer-Consumer (backbone của mọi message queue), Thread/Worker Pool (resource management), Actor Model (isolated state), và Fan-out/Fan-in (parallel aggregation). Mỗi pattern kèm production code, không phải toy example.
Bức tranh tư duy
Hãy tưởng tượng một nhà máy sản xuất bánh mì ở Đà Nẵng:
Producer-Consumer: Lò nướng (producer) cho bánh ra băng chuyền (queue). Thợ đóng gói (consumers) lấy bánh từ băng chuyền, mỗi người lấy cái nào đến trước. Băng chuyền có giới hạn — nếu đầy, lò phải chờ (backpressure).
Worker Pool: Thay vì mỗi đơn hàng thuê một thợ mới, nhà máy giữ 10 thợ cố định. Đơn hàng xếp hàng, thợ nào rảnh nhận đơn tiếp. Giới hạn số thợ = giới hạn tài nguyên.
Fan-out/Fan-in: Một đơn hàng lớn (1000 bánh) được chia cho 10 thợ (fan-out), mỗi người làm 100 cái. Khi tất cả xong, gom lại thành một lô (fan-in) để giao hàng.
Actor Model: Mỗi thợ có bàn làm việc riêng (private state), nhận chỉ thị qua giấy nhắn (messages), không ai được sờ vào bàn người khác. Không cần khóa, không race condition.
Cốt lõi kỹ thuật
Producer-Consumer Pattern
Pattern cốt lõi nhất — tách biệt việc tạo work khỏi việc xử lý work bằng queue trung gian.
python
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class WorkItem:
id: int
payload: Any
async def producer(
queue: asyncio.Queue[WorkItem | None],
num_items: int,
producer_id: int,
):
"""Tạo work items và đẩy vào queue."""
for i in range(num_items):
item = WorkItem(id=i, payload=f"data-{producer_id}-{i}")
await queue.put(item) # Block nếu queue đầy (backpressure)
print(f"Producer {producer_id}: hoàn thành {num_items} items")
async def consumer(
queue: asyncio.Queue[WorkItem | None],
consumer_id: int,
) -> int:
"""Lấy và xử lý work items từ queue."""
processed = 0
while True:
item = await queue.get()
if item is None:
break
# Xử lý item
await asyncio.sleep(0.01) # Giả lập processing
processed += 1
queue.task_done()
return processed
async def run_pipeline(
num_producers: int = 2,
num_consumers: int = 5,
items_per_producer: int = 100,
):
queue: asyncio.Queue[WorkItem | None] = asyncio.Queue(maxsize=50)
async with asyncio.TaskGroup() as tg:
# Start producers
for i in range(num_producers):
tg.create_task(producer(queue, items_per_producer, i))
# Start consumers
consumer_tasks = [
tg.create_task(consumer(queue, i))
for i in range(num_consumers)
]
# Đợi producers xong, rồi gửi sentinels
async def send_sentinels():
# Chờ đủ thời gian cho producers hoàn thành
await asyncio.sleep(items_per_producer * 0.02 + 0.5)
for _ in range(num_consumers):
await queue.put(None)
tg.create_task(send_sentinels())
total = sum(t.result() for t in consumer_tasks)
print(f"Tổng: {total} items processed bởi {num_consumers} consumers")
asyncio.run(run_pipeline())Worker Pool Pattern
Giới hạn số lượng concurrent workers — kiểm soát tài nguyên, tránh overload.
python
import asyncio
from typing import Callable, Any
from dataclasses import dataclass, field
from enum import Enum
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"
@dataclass
class Job:
id: str
func: Callable
args: tuple = ()
status: JobStatus = JobStatus.PENDING
result: Any = None
error: str | None = None
class WorkerPool:
"""Async worker pool với job tracking."""
def __init__(self, num_workers: int, queue_size: int = 100):
self._num_workers = num_workers
self._queue: asyncio.Queue[Job | None] = asyncio.Queue(maxsize=queue_size)
self._jobs: dict[str, Job] = {}
self._running = False
async def _worker(self, worker_id: int):
while self._running:
try:
job = await asyncio.wait_for(self._queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
if job is None:
break
job.status = JobStatus.RUNNING
try:
job.result = await job.func(*job.args)
job.status = JobStatus.DONE
except Exception as e:
job.error = str(e)
job.status = JobStatus.FAILED
finally:
self._queue.task_done()
async def start(self):
self._running = True
self._workers = [
asyncio.create_task(self._worker(i))
for i in range(self._num_workers)
]
async def submit(self, job_id: str, func: Callable, *args) -> Job:
job = Job(id=job_id, func=func, args=args)
self._jobs[job_id] = job
await self._queue.put(job)
return job
async def shutdown(self, wait: bool = True):
if wait:
await self._queue.join()
self._running = False
for _ in range(self._num_workers):
await self._queue.put(None)
await asyncio.gather(*self._workers, return_exceptions=True)
def get_job(self, job_id: str) -> Job | None:
return self._jobs.get(job_id)
# Sử dụng
async def process_order(order_id: int) -> dict:
await asyncio.sleep(0.1)
return {"order_id": order_id, "status": "processed"}
async def main():
pool = WorkerPool(num_workers=5, queue_size=50)
await pool.start()
for i in range(20):
await pool.submit(f"order-{i}", process_order, i)
await pool.shutdown(wait=True)
for i in range(20):
job = pool.get_job(f"order-{i}")
assert job is not None and job.status == JobStatus.DONE
print("20 orders processed successfully")
asyncio.run(main())Actor Model Pattern
Mỗi actor có state riêng, nhận messages qua mailbox (queue), xử lý tuần tự — không cần lock.
python
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class Message:
type: str
payload: Any
reply_to: asyncio.Queue | None = None
class Actor:
"""Base actor class — isolated state, message-driven."""
def __init__(self, name: str):
self.name = name
self._mailbox: asyncio.Queue[Message | None] = asyncio.Queue()
self._running = False
async def start(self):
self._running = True
self._task = asyncio.create_task(self._run())
async def _run(self):
while self._running:
try:
msg = await asyncio.wait_for(self._mailbox.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
if msg is None:
break
result = await self.handle(msg)
if msg.reply_to is not None:
await msg.reply_to.put(result)
async def handle(self, msg: Message) -> Any:
"""Override trong subclass."""
raise NotImplementedError
async def send(self, msg: Message):
await self._mailbox.put(msg)
async def ask(self, msg: Message, timeout: float = 5.0) -> Any:
"""Gửi message và đợi reply."""
reply_queue: asyncio.Queue[Any] = asyncio.Queue()
msg.reply_to = reply_queue
await self._mailbox.put(msg)
return await asyncio.wait_for(reply_queue.get(), timeout=timeout)
async def stop(self):
self._running = False
await self._mailbox.put(None)
await self._task
class BankAccountActor(Actor):
"""Actor quản lý tài khoản — không cần Lock."""
def __init__(self, account_id: str, initial_balance: float = 0):
super().__init__(f"account-{account_id}")
self._balance = initial_balance
async def handle(self, msg: Message) -> Any:
if msg.type == "deposit":
self._balance += msg.payload
return {"balance": self._balance}
elif msg.type == "withdraw":
if self._balance >= msg.payload:
self._balance -= msg.payload
return {"balance": self._balance, "success": True}
return {"balance": self._balance, "success": False}
elif msg.type == "get_balance":
return {"balance": self._balance}
async def main():
account = BankAccountActor("001", initial_balance=1000)
await account.start()
# Deposit
result = await account.ask(Message("deposit", 500))
print(f"Deposit: {result}") # balance: 1500
# Concurrent withdrawals — xử lý tuần tự qua mailbox
tasks = [
account.ask(Message("withdraw", 300))
for _ in range(5)
]
results = await asyncio.gather(*tasks)
for r in results:
print(f"Withdraw: {r}")
balance = await account.ask(Message("get_balance", None))
print(f"Final: {balance}")
await account.stop()
asyncio.run(main())Fan-out/Fan-in Pattern
Chia task lớn thành nhiều subtasks (fan-out), xử lý song song, gom kết quả (fan-in).
python
import asyncio
import time
from typing import TypeVar, Callable
T = TypeVar("T")
R = TypeVar("R")
async def fan_out_fan_in(
items: list[T],
processor: Callable[[T], R],
max_concurrent: int = 10,
aggregator: Callable[[list[R]], Any] | None = None,
) -> Any:
"""Generic fan-out/fan-in pattern."""
sem = asyncio.Semaphore(max_concurrent)
async def bounded_process(item: T) -> R:
async with sem:
return await processor(item)
# Fan-out: chạy song song
results = await asyncio.gather(*[bounded_process(item) for item in items])
# Fan-in: aggregate kết quả
if aggregator:
return aggregator(list(results))
return list(results)
# Ứng dụng: Map-Reduce style
async def fetch_page_word_count(url: str) -> dict:
"""Fan-out: đếm từ trên mỗi page."""
await asyncio.sleep(0.1) # Giả lập fetch
word_count = len(url) * 10 # Giả lập
return {"url": url, "words": word_count}
def aggregate_counts(results: list[dict]) -> dict:
"""Fan-in: gom kết quả."""
total = sum(r["words"] for r in results)
return {"total_pages": len(results), "total_words": total}
async def main():
urls = [f"https://example.com/page/{i}" for i in range(100)]
start = time.perf_counter()
summary = await fan_out_fan_in(
items=urls,
processor=fetch_page_word_count,
max_concurrent=20,
aggregator=aggregate_counts,
)
elapsed = time.perf_counter() - start
print(f"Summary: {summary}")
print(f"Time: {elapsed:.2f}s")
asyncio.run(main())Pipeline Pattern (Multi-stage)
Nối nhiều stage xử lý, mỗi stage là producer-consumer — dữ liệu chảy qua pipeline.
python
import asyncio
from typing import Any
async def pipeline_stage(
name: str,
input_queue: asyncio.Queue,
output_queue: asyncio.Queue | None,
processor: Any,
num_workers: int = 1,
):
"""Một stage trong pipeline."""
async def worker():
while True:
item = await input_queue.get()
if item is None:
if output_queue:
await output_queue.put(None)
break
result = await processor(item)
if output_queue and result is not None:
await output_queue.put(result)
input_queue.task_done()
workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
await asyncio.gather(*workers)
async def run_etl_pipeline():
"""ETL pipeline: Extract → Transform → Load."""
extract_q: asyncio.Queue = asyncio.Queue(maxsize=100)
transform_q: asyncio.Queue = asyncio.Queue(maxsize=100)
load_q: asyncio.Queue = asyncio.Queue(maxsize=100)
async def extract(raw): return {"raw": raw, "extracted": True}
async def transform(data): data["transformed"] = True; return data
async def load(data): pass
async def feeder():
for i in range(50):
await extract_q.put(f"record-{i}")
await extract_q.put(None)
async with asyncio.TaskGroup() as tg:
tg.create_task(feeder())
tg.create_task(pipeline_stage("extract", extract_q, transform_q, extract, 2))
tg.create_task(pipeline_stage("transform", transform_q, load_q, transform, 3))
tg.create_task(pipeline_stage("load", load_q, None, load, 2))
asyncio.run(run_etl_pipeline())Thực chiến
Tình huống: Task Queue cho Background Processing
Bối cảnh: Web app cần xử lý background tasks: gửi email, resize ảnh, generate report. Mỗi task có priority và retry logic. Không dùng Celery — in-process solution.
Mục tiêu: Task queue với priority, retry, status tracking, graceful shutdown.
python
import asyncio
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Callable, Any
class Priority(IntEnum):
HIGH = 0
MEDIUM = 1
LOW = 2
@dataclass(order=True)
class PriorityTask:
priority: Priority
task_id: str = field(compare=False)
func: Callable = field(compare=False)
args: tuple = field(compare=False, default=())
max_retries: int = field(compare=False, default=3)
attempt: int = field(compare=False, default=0)
class TaskQueue:
def __init__(self, num_workers: int = 3):
self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
self._num_workers = num_workers
self._results: dict[str, Any] = {}
self._running = False
async def start(self):
self._running = True
self._workers = [
asyncio.create_task(self._worker(i))
for i in range(self._num_workers)
]
async def _worker(self, worker_id: int):
while self._running:
try:
task = await asyncio.wait_for(self._queue.get(), timeout=1.0)
except asyncio.TimeoutError:
continue
if task is None:
break
task.attempt += 1
try:
result = await task.func(*task.args)
self._results[task.task_id] = {"status": "done", "result": result}
except Exception as e:
if task.attempt < task.max_retries:
# Retry với backoff
await asyncio.sleep(2 ** task.attempt)
await self._queue.put(task)
else:
self._results[task.task_id] = {
"status": "failed",
"error": str(e),
"attempts": task.attempt,
}
finally:
self._queue.task_done()
async def enqueue(
self, task_id: str, func: Callable, *args,
priority: Priority = Priority.MEDIUM,
max_retries: int = 3,
):
task = PriorityTask(priority, task_id, func, args, max_retries)
await self._queue.put(task)
async def shutdown(self):
await self._queue.join()
self._running = False
for _ in range(self._num_workers):
await self._queue.put(None)
await asyncio.gather(*self._workers, return_exceptions=True)
def get_result(self, task_id: str) -> dict | None:
return self._results.get(task_id)
async def send_email(to: str) -> str:
await asyncio.sleep(0.1)
return f"Email sent to {to}"
async def resize_image(path: str) -> str:
await asyncio.sleep(0.2)
return f"Resized {path}"
async def main():
tq = TaskQueue(num_workers=3)
await tq.start()
await tq.enqueue("email-1", send_email, "user@example.com", priority=Priority.HIGH)
await tq.enqueue("img-1", resize_image, "/img/photo.jpg", priority=Priority.LOW)
await tq.enqueue("email-2", send_email, "admin@example.com", priority=Priority.HIGH)
await tq.shutdown()
for tid in ["email-1", "img-1", "email-2"]:
print(f"{tid}: {tq.get_result(tid)}")
asyncio.run(main())Phân tích:
PriorityQueueđảm bảo HIGH tasks xử lý trước LOW- Retry với exponential backoff — transient failures recover tự động
task_done()cho phépjoin()biết khi nào tất cả tasks xong- Graceful shutdown: drain queue trước khi stop workers
Sai lầm điển hình
❌ Sai lầm 1: Unbounded Queue — Memory explosion
Vấn đề: Queue không giới hạn size, producer nhanh hơn consumer.
python
# SAI: Queue vô hạn — producer nhanh, consumer chậm → OOM
queue = asyncio.Queue() # Không giới hạn!
async def fast_producer():
while True:
await queue.put(generate_data()) # Tích tụ vô hạn!Tại sao sai: Producer tạo 10,000 items/s, consumer xử lý 100 items/s → queue tăng 9,900 items/s → OOM trong vài phút.
python
# ĐÚNG: Bounded queue — backpressure tự nhiên
queue = asyncio.Queue(maxsize=1000)
async def controlled_producer():
while True:
await queue.put(generate_data()) # Block khi queue đầy
# Producer tự động chậm lại = backpressure❌ Sai lầm 2: Worker crash im lặng
Vấn đề: Worker exception không được handle — worker chết, queue tích tụ.
python
# SAI: Exception giết worker — không ai xử lý queue nữa
async def fragile_worker(queue):
while True:
item = await queue.get()
process(item) # Exception → worker chết!
queue.task_done()Tại sao sai: Một item lỗi giết worker. Items còn lại trong queue không bao giờ được xử lý. queue.join() block vĩnh viễn.
python
# ĐÚNG: Try-except bao bọc processing, log error, tiếp tục
import logging
logger = logging.getLogger(__name__)
async def resilient_worker(queue):
while True:
item = await queue.get()
try:
await process(item)
except Exception:
logger.exception(f"Worker error processing {item}")
finally:
queue.task_done() # LUÔN gọi task_done()❌ Sai lầm 3: Fan-out không giới hạn
Vấn đề: Fan-out 10,000 tasks cùng lúc — exhaust file descriptors, crash server.
python
# SAI: 10,000 concurrent connections
async def bad_fan_out(urls):
tasks = [fetch(url) for url in urls] # 10,000 tasks!
return await asyncio.gather(*tasks)python
# ĐÚNG: Semaphore giới hạn fan-out
async def good_fan_out(urls, max_concurrent=50):
sem = asyncio.Semaphore(max_concurrent)
async def limited(url):
async with sem:
return await fetch(url)
return await asyncio.gather(*[limited(url) for url in urls])❌ Sai lầm 4: Resource leak — quên cleanup
Vấn đề: Session, connection không được đóng khi shutdown.
python
# SAI: Session không bao giờ close
async def leaky_worker():
session = aiohttp.ClientSession()
while True:
item = await queue.get()
await session.get(item["url"])
# session.close() không bao giờ được gọi!python
# ĐÚNG: Context manager đảm bảo cleanup
async def clean_worker():
async with aiohttp.ClientSession() as session:
while True:
item = await queue.get()
if item is None:
break
await session.get(item["url"])
# session tự động closeUnder the Hood
Khi nào dùng pattern nào
| Pattern | Use Case | Throughput | Complexity | Ví dụ thực tế |
|---|---|---|---|---|
| Producer-Consumer | Decouple generation từ processing | Cao | Thấp | Message queue, log processing |
| Worker Pool | Giới hạn tài nguyên, batch processing | Trung bình-Cao | Trung bình | API calls, image processing |
| Actor Model | Isolated state, concurrent access | Trung bình | Cao | Bank accounts, game entities |
| Fan-out/Fan-in | Parallel aggregation | Cao | Thấp | Map-reduce, scatter-gather |
| Pipeline | Multi-stage processing | Cao | Trung bình | ETL, data enrichment |
Queue Performance
| Queue Type | Thread-safe | Process-safe | Bounded | Use Case |
|---|---|---|---|---|
asyncio.Queue | N/A (single-thread) | Không | Có (maxsize) | Async tasks |
queue.Queue | Có | Không | Có (maxsize) | Threading |
multiprocessing.Queue | Có | Có | Có | Cross-process |
asyncio.PriorityQueue | N/A | Không | Có | Priority scheduling |
Backpressure Strategies
Khi producer nhanh hơn consumer, có 3 strategies:
- Block producer (bounded queue) — đơn giản nhất, đúng nhất cho hầu hết cases
- Drop messages — acceptable cho metrics, logging (lossy)
- Buffer to disk — khi data quan trọng nhưng memory hạn chế
Trade-offs
Producer-Consumer tốt khi: Cần decouple, rate khác nhau giữa stages, cần buffer/backpressure. Yếu khi: Latency-sensitive (queue thêm latency), debugging khó (messages "biến mất" trong queue).
Actor Model tốt khi: Concurrent access vào shared state, cần isolation. Yếu khi: Communication overhead (message passing chậm hơn direct call), reasoning about message ordering phức tạp.
Checklist ghi nhớ
✅ Checklist triển khai
Producer-Consumer
- [ ] Luôn dùng bounded queue (
maxsize) — tránh OOM - [ ] Sentinel value (None) để signal completion
- [ ]
task_done()sau mỗi item — chojoin()hoạt động đúng - [ ] Handle consumer exceptions — không để worker chết im lặng
Worker Pool
- [ ] Giới hạn workers hợp lý — không quá số cores (CPU) hoặc FD limit (I/O)
- [ ] Implement graceful shutdown — drain queue trước khi stop
- [ ] Track job status — cho phép query kết quả
Fan-out/Fan-in
- [ ] Semaphore giới hạn concurrent fan-out
- [ ] Handle partial failures — một task fail không nên block tất cả
- [ ] Aggregator function cho fan-in — rõ ràng cách gom kết quả
Pipeline
- [ ] Mỗi stage là unit độc lập — testable riêng
- [ ] Bounded queues giữa stages — backpressure cascade
- [ ] Monitor queue sizes — phát hiện bottleneck stage
- [ ] Error handling per-stage — bad record không block pipeline
Bài tập luyện tập
Bài 1: Rate-limited API Client — Advanced
Đề bài: Implement RateLimitedClient dùng token bucket algorithm. Giới hạn 10 requests/giây, burst tối đa 20. Test với 100 requests.
🧠 Quiz
Câu hỏi: Trong producer-consumer pattern, tại sao nên dùng bounded queue thay vì unbounded?
- [ ] A. Bounded queue nhanh hơn
- [ ] B. Unbounded queue không thread-safe
- [x] C. Bounded queue tạo backpressure tự nhiên — producer chậm lại khi consumer không kịp xử lý
- [ ] D. Python không hỗ trợ unbounded queue Giải thích: Bounded queue block producer khi đầy (
maxsize), tạo backpressure — cơ chế tự nhiên cân bằng tốc độ producer-consumer. Không có backpressure, producer nhanh sẽ fill memory. A sai vì performance gần như giống nhau. B sai vì cả hai đều thread-safe. D sai vìasyncio.Queue()mặc định unbounded.
💡 Gợi ý
- Dùng token bucket: mỗi giây thêm
ratetokens, tối đacapacity asyncio.Lockbảo vệ token state- Khi hết tokens →
asyncio.sleepđủ thời gian
✅ Lời giải
python
import asyncio
import time
class TokenBucketRateLimiter:
def __init__(self, rate: float, capacity: float):
self._rate = rate
self._capacity = capacity
self._tokens = capacity
self._last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
elapsed = now - self._last_refill
self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
self._last_refill = now
if self._tokens >= 1.0:
self._tokens -= 1.0
return
wait = (1.0 - self._tokens) / self._rate
await asyncio.sleep(wait)
self._tokens = 0
async def api_call(limiter: TokenBucketRateLimiter, request_id: int) -> dict:
await limiter.acquire()
await asyncio.sleep(0.01) # Giả lập API latency
return {"id": request_id, "ts": time.monotonic()}
async def main():
limiter = TokenBucketRateLimiter(rate=10.0, capacity=20.0)
start = time.perf_counter()
tasks = [api_call(limiter, i) for i in range(100)]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
print(f"100 requests trong {elapsed:.1f}s")
print(f"Effective rate: {100 / elapsed:.1f} req/s")
# Kỳ vọng: ~10s (burst 20 ngay, còn 80 ở rate 10/s)
asyncio.run(main())Phân tích: Burst 20 requests đầu tiên ngay lập tức (capacity=20). 80 requests còn lại ở rate 10/s = 8s. Tổng ~8-9s.
Bài 2: Multi-stage Pipeline — Advanced
Đề bài: Xây ETL pipeline 3 stages: Extract (đọc raw data), Transform (parse + validate), Load (ghi output). Mỗi stage có multiple workers. Dùng asyncio.Queue giữa các stages.
💡 Gợi ý
- 3 bounded queues nối 3 stages
- Sentinel propagation: khi extract xong → gửi None vào transform queue
- Mỗi stage: while True, get(), process(), put()
✅ Lời giải
python
import asyncio
async def extract_worker(raw_q, transform_q, worker_id):
while True:
item = await raw_q.get()
if item is None:
await transform_q.put(None)
break
# Extract: parse raw string
await asyncio.sleep(0.005)
await transform_q.put({"raw": item, "extracted": True})
raw_q.task_done()
async def transform_worker(transform_q, load_q, worker_id):
while True:
item = await transform_q.get()
if item is None:
await load_q.put(None)
break
# Transform: validate + enrich
await asyncio.sleep(0.01)
item["transformed"] = True
await load_q.put(item)
transform_q.task_done()
async def load_worker(load_q, results, worker_id):
while True:
item = await load_q.get()
if item is None:
break
# Load: write to storage
await asyncio.sleep(0.005)
results.append(item)
load_q.task_done()
async def main():
raw_q = asyncio.Queue(maxsize=50)
transform_q = asyncio.Queue(maxsize=50)
load_q = asyncio.Queue(maxsize=50)
results = []
# Feed raw data
for i in range(100):
await raw_q.put(f"record-{i}")
await raw_q.put(None)
async with asyncio.TaskGroup() as tg:
tg.create_task(extract_worker(raw_q, transform_q, 0))
tg.create_task(transform_worker(transform_q, load_q, 0))
tg.create_task(load_worker(load_q, results, 0))
print(f"Pipeline: {len(results)} records processed")
assert all(r["extracted"] and r["transformed"] for r in results)
asyncio.run(main())Phân tích: Data chảy qua 3 stages tuần tự. Bounded queues tạo backpressure — nếu transform chậm, extract tự động chờ. Mở rộng: tăng workers per stage cho stage chậm nhất.
Liên kết học tiếp
Từ khóa glossary: producer-consumer, worker pool, actor model, fan-out/fan-in, pipeline, backpressure, rate limiting, task queue
Tìm kiếm liên quan: python concurrent patterns, producer consumer asyncio, actor model python, fan out fan in