
Concurrent Patterns Production

Battle-tested patterns cho concurrent Python applications

Learning Outcomes

Sau bài này, bạn sẽ:

  • Implement Producer-Consumer pattern với queues
  • Xây dựng Worker Pools cho batch processing
  • Implement Rate Limiting để bảo vệ external services
  • Thiết kế real-world concurrent architectures
  • Chọn đúng pattern cho từng use case

Producer-Consumer Pattern

Basic Pattern với asyncio.Queue

python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class Task:
    """One unit of work flowing through the queue."""
    id: int
    data: Any

async def producer(queue: asyncio.Queue, num_tasks: int):
    """Put `num_tasks` Task objects on *queue*, then a None sentinel."""
    for idx in range(num_tasks):
        item = Task(id=idx, data=f"data_{idx}")
        await queue.put(item)
        print(f"Produced: {item.id}")
        await asyncio.sleep(0.1)  # Simulate work
    # A single None tells consumers production is finished.
    await queue.put(None)

async def consumer(queue: asyncio.Queue, consumer_id: int):
    """Consume tasks until the None sentinel is seen.

    The sentinel is pushed back onto the queue so every sibling
    consumer also gets a chance to shut down.
    """
    while (task := await queue.get()) is not None:
        print(f"Consumer {consumer_id} processing: {task.id}")
        await asyncio.sleep(0.2)  # Simulate processing
        queue.task_done()
    await queue.put(None)  # re-emit sentinel for sibling consumers

async def main():
    """Wire one producer to three consumers over a bounded queue."""
    work_queue = asyncio.Queue(maxsize=10)  # bounded: applies backpressure

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(work_queue, num_tasks=20))
        for cid in range(3):
            tg.create_task(consumer(work_queue, consumer_id=cid))

asyncio.run(main())

Multiple Producers, Multiple Consumers

python
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class WorkItem:
    producer_id: int
    item_id: int
    payload: str

class ProducerConsumerSystem:
    def __init__(self, num_producers: int, num_consumers: int, queue_size: int = 100):
        self.queue: asyncio.Queue[Optional[WorkItem]] = asyncio.Queue(maxsize=queue_size)
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self.results: list = []
        self._results_lock = asyncio.Lock()
    
    async def producer(self, producer_id: int, num_items: int):
        for i in range(num_items):
            item = WorkItem(
                producer_id=producer_id,
                item_id=i,
                payload=f"data_{producer_id}_{i}"
            )
            await self.queue.put(item)
            await asyncio.sleep(0.05)
    
    async def consumer(self, consumer_id: int):
        while True:
            item = await self.queue.get()
            
            if item is None:
                await self.queue.put(None)  # Propagate sentinel
                break
            
            # Process item
            result = await self._process(item)
            
            async with self._results_lock:
                self.results.append(result)
            
            self.queue.task_done()
    
    async def _process(self, item: WorkItem) -> dict:
        await asyncio.sleep(0.1)  # Simulate processing
        return {"item": item, "processed": True}
    
    async def run(self, items_per_producer: int) -> list:
        async with asyncio.TaskGroup() as tg:
            # Start producers
            for i in range(self.num_producers):
                tg.create_task(self.producer(i, items_per_producer))
            
            # Start consumers
            for i in range(self.num_consumers):
                tg.create_task(self.consumer(i))
            
            # Wait for all items to be produced, then signal completion
            async def signal_completion():
                await asyncio.sleep(items_per_producer * 0.05 * self.num_producers + 1)
                await self.queue.put(None)
            
            tg.create_task(signal_completion())
        
        return self.results

# Usage
async def main():
    """Demo entry point: 3 producers, 5 consumers, 10 items each."""
    system = ProducerConsumerSystem(num_producers=3, num_consumers=5)
    processed = await system.run(items_per_producer=10)
    print(f"Processed {len(processed)} items")

asyncio.run(main())

Worker Pools

Async Worker Pool

python
import asyncio
from typing import Callable, Any, TypeVar
from dataclasses import dataclass

T = TypeVar('T')
R = TypeVar('R')

@dataclass
class Job:
    """A unit of work: a callable plus its arguments, keyed by id."""
    id: int
    func: Callable
    args: tuple
    kwargs: dict

class AsyncWorkerPool:
    """Fixed-size pool of asyncio workers fed from one bounded queue.

    Per-job outcomes (success or error) are collected in `results`,
    keyed by job id.
    """

    def __init__(self, num_workers: int, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: dict[int, Any] = {}
        self._workers: list[asyncio.Task] = []
        self._running = False

    async def _worker(self, worker_id: int):
        """Worker loop: pull jobs until stopped or a None sentinel arrives."""
        while self._running:
            try:
                # Timeout lets the loop re-check self._running periodically.
                job = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if job is None:
                break

            try:
                # Fix/generalization: accept plain callables as well as
                # coroutine functions (consistent with BackgroundTaskManager).
                # Previously `await job.func(...)` failed for sync functions.
                if asyncio.iscoroutinefunction(job.func):
                    result = await job.func(*job.args, **job.kwargs)
                else:
                    result = job.func(*job.args, **job.kwargs)
                self.results[job.id] = {"status": "success", "result": result}
            except Exception as e:
                self.results[job.id] = {"status": "error", "error": str(e)}
            finally:
                self.queue.task_done()

    async def start(self):
        """Spawn the worker tasks."""
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.num_workers)
        ]

    async def submit(self, job_id: int, func: Callable, *args, **kwargs):
        """Queue a job; blocks when the queue is full (backpressure)."""
        job = Job(id=job_id, func=func, args=args, kwargs=kwargs)
        await self.queue.put(job)

    async def shutdown(self, wait: bool = True):
        """Stop the pool. With wait=True, drain the queue first."""
        if wait:
            await self.queue.join()

        self._running = False
        # One sentinel per worker so each wakes up and exits.
        for _ in range(self.num_workers):
            await self.queue.put(None)

        await asyncio.gather(*self._workers, return_exceptions=True)

    def get_result(self, job_id: int) -> Any:
        """Return the outcome record for job_id, or None if unknown."""
        return self.results.get(job_id)

# Usage
async def process_data(data: str) -> str:
    """Simulate async processing: return *data* uppercased after a delay."""
    await asyncio.sleep(0.1)
    result = data.upper()
    return result

async def main():
    """Demo: run 20 jobs through a 5-worker pool, then print results."""
    pool = AsyncWorkerPool(num_workers=5)
    await pool.start()

    # Submit jobs
    for job_id in range(20):
        await pool.submit(job_id, process_data, f"item_{job_id}")

    # Drain the queue, then stop workers
    await pool.shutdown(wait=True)

    # Report per-job outcomes
    for job_id in range(20):
        print(f"Job {job_id}: {pool.get_result(job_id)}")

asyncio.run(main())

Batch Processing Pool

python
import asyncio
from typing import TypeVar, Callable, Iterable
from itertools import islice

T = TypeVar('T')
R = TypeVar('R')

class BatchProcessor:
    def __init__(self, batch_size: int = 10, max_concurrent: int = 5):
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_batch(
        self,
        items: Iterable[T],
        processor: Callable[[T], R]
    ) -> list[R]:
        """Process items in batches with concurrency control."""
        results = []
        
        # Convert to list for batching
        items_list = list(items)
        
        # Create batches
        batches = [
            items_list[i:i + self.batch_size]
            for i in range(0, len(items_list), self.batch_size)
        ]
        
        async def process_single_batch(batch: list[T]) -> list[R]:
            async with self.semaphore:
                tasks = [processor(item) for item in batch]
                return await asyncio.gather(*tasks)
        
        # Process all batches
        async with asyncio.TaskGroup() as tg:
            batch_tasks = [
                tg.create_task(process_single_batch(batch))
                for batch in batches
            ]
        
        # Flatten results
        for task in batch_tasks:
            results.extend(task.result())
        
        return results

# Usage
async def fetch_user(user_id: int) -> dict:
    """Simulate fetching one user record by id."""
    await asyncio.sleep(0.1)
    record = {"id": user_id, "name": f"User {user_id}"}
    return record

async def main():
    """Demo: fetch 100 users in batches of 10, max 3 batches at once."""
    processor = BatchProcessor(batch_size=10, max_concurrent=3)

    ids = list(range(100))
    users = await processor.process_batch(ids, fetch_user)

    print(f"Fetched {len(users)} users")

asyncio.run(main())

Rate Limiting

Token Bucket Rate Limiter

python
import asyncio
import time
from dataclasses import dataclass

@dataclass
class RateLimiter:
    """Token bucket rate limiter.

    Tokens refill continuously at `rate` per second up to `capacity`,
    so `capacity` is the maximum burst size.
    """
    rate: float  # tokens added per second
    capacity: float  # max tokens (burst allowance)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> float:
        """Try to take *tokens*. Returns 0.0 on success, otherwise the
        estimated time to wait before retrying.

        Note: on failure nothing is consumed; the caller must retry.
        """
        async with self._lock:
            now = time.monotonic()

            # Refill based on time elapsed since the last update.
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0.0

            # Not enough tokens: report how long until there would be.
            return (tokens - self.tokens) / self.rate

    async def wait_and_acquire(self, tokens: float = 1.0):
        """Block until *tokens* are actually acquired.

        Fix: loop until acquire() succeeds. The old version slept once
        and issued a second acquire() whose failure was ignored, so
        under contention callers could proceed without holding tokens.
        """
        while (wait_time := await self.acquire(tokens)) > 0:
            await asyncio.sleep(wait_time)

# Usage
async def rate_limited_api_call(limiter: RateLimiter, url: str) -> dict:
    """Perform one (simulated) API call after passing the rate limiter."""
    await limiter.wait_and_acquire()
    # Stand-in for the real HTTP request.
    await asyncio.sleep(0.1)
    return {"url": url, "status": "success"}

async def main():
    """Demo: 50 calls sharing one limiter (~10 req/s, burst of 20)."""
    limiter = RateLimiter(rate=10.0, capacity=20.0)

    urls = [f"https://api.example.com/{i}" for i in range(50)]

    start = time.perf_counter()

    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(rate_limited_api_call(limiter, url))

    elapsed = time.perf_counter() - start
    print(f"50 requests in {elapsed:.2f}s (rate limited to ~10/s)")

asyncio.run(main())

Sliding Window Rate Limiter

python
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class SlidingWindowLimiter:
    """Sliding-window rate limiter: at most `max_requests` request
    timestamps may fall inside the trailing `window_seconds` window.
    """
    max_requests: int
    window_seconds: float
    timestamps: deque = field(default_factory=deque)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    def _prune(self, now: float) -> None:
        # Drop timestamps that fell out of the window. Caller holds _lock.
        cutoff = now - self.window_seconds
        while self.timestamps and self.timestamps[0] < cutoff:
            self.timestamps.popleft()

    async def is_allowed(self) -> bool:
        """Non-blocking check; records the request when allowed."""
        async with self._lock:
            now = time.monotonic()
            self._prune(now)

            if len(self.timestamps) < self.max_requests:
                self.timestamps.append(now)
                return True

            return False

    async def wait_if_needed(self) -> float:
        """Block until a slot is free, then record the request.
        Returns the total time waited (0.0 if immediate).

        Fix: the lock is released while sleeping — the old version held
        it across `asyncio.sleep`, stalling every other caller for the
        whole wait — and the slot is re-checked in a loop after each
        sleep so concurrent waiters cannot overshoot the limit.
        """
        waited = 0.0
        while True:
            async with self._lock:
                now = time.monotonic()
                self._prune(now)

                if len(self.timestamps) < self.max_requests:
                    self.timestamps.append(now)
                    return waited

                # Time until the oldest timestamp leaves the window.
                delay = self.timestamps[0] + self.window_seconds - now

            # Sleep outside the lock; sleep(0) still yields the loop.
            waited += max(delay, 0.0)
            await asyncio.sleep(max(delay, 0.0))

# Usage
async def main():
    """Demo: 20 sequential requests through a 5-per-second window."""
    limiter = SlidingWindowLimiter(max_requests=5, window_seconds=1.0)

    for i in range(20):
        delay = await limiter.wait_if_needed()
        if delay > 0:
            print(f"Request {i}: waited {delay:.2f}s")
        else:
            print(f"Request {i}: immediate")

asyncio.run(main())

Per-Client Rate Limiting

python
import asyncio
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class PerClientRateLimiter:
    """Keeps an independent token-bucket RateLimiter per client id."""
    rate: float
    capacity: float
    clients: Dict[str, 'RateLimiter'] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def get_limiter(self, client_id: str) -> 'RateLimiter':
        """Return (creating on first use) the limiter for *client_id*."""
        # NOTE(review): `clients` is never pruned — consider TTL eviction
        # for long-lived processes with many distinct clients.
        async with self._lock:
            limiter = self.clients.get(client_id)
            if limiter is None:
                limiter = RateLimiter(rate=self.rate, capacity=self.capacity)
                self.clients[client_id] = limiter
            return limiter

    async def acquire(self, client_id: str, tokens: float = 1.0):
        """Acquire tokens from this client's own bucket, waiting if needed."""
        target = await self.get_limiter(client_id)
        await target.wait_and_acquire(tokens)

# Usage in FastAPI
"""
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
rate_limiter = PerClientRateLimiter(rate=10.0, capacity=20.0)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_ip = request.client.host
    
    limiter = await rate_limiter.get_limiter(client_ip)
    if await limiter.acquire() > 0:  # non-zero wait time means "not allowed now"
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    
    return await call_next(request)
"""

Real-world Architectures

Web Scraper Architecture

# Fix: the import block was collapsed onto a single (invalid) line.
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urljoin

# Fix: the dataclass was collapsed onto one (invalid) line; reformatted.
@dataclass
class ScrapedPage:
    """Outcome of fetching one page; status == 0 marks a failed fetch."""
    url: str
    status: int
    content: Optional[str]
    links: list[str]

class WebScraper:
    """Concurrent web scraper combining a work queue, a semaphore and a
    token-bucket rate limiter (RateLimiter is defined earlier in this module).

    Fixes: the constructor name had lost its dunder underscores
    (`def init(` -> `def __init__(`), and the class body / methods had
    been flattened out of the class by formatting loss; reconstructed.
    """

    def __init__(
        self,
        max_concurrent: int = 10,
        rate_limit: float = 5.0,  # requests per second
        timeout: float = 30.0
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(rate=rate_limit, capacity=rate_limit * 2)
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.visited: set[str] = set()
        self.results: list[ScrapedPage] = []
        self._lock = asyncio.Lock()

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> ScrapedPage:
        """Fetch one URL under both the concurrency and rate limits."""
        async with self.semaphore:
            await self.rate_limiter.wait_and_acquire()

            try:
                async with session.get(url) as response:
                    content = await response.text()
                    links = self._extract_links(content, url)

                    return ScrapedPage(
                        url=url,
                        status=response.status,
                        content=content,
                        links=links
                    )
            except Exception:
                # status == 0 marks a failed fetch; keep scraping others.
                return ScrapedPage(
                    url=url,
                    status=0,
                    content=None,
                    links=[]
                )

    def _extract_links(self, content: str, base_url: str) -> list[str]:
        """Very simplified href extraction (first 10 links only)."""
        import re
        links = re.findall(r'href=["\']([^"\']+)["\']', content)
        return [urljoin(base_url, link) for link in links[:10]]

    async def scrape(self, start_urls: list[str], max_pages: int = 100) -> list[ScrapedPage]:
        """Breadth-first scrape from *start_urls*, up to *max_pages* pages."""
        queue: asyncio.Queue[str] = asyncio.Queue()

        for url in start_urls:
            await queue.put(url)

        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async def worker():
                while True:
                    try:
                        url = await asyncio.wait_for(queue.get(), timeout=5.0)
                    except asyncio.TimeoutError:
                        break  # queue stayed empty: assume we are done

                    async with self._lock:
                        if url in self.visited or len(self.results) >= max_pages:
                            queue.task_done()
                            continue
                        self.visited.add(url)

                    page = await self.fetch_page(session, url)

                    async with self._lock:
                        self.results.append(page)

                        # Enqueue newly discovered links
                        for link in page.links:
                            if link not in self.visited and len(self.visited) < max_pages:
                                await queue.put(link)

                    queue.task_done()

            # Start workers
            workers = [asyncio.create_task(worker()) for _ in range(10)]

            # Wait for queue to be processed
            await queue.join()

            # Cancel workers
            for w in workers:
                w.cancel()

        return self.results

Usage

async def main():
    """Demo: scrape up to 50 pages starting from example.com."""
    scraper = WebScraper(max_concurrent=10, rate_limit=5.0)
    results = await scraper.scrape(
        start_urls=["https://example.com"],
        max_pages=50
    )
    print(f"Scraped {len(results)} pages")

# Fix: the original trailing line was garbled ("]) asyncio.run(main())"
# duplicated); also guard the entry point so imports don't run the demo.
if __name__ == "__main__":
    asyncio.run(main())



### Background Task Manager

```python
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
from enum import Enum
from datetime import datetime

class TaskStatus(Enum):
    """Lifecycle states of a background task."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class BackgroundTask:
    """Record of one submitted task plus its lifecycle metadata."""
    id: str
    func: Callable
    args: tuple
    kwargs: dict
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

class BackgroundTaskManager:
    """Queue-backed background task runner with per-task status tracking."""

    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        # Optional: a None entry acts as the per-worker shutdown sentinel.
        self.queue: asyncio.Queue[Optional[BackgroundTask]] = asyncio.Queue()
        self.tasks: dict[str, BackgroundTask] = {}
        self._workers: list[asyncio.Task] = []
        self._running = False
        self._lock = asyncio.Lock()

    async def start(self):
        """Spawn the worker tasks."""
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self.max_workers)
        ]

    async def _worker(self, worker_id: int):
        """Worker loop: execute queued tasks until shut down."""
        while self._running:
            try:
                # Timeout lets the loop notice self._running changes.
                task = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            if task is None:
                break

            if task.status is TaskStatus.CANCELLED:
                # Fix: honor cancel_task(). Previously a cancelled task
                # was still executed once a worker dequeued it; now it
                # is skipped (and task_done'd so queue.join() completes).
                self.queue.task_done()
                continue

            task.status = TaskStatus.RUNNING
            task.started_at = datetime.now()

            try:
                # Support both coroutine functions and plain callables.
                if asyncio.iscoroutinefunction(task.func):
                    result = await task.func(*task.args, **task.kwargs)
                else:
                    result = task.func(*task.args, **task.kwargs)

                task.result = result
                task.status = TaskStatus.COMPLETED
            except Exception as e:
                task.error = str(e)
                task.status = TaskStatus.FAILED
            finally:
                task.completed_at = datetime.now()
                self.queue.task_done()

    async def submit(self, task_id: str, func: Callable, *args, **kwargs) -> BackgroundTask:
        """Register and enqueue a task; returns its tracking record."""
        task = BackgroundTask(
            id=task_id,
            func=func,
            args=args,
            kwargs=kwargs
        )

        async with self._lock:
            self.tasks[task_id] = task

        await self.queue.put(task)
        return task

    def get_task(self, task_id: str) -> Optional[BackgroundTask]:
        """Look up a task record by id (None if unknown)."""
        return self.tasks.get(task_id)

    async def cancel_task(self, task_id: str) -> bool:
        """Mark a still-pending task as cancelled; workers will skip it."""
        task = self.tasks.get(task_id)
        if task and task.status == TaskStatus.PENDING:
            task.status = TaskStatus.CANCELLED
            return True
        return False

    async def shutdown(self, wait: bool = True):
        """Stop all workers; with wait=True, drain the queue first."""
        if wait:
            await self.queue.join()

        self._running = False
        for _ in range(self.max_workers):
            await self.queue.put(None)

        await asyncio.gather(*self._workers, return_exceptions=True)

# Usage with FastAPI
"""
from fastapi import FastAPI, BackgroundTasks, HTTPException
import uuid

app = FastAPI()
task_manager = BackgroundTaskManager(max_workers=5)

@app.on_event("startup")
async def startup():
    await task_manager.start()

@app.on_event("shutdown")
async def shutdown():
    await task_manager.shutdown()

@app.post("/tasks")
async def create_task(data: dict):
    task_id = str(uuid.uuid4())
    await task_manager.submit(task_id, process_data, data)
    return {"task_id": task_id}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = task_manager.get_task(task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    return {
        "id": task.id,
        "status": task.status.value,
        "result": task.result,
        "error": task.error
    }
"""

Production Pitfalls

⚠️ NHỮNG LỖI THƯỜNG GẶP

1. Unbounded Queues

python
# (Illustrative fragment: `data` stands in for a real payload.)
# ❌ WRONG - the queue can grow without bound
queue = asyncio.Queue()  # No limit!

async def producer():
    while True:
        await queue.put(data)  # Memory explosion!

# ✅ RIGHT - bounded queue provides backpressure
queue = asyncio.Queue(maxsize=1000)

async def producer():
    while True:
        await queue.put(data)  # Blocks when full

2. Không handle worker failures

python
# (Illustrative fragment: `queue`, `process`, `logger` are assumed in scope.)
# ❌ WRONG - a worker crash is a silent failure
async def bad_worker():
    while True:
        item = await queue.get()
        process(item)  # Exception kills worker!

# ✅ RIGHT - catch exceptions and always mark the item done
async def good_worker():
    while True:
        item = await queue.get()
        try:
            process(item)
        except Exception as e:
            logger.error(f"Worker error: {e}")
        finally:
            queue.task_done()

3. Rate limiter không thread-safe

python
# ❌ WRONG - check-then-act race condition
class BadRateLimiter:
    def acquire(self):
        if self.tokens > 0:  # Check
            self.tokens -= 1  # Modify - RACE!
            return True
        return False

# ✅ RIGHT - guard the check-and-modify with a lock
class GoodRateLimiter:
    async def acquire(self):
        async with self._lock:
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False

4. Quên cleanup resources

python
# ❌ WRONG - resource leak
async def bad_scraper():
    session = aiohttp.ClientSession()
    # ... scrape ...
    # Session never closed!

# ✅ RIGHT - context manager guarantees cleanup
async def good_scraper():
    async with aiohttp.ClientSession() as session:
        # ... scrape ...
    # Session automatically closed

Pattern Selection Guide

| Use Case             | Pattern           | Key Components                 |
|----------------------|-------------------|--------------------------------|
| API data ingestion   | Producer-Consumer | Queue + Workers                |
| Batch processing     | Worker Pool       | Pool + Semaphore               |
| External API calls   | Rate Limiting     | Token Bucket                   |
| Web scraping         | Combined          | Queue + Rate Limit + Semaphore |
| Background jobs      | Task Manager      | Queue + Status Tracking        |
| Real-time processing | Streaming         | Async generators               |

Bảng Tóm tắt

python
# NOTE: cheat-sheet fragments for quick reference — each snippet belongs
# inside a coroutine or class body; they are not meant to run as-is at
# module top level (e.g. the bare `async with` / `await` lines).
# === PRODUCER-CONSUMER ===
queue = asyncio.Queue(maxsize=100)

async def producer():
    await queue.put(item)
    await queue.put(None)  # Sentinel
async def consumer():
    while (item := await queue.get()) is not None:
        process(item)
        queue.task_done()

# === WORKER POOL ===
semaphore = asyncio.Semaphore(10)

async def worker(item):
    async with semaphore:
        return await process(item)

async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(worker(item)) for item in items]

# === RATE LIMITING ===
class RateLimiter:
    async def acquire(self):
        async with self._lock:
            # Token bucket logic
            pass

limiter = RateLimiter(rate=10.0, capacity=20.0)
await limiter.wait_and_acquire()

# === BATCH PROCESSING ===
async def process_batch(items, batch_size=10):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        await asyncio.gather(*[process(item) for item in batch])

# === BACKGROUND TASKS ===
class TaskManager:
    async def submit(self, task_id, func, *args):
        task = Task(id=task_id, func=func, args=args)
        await self.queue.put(task)
        return task
    
    def get_status(self, task_id):
        return self.tasks[task_id].status