Giao diện
Secure Coding Patterns Security
Patterns và best practices để xây dựng applications bảo mật từ đầu
Learning Outcomes
Sau khi hoàn thành trang này, bạn sẽ:
- 🎯 Hiểu cryptography basics và khi nào dùng encryption
- 🎯 Implement password hashing đúng cách với bcrypt/argon2
- 🎯 Handle JWT tokens an toàn
- 🎯 Configure security headers và CORS properly
- 🎯 Tránh các Production Pitfalls về security patterns
Cryptography Basics
Symmetric vs Asymmetric Encryption
┌─────────────────────────────────────────────────────────────┐
│ SYMMETRIC ENCRYPTION │
│ Same key for encrypt & decrypt │
│ Fast, good for large data │
│ Examples: AES, ChaCha20 │
│ │
│ Alice ──[key]──> Encrypt ──> Ciphertext ──> Decrypt ──> Bob│
│ │ │
│ [same key] │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ASYMMETRIC ENCRYPTION │
│ Public key encrypts, private key decrypts │
│ Slower, good for key exchange │
│ Examples: RSA, ECDSA │
│ │
│ Alice ──[Bob's public key]──> Encrypt ──> Ciphertext │
│ │ │
│ Bob ──[Bob's private key]──> Decrypt ──> Plaintext │
└─────────────────────────────────────────────────────────────┘Using cryptography Library
python
# SECURITY: Never hardcode credentials in production code
# Use environment variables or secure credential management systems
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
# === SYMMETRIC ENCRYPTION (Fernet) ===
# Generate a key (store securely!)
key = Fernet.generate_key()
print(key) # b'...' - 32 bytes, base64 encoded
# Encrypt
fernet = Fernet(key)
plaintext = b"Secret message"
ciphertext = fernet.encrypt(plaintext)
print(ciphertext) # b'gAAAAA...'
# Decrypt
decrypted = fernet.decrypt(ciphertext)
print(decrypted) # b'Secret message'
# === KEY DERIVATION FROM PASSWORD ===
def derive_key_from_password(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
"""
Derive encryption key from password using PBKDF2.
Args:
password: User password
salt: Salt bytes (generated if not provided)
Returns:
Tuple of (key, salt)
"""
if salt is None:
salt = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=480000, # OWASP recommended minimum
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key, salt
# Usage
password = os.getenv("DB_PASSWORD")
key, salt = derive_key_from_password(password)
# Store salt with encrypted data
fernet = Fernet(key)
encrypted = fernet.encrypt(b"sensitive data")
# To decrypt later, need password + salt
key2, _ = derive_key_from_password(password, salt)
fernet2 = Fernet(key2)
decrypted = fernet2.decrypt(encrypted)Hashing vs Encryption
python
# HASHING: One-way, cannot reverse
# Use for: passwords, data integrity, checksums
# ENCRYPTION: Two-way, can decrypt with key
# Use for: data at rest, data in transit, secrets
import hashlib
# === HASHING (for integrity) ===
def hash_data(data: bytes) -> str:
"""Create SHA-256 hash of data."""
return hashlib.sha256(data).hexdigest()
# Verify integrity
original_hash = hash_data(b"important document")
# ... transfer document ...
received_hash = hash_data(received_document)
if original_hash != received_hash:
raise ValueError("Document was modified!")
# === HMAC (for authentication) ===
import hmac
def create_hmac(data: bytes, key: bytes) -> bytes:
"""Create HMAC for data authentication."""
return hmac.new(key, data, hashlib.sha256).digest()
def verify_hmac(data: bytes, key: bytes, signature: bytes) -> bool:
"""Verify HMAC signature."""
expected = create_hmac(data, key)
return hmac.compare_digest(expected, signature)Password Hashing
NEVER store passwords in plaintext or with reversible encryption. Use password hashing algorithms designed for this purpose.
bcrypt (Recommended)
python
import bcrypt
def hash_password(password: str) -> str:
"""
Hash password using bcrypt.
Args:
password: Plain text password
Returns:
Hashed password string (includes salt)
"""
# Generate salt and hash
salt = bcrypt.gensalt(rounds=12) # 2^12 iterations
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""
Verify password against hash.
Args:
password: Plain text password to verify
hashed: Stored hash
Returns:
True if password matches
"""
return bcrypt.checkpw(
password.encode('utf-8'),
hashed.encode('utf-8')
)
# Usage
hashed = hash_password("MySecureP@ssw0rd")
print(hashed) # $2b$12$...
# Verify
is_valid = verify_password("MySecureP@ssw0rd", hashed) # True
is_valid = verify_password("wrong", hashed) # FalseArgon2 (Modern Alternative)
python
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
# Create hasher with secure defaults
ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # 64 MB
parallelism=4, # Number of threads
hash_len=32, # Output length
salt_len=16, # Salt length
)
def hash_password_argon2(password: str) -> str:
"""Hash password using Argon2id."""
return ph.hash(password)
def verify_password_argon2(password: str, hashed: str) -> bool:
"""Verify password against Argon2 hash."""
try:
ph.verify(hashed, password)
return True
except VerifyMismatchError:
return False
def needs_rehash(hashed: str) -> bool:
"""Check if hash needs to be updated (params changed)."""
return ph.check_needs_rehash(hashed)
# Usage
hashed = hash_password_argon2("MySecureP@ssw0rd")
print(hashed) # $argon2id$v=19$m=65536,t=3,p=4$...
# Verify and rehash if needed
if verify_password_argon2(password, stored_hash):
if needs_rehash(stored_hash):
new_hash = hash_password_argon2(password)
update_user_password_hash(user_id, new_hash)Password Hashing Comparison
| Algorithm | Recommended? | Notes |
|---|---|---|
| bcrypt | ✅ Yes | Battle-tested, widely supported |
| Argon2id | ✅ Yes | Modern, memory-hard, PHC winner |
| scrypt | ✅ Yes | Memory-hard, good alternative |
| PBKDF2 | ⚠️ OK | Use high iterations (600k+) |
| SHA-256 | ❌ No | Too fast, no salt by default |
| MD5 | ❌ Never | Broken, too fast |
JWT Handling
JSON Web Tokens (JWT) are commonly used for authentication. Handle them carefully!
JWT Structure
┌─────────────────────────────────────────────────────────────┐
│ HEADER.PAYLOAD.SIGNATURE │
│ │
│ Header: {"alg": "HS256", "typ": "JWT"} │
│ Payload: {"sub": "user123", "exp": 1234567890, ...} │
│ Signature: HMAC-SHA256(header + payload, secret) │
└─────────────────────────────────────────────────────────────┘Creating and Verifying JWTs
python
import jwt
from datetime import datetime, timedelta, timezone
from typing import Optional
# Secret key (store securely, not in code!)
SECRET_KEY = "your-256-bit-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create JWT access token.
Args:
data: Payload data (user_id, roles, etc.)
expires_delta: Token expiration time
Returns:
Encoded JWT string
"""
to_encode = data.copy()
# Set expiration
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc), # Issued at
"nbf": datetime.now(timezone.utc), # Not before
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(token: str) -> Optional[dict]:
"""
Verify and decode JWT token.
Args:
token: JWT string
Returns:
Decoded payload or None if invalid
"""
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=[ALGORITHM], # Explicit algorithm list!
options={
"require": ["exp", "iat", "sub"], # Required claims
"verify_exp": True,
"verify_iat": True,
"verify_nbf": True,
}
)
return payload
except jwt.ExpiredSignatureError:
return None # Token expired
except jwt.InvalidTokenError:
return None # Invalid token
# Usage
token = create_access_token({"sub": "user123", "role": "admin"})
print(token) # eyJhbGciOiJIUzI1NiIs...
payload = verify_access_token(token)
if payload:
print(f"User: {payload['sub']}")JWT Security Best Practices
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ✅ GOOD: Verify algorithm explicitly
def decode_token_good(token: str) -> dict:
return jwt.decode(
token,
SECRET_KEY,
algorithms=["HS256"], # Only allow expected algorithm
)
# ❌ BAD: Algorithm confusion attack
def decode_token_bad(token: str) -> dict:
return jwt.decode(
token,
SECRET_KEY,
algorithms=jwt.algorithms.get_default_algorithms(), # Allows "none"!
)
# ✅ GOOD: Short-lived access tokens + refresh tokens
ACCESS_TOKEN_EXPIRE = timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)
def create_tokens(user_id: str) -> dict:
"""Create access and refresh token pair."""
access_token = create_access_token(
{"sub": user_id, "type": "access"},
expires_delta=ACCESS_TOKEN_EXPIRE
)
refresh_token = create_access_token(
{"sub": user_id, "type": "refresh"},
expires_delta=REFRESH_TOKEN_EXPIRE
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
# ✅ GOOD: Token revocation with blacklist
from functools import lru_cache
import redis
redis_client = redis.Redis()
def revoke_token(token: str, expires_in: int):
"""Add token to blacklist."""
redis_client.setex(f"blacklist:{token}", expires_in, "revoked")
def is_token_revoked(token: str) -> bool:
"""Check if token is blacklisted."""
return redis_client.exists(f"blacklist:{token}")
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""Dependency to get current user from token."""
if is_token_revoked(token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has been revoked"
)
payload = verify_access_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
)
return payloadSecurity Headers
HTTP security headers protect against common web attacks.
Essential Security Headers
python
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
app = FastAPI()
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add security headers to all responses."""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Enable XSS filter (legacy browsers)
response.headers["X-XSS-Protection"] = "1; mode=block"
# Control referrer information
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy (formerly Feature-Policy)
response.headers["Permissions-Policy"] = (
"accelerometer=(), "
"camera=(), "
"geolocation=(), "
"gyroscope=(), "
"magnetometer=(), "
"microphone=(), "
"payment=(), "
"usb=()"
)
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self'; "
"connect-src 'self'; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self'"
)
# HSTS (only for HTTPS)
if request.url.scheme == "https":
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
return response
app.add_middleware(SecurityHeadersMiddleware)Content Security Policy (CSP) Deep Dive
from dataclasses import dataclass, field from typing import List
@dataclass class CSPBuilder: """Build Content Security Policy header."""
default_src: List[str] = field(default_factory=lambda: ["'self'"])
script_src: List[str] = field(default_factory=list)
style_src: List[str] = field(default_factory=list)
img_src: List[str] = field(default_factory=list)
font_src: List[str] = field(default_factory=list)
connect_src: List[str] = field(default_factory=list)
frame_src: List[str] = field(default_factory=list)
frame_ancestors: List[str] = field(default_factory=lambda: ["'none'"])
base_uri: List[str] = field(default_factory=lambda: ["'self'"])
form_action: List[str] = field(default_factory=lambda: ["'self'"])
report_uri: str = None
def build(self) -> str:
"""Build CSP header string."""
directives = []
if self.default_src:
directives.append(f"default-src {' '.join(self.default_src)}")
if self.script_src:
directives.append(f"script-src {' '.join(self.script_src)}")
if self.style_src:
directives.append(f"style-src {' '.join(self.style_src)}")
if self.img_src:
directives.append(f"img-src {' '.join(self.img_src)}")
if self.font_src:
directives.append(f"font-src {' '.join(self.font_src)}")
if self.connect_src:
directives.append(f"connect-src {' '.join(self.connect_src)}")
if self.frame_src:
directives.append(f"frame-src {' '.join(self.frame_src)}")
if self.frame_ancestors:
directives.append(f"frame-ancestors {' '.join(self.frame_ancestors)}")
if self.base_uri:
directives.append(f"base-uri {' '.join(self.base_uri)}")
if self.form_action:
directives.append(f"form-action {' '.join(self.form_action)}")
if self.report_uri:
directives.append(f"report-uri {self.report_uri}")
return "; ".join(directives)
Usage
csp = CSPBuilder( default_src=["'self'"], script_src=["'self'", "https://cdn.example.com"], style_src=["'self'", "'unsafe-inline'"], # Needed for some frameworks img_src=["'self'", "data:", "https:"], connect_src=["'self'", "https://api.example.com"], report_uri="/csp-report" )
print(csp.build())
default-src 'self'; script-src 'self' https://cdn.example.com; ...
]]) print(csp.build())
default-src 'self'; script-src 'self' https://cdn.example.com; ...
---
## CORS Configuration
Cross-Origin Resource Sharing (CORS) controls which domains can access your API.
### FastAPI CORS Setup
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import os
app = FastAPI()
# ❌ BAD: Allow all origins with credentials
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True, # DANGEROUS with *!
# )
# ✅ GOOD: Specific origins
ALLOWED_ORIGINS = [
"https://myapp.com",
"https://www.myapp.com",
"https://admin.myapp.com",
]
# Add localhost for development
if os.environ.get("ENVIRONMENT") == "development":
ALLOWED_ORIGINS.extend([
"http://localhost:3000",
"http://localhost:5173",
"http://127.0.0.1:3000",
])
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
expose_headers=["X-Request-ID", "X-RateLimit-Remaining"],
max_age=600, # Cache preflight for 10 minutes
)Dynamic CORS (Multi-tenant)
python
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
class DynamicCORSMiddleware(BaseHTTPMiddleware):
"""CORS middleware with dynamic origin validation."""
def __init__(self, app, allowed_origin_pattern: str = None):
super().__init__(app)
self.allowed_origin_pattern = allowed_origin_pattern
async def dispatch(self, request: Request, call_next):
origin = request.headers.get("origin")
# Handle preflight
if request.method == "OPTIONS":
response = Response()
if self._is_allowed_origin(origin):
self._add_cors_headers(response, origin)
return response
response = await call_next(request)
if origin and self._is_allowed_origin(origin):
self._add_cors_headers(response, origin)
return response
def _is_allowed_origin(self, origin: str) -> bool:
"""Check if origin is allowed."""
if not origin:
return False
# Exact match
allowed = {
"https://myapp.com",
"https://admin.myapp.com",
}
if origin in allowed:
return True
# Pattern match (e.g., *.myapp.com)
import re
if self.allowed_origin_pattern:
if re.match(self.allowed_origin_pattern, origin):
return True
return False
def _add_cors_headers(self, response: Response, origin: str):
"""Add CORS headers to response."""
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Credentials"] = "true"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE"
response.headers["Access-Control-Allow-Headers"] = "Authorization, Content-Type"
response.headers["Access-Control-Max-Age"] = "600"
# Usage
app.add_middleware(
DynamicCORSMiddleware,
allowed_origin_pattern=r"https://.*\.myapp\.com$"
)CORS Security Checklist
python
# ✅ DO:
# - Specify exact origins when possible
# - Use HTTPS origins only in production
# - Limit allowed methods to what's needed
# - Limit allowed headers to what's needed
# - Set reasonable max_age for preflight caching
# ❌ DON'T:
# - Use allow_origins=["*"] with allow_credentials=True
# - Allow all methods (["*"]) unless necessary
# - Allow all headers (["*"]) unless necessary
# - Forget to handle preflight (OPTIONS) requestsProduction Pitfalls
Pitfall 1: Weak JWT Secret
python
import secrets
# ❌ BUG: Weak or predictable secret
SECRET_KEY = "secret" # Too short, guessable
SECRET_KEY = "my-app-secret-key" # Still weak
# ✅ FIX: Strong random secret
SECRET_KEY = secrets.token_hex(32) # 256 bits
# Store in environment variable, not code!
# Generate once and store:
# python -c "import secrets; print(secrets.token_hex(32))"Pitfall 2: Algorithm Confusion Attack
python
import jwt
# ❌ BUG: Not specifying algorithm
def decode_bad(token: str) -> dict:
return jwt.decode(token, SECRET_KEY)
# Attacker can use "none" algorithm!
# Attack: Change header to {"alg": "none"}, remove signature
# Token: eyJhbGciOiJub25lIn0.eyJzdWIiOiJhZG1pbiJ9.
# ✅ FIX: Always specify allowed algorithms
def decode_good(token: str) -> dict:
return jwt.decode(
token,
SECRET_KEY,
algorithms=["HS256"] # Only allow expected algorithm
)Pitfall 3: Password Hash Timing Attack
python
# ❌ BUG: Early return reveals valid usernames
def login_bad(username: str, password: str) -> bool:
user = get_user(username)
if not user:
return False # Fast return = username doesn't exist
return verify_password(password, user.password_hash)
# Slow return = username exists (timing difference)
# ✅ FIX: Constant time for all paths
import bcrypt
DUMMY_HASH = bcrypt.hashpw(b"dummy", bcrypt.gensalt())
def login_good(username: str, password: str) -> bool:
user = get_user(username)
if user:
password_hash = user.password_hash.encode()
else:
password_hash = DUMMY_HASH # Still do hash comparison
# Always takes same time regardless of user existence
is_valid = bcrypt.checkpw(password.encode(), password_hash)
return is_valid and user is not Nonefrom fastapi.middleware.cors import CORSMiddleware
❌ BUG: Wildcard with credentials
app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, # DANGEROUS! )
Browsers block this, but it's still a security smell
❌ BUG: Reflecting origin without validation
@app.middleware("http") async def bad_cors(request, call_next): response = await call_next(request) origin = request.headers.get("origin") response.headers["Access-Control-Allow-Origin"] = origin # Reflects ANY origin! return response
✅ FIX: Validate origin against whitelist
ALLOWED_ORIGINS = {"https://myapp.com", "https://admin.myapp.com"}
@app.middleware("http") async def good_cors(request, call_next): response = await call_next(request) origin = request.headers.get("origin") if origin in ALLOWED_ORIGINS: response.headers["Access-Control-Allow-Origin"] = origin return response } if origin in ALLOWED_ORIGINS: response.headers["Access-Control-Allow-Origin"] = origin return response
### Pitfall 5: Missing Rate Limiting
```python
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ✅ GOOD: Rate limit sensitive endpoints
@app.post("/login")
@limiter.limit("5/minute") # 5 attempts per minute
async def login(request: Request, credentials: LoginRequest):
# Login logic
pass
@app.post("/password-reset")
@limiter.limit("3/hour") # 3 requests per hour
async def password_reset(request: Request, email: str):
# Password reset logic
pass
@app.get("/api/data")
@limiter.limit("100/minute") # General API rate limit
async def get_data(request: Request):
# API logic
pass=== PASSWORD HASHING ===
import bcrypt hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12)) is_valid = bcrypt.checkpw(password.encode(), hashed)
=== JWT ===
import jwt token = jwt.encode({"sub": user_id, "exp": expire}, SECRET, algorithm="HS256") payload = jwt.decode(token, SECRET, algorithms=["HS256"])
=== ENCRYPTION ===
from cryptography.fernet import Fernet key = Fernet.generate_key() f = Fernet(key) encrypted = f.encrypt(data) decrypted = f.decrypt(encrypted)
=== SECURITY HEADERS ===
response.headers["X-Frame-Options"] = "DENY" response.headers["X-Content-Type-Options"] = "nosniff" response.headers["Strict-Transport-Security"] = "max-age=31536000"
=== CORS ===
from fastapi.middleware.cors import CORSMiddleware app.add_middleware(CORSMiddleware, allow_origins=["https://myapp.com"]) ])
=== CORS ===
from fastapi.middleware.cors import CORSMiddleware app.add_middleware(CORSMiddleware, allow_origins=["https://myapp.com"])
---
## Cross-links
- **Prerequisites**: [Common Vulnerabilities](/python/security/vulnerabilities)
- **Related**: [Secrets Management](/python/security/secrets) - Store keys securely
- **See Also**: [FastAPI](/python/backend/fastapi) - API security
- **See Also**: [Production Deployment](/python/backend/deployment) - Secure deployment
---
<style>
.hero-subtitle {
font-size: 1.3rem;
color: var(--vp-c-text-2);
margin-bottom: 2rem;
font-weight: 300;
}
</style>