Skip to content

Secure Coding Patterns — Mẫu Lập Trình An Toàn

Tháng 3/2023, một startup fintech Đông Nam Á phát hiện toàn bộ mật khẩu người dùng bị lộ — không phải vì bị hack, mà vì developer dùng MD5 hash và commit JWT secret vào source code. Kẻ tấn công clone repo, decode JWT, brute-force MD5 trong vài giờ. Kết quả: 120.000 tài khoản bị xâm phạm, 2 triệu USD bồi thường.

Secure coding bắt đầu từ từng dòng code: mỗi hàm xử lý input, mỗi lần hash mật khẩu, mỗi token được tạo ra đều là cánh cửa có thể mở toang nếu thiết kế sai. Bài này đi sâu vào các mẫu lập trình an toàn — từ nguyên tắc least privilege, defense in depth, đến implementation với cryptography, JWT, và security headers.

Bức tranh tư duy

Hệ thống bảo mật giống ngôi nhà cổ truyền Việt Nam — nhiều lớp bảo vệ chồng nhau:

┌────────────────────────────────────────────────┐
│            🏯 NGÔI NHÀ AN TOÀN                 │
│  ┌─ Hàng rào ──────────────────────────────┐  │
│  │  CORS + Security Headers (CSP, HSTS)     │  │
│  │  ┌─ Cổng chính ─────────────────────┐   │  │
│  │  │  Authentication (JWT, session)     │   │  │
│  │  │  ┌─ Phòng khách ─────────────┐   │   │  │
│  │  │  │  Authorization (RBAC)      │   │   │  │
│  │  │  │  ┌─ Két sắt ─────────┐   │   │   │  │
│  │  │  │  │  Encryption/Hash   │   │   │   │  │
│  │  │  │  └────────────────────┘   │   │   │  │
│  │  │  └────────────────────────────┘   │   │  │
│  │  └───────────────────────────────────┘   │  │
│  └──────────────────────────────────────────┘  │
│  Kẻ tấn công phải vượt TẤT CẢ các lớp        │
└────────────────────────────────────────────────┘

Mỗi lớp hoạt động độc lập — kẻ trộm trèo rào vẫn phải phá cổng, vượt sân, rồi mở két.

Nguyên tắcÝ nghĩaVí dụ
Least PrivilegeChỉ cấp quyền tối thiểuDB user chỉ SELECT, không DROP
Defense in DepthNhiều lớp chồng nhauValidate + parameterized query + WAF
Secure DefaultsMặc định phải an toànCORS deny-all, strict CSP
Fail-SafeLỗi → trạng thái an toànException → deny access

Cốt lõi kỹ thuật

Least Privilege — Đặc quyền tối thiểu

python
"""least_privilege.py"""
from __future__ import annotations
import os
from dataclasses import dataclass
from enum import Enum, auto


class Permission(Enum):
    READ = auto()
    WRITE = auto()
    DELETE = auto()
    ADMIN = auto()


@dataclass(frozen=True)
class Role:
    name: str
    permissions: frozenset[Permission]

    def has_permission(self, perm: Permission) -> bool:
        return perm in self.permissions


ROLES = {
    "viewer": Role("viewer", frozenset({Permission.READ})),
    "editor": Role("editor", frozenset({Permission.READ, Permission.WRITE})),
    "admin": Role("admin", frozenset(Permission)),
}


@dataclass
class ServiceAccount:
    name: str
    role: Role
    allowed_resources: frozenset[str]

    def can_access(self, resource: str, action: Permission) -> bool:
        return resource in self.allowed_resources and self.role.has_permission(action)


def create_readonly_db_connection(host: str, database: str) -> dict:
    """DB connection READ-ONLY — không dùng admin credentials."""
    return {
        "host": host, "database": database,
        "user": os.environ.get("DB_READONLY_USER", "app_reader"),
        "password": os.environ.get("DB_READONLY_PASSWORD", ""),
        "options": "-c default_transaction_read_only=on",
    }

Defense in Depth — Validate ở mọi tầng

python
"""defense_in_depth.py"""
from __future__ import annotations
import html, re
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ValidationResult:
    is_valid: bool
    errors: tuple[str, ...] = ()


# LỚP 1: Input Validation
def validate_email(email: str) -> ValidationResult:
    if not email or len(email) > 254:
        return ValidationResult(False, ("Email không hợp lệ.",))
    if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
        return ValidationResult(False, ("Định dạng email sai.",))
    return ValidationResult(True)


def sanitize_input(raw: str, *, max_length: int = 500) -> str:
    return html.escape(raw[:max_length], quote=True).replace("\x00", "").strip()


# LỚP 2: Business Logic Validation
def validate_transfer(
    amount: float, balance: float,
    *, daily_limit: float = 50_000_000.0, daily_spent: float = 0.0,
) -> ValidationResult:
    errors: list[str] = []
    if amount <= 0:
        errors.append("Số tiền phải > 0.")
    if amount > balance:
        errors.append("Số dư không đủ.")
    if daily_spent + amount > daily_limit:
        errors.append(f"Vượt hạn mức ngày (còn {daily_limit - daily_spent:,.0f} VNĐ).")
    return ValidationResult(len(errors) == 0, tuple(errors))


# LỚP 3: Parameterized Queries — KHÔNG BAO GIỜ string format
def get_user_by_email(cursor: Any, email: str) -> dict | None:
    cursor.execute("SELECT id, email, name FROM users WHERE email = %s", (email,))
    row = cursor.fetchone()
    return {"id": row[0], "email": row[1], "name": row[2]} if row else None

Cryptography — Fernet Symmetric Encryption

Fernet cung cấp authenticated encryption (AES-128-CBC + HMAC-SHA256) — đảm bảo cả bí mật lẫn toàn vẹn.

python
"""encryption_service.py"""
from __future__ import annotations
import base64, os
from dataclasses import dataclass
from cryptography.fernet import Fernet, InvalidToken, MultiFernet


@dataclass
class EncryptionService:
    _fernet: MultiFernet

    @classmethod
    def from_env(cls) -> EncryptionService:
        current = os.environ["ENCRYPTION_KEY_CURRENT"]
        previous = os.environ.get("ENCRYPTION_KEY_PREVIOUS", "")
        keys = [Fernet(current.encode())]
        if previous:
            keys.append(Fernet(previous.encode()))
        return cls(_fernet=MultiFernet(keys))

    @staticmethod
    def generate_key() -> str:
        return Fernet.generate_key().decode()

    def encrypt(self, plaintext: str) -> str:
        token = self._fernet.encrypt(plaintext.encode("utf-8"))
        return base64.urlsafe_b64encode(token).decode("ascii")

    def decrypt(self, ciphertext: str) -> str:
        try:
            token = base64.urlsafe_b64decode(ciphertext.encode("ascii"))
            return self._fernet.decrypt(token).decode("utf-8")
        except InvalidToken:
            raise ValueError("Giải mã thất bại: dữ liệu hỏng hoặc key sai.")

    def rotate(self, old_ciphertext: str) -> str:
        """Re-encrypt với key mới nhất — dùng khi key rotation."""
        token = base64.urlsafe_b64decode(old_ciphertext.encode("ascii"))
        return base64.urlsafe_b64encode(self._fernet.rotate(token)).decode("ascii")

Password Hashing — bcrypt và argon2

Không bao giờ dùng MD5/SHA cho password. Luôn dùng adaptive hashing — bcrypt hoặc argon2id.

python
"""password_service.py"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
import argon2, bcrypt


class PasswordHasher(ABC):
    @abstractmethod
    def hash_password(self, password: str) -> str: ...
    @abstractmethod
    def verify_password(self, password: str, hashed: str) -> bool: ...
    @abstractmethod
    def needs_rehash(self, hashed: str) -> bool: ...


@dataclass
class BcryptHasher(PasswordHasher):
    rounds: int = 12  # ≈ 250ms per hash

    def hash_password(self, password: str) -> str:
        return bcrypt.hashpw(
            password.encode("utf-8"), bcrypt.gensalt(rounds=self.rounds),
        ).decode("ascii")

    def verify_password(self, password: str, hashed: str) -> bool:
        return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("ascii"))

    def needs_rehash(self, hashed: str) -> bool:
        return not hashed.startswith(f"$2b${self.rounds:02d}$")


@dataclass
class Argon2Hasher(PasswordHasher):
    """argon2id — memory-hard, chống GPU/ASIC brute-force."""
    time_cost: int = 2
    memory_cost: int = 65536  # 64 MB

    def __post_init__(self) -> None:
        self._h = argon2.PasswordHasher(
            time_cost=self.time_cost, memory_cost=self.memory_cost,
            parallelism=1, type=argon2.Type.ID,
        )

    def hash_password(self, password: str) -> str:
        return self._h.hash(password)

    def verify_password(self, password: str, hashed: str) -> bool:
        try:
            return self._h.verify(hashed, password)
        except argon2.exceptions.VerifyMismatchError:
            return False

    def needs_rehash(self, hashed: str) -> bool:
        return self._h.check_needs_rehash(hashed)

JWT Token Handling

python
"""jwt_service.py"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any
import jwt


@dataclass
class JWTConfig:
    secret_key: str
    algorithm: str = "HS256"
    access_ttl: int = 900       # 15 phút
    refresh_ttl: int = 604_800  # 7 ngày
    issuer: str = "penalgo-api"
    audience: str = "penalgo-web"


@dataclass
class JWTService:
    config: JWTConfig
    _revoked: set[str] = field(default_factory=set)

    def create_access_token(self, user_id: str, roles: list[str]) -> str:
        now = int(time.time())
        return jwt.encode({
            "sub": user_id, "roles": roles, "type": "access",
            "iat": now, "exp": now + self.config.access_ttl,
            "iss": self.config.issuer, "aud": self.config.audience,
        }, self.config.secret_key, algorithm=self.config.algorithm)

    def create_refresh_token(self, user_id: str) -> str:
        now = int(time.time())
        return jwt.encode({
            "sub": user_id, "type": "refresh",
            "iat": now, "exp": now + self.config.refresh_ttl,
            "iss": self.config.issuer, "aud": self.config.audience,
        }, self.config.secret_key, algorithm=self.config.algorithm)

    def verify_token(self, token: str, *, expected_type: str = "access") -> dict:
        if token in self._revoked:
            raise jwt.InvalidTokenError("Token đã bị thu hồi.")
        payload = jwt.decode(
            token, self.config.secret_key,
            algorithms=[self.config.algorithm],
            issuer=self.config.issuer, audience=self.config.audience,
            options={"require": ["exp", "iss", "aud", "sub", "type"]},
        )
        if payload.get("type") != expected_type:
            raise jwt.InvalidTokenError(f"Sai token type: {payload.get('type')}")
        return payload

    def revoke(self, token: str) -> None:
        self._revoked.add(token)

Security Headers và CORS

python
"""security_middleware.py"""
from __future__ import annotations
from dataclasses import dataclass


@dataclass(frozen=True)
class SecurityHeadersConfig:
    csp: str = (
        "default-src 'self'; script-src 'self'; "
        "style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; "
        "frame-ancestors 'none'; base-uri 'self'; form-action 'self'"
    )
    hsts: str = "max-age=63072000; includeSubDomains; preload"
    x_content_type: str = "nosniff"
    x_frame: str = "DENY"
    referrer: str = "strict-origin-when-cross-origin"
    permissions: str = "camera=(), microphone=(), geolocation=()"


def build_security_headers(cfg: SecurityHeadersConfig | None = None) -> dict[str, str]:
    c = cfg or SecurityHeadersConfig()
    return {
        "Content-Security-Policy": c.csp,
        "Strict-Transport-Security": c.hsts,
        "X-Content-Type-Options": c.x_content_type,
        "X-Frame-Options": c.x_frame,
        "Referrer-Policy": c.referrer,
        "Permissions-Policy": c.permissions,
    }


@dataclass(frozen=True)
class CORSConfig:
    """Mặc định DENY ALL — chỉ mở đúng origin cần thiết."""
    allowed_origins: frozenset[str] = frozenset()
    allowed_methods: frozenset[str] = frozenset({"GET", "POST", "OPTIONS"})
    allowed_headers: frozenset[str] = frozenset({"Authorization", "Content-Type"})
    allow_credentials: bool = False
    max_age: int = 3600

    def is_origin_allowed(self, origin: str) -> bool:
        return origin in self.allowed_origins


production_cors = CORSConfig(
    allowed_origins=frozenset({"https://penalgo.com", "https://app.penalgo.com"}),
    allowed_methods=frozenset({"GET", "POST", "PUT", "DELETE", "OPTIONS"}),
    allow_credentials=True,
)

Thực chiến

Security Review Automation cho CI/CD

Hệ thống scanner tự động kiểm tra security patterns trước deploy:

python
"""security_reviewer.py - Chạy trong CI/CD pipeline"""
from __future__ import annotations
import re, sys
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path


class Severity(Enum):
    CRITICAL = auto()
    HIGH = auto()


@dataclass(frozen=True)
class Finding:
    severity: Severity
    rule: str
    message: str
    path: str
    line: int
    fix: str


@dataclass
class SecurityReviewer:
    findings: list[Finding] = field(default_factory=list)

    RULES: dict = field(default_factory=lambda: {
        "secrets": (
            Severity.CRITICAL,
            r'(?i)(password|secret|api_key)\s*=\s*["\'][^"\']+["\']',
            "Hardcoded secret", "Dùng env vars hoặc secret manager.",
        ),
        "eval": (
            Severity.HIGH, r'\beval\s*\(', "eval() detected",
            "Dùng ast.literal_eval().",
        ),
        "sql_inject": (
            Severity.CRITICAL, r'execute\s*\(\s*f["\']',
            "SQL injection risk", "Dùng parameterized queries.",
        ),
        "weak_hash": (
            Severity.HIGH, r'hashlib\.(md5|sha1)\b',
            "Weak hash algorithm", "Dùng SHA-256+ hoặc bcrypt.",
        ),
    })

    def review(self, directory: Path) -> list[Finding]:
        self.findings.clear()
        for py_file in directory.rglob("*.py"):
            src = py_file.read_text(encoding="utf-8")
            for rule_id, (sev, pattern, msg, fix) in self.RULES.items():
                for m in re.finditer(pattern, src):
                    self.findings.append(Finding(
                        sev, rule_id, msg, str(py_file),
                        src[:m.start()].count("\n") + 1, fix,
                    ))
        return sorted(self.findings, key=lambda f: f.severity.value)

    def report(self) -> str:
        if not self.findings:
            return "✅ Không phát hiện vấn đề bảo mật."
        lines = [f"🔍 {len(self.findings)} findings"]
        for f in self.findings:
            lines.append(f"[{f.severity.name}] {f.path}:{f.line}{f.message}")
        critical = sum(1 for f in self.findings if f.severity == Severity.CRITICAL)
        if critical:
            lines.append(f"🚫 {critical} CRITICAL — DEPLOY BLOCKED")
        return "\n".join(lines)


if __name__ == "__main__":
    r = SecurityReviewer()
    r.review(Path("src/"))
    print(r.report())
    if any(f.severity == Severity.CRITICAL for f in r.findings):
        sys.exit(1)

Sai lầm điển hình

SAI: Hash mật khẩu bằng SHA-256

python
# ⛔ SHA-256 không phải password hashing algorithm
import hashlib
def hash_wrong(pw: str) -> str:
    return hashlib.sha256(pw.encode()).hexdigest()
    # Không salt → rainbow table | Quá nhanh → brute-force | Không adaptive

ĐÚNG: Dùng argon2id

python
# ✅ argon2id — chuẩn hiện đại, memory-hard
from argon2 import PasswordHasher
hasher = PasswordHasher(time_cost=2, memory_cost=65536)
def hash_correct(pw: str) -> str:
    return hasher.hash(pw)  # $argon2id$v=19$m=65536,t=2,p=1$salt$hash

SAI: So sánh token bằng ==

python
# ⛔ Timing attack — dừng sớm khi khác → lộ thông tin
def verify_wrong(provided: str, stored: str) -> bool:
    return provided == stored

ĐÚNG: Constant-time comparison

python
# ✅ Luôn mất cùng thời gian bất kể match hay không
import hmac
def verify_correct(provided: str, stored: str) -> bool:
    return hmac.compare_digest(provided.encode(), stored.encode())

SAI: JWT không verify đầy đủ

python
# ⛔ Thiếu issuer, audience, required claims
import jwt
def decode_wrong(token: str, secret: str) -> dict:
    return jwt.decode(token, secret, algorithms=["HS256"])

ĐÚNG: Verify tất cả claims

python
# ✅ Verify đầy đủ
import jwt
def decode_correct(token: str, secret: str) -> dict:
    return jwt.decode(
        token, secret, algorithms=["HS256"],
        issuer="penalgo-api", audience="penalgo-web",
        options={"require": ["exp", "iss", "aud", "sub"], "verify_exp": True},
    )

SAI: CORS allow all + credentials

python
# ⛔ allow_origins: * + credentials: True = thảm họa
cors = {"allow_origins": ["*"], "allow_credentials": True}

ĐÚNG: Whitelist cụ thể

python
cors = {
    "allow_origins": ["https://penalgo.com", "https://app.penalgo.com"],
    "allow_methods": ["GET", "POST", "PUT", "DELETE"],
    "allow_credentials": True,
}

SAI: Secret trong source code

python
# ⛔ Secret trong code = trong Git history mãi mãi
JWT_SECRET = "super-secret-key-2024"

ĐÚNG: Environment variables + fail-fast

python
import os, sys
def require_env(name: str) -> str:
    val = os.environ.get(name)
    if not val:
        sys.exit(f"FATAL: Missing env var: {name}")
    return val
JWT_SECRET = require_env("JWT_SECRET_KEY")

Under the Hood

bcrypt Internals

Input: password + cost_factor (vd: 12)

1. Generate random 128-bit salt
2. Key schedule: lặp 2^cost lần
   cost=12 → 4096 iterations ≈ 250ms
3. Encrypt "OrpheanBeholderScryDoubt" 64 lần (Blowfish)
4. Output: $2b$12$salt_22chars_hash_31chars
            │  │  │               │
            │  │  └─ salt ────────└─ hash
            │  └─ cost factor
            └─ algorithm

An toàn: salt ngẫu nhiên, cost tăng được, chống parallel attack

argon2id — Memory-Hard Design

argon2id kết hợp argon2d (chống GPU) và argon2i (chống side-channel):

Parameters: time=2, memory=64MB, parallelism=1

Phase 1: Fill 64MB memory blocks
  B0 → B1 → B2 → ... → Bn
  (mỗi block phụ thuộc blocks trước)

Phase 2: Iterate time_cost lần
  Trộn lại blocks → buộc dùng đủ memory

Phase 3: XOR cột cuối → output hash

Chống GPU: 64MB/hash → GPU chỉ chạy vài instance
thay vì hàng nghìn như SHA-256

JWT Token Anatomy

eyJhbGci...  .  eyJzdWIi...  .  SflKxw...
──────────      ─────────────     ─────────
  HEADER          PAYLOAD         SIGNATURE

HEADER:    {"alg": "HS256", "typ": "JWT"}
PAYLOAD:   {"sub": "user_123", "exp": 1700000000, "roles": [...]}
SIGNATURE: HMACSHA256(header + "." + payload, secret)

⚠️ PAYLOAD KHÔNG MÃ HÓA — chỉ Base64 encoded!
→ Không lưu sensitive data trong payload

Timing-Safe Comparison

python
# == dừng ngay khi khác ký tự → attacker đo thời gian đoán từng ký tự
# hmac.compare_digest() XOR từng byte, OR kết quả → cùng thời gian luôn
import hmac
safe = hmac.compare_digest(b"secret_a", b"secret_b")  # constant-time

Checklist ghi nhớ

✅ Checklist triển khai

Secure Coding Patterns

  • [ ] Mỗi service account chỉ có quyền tối thiểu (least privilege)
  • [ ] Validate input ở mọi tầng: client → gateway → service → database
  • [ ] Config mặc định ở trạng thái an toàn nhất (deny-all CORS, strict CSP)
  • [ ] Exception → deny access, không default allow (fail-safe)
  • [ ] Password hashing: chỉ bcrypt (cost ≥ 12) hoặc argon2id
  • [ ] Symmetric encryption: Fernet + MultiFernet cho key rotation
  • [ ] JWT: verify exp, iss, aud; chỉ algorithms cụ thể; require claims
  • [ ] JWT payload chỉ encoded — không lưu sensitive data
  • [ ] Implement token revocation (Redis blacklist) cho logout
  • [ ] Secret comparison: luôn hmac.compare_digest()
  • [ ] Security headers trên mọi response: CSP, HSTS, X-Frame-Options
  • [ ] CORS: không * + credentials: true
  • [ ] Secrets từ env vars, validate khi startup (fail-fast)
  • [ ] Security scanner trong CI/CD — block deploy khi CRITICAL

Bài tập luyện tập

Bài 1: Secure User Registration

🧠 Quiz

Viết hàm register_user(email, password, display_name): validate email format, password strength (≥12 ký tự, chữ hoa/thường, số, đặc biệt), hash bằng argon2id, trả về User hoặc danh sách lỗi.

💡 Lời giải
python
from __future__ import annotations
import re, uuid
from dataclasses import dataclass
from typing import Union
from argon2 import PasswordHasher


@dataclass(frozen=True)
class User:
    id: str
    email: str
    display_name: str
    password_hash: str

@dataclass(frozen=True)
class RegError:
    errors: list[str]


def register_user(
    email: str, password: str, display_name: str,
) -> Union[User, RegError]:
    errors: list[str] = []
    if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
        errors.append("Email không hợp lệ.")
    if len(display_name.strip()) < 2:
        errors.append("Tên ≥ 2 ký tự.")
    checks = [
        (len(password) >= 12, "≥ 12 ký tự"),
        (any(c.isupper() for c in password), "1 chữ hoa"),
        (any(c.islower() for c in password), "1 chữ thường"),
        (any(c.isdigit() for c in password), "1 chữ số"),
        (any(c in "!@#$%^&*()-_=+[]{}|;:,.<>?" for c in password), "1 ký tự đặc biệt"),
    ]
    errors.extend(f"Mật khẩu cần {msg}." for ok, msg in checks if not ok)
    if errors:
        return RegError(errors)

    hasher = PasswordHasher(time_cost=2, memory_cost=65536)
    return User(str(uuid.uuid4()), email.lower().strip(),
                display_name.strip(), hasher.hash(password))

Validate TẤT CẢ trước khi hash — tránh tốn CPU cho invalid data.

Bài 2: Security Header Audit

🧠 Quiz

Viết audit_headers(headers: dict) -> AuditResult kiểm tra response có đủ security headers (CSP, HSTS, X-Content-Type-Options, X-Frame-Options, Referrer-Policy). Trả về missing, misconfigured, và score 0-100.

💡 Lời giải
python
from dataclasses import dataclass

@dataclass(frozen=True)
class AuditResult:
    missing: list[str]
    misconfigured: list[tuple[str, str, str]]
    score: int

def audit_headers(headers: dict[str, str]) -> AuditResult:
    norm = {k.lower(): v for k, v in headers.items()}
    checks = {
        "content-security-policy": ("contains", "default-src"),
        "strict-transport-security": ("contains", "max-age="),
        "x-content-type-options": ("exact", "nosniff"),
        "x-frame-options": ("oneof", ["DENY", "SAMEORIGIN"]),
        "referrer-policy": ("oneof", ["no-referrer", "strict-origin-when-cross-origin"]),
    }
    missing, bad, passed = [], [], 0
    for header, (check_type, expected) in checks.items():
        val = norm.get(header)
        if not val:
            missing.append(header)
        elif check_type == "exact" and val != expected:
            bad.append((header, val, expected))
        elif check_type == "oneof" and val not in expected:
            bad.append((header, val, str(expected)))
        elif check_type == "contains" and expected not in val:
            bad.append((header, val, f"Must contain: {expected}"))
        else:
            passed += 1
    return AuditResult(missing, bad, int(passed / len(checks) * 100))

Bài 3: JWT Refresh Token Rotation

🧠 Quiz

Implement refresh_access_token(refresh_token) với token rotation: mỗi refresh token chỉ dùng 1 lần, tạo cặp token mới, revoke cũ. Phát hiện reuse → raise SecurityError (dấu hiệu theft).

💡 Lời giải
python
from __future__ import annotations
import time
from dataclasses import dataclass, field
import jwt

@dataclass(frozen=True)
class TokenPair:
    access_token: str
    refresh_token: str

class SecurityError(Exception):
    pass

@dataclass
class TokenRefreshService:
    secret: str
    issuer: str = "penalgo-api"
    audience: str = "penalgo-web"
    _used: set[str] = field(default_factory=set)

    def refresh_access_token(self, refresh_token: str) -> TokenPair:
        if refresh_token in self._used:
            raise SecurityError("Token reuse detected — possible theft!")

        payload = jwt.decode(
            refresh_token, self.secret, algorithms=["HS256"],
            issuer=self.issuer, audience=self.audience,
            options={"require": ["exp", "sub", "type"]},
        )
        if payload.get("type") != "refresh":
            raise jwt.InvalidTokenError("Not a refresh token.")

        self._used.add(refresh_token)
        uid, now = payload["sub"], int(time.time())
        base = {"sub": uid, "iss": self.issuer, "aud": self.audience, "iat": now}

        return TokenPair(
            jwt.encode({**base, "type": "access", "exp": now + 900}, self.secret),
            jwt.encode({**base, "type": "refresh", "exp": now + 604800}, self.secret),
        )

Rotation: mỗi refresh token dùng 1 lần. Reuse = theft signal → revoke tất cả tokens của user.

Liên kết học tiếp