Giao diện
FastAPI — Xây dựng API hiện đại với Python
Năm 2022, đội ngũ backend của một startup fintech tại TP.HCM quyết định migrate hệ thống thanh toán từ Flask sang FastAPI. Lý do không phải vì Flask "chậm" — mà vì mỗi lần thay đổi schema request, phải sửa validation thủ công ở ba nơi khác nhau, documentation lệch so với code thực tế, và mỗi bug validation trên production đều có thể gây sai lệch giao dịch tiền thật. Sau khi chuyển sang FastAPI, số lượng bug liên quan đến input validation giảm 73% trong quý đầu tiên — không phải vì đội ngũ giỏi hơn, mà vì framework buộc họ phải khai báo type đúng từ đầu.
FastAPI không chỉ là "Flask nhanh hơn". Nó là paradigm shift (bước chuyển tư duy) trong cách xây dựng API bằng Python — type hints trở thành contract giữa client và server, validation tự động từ Pydantic, dependency injection (tiêm phụ thuộc) tích hợp sẵn, và async/await là first-class citizen. Khi bạn khai báo một endpoint, bạn đồng thời đang viết documentation, validation logic, và serialization — tất cả từ một nguồn duy nhất.
Bài viết này đi sâu vào cốt lõi kỹ thuật của FastAPI: từ routing và Pydantic model, qua dependency injection và middleware, đến WebSocket và background tasks — tất cả với code production-grade và error handling đầy đủ.
Bức tranh tư duy
Hãy hình dung FastAPI như một nhà hàng phở Sài Gòn đông khách vào giờ trưa. Mỗi thành phần đóng vai trò cụ thể:
┌──────────────────────────────────┐
│ ASGI Server (Uvicorn) │
│ "Mặt tiền nhà hàng — đón khách" │
└──────────────┬───────────────────┘
│
┌──────────────▼───────────────────┐
│ Middleware Stack │
│ "Bảo vệ — kiểm tra trước khi vào" │
│ CORS │ Auth │ Logging │ RateLimit │
└──────────────┬───────────────────┘
│
┌──────────────▼───────────────────┐
│ Router │
│ "Dẫn khách đúng bàn" │
└──────────────┬───────────────────┘
│
┌──────────────▼───────────────────┐
│ Dependency Injection │
│ "Bếp chuẩn bị nguyên liệu sẵn" │
└──────────────┬───────────────────┘
│
┌──────────────▼───────────────────┐
│ Endpoint Handler │
│ "Đầu bếp nấu món — logic chính" │
└──────────────┬───────────────────┘
│
┌──────────────▼───────────────────┐
│ Response + Background Tasks │
│ "Phục vụ + dọn bàn sau" │
└──────────────────────────────────┘Tư duy cốt lõi: Giống nhà hàng hiệu quả không bắt đầu bếp ra đón khách, FastAPI tách biệt rõ ràng từng layer. Middleware xử lý cross-cutting concerns trước khi request chạm business logic. Dependency injection chuẩn bị sẵn mọi thứ endpoint cần — database connection, authenticated user — để endpoint chỉ tập trung vào logic nghiệp vụ.
Điểm khác biệt so với Flask hay Django: FastAPI chạy trên ASGI (Asynchronous Server Gateway Interface), xử lý hàng nghìn request đồng thời mà không cần thread riêng — như một đầu bếp xử lý 20 nồi phở cùng lúc thay vì thuê 20 đầu bếp.
Cốt lõi kỹ thuật
Routing và Path Parameters
Routing (định tuyến) dùng decorator pattern. Path parameters (tham số đường dẫn) được khai báo trong URL và FastAPI tự validate type:
python
from fastapi import FastAPI, Path, Query, HTTPException
from enum import Enum
app = FastAPI(title="Order Management API", version="1.0.0")
class OrderStatus(str, Enum):
pending = "pending"
confirmed = "confirmed"
shipping = "shipping"
delivered = "delivered"
@app.get("/orders/{order_id}")
async def get_order(
order_id: int = Path(..., gt=0, description="ID đơn hàng"),
):
"""Path param tự động parse string→int, validate gt=0, trả 422 nếu sai."""
return {"order_id": order_id, "status": "confirmed"}
@app.get("/orders/")
async def list_orders(
status: OrderStatus | None = Query(default=None),
page: int = Query(default=1, ge=1),
page_size: int = Query(default=20, ge=1, le=100),
):
return {"status_filter": status, "page": page, "page_size": page_size}Tại sao dùng Path() và Query()? Chúng tự động sinh OpenAPI docs — frontend thấy ngay order_id > 0, page_size ≤ 100 mà không cần hỏi backend.
Pydantic Models — Validation từ khai báo type
Pydantic model là trái tim của FastAPI. Khai báo schema bằng Python class — FastAPI tự validate request, serialize response, và sinh docs:
python
from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import datetime
from decimal import Decimal
class OrderItemCreate(BaseModel):
product_id: int = Field(..., gt=0)
quantity: int = Field(..., gt=0, le=1000)
unit_price: Decimal = Field(..., gt=0, max_digits=12, decimal_places=2)
@field_validator("unit_price")
@classmethod
def validate_price(cls, v: Decimal) -> Decimal:
if v < 1000:
raise ValueError("Đơn giá tối thiểu 1.000 VND")
return v
class OrderCreate(BaseModel):
customer_id: int = Field(..., gt=0)
items: list[OrderItemCreate] = Field(..., min_length=1, max_length=50)
shipping_address: str = Field(..., min_length=10, max_length=500)
note: str | None = Field(default=None, max_length=1000)
@model_validator(mode="after")
def validate_total(self) -> "OrderCreate":
total = sum(i.unit_price * i.quantity for i in self.items)
if total > 500_000_000:
raise ValueError(f"Tổng {total:,.0f} VND vượt hạn mức 500 triệu")
return self
class OrderResponse(BaseModel):
"""Response schema — chỉ expose field cần thiết."""
id: int
customer_id: int
status: str
total_amount: Decimal
created_at: datetime
items_count: int
model_config = {"from_attributes": True}Tại sao tách OrderCreate và OrderResponse? Client gửi shipping_address, server trả total_amount — dùng chung model là anti-pattern dẫn đến lộ dữ liệu nhạy cảm.
Dependency Injection — Tách biệt concerns
DI trong FastAPI qua Depends() — endpoint khai báo "tôi cần thứ này" và framework cung cấp:
python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Annotated
import jwt
security = HTTPBearer()
SECRET_KEY = "from-env-var" # Production: os.getenv("SECRET_KEY")
async def get_db_session():
"""Yield dependency — FastAPI tự cleanup kể cả khi exception."""
session = AsyncSession(engine)
try:
yield session
finally:
await session.close()
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
db: Annotated["AsyncSession", Depends(get_db_session)],
):
"""Xác thực JWT và trả user. Dependency chain tự resolve."""
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Token thiếu user")
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token hết hạn")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Token không hợp lệ")
user = await db.get(User, user_id)
if not user:
raise HTTPException(status_code=401, detail="User không tồn tại")
return user
def require_role(role: str):
"""Factory dependency: Depends(require_role("admin"))"""
async def checker(user: Annotated[User, Depends(get_current_user)]):
if user.role != role:
raise HTTPException(status_code=403, detail=f"Yêu cầu quyền {role}")
return user
return checker
@app.post("/orders/", response_model=OrderResponse)
async def create_order(
data: OrderCreate,
user: Annotated[User, Depends(get_current_user)],
db: Annotated["AsyncSession", Depends(get_db_session)],
):
"""Endpoint focus business logic — auth và DB đã được inject."""
order = Order(customer_id=user.id, shipping_address=data.shipping_address)
db.add(order)
await db.commit()
return orderTại sao DI quan trọng? Unit test chỉ cần override dependency thay vì mock DB layer. Đổi auth provider chỉ sửa một dependency — không sửa 50 endpoint.
Middleware — Cross-cutting concerns
Middleware chạy trước/sau mỗi request — logging, CORS, rate limiting:
python
import time, uuid, logging
from fastapi import Request, Response
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
logger = logging.getLogger(__name__)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"], # KHÔNG ["*"] trên production
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
request_id = str(uuid.uuid4())[:8]
request.state.request_id = request_id
start = time.perf_counter()
try:
response = await call_next(request)
ms = (time.perf_counter() - start) * 1000
logger.info(f"{request.method} {request.url.path} → {response.status_code} ({ms:.1f}ms)")
response.headers["X-Request-ID"] = request_id
return response
except Exception as exc:
logger.error(f"Request failed: {exc}", extra={"request_id": request_id})
raise
app.add_middleware(RequestLoggingMiddleware)Thứ tự middleware: Thêm sau chạy trước (LIFO). CORS thêm đầu tiên (bao ngoài), logging thêm sau (bên trong).
Background Tasks — Xử lý sau response
Background tasks (tác vụ nền) trả response ngay, xử lý phía sau — gửi email, cập nhật cache:
python
from fastapi import BackgroundTasks
async def send_confirmation_email(order_id: int, email: str) -> None:
"""Chạy background — fail không ảnh hưởng đơn hàng."""
try:
logger.info(f"Sending confirmation for order {order_id} to {email}")
except Exception as exc:
logger.error(f"Email failed: {exc}", exc_info=True)
@app.post("/orders/", status_code=201)
async def create_order_with_tasks(
data: OrderCreate, background_tasks: BackgroundTasks,
user: Annotated[User, Depends(get_current_user)],
):
order = await save_order(data, user)
background_tasks.add_task(send_confirmation_email, order.id, user.email)
return orderBackground Tasks vs Celery? Tasks chạy trong process — phù hợp việc nhẹ (<5s). Celery cho việc nặng vì distributed và persistent.
WebSocket — Giao tiếp hai chiều thời gian thực
WebSocket cho phép server push dữ liệu đến client — lý tưởng cho notification, live dashboard:
python
from fastapi import WebSocket, WebSocketDisconnect
import json
class ConnectionManager:
def __init__(self):
self._active: dict[int, WebSocket] = {}
async def connect(self, user_id: int, ws: WebSocket) -> None:
await ws.accept()
self._active[user_id] = ws
def disconnect(self, user_id: int) -> None:
self._active.pop(user_id, None)
async def send_to_user(self, user_id: int, message: dict) -> None:
ws = self._active.get(user_id)
if ws:
try:
await ws.send_json(message)
except Exception:
self.disconnect(user_id)
manager = ConnectionManager()
@app.websocket("/ws/orders/{user_id}")
async def order_updates(websocket: WebSocket, user_id: int):
await manager.connect(user_id, websocket)
try:
while True:
data = json.loads(await websocket.receive_text())
if data.get("type") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(user_id)Production note: WebSocket là stateful — nhiều server instances cần message broker (Redis Pub/Sub) để đồng bộ.
Thực chiến
Tình huống: Xây dựng production API service cho hệ thống quản lý đơn hàng
Yêu cầu: CRUD đơn hàng, validation chặt, error handling thống nhất, health check, structured logging.
python
"""Order Management API. Chạy: uvicorn main:app --workers 4"""
from fastapi import FastAPI, HTTPException, Request, Depends, Query, status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
from typing import Annotated
from datetime import datetime
from decimal import Decimal
from contextlib import asynccontextmanager
import logging, time, uuid
logger = logging.getLogger("order_api")
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Starting Order API...")
yield
logger.info("Shutting down...")
app = FastAPI(title="Order Management API", version="2.0.0", lifespan=lifespan)
# --- Exception Handlers ---
@app.exception_handler(HTTPException)
async def http_exc_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": {"code": exc.status_code, "message": exc.detail,
"request_id": getattr(request.state, "request_id", "?")}},
)
@app.exception_handler(Exception)
async def unhandled_exc_handler(request: Request, exc: Exception):
rid = getattr(request.state, "request_id", "?")
logger.error(f"Unhandled: {exc}", extra={"request_id": rid}, exc_info=True)
return JSONResponse(status_code=500,
content={"error": {"code": 500, "message": "Lỗi hệ thống", "request_id": rid}})
# --- Middleware ---
app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"],
allow_credentials=True, allow_methods=["GET","POST","PUT","DELETE"],
allow_headers=["Authorization","Content-Type"])
@app.middleware("http")
async def request_context(request: Request, call_next):
rid = str(uuid.uuid4())[:8]
request.state.request_id = rid
start = time.perf_counter()
response = await call_next(request)
ms = (time.perf_counter() - start) * 1000
response.headers["X-Request-ID"] = rid
logger.info(f"{request.method} {request.url.path} → {response.status_code} ({ms:.1f}ms)")
return response
# --- Schemas ---
class OrderItemSchema(BaseModel):
product_id: int = Field(..., gt=0)
quantity: int = Field(..., gt=0, le=999)
unit_price: Decimal = Field(..., gt=0, max_digits=12, decimal_places=2)
class CreateOrderRequest(BaseModel):
customer_name: str = Field(..., min_length=2, max_length=200)
items: list[OrderItemSchema] = Field(..., min_length=1, max_length=50)
shipping_address: str = Field(..., min_length=10, max_length=500)
@field_validator("customer_name")
@classmethod
def sanitize_name(cls, v: str) -> str:
cleaned = " ".join(v.split())
if not cleaned:
raise ValueError("Tên không được để trống")
return cleaned
class OrderResponseSchema(BaseModel):
id: int
customer_name: str
status: str
total_amount: Decimal
items_count: int
created_at: datetime
model_config = {"from_attributes": True}
# --- Store & Dependencies ---
_orders: dict[int, dict] = {}
_next_id: int = 1
async def get_pagination(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=20, ge=1, le=100),
) -> dict:
return {"size": page_size, "offset": (page - 1) * page_size}
# --- Endpoints ---
@app.get("/health")
async def health():
return {"status": "healthy", "version": "2.0.0"}
@app.post("/api/v1/orders", response_model=OrderResponseSchema, status_code=201)
async def create_order(payload: CreateOrderRequest):
global _next_id
total = sum(i.unit_price * i.quantity for i in payload.items)
order = {"id": _next_id, "customer_name": payload.customer_name,
"status": "pending", "total_amount": total,
"items_count": len(payload.items), "created_at": datetime.utcnow()}
_orders[_next_id] = order
_next_id += 1
return OrderResponseSchema(**order)
@app.get("/api/v1/orders", response_model=list[OrderResponseSchema])
async def list_orders(pg: Annotated[dict, Depends(get_pagination)], status_filter: str | None = None):
orders = [o for o in _orders.values() if not status_filter or o["status"] == status_filter]
return [OrderResponseSchema(**o) for o in orders[pg["offset"]:pg["offset"]+pg["size"]]]
@app.get("/api/v1/orders/{order_id}", response_model=OrderResponseSchema)
async def get_order(order_id: int):
order = _orders.get(order_id)
if not order:
raise HTTPException(status_code=404, detail=f"Đơn hàng #{order_id} không tồn tại")
return OrderResponseSchema(**order)
@app.put("/api/v1/orders/{order_id}/status")
async def update_order_status(order_id: int, new_status: str):
"""State machine — không cho phép transition tuỳ ý."""
order = _orders.get(order_id)
if not order:
raise HTTPException(status_code=404, detail=f"Đơn hàng #{order_id} không tồn tại")
valid = {"pending": ["confirmed","cancelled"], "confirmed": ["shipping","cancelled"],
"shipping": ["delivered"], "delivered": [], "cancelled": []}
current = order["status"]
if new_status not in valid.get(current, []):
raise HTTPException(status_code=422,
detail=f"'{current}' → '{new_status}' không hợp lệ. Cho phép: {valid[current]}")
order["status"] = new_status
return {"order_id": order_id, "old_status": current, "new_status": new_status}Điểm đáng chú ý:
- Lifespan context manager thay
@app.on_event("startup")(deprecated) - Global exception handler — không lộ stack trace, error format thống nhất
- State machine cho order status — validate transition hợp lệ
- Request ID — mỗi request có ID duy nhất để trace log
Sai lầm điển hình
Sai lầm 1: Trả dict thô thay vì Pydantic model
Impact: Lộ internal fields, response không nhất quán.
python
# ❌ SAI
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
order = await db.get(Order, order_id)
return {"id": order.id, "internal_margin": order.profit_margin} # Lộ dữ liệu!
# ✅ ĐÚNG — Pydantic kiểm soát output
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(order_id: int):
order = await db.get(Order, order_id)
if not order:
raise HTTPException(status_code=404, detail="Not found")
return order # Pydantic tự filter đúng fieldsSai lầm 2: Blocking I/O trong async endpoint
Impact: Block event loop — một request chậm làm server đứng.
python
# ❌ SAI — time.sleep block TOÀN BỘ server
@app.post("/reports")
async def generate_report():
time.sleep(30) # Mọi request khác phải chờ!
return {"status": "done"}
# ✅ ĐÚNG — async sleep hoặc sync function (FastAPI tự chạy threadpool)
@app.post("/reports")
async def generate_report():
await asyncio.sleep(30) # Non-blocking
return {"status": "done"}Sai lầm 3: Không cleanup resources trong dependency
Impact: Connection leak — DB pool cạn kiệt, server crash.
python
# ❌ SAI — exception → session không đóng
async def get_db():
return AsyncSession(engine)
# ✅ ĐÚNG — yield + finally đảm bảo cleanup
async def get_db():
session = AsyncSession(engine)
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()Sai lầm 4: CORS allow_origins=["*"] trên production
Impact: Mọi website gọi được API — nguy cơ CSRF.
python
# ❌ SAI
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)
# ✅ ĐÚNG — whitelist domain cụ thể
ALLOWED = os.getenv("CORS_ORIGINS", "https://app.example.com").split(",")
app.add_middleware(CORSMiddleware, allow_origins=ALLOWED, allow_credentials=True,
allow_methods=["GET","POST","PUT","DELETE"], allow_headers=["Authorization"])Under the Hood
ASGI Request Lifecycle
Client Request
│
▼
┌──────────────────┐
│ Uvicorn (ASGI) │ 1. Parse HTTP → ASGI scope
└────────┬─────────┘
▼
┌──────────────────┐
│ Middleware Stack │ 2. Chạy outside→inside (LIFO)
└────────┬─────────┘
▼
┌──────────────────┐
│ Router │ 3. Match URL → endpoint
└────────┬─────────┘
▼
┌──────────────────┐
│ Dependency Graph │ 4. Resolve deps (topological sort)
│ Resolution │ 5. Execute yield deps → setup
└────────┬─────────┘
▼
┌──────────────────┐
│ Pydantic Valid. │ 6. Parse body (lazy), validate → 422 nếu fail
└────────┬─────────┘
▼
┌──────────────────┐
│ Endpoint │ 7. Business logic → return value
└────────┬─────────┘
▼
┌──────────────────┐
│ Response + │ 8. Serialize → JSON
│ Background Tasks │ 9. Gửi response → chạy background tasks
└────────┬─────────┘
▼
┌──────────────────┐
│ Dep Cleanup │ 10. Yield deps → finally (reverse order)
└──────────────────┘Performance Tuning
python
# Uvicorn workers: (2 × CPU cores) + 1 → 4 cores = 9 workers
# Connection pooling — KHÔNG tạo connection mới mỗi request
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20, max_overflow=10, pool_timeout=30,
pool_recycle=3600, pool_pre_ping=True,
)
# CPU-bound → ProcessPoolExecutor (không chạy trên event loop)
from concurrent.futures import ProcessPoolExecutor
import asyncio
process_pool = ProcessPoolExecutor(max_workers=4)
async def handle_heavy(data: bytes):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(process_pool, heavy_compute, data)Bảng trade-offs
| Quyết định | Lựa chọn A | Lựa chọn B | Khuyến nghị |
|---|---|---|---|
| Sync vs Async | def — threadpool | async def — event loop | Async cho I/O-bound, sync cho blocking libs |
| Pydantic v1 vs v2 | v1 — stable | v2 — nhanh 5-50x, Rust core | v2 cho project mới |
| Background Tasks vs Celery | Built-in, trong process | Distributed, persistent | Tasks cho việc nhẹ, Celery cho việc nặng |
| Uvicorn vs Gunicorn+Uvicorn | Đơn giản | Multi-worker management | Gunicorn+Uvicorn cho production |
| SQLAlchemy sync vs async | Ecosystem lớn | Non-blocking DB | Async nếu DB là bottleneck |
Checklist ghi nhớ
✅ Checklist triển khai
Khởi tạo Project
- [ ] Dùng
asynccontextmanagerlifespan thayon_event(deprecated) - [ ] Cấu hình CORS với whitelist domain cụ thể
- [ ] Thiết lập global exception handler cho error format thống nhất
- [ ] Thêm health check endpoint cho load balancer
Pydantic & Validation
- [ ] Tách riêng Request schema và Response schema
- [ ] Dùng
Field()với constraints (gt,min_length,max_length) - [ ] Bật
from_attributes = Truekhi dùng với ORM - [ ] Viết
field_validatorcho business rules phức tạp
Dependency Injection
- [ ] Dùng
yielddependency cho resource cleanup (DB session) - [ ] Dùng
Annotated[Type, Depends()]thay vìparam = Depends() - [ ] Factory functions cho parameterized deps (
require_role("admin"))
Performance & Security
- [ ] Không blocking I/O trong
async def - [ ] Cấu hình connection pool cho database
- [ ] Validate state transitions thay vì set giá trị tuỳ ý
- [ ] Log structured data với request_id
- [ ] Secrets từ environment variables, không hardcode
Bài tập luyện tập
Bài 1: Product CRUD API
🧠 Quiz
Đề bài: Xây dựng CRUD API cho quản lý sản phẩm:
GET /products— danh sách với paginationPOST /products— tạo mới (name, price, category, stock)GET /products/{id}— chi tiếtDELETE /products/{id}— soft delete
Yêu cầu: Pydantic models tách request/response, validation (price > 0, name 2-200 ký tự), proper HTTP status codes.
Xem lời giải
python
from fastapi import FastAPI, HTTPException, Query, status
from pydantic import BaseModel, Field
from decimal import Decimal
from datetime import datetime
app = FastAPI(title="Product API")
class ProductCreate(BaseModel):
name: str = Field(..., min_length=2, max_length=200)
price: Decimal = Field(..., gt=0, max_digits=12, decimal_places=2)
category: str = Field(..., min_length=1, max_length=100)
stock: int = Field(..., ge=0)
class ProductResponse(BaseModel):
id: int
name: str
price: Decimal
category: str
stock: int
is_deleted: bool
created_at: datetime
_products: dict[int, dict] = {}
_next_id: int = 1
@app.post("/products", response_model=ProductResponse, status_code=201)
async def create_product(data: ProductCreate):
global _next_id
product = {"id": _next_id, **data.model_dump(),
"is_deleted": False, "created_at": datetime.utcnow()}
_products[_next_id] = product
_next_id += 1
return product
@app.get("/products/{pid}", response_model=ProductResponse)
async def get_product(pid: int):
p = _products.get(pid)
if not p or p["is_deleted"]:
raise HTTPException(status_code=404, detail="Sản phẩm không tồn tại")
return p
@app.delete("/products/{pid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_product(pid: int):
p = _products.get(pid)
if not p or p["is_deleted"]:
raise HTTPException(status_code=404, detail="Sản phẩm không tồn tại")
p["is_deleted"] = TrueBài 2: Rate Limiting Middleware
🧠 Quiz
Đề bài: Viết middleware rate limiting theo IP:
- 100 requests / 60 giây mỗi IP
- HTTP 429 khi vượt limit
- Header
X-RateLimit-Remaining - Sliding window algorithm
Gợi ý: defaultdict + deque track timestamps. Xoá timestamps cũ mỗi request.
Xem lời giải
python
import time
from collections import defaultdict, deque
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
RATE_LIMIT, WINDOW = 100, 60
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app):
super().__init__(app)
self._requests: dict[str, deque] = defaultdict(deque)
async def dispatch(self, request: Request, call_next):
ip = request.client.host if request.client else "unknown"
now = time.time()
ts = self._requests[ip]
while ts and ts[0] < now - WINDOW:
ts.popleft()
remaining = RATE_LIMIT - len(ts)
if remaining <= 0:
return JSONResponse(
status_code=429,
content={"error": "Quá nhiều requests"},
headers={"X-RateLimit-Remaining": "0", "Retry-After": str(WINDOW)},
)
ts.append(now)
response = await call_next(request)
response.headers["X-RateLimit-Remaining"] = str(remaining - 1)
return responseBài 3: WebSocket Notification
🧠 Quiz
Đề bài: WebSocket endpoint /ws/notify/{user_id}:
- Server push notification khi có đơn hàng mới
- Client gửi
{"type": "ping"}→ server trả{"type": "pong"} - Xử lý disconnect gracefully
- API endpoint
POST /notify/{user_id}để trigger notification
Gợi ý: Dùng ConnectionManager từ phần cốt lõi, thêm HTTP endpoint gọi manager.send_to_user().
Xem lời giải
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
app = FastAPI()
class NotifyManager:
def __init__(self):
self._ws: dict[int, WebSocket] = {}
async def connect(self, uid: int, ws: WebSocket):
await ws.accept()
self._ws[uid] = ws
def disconnect(self, uid: int):
self._ws.pop(uid, None)
async def notify(self, uid: int, data: dict) -> bool:
ws = self._ws.get(uid)
if not ws:
return False
try:
await ws.send_json(data)
return True
except Exception:
self.disconnect(uid)
return False
mgr = NotifyManager()
@app.websocket("/ws/notify/{user_id}")
async def ws_notify(websocket: WebSocket, user_id: int):
await mgr.connect(user_id, websocket)
try:
while True:
msg = json.loads(await websocket.receive_text())
if msg.get("type") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
mgr.disconnect(user_id)
@app.post("/notify/{user_id}")
async def send_notification(user_id: int, message: str = "Đơn hàng mới"):
sent = await mgr.notify(user_id, {"type": "notification", "message": message})
return {"status": "sent" if sent else "user_offline"}