# Token Bucket - Rate Limiting Engine

"Without rate limiting, a single user can destroy the whole system." - HPN

## Definition

Token Bucket is a rate-limiting algorithm:

- The bucket holds tokens (the number of requests allowed)
- Tokens are refilled over time at a constant rate
- Each request consumes 1 token
- When no tokens remain → the request is rejected
📘 Two Main Algorithms

Token Bucket: allows bursts (when the bucket is full)
Leaky Bucket: smooths traffic to a steady rate (no bursts)
## Why Rate Limiting?

### Problem: DDoS Protection

A single user sends 10,000 requests/second to crash the API:
| Without Rate Limit | With Token Bucket |
|---|---|
| Server overload | Max 100 req/s per user |
| Database crash | Burst up to 50 requests |
| All users affected | Only abuser blocked |
🎯 REAL INCIDENT

2016 Dyn DDoS Attack: the Mirai botnet generated 1.2 Tbps of traffic. Rate limiting is the first line of defense.
## Modeling

### Token Bucket Visualization
┌────────────────────────┐
│ BUCKET │
│ Capacity: 10 tokens │
│ ┌─────────────────┐ │
Refill │ │ 🪙🪙🪙🪙🪙🪙 │ │ Consume
(2/sec) → │ │ 🪙🪙🪙🪙 │ │ → (per request)
│ └─────────────────┘ │
│ Current: 10 │
└────────────────────────┘
Time 0: Bucket full (10 tokens)
Time 0: 5 requests → Consume 5 tokens → 5 remaining
Time 1: Refill 2 → 7 tokens
Time 1: 10 requests → Consume 7 → 3 REJECTED!
Time 2: Refill 2 → 2 tokens
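The timeline above can be replayed with a minimal, self-contained sketch (the names `refill` and `send` are illustrative, not part of the implementation later in this document; time is advanced manually instead of sleeping):

```python
# Replay of the timeline above: capacity 10, refill 2 tokens/second.
capacity = 10
refill_rate = 2
tokens = capacity  # bucket starts full

def refill(elapsed_seconds):
    """Advance the simulated clock and top up the bucket, capped at capacity."""
    global tokens
    tokens = min(capacity, tokens + elapsed_seconds * refill_rate)

def send(n):
    """Try to send n requests; returns (allowed, rejected)."""
    global tokens
    allowed = min(n, int(tokens))
    tokens -= allowed
    return allowed, n - allowed

print(send(5))   # t=0: (5, 0) — 5 tokens remain
refill(1)        # t=1: +2 → 7 tokens
print(send(10))  # t=1: (7, 3) — 3 requests rejected
refill(1)        # t=2: +2 → 2 tokens
```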
## How It Works

### Core Logic

```
class TokenBucket:
    capacity = 10         # Max tokens
    tokens = 10           # Current tokens
    refill_rate = 2       # Tokens per second
    last_refill = now()

    def allow_request():
        # Step 1: Refill based on time elapsed
        elapsed = now() - last_refill
        tokens = min(capacity, tokens + elapsed * refill_rate)
        last_refill = now()

        # Step 2: Try to consume
        if tokens >= 1:
            tokens -= 1
            return ALLOW
        else:
            return REJECT  # HTTP 429
```

### Key Parameters
| Parameter | Meaning | Typical Value |
|---|---|---|
| capacity | Max burst size | 50-100 requests |
| refill_rate | Sustained rate | 10-100 req/sec |
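A useful sanity check when choosing these values: over any window of T seconds, a token bucket admits at most `capacity + refill_rate × T` requests (the initial burst plus everything refilled during the window). A quick sketch (the helper name is illustrative):

```python
def max_requests(capacity: int, refill_rate: float, window_s: float) -> float:
    """Upper bound on requests a token bucket can admit in a window:
    the full burst capacity plus all tokens refilled during the window."""
    return capacity + refill_rate * window_s

# capacity=50, refill_rate=10 → at most 650 requests in any 60-second window
print(max_requests(50, 10, 60))  # 650
```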
### Production Implementation

```python
# HPN Engineering Standard
# Implementation: Token Bucket Rate Limiter
from typing import Optional
import functools
import time
from threading import Lock
from dataclasses import dataclass


@dataclass
class RateLimitConfig:
    """Configuration for rate limiter."""
    capacity: int                         # Maximum tokens (burst size)
    refill_rate: float                    # Tokens per second
    initial_tokens: Optional[int] = None  # Starting tokens (default: capacity)


class TokenBucket:
    """
    Token Bucket rate limiter implementation.

    Features:
    - Thread-safe
    - Allows controlled bursts
    - O(1) time per request
    - No background threads needed

    Use Cases:
    - API rate limiting
    - DDoS protection
    - Resource access control
    """

    def __init__(self, config: RateLimitConfig) -> None:
        self.capacity = config.capacity
        self.refill_rate = config.refill_rate
        # Explicit None check so initial_tokens=0 is honored
        self.tokens = (
            config.capacity if config.initial_tokens is None
            else config.initial_tokens
        )
        self.last_refill = time.monotonic()
        self._lock = Lock()

    def allow(self, tokens_needed: int = 1) -> bool:
        """
        Check if request is allowed and consume tokens.

        Args:
            tokens_needed: Number of tokens to consume (default: 1)

        Returns:
            True if allowed, False if rate limited
        """
        with self._lock:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False

    def try_acquire(self, tokens_needed: int = 1) -> tuple[bool, float]:
        """
        Try to acquire tokens, return wait time if not available.

        Returns:
            (allowed, wait_time_seconds)
            If allowed=True, wait_time=0
            If allowed=False, wait_time=seconds until tokens available
        """
        with self._lock:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True, 0.0
            # Calculate wait time until enough tokens have been refilled
            tokens_deficit = tokens_needed - self.tokens
            wait_time = tokens_deficit / self.refill_rate
            return False, wait_time

    def _refill(self) -> None:
        """Add tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        # Add tokens proportional to time, capped at capacity
        tokens_to_add = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now

    @property
    def available_tokens(self) -> float:
        """Get current token count (for monitoring)."""
        with self._lock:
            self._refill()
            return self.tokens


class RateLimiter:
    """
    Multi-key rate limiter (e.g., per-user, per-IP).

    Use Case:
    - Each user has their own bucket
    - Shared across API endpoints
    """

    def __init__(self, default_config: RateLimitConfig) -> None:
        self.default_config = default_config
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = Lock()

    def allow(self, key: str, tokens: int = 1) -> bool:
        """
        Check if request for given key is allowed.

        Args:
            key: Identifier (e.g., user_id, IP address)
            tokens: Tokens to consume

        Returns:
            True if allowed
        """
        bucket = self._get_or_create_bucket(key)
        return bucket.allow(tokens)

    def _get_or_create_bucket(self, key: str) -> TokenBucket:
        """Get existing bucket or create new one (double-checked under lock)."""
        if key not in self._buckets:
            with self._lock:
                if key not in self._buckets:
                    self._buckets[key] = TokenBucket(self.default_config)
        return self._buckets[key]

    def get_status(self, key: str) -> dict:
        """Get rate limit status for key."""
        bucket = self._get_or_create_bucket(key)
        return {
            "available_tokens": bucket.available_tokens,
            "capacity": bucket.capacity,
            "refill_rate": bucket.refill_rate,
        }


# ============================================
# DECORATOR FOR FLASK/FASTAPI
# ============================================

def rate_limit(limiter: RateLimiter, key_func, tokens: int = 1):
    """
    Decorator for rate limiting endpoints.

    Usage:
        @rate_limit(limiter, key_func=lambda: request.remote_addr)
        def my_endpoint():
            ...
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = key_func()
            if not limiter.allow(key, tokens):
                # Return HTTP 429 Too Many Requests
                return {"error": "Rate limit exceeded"}, 429
            return func(*args, **kwargs)
        return wrapper
    return decorator


# ============================================
# USAGE EXAMPLE
# ============================================

if __name__ == "__main__":
    # Create rate limiter: 10 requests/second, burst up to 20
    config = RateLimitConfig(capacity=20, refill_rate=10)
    limiter = RateLimiter(config)

    print("=== Rate Limiting Demo ===\n")
    user_id = "user_123"

    # Simulate burst of requests
    print("Sending 25 requests burst...")
    allowed = 0
    rejected = 0
    for i in range(25):
        if limiter.allow(user_id):
            allowed += 1
        else:
            rejected += 1
    print(f"Allowed: {allowed}, Rejected: {rejected}")
    print(f"Status: {limiter.get_status(user_id)}")

    # Wait for refill
    print("\nWaiting 1 second for refill...")
    time.sleep(1)

    # Try again
    print("Sending 15 more requests...")
    allowed = 0
    rejected = 0
    for i in range(15):
        if limiter.allow(user_id):
            allowed += 1
        else:
            rejected += 1
    print(f"Allowed: {allowed}, Rejected: {rejected}")

    # Demonstrate try_acquire with wait time
    print("\n=== Wait Time Calculation ===")
    bucket = TokenBucket(RateLimitConfig(capacity=5, refill_rate=2))

    # Use all tokens
    for _ in range(5):
        bucket.allow()

    allowed, wait_time = bucket.try_acquire()
    print(f"Request blocked. Wait {wait_time:.2f} seconds for next token.")
```

## Real-World Applications
| System | Implementation |
|---|---|
| Nginx | limit_req_zone directive |
| AWS API Gateway | Usage plans + API keys |
| Redis | redis-cell module (GCRA algorithm) |
| Kong | Rate Limiting plugin |
| Cloudflare | Rate Limiting Rules |
### Nginx Example

```nginx
# Define rate limit zone
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

server {
    location /api/ {
        # Allow a burst of 20 extra requests, processed without delay
        limit_req zone=api_limit burst=20 nodelay;

        # Return 429 when rate limited
        limit_req_status 429;

        proxy_pass http://backend;
    }
}
```

## Variants: Leaky Bucket vs Token Bucket
| Feature | Token Bucket | Leaky Bucket |
|---|---|---|
| Burst | ✅ Allowed | ❌ No burst |
| Traffic shape | Bursty | Smooth |
| Use case | API limiting | Network QoS |
| Implementation | Simpler | Needs queue |
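For comparison, a queue-based leaky bucket might be sketched as follows (an illustrative sketch, not part of the production code above): requests join a bounded queue and drain at a fixed rate, so bursts beyond the queue size are rejected and admitted traffic stays smooth.

```python
import time
from collections import deque

class LeakyBucket:
    """Illustrative queue-based leaky bucket: requests enter a bounded
    queue and drain at a constant rate; a full queue means rejection."""

    def __init__(self, queue_size: int, leak_rate: float) -> None:
        self.queue_size = queue_size
        self.leak_rate = leak_rate          # requests drained per second
        self.queue: deque = deque()
        self.last_leak = time.monotonic()

    def _leak(self) -> None:
        """Remove requests that have drained since the last check."""
        now = time.monotonic()
        drained = int((now - self.last_leak) * self.leak_rate)
        if drained:
            for _ in range(min(drained, len(self.queue))):
                self.queue.popleft()
            self.last_leak = now

    def allow(self) -> bool:
        self._leak()
        if len(self.queue) < self.queue_size:
            self.queue.append(time.monotonic())
            return True
        return False  # queue full → reject (no burst beyond queue size)

# Five back-to-back requests against a queue of 3: the first 3 are
# admitted, the rest are rejected until the queue drains.
bucket = LeakyBucket(queue_size=3, leak_rate=1.0)
results = [bucket.allow() for _ in range(5)]
print(results)
```

Note the contrast with the token bucket: a full token bucket serves a burst immediately, while the leaky bucket only ever admits work as fast as its drain rate plus the queue headroom.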
## Anti-patterns

### ❌ AVOID

1. Global rate limit instead of per-user

```python
# ❌ WRONG: one user can exhaust everyone's quota
global_limiter = TokenBucket(capacity=1000)

def api_endpoint():
    if global_limiter.allow():
        return "OK"

# ✅ RIGHT: per-user rate limiting
def api_endpoint(user_id):
    if per_user_limiter.allow(user_id):
        return "OK"
```

2. Capacity too low
```python
# ❌ WRONG: legitimate bursts get blocked
config = RateLimitConfig(capacity=1, refill_rate=10)
# A user sends 5 requests at once (normal browser behavior)
# → 4 requests get rejected!

# ✅ RIGHT: reasonable burst capacity
config = RateLimitConfig(capacity=20, refill_rate=10)
```

3. Not returning a Retry-After header
```python
# ❌ WRONG: the client has no idea when to retry
return {"error": "Rate limited"}, 429

# ✅ RIGHT: include the wait time
allowed, wait_time = bucket.try_acquire()
if not allowed:
    return (
        {"error": "Rate limited", "retry_after": wait_time},
        429,
        {"Retry-After": str(int(wait_time + 1))},
    )
```

💡 HPN's Rule

"Every public API MUST have rate limiting. No exceptions."