Skip to content

Secrets Management — Quản lý bí mật trong Production

Tháng 3 năm 2023, một kỹ sư backend tại một startup fintech Việt Nam đã push một commit chứa AWS access key trực tiếp trong source code. Chỉ trong vòng 11 phút, bot scanner trên GitHub đã phát hiện và khai thác key đó — spin lên 47 EC2 instance để đào cryptocurrency. Hóa đơn AWS vào cuối tháng: $23,000 USD. Toàn bộ sự việc bắt nguồn từ một dòng code duy nhất: AWS_SECRET_KEY = "wJalrXUtnFEMI...".

Secrets management không phải là "nice-to-have" — nó là yêu cầu bắt buộc trong bất kỳ hệ thống production nào. Mỗi credential bị lộ là một cánh cửa mở toang cho attacker: database connection strings, API keys, encryption keys, OAuth tokens. Python cung cấp module secrets chuyên dụng cho việc sinh token an toàn, nhưng quản lý bí mật trong thực tế đòi hỏi kiến trúc nhiều tầng — từ environment variables, đến vault systems, đến automated key rotation.

Bài viết này đưa bạn từ nền tảng module secrets đến kiến trúc credential management cho microservices, bao gồm tích hợp HashiCorp Vault, AWS Secrets Manager, và chiến lược rotation không downtime.

Bức tranh tư duy

Hãy hình dung secrets management như két sắt ngân hàng — không phải ai cũng được vào, mỗi người chỉ mở được ngăn của mình, và mọi lần truy cập đều được ghi lại trong sổ.

┌─────────────────────────────────────────────────────────────┐
│                  SECRETS MANAGEMENT LAYERS                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Layer 4: Vault / Secrets Manager (két sắt trung tâm)     │
│  ┌────────────────────────────────────────┐                │
│  │  HashiCorp Vault │ AWS Secrets Manager │                │
│  │  Dynamic secrets │ Automatic rotation  │                │
│  └────────┬─────────────────┬─────────────┘                │
│           │                 │                               │
│  Layer 3: Environment Injection (người giao chìa khóa)    │
│  ┌────────┴─────────────────┴─────────────┐                │
│  │  K8s Secrets │ Docker Secrets │  .env   │                │
│  └────────┬─────────────────┬─────────────┘                │
│           │                 │                               │
│  Layer 2: Application Layer (nhân viên dùng chìa khóa)    │
│  ┌────────┴─────────────────┴─────────────┐                │
│  │  Config classes │ Validation │  Cache   │                │
│  └────────┬─────────────────┬─────────────┘                │
│           │                 │                               │
│  Layer 1: secrets module (ổ khóa mật mã)                  │
│  ┌────────┴─────────────────┴─────────────┐                │
│  │  Token generation │ CSPRNG │ Compare    │                │
│  └────────────────────────────────────────┘                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Nguyên tắc vàng: Secrets không bao giờ nằm trong code, không bao giờ nằm trong Git history, và luôn được mã hóa khi lưu trữ (at rest) cũng như khi truyền tải (in transit). Giống như tiền trong ngân hàng — bạn không giấu tiền dưới gối, bạn đặt nó vào nơi có camera, có khóa, và có audit trail.

Cốt lõi kỹ thuật

Python secrets Module — Sinh token an toàn

Module secrets (Python 3.6+) được thiết kế riêng cho việc sinh giá trị ngẫu nhiên cryptographically secure — khác hoàn toàn với random module chỉ dành cho simulation.

python
import secrets
import string


def generate_api_key(prefix: str = "pk", length: int = 32) -> str:
    """Sinh API key với prefix để dễ nhận diện nguồn gốc."""
    random_part = secrets.token_hex(length)
    return f"{prefix}_live_{random_part}"


def generate_password(length: int = 24, include_symbols: bool = True) -> str:
    """Sinh password đạt chuẩn NIST SP 800-63B."""
    alphabet = string.ascii_letters + string.digits
    if include_symbols:
        alphabet += "!@#$%^&*()-_=+"

    while True:
        password = "".join(secrets.choice(alphabet) for _ in range(length))
        if (
            any(c.isupper() for c in password)
            and any(c.islower() for c in password)
            and any(c.isdigit() for c in password)
            and (not include_symbols or any(c in "!@#$%^&*()-_=+" for c in password))
        ):
            return password


def generate_url_safe_token(nbytes: int = 32) -> str:
    """Token an toàn cho URL — dùng trong reset password, email verification."""
    return secrets.token_urlsafe(nbytes)


def constant_time_compare(secret_a: str, secret_b: str) -> bool:
    """So sánh không bị timing attack — luôn dùng cho secret comparison."""
    return secrets.compare_digest(secret_a, secret_b)

Environment Variables — Best Practices

Environment variables là lớp đầu tiên và phổ biến nhất để inject secrets vào application. Nhưng sử dụng sai cách thì cũng nguy hiểm không kém hardcode.

python
from __future__ import annotations

import os
import sys
from dataclasses import dataclass


@dataclass(frozen=True)
class DatabaseConfig:
    """Immutable database configuration loaded from environment."""
    host: str
    port: int
    name: str
    user: str
    password: str
    ssl_mode: str = "require"
    pool_min: int = 5
    pool_max: int = 20

    @classmethod
    def from_env(cls, prefix: str = "DB") -> "DatabaseConfig":
        """Load config từ environment variables với validation chặt chẽ."""
        missing: list[str] = []

        def require(key: str) -> str:
            value = os.environ.get(f"{prefix}_{key}")
            if not value:
                missing.append(f"{prefix}_{key}")
                return ""
            return value

        host = require("HOST")
        port_str = os.environ.get(f"{prefix}_PORT", "5432")
        name = require("NAME")
        user = require("USER")
        password = require("PASSWORD")

        if missing:
            print(
                f"FATAL: Missing required env vars: {', '.join(missing)}",
                file=sys.stderr,
            )
            sys.exit(1)

        return cls(
            host=host, port=int(port_str), name=name,
            user=user, password=password,
            ssl_mode=os.environ.get(f"{prefix}_SSL_MODE", "require"),
            pool_min=int(os.environ.get(f"{prefix}_POOL_MIN", "5")),
            pool_max=int(os.environ.get(f"{prefix}_POOL_MAX", "20")),
        )

    @property
    def connection_url(self) -> str:
        """KHÔNG BAO GIỜ log giá trị này."""
        return (
            f"postgresql://{self.user}:{self.password}"
            f"@{self.host}:{self.port}/{self.name}?sslmode={self.ssl_mode}"
        )

    def __repr__(self) -> str:
        """Override repr để KHÔNG LỘ password trong logs."""
        return (
            f"DatabaseConfig(host={self.host!r}, port={self.port}, "
            f"name={self.name!r}, user={self.user!r}, password='***')"
        )

Dotenv Patterns — Quản lý .env files

python
"""
Cấu trúc .env files theo môi trường:
  .env                  ← defaults (KHÔNG chứa secrets thật)
  .env.local            ← local overrides (trong .gitignore)
  .env.production       ← production template (KHÔNG chứa giá trị thật)
"""
from pathlib import Path
from typing import Optional

from dotenv import dotenv_values, load_dotenv


class EnvLoader:
    """Load environment variables theo thứ tự ưu tiên đúng chuẩn."""

    PRIORITY_ORDER = [
        ".env",
        ".env.local",
        ".env.{environment}",
        ".env.{environment}.local",
    ]

    def __init__(self, environment: str = "development", base_path: Optional[Path] = None):
        self.environment = environment
        self.base_path = base_path or Path.cwd()

    def load(self) -> dict[str, Optional[str]]:
        """Load env files theo thứ tự ưu tiên — file sau override file trước."""
        merged: dict[str, Optional[str]] = {}
        for pattern in self.PRIORITY_ORDER:
            filepath = self.base_path / pattern.format(environment=self.environment)
            if filepath.exists():
                merged.update(dotenv_values(filepath))
        return merged

HashiCorp Vault Integration

HashiCorp Vault là giải pháp enterprise cho secrets management — cung cấp dynamic secrets, automatic rotation, và audit logging chi tiết.

python
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from typing import Any, Optional

import hvac

logger = logging.getLogger(__name__)


@dataclass
class VaultSecret:
    """Secret từ Vault kèm metadata."""
    key: str
    value: str
    lease_id: str
    lease_duration: int
    renewable: bool
    fetched_at: float

    @property
    def is_expired(self) -> bool:
        elapsed = time.time() - self.fetched_at
        return elapsed >= self.lease_duration * 0.8


class VaultClient:
    """Client wrapper cho HashiCorp Vault với caching và auto-renewal."""

    def __init__(self, vault_url: str, role_id: str, secret_id: str, namespace: Optional[str] = None):
        self._client = hvac.Client(url=vault_url, namespace=namespace)
        self._cache: dict[str, VaultSecret] = {}
        self._client.auth.approle.login(role_id=role_id, secret_id=secret_id)
        logger.info("Vault authentication successful via AppRole")

    def get_secret(self, path: str, key: str, mount_point: str = "secret") -> str:
        """Lấy secret với caching — tự động refresh khi gần hết lease."""
        cache_key = f"{mount_point}/{path}/{key}"

        if cache_key in self._cache and not self._cache[cache_key].is_expired:
            return self._cache[cache_key].value

        response = self._client.secrets.kv.v2.read_secret_version(
            path=path, mount_point=mount_point,
        )
        data = response["data"]["data"]
        if key not in data:
            raise KeyError(f"Key '{key}' not found in path '{path}'")

        self._cache[cache_key] = VaultSecret(
            key=key, value=data[key],
            lease_id=response.get("lease_id", ""),
            lease_duration=response.get("lease_duration", 3600),
            renewable=response.get("renewable", False),
            fetched_at=time.time(),
        )
        return data[key]

Key Rotation Strategies

Key rotation thay thế credentials định kỳ — dù key bị lộ thì cũng chỉ hữu dụng trong thời gian ngắn.

python
from __future__ import annotations

import hashlib
import logging
import secrets
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class RotationState(Enum):
    ACTIVE = "active"
    DEPRECATED = "deprecated"
    REVOKED = "revoked"


@dataclass
class ManagedKey:
    """Một key được quản lý với lifecycle đầy đủ."""
    key_id: str
    value: str
    state: RotationState
    created_at: float
    rotated_at: Optional[float] = None
    max_age_seconds: int = 86400

    @property
    def needs_rotation(self) -> bool:
        return (time.time() - self.created_at) >= self.max_age_seconds * 0.8


class KeyRotationManager:
    """Quản lý vòng đời key với zero-downtime rotation.

    Chiến lược dual-key: giữ cả key cũ và mới hoạt động song song
    trong grace period để tránh request bị reject.
    """

    def __init__(
        self,
        key_generator: Callable[[], str] = lambda: secrets.token_hex(32),
        grace_period_seconds: int = 3600,
        max_age_seconds: int = 86400,
    ):
        self._keys: dict[str, ManagedKey] = {}
        self._active_key_id: Optional[str] = None
        self._key_generator = key_generator
        self._grace_period = grace_period_seconds
        self._max_age = max_age_seconds

    def initialize(self) -> ManagedKey:
        """Tạo key ban đầu khi hệ thống khởi động."""
        key = self._create_key()
        self._active_key_id = key.key_id
        return key

    def rotate(self) -> tuple[ManagedKey, ManagedKey]:
        """Old key chuyển DEPRECATED, vẫn verify được trong grace period."""
        if not self._active_key_id:
            raise RuntimeError("No active key. Call initialize() first.")

        old_key = self._keys[self._active_key_id]
        old_key.state = RotationState.DEPRECATED
        old_key.rotated_at = time.time()

        new_key = self._create_key()
        self._active_key_id = new_key.key_id
        logger.info("Key rotated: %s -> %s", old_key.key_id[:8], new_key.key_id[:8])
        return new_key, old_key

    def get_signing_key(self) -> ManagedKey:
        if not self._active_key_id:
            raise RuntimeError("No active key available")
        return self._keys[self._active_key_id]

    def check_rotation_needed(self) -> bool:
        if not self._active_key_id:
            return False
        return self._keys[self._active_key_id].needs_rotation

    def _create_key(self) -> ManagedKey:
        value = self._key_generator()
        key_id = hashlib.sha256(value.encode()).hexdigest()[:16]
        key = ManagedKey(
            key_id=key_id, value=value, state=RotationState.ACTIVE,
            created_at=time.time(), max_age_seconds=self._max_age,
        )
        self._keys[key_id] = key
        return key

Thực chiến

Bài toán: Xây dựng credential management system cho kiến trúc microservices — mỗi service cần secrets khác nhau, credentials rotate tự động, mọi truy cập được audit.

python
"""Credential Management System cho Microservices."""
from __future__ import annotations

import logging
import os
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

logger = logging.getLogger(__name__)


class SecretBackend(Enum):
    ENVIRONMENT = "environment"
    VAULT = "vault"
    AWS_SECRETS_MANAGER = "aws"


@dataclass(frozen=True)
class SecretReference:
    """Tham chiếu đến một secret — không chứa giá trị thật."""
    backend: SecretBackend
    path: str
    key: str
    required: bool = True


class SecretProvider(ABC):
    @abstractmethod
    def get_secret(self, reference: SecretReference) -> str: ...


class EnvironmentSecretProvider(SecretProvider):
    def get_secret(self, reference: SecretReference) -> str:
        env_key = f"{reference.path}_{reference.key}".upper().replace("/", "_")
        value = os.environ.get(env_key)
        if not value and reference.required:
            raise KeyError(f"Required env var not found: {env_key}")
        return value or ""

class CredentialManager:
    """Quản lý trung tâm cho tất cả secrets — cache, audit, multi-backend."""

    def __init__(self) -> None:
        self._providers: dict[SecretBackend, SecretProvider] = {}
        self._cache: dict[str, str] = {}

    def register_provider(self, backend: SecretBackend, provider: SecretProvider) -> None:
        self._providers[backend] = provider

    def resolve(self, reference: SecretReference) -> str:
        """Resolve một secret reference thành giá trị thật."""
        cache_key = f"{reference.backend.value}:{reference.path}:{reference.key}"
        if cache_key in self._cache:
            return self._cache[cache_key]

        provider = self._providers.get(reference.backend)
        if not provider:
            raise RuntimeError(f"No provider for: {reference.backend.value}")

        value = provider.get_secret(reference)
        self._cache[cache_key] = value
        return value

    def resolve_batch(self, references: list[SecretReference]) -> dict[str, str]:
        """Resolve nhiều secrets — fail fast nếu thiếu required."""
        results: dict[str, str] = {}
        errors: list[str] = []
        for ref in references:
            try:
                results[f"{ref.path}/{ref.key}"] = self.resolve(ref)
            except Exception as exc:
                if ref.required:
                    errors.append(f"{ref.path}/{ref.key}: {exc}")
        if errors:
            raise RuntimeError(f"Failed to resolve: {', '.join(errors)}")
        return results


# --- Bootstrap pattern ---
def bootstrap_payment_service() -> CredentialManager:
    manager = CredentialManager()
    manager.register_provider(SecretBackend.ENVIRONMENT, EnvironmentSecretProvider())

    secrets_needed = [
        SecretReference(SecretBackend.ENVIRONMENT, "STRIPE", "API_KEY"),
        SecretReference(SecretBackend.ENVIRONMENT, "STRIPE", "WEBHOOK_SECRET"),
        SecretReference(SecretBackend.ENVIRONMENT, "DB", "PASSWORD"),
        SecretReference(SecretBackend.ENVIRONMENT, "REDIS", "URL"),
        SecretReference(SecretBackend.ENVIRONMENT, "SENTRY", "DSN", required=False),
    ]
    manager.resolve_batch(secrets_needed)
    return manager

Sai lầm điển hình

Sai lầm 1: Hardcode secrets trong source code

python
# ❌ SAI — Secret nằm trực tiếp trong code, vào Git history vĩnh viễn
class PaymentGateway:
    API_KEY = "sk_live_4eC39HqLyjWDarjtT1zdp7dc"

    def charge(self, amount: int) -> dict:
        return requests.post(
            "https://api.stripe.com/v1/charges",
            headers={"Authorization": f"Bearer {self.API_KEY}"},
            data={"amount": amount},
        ).json()
python
# ✅ ĐÚNG — Secrets inject từ environment, không bao giờ nằm trong code
class PaymentGateway:
    def __init__(self) -> None:
        self._api_key = os.environ["STRIPE_API_KEY"]

    def charge(self, amount: int) -> dict:
        return requests.post(
            "https://api.stripe.com/v1/charges",
            headers={"Authorization": f"Bearer {self._api_key}"},
            data={"amount": amount},
        ).json()

Sai lầm 2: Dùng random thay vì secrets cho security tokens

python
# ❌ SAI — random module dùng Mersenne Twister, predictable sau 624 outputs
import random

def generate_reset_token() -> str:
    return "".join(random.choices("abcdef0123456789", k=32))

def generate_session_id() -> str:
    return str(random.randint(100000, 999999))
python
# ✅ ĐÚNG — secrets module dùng OS-level CSPRNG, không thể predict
import secrets

def generate_reset_token() -> str:
    return secrets.token_urlsafe(32)

def generate_session_id() -> str:
    return secrets.token_hex(16)

Sai lầm 3: So sánh secrets bằng == — dính timing attack

python
# ❌ SAI — Toán tử == dừng sớm khi gặp byte khác → timing side-channel
def verify_webhook(payload: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return signature == expected  # Timing attack vulnerability!
python
# ✅ ĐÚNG — compare_digest luôn chạy hết cả hai chuỗi, constant time
import hmac
import hashlib

def verify_webhook(payload: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature, expected)

Sai lầm 4: Log secrets trong error messages

python
# ❌ SAI — Password lọt vào log files, monitoring systems
def connect_database(url: str) -> Connection:
    try:
        return psycopg2.connect(url)
    except Exception:
        logger.error(f"Failed to connect: {url}")  # URL chứa password!
        raise
python
# ✅ ĐÚNG — Mask sensitive data trước khi log
import re

def _mask_credentials(url: str) -> str:
    return re.sub(r"://([^:]+):([^@]+)@", r"://\1:***@", url)

def connect_database(url: str) -> Connection:
    try:
        return psycopg2.connect(url)
    except Exception:
        logger.error("Failed to connect: %s", _mask_credentials(url))
        raise

Sai lầm 5: Không đặt expiry cho secrets

python
# ❌ SAI — API key tạo một lần, dùng mãi mãi, không bao giờ rotate
api_key = generate_api_key()
save_to_config(api_key)  # Dùng cùng key 3 năm liền
python
# ✅ ĐÚNG — Mỗi key có TTL, tự động rotate trước khi hết hạn
rotation_manager = KeyRotationManager(
    max_age_seconds=86400,       # Rotate mỗi 24 giờ
    grace_period_seconds=3600,   # Key cũ còn dùng được 1 giờ
)
rotation_manager.initialize()

if rotation_manager.check_rotation_needed():
    new_key, old_key = rotation_manager.rotate()
    notify_dependent_services(new_key)

Under the Hood

CSPRNG — Cryptographically Secure Pseudo-Random Number Generator

Khi bạn gọi secrets.token_hex(32), Python không tự sinh số ngẫu nhiên — nó delegate xuống hệ điều hành:

┌───────────────────────────────────────────────────┐
│  secrets.token_bytes(n)                            │
│       │                                            │
│       ▼                                            │
│  os.urandom(n)                                     │
│       │                                            │
│       ▼                                            │
│  ┌───────────────────────────────────────────┐     │
│  │  Linux: getrandom() syscall               │     │
│  │  Windows: BCryptGenRandom()               │     │
│  │  macOS: SecRandomCopyBytes                │     │
│  └──────────────────┬────────────────────────┘     │
│                     ▼                              │
│  ┌───────────────────────────────────────────┐     │
│  │  Kernel Entropy Pool                      │     │
│  │  Sources: hardware interrupts, disk I/O,  │     │
│  │  network packets, CPU jitter, RDRAND      │     │
│  └───────────────────────────────────────────┘     │
└───────────────────────────────────────────────────┘

Tại sao random module không an toàn cho security?

Module random dùng thuật toán Mersenne Twister (MT19937) — một PRNG có chu kỳ cực dài (2^19937 - 1) nhưng hoàn toàn deterministic. Nếu attacker quan sát được 624 outputs liên tiếp (mỗi output 32 bits), họ có thể tái tạo internal state và predict mọi giá trị tương lai.

python
"""
Mersenne Twister state có thể bị clone sau 624 observations.
secrets module KHÔNG có vấn đề này vì mỗi lần gọi lấy entropy
mới từ OS — không có internal state để reconstruct.
"""
import random

# Attacker quan sát 624 outputs → reconstruct state
observed = [random.getrandbits(32) for _ in range(624)]
# Sau đó predict MỌI giá trị tiếp theo bằng "untwisting"

Timing Attacks — Khi thời gian phản hồi tiết lộ bí mật

python
"""
So sánh "abcdef" với "xbcdef":  1 comparison  → ~10ns
So sánh "abcdef" với "abcxef":  4 comparisons → ~40ns

Attacker đo thời gian → suy ra bao nhiêu byte đúng
→ brute-force từng byte thay vì toàn bộ string.

hmac.compare_digest() LUÔN so sánh HẾT tất cả bytes.
Thời gian = O(n) cố định, không phụ thuộc vị trí byte sai.
"""
import hmac


def vulnerable_compare(a: str, b: str) -> bool:
    """DỄ BỊ timing attack — dừng sớm khi gặp byte khác."""
    if len(a) != len(b):
        return False
    for x, y in zip(a, b):
        if x != y:
            return False
    return True


def safe_compare(a: str, b: str) -> bool:
    """Constant-time — KHÔNG leak thông tin qua timing."""
    return hmac.compare_digest(a.encode(), b.encode())

Memory Safety — Xóa secrets khỏi memory

python
import ctypes
import hashlib


def secure_zero_memory(secret: bytearray) -> None:
    """Ghi đè vùng nhớ chứa secret bằng zeros.

    Chỉ hoạt động với bytearray (mutable). str/bytes trong Python
    là immutable — không thể ghi đè in-place.
    """
    ctypes.memset(
        ctypes.addressof((ctypes.c_char * len(secret)).from_buffer(secret)),
        0, len(secret),
    )


def process_secret_safely(raw_secret: bytes) -> str:
    """Xử lý secret và đảm bảo xóa khỏi memory sau khi dùng."""
    mutable = bytearray(raw_secret)
    try:
        return hashlib.sha256(mutable).hexdigest()
    finally:
        secure_zero_memory(mutable)

Checklist ghi nhớ

✅ Checklist triển khai

  • [ ] Không bao giờ hardcode secrets trong source code — dùng environment variables hoặc vault
  • [ ] Thêm .env, .env.local, *.pem, *.key vào .gitignore trước khi tạo project
  • [ ] Dùng secrets module (không phải random) cho mọi security token
  • [ ] So sánh secrets bằng secrets.compare_digest() hoặc hmac.compare_digest()
  • [ ] Override __repr__() trong config classes để không lộ credentials khi log
  • [ ] Mask connection strings và credentials trước khi ghi vào log
  • [ ] Implement key rotation với grace period — không revoke key cũ ngay lập tức
  • [ ] Validate tất cả required secrets khi bootstrap — fail fast nếu thiếu
  • [ ] Dùng frozen dataclass cho config objects chứa secrets
  • [ ] Set TTL cho cached secrets — không cache vĩnh viễn
  • [ ] Audit mọi lần truy cập secret — ghi lại ai, khi nào, secret nào
  • [ ] Dùng dynamic credentials (Vault, AWS IAM roles) thay vì static keys khi có thể
  • [ ] Scan Git history bằng trufflehog hoặc git-secrets trong CI/CD pipeline
  • [ ] Encrypt secrets at rest — không lưu plaintext trong database hoặc config files
  • [ ] Thiết lập alerts khi secret access pattern bất thường

Bài tập luyện tập

Bài 1: Xây dựng Secret Configuration Loader

🧠 Quiz

Yêu cầu: Viết class SecretConfig có khả năng:

  1. Load secrets từ nhiều nguồn (env vars, .env file, defaults) theo thứ tự ưu tiên
  2. Validate tất cả required fields khi khởi tạo
  3. Không expose secret values trong __repr__, __str__, hay khi serialize

Input:

python
config = SecretConfig.load(
    required=["DATABASE_URL", "API_KEY", "JWT_SECRET"],
    optional={"SENTRY_DSN": None, "LOG_LEVEL": "INFO"},
    env_file=".env.production",
)
print(config)           # Hiển thị *** cho tất cả giá trị
print(config.get("API_KEY"))  # Trả về giá trị thật
Xem lời giải
python
from __future__ import annotations

import os
from pathlib import Path
from typing import Optional

from dotenv import dotenv_values


class SecretConfig:
    """Configuration loader với secret masking."""

    def __init__(self, values: dict[str, str], secret_keys: set[str]) -> None:
        self._values = dict(values)
        self._secret_keys = secret_keys

    @classmethod
    def load(
        cls,
        required: list[str],
        optional: Optional[dict[str, Optional[str]]] = None,
        env_file: Optional[str] = None,
    ) -> "SecretConfig":
        optional = optional or {}
        all_keys = set(required) | set(optional.keys())

        # Layer 1: Defaults
        values: dict[str, str] = {k: v for k, v in optional.items() if v is not None}

        # Layer 2: .env file
        if env_file and Path(env_file).exists():
            file_values = dotenv_values(env_file)
            for key in all_keys:
                if key in file_values and file_values[key]:
                    values[key] = file_values[key]

        # Layer 3: Environment variables (highest priority)
        for key in all_keys:
            env_val = os.environ.get(key)
            if env_val:
                values[key] = env_val

        missing = [k for k in required if k not in values]
        if missing:
            raise ValueError(f"Missing required config: {', '.join(missing)}")

        return cls(values=values, secret_keys=set(required))

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        return self._values.get(key, default)

    def __repr__(self) -> str:
        masked = {
            k: ("***" if k in self._secret_keys else v)
            for k, v in self._values.items()
        }
        pairs = ", ".join(f"{k}={v!r}" for k, v in masked.items())
        return f"SecretConfig({pairs})"

Bài 2: Implement Webhook Signature Verifier

🧠 Quiz

Yêu cầu: Viết hàm verify webhook signatures:

  1. Tính HMAC-SHA256 từ payload và secret
  2. So sánh constant-time với signature nhận được
  3. Kiểm tra timestamp để chống replay attack (tolerance: 300 giây)
  4. Trả về (is_valid, error_message)

Test cases:

python
# Valid signature, recent timestamp   → (True, None)
# Valid signature, old timestamp      → (False, "Timestamp expired")
# Invalid signature                   → (False, "Signature mismatch")
# Missing header                      → (False, "Missing signature header")
Xem lời giải
python
import hashlib
import hmac
import time
from typing import Optional


def verify_webhook_signature(
    payload: bytes,
    signature_header: Optional[str],
    secret: str,
    tolerance_seconds: int = 300,
) -> tuple[bool, Optional[str]]:
    """Verify webhook signature với timing attack protection."""
    if not signature_header:
        return False, "Missing signature header"

    parts: dict[str, str] = {}
    try:
        for element in signature_header.split(","):
            key, value = element.strip().split("=", 1)
            parts[key] = value
    except ValueError:
        return False, "Malformed signature header"

    if "t" not in parts or "v1" not in parts:
        return False, "Missing timestamp or signature"

    try:
        timestamp = int(parts["t"])
    except ValueError:
        return False, "Invalid timestamp format"

    age = abs(time.time() - timestamp)
    if age > tolerance_seconds:
        return False, f"Timestamp expired ({int(age)}s old, max {tolerance_seconds}s)"

    signed_payload = f"{timestamp}.".encode() + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

    if not hmac.compare_digest(expected, parts["v1"]):
        return False, "Signature mismatch"

    return True, None

Liên kết học tiếp