Giao diện
Input Validation — Xác thực đầu vào
Tháng 3 năm 2023, một fintech startup tại Đông Nam Á mất 2.3 triệu USD trong vòng 47 phút. Nguyên nhân không phải zero-day exploit hay APT attack — mà là một API endpoint cho phép field amount nhận giá trị âm. Kẻ tấn công gửi request {"amount": -50000, "currency": "USD"}, hệ thống xử lý refund ngược, tiền chảy ra ngoài không kiểm soát. Một dòng validation thiếu, thiệt hại hàng triệu đô.
Input validation không phải "nice-to-have" — nó là tuyến phòng thủ đầu tiên của mọi hệ thống production. Mỗi byte dữ liệu đi qua API, form, file upload, webhook đều là vector tấn công tiềm tàng. SQL injection, XSS, command injection, ReDoS — tất cả bắt nguồn từ một lỗi chung: tin tưởng dữ liệu đầu vào mà không xác thực.
Bài viết này trang bị cho bạn arsenal đầy đủ: từ Pydantic v2 — thư viện validation nhanh nhất Python hiện tại, đến Cerberus cho schema linh hoạt, JSON Schema cho API contract, và các kỹ thuật sanitization đã được kiểm chứng trong production.
Bức tranh tư duy
Hãy hình dung hệ thống của bạn như sân bay quốc tế Nội Bài. Mỗi request đến là một hành khách nhập cảnh. Không ai được phép vào lãnh thổ mà không qua hải quan — kiểm tra hộ chiếu (type validation), khai báo hành lý (schema validation), quét an ninh (sanitization), và kiểm dịch (business rule validation). Bỏ qua bất kỳ trạm kiểm soát nào, bạn không biết ai đang đi vào hệ thống.
Request đến ──► [ Trạm 1: Type Check ]
│ Đúng kiểu dữ liệu?
▼
[ Trạm 2: Schema Validation ]
│ Đúng cấu trúc, ràng buộc?
▼
[ Trạm 3: Business Rules ]
│ Hợp lệ về nghiệp vụ?
▼
[ Trạm 4: Sanitization ]
│ Loại bỏ ký tự nguy hiểm?
▼
[ Trạm 5: Context Encoding ]
│ Escape cho SQL / HTML / Shell?
▼
✅ Dữ liệu an toàn ──► Business LogicNguyên tắc vàng: Validate ở biên (boundary), sanitize trước khi sử dụng, encode theo ngữ cảnh. Không bao giờ validate một lần rồi tin tưởng mãi mãi — dữ liệu có thể bị thay đổi giữa các layer.
Cốt lõi kỹ thuật
1. Nền tảng Input Validation
Trước khi dùng bất kỳ thư viện nào, bạn cần nắm vững ba nguyên tắc nền tảng:
- Whitelist over Blacklist: Định nghĩa những gì được phép, không phải những gì bị cấm. Blacklist luôn có lỗ hổng — kẻ tấn công sẽ tìm ra encoding hoặc variation mà bạn chưa nghĩ tới.
- Fail Closed: Khi validation gặp input không xác định, mặc định là từ chối.
- Defense in Depth: Validate ở nhiều layer — client, API gateway, application, database. Không layer nào được tin tưởng layer khác.
python
from dataclasses import dataclass
from enum import Enum
class ValidationResult(Enum):
ACCEPT = "accept"
REJECT = "reject"
@dataclass(frozen=True)
class ValidationError:
field: str
message: str
code: str
def validate_whitelist(
value: str, allowed_chars: str, max_length: int = 255,
) -> tuple[ValidationResult, list[ValidationError]]:
"""Whitelist validation — chỉ chấp nhận ký tự được phép."""
errors: list[ValidationError] = []
if len(value) > max_length:
errors.append(ValidationError("value", f"Vượt quá {max_length} ký tự", "MAX_LENGTH"))
invalid = set(value) - set(allowed_chars)
if invalid:
errors.append(ValidationError("value", f"Ký tự không hợp lệ: {invalid}", "INVALID_CHARS"))
return (ValidationResult.REJECT if errors else ValidationResult.ACCEPT), errors
ALLOWED_USERNAME = "abcdefghijklmnopqrstuvwxyz0123456789_"
assert validate_whitelist("john_doe", ALLOWED_USERNAME)[0] == ValidationResult.ACCEPT
assert validate_whitelist("admin'; DROP--", ALLOWED_USERNAME)[0] == ValidationResult.REJECT2. Pydantic v2 — Validation Engine tốc độ cao
Pydantic v2 viết lại core bằng Rust (pydantic-core), nhanh gấp 5–50 lần v1. Đây là tiêu chuẩn thực tế cho validation trong Python production.
python
from datetime import date
from decimal import Decimal
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
class CreateOrderRequest(BaseModel):
"""Request tạo đơn hàng — production-grade validation."""
model_config = ConfigDict(
strict=True, str_strip_whitespace=True, str_max_length=10_000,
)
customer_id: str = Field(..., min_length=8, max_length=36, pattern=r"^[a-zA-Z0-9\-]+$")
items: list["OrderItem"] = Field(..., min_length=1, max_length=100)
shipping_address: "Address"
note: Optional[str] = Field(default=None, max_length=500)
coupon_code: Optional[str] = Field(default=None, pattern=r"^[A-Z0-9]{4,20}$")
@model_validator(mode="after")
def validate_total_amount(self) -> "CreateOrderRequest":
total = sum(item.unit_price * item.quantity for item in self.items)
if total > Decimal("100_000_000"):
raise ValueError("Tổng đơn hàng vượt quá 100 triệu VND")
return self
class OrderItem(BaseModel):
model_config = ConfigDict(strict=True)
sku: str = Field(..., pattern=r"^[A-Z]{2,5}-\d{4,10}$")
quantity: int = Field(..., gt=0, le=999)
unit_price: Decimal = Field(..., gt=0, decimal_places=2)
class Address(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
street: str = Field(..., min_length=5, max_length=200)
district: str = Field(..., min_length=1, max_length=100)
city: str = Field(..., min_length=1, max_length=100)
phone: str = Field(..., pattern=r"^(\+84|0)\d{9,10}$")
@field_validator("phone")
@classmethod
def normalize_phone(cls, v: str) -> str:
return f"+84{v[1:]}" if v.startswith("0") else vXử lý lỗi validation — trả structured error cho client:
python
from pydantic import ValidationError
from fastapi import HTTPException
def parse_order_request(raw_data: dict) -> CreateOrderRequest:
try:
return CreateOrderRequest.model_validate(raw_data)
except ValidationError as exc:
details = [
{"field": ".".join(str(l) for l in e["loc"]), "message": e["msg"]}
for e in exc.errors()
]
raise HTTPException(status_code=422, detail=details)3. Cerberus — Schema Validation linh hoạt
Khi bạn cần validation schema động (schema thay đổi theo config, không fix cứng lúc compile), Cerberus là lựa chọn phù hợp hơn Pydantic.
python
from cerberus import Validator
product_schema = {
"name": {
"type": "string", "required": True,
"minlength": 2, "maxlength": 200,
"regex": r"^[\w\s\-\.]+$",
},
"price": {
"type": "float", "required": True,
"min": 0.01, "max": 999_999_999.99,
},
"category": {
"type": "string", "required": True,
"allowed": ["electronics", "clothing", "food", "books"],
},
"tags": {
"type": "list", "maxlength": 10,
"schema": {"type": "string", "maxlength": 50, "regex": r"^[a-z0-9\-]+$"},
},
}
def validate_product(data: dict) -> tuple[bool, dict | list]:
v = Validator(product_schema, purge_unknown=True)
if v.validate(data):
return True, v.document
return False, v.errors
valid, result = validate_product({
"name": "Laptop Dell XPS 15",
"price": 35_990_000.00,
"category": "electronics",
"tags": ["laptop", "dell", "premium"],
"unknown_field": "sẽ bị loại bỏ", # purge_unknown=True → loại bỏ
})
assert valid is True
assert "unknown_field" not in result4. JSON Schema Validation
Khi bạn cần validation cross-language (API contract giữa Python backend và frontend), JSON Schema là tiêu chuẩn RFC.
python
from jsonschema import Draft202012Validator
webhook_schema = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"required": ["event", "timestamp", "payload"],
"additionalProperties": False,
"properties": {
"event": {
"type": "string",
"enum": ["order.created", "order.updated", "payment.completed"],
},
"timestamp": {"type": "string", "format": "date-time"},
"payload": {"type": "object", "minProperties": 1, "maxProperties": 50},
"idempotency_key": {"type": "string", "pattern": "^[a-f0-9]{32}$"},
},
}
# Compile schema một lần khi import, dùng nhiều lần
compiled_validator = Draft202012Validator(webhook_schema)
def validate_webhook(data: dict) -> list[str]:
"""Trả danh sách lỗi chi tiết."""
errors = []
for error in sorted(compiled_validator.iter_errors(data), key=lambda e: list(e.path)):
path = ".".join(str(p) for p in error.absolute_path) or "(root)"
errors.append(f"[{path}] {error.message}")
return errors
def validate_webhook_fast(data: dict) -> bool:
"""Chỉ cần biết pass/fail — nhanh hơn."""
return compiled_validator.is_valid(data)5. Kỹ thuật Sanitization
Validation kiểm tra dữ liệu có hợp lệ không. Sanitization biến đổi dữ liệu để an toàn khi sử dụng trong ngữ cảnh cụ thể.
python
import html
import os
import re
import unicodedata
from pathlib import Path
from urllib.parse import urlparse
def sanitize_text(value: str, max_length: int = 1_000) -> str:
"""Sanitize text: normalize unicode, strip nulls, escape HTML."""
value = unicodedata.normalize("NFKC", value) # Chống homograph
value = value.replace("\x00", "") # Loại null bytes
value = re.sub(r"[\x00-\x1f\x7f]", "", value) # Loại control chars
value = " ".join(value.split()) # Normalize whitespace
value = html.escape(value) # Escape HTML
return value[:max_length].strip()
def sanitize_filename(filename: str, allowed_ext: set[str]) -> str:
"""Sanitize filename — chống path traversal."""
filename = os.path.basename(filename).replace("\x00", "")
filename = re.sub(r"[^\w.\-]", "_", filename).strip(". ")
if not filename:
raise ValueError("Filename rỗng sau sanitization")
ext = Path(filename).suffix.lower()
if ext not in allowed_ext:
raise ValueError(f"Extension {ext} không được phép")
return filename[:255]
def sanitize_url(url: str, allowed_hosts: set[str]) -> str:
"""Validate URL — chống SSRF và credential injection."""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError(f"Scheme không hợp lệ: {parsed.scheme}")
if parsed.netloc not in allowed_hosts:
raise ValueError(f"Host không được phép: {parsed.netloc}")
if parsed.username or parsed.password:
raise ValueError("URL không được chứa credentials")
return urlThực chiến
API Input Hardening cho Payment Service
python
"""payment_validation.py — Production input hardening layer."""
import hashlib
import logging
from decimal import Decimal
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
logger = logging.getLogger(__name__)
MAX_BODY_SIZE = 64 * 1024 # 64KB
class RequestSizeLimiter(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
content_length = request.headers.get("content-length")
if content_length and int(content_length) > MAX_BODY_SIZE:
raise HTTPException(status_code=413, detail="Payload quá lớn")
return await call_next(request)
class PaymentRequest(BaseModel):
model_config = ConfigDict(strict=True, str_strip_whitespace=True)
idempotency_key: str = Field(..., min_length=32, max_length=64, pattern=r"^[a-f0-9\-]+$")
amount: Decimal = Field(..., gt=0, le=Decimal("500_000_000"))
currency: str = Field(..., pattern=r"^(VND|USD|EUR)$")
merchant_id: str = Field(..., pattern=r"^MCH-[A-Z0-9]{8,16}$")
customer_email: str = Field(..., max_length=254)
description: str = Field(..., min_length=1, max_length=500)
@field_validator("amount")
@classmethod
def validate_amount_precision(cls, v: Decimal) -> Decimal:
if v.as_tuple().exponent < -2:
raise ValueError("Tối đa 2 chữ số thập phân")
return v
@field_validator("customer_email")
@classmethod
def validate_email_strict(cls, v: str) -> str:
import re
if not re.match(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$", v):
raise ValueError("Email không hợp lệ")
return v.lower()
@model_validator(mode="after")
def validate_currency_amount(self) -> "PaymentRequest":
if self.currency == "VND" and self.amount != int(self.amount):
raise ValueError("VND không hỗ trợ số thập phân")
return self
app = FastAPI()
app.add_middleware(RequestSizeLimiter)
@app.post("/api/v1/payments")
async def create_payment(request: Request):
if request.headers.get("content-type") != "application/json":
raise HTTPException(status_code=415, detail="Chỉ chấp nhận JSON")
try:
raw_data = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="JSON không hợp lệ")
try:
payment = PaymentRequest.model_validate(raw_data)
except Exception as exc:
logger.warning("Payment validation failed", extra={"error": str(exc)})
raise HTTPException(status_code=422, detail=str(exc))
# Fingerprint để detect duplicate/replay
fp_data = f"{payment.merchant_id}:{payment.amount}:{payment.currency}"
fingerprint = hashlib.sha256(fp_data.encode()).hexdigest()[:16]
logger.info("Payment validated", extra={"merchant": payment.merchant_id, "fp": fingerprint})
return {"status": "accepted", "fingerprint": fingerprint}Sai lầm điển hình
Sai lầm 1: Tin tưởng type coercion mặc định
❌ SAI
python
from pydantic import BaseModel
class UserInput(BaseModel):
is_admin: bool
count: int
user = UserInput(is_admin="yes", count="42")
print(user.is_admin) # True — "yes" thành True!
print(user.count) # 42 — "42" thành 42
# Kẻ tấn công gửi is_admin="anything" → luôn True✅ ĐÚNG
python
from pydantic import BaseModel, ConfigDict
class UserInput(BaseModel):
model_config = ConfigDict(strict=True)
is_admin: bool
count: int
# UserInput(is_admin="yes", count="42") → ValidationError!
# Chỉ chấp nhận đúng kiểu: bool và int
user = UserInput(is_admin=False, count=42)Sai lầm 2: Validate nhưng không sanitize
❌ SAI
python
from pydantic import BaseModel, Field
class Comment(BaseModel):
content: str = Field(..., max_length=1000)
comment = Comment(content="<script>alert('xss')</script>")
# Validation pass — nhưng content chứa XSS payload!
# Render trực tiếp vào HTML → bị tấn công✅ ĐÚNG
python
import html
from pydantic import BaseModel, Field, field_validator
class Comment(BaseModel):
content: str = Field(..., max_length=1000)
@field_validator("content")
@classmethod
def sanitize_content(cls, v: str) -> str:
return html.escape(v.strip())
comment = Comment(content="<script>alert('xss')</script>")
print(comment.content)
# "<script>alert('xss')</script>" — an toànSai lầm 3: Mass assignment
❌ SAI
python
@app.put("/users/{user_id}")
async def update_user(user_id: int, data: dict):
for key, value in data.items():
setattr(user, key, value) # {"is_admin": true} → leo thang quyền!✅ ĐÚNG
python
from pydantic import BaseModel
from typing import Optional
class UserUpdate(BaseModel):
display_name: Optional[str] = None
email: Optional[str] = None
# is_admin KHÔNG có → không thể set qua API
@app.put("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate):
for key, value in data.model_dump(exclude_unset=True).items():
setattr(user, key, value)Sai lầm 4: Regex không anchor
❌ SAI
python
import re
def is_valid_id(value: str) -> bool:
return bool(re.match(r"\d+", value))
is_valid_id("123abc") # True! — match "123", bỏ "abc"
is_valid_id("1; DROP TABLE") # True! — match "1", bỏ phần sau✅ ĐÚNG
python
import re
def is_valid_id(value: str) -> bool:
return bool(re.fullmatch(r"\d{1,10}", value)) # Anchor toàn bộ string
is_valid_id("123abc") # False
is_valid_id("1; DROP TABLE") # False
is_valid_id("12345") # TrueSai lầm 5: Không giới hạn kích thước input
❌ SAI
python
class SearchRequest(BaseModel):
query: str # Không giới hạn — attacker gửi 100MB string
tags: list[str] # Không giới hạn — 1 triệu items → OOM crash✅ ĐÚNG
python
from pydantic import BaseModel, Field, ConfigDict
class SearchRequest(BaseModel):
model_config = ConfigDict(str_max_length=10_000)
query: str = Field(..., min_length=1, max_length=500)
tags: list[str] = Field(default_factory=list, max_length=20)
page: int = Field(default=1, ge=1, le=1000)Under the Hood
Pydantic v2 — bên trong hoạt động thế nào?
Pydantic v2 biên dịch schema thành validator chain bằng Rust (pydantic-core). Schema được compile một lần khi import, mỗi lần validate() chạy hoàn toàn trong Rust.
Benchmark (10,000 validations, model 8 fields):
| Thư viện | Thời gian | Ghi chú |
|---|---|---|
| Pydantic v2 (strict) | ~45ms | Hot path trong Rust |
| jsonschema | ~900ms | Pure Python |
| Cerberus | ~1,200ms | Linh hoạt nhưng chậm |
| Pydantic v1 | ~850ms | Deprecated |
ReDoS — Regex Denial of Service
Regex engine của Python sử dụng backtracking NFA. Một số pattern gây catastrophic backtracking — thời gian chạy tăng theo hàm mũ với độ dài input.
python
import re
import time
evil_pattern = re.compile(r"^(a+)+$") # Nested quantifiers → nguy hiểm!
start = time.perf_counter()
evil_pattern.match("a" * 25 + "b") # ~1-2 giây (hàm mũ!)
# "a" * 30 + "b" → hàng chục giây; "a" * 40 + "b" → treo máyPhòng chống ReDoS — luôn giới hạn input length trước khi regex:
python
import re
def safe_regex_match(pattern: str, text: str, max_len: int = 10_000) -> bool:
if len(text) > max_len:
raise ValueError(f"Input quá dài: {len(text)}")
return bool(re.match(pattern, text))
# Quy tắc: (1) không nested quantifiers (a+)+
# (2) không overlapping alternatives (a|a)+
# (3) dùng re.fullmatch() thay re.match()
# (4) cân nhắc google-re2 cho linear time guaranteeValidation Performance
Với high-throughput systems, pre-compile regex và cache results:
python
import re
from functools import lru_cache
# ✅ Pre-compile một lần, dùng nhiều lần
EMAIL_PATTERN = re.compile(r"^[\w.+-]+@[\w-]+\.[\w.]+$")
def validate_email_fast(email: str) -> bool:
return bool(EMAIL_PATTERN.match(email))
@lru_cache(maxsize=1024) # Cache cho batch processing
def validate_email_cached(email: str) -> bool:
return bool(EMAIL_PATTERN.match(email))Checklist ghi nhớ
✅ Checklist triển khai
Thiết kế Validation
- [ ] Sử dụng strict mode trong Pydantic (
ConfigDict(strict=True)) - [ ] Whitelist thay vì blacklist cho mọi input
- [ ] Đặt
max_lengthcho mọi string field — không có ngoại lệ - [ ] Đặt
max_lengthcho mọi list/array field - [ ] Dùng
additionalProperties: falsetrong JSON Schema
Sanitization
- [ ] Normalize Unicode (
NFKC) trước khi validate - [ ] Loại bỏ null bytes từ mọi string input
- [ ] HTML escape trước khi render user content
- [ ] Sanitize filename trước khi lưu file upload
- [ ] Validate URL scheme và host trước khi fetch (chống SSRF)
Phòng chống Injection
- [ ] Luôn dùng parameterized queries — không bao giờ nối chuỗi SQL
- [ ] Không dùng
shell=Truetrong subprocess — truyền list arguments - [ ] Dùng separate model cho Create / Update / Response (không dùng chung)
Production Hardening
- [ ] Giới hạn request body size ở middleware level
- [ ] Log validation failures với đủ context để điều tra
- [ ] Kiểm tra ReDoS cho mọi regex pattern trong codebase
- [ ] Pre-compile regex và JSON Schema validators khi khởi động app
Bài tập luyện tập
Bài 1: Xây dựng User Registration Validator
Viết Pydantic model RegisterUser với các yêu cầu:
username: 3–30 ký tự, chỉ alphanumeric và underscore, không bắt đầu bằng sốemail: email hợp lệ, normalize về lowercasepassword: 8–72 ký tự, phải có uppercase, lowercase, digit, special chardate_of_birth: phải từ 13 tuổi trở lênreferral_code: optional, formatREF-XXXX(X là alphanumeric uppercase)
🧠 Quiz
Câu hỏi kiểm tra: Tại sao password nên giới hạn tối đa 72 ký tự?
- [ ] A. Vì database column chỉ chứa 72 ký tự
- [x] B. Vì bcrypt chỉ hash 72 bytes đầu tiên — phần còn lại bị bỏ qua
- [ ] C. Vì UX — người dùng không nhớ password dài hơn
- [ ] D. Vì RFC 7617 quy định giới hạn 72 ký tự
Giải thích: bcrypt truncate input tại 72 bytes. Password "A" * 72 + "B" và "A" * 72 + "C" sẽ có cùng hash — tạo lỗ hổng bảo mật. Giới hạn max_length=72 đảm bảo toàn bộ password được hash.
Đáp án tham khảo
python
from datetime import date
from typing import Optional
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
class RegisterUser(BaseModel):
model_config = ConfigDict(strict=True, str_strip_whitespace=True)
username: str = Field(
..., min_length=3, max_length=30,
pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$",
)
email: EmailStr
password: str = Field(..., min_length=8, max_length=72)
date_of_birth: date
referral_code: Optional[str] = Field(
default=None, pattern=r"^REF-[A-Z0-9]{4}$",
)
@field_validator("email")
@classmethod
def normalize_email(cls, v: str) -> str:
return v.lower()
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
checks = [
(any(c.isupper() for c in v), "cần ít nhất 1 chữ hoa"),
(any(c.islower() for c in v), "cần ít nhất 1 chữ thường"),
(any(c.isdigit() for c in v), "cần ít nhất 1 chữ số"),
(any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in v), "cần ký tự đặc biệt"),
]
failures = [msg for passed, msg in checks if not passed]
if failures:
raise ValueError(f"Password yếu: {', '.join(failures)}")
return v
@field_validator("date_of_birth")
@classmethod
def validate_minimum_age(cls, v: date) -> date:
today = date.today()
age = today.year - v.year - ((today.month, today.day) < (v.month, v.day))
if age < 13:
raise ValueError("Phải từ 13 tuổi trở lên")
if age > 150:
raise ValueError("Ngày sinh không hợp lệ")
return vBài 2: Phát hiện và sửa lỗ hổng Validation
Đoạn code dưới đây có 4 lỗ hổng validation. Hãy tìm và sửa tất cả.
python
import subprocess, sqlite3
def search_products(query: str, sort_by: str, db_path: str) -> list:
conn = sqlite3.connect(db_path)
sql = f"SELECT * FROM products WHERE name LIKE '%{query}%' ORDER BY {sort_by}"
conn.cursor().execute(sql)
return conn.cursor().fetchall()
def generate_report(filename: str) -> str:
return subprocess.run(
f"cat reports/{filename}", shell=True, capture_output=True, text=True,
).stdout🧠 Quiz
Câu hỏi kiểm tra: Đoạn code trên có bao nhiêu lỗ hổng injection?
- [ ] A. 2 — SQL injection và command injection
- [ ] B. 3 — SQL injection, command injection, path traversal
- [x] C. 4 — SQL injection (query), SQL injection (sort_by), command injection, path traversal
- [ ] D. 5 lỗ hổng
Giải thích: (1) query nối trực tiếp vào SQL → SQL injection. (2) sort_by cũng nối trực tiếp — ORDER BY injection. (3) filename truyền vào shell → command injection. (4) filename có thể chứa ../ → path traversal đọc file tùy ý.
Đáp án tham khảo
python
import sqlite3
from pathlib import Path
from enum import Enum
class SortColumn(Enum):
NAME = "name"
PRICE = "price"
CREATED_AT = "created_at"
REPORTS_DIR = Path("/app/reports").resolve()
def search_products(query: str, sort_by: SortColumn, db_path: str) -> list[dict]:
if len(query) > 200:
raise ValueError("Query quá dài")
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
f"SELECT * FROM products WHERE name LIKE ? ORDER BY {sort_by.value}",
(f"%{query}%",),
)
results = [dict(row) for row in cursor.fetchall()]
conn.close()
return results
def generate_report(filename: str) -> str:
safe_name = Path(filename).name
if not safe_name or safe_name.startswith("."):
raise ValueError("Filename không hợp lệ")
report_path = (REPORTS_DIR / safe_name).resolve()
if not str(report_path).startswith(str(REPORTS_DIR)):
raise ValueError("Path traversal detected")
return report_path.read_text(encoding="utf-8")Bài 3: Thiết kế Webhook Validator
Thiết kế hệ thống validation cho webhook endpoint nhận events từ payment provider:
- Validate JSON Schema cho 3 loại event:
payment.success,payment.failed,refund.completed - Mỗi event type có payload schema khác nhau
- Verify webhook signature (HMAC-SHA256)
- Implement replay attack protection (timestamp check)
🧠 Quiz
Câu hỏi kiểm tra: Tại sao cần verify webhook signature trước khi validate payload?
- [ ] A. Để tiết kiệm CPU — skip validation cho request giả
- [ ] B. Vì payload có thể chứa malware
- [x] C. Để đảm bảo request đến từ payment provider, không phải giả mạo
- [ ] D. Vì JSON Schema validation không kiểm tra được source
Giải thích: Webhook endpoint là public URL. Bất kỳ ai biết URL đều có thể gửi fake request. Signature verification (HMAC) đảm bảo chỉ payment provider — nơi giữ secret key — mới có thể tạo request hợp lệ. Luôn verify signature trước khi xử lý payload.
Đáp án tham khảo
python
import hashlib, hmac, json, time
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class WebhookSignatureError(Exception):
pass
class PaymentSuccessPayload(BaseModel):
model_config = ConfigDict(strict=True)
transaction_id: str = Field(..., pattern=r"^TXN-[A-Z0-9]{12}$")
amount: int = Field(..., gt=0)
currency: str = Field(..., pattern=r"^[A-Z]{3}$")
class PaymentFailedPayload(BaseModel):
model_config = ConfigDict(strict=True)
transaction_id: str = Field(..., pattern=r"^TXN-[A-Z0-9]{12}$")
error_code: str = Field(..., pattern=r"^E\d{4}$")
class RefundPayload(BaseModel):
model_config = ConfigDict(strict=True)
original_transaction_id: str = Field(..., pattern=r"^TXN-[A-Z0-9]{12}$")
refund_id: str = Field(..., pattern=r"^RFD-[A-Z0-9]{12}$")
amount: int = Field(..., gt=0)
EVENT_MAP = {
"payment.success": PaymentSuccessPayload,
"payment.failed": PaymentFailedPayload,
"refund.completed": RefundPayload,
}
def verify_signature(payload: bytes, sig: str, secret: str, ts: int | None = None) -> None:
if ts and abs(time.time() - ts) > 300:
raise WebhookSignatureError("Webhook quá cũ — possible replay")
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, sig):
raise WebhookSignatureError("Signature không hợp lệ")
def process_webhook(raw_body: bytes, signature: str, secret: str) -> dict[str, Any]:
data = json.loads(raw_body)
verify_signature(raw_body, signature, secret, timestamp=data.get("timestamp"))
event_type = data.get("event")
if event_type not in EVENT_MAP:
raise ValueError(f"Unknown event: {event_type}")
validated = EVENT_MAP[event_type].model_validate(data.get("payload", {}))
return {"event": event_type, "payload": validated.model_dump()}