Giao diện
Secrets Management — Quản lý bí mật trong Production
Tháng 3 năm 2023, một kỹ sư backend tại một startup fintech Việt Nam đã push một commit chứa AWS access key trực tiếp trong source code. Chỉ trong vòng 11 phút, bot scanner trên GitHub đã phát hiện và khai thác key đó — spin lên 47 EC2 instance để đào cryptocurrency. Hóa đơn AWS vào cuối tháng: $23,000 USD. Toàn bộ sự việc bắt nguồn từ một dòng code duy nhất: AWS_SECRET_KEY = "wJalrXUtnFEMI...".
Secrets management không phải là "nice-to-have" — nó là yêu cầu bắt buộc trong bất kỳ hệ thống production nào. Mỗi credential bị lộ là một cánh cửa mở toang cho attacker: database connection strings, API keys, encryption keys, OAuth tokens. Python cung cấp module secrets chuyên dụng cho việc sinh token an toàn, nhưng quản lý bí mật trong thực tế đòi hỏi kiến trúc nhiều tầng — từ environment variables, đến vault systems, đến automated key rotation.
Bài viết này đưa bạn từ nền tảng module secrets đến kiến trúc credential management cho microservices, bao gồm tích hợp HashiCorp Vault, AWS Secrets Manager, và chiến lược rotation không downtime.
Bức tranh tư duy
Hãy hình dung secrets management như két sắt ngân hàng — không phải ai cũng được vào, mỗi người chỉ mở được ngăn của mình, và mọi lần truy cập đều được ghi lại trong sổ.
┌─────────────────────────────────────────────────────────────┐
│ SECRETS MANAGEMENT LAYERS │
├─────────────────────────────────────────────────────────────┤
│ │
│ Layer 4: Vault / Secrets Manager (két sắt trung tâm) │
│ ┌────────────────────────────────────────┐ │
│ │ HashiCorp Vault │ AWS Secrets Manager │ │
│ │ Dynamic secrets │ Automatic rotation │ │
│ └────────┬─────────────────┬─────────────┘ │
│ │ │ │
│ Layer 3: Environment Injection (người giao chìa khóa) │
│ ┌────────┴─────────────────┴─────────────┐ │
│ │ K8s Secrets │ Docker Secrets │ .env │ │
│ └────────┬─────────────────┬─────────────┘ │
│ │ │ │
│ Layer 2: Application Layer (nhân viên dùng chìa khóa) │
│ ┌────────┴─────────────────┴─────────────┐ │
│ │ Config classes │ Validation │ Cache │ │
│ └────────┬─────────────────┬─────────────┘ │
│ │ │ │
│ Layer 1: secrets module (ổ khóa mật mã) │
│ ┌────────┴─────────────────┴─────────────┐ │
│ │ Token generation │ CSPRNG │ Compare │ │
│ └────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘Nguyên tắc vàng: Secrets không bao giờ nằm trong code, không bao giờ nằm trong Git history, và luôn được mã hóa khi lưu trữ (at rest) cũng như khi truyền tải (in transit). Giống như tiền trong ngân hàng — bạn không giấu tiền dưới gối, bạn đặt nó vào nơi có camera, có khóa, và có audit trail.
Cốt lõi kỹ thuật
Python secrets Module — Sinh token an toàn
Module secrets (Python 3.6+) được thiết kế riêng cho việc sinh giá trị ngẫu nhiên cryptographically secure — khác hoàn toàn với random module chỉ dành cho simulation.
python
import secrets
import string
def generate_api_key(prefix: str = "pk", length: int = 32) -> str:
"""Sinh API key với prefix để dễ nhận diện nguồn gốc."""
random_part = secrets.token_hex(length)
return f"{prefix}_live_{random_part}"
def generate_password(length: int = 24, include_symbols: bool = True) -> str:
"""Sinh password đạt chuẩn NIST SP 800-63B."""
alphabet = string.ascii_letters + string.digits
if include_symbols:
alphabet += "!@#$%^&*()-_=+"
while True:
password = "".join(secrets.choice(alphabet) for _ in range(length))
if (
any(c.isupper() for c in password)
and any(c.islower() for c in password)
and any(c.isdigit() for c in password)
and (not include_symbols or any(c in "!@#$%^&*()-_=+" for c in password))
):
return password
def generate_url_safe_token(nbytes: int = 32) -> str:
"""Token an toàn cho URL — dùng trong reset password, email verification."""
return secrets.token_urlsafe(nbytes)
def constant_time_compare(secret_a: str, secret_b: str) -> bool:
"""So sánh không bị timing attack — luôn dùng cho secret comparison."""
return secrets.compare_digest(secret_a, secret_b)Environment Variables — Best Practices
Environment variables là lớp đầu tiên và phổ biến nhất để inject secrets vào application. Nhưng sử dụng sai cách thì cũng nguy hiểm không kém hardcode.
python
from __future__ import annotations
import os
import sys
from dataclasses import dataclass
@dataclass(frozen=True)
class DatabaseConfig:
"""Immutable database configuration loaded from environment."""
host: str
port: int
name: str
user: str
password: str
ssl_mode: str = "require"
pool_min: int = 5
pool_max: int = 20
@classmethod
def from_env(cls, prefix: str = "DB") -> "DatabaseConfig":
"""Load config từ environment variables với validation chặt chẽ."""
missing: list[str] = []
def require(key: str) -> str:
value = os.environ.get(f"{prefix}_{key}")
if not value:
missing.append(f"{prefix}_{key}")
return ""
return value
host = require("HOST")
port_str = os.environ.get(f"{prefix}_PORT", "5432")
name = require("NAME")
user = require("USER")
password = require("PASSWORD")
if missing:
print(
f"FATAL: Missing required env vars: {', '.join(missing)}",
file=sys.stderr,
)
sys.exit(1)
return cls(
host=host, port=int(port_str), name=name,
user=user, password=password,
ssl_mode=os.environ.get(f"{prefix}_SSL_MODE", "require"),
pool_min=int(os.environ.get(f"{prefix}_POOL_MIN", "5")),
pool_max=int(os.environ.get(f"{prefix}_POOL_MAX", "20")),
)
@property
def connection_url(self) -> str:
"""KHÔNG BAO GIỜ log giá trị này."""
return (
f"postgresql://{self.user}:{self.password}"
f"@{self.host}:{self.port}/{self.name}?sslmode={self.ssl_mode}"
)
def __repr__(self) -> str:
"""Override repr để KHÔNG LỘ password trong logs."""
return (
f"DatabaseConfig(host={self.host!r}, port={self.port}, "
f"name={self.name!r}, user={self.user!r}, password='***')"
)Dotenv Patterns — Quản lý .env files
python
"""
Cấu trúc .env files theo môi trường:
.env ← defaults (KHÔNG chứa secrets thật)
.env.local ← local overrides (trong .gitignore)
.env.production ← production template (KHÔNG chứa giá trị thật)
"""
from pathlib import Path
from typing import Optional
from dotenv import dotenv_values, load_dotenv
class EnvLoader:
"""Load environment variables theo thứ tự ưu tiên đúng chuẩn."""
PRIORITY_ORDER = [
".env",
".env.local",
".env.{environment}",
".env.{environment}.local",
]
def __init__(self, environment: str = "development", base_path: Optional[Path] = None):
self.environment = environment
self.base_path = base_path or Path.cwd()
def load(self) -> dict[str, Optional[str]]:
"""Load env files theo thứ tự ưu tiên — file sau override file trước."""
merged: dict[str, Optional[str]] = {}
for pattern in self.PRIORITY_ORDER:
filepath = self.base_path / pattern.format(environment=self.environment)
if filepath.exists():
merged.update(dotenv_values(filepath))
return mergedHashiCorp Vault Integration
HashiCorp Vault là giải pháp enterprise cho secrets management — cung cấp dynamic secrets, automatic rotation, và audit logging chi tiết.
python
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Any, Optional
import hvac
logger = logging.getLogger(__name__)
@dataclass
class VaultSecret:
"""Secret từ Vault kèm metadata."""
key: str
value: str
lease_id: str
lease_duration: int
renewable: bool
fetched_at: float
@property
def is_expired(self) -> bool:
elapsed = time.time() - self.fetched_at
return elapsed >= self.lease_duration * 0.8
class VaultClient:
"""Client wrapper cho HashiCorp Vault với caching và auto-renewal."""
def __init__(self, vault_url: str, role_id: str, secret_id: str, namespace: Optional[str] = None):
self._client = hvac.Client(url=vault_url, namespace=namespace)
self._cache: dict[str, VaultSecret] = {}
self._client.auth.approle.login(role_id=role_id, secret_id=secret_id)
logger.info("Vault authentication successful via AppRole")
def get_secret(self, path: str, key: str, mount_point: str = "secret") -> str:
"""Lấy secret với caching — tự động refresh khi gần hết lease."""
cache_key = f"{mount_point}/{path}/{key}"
if cache_key in self._cache and not self._cache[cache_key].is_expired:
return self._cache[cache_key].value
response = self._client.secrets.kv.v2.read_secret_version(
path=path, mount_point=mount_point,
)
data = response["data"]["data"]
if key not in data:
raise KeyError(f"Key '{key}' not found in path '{path}'")
self._cache[cache_key] = VaultSecret(
key=key, value=data[key],
lease_id=response.get("lease_id", ""),
lease_duration=response.get("lease_duration", 3600),
renewable=response.get("renewable", False),
fetched_at=time.time(),
)
return data[key]Key Rotation Strategies
Key rotation thay thế credentials định kỳ — dù key bị lộ thì cũng chỉ hữu dụng trong thời gian ngắn.
python
from __future__ import annotations
import hashlib
import logging
import secrets
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional
logger = logging.getLogger(__name__)
class RotationState(Enum):
ACTIVE = "active"
DEPRECATED = "deprecated"
REVOKED = "revoked"
@dataclass
class ManagedKey:
"""Một key được quản lý với lifecycle đầy đủ."""
key_id: str
value: str
state: RotationState
created_at: float
rotated_at: Optional[float] = None
max_age_seconds: int = 86400
@property
def needs_rotation(self) -> bool:
return (time.time() - self.created_at) >= self.max_age_seconds * 0.8
class KeyRotationManager:
"""Quản lý vòng đời key với zero-downtime rotation.
Chiến lược dual-key: giữ cả key cũ và mới hoạt động song song
trong grace period để tránh request bị reject.
"""
def __init__(
self,
key_generator: Callable[[], str] = lambda: secrets.token_hex(32),
grace_period_seconds: int = 3600,
max_age_seconds: int = 86400,
):
self._keys: dict[str, ManagedKey] = {}
self._active_key_id: Optional[str] = None
self._key_generator = key_generator
self._grace_period = grace_period_seconds
self._max_age = max_age_seconds
def initialize(self) -> ManagedKey:
"""Tạo key ban đầu khi hệ thống khởi động."""
key = self._create_key()
self._active_key_id = key.key_id
return key
def rotate(self) -> tuple[ManagedKey, ManagedKey]:
"""Old key chuyển DEPRECATED, vẫn verify được trong grace period."""
if not self._active_key_id:
raise RuntimeError("No active key. Call initialize() first.")
old_key = self._keys[self._active_key_id]
old_key.state = RotationState.DEPRECATED
old_key.rotated_at = time.time()
new_key = self._create_key()
self._active_key_id = new_key.key_id
logger.info("Key rotated: %s -> %s", old_key.key_id[:8], new_key.key_id[:8])
return new_key, old_key
def get_signing_key(self) -> ManagedKey:
if not self._active_key_id:
raise RuntimeError("No active key available")
return self._keys[self._active_key_id]
def check_rotation_needed(self) -> bool:
if not self._active_key_id:
return False
return self._keys[self._active_key_id].needs_rotation
def _create_key(self) -> ManagedKey:
value = self._key_generator()
key_id = hashlib.sha256(value.encode()).hexdigest()[:16]
key = ManagedKey(
key_id=key_id, value=value, state=RotationState.ACTIVE,
created_at=time.time(), max_age_seconds=self._max_age,
)
self._keys[key_id] = key
return keyThực chiến
Bài toán: Xây dựng credential management system cho kiến trúc microservices — mỗi service cần secrets khác nhau, credentials rotate tự động, mọi truy cập được audit.
python
"""Credential Management System cho Microservices."""
from __future__ import annotations
import logging
import os
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
logger = logging.getLogger(__name__)
class SecretBackend(Enum):
ENVIRONMENT = "environment"
VAULT = "vault"
AWS_SECRETS_MANAGER = "aws"
@dataclass(frozen=True)
class SecretReference:
"""Tham chiếu đến một secret — không chứa giá trị thật."""
backend: SecretBackend
path: str
key: str
required: bool = True
class SecretProvider(ABC):
@abstractmethod
def get_secret(self, reference: SecretReference) -> str: ...
class EnvironmentSecretProvider(SecretProvider):
def get_secret(self, reference: SecretReference) -> str:
env_key = f"{reference.path}_{reference.key}".upper().replace("/", "_")
value = os.environ.get(env_key)
if not value and reference.required:
raise KeyError(f"Required env var not found: {env_key}")
return value or ""
class CredentialManager:
"""Quản lý trung tâm cho tất cả secrets — cache, audit, multi-backend."""
def __init__(self) -> None:
self._providers: dict[SecretBackend, SecretProvider] = {}
self._cache: dict[str, str] = {}
def register_provider(self, backend: SecretBackend, provider: SecretProvider) -> None:
self._providers[backend] = provider
def resolve(self, reference: SecretReference) -> str:
"""Resolve một secret reference thành giá trị thật."""
cache_key = f"{reference.backend.value}:{reference.path}:{reference.key}"
if cache_key in self._cache:
return self._cache[cache_key]
provider = self._providers.get(reference.backend)
if not provider:
raise RuntimeError(f"No provider for: {reference.backend.value}")
value = provider.get_secret(reference)
self._cache[cache_key] = value
return value
def resolve_batch(self, references: list[SecretReference]) -> dict[str, str]:
"""Resolve nhiều secrets — fail fast nếu thiếu required."""
results: dict[str, str] = {}
errors: list[str] = []
for ref in references:
try:
results[f"{ref.path}/{ref.key}"] = self.resolve(ref)
except Exception as exc:
if ref.required:
errors.append(f"{ref.path}/{ref.key}: {exc}")
if errors:
raise RuntimeError(f"Failed to resolve: {', '.join(errors)}")
return results
# --- Bootstrap pattern ---
def bootstrap_payment_service() -> CredentialManager:
manager = CredentialManager()
manager.register_provider(SecretBackend.ENVIRONMENT, EnvironmentSecretProvider())
secrets_needed = [
SecretReference(SecretBackend.ENVIRONMENT, "STRIPE", "API_KEY"),
SecretReference(SecretBackend.ENVIRONMENT, "STRIPE", "WEBHOOK_SECRET"),
SecretReference(SecretBackend.ENVIRONMENT, "DB", "PASSWORD"),
SecretReference(SecretBackend.ENVIRONMENT, "REDIS", "URL"),
SecretReference(SecretBackend.ENVIRONMENT, "SENTRY", "DSN", required=False),
]
manager.resolve_batch(secrets_needed)
return managerSai lầm điển hình
Sai lầm 1: Hardcode secrets trong source code
python
# ❌ SAI — Secret nằm trực tiếp trong code, vào Git history vĩnh viễn
class PaymentGateway:
API_KEY = "sk_live_4eC39HqLyjWDarjtT1zdp7dc"
def charge(self, amount: int) -> dict:
return requests.post(
"https://api.stripe.com/v1/charges",
headers={"Authorization": f"Bearer {self.API_KEY}"},
data={"amount": amount},
).json()python
# ✅ ĐÚNG — Secrets inject từ environment, không bao giờ nằm trong code
class PaymentGateway:
def __init__(self) -> None:
self._api_key = os.environ["STRIPE_API_KEY"]
def charge(self, amount: int) -> dict:
return requests.post(
"https://api.stripe.com/v1/charges",
headers={"Authorization": f"Bearer {self._api_key}"},
data={"amount": amount},
).json()Sai lầm 2: Dùng random thay vì secrets cho security tokens
python
# ❌ SAI — random module dùng Mersenne Twister, predictable sau 624 outputs
import random
def generate_reset_token() -> str:
return "".join(random.choices("abcdef0123456789", k=32))
def generate_session_id() -> str:
return str(random.randint(100000, 999999))python
# ✅ ĐÚNG — secrets module dùng OS-level CSPRNG, không thể predict
import secrets
def generate_reset_token() -> str:
return secrets.token_urlsafe(32)
def generate_session_id() -> str:
return secrets.token_hex(16)Sai lầm 3: So sánh secrets bằng == — dính timing attack
python
# ❌ SAI — Toán tử == dừng sớm khi gặp byte khác → timing side-channel
def verify_webhook(payload: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
return signature == expected # Timing attack vulnerability!python
# ✅ ĐÚNG — compare_digest luôn chạy hết cả hai chuỗi, constant time
import hmac
import hashlib
def verify_webhook(payload: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(signature, expected)Sai lầm 4: Log secrets trong error messages
python
# ❌ SAI — Password lọt vào log files, monitoring systems
def connect_database(url: str) -> Connection:
try:
return psycopg2.connect(url)
except Exception:
logger.error(f"Failed to connect: {url}") # URL chứa password!
raisepython
# ✅ ĐÚNG — Mask sensitive data trước khi log
import re
def _mask_credentials(url: str) -> str:
return re.sub(r"://([^:]+):([^@]+)@", r"://\1:***@", url)
def connect_database(url: str) -> Connection:
try:
return psycopg2.connect(url)
except Exception:
logger.error("Failed to connect: %s", _mask_credentials(url))
raiseSai lầm 5: Không đặt expiry cho secrets
python
# ❌ SAI — API key tạo một lần, dùng mãi mãi, không bao giờ rotate
api_key = generate_api_key()
save_to_config(api_key) # Dùng cùng key 3 năm liềnpython
# ✅ ĐÚNG — Mỗi key có TTL, tự động rotate trước khi hết hạn
rotation_manager = KeyRotationManager(
max_age_seconds=86400, # Rotate mỗi 24 giờ
grace_period_seconds=3600, # Key cũ còn dùng được 1 giờ
)
rotation_manager.initialize()
if rotation_manager.check_rotation_needed():
new_key, old_key = rotation_manager.rotate()
notify_dependent_services(new_key)Under the Hood
CSPRNG — Cryptographically Secure Pseudo-Random Number Generator
Khi bạn gọi secrets.token_hex(32), Python không tự sinh số ngẫu nhiên — nó delegate xuống hệ điều hành:
┌───────────────────────────────────────────────────┐
│ secrets.token_bytes(n) │
│ │ │
│ ▼ │
│ os.urandom(n) │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────┐ │
│ │ Linux: getrandom() syscall │ │
│ │ Windows: BCryptGenRandom() │ │
│ │ macOS: SecRandomCopyBytes │ │
│ └──────────────────┬────────────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────────┐ │
│ │ Kernel Entropy Pool │ │
│ │ Sources: hardware interrupts, disk I/O, │ │
│ │ network packets, CPU jitter, RDRAND │ │
│ └───────────────────────────────────────────┘ │
└───────────────────────────────────────────────────┘Tại sao random module không an toàn cho security?
Module random dùng thuật toán Mersenne Twister (MT19937) — một PRNG có chu kỳ cực dài (2^19937 - 1) nhưng hoàn toàn deterministic. Nếu attacker quan sát được 624 outputs liên tiếp (mỗi output 32 bits), họ có thể tái tạo internal state và predict mọi giá trị tương lai.
python
"""
Mersenne Twister state có thể bị clone sau 624 observations.
secrets module KHÔNG có vấn đề này vì mỗi lần gọi lấy entropy
mới từ OS — không có internal state để reconstruct.
"""
import random
# Attacker quan sát 624 outputs → reconstruct state
observed = [random.getrandbits(32) for _ in range(624)]
# Sau đó predict MỌI giá trị tiếp theo bằng "untwisting"Timing Attacks — Khi thời gian phản hồi tiết lộ bí mật
python
"""
So sánh "abcdef" với "xbcdef": 1 comparison → ~10ns
So sánh "abcdef" với "abcxef": 4 comparisons → ~40ns
Attacker đo thời gian → suy ra bao nhiêu byte đúng
→ brute-force từng byte thay vì toàn bộ string.
hmac.compare_digest() LUÔN so sánh HẾT tất cả bytes.
Thời gian = O(n) cố định, không phụ thuộc vị trí byte sai.
"""
import hmac
def vulnerable_compare(a: str, b: str) -> bool:
"""DỄ BỊ timing attack — dừng sớm khi gặp byte khác."""
if len(a) != len(b):
return False
for x, y in zip(a, b):
if x != y:
return False
return True
def safe_compare(a: str, b: str) -> bool:
"""Constant-time — KHÔNG leak thông tin qua timing."""
return hmac.compare_digest(a.encode(), b.encode())Memory Safety — Xóa secrets khỏi memory
python
import ctypes
import hashlib
def secure_zero_memory(secret: bytearray) -> None:
"""Ghi đè vùng nhớ chứa secret bằng zeros.
Chỉ hoạt động với bytearray (mutable). str/bytes trong Python
là immutable — không thể ghi đè in-place.
"""
ctypes.memset(
ctypes.addressof((ctypes.c_char * len(secret)).from_buffer(secret)),
0, len(secret),
)
def process_secret_safely(raw_secret: bytes) -> str:
"""Xử lý secret và đảm bảo xóa khỏi memory sau khi dùng."""
mutable = bytearray(raw_secret)
try:
return hashlib.sha256(mutable).hexdigest()
finally:
secure_zero_memory(mutable)Checklist ghi nhớ
✅ Checklist triển khai
- [ ] Không bao giờ hardcode secrets trong source code — dùng environment variables hoặc vault
- [ ] Thêm
.env,.env.local,*.pem,*.keyvào.gitignoretrước khi tạo project - [ ] Dùng
secretsmodule (không phảirandom) cho mọi security token - [ ] So sánh secrets bằng
secrets.compare_digest()hoặchmac.compare_digest() - [ ] Override
__repr__()trong config classes để không lộ credentials khi log - [ ] Mask connection strings và credentials trước khi ghi vào log
- [ ] Implement key rotation với grace period — không revoke key cũ ngay lập tức
- [ ] Validate tất cả required secrets khi bootstrap — fail fast nếu thiếu
- [ ] Dùng frozen dataclass cho config objects chứa secrets
- [ ] Set TTL cho cached secrets — không cache vĩnh viễn
- [ ] Audit mọi lần truy cập secret — ghi lại ai, khi nào, secret nào
- [ ] Dùng dynamic credentials (Vault, AWS IAM roles) thay vì static keys khi có thể
- [ ] Scan Git history bằng
trufflehoghoặcgit-secretstrong CI/CD pipeline - [ ] Encrypt secrets at rest — không lưu plaintext trong database hoặc config files
- [ ] Thiết lập alerts khi secret access pattern bất thường
Bài tập luyện tập
Bài 1: Xây dựng Secret Configuration Loader
🧠 Quiz
Yêu cầu: Viết class SecretConfig có khả năng:
- Load secrets từ nhiều nguồn (env vars,
.envfile, defaults) theo thứ tự ưu tiên - Validate tất cả required fields khi khởi tạo
- Không expose secret values trong
__repr__,__str__, hay khi serialize
Input:
python
config = SecretConfig.load(
required=["DATABASE_URL", "API_KEY", "JWT_SECRET"],
optional={"SENTRY_DSN": None, "LOG_LEVEL": "INFO"},
env_file=".env.production",
)
print(config) # Hiển thị *** cho tất cả giá trị
print(config.get("API_KEY")) # Trả về giá trị thậtXem lời giải
python
from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
from dotenv import dotenv_values
class SecretConfig:
"""Configuration loader với secret masking."""
def __init__(self, values: dict[str, str], secret_keys: set[str]) -> None:
self._values = dict(values)
self._secret_keys = secret_keys
@classmethod
def load(
cls,
required: list[str],
optional: Optional[dict[str, Optional[str]]] = None,
env_file: Optional[str] = None,
) -> "SecretConfig":
optional = optional or {}
all_keys = set(required) | set(optional.keys())
# Layer 1: Defaults
values: dict[str, str] = {k: v for k, v in optional.items() if v is not None}
# Layer 2: .env file
if env_file and Path(env_file).exists():
file_values = dotenv_values(env_file)
for key in all_keys:
if key in file_values and file_values[key]:
values[key] = file_values[key]
# Layer 3: Environment variables (highest priority)
for key in all_keys:
env_val = os.environ.get(key)
if env_val:
values[key] = env_val
missing = [k for k in required if k not in values]
if missing:
raise ValueError(f"Missing required config: {', '.join(missing)}")
return cls(values=values, secret_keys=set(required))
def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
return self._values.get(key, default)
def __repr__(self) -> str:
masked = {
k: ("***" if k in self._secret_keys else v)
for k, v in self._values.items()
}
pairs = ", ".join(f"{k}={v!r}" for k, v in masked.items())
return f"SecretConfig({pairs})"Bài 2: Implement Webhook Signature Verifier
🧠 Quiz
Yêu cầu: Viết hàm verify webhook signatures:
- Tính HMAC-SHA256 từ payload và secret
- So sánh constant-time với signature nhận được
- Kiểm tra timestamp để chống replay attack (tolerance: 300 giây)
- Trả về
(is_valid, error_message)
Test cases:
python
# Valid signature, recent timestamp → (True, None)
# Valid signature, old timestamp → (False, "Timestamp expired")
# Invalid signature → (False, "Signature mismatch")
# Missing header → (False, "Missing signature header")Xem lời giải
python
import hashlib
import hmac
import time
from typing import Optional
def verify_webhook_signature(
payload: bytes,
signature_header: Optional[str],
secret: str,
tolerance_seconds: int = 300,
) -> tuple[bool, Optional[str]]:
"""Verify webhook signature với timing attack protection."""
if not signature_header:
return False, "Missing signature header"
parts: dict[str, str] = {}
try:
for element in signature_header.split(","):
key, value = element.strip().split("=", 1)
parts[key] = value
except ValueError:
return False, "Malformed signature header"
if "t" not in parts or "v1" not in parts:
return False, "Missing timestamp or signature"
try:
timestamp = int(parts["t"])
except ValueError:
return False, "Invalid timestamp format"
age = abs(time.time() - timestamp)
if age > tolerance_seconds:
return False, f"Timestamp expired ({int(age)}s old, max {tolerance_seconds}s)"
signed_payload = f"{timestamp}.".encode() + payload
expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, parts["v1"]):
return False, "Signature mismatch"
return True, None