Giao diện
Concurrent Patterns Production
Battle-tested patterns cho concurrent Python applications
Learning Outcomes
Sau bài này, bạn sẽ:
- Implement Producer-Consumer pattern với queues
- Xây dựng Worker Pools cho batch processing
- Implement Rate Limiting để bảo vệ external services
- Thiết kế real-world concurrent architectures
- Chọn đúng pattern cho từng use case
Producer-Consumer Pattern
Basic Pattern với asyncio.Queue
python
import asyncio
from dataclasses import dataclass
from typing import Any
@dataclass
class Task:
id: int
data: Any
async def producer(queue: asyncio.Queue, num_tasks: int):
"""Produce tasks to queue."""
for i in range(num_tasks):
task = Task(id=i, data=f"data_{i}")
await queue.put(task)
print(f"Produced: {task.id}")
await asyncio.sleep(0.1) # Simulate work
# Signal completion
await queue.put(None)
async def consumer(queue: asyncio.Queue, consumer_id: int):
"""Consume tasks from queue."""
while True:
task = await queue.get()
if task is None:
# Pass sentinel to next consumer
await queue.put(None)
break
print(f"Consumer {consumer_id} processing: {task.id}")
await asyncio.sleep(0.2) # Simulate processing
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10) # Bounded queue
# Start producer and consumers
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(queue, num_tasks=20))
for i in range(3):
tg.create_task(consumer(queue, consumer_id=i))
asyncio.run(main())
Multiple Producers, Multiple Consumers
python
import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class WorkItem:
producer_id: int
item_id: int
payload: str
class ProducerConsumerSystem:
def __init__(self, num_producers: int, num_consumers: int, queue_size: int = 100):
self.queue: asyncio.Queue[Optional[WorkItem]] = asyncio.Queue(maxsize=queue_size)
self.num_producers = num_producers
self.num_consumers = num_consumers
self.results: list = []
self._results_lock = asyncio.Lock()
async def producer(self, producer_id: int, num_items: int):
for i in range(num_items):
item = WorkItem(
producer_id=producer_id,
item_id=i,
payload=f"data_{producer_id}_{i}"
)
await self.queue.put(item)
await asyncio.sleep(0.05)
async def consumer(self, consumer_id: int):
while True:
item = await self.queue.get()
if item is None:
await self.queue.put(None) # Propagate sentinel
break
# Process item
result = await self._process(item)
async with self._results_lock:
self.results.append(result)
self.queue.task_done()
async def _process(self, item: WorkItem) -> dict:
await asyncio.sleep(0.1) # Simulate processing
return {"item": item, "processed": True}
async def run(self, items_per_producer: int) -> list:
async with asyncio.TaskGroup() as tg:
# Start producers
for i in range(self.num_producers):
tg.create_task(self.producer(i, items_per_producer))
# Start consumers
for i in range(self.num_consumers):
tg.create_task(self.consumer(i))
# Wait for all items to be produced, then signal completion
async def signal_completion():
await asyncio.sleep(items_per_producer * 0.05 * self.num_producers + 1)
await self.queue.put(None)
tg.create_task(signal_completion())
return self.results
# Usage
async def main():
system = ProducerConsumerSystem(num_producers=3, num_consumers=5)
results = await system.run(items_per_producer=10)
print(f"Processed {len(results)} items")
asyncio.run(main())
Worker Pools
Async Worker Pool
python
import asyncio
from typing import Callable, Any, TypeVar
from dataclasses import dataclass

T = TypeVar('T')
R = TypeVar('R')


@dataclass
class Job:
    """A queued unit of work: an async callable plus its arguments."""
    id: int
    func: Callable
    args: tuple
    kwargs: dict


class AsyncWorkerPool:
    """Fixed-size pool of asyncio workers draining a bounded job queue."""

    def __init__(self, num_workers: int, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: dict[int, Any] = {}
        self._workers: list[asyncio.Task] = []
        self._running = False

    async def _worker(self, worker_id: int):
        """Drain jobs until the pool stops or a None sentinel arrives."""
        while self._running:
            try:
                job = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # Periodically re-check self._running instead of blocking forever
                continue
            if job is None:
                break
            try:
                outcome = await job.func(*job.args, **job.kwargs)
                self.results[job.id] = {"status": "success", "result": outcome}
            except Exception as exc:
                self.results[job.id] = {"status": "error", "error": str(exc)}
            finally:
                self.queue.task_done()

    async def start(self):
        """Spawn the worker tasks."""
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]

    async def submit(self, job_id: int, func: Callable, *args, **kwargs):
        """Enqueue a job; blocks when the queue is full (backpressure)."""
        await self.queue.put(Job(id=job_id, func=func, args=args, kwargs=kwargs))

    async def shutdown(self, wait: bool = True):
        """Stop the pool; if `wait`, let all queued jobs finish first."""
        if wait:
            await self.queue.join()
        self._running = False
        # One sentinel per worker unblocks any worker parked on get()
        for _ in range(self.num_workers):
            await self.queue.put(None)
        await asyncio.gather(*self._workers, return_exceptions=True)

    def get_result(self, job_id: int) -> Any:
        """Return the recorded outcome for a job, or None if unknown."""
        return self.results.get(job_id)


# Usage
async def process_data(data: str) -> str:
    await asyncio.sleep(0.1)
    return data.upper()


async def main():
    pool = AsyncWorkerPool(num_workers=5)
    await pool.start()
    # Submit jobs
    for i in range(20):
        await pool.submit(i, process_data, f"item_{i}")
    # Wait and shutdown
    await pool.shutdown(wait=True)
    # Get results
    for i in range(20):
        print(f"Job {i}: {pool.get_result(i)}")
asyncio.run(main())
Batch Processing Pool
python
import asyncio
from typing import TypeVar, Callable, Iterable
from itertools import islice
T = TypeVar('T')
R = TypeVar('R')
class BatchProcessor:
def __init__(self, batch_size: int = 10, max_concurrent: int = 5):
self.batch_size = batch_size
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_batch(
self,
items: Iterable[T],
processor: Callable[[T], R]
) -> list[R]:
"""Process items in batches with concurrency control."""
results = []
# Convert to list for batching
items_list = list(items)
# Create batches
batches = [
items_list[i:i + self.batch_size]
for i in range(0, len(items_list), self.batch_size)
]
async def process_single_batch(batch: list[T]) -> list[R]:
async with self.semaphore:
tasks = [processor(item) for item in batch]
return await asyncio.gather(*tasks)
# Process all batches
async with asyncio.TaskGroup() as tg:
batch_tasks = [
tg.create_task(process_single_batch(batch))
for batch in batches
]
# Flatten results
for task in batch_tasks:
results.extend(task.result())
return results
# Usage
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
processor = BatchProcessor(batch_size=10, max_concurrent=3)
user_ids = list(range(100))
users = await processor.process_batch(user_ids, fetch_user)
print(f"Fetched {len(users)} users")
asyncio.run(main())
Rate Limiting
Token Bucket Rate Limiter
python
import asyncio
import time
from dataclasses import dataclass
@dataclass
class RateLimiter:
"""Token bucket rate limiter."""
rate: float # tokens per second
capacity: float # max tokens
def __post_init__(self):
self.tokens = self.capacity
self.last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: float = 1.0) -> float:
"""Acquire tokens, waiting if necessary. Returns wait time."""
async with self._lock:
now = time.monotonic()
# Refill tokens
elapsed = now - self.last_update
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_update = now
if self.tokens >= tokens:
self.tokens -= tokens
return 0.0
# Calculate wait time
wait_time = (tokens - self.tokens) / self.rate
return wait_time
async def wait_and_acquire(self, tokens: float = 1.0):
"""Wait until tokens are available, then acquire."""
wait_time = await self.acquire(tokens)
if wait_time > 0:
await asyncio.sleep(wait_time)
await self.acquire(tokens)
# Usage
async def rate_limited_api_call(limiter: RateLimiter, url: str) -> dict:
await limiter.wait_and_acquire()
# Make API call
await asyncio.sleep(0.1) # Simulate API call
return {"url": url, "status": "success"}
async def main():
# 10 requests per second, burst of 20
limiter = RateLimiter(rate=10.0, capacity=20.0)
urls = [f"https://api.example.com/{i}" for i in range(50)]
start = time.perf_counter()
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(rate_limited_api_call(limiter, url))
for url in urls
]
elapsed = time.perf_counter() - start
print(f"50 requests in {elapsed:.2f}s (rate limited to ~10/s)")
asyncio.run(main())
Sliding Window Rate Limiter
python
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field
@dataclass
class SlidingWindowLimiter:
"""Sliding window rate limiter - more accurate than token bucket."""
max_requests: int
window_seconds: float
timestamps: deque = field(default_factory=deque)
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
async def is_allowed(self) -> bool:
"""Check if request is allowed."""
async with self._lock:
now = time.monotonic()
window_start = now - self.window_seconds
# Remove old timestamps
while self.timestamps and self.timestamps[0] < window_start:
self.timestamps.popleft()
if len(self.timestamps) < self.max_requests:
self.timestamps.append(now)
return True
return False
async def wait_if_needed(self) -> float:
"""Wait until request is allowed. Returns wait time."""
async with self._lock:
now = time.monotonic()
window_start = now - self.window_seconds
# Remove old timestamps
while self.timestamps and self.timestamps[0] < window_start:
self.timestamps.popleft()
if len(self.timestamps) < self.max_requests:
self.timestamps.append(now)
return 0.0
# Calculate wait time
oldest = self.timestamps[0]
wait_time = oldest + self.window_seconds - now
if wait_time > 0:
await asyncio.sleep(wait_time)
# Re-check and add timestamp
now = time.monotonic()
window_start = now - self.window_seconds
while self.timestamps and self.timestamps[0] < window_start:
self.timestamps.popleft()
self.timestamps.append(now)
return wait_time
# Usage
async def main():
# 5 requests per 1 second window
limiter = SlidingWindowLimiter(max_requests=5, window_seconds=1.0)
for i in range(20):
wait_time = await limiter.wait_if_needed()
if wait_time > 0:
print(f"Request {i}: waited {wait_time:.2f}s")
else:
print(f"Request {i}: immediate")
asyncio.run(main())
Per-Client Rate Limiting
python
import asyncio
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class PerClientRateLimiter:
    """Lazily creates and tracks one token-bucket RateLimiter per client id."""
    rate: float
    capacity: float
    clients: Dict[str, 'RateLimiter'] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def get_limiter(self, client_id: str) -> 'RateLimiter':
        """Return this client's limiter, creating it on first use.

        The lock makes the check-then-create atomic across await points.
        """
        async with self._lock:
            limiter = self.clients.get(client_id)
            if limiter is None:
                limiter = RateLimiter(rate=self.rate, capacity=self.capacity)
                self.clients[client_id] = limiter
            return limiter

    async def acquire(self, client_id: str, tokens: float = 1.0):
        """Acquire tokens from the given client's bucket, waiting if needed."""
        limiter = await self.get_limiter(client_id)
        await limiter.wait_and_acquire(tokens)
# Usage in FastAPI
"""
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
rate_limiter = PerClientRateLimiter(rate=10.0, capacity=20.0)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
limiter = await rate_limiter.get_limiter(client_ip)
# NOTE: the token-bucket RateLimiter has no is_allowed(); acquire()
# returns 0.0 when a token was taken and a positive wait otherwise
if await limiter.acquire() > 0:
    raise HTTPException(status_code=429, detail="Rate limit exceeded")
return await call_next(request)
"""Real-world Architectures
Web Scraper Architecture
import asyncio
import re
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urljoin

import aiohttp


@dataclass
class ScrapedPage:
    """Result of fetching one URL."""
    url: str
    status: int  # HTTP status, or 0 when the fetch failed
    content: Optional[str]
    links: list[str]


class WebScraper:
    """Bounded-concurrency, rate-limited breadth-first web scraper.

    Combines a Semaphore (max in-flight requests), a token-bucket
    RateLimiter (requests per second) and a work queue drained by workers.
    """

    def __init__(
        self,
        max_concurrent: int = 10,
        rate_limit: float = 5.0,  # requests per second
        timeout: float = 30.0
    ):
        # Bug fix: the original source declared this as `def init(`,
        # so the constructor body never ran.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(rate=rate_limit, capacity=rate_limit * 2)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.visited: set[str] = set()
        self.results: list[ScrapedPage] = []
        self._lock = asyncio.Lock()

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> ScrapedPage:
        """Fetch one page under both the concurrency and rate limits."""
        async with self.semaphore:
            await self.rate_limiter.wait_and_acquire()
            try:
                async with session.get(url) as response:
                    content = await response.text()
                    links = self._extract_links(content, url)
                    return ScrapedPage(
                        url=url,
                        status=response.status,
                        content=content,
                        links=links
                    )
            except Exception:
                # Status 0 marks a failed fetch (network error, timeout, ...)
                return ScrapedPage(url=url, status=0, content=None, links=[])

    def _extract_links(self, content: str, base_url: str) -> list[str]:
        """Very simplified href extraction; capped at 10 links per page."""
        links = re.findall(r'href=["\']([^"\']+)["\']', content)
        return [urljoin(base_url, link) for link in links[:10]]

    async def scrape(self, start_urls: list[str], max_pages: int = 100) -> list[ScrapedPage]:
        """Breadth-first scrape from `start_urls`, visiting at most `max_pages`."""
        queue: asyncio.Queue[str] = asyncio.Queue()
        for url in start_urls:
            await queue.put(url)

        async with aiohttp.ClientSession(timeout=self.timeout) as session:

            async def worker():
                while True:
                    try:
                        # The idle timeout doubles as the shutdown signal
                        url = await asyncio.wait_for(queue.get(), timeout=5.0)
                    except asyncio.TimeoutError:
                        break
                    async with self._lock:
                        if url in self.visited or len(self.results) >= max_pages:
                            queue.task_done()
                            continue
                        self.visited.add(url)
                    page = await self.fetch_page(session, url)
                    async with self._lock:
                        self.results.append(page)
                    # Add newly discovered links to the queue
                    for link in page.links:
                        if link not in self.visited and len(self.visited) < max_pages:
                            await queue.put(link)
                    queue.task_done()

            # Start workers, wait for the queue to drain, then stop them
            workers = [asyncio.create_task(worker()) for _ in range(10)]
            await queue.join()
            for w in workers:
                w.cancel()
        return self.results


# Usage
async def main():
    scraper = WebScraper(max_concurrent=10, rate_limit=5.0)
    results = await scraper.scrape(
        start_urls=["https://example.com"],
        max_pages=50
    )
    print(f"Scraped {len(results)} pages")


asyncio.run(main())
### Background Task Manager
```python
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
from enum import Enum
from datetime import datetime


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class BackgroundTask:
    """A submitted job plus its lifecycle metadata."""
    id: str
    func: Callable
    args: tuple
    kwargs: dict
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


class BackgroundTaskManager:
    """Queue-backed background job runner with status tracking.

    Supports both sync and async callables. cancel_task() only prevents
    jobs that have not started yet; running jobs are not interrupted.
    """

    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        # None is the shutdown sentinel, hence Optional
        self.queue: asyncio.Queue[Optional[BackgroundTask]] = asyncio.Queue()
        self.tasks: dict[str, BackgroundTask] = {}
        self._workers: list[asyncio.Task] = []
        self._running = False
        self._lock = asyncio.Lock()

    async def start(self):
        """Spawn the worker tasks."""
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.max_workers)
        ]

    async def _worker(self, worker_id: int):
        while self._running:
            try:
                task = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue  # re-check self._running periodically
            if task is None:
                break
            # Bug fix: honour cancel_task(). Previously a cancelled task was
            # still executed (its status was clobbered to RUNNING).
            if task.status == TaskStatus.CANCELLED:
                self.queue.task_done()
                continue
            task.status = TaskStatus.RUNNING
            task.started_at = datetime.now()
            try:
                if asyncio.iscoroutinefunction(task.func):
                    result = await task.func(*task.args, **task.kwargs)
                else:
                    result = task.func(*task.args, **task.kwargs)
                task.result = result
                task.status = TaskStatus.COMPLETED
            except Exception as e:
                task.error = str(e)
                task.status = TaskStatus.FAILED
            finally:
                task.completed_at = datetime.now()
                self.queue.task_done()

    async def submit(self, task_id: str, func: Callable, *args, **kwargs) -> BackgroundTask:
        """Register and enqueue a new background task."""
        task = BackgroundTask(
            id=task_id,
            func=func,
            args=args,
            kwargs=kwargs
        )
        async with self._lock:
            self.tasks[task_id] = task
        await self.queue.put(task)
        return task

    def get_task(self, task_id: str) -> Optional[BackgroundTask]:
        """Look up a task by id (None if unknown)."""
        return self.tasks.get(task_id)

    async def cancel_task(self, task_id: str) -> bool:
        """Mark a still-pending task as cancelled. Returns True on success."""
        task = self.tasks.get(task_id)
        if task and task.status == TaskStatus.PENDING:
            task.status = TaskStatus.CANCELLED
            return True
        return False

    async def shutdown(self, wait: bool = True):
        """Stop workers; if `wait`, drain the queue first."""
        if wait:
            await self.queue.join()
        self._running = False
        for _ in range(self.max_workers):
            await self.queue.put(None)
        await asyncio.gather(*self._workers, return_exceptions=True)
# Usage with FastAPI
"""
from fastapi import FastAPI, BackgroundTasks
import uuid
app = FastAPI()
task_manager = BackgroundTaskManager(max_workers=5)
@app.on_event("startup")
async def startup():
await task_manager.start()
@app.on_event("shutdown")
async def shutdown():
await task_manager.shutdown()
@app.post("/tasks")
async def create_task(data: dict):
task_id = str(uuid.uuid4())
await task_manager.submit(task_id, process_data, data)
return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
return {
"id": task.id,
"status": task.status.value,
"result": task.result,
"error": task.error
}
"""Production Pitfalls
⚠️ NHỮNG LỖI THƯỜNG GẶP
1. Unbounded Queues
python
# ❌ SAI - Queue có thể grow vô hạn
queue = asyncio.Queue() # No limit!
async def producer():
while True:
await queue.put(data) # Memory explosion!
# ✅ ĐÚNG - Bounded queue với backpressure
queue = asyncio.Queue(maxsize=1000)
async def producer():
while True:
await queue.put(data) # Blocks when full
2. Không handle worker failures
python
# ❌ SAI - Worker crash = silent failure
async def bad_worker():
while True:
item = await queue.get()
process(item) # Exception kills worker!
# ✅ ĐÚNG - Handle exceptions
async def good_worker():
while True:
item = await queue.get()
try:
process(item)
except Exception as e:
logger.error(f"Worker error: {e}")
finally:
queue.task_done()
3. Rate limiter không thread-safe
python
# ❌ SAI - Race condition
class BadRateLimiter:
def acquire(self):
if self.tokens > 0: # Check
self.tokens -= 1 # Modify - RACE!
return True
return False
# ✅ ĐÚNG - Dùng lock
class GoodRateLimiter:
async def acquire(self):
async with self._lock:
if self.tokens > 0:
self.tokens -= 1
return True
return False
4. Quên cleanup resources
python
# ❌ SAI - Resource leak
async def bad_scraper():
session = aiohttp.ClientSession()
# ... scrape ...
# Session never closed!
# ✅ ĐÚNG - Context manager
async def good_scraper():
async with aiohttp.ClientSession() as session:
# ... scrape ...
# Session automatically closed
Pattern Selection Guide
| Use Case | Pattern | Key Components |
|---|---|---|
| API data ingestion | Producer-Consumer | Queue + Workers |
| Batch processing | Worker Pool | Pool + Semaphore |
| External API calls | Rate Limiting | Token Bucket |
| Web scraping | Combined | Queue + Rate Limit + Semaphore |
| Background jobs | Task Manager | Queue + Status Tracking |
| Real-time processing | Streaming | Async generators |
Bảng Tóm tắt
python
# === PRODUCER-CONSUMER ===
queue = asyncio.Queue(maxsize=100)
async def producer():
await queue.put(item)
await queue.put(None) # Sentinel
async def consumer():
while (item := await queue.get()) is not None:
process(item)
queue.task_done()
# === WORKER POOL ===
semaphore = asyncio.Semaphore(10)
async def worker(item):
async with semaphore:
return await process(item)
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(worker(item)) for item in items]
# === RATE LIMITING ===
class RateLimiter:
async def acquire(self):
async with self._lock:
# Token bucket logic
pass
limiter = RateLimiter(rate=10.0, capacity=20.0)
await limiter.wait_and_acquire()
# === BATCH PROCESSING ===
async def process_batch(items, batch_size=10):
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
await asyncio.gather(*[process(item) for item in batch])
# === BACKGROUND TASKS ===
class TaskManager:
async def submit(self, task_id, func, *args):
task = Task(id=task_id, func=func, args=args)
await self.queue.put(task)
return task
def get_status(self, task_id):
return self.tasks[task_id].status