
Token Bucket - Rate Limiting Engine

"Without rate limiting, a single user can take down the entire system." - HPN

Definition

Token Bucket is a rate-limiting algorithm:

  • A bucket holds tokens (the number of requests allowed)
  • Tokens are refilled over time at a constant rate
  • Each request consumes 1 token
  • When no tokens remain → the request is rejected

📘 Two Main Algorithms

Token Bucket: allows bursts (when the bucket is full)
Leaky Bucket: smooths traffic to a steady rate (no bursts)

Why Do We Need Rate Limiting?

The Problem: DDoS Protection

A single user sends 10,000 requests/second trying to crash the API:

| Without Rate Limit | With Token Bucket |
|---|---|
| Server overload | Max 100 req/s per user |
| Database crash | Burst up to 50 requests |
| All users affected | Only the abuser is blocked |

🎯 REAL INCIDENT

2016 Dyn DDoS Attack: the Mirai botnet generated 1.2 Tbps of traffic. Rate limiting is the first line of defense.

Modeling

Token Bucket Visualization

              ┌────────────────────────┐
              │        BUCKET          │
              │   Capacity: 10 tokens  │
              │  ┌─────────────────┐   │
   Refill     │  │ 🪙🪙🪙🪙🪙🪙    │   │    Consume
   (2/sec) →  │  │ 🪙🪙🪙🪙       │   │ → (per request)
              │  └─────────────────┘   │
              │     Current: 10        │
              └────────────────────────┘

Time 0: Bucket full (10 tokens)
Time 0: 5 requests → Consume 5 tokens → 5 remaining
Time 1: Refill 2 → 7 tokens
Time 1: 10 requests → Consume 7 → 3 REJECTED!
Time 2: Refill 2 → 2 tokens

State Diagram

How It Works

Core Logic

import time

class TokenBucket:
    def __init__(self):
        self.capacity = 10        # Max tokens
        self.tokens = 10.0        # Current tokens
        self.refill_rate = 2      # Tokens per second
        self.last_refill = time.monotonic()

    def allow_request(self) -> bool:
        # Step 1: Refill based on time elapsed
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        # Step 2: Try to consume
        if self.tokens >= 1:
            self.tokens -= 1
            return True    # ALLOW
        return False       # REJECT (respond with HTTP 429)

Key Parameters

| Parameter | Meaning | Typical Value |
|---|---|---|
| capacity | Max burst size | 50-100 requests |
| refill_rate | Sustained rate | 10-100 req/sec |
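The two parameters play distinct roles: capacity caps the instantaneous burst, while refill_rate caps the long-run average throughput. A sketch with simulated one-second ticks (the numbers are illustrative, not prescriptive) makes the interaction concrete:

```python
# Illustrative: capacity=50 (burst), refill_rate=10 (sustained), simulated seconds.
capacity, refill_rate = 50, 10.0
tokens = float(capacity)


def tick_and_send(requests_this_second):
    """Advance one simulated second, then try to send the given number of requests."""
    global tokens
    tokens = min(capacity, tokens + refill_rate)   # refill for 1 elapsed second
    allowed = min(requests_this_second, int(tokens))
    tokens -= allowed
    return allowed


# A 50-request burst drains the full bucket in one tick...
burst_allowed = tick_and_send(50)
# ...after which only ~refill_rate requests/second get through, no matter the load.
steady = [tick_and_send(50) for _ in range(3)]
print(burst_allowed, steady)  # 50 [10, 10, 10]
```

So capacity answers "how big a spike do we tolerate?" and refill_rate answers "what average rate do we sustain?".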

Production Implementation

python
# HPN Engineering Standard
# Implementation: Token Bucket Rate Limiter

from typing import Optional
import time
from threading import Lock
from dataclasses import dataclass


@dataclass
class RateLimitConfig:
    """Configuration for rate limiter."""
    capacity: int          # Maximum tokens (burst size)
    refill_rate: float     # Tokens per second
    initial_tokens: Optional[int] = None  # Starting tokens (default: capacity)


class TokenBucket:
    """
    Token Bucket rate limiter implementation.
    
    Features:
        - Thread-safe
        - Allows controlled bursts
        - O(1) time per request
        - No background threads needed
    
    Use Cases:
        - API rate limiting
        - DDoS protection
        - Resource access control
    """
    
    def __init__(self, config: RateLimitConfig) -> None:
        self.capacity = config.capacity
        self.refill_rate = config.refill_rate
        self.tokens = (
            config.initial_tokens if config.initial_tokens is not None else config.capacity
        )
        self.last_refill = time.monotonic()
        self._lock = Lock()
    
    def allow(self, tokens_needed: int = 1) -> bool:
        """
        Check if request is allowed and consume tokens.
        
        Args:
            tokens_needed: Number of tokens to consume (default: 1)
            
        Returns:
            True if allowed, False if rate limited
        """
        with self._lock:
            self._refill()
            
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False
    
    def try_acquire(self, tokens_needed: int = 1) -> tuple[bool, float]:
        """
        Try to acquire tokens, return wait time if not available.
        
        Returns:
            (allowed, wait_time_seconds)
            If allowed=True, wait_time=0
            If allowed=False, wait_time=seconds until tokens available
        """
        with self._lock:
            self._refill()
            
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True, 0.0
            
            # Calculate wait time
            tokens_deficit = tokens_needed - self.tokens
            wait_time = tokens_deficit / self.refill_rate
            return False, wait_time
    
    def _refill(self) -> None:
        """Add tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        
        # Add tokens proportional to time
        tokens_to_add = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now
    
    @property
    def available_tokens(self) -> float:
        """Get current token count (for monitoring)."""
        with self._lock:
            self._refill()
            return self.tokens


class RateLimiter:
    """
    Multi-key rate limiter (e.g., per-user, per-IP).
    
    Use Case:
        - Each user has their own bucket
        - Shared across API endpoints
    """
    
    def __init__(self, default_config: RateLimitConfig) -> None:
        self.default_config = default_config
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = Lock()
    
    def allow(self, key: str, tokens: int = 1) -> bool:
        """
        Check if request for given key is allowed.
        
        Args:
            key: Identifier (e.g., user_id, IP address)
            tokens: Tokens to consume
            
        Returns:
            True if allowed
        """
        bucket = self._get_or_create_bucket(key)
        return bucket.allow(tokens)
    
    def _get_or_create_bucket(self, key: str) -> TokenBucket:
        """Get existing bucket or create new one."""
        if key not in self._buckets:
            with self._lock:
                if key not in self._buckets:
                    self._buckets[key] = TokenBucket(self.default_config)
        return self._buckets[key]
    
    def get_status(self, key: str) -> dict:
        """Get rate limit status for key."""
        bucket = self._get_or_create_bucket(key)
        return {
            "available_tokens": bucket.available_tokens,
            "capacity": bucket.capacity,
            "refill_rate": bucket.refill_rate,
        }


# ============================================
# DECORATOR FOR FLASK/FASTAPI
# ============================================
def rate_limit(limiter: RateLimiter, key_func, tokens: int = 1):
    """
    Decorator for rate limiting endpoints.
    
    Usage:
        @rate_limit(limiter, key_func=lambda: request.remote_addr)
        def my_endpoint():
            ...
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            key = key_func()
            if not limiter.allow(key, tokens):
                # Return HTTP 429 Too Many Requests
                return {"error": "Rate limit exceeded"}, 429
            return func(*args, **kwargs)
        return wrapper
    return decorator


# ============================================
# USAGE EXAMPLE
# ============================================
if __name__ == "__main__":
    # Create rate limiter: 10 requests/second, burst up to 20
    config = RateLimitConfig(capacity=20, refill_rate=10)
    limiter = RateLimiter(config)
    
    print("=== Rate Limiting Demo ===\n")
    
    user_id = "user_123"
    
    # Simulate burst of requests
    print("Sending 25 requests burst...")
    allowed = 0
    rejected = 0
    
    for i in range(25):
        if limiter.allow(user_id):
            allowed += 1
        else:
            rejected += 1
    
    print(f"Allowed: {allowed}, Rejected: {rejected}")
    print(f"Status: {limiter.get_status(user_id)}")
    
    # Wait for refill
    print("\nWaiting 1 second for refill...")
    time.sleep(1)
    
    # Try again
    print("Sending 15 more requests...")
    allowed = 0
    rejected = 0
    
    for i in range(15):
        if limiter.allow(user_id):
            allowed += 1
        else:
            rejected += 1
    
    print(f"Allowed: {allowed}, Rejected: {rejected}")
    
    # Demonstrate try_acquire with wait time
    print("\n=== Wait Time Calculation ===")
    bucket = TokenBucket(RateLimitConfig(capacity=5, refill_rate=2))
    
    # Use all tokens
    for _ in range(5):
        bucket.allow()
    
    allowed, wait_time = bucket.try_acquire()
    print(f"Request blocked. Wait {wait_time:.2f} seconds for next token.")

Real-World Applications

| System | Implementation |
|---|---|
| Nginx | limit_req_zone directive |
| AWS API Gateway | Usage plans + API keys |
| Redis | redis-cell module (GCRA algorithm) |
| Kong | Rate Limiting plugin |
| Cloudflare | Rate Limiting Rules |

Nginx Example

nginx
# Define rate limit zone
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

server {
    location /api/ {
        # Allow burst of 20, no delay for first 10
        limit_req zone=api_limit burst=20 nodelay;
        
        # Return 429 when rate limited
        limit_req_status 429;
        
        proxy_pass http://backend;
    }
}

Variants: Leaky Bucket vs Token Bucket

| Feature | Token Bucket | Leaky Bucket |
|---|---|---|
| Burst | ✅ Allowed | ❌ No burst |
| Traffic shape | Bursty | Smooth |
| Use case | API limiting | Network QoS |
| Implementation | Simpler | Needs queue |
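For contrast with the table above, here is a minimal leaky-bucket sketch: requests queue up and are drained at a fixed rate per tick, so the backend never sees a burst. This is a simplified illustration (discrete ticks, no threading), not a production implementation:

```python
from collections import deque


class LeakyBucket:
    """Leaky bucket: a bounded queue drained at a constant rate."""

    def __init__(self, queue_size: int, leak_rate: int) -> None:
        self.queue = deque()
        self.queue_size = queue_size   # max requests waiting in the bucket
        self.leak_rate = leak_rate     # requests released per tick

    def offer(self, request) -> bool:
        """Enqueue a request; reject it if the queue is full."""
        if len(self.queue) >= self.queue_size:
            return False
        self.queue.append(request)
        return True

    def leak(self) -> list:
        """Release up to leak_rate queued requests (call once per tick)."""
        n = min(self.leak_rate, len(self.queue))
        return [self.queue.popleft() for _ in range(n)]


bucket = LeakyBucket(queue_size=5, leak_rate=2)
accepted = [bucket.offer(i) for i in range(8)]  # burst of 8: only 5 fit the queue
first_tick = bucket.leak()                      # exactly 2 released, never more
```

Note the trade-off the table describes: the token bucket rejects overflow immediately but lets bursts through; the leaky bucket smooths output perfectly but needs a queue and adds latency to queued requests.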

Anti-patterns

❌ AVOID

1. Global rate limit instead of per-user

python
# ❌ WRONG: one user can exhaust everyone's quota
global_limiter = TokenBucket(RateLimitConfig(capacity=1000, refill_rate=100))

def api_endpoint():
    if global_limiter.allow():
        return "OK"

# ✅ RIGHT: Per-user rate limiting
def api_endpoint(user_id):
    if per_user_limiter.allow(user_id):
        return "OK"

2. Capacity set too low

python
# ❌ WRONG: legitimate bursts get blocked
config = RateLimitConfig(capacity=1, refill_rate=10)
# A user sends 5 requests at once (normal browser behavior)
# → 4 requests are rejected!

# ✅ RIGHT: Reasonable burst capacity
config = RateLimitConfig(capacity=20, refill_rate=10)

3. Not returning a Retry-After header

python
# ❌ WRONG: the client has no idea when to retry
return {"error": "Rate limited"}, 429

# ✅ RIGHT: Include wait time
allowed, wait_time = bucket.try_acquire()
if not allowed:
    return (
        {"error": "Rate limited", "retry_after": wait_time},
        429,
        {"Retry-After": str(int(wait_time + 1))}
    )

💡 HPN's Rule

"Every public API MUST have rate limiting. No exceptions."