Giao diện
Secure Coding Patterns — Mẫu Lập Trình An Toàn
Tháng 3/2023, một startup fintech Đông Nam Á phát hiện toàn bộ mật khẩu người dùng bị lộ — không phải vì bị hack, mà vì developer dùng MD5 hash và commit JWT secret vào source code. Kẻ tấn công clone repo, decode JWT, brute-force MD5 trong vài giờ. Kết quả: 120.000 tài khoản bị xâm phạm, 2 triệu USD bồi thường.
Secure coding bắt đầu từ từng dòng code: mỗi hàm xử lý input, mỗi lần hash mật khẩu, mỗi token được tạo ra đều là cánh cửa có thể mở toang nếu thiết kế sai. Bài này đi sâu vào các mẫu lập trình an toàn — từ nguyên tắc least privilege, defense in depth, đến implementation với cryptography, JWT, và security headers.
Bức tranh tư duy
Hệ thống bảo mật giống ngôi nhà cổ truyền Việt Nam — nhiều lớp bảo vệ chồng nhau:
┌────────────────────────────────────────────────┐
│ 🏯 NGÔI NHÀ AN TOÀN │
│ ┌─ Hàng rào ──────────────────────────────┐ │
│ │ CORS + Security Headers (CSP, HSTS) │ │
│ │ ┌─ Cổng chính ─────────────────────┐ │ │
│ │ │ Authentication (JWT, session) │ │ │
│ │ │ ┌─ Phòng khách ─────────────┐ │ │ │
│ │ │ │ Authorization (RBAC) │ │ │ │
│ │ │ │ ┌─ Két sắt ─────────┐ │ │ │ │
│ │ │ │ │ Encryption/Hash │ │ │ │ │
│ │ │ │ └────────────────────┘ │ │ │ │
│ │ │ └────────────────────────────┘ │ │ │
│ │ └───────────────────────────────────┘ │ │
│ └──────────────────────────────────────────┘ │
│ Kẻ tấn công phải vượt TẤT CẢ các lớp │
└────────────────────────────────────────────────┘Mỗi lớp hoạt động độc lập — kẻ trộm trèo rào vẫn phải phá cổng, vượt sân, rồi mở két.
| Nguyên tắc | Ý nghĩa | Ví dụ |
|---|---|---|
| Least Privilege | Chỉ cấp quyền tối thiểu | DB user chỉ SELECT, không DROP |
| Defense in Depth | Nhiều lớp chồng nhau | Validate + parameterized query + WAF |
| Secure Defaults | Mặc định phải an toàn | CORS deny-all, strict CSP |
| Fail-Safe | Lỗi → trạng thái an toàn | Exception → deny access |
Cốt lõi kỹ thuật
Least Privilege — Đặc quyền tối thiểu
python
"""least_privilege.py"""
from __future__ import annotations
import os
from dataclasses import dataclass
from enum import Enum, auto
class Permission(Enum):
READ = auto()
WRITE = auto()
DELETE = auto()
ADMIN = auto()
@dataclass(frozen=True)
class Role:
name: str
permissions: frozenset[Permission]
def has_permission(self, perm: Permission) -> bool:
return perm in self.permissions
ROLES = {
"viewer": Role("viewer", frozenset({Permission.READ})),
"editor": Role("editor", frozenset({Permission.READ, Permission.WRITE})),
"admin": Role("admin", frozenset(Permission)),
}
@dataclass
class ServiceAccount:
name: str
role: Role
allowed_resources: frozenset[str]
def can_access(self, resource: str, action: Permission) -> bool:
return resource in self.allowed_resources and self.role.has_permission(action)
def create_readonly_db_connection(host: str, database: str) -> dict:
"""DB connection READ-ONLY — không dùng admin credentials."""
return {
"host": host, "database": database,
"user": os.environ.get("DB_READONLY_USER", "app_reader"),
"password": os.environ.get("DB_READONLY_PASSWORD", ""),
"options": "-c default_transaction_read_only=on",
}Defense in Depth — Validate ở mọi tầng
python
"""defense_in_depth.py"""
from __future__ import annotations
import html, re
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class ValidationResult:
is_valid: bool
errors: tuple[str, ...] = ()
# LỚP 1: Input Validation
def validate_email(email: str) -> ValidationResult:
if not email or len(email) > 254:
return ValidationResult(False, ("Email không hợp lệ.",))
if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
return ValidationResult(False, ("Định dạng email sai.",))
return ValidationResult(True)
def sanitize_input(raw: str, *, max_length: int = 500) -> str:
return html.escape(raw[:max_length], quote=True).replace("\x00", "").strip()
# LỚP 2: Business Logic Validation
def validate_transfer(
amount: float, balance: float,
*, daily_limit: float = 50_000_000.0, daily_spent: float = 0.0,
) -> ValidationResult:
errors: list[str] = []
if amount <= 0:
errors.append("Số tiền phải > 0.")
if amount > balance:
errors.append("Số dư không đủ.")
if daily_spent + amount > daily_limit:
errors.append(f"Vượt hạn mức ngày (còn {daily_limit - daily_spent:,.0f} VNĐ).")
return ValidationResult(len(errors) == 0, tuple(errors))
# LỚP 3: Parameterized Queries — KHÔNG BAO GIỜ string format
def get_user_by_email(cursor: Any, email: str) -> dict | None:
cursor.execute("SELECT id, email, name FROM users WHERE email = %s", (email,))
row = cursor.fetchone()
return {"id": row[0], "email": row[1], "name": row[2]} if row else NoneCryptography — Fernet Symmetric Encryption
Fernet cung cấp authenticated encryption (AES-128-CBC + HMAC-SHA256) — đảm bảo cả bí mật lẫn toàn vẹn.
python
"""encryption_service.py"""
from __future__ import annotations
import base64, os
from dataclasses import dataclass
from cryptography.fernet import Fernet, InvalidToken, MultiFernet
@dataclass
class EncryptionService:
_fernet: MultiFernet
@classmethod
def from_env(cls) -> EncryptionService:
current = os.environ["ENCRYPTION_KEY_CURRENT"]
previous = os.environ.get("ENCRYPTION_KEY_PREVIOUS", "")
keys = [Fernet(current.encode())]
if previous:
keys.append(Fernet(previous.encode()))
return cls(_fernet=MultiFernet(keys))
@staticmethod
def generate_key() -> str:
return Fernet.generate_key().decode()
def encrypt(self, plaintext: str) -> str:
token = self._fernet.encrypt(plaintext.encode("utf-8"))
return base64.urlsafe_b64encode(token).decode("ascii")
def decrypt(self, ciphertext: str) -> str:
try:
token = base64.urlsafe_b64decode(ciphertext.encode("ascii"))
return self._fernet.decrypt(token).decode("utf-8")
except InvalidToken:
raise ValueError("Giải mã thất bại: dữ liệu hỏng hoặc key sai.")
def rotate(self, old_ciphertext: str) -> str:
"""Re-encrypt với key mới nhất — dùng khi key rotation."""
token = base64.urlsafe_b64decode(old_ciphertext.encode("ascii"))
return base64.urlsafe_b64encode(self._fernet.rotate(token)).decode("ascii")Password Hashing — bcrypt và argon2
Không bao giờ dùng MD5/SHA cho password. Luôn dùng adaptive hashing — bcrypt hoặc argon2id.
python
"""password_service.py"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
import argon2, bcrypt
class PasswordHasher(ABC):
@abstractmethod
def hash_password(self, password: str) -> str: ...
@abstractmethod
def verify_password(self, password: str, hashed: str) -> bool: ...
@abstractmethod
def needs_rehash(self, hashed: str) -> bool: ...
@dataclass
class BcryptHasher(PasswordHasher):
rounds: int = 12 # ≈ 250ms per hash
def hash_password(self, password: str) -> str:
return bcrypt.hashpw(
password.encode("utf-8"), bcrypt.gensalt(rounds=self.rounds),
).decode("ascii")
def verify_password(self, password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("ascii"))
def needs_rehash(self, hashed: str) -> bool:
return not hashed.startswith(f"$2b${self.rounds:02d}$")
@dataclass
class Argon2Hasher(PasswordHasher):
"""argon2id — memory-hard, chống GPU/ASIC brute-force."""
time_cost: int = 2
memory_cost: int = 65536 # 64 MB
def __post_init__(self) -> None:
self._h = argon2.PasswordHasher(
time_cost=self.time_cost, memory_cost=self.memory_cost,
parallelism=1, type=argon2.Type.ID,
)
def hash_password(self, password: str) -> str:
return self._h.hash(password)
def verify_password(self, password: str, hashed: str) -> bool:
try:
return self._h.verify(hashed, password)
except argon2.exceptions.VerifyMismatchError:
return False
def needs_rehash(self, hashed: str) -> bool:
return self._h.check_needs_rehash(hashed)JWT Token Handling
python
"""jwt_service.py"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any
import jwt
@dataclass
class JWTConfig:
secret_key: str
algorithm: str = "HS256"
access_ttl: int = 900 # 15 phút
refresh_ttl: int = 604_800 # 7 ngày
issuer: str = "penalgo-api"
audience: str = "penalgo-web"
@dataclass
class JWTService:
config: JWTConfig
_revoked: set[str] = field(default_factory=set)
def create_access_token(self, user_id: str, roles: list[str]) -> str:
now = int(time.time())
return jwt.encode({
"sub": user_id, "roles": roles, "type": "access",
"iat": now, "exp": now + self.config.access_ttl,
"iss": self.config.issuer, "aud": self.config.audience,
}, self.config.secret_key, algorithm=self.config.algorithm)
def create_refresh_token(self, user_id: str) -> str:
now = int(time.time())
return jwt.encode({
"sub": user_id, "type": "refresh",
"iat": now, "exp": now + self.config.refresh_ttl,
"iss": self.config.issuer, "aud": self.config.audience,
}, self.config.secret_key, algorithm=self.config.algorithm)
def verify_token(self, token: str, *, expected_type: str = "access") -> dict:
if token in self._revoked:
raise jwt.InvalidTokenError("Token đã bị thu hồi.")
payload = jwt.decode(
token, self.config.secret_key,
algorithms=[self.config.algorithm],
issuer=self.config.issuer, audience=self.config.audience,
options={"require": ["exp", "iss", "aud", "sub", "type"]},
)
if payload.get("type") != expected_type:
raise jwt.InvalidTokenError(f"Sai token type: {payload.get('type')}")
return payload
def revoke(self, token: str) -> None:
self._revoked.add(token)Security Headers và CORS
python
"""security_middleware.py"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class SecurityHeadersConfig:
csp: str = (
"default-src 'self'; script-src 'self'; "
"style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; "
"frame-ancestors 'none'; base-uri 'self'; form-action 'self'"
)
hsts: str = "max-age=63072000; includeSubDomains; preload"
x_content_type: str = "nosniff"
x_frame: str = "DENY"
referrer: str = "strict-origin-when-cross-origin"
permissions: str = "camera=(), microphone=(), geolocation=()"
def build_security_headers(cfg: SecurityHeadersConfig | None = None) -> dict[str, str]:
c = cfg or SecurityHeadersConfig()
return {
"Content-Security-Policy": c.csp,
"Strict-Transport-Security": c.hsts,
"X-Content-Type-Options": c.x_content_type,
"X-Frame-Options": c.x_frame,
"Referrer-Policy": c.referrer,
"Permissions-Policy": c.permissions,
}
@dataclass(frozen=True)
class CORSConfig:
"""Mặc định DENY ALL — chỉ mở đúng origin cần thiết."""
allowed_origins: frozenset[str] = frozenset()
allowed_methods: frozenset[str] = frozenset({"GET", "POST", "OPTIONS"})
allowed_headers: frozenset[str] = frozenset({"Authorization", "Content-Type"})
allow_credentials: bool = False
max_age: int = 3600
def is_origin_allowed(self, origin: str) -> bool:
return origin in self.allowed_origins
production_cors = CORSConfig(
allowed_origins=frozenset({"https://penalgo.com", "https://app.penalgo.com"}),
allowed_methods=frozenset({"GET", "POST", "PUT", "DELETE", "OPTIONS"}),
allow_credentials=True,
)Thực chiến
Security Review Automation cho CI/CD
Hệ thống scanner tự động kiểm tra security patterns trước deploy:
python
"""security_reviewer.py - Chạy trong CI/CD pipeline"""
from __future__ import annotations
import re, sys
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
class Severity(Enum):
CRITICAL = auto()
HIGH = auto()
@dataclass(frozen=True)
class Finding:
severity: Severity
rule: str
message: str
path: str
line: int
fix: str
@dataclass
class SecurityReviewer:
findings: list[Finding] = field(default_factory=list)
RULES: dict = field(default_factory=lambda: {
"secrets": (
Severity.CRITICAL,
r'(?i)(password|secret|api_key)\s*=\s*["\'][^"\']+["\']',
"Hardcoded secret", "Dùng env vars hoặc secret manager.",
),
"eval": (
Severity.HIGH, r'\beval\s*\(', "eval() detected",
"Dùng ast.literal_eval().",
),
"sql_inject": (
Severity.CRITICAL, r'execute\s*\(\s*f["\']',
"SQL injection risk", "Dùng parameterized queries.",
),
"weak_hash": (
Severity.HIGH, r'hashlib\.(md5|sha1)\b',
"Weak hash algorithm", "Dùng SHA-256+ hoặc bcrypt.",
),
})
def review(self, directory: Path) -> list[Finding]:
self.findings.clear()
for py_file in directory.rglob("*.py"):
src = py_file.read_text(encoding="utf-8")
for rule_id, (sev, pattern, msg, fix) in self.RULES.items():
for m in re.finditer(pattern, src):
self.findings.append(Finding(
sev, rule_id, msg, str(py_file),
src[:m.start()].count("\n") + 1, fix,
))
return sorted(self.findings, key=lambda f: f.severity.value)
def report(self) -> str:
if not self.findings:
return "✅ Không phát hiện vấn đề bảo mật."
lines = [f"🔍 {len(self.findings)} findings"]
for f in self.findings:
lines.append(f"[{f.severity.name}] {f.path}:{f.line} — {f.message}")
critical = sum(1 for f in self.findings if f.severity == Severity.CRITICAL)
if critical:
lines.append(f"🚫 {critical} CRITICAL — DEPLOY BLOCKED")
return "\n".join(lines)
if __name__ == "__main__":
r = SecurityReviewer()
r.review(Path("src/"))
print(r.report())
if any(f.severity == Severity.CRITICAL for f in r.findings):
sys.exit(1)Sai lầm điển hình
❌ SAI: Hash mật khẩu bằng SHA-256
python
# ⛔ SHA-256 không phải password hashing algorithm
import hashlib
def hash_wrong(pw: str) -> str:
return hashlib.sha256(pw.encode()).hexdigest()
# Không salt → rainbow table | Quá nhanh → brute-force | Không adaptive✅ ĐÚNG: Dùng argon2id
python
# ✅ argon2id — chuẩn hiện đại, memory-hard
from argon2 import PasswordHasher
hasher = PasswordHasher(time_cost=2, memory_cost=65536)
def hash_correct(pw: str) -> str:
return hasher.hash(pw) # $argon2id$v=19$m=65536,t=2,p=1$salt$hash❌ SAI: So sánh token bằng ==
python
# ⛔ Timing attack — dừng sớm khi khác → lộ thông tin
def verify_wrong(provided: str, stored: str) -> bool:
return provided == stored✅ ĐÚNG: Constant-time comparison
python
# ✅ Luôn mất cùng thời gian bất kể match hay không
import hmac
def verify_correct(provided: str, stored: str) -> bool:
return hmac.compare_digest(provided.encode(), stored.encode())❌ SAI: JWT không verify đầy đủ
python
# ⛔ Thiếu issuer, audience, required claims
import jwt
def decode_wrong(token: str, secret: str) -> dict:
return jwt.decode(token, secret, algorithms=["HS256"])✅ ĐÚNG: Verify tất cả claims
python
# ✅ Verify đầy đủ
import jwt
def decode_correct(token: str, secret: str) -> dict:
return jwt.decode(
token, secret, algorithms=["HS256"],
issuer="penalgo-api", audience="penalgo-web",
options={"require": ["exp", "iss", "aud", "sub"], "verify_exp": True},
)❌ SAI: CORS allow all + credentials
python
# ⛔ allow_origins: * + credentials: True = thảm họa
cors = {"allow_origins": ["*"], "allow_credentials": True}✅ ĐÚNG: Whitelist cụ thể
python
cors = {
"allow_origins": ["https://penalgo.com", "https://app.penalgo.com"],
"allow_methods": ["GET", "POST", "PUT", "DELETE"],
"allow_credentials": True,
}❌ SAI: Secret trong source code
python
# ⛔ Secret trong code = trong Git history mãi mãi
JWT_SECRET = "super-secret-key-2024"✅ ĐÚNG: Environment variables + fail-fast
python
import os, sys
def require_env(name: str) -> str:
val = os.environ.get(name)
if not val:
sys.exit(f"FATAL: Missing env var: {name}")
return val
JWT_SECRET = require_env("JWT_SECRET_KEY")Under the Hood
bcrypt Internals
Input: password + cost_factor (vd: 12)
1. Generate random 128-bit salt
2. Key schedule: lặp 2^cost lần
cost=12 → 4096 iterations ≈ 250ms
3. Encrypt "OrpheanBeholderScryDoubt" 64 lần (Blowfish)
4. Output: $2b$12$salt_22chars_hash_31chars
│ │ │ │
│ │ └─ salt ────────└─ hash
│ └─ cost factor
└─ algorithm
An toàn: salt ngẫu nhiên, cost tăng được, chống parallel attackargon2id — Memory-Hard Design
argon2id kết hợp argon2d (chống GPU) và argon2i (chống side-channel):
Parameters: time=2, memory=64MB, parallelism=1
Phase 1: Fill 64MB memory blocks
B0 → B1 → B2 → ... → Bn
(mỗi block phụ thuộc blocks trước)
Phase 2: Iterate time_cost lần
Trộn lại blocks → buộc dùng đủ memory
Phase 3: XOR cột cuối → output hash
Chống GPU: 64MB/hash → GPU chỉ chạy vài instance
thay vì hàng nghìn như SHA-256JWT Token Anatomy
eyJhbGci... . eyJzdWIi... . SflKxw...
────────── ───────────── ─────────
HEADER PAYLOAD SIGNATURE
HEADER: {"alg": "HS256", "typ": "JWT"}
PAYLOAD: {"sub": "user_123", "exp": 1700000000, "roles": [...]}
SIGNATURE: HMACSHA256(header + "." + payload, secret)
⚠️ PAYLOAD KHÔNG MÃ HÓA — chỉ Base64 encoded!
→ Không lưu sensitive data trong payloadTiming-Safe Comparison
python
# == dừng ngay khi khác ký tự → attacker đo thời gian đoán từng ký tự
# hmac.compare_digest() XOR từng byte, OR kết quả → cùng thời gian luôn
import hmac
safe = hmac.compare_digest(b"secret_a", b"secret_b") # constant-timeChecklist ghi nhớ
✅ Checklist triển khai
Secure Coding Patterns
- [ ] Mỗi service account chỉ có quyền tối thiểu (least privilege)
- [ ] Validate input ở mọi tầng: client → gateway → service → database
- [ ] Config mặc định ở trạng thái an toàn nhất (deny-all CORS, strict CSP)
- [ ] Exception → deny access, không default allow (fail-safe)
- [ ] Password hashing: chỉ bcrypt (cost ≥ 12) hoặc argon2id
- [ ] Symmetric encryption: Fernet + MultiFernet cho key rotation
- [ ] JWT: verify exp, iss, aud; chỉ algorithms cụ thể; require claims
- [ ] JWT payload chỉ encoded — không lưu sensitive data
- [ ] Implement token revocation (Redis blacklist) cho logout
- [ ] Secret comparison: luôn
hmac.compare_digest() - [ ] Security headers trên mọi response: CSP, HSTS, X-Frame-Options
- [ ] CORS: không
*+credentials: true - [ ] Secrets từ env vars, validate khi startup (fail-fast)
- [ ] Security scanner trong CI/CD — block deploy khi CRITICAL
Bài tập luyện tập
Bài 1: Secure User Registration
🧠 Quiz
Viết hàm register_user(email, password, display_name): validate email format, password strength (≥12 ký tự, chữ hoa/thường, số, đặc biệt), hash bằng argon2id, trả về User hoặc danh sách lỗi.
💡 Lời giải
python
from __future__ import annotations
import re, uuid
from dataclasses import dataclass
from typing import Union
from argon2 import PasswordHasher
@dataclass(frozen=True)
class User:
id: str
email: str
display_name: str
password_hash: str
@dataclass(frozen=True)
class RegError:
errors: list[str]
def register_user(
email: str, password: str, display_name: str,
) -> Union[User, RegError]:
errors: list[str] = []
if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
errors.append("Email không hợp lệ.")
if len(display_name.strip()) < 2:
errors.append("Tên ≥ 2 ký tự.")
checks = [
(len(password) >= 12, "≥ 12 ký tự"),
(any(c.isupper() for c in password), "1 chữ hoa"),
(any(c.islower() for c in password), "1 chữ thường"),
(any(c.isdigit() for c in password), "1 chữ số"),
(any(c in "!@#$%^&*()-_=+[]{}|;:,.<>?" for c in password), "1 ký tự đặc biệt"),
]
errors.extend(f"Mật khẩu cần {msg}." for ok, msg in checks if not ok)
if errors:
return RegError(errors)
hasher = PasswordHasher(time_cost=2, memory_cost=65536)
return User(str(uuid.uuid4()), email.lower().strip(),
display_name.strip(), hasher.hash(password))Validate TẤT CẢ trước khi hash — tránh tốn CPU cho invalid data.
Bài 2: Security Header Audit
🧠 Quiz
Viết audit_headers(headers: dict) -> AuditResult kiểm tra response có đủ security headers (CSP, HSTS, X-Content-Type-Options, X-Frame-Options, Referrer-Policy). Trả về missing, misconfigured, và score 0-100.
💡 Lời giải
python
from dataclasses import dataclass
@dataclass(frozen=True)
class AuditResult:
missing: list[str]
misconfigured: list[tuple[str, str, str]]
score: int
def audit_headers(headers: dict[str, str]) -> AuditResult:
norm = {k.lower(): v for k, v in headers.items()}
checks = {
"content-security-policy": ("contains", "default-src"),
"strict-transport-security": ("contains", "max-age="),
"x-content-type-options": ("exact", "nosniff"),
"x-frame-options": ("oneof", ["DENY", "SAMEORIGIN"]),
"referrer-policy": ("oneof", ["no-referrer", "strict-origin-when-cross-origin"]),
}
missing, bad, passed = [], [], 0
for header, (check_type, expected) in checks.items():
val = norm.get(header)
if not val:
missing.append(header)
elif check_type == "exact" and val != expected:
bad.append((header, val, expected))
elif check_type == "oneof" and val not in expected:
bad.append((header, val, str(expected)))
elif check_type == "contains" and expected not in val:
bad.append((header, val, f"Must contain: {expected}"))
else:
passed += 1
return AuditResult(missing, bad, int(passed / len(checks) * 100))Bài 3: JWT Refresh Token Rotation
🧠 Quiz
Implement refresh_access_token(refresh_token) với token rotation: mỗi refresh token chỉ dùng 1 lần, tạo cặp token mới, revoke cũ. Phát hiện reuse → raise SecurityError (dấu hiệu theft).
💡 Lời giải
python
from __future__ import annotations
import time
from dataclasses import dataclass, field
import jwt
@dataclass(frozen=True)
class TokenPair:
access_token: str
refresh_token: str
class SecurityError(Exception):
pass
@dataclass
class TokenRefreshService:
secret: str
issuer: str = "penalgo-api"
audience: str = "penalgo-web"
_used: set[str] = field(default_factory=set)
def refresh_access_token(self, refresh_token: str) -> TokenPair:
if refresh_token in self._used:
raise SecurityError("Token reuse detected — possible theft!")
payload = jwt.decode(
refresh_token, self.secret, algorithms=["HS256"],
issuer=self.issuer, audience=self.audience,
options={"require": ["exp", "sub", "type"]},
)
if payload.get("type") != "refresh":
raise jwt.InvalidTokenError("Not a refresh token.")
self._used.add(refresh_token)
uid, now = payload["sub"], int(time.time())
base = {"sub": uid, "iss": self.issuer, "aud": self.audience, "iat": now}
return TokenPair(
jwt.encode({**base, "type": "access", "exp": now + 900}, self.secret),
jwt.encode({**base, "type": "refresh", "exp": now + 604800}, self.secret),
)Rotation: mỗi refresh token dùng 1 lần. Reuse = theft signal → revoke tất cả tokens của user.