Skip to content

Input Validation — Xác thực đầu vào

Tháng 3 năm 2023, một fintech startup tại Đông Nam Á mất 2.3 triệu USD trong vòng 47 phút. Nguyên nhân không phải zero-day exploit hay APT attack — mà là một API endpoint cho phép field amount nhận giá trị âm. Kẻ tấn công gửi request {"amount": -50000, "currency": "USD"}, hệ thống xử lý refund ngược, tiền chảy ra ngoài không kiểm soát. Một dòng validation thiếu, thiệt hại hàng triệu đô.

Input validation không phải "nice-to-have" — nó là tuyến phòng thủ đầu tiên của mọi hệ thống production. Mỗi byte dữ liệu đi qua API, form, file upload, webhook đều là vector tấn công tiềm tàng. SQL injection, XSS, command injection, ReDoS — tất cả bắt nguồn từ một lỗi chung: tin tưởng dữ liệu đầu vào mà không xác thực.

Bài viết này trang bị cho bạn arsenal đầy đủ: từ Pydantic v2 — thư viện validation nhanh nhất Python hiện tại, đến Cerberus cho schema linh hoạt, JSON Schema cho API contract, và các kỹ thuật sanitization đã được kiểm chứng trong production.

Bức tranh tư duy

Hãy hình dung hệ thống của bạn như sân bay quốc tế Nội Bài. Mỗi request đến là một hành khách nhập cảnh. Không ai được phép vào lãnh thổ mà không qua hải quan — kiểm tra hộ chiếu (type validation), khai báo hành lý (schema validation), quét an ninh (sanitization), và kiểm dịch (business rule validation). Bỏ qua bất kỳ trạm kiểm soát nào, bạn không biết ai đang đi vào hệ thống.

Request đến ──► [ Trạm 1: Type Check ]
                      │ Đúng kiểu dữ liệu?

               [ Trạm 2: Schema Validation ]
                      │ Đúng cấu trúc, ràng buộc?

               [ Trạm 3: Business Rules ]
                      │ Hợp lệ về nghiệp vụ?

               [ Trạm 4: Sanitization ]
                      │ Loại bỏ ký tự nguy hiểm?

               [ Trạm 5: Context Encoding ]
                      │ Escape cho SQL / HTML / Shell?

               ✅ Dữ liệu an toàn ──► Business Logic

Nguyên tắc vàng: Validate ở biên (boundary), sanitize trước khi sử dụng, encode theo ngữ cảnh. Không bao giờ validate một lần rồi tin tưởng mãi mãi — dữ liệu có thể bị thay đổi giữa các layer.

Cốt lõi kỹ thuật

1. Nền tảng Input Validation

Trước khi dùng bất kỳ thư viện nào, bạn cần nắm vững ba nguyên tắc nền tảng:

  • Whitelist over Blacklist: Định nghĩa những gì được phép, không phải những gì bị cấm. Blacklist luôn có lỗ hổng — kẻ tấn công sẽ tìm ra encoding hoặc variation mà bạn chưa nghĩ tới.
  • Fail Closed: Khi validation gặp input không xác định, mặc định là từ chối.
  • Defense in Depth: Validate ở nhiều layer — client, API gateway, application, database. Không layer nào được tin tưởng layer khác.
python
from dataclasses import dataclass
from enum import Enum


class ValidationResult(Enum):
    ACCEPT = "accept"
    REJECT = "reject"


@dataclass(frozen=True)
class ValidationError:
    field: str
    message: str
    code: str


def validate_whitelist(
    value: str, allowed_chars: str, max_length: int = 255,
) -> tuple[ValidationResult, list[ValidationError]]:
    """Whitelist validation — chỉ chấp nhận ký tự được phép."""
    errors: list[ValidationError] = []
    if len(value) > max_length:
        errors.append(ValidationError("value", f"Vượt quá {max_length} ký tự", "MAX_LENGTH"))
    invalid = set(value) - set(allowed_chars)
    if invalid:
        errors.append(ValidationError("value", f"Ký tự không hợp lệ: {invalid}", "INVALID_CHARS"))
    return (ValidationResult.REJECT if errors else ValidationResult.ACCEPT), errors

ALLOWED_USERNAME = "abcdefghijklmnopqrstuvwxyz0123456789_"
assert validate_whitelist("john_doe", ALLOWED_USERNAME)[0] == ValidationResult.ACCEPT
assert validate_whitelist("admin'; DROP--", ALLOWED_USERNAME)[0] == ValidationResult.REJECT

2. Pydantic v2 — Validation Engine tốc độ cao

Pydantic v2 viết lại core bằng Rust (pydantic-core), nhanh gấp 5–50 lần v1. Đây là tiêu chuẩn thực tế cho validation trong Python production.

python
from datetime import date
from decimal import Decimal
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator


class CreateOrderRequest(BaseModel):
    """Request tạo đơn hàng — production-grade validation."""
    model_config = ConfigDict(
        strict=True, str_strip_whitespace=True, str_max_length=10_000,
    )

    customer_id: str = Field(..., min_length=8, max_length=36, pattern=r"^[a-zA-Z0-9\-]+$")
    items: list["OrderItem"] = Field(..., min_length=1, max_length=100)
    shipping_address: "Address"
    note: Optional[str] = Field(default=None, max_length=500)
    coupon_code: Optional[str] = Field(default=None, pattern=r"^[A-Z0-9]{4,20}$")

    @model_validator(mode="after")
    def validate_total_amount(self) -> "CreateOrderRequest":
        total = sum(item.unit_price * item.quantity for item in self.items)
        if total > Decimal("100_000_000"):
            raise ValueError("Tổng đơn hàng vượt quá 100 triệu VND")
        return self


class OrderItem(BaseModel):
    model_config = ConfigDict(strict=True)
    sku: str = Field(..., pattern=r"^[A-Z]{2,5}-\d{4,10}$")
    quantity: int = Field(..., gt=0, le=999)
    unit_price: Decimal = Field(..., gt=0, decimal_places=2)


class Address(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)
    street: str = Field(..., min_length=5, max_length=200)
    district: str = Field(..., min_length=1, max_length=100)
    city: str = Field(..., min_length=1, max_length=100)
    phone: str = Field(..., pattern=r"^(\+84|0)\d{9,10}$")

    @field_validator("phone")
    @classmethod
    def normalize_phone(cls, v: str) -> str:
        return f"+84{v[1:]}" if v.startswith("0") else v

Xử lý lỗi validation — trả structured error cho client:

python
from pydantic import ValidationError
from fastapi import HTTPException

def parse_order_request(raw_data: dict) -> CreateOrderRequest:
    try:
        return CreateOrderRequest.model_validate(raw_data)
    except ValidationError as exc:
        details = [
            {"field": ".".join(str(l) for l in e["loc"]), "message": e["msg"]}
            for e in exc.errors()
        ]
        raise HTTPException(status_code=422, detail=details)

3. Cerberus — Schema Validation linh hoạt

Khi bạn cần validation schema động (schema thay đổi theo config, không fix cứng lúc compile), Cerberus là lựa chọn phù hợp hơn Pydantic.

python
from cerberus import Validator

product_schema = {
    "name": {
        "type": "string", "required": True,
        "minlength": 2, "maxlength": 200,
        "regex": r"^[\w\s\-\.]+$",
    },
    "price": {
        "type": "float", "required": True,
        "min": 0.01, "max": 999_999_999.99,
    },
    "category": {
        "type": "string", "required": True,
        "allowed": ["electronics", "clothing", "food", "books"],
    },
    "tags": {
        "type": "list", "maxlength": 10,
        "schema": {"type": "string", "maxlength": 50, "regex": r"^[a-z0-9\-]+$"},
    },
}

def validate_product(data: dict) -> tuple[bool, dict | list]:
    v = Validator(product_schema, purge_unknown=True)
    if v.validate(data):
        return True, v.document
    return False, v.errors

valid, result = validate_product({
    "name": "Laptop Dell XPS 15",
    "price": 35_990_000.00,
    "category": "electronics",
    "tags": ["laptop", "dell", "premium"],
    "unknown_field": "sẽ bị loại bỏ",  # purge_unknown=True → loại bỏ
})
assert valid is True
assert "unknown_field" not in result

4. JSON Schema Validation

Khi bạn cần validation cross-language (API contract giữa Python backend và frontend), JSON Schema là tiêu chuẩn RFC.

python
from jsonschema import Draft202012Validator

webhook_schema = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "required": ["event", "timestamp", "payload"],
    "additionalProperties": False,
    "properties": {
        "event": {
            "type": "string",
            "enum": ["order.created", "order.updated", "payment.completed"],
        },
        "timestamp": {"type": "string", "format": "date-time"},
        "payload": {"type": "object", "minProperties": 1, "maxProperties": 50},
        "idempotency_key": {"type": "string", "pattern": "^[a-f0-9]{32}$"},
    },
}

# Compile schema một lần khi import, dùng nhiều lần
compiled_validator = Draft202012Validator(webhook_schema)

def validate_webhook(data: dict) -> list[str]:
    """Trả danh sách lỗi chi tiết."""
    errors = []
    for error in sorted(compiled_validator.iter_errors(data), key=lambda e: list(e.path)):
        path = ".".join(str(p) for p in error.absolute_path) or "(root)"
        errors.append(f"[{path}] {error.message}")
    return errors

def validate_webhook_fast(data: dict) -> bool:
    """Chỉ cần biết pass/fail — nhanh hơn."""
    return compiled_validator.is_valid(data)

5. Kỹ thuật Sanitization

Validation kiểm tra dữ liệu có hợp lệ không. Sanitization biến đổi dữ liệu để an toàn khi sử dụng trong ngữ cảnh cụ thể.

python
import html
import os
import re
import unicodedata
from pathlib import Path
from urllib.parse import urlparse


def sanitize_text(value: str, max_length: int = 1_000) -> str:
    """Sanitize text: normalize unicode, strip nulls, escape HTML."""
    value = unicodedata.normalize("NFKC", value)    # Chống homograph
    value = value.replace("\x00", "")                # Loại null bytes
    value = re.sub(r"[\x00-\x1f\x7f]", "", value)   # Loại control chars
    value = " ".join(value.split())                  # Normalize whitespace
    value = html.escape(value)                       # Escape HTML
    return value[:max_length].strip()


def sanitize_filename(filename: str, allowed_ext: set[str]) -> str:
    """Sanitize filename — chống path traversal."""
    filename = os.path.basename(filename).replace("\x00", "")
    filename = re.sub(r"[^\w.\-]", "_", filename).strip(". ")
    if not filename:
        raise ValueError("Filename rỗng sau sanitization")
    ext = Path(filename).suffix.lower()
    if ext not in allowed_ext:
        raise ValueError(f"Extension {ext} không được phép")
    return filename[:255]


def sanitize_url(url: str, allowed_hosts: set[str]) -> str:
    """Validate URL — chống SSRF và credential injection."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Scheme không hợp lệ: {parsed.scheme}")
    if parsed.netloc not in allowed_hosts:
        raise ValueError(f"Host không được phép: {parsed.netloc}")
    if parsed.username or parsed.password:
        raise ValueError("URL không được chứa credentials")
    return url

Thực chiến

API Input Hardening cho Payment Service

python
"""payment_validation.py — Production input hardening layer."""
import hashlib
import logging
from decimal import Decimal

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)
MAX_BODY_SIZE = 64 * 1024  # 64KB


class RequestSizeLimiter(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        content_length = request.headers.get("content-length")
        if content_length and int(content_length) > MAX_BODY_SIZE:
            raise HTTPException(status_code=413, detail="Payload quá lớn")
        return await call_next(request)


class PaymentRequest(BaseModel):
    model_config = ConfigDict(strict=True, str_strip_whitespace=True)

    idempotency_key: str = Field(..., min_length=32, max_length=64, pattern=r"^[a-f0-9\-]+$")
    amount: Decimal = Field(..., gt=0, le=Decimal("500_000_000"))
    currency: str = Field(..., pattern=r"^(VND|USD|EUR)$")
    merchant_id: str = Field(..., pattern=r"^MCH-[A-Z0-9]{8,16}$")
    customer_email: str = Field(..., max_length=254)
    description: str = Field(..., min_length=1, max_length=500)

    @field_validator("amount")
    @classmethod
    def validate_amount_precision(cls, v: Decimal) -> Decimal:
        if v.as_tuple().exponent < -2:
            raise ValueError("Tối đa 2 chữ số thập phân")
        return v

    @field_validator("customer_email")
    @classmethod
    def validate_email_strict(cls, v: str) -> str:
        import re
        if not re.match(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$", v):
            raise ValueError("Email không hợp lệ")
        return v.lower()

    @model_validator(mode="after")
    def validate_currency_amount(self) -> "PaymentRequest":
        if self.currency == "VND" and self.amount != int(self.amount):
            raise ValueError("VND không hỗ trợ số thập phân")
        return self


app = FastAPI()
app.add_middleware(RequestSizeLimiter)


@app.post("/api/v1/payments")
async def create_payment(request: Request):
    if request.headers.get("content-type") != "application/json":
        raise HTTPException(status_code=415, detail="Chỉ chấp nhận JSON")

    try:
        raw_data = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="JSON không hợp lệ")

    try:
        payment = PaymentRequest.model_validate(raw_data)
    except Exception as exc:
        logger.warning("Payment validation failed", extra={"error": str(exc)})
        raise HTTPException(status_code=422, detail=str(exc))

    # Fingerprint để detect duplicate/replay
    fp_data = f"{payment.merchant_id}:{payment.amount}:{payment.currency}"
    fingerprint = hashlib.sha256(fp_data.encode()).hexdigest()[:16]

    logger.info("Payment validated", extra={"merchant": payment.merchant_id, "fp": fingerprint})
    return {"status": "accepted", "fingerprint": fingerprint}

Sai lầm điển hình

Sai lầm 1: Tin tưởng type coercion mặc định

❌ SAI

python
from pydantic import BaseModel

class UserInput(BaseModel):
    is_admin: bool
    count: int

user = UserInput(is_admin="yes", count="42")
print(user.is_admin)  # True — "yes" thành True!
print(user.count)     # 42 — "42" thành 42
# Kẻ tấn công gửi is_admin="anything" → luôn True

✅ ĐÚNG

python
from pydantic import BaseModel, ConfigDict

class UserInput(BaseModel):
    model_config = ConfigDict(strict=True)
    is_admin: bool
    count: int

# UserInput(is_admin="yes", count="42")  → ValidationError!
# Chỉ chấp nhận đúng kiểu: bool và int
user = UserInput(is_admin=False, count=42)

Sai lầm 2: Validate nhưng không sanitize

❌ SAI

python
from pydantic import BaseModel, Field

class Comment(BaseModel):
    content: str = Field(..., max_length=1000)

comment = Comment(content="<script>alert('xss')</script>")
# Validation pass — nhưng content chứa XSS payload!
# Render trực tiếp vào HTML → bị tấn công

✅ ĐÚNG

python
import html
from pydantic import BaseModel, Field, field_validator

class Comment(BaseModel):
    content: str = Field(..., max_length=1000)

    @field_validator("content")
    @classmethod
    def sanitize_content(cls, v: str) -> str:
        return html.escape(v.strip())

comment = Comment(content="<script>alert('xss')</script>")
print(comment.content)
# "&lt;script&gt;alert(&#x27;xss&#x27;)&lt;/script&gt;" — an toàn

Sai lầm 3: Mass assignment

❌ SAI

python
@app.put("/users/{user_id}")
async def update_user(user_id: int, data: dict):
    for key, value in data.items():
        setattr(user, key, value)  # {"is_admin": true} → leo thang quyền!

✅ ĐÚNG

python
from pydantic import BaseModel
from typing import Optional

class UserUpdate(BaseModel):
    display_name: Optional[str] = None
    email: Optional[str] = None
    # is_admin KHÔNG có → không thể set qua API

@app.put("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate):
    for key, value in data.model_dump(exclude_unset=True).items():
        setattr(user, key, value)

Sai lầm 4: Regex không anchor

❌ SAI

python
import re
def is_valid_id(value: str) -> bool:
    return bool(re.match(r"\d+", value))

is_valid_id("123abc")        # True! — match "123", bỏ "abc"
is_valid_id("1; DROP TABLE") # True! — match "1", bỏ phần sau

✅ ĐÚNG

python
import re
def is_valid_id(value: str) -> bool:
    return bool(re.fullmatch(r"\d{1,10}", value))  # Anchor toàn bộ string

is_valid_id("123abc")        # False
is_valid_id("1; DROP TABLE") # False
is_valid_id("12345")         # True

Sai lầm 5: Không giới hạn kích thước input

❌ SAI

python
class SearchRequest(BaseModel):
    query: str       # Không giới hạn — attacker gửi 100MB string
    tags: list[str]  # Không giới hạn — 1 triệu items → OOM crash

✅ ĐÚNG

python
from pydantic import BaseModel, Field, ConfigDict

class SearchRequest(BaseModel):
    model_config = ConfigDict(str_max_length=10_000)
    query: str = Field(..., min_length=1, max_length=500)
    tags: list[str] = Field(default_factory=list, max_length=20)
    page: int = Field(default=1, ge=1, le=1000)

Under the Hood

Pydantic v2 — bên trong hoạt động thế nào?

Pydantic v2 biên dịch schema thành validator chain bằng Rust (pydantic-core). Schema được compile một lần khi import, mỗi lần validate() chạy hoàn toàn trong Rust.

Benchmark (10,000 validations, model 8 fields):

Thư việnThời gianGhi chú
Pydantic v2 (strict)~45msHot path trong Rust
jsonschema~900msPure Python
Cerberus~1,200msLinh hoạt nhưng chậm
Pydantic v1~850msDeprecated

ReDoS — Regex Denial of Service

Regex engine của Python sử dụng backtracking NFA. Một số pattern gây catastrophic backtracking — thời gian chạy tăng theo hàm mũ với độ dài input.

python
import re
import time

evil_pattern = re.compile(r"^(a+)+$")  # Nested quantifiers → nguy hiểm!

start = time.perf_counter()
evil_pattern.match("a" * 25 + "b")     # ~1-2 giây (hàm mũ!)
# "a" * 30 + "b" → hàng chục giây; "a" * 40 + "b" → treo máy

Phòng chống ReDoS — luôn giới hạn input length trước khi regex:

python
import re

def safe_regex_match(pattern: str, text: str, max_len: int = 10_000) -> bool:
    if len(text) > max_len:
        raise ValueError(f"Input quá dài: {len(text)}")
    return bool(re.match(pattern, text))

# Quy tắc: (1) không nested quantifiers (a+)+
# (2) không overlapping alternatives (a|a)+
# (3) dùng re.fullmatch() thay re.match()
# (4) cân nhắc google-re2 cho linear time guarantee

Validation Performance

Với high-throughput systems, pre-compile regex và cache results:

python
import re
from functools import lru_cache

# ✅ Pre-compile một lần, dùng nhiều lần
EMAIL_PATTERN = re.compile(r"^[\w.+-]+@[\w-]+\.[\w.]+$")

def validate_email_fast(email: str) -> bool:
    return bool(EMAIL_PATTERN.match(email))

@lru_cache(maxsize=1024)  # Cache cho batch processing
def validate_email_cached(email: str) -> bool:
    return bool(EMAIL_PATTERN.match(email))

Checklist ghi nhớ

✅ Checklist triển khai

Thiết kế Validation

  • [ ] Sử dụng strict mode trong Pydantic (ConfigDict(strict=True))
  • [ ] Whitelist thay vì blacklist cho mọi input
  • [ ] Đặt max_length cho mọi string field — không có ngoại lệ
  • [ ] Đặt max_length cho mọi list/array field
  • [ ] Dùng additionalProperties: false trong JSON Schema

Sanitization

  • [ ] Normalize Unicode (NFKC) trước khi validate
  • [ ] Loại bỏ null bytes từ mọi string input
  • [ ] HTML escape trước khi render user content
  • [ ] Sanitize filename trước khi lưu file upload
  • [ ] Validate URL scheme và host trước khi fetch (chống SSRF)

Phòng chống Injection

  • [ ] Luôn dùng parameterized queries — không bao giờ nối chuỗi SQL
  • [ ] Không dùng shell=True trong subprocess — truyền list arguments
  • [ ] Dùng separate model cho Create / Update / Response (không dùng chung)

Production Hardening

  • [ ] Giới hạn request body size ở middleware level
  • [ ] Log validation failures với đủ context để điều tra
  • [ ] Kiểm tra ReDoS cho mọi regex pattern trong codebase
  • [ ] Pre-compile regex và JSON Schema validators khi khởi động app

Bài tập luyện tập

Bài 1: Xây dựng User Registration Validator

Viết Pydantic model RegisterUser với các yêu cầu:

  • username: 3–30 ký tự, chỉ alphanumeric và underscore, không bắt đầu bằng số
  • email: email hợp lệ, normalize về lowercase
  • password: 8–72 ký tự, phải có uppercase, lowercase, digit, special char
  • date_of_birth: phải từ 13 tuổi trở lên
  • referral_code: optional, format REF-XXXX (X là alphanumeric uppercase)

🧠 Quiz

Câu hỏi kiểm tra: Tại sao password nên giới hạn tối đa 72 ký tự?

  • [ ] A. Vì database column chỉ chứa 72 ký tự
  • [x] B. Vì bcrypt chỉ hash 72 bytes đầu tiên — phần còn lại bị bỏ qua
  • [ ] C. Vì UX — người dùng không nhớ password dài hơn
  • [ ] D. Vì RFC 7617 quy định giới hạn 72 ký tự

Giải thích: bcrypt truncate input tại 72 bytes. Password "A" * 72 + "B""A" * 72 + "C" sẽ có cùng hash — tạo lỗ hổng bảo mật. Giới hạn max_length=72 đảm bảo toàn bộ password được hash.

Đáp án tham khảo
python
from datetime import date
from typing import Optional

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator


class RegisterUser(BaseModel):
    model_config = ConfigDict(strict=True, str_strip_whitespace=True)

    username: str = Field(
        ..., min_length=3, max_length=30,
        pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$",
    )
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=72)
    date_of_birth: date
    referral_code: Optional[str] = Field(
        default=None, pattern=r"^REF-[A-Z0-9]{4}$",
    )

    @field_validator("email")
    @classmethod
    def normalize_email(cls, v: str) -> str:
        return v.lower()

    @field_validator("password")
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
        checks = [
            (any(c.isupper() for c in v), "cần ít nhất 1 chữ hoa"),
            (any(c.islower() for c in v), "cần ít nhất 1 chữ thường"),
            (any(c.isdigit() for c in v), "cần ít nhất 1 chữ số"),
            (any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in v), "cần ký tự đặc biệt"),
        ]
        failures = [msg for passed, msg in checks if not passed]
        if failures:
            raise ValueError(f"Password yếu: {', '.join(failures)}")
        return v

    @field_validator("date_of_birth")
    @classmethod
    def validate_minimum_age(cls, v: date) -> date:
        today = date.today()
        age = today.year - v.year - ((today.month, today.day) < (v.month, v.day))
        if age < 13:
            raise ValueError("Phải từ 13 tuổi trở lên")
        if age > 150:
            raise ValueError("Ngày sinh không hợp lệ")
        return v

Bài 2: Phát hiện và sửa lỗ hổng Validation

Đoạn code dưới đây có 4 lỗ hổng validation. Hãy tìm và sửa tất cả.

python
import subprocess, sqlite3

def search_products(query: str, sort_by: str, db_path: str) -> list:
    conn = sqlite3.connect(db_path)
    sql = f"SELECT * FROM products WHERE name LIKE '%{query}%' ORDER BY {sort_by}"
    conn.cursor().execute(sql)
    return conn.cursor().fetchall()

def generate_report(filename: str) -> str:
    return subprocess.run(
        f"cat reports/{filename}", shell=True, capture_output=True, text=True,
    ).stdout

🧠 Quiz

Câu hỏi kiểm tra: Đoạn code trên có bao nhiêu lỗ hổng injection?

  • [ ] A. 2 — SQL injection và command injection
  • [ ] B. 3 — SQL injection, command injection, path traversal
  • [x] C. 4 — SQL injection (query), SQL injection (sort_by), command injection, path traversal
  • [ ] D. 5 lỗ hổng

Giải thích: (1) query nối trực tiếp vào SQL → SQL injection. (2) sort_by cũng nối trực tiếp — ORDER BY injection. (3) filename truyền vào shell → command injection. (4) filename có thể chứa ../ → path traversal đọc file tùy ý.

Đáp án tham khảo
python
import sqlite3
from pathlib import Path
from enum import Enum

class SortColumn(Enum):
    NAME = "name"
    PRICE = "price"
    CREATED_AT = "created_at"

REPORTS_DIR = Path("/app/reports").resolve()

def search_products(query: str, sort_by: SortColumn, db_path: str) -> list[dict]:
    if len(query) > 200:
        raise ValueError("Query quá dài")
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute(
        f"SELECT * FROM products WHERE name LIKE ? ORDER BY {sort_by.value}",
        (f"%{query}%",),
    )
    results = [dict(row) for row in cursor.fetchall()]
    conn.close()
    return results

def generate_report(filename: str) -> str:
    safe_name = Path(filename).name
    if not safe_name or safe_name.startswith("."):
        raise ValueError("Filename không hợp lệ")
    report_path = (REPORTS_DIR / safe_name).resolve()
    if not str(report_path).startswith(str(REPORTS_DIR)):
        raise ValueError("Path traversal detected")
    return report_path.read_text(encoding="utf-8")

Bài 3: Thiết kế Webhook Validator

Thiết kế hệ thống validation cho webhook endpoint nhận events từ payment provider:

  • Validate JSON Schema cho 3 loại event: payment.success, payment.failed, refund.completed
  • Mỗi event type có payload schema khác nhau
  • Verify webhook signature (HMAC-SHA256)
  • Implement replay attack protection (timestamp check)

🧠 Quiz

Câu hỏi kiểm tra: Tại sao cần verify webhook signature trước khi validate payload?

  • [ ] A. Để tiết kiệm CPU — skip validation cho request giả
  • [ ] B. Vì payload có thể chứa malware
  • [x] C. Để đảm bảo request đến từ payment provider, không phải giả mạo
  • [ ] D. Vì JSON Schema validation không kiểm tra được source

Giải thích: Webhook endpoint là public URL. Bất kỳ ai biết URL đều có thể gửi fake request. Signature verification (HMAC) đảm bảo chỉ payment provider — nơi giữ secret key — mới có thể tạo request hợp lệ. Luôn verify signature trước khi xử lý payload.

Đáp án tham khảo
python
import hashlib, hmac, json, time
from typing import Any
from pydantic import BaseModel, ConfigDict, Field

class WebhookSignatureError(Exception):
    pass

class PaymentSuccessPayload(BaseModel):
    model_config = ConfigDict(strict=True)
    transaction_id: str = Field(..., pattern=r"^TXN-[A-Z0-9]{12}$")
    amount: int = Field(..., gt=0)
    currency: str = Field(..., pattern=r"^[A-Z]{3}$")

class PaymentFailedPayload(BaseModel):
    model_config = ConfigDict(strict=True)
    transaction_id: str = Field(..., pattern=r"^TXN-[A-Z0-9]{12}$")
    error_code: str = Field(..., pattern=r"^E\d{4}$")

class RefundPayload(BaseModel):
    model_config = ConfigDict(strict=True)
    original_transaction_id: str = Field(..., pattern=r"^TXN-[A-Z0-9]{12}$")
    refund_id: str = Field(..., pattern=r"^RFD-[A-Z0-9]{12}$")
    amount: int = Field(..., gt=0)

EVENT_MAP = {
    "payment.success": PaymentSuccessPayload,
    "payment.failed": PaymentFailedPayload,
    "refund.completed": RefundPayload,
}

def verify_signature(payload: bytes, sig: str, secret: str, ts: int | None = None) -> None:
    if ts and abs(time.time() - ts) > 300:
        raise WebhookSignatureError("Webhook quá cũ — possible replay")
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, sig):
        raise WebhookSignatureError("Signature không hợp lệ")

def process_webhook(raw_body: bytes, signature: str, secret: str) -> dict[str, Any]:
    data = json.loads(raw_body)
    verify_signature(raw_body, signature, secret, timestamp=data.get("timestamp"))
    event_type = data.get("event")
    if event_type not in EVENT_MAP:
        raise ValueError(f"Unknown event: {event_type}")
    validated = EVENT_MAP[event_type].model_validate(data.get("payload", {}))
    return {"event": event_type, "payload": validated.model_dump()}

Liên kết học tiếp