Skip to content

FastAPI — Xây dựng API hiện đại với Python

Năm 2022, đội ngũ backend của một startup fintech tại TP.HCM quyết định migrate hệ thống thanh toán từ Flask sang FastAPI. Lý do không phải vì Flask "chậm" — mà vì mỗi lần thay đổi schema request, phải sửa validation thủ công ở ba nơi khác nhau, documentation lệch so với code thực tế, và mỗi bug validation trên production đều có thể gây sai lệch giao dịch tiền thật. Sau khi chuyển sang FastAPI, số lượng bug liên quan đến input validation giảm 73% trong quý đầu tiên — không phải vì đội ngũ giỏi hơn, mà vì framework buộc họ phải khai báo type đúng từ đầu.

FastAPI không chỉ là "Flask nhanh hơn". Nó là paradigm shift (bước chuyển tư duy) trong cách xây dựng API bằng Python — type hints trở thành contract giữa client và server, validation tự động từ Pydantic, dependency injection (tiêm phụ thuộc) tích hợp sẵn, và async/await là first-class citizen. Khi bạn khai báo một endpoint, bạn đồng thời đang viết documentation, validation logic, và serialization — tất cả từ một nguồn duy nhất.

Bài viết này đi sâu vào cốt lõi kỹ thuật của FastAPI: từ routing và Pydantic model, qua dependency injection và middleware, đến WebSocket và background tasks — tất cả với code production-grade và error handling đầy đủ.

Bức tranh tư duy

Hãy hình dung FastAPI như một nhà hàng phở Sài Gòn đông khách vào giờ trưa. Mỗi thành phần đóng vai trò cụ thể:

              ┌──────────────────────────────────┐
              │      ASGI Server (Uvicorn)        │
              │   "Mặt tiền nhà hàng — đón khách"  │
              └──────────────┬───────────────────┘

              ┌──────────────▼───────────────────┐
              │       Middleware Stack             │
              │  "Bảo vệ — kiểm tra trước khi vào" │
              │  CORS │ Auth │ Logging │ RateLimit │
              └──────────────┬───────────────────┘

              ┌──────────────▼───────────────────┐
              │            Router                  │
              │  "Dẫn khách đúng bàn"              │
              └──────────────┬───────────────────┘

              ┌──────────────▼───────────────────┐
              │     Dependency Injection           │
              │  "Bếp chuẩn bị nguyên liệu sẵn"   │
              └──────────────┬───────────────────┘

              ┌──────────────▼───────────────────┐
              │      Endpoint Handler              │
              │  "Đầu bếp nấu món — logic chính"   │
              └──────────────┬───────────────────┘

              ┌──────────────▼───────────────────┐
              │   Response + Background Tasks      │
              │  "Phục vụ + dọn bàn sau"           │
              └──────────────────────────────────┘

Tư duy cốt lõi: Giống nhà hàng hiệu quả không bắt đầu bếp ra đón khách, FastAPI tách biệt rõ ràng từng layer. Middleware xử lý cross-cutting concerns trước khi request chạm business logic. Dependency injection chuẩn bị sẵn mọi thứ endpoint cần — database connection, authenticated user — để endpoint chỉ tập trung vào logic nghiệp vụ.

Điểm khác biệt so với Flask hay Django: FastAPI chạy trên ASGI (Asynchronous Server Gateway Interface), xử lý hàng nghìn request đồng thời mà không cần thread riêng — như một đầu bếp xử lý 20 nồi phở cùng lúc thay vì thuê 20 đầu bếp.

Cốt lõi kỹ thuật

Routing và Path Parameters

Routing (định tuyến) dùng decorator pattern. Path parameters (tham số đường dẫn) được khai báo trong URL và FastAPI tự validate type:

python
from fastapi import FastAPI, Path, Query, HTTPException
from enum import Enum

app = FastAPI(title="Order Management API", version="1.0.0")

class OrderStatus(str, Enum):
    pending = "pending"
    confirmed = "confirmed"
    shipping = "shipping"
    delivered = "delivered"

@app.get("/orders/{order_id}")
async def get_order(
    order_id: int = Path(..., gt=0, description="ID đơn hàng"),
):
    """Path param tự động parse string→int, validate gt=0, trả 422 nếu sai."""
    return {"order_id": order_id, "status": "confirmed"}

@app.get("/orders/")
async def list_orders(
    status: OrderStatus | None = Query(default=None),
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=100),
):
    return {"status_filter": status, "page": page, "page_size": page_size}

Tại sao dùng Path()Query()? Chúng tự động sinh OpenAPI docs — frontend thấy ngay order_id > 0, page_size ≤ 100 mà không cần hỏi backend.

Pydantic Models — Validation từ khai báo type

Pydantic model là trái tim của FastAPI. Khai báo schema bằng Python class — FastAPI tự validate request, serialize response, và sinh docs:

python
from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import datetime
from decimal import Decimal

class OrderItemCreate(BaseModel):
    product_id: int = Field(..., gt=0)
    quantity: int = Field(..., gt=0, le=1000)
    unit_price: Decimal = Field(..., gt=0, max_digits=12, decimal_places=2)

    @field_validator("unit_price")
    @classmethod
    def validate_price(cls, v: Decimal) -> Decimal:
        if v < 1000:
            raise ValueError("Đơn giá tối thiểu 1.000 VND")
        return v

class OrderCreate(BaseModel):
    customer_id: int = Field(..., gt=0)
    items: list[OrderItemCreate] = Field(..., min_length=1, max_length=50)
    shipping_address: str = Field(..., min_length=10, max_length=500)
    note: str | None = Field(default=None, max_length=1000)

    @model_validator(mode="after")
    def validate_total(self) -> "OrderCreate":
        total = sum(i.unit_price * i.quantity for i in self.items)
        if total > 500_000_000:
            raise ValueError(f"Tổng {total:,.0f} VND vượt hạn mức 500 triệu")
        return self

class OrderResponse(BaseModel):
    """Response schema — chỉ expose field cần thiết."""
    id: int
    customer_id: int
    status: str
    total_amount: Decimal
    created_at: datetime
    items_count: int
    model_config = {"from_attributes": True}

Tại sao tách OrderCreateOrderResponse? Client gửi shipping_address, server trả total_amount — dùng chung model là anti-pattern dẫn đến lộ dữ liệu nhạy cảm.

Dependency Injection — Tách biệt concerns

DI trong FastAPI qua Depends() — endpoint khai báo "tôi cần thứ này" và framework cung cấp:

python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Annotated
import jwt

security = HTTPBearer()
SECRET_KEY = "from-env-var"  # Production: os.getenv("SECRET_KEY")

async def get_db_session():
    """Yield dependency — FastAPI tự cleanup kể cả khi exception."""
    session = AsyncSession(engine)
    try:
        yield session
    finally:
        await session.close()

async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    db: Annotated["AsyncSession", Depends(get_db_session)],
):
    """Xác thực JWT và trả user. Dependency chain tự resolve."""
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
        user_id: int = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Token thiếu user")
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token hết hạn")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Token không hợp lệ")
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=401, detail="User không tồn tại")
    return user

def require_role(role: str):
    """Factory dependency: Depends(require_role("admin"))"""
    async def checker(user: Annotated[User, Depends(get_current_user)]):
        if user.role != role:
            raise HTTPException(status_code=403, detail=f"Yêu cầu quyền {role}")
        return user
    return checker

@app.post("/orders/", response_model=OrderResponse)
async def create_order(
    data: OrderCreate,
    user: Annotated[User, Depends(get_current_user)],
    db: Annotated["AsyncSession", Depends(get_db_session)],
):
    """Endpoint focus business logic — auth và DB đã được inject."""
    order = Order(customer_id=user.id, shipping_address=data.shipping_address)
    db.add(order)
    await db.commit()
    return order

Tại sao DI quan trọng? Unit test chỉ cần override dependency thay vì mock DB layer. Đổi auth provider chỉ sửa một dependency — không sửa 50 endpoint.

Middleware — Cross-cutting concerns

Middleware chạy trước/sau mỗi request — logging, CORS, rate limiting:

python
import time, uuid, logging
from fastapi import Request, Response
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # KHÔNG ["*"] trên production
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        request_id = str(uuid.uuid4())[:8]
        request.state.request_id = request_id
        start = time.perf_counter()
        try:
            response = await call_next(request)
            ms = (time.perf_counter() - start) * 1000
            logger.info(f"{request.method} {request.url.path}{response.status_code} ({ms:.1f}ms)")
            response.headers["X-Request-ID"] = request_id
            return response
        except Exception as exc:
            logger.error(f"Request failed: {exc}", extra={"request_id": request_id})
            raise

app.add_middleware(RequestLoggingMiddleware)

Thứ tự middleware: Thêm sau chạy trước (LIFO). CORS thêm đầu tiên (bao ngoài), logging thêm sau (bên trong).

Background Tasks — Xử lý sau response

Background tasks (tác vụ nền) trả response ngay, xử lý phía sau — gửi email, cập nhật cache:

python
from fastapi import BackgroundTasks

async def send_confirmation_email(order_id: int, email: str) -> None:
    """Chạy background — fail không ảnh hưởng đơn hàng."""
    try:
        logger.info(f"Sending confirmation for order {order_id} to {email}")
    except Exception as exc:
        logger.error(f"Email failed: {exc}", exc_info=True)

@app.post("/orders/", status_code=201)
async def create_order_with_tasks(
    data: OrderCreate, background_tasks: BackgroundTasks,
    user: Annotated[User, Depends(get_current_user)],
):
    order = await save_order(data, user)
    background_tasks.add_task(send_confirmation_email, order.id, user.email)
    return order

Background Tasks vs Celery? Tasks chạy trong process — phù hợp việc nhẹ (<5s). Celery cho việc nặng vì distributed và persistent.

WebSocket — Giao tiếp hai chiều thời gian thực

WebSocket cho phép server push dữ liệu đến client — lý tưởng cho notification, live dashboard:

python
from fastapi import WebSocket, WebSocketDisconnect
import json

class ConnectionManager:
    def __init__(self):
        self._active: dict[int, WebSocket] = {}

    async def connect(self, user_id: int, ws: WebSocket) -> None:
        await ws.accept()
        self._active[user_id] = ws

    def disconnect(self, user_id: int) -> None:
        self._active.pop(user_id, None)

    async def send_to_user(self, user_id: int, message: dict) -> None:
        ws = self._active.get(user_id)
        if ws:
            try:
                await ws.send_json(message)
            except Exception:
                self.disconnect(user_id)

manager = ConnectionManager()

@app.websocket("/ws/orders/{user_id}")
async def order_updates(websocket: WebSocket, user_id: int):
    await manager.connect(user_id, websocket)
    try:
        while True:
            data = json.loads(await websocket.receive_text())
            if data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        manager.disconnect(user_id)

Production note: WebSocket là stateful — nhiều server instances cần message broker (Redis Pub/Sub) để đồng bộ.

Thực chiến

Tình huống: Xây dựng production API service cho hệ thống quản lý đơn hàng

Yêu cầu: CRUD đơn hàng, validation chặt, error handling thống nhất, health check, structured logging.

python
"""Order Management API. Chạy: uvicorn main:app --workers 4"""
from fastapi import FastAPI, HTTPException, Request, Depends, Query, status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
from typing import Annotated
from datetime import datetime
from decimal import Decimal
from contextlib import asynccontextmanager
import logging, time, uuid

logger = logging.getLogger("order_api")

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting Order API...")
    yield
    logger.info("Shutting down...")

app = FastAPI(title="Order Management API", version="2.0.0", lifespan=lifespan)

# --- Exception Handlers ---
@app.exception_handler(HTTPException)
async def http_exc_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": {"code": exc.status_code, "message": exc.detail,
                           "request_id": getattr(request.state, "request_id", "?")}},
    )

@app.exception_handler(Exception)
async def unhandled_exc_handler(request: Request, exc: Exception):
    rid = getattr(request.state, "request_id", "?")
    logger.error(f"Unhandled: {exc}", extra={"request_id": rid}, exc_info=True)
    return JSONResponse(status_code=500,
        content={"error": {"code": 500, "message": "Lỗi hệ thống", "request_id": rid}})

# --- Middleware ---
app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"],
    allow_credentials=True, allow_methods=["GET","POST","PUT","DELETE"],
    allow_headers=["Authorization","Content-Type"])

@app.middleware("http")
async def request_context(request: Request, call_next):
    rid = str(uuid.uuid4())[:8]
    request.state.request_id = rid
    start = time.perf_counter()
    response = await call_next(request)
    ms = (time.perf_counter() - start) * 1000
    response.headers["X-Request-ID"] = rid
    logger.info(f"{request.method} {request.url.path}{response.status_code} ({ms:.1f}ms)")
    return response

# --- Schemas ---
class OrderItemSchema(BaseModel):
    product_id: int = Field(..., gt=0)
    quantity: int = Field(..., gt=0, le=999)
    unit_price: Decimal = Field(..., gt=0, max_digits=12, decimal_places=2)

class CreateOrderRequest(BaseModel):
    customer_name: str = Field(..., min_length=2, max_length=200)
    items: list[OrderItemSchema] = Field(..., min_length=1, max_length=50)
    shipping_address: str = Field(..., min_length=10, max_length=500)

    @field_validator("customer_name")
    @classmethod
    def sanitize_name(cls, v: str) -> str:
        cleaned = " ".join(v.split())
        if not cleaned:
            raise ValueError("Tên không được để trống")
        return cleaned

class OrderResponseSchema(BaseModel):
    id: int
    customer_name: str
    status: str
    total_amount: Decimal
    items_count: int
    created_at: datetime
    model_config = {"from_attributes": True}

# --- Store & Dependencies ---
_orders: dict[int, dict] = {}
_next_id: int = 1

async def get_pagination(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=100),
) -> dict:
    return {"size": page_size, "offset": (page - 1) * page_size}

# --- Endpoints ---
@app.get("/health")
async def health():
    return {"status": "healthy", "version": "2.0.0"}

@app.post("/api/v1/orders", response_model=OrderResponseSchema, status_code=201)
async def create_order(payload: CreateOrderRequest):
    global _next_id
    total = sum(i.unit_price * i.quantity for i in payload.items)
    order = {"id": _next_id, "customer_name": payload.customer_name,
             "status": "pending", "total_amount": total,
             "items_count": len(payload.items), "created_at": datetime.utcnow()}
    _orders[_next_id] = order
    _next_id += 1
    return OrderResponseSchema(**order)

@app.get("/api/v1/orders", response_model=list[OrderResponseSchema])
async def list_orders(pg: Annotated[dict, Depends(get_pagination)], status_filter: str | None = None):
    orders = [o for o in _orders.values() if not status_filter or o["status"] == status_filter]
    return [OrderResponseSchema(**o) for o in orders[pg["offset"]:pg["offset"]+pg["size"]]]

@app.get("/api/v1/orders/{order_id}", response_model=OrderResponseSchema)
async def get_order(order_id: int):
    order = _orders.get(order_id)
    if not order:
        raise HTTPException(status_code=404, detail=f"Đơn hàng #{order_id} không tồn tại")
    return OrderResponseSchema(**order)

@app.put("/api/v1/orders/{order_id}/status")
async def update_order_status(order_id: int, new_status: str):
    """State machine — không cho phép transition tuỳ ý."""
    order = _orders.get(order_id)
    if not order:
        raise HTTPException(status_code=404, detail=f"Đơn hàng #{order_id} không tồn tại")
    valid = {"pending": ["confirmed","cancelled"], "confirmed": ["shipping","cancelled"],
             "shipping": ["delivered"], "delivered": [], "cancelled": []}
    current = order["status"]
    if new_status not in valid.get(current, []):
        raise HTTPException(status_code=422,
            detail=f"'{current}' → '{new_status}' không hợp lệ. Cho phép: {valid[current]}")
    order["status"] = new_status
    return {"order_id": order_id, "old_status": current, "new_status": new_status}

Điểm đáng chú ý:

  • Lifespan context manager thay @app.on_event("startup") (deprecated)
  • Global exception handler — không lộ stack trace, error format thống nhất
  • State machine cho order status — validate transition hợp lệ
  • Request ID — mỗi request có ID duy nhất để trace log

Sai lầm điển hình

Sai lầm 1: Trả dict thô thay vì Pydantic model

Impact: Lộ internal fields, response không nhất quán.

python
# ❌ SAI
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    order = await db.get(Order, order_id)
    return {"id": order.id, "internal_margin": order.profit_margin}  # Lộ dữ liệu!

# ✅ ĐÚNG — Pydantic kiểm soát output
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(order_id: int):
    order = await db.get(Order, order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Not found")
    return order  # Pydantic tự filter đúng fields

Sai lầm 2: Blocking I/O trong async endpoint

Impact: Block event loop — một request chậm làm server đứng.

python
# ❌ SAI — time.sleep block TOÀN BỘ server
@app.post("/reports")
async def generate_report():
    time.sleep(30)  # Mọi request khác phải chờ!
    return {"status": "done"}

# ✅ ĐÚNG — async sleep hoặc sync function (FastAPI tự chạy threadpool)
@app.post("/reports")
async def generate_report():
    await asyncio.sleep(30)  # Non-blocking
    return {"status": "done"}

Sai lầm 3: Không cleanup resources trong dependency

Impact: Connection leak — DB pool cạn kiệt, server crash.

python
# ❌ SAI — exception → session không đóng
async def get_db():
    return AsyncSession(engine)

# ✅ ĐÚNG — yield + finally đảm bảo cleanup
async def get_db():
    session = AsyncSession(engine)
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

Sai lầm 4: CORS allow_origins=["*"] trên production

Impact: Mọi website gọi được API — nguy cơ CSRF.

python
# ❌ SAI
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)

# ✅ ĐÚNG — whitelist domain cụ thể
ALLOWED = os.getenv("CORS_ORIGINS", "https://app.example.com").split(",")
app.add_middleware(CORSMiddleware, allow_origins=ALLOWED, allow_credentials=True,
    allow_methods=["GET","POST","PUT","DELETE"], allow_headers=["Authorization"])

Under the Hood

ASGI Request Lifecycle

Client Request


┌──────────────────┐
│ Uvicorn (ASGI)   │  1. Parse HTTP → ASGI scope
└────────┬─────────┘

┌──────────────────┐
│ Middleware Stack  │  2. Chạy outside→inside (LIFO)
└────────┬─────────┘

┌──────────────────┐
│ Router           │  3. Match URL → endpoint
└────────┬─────────┘

┌──────────────────┐
│ Dependency Graph │  4. Resolve deps (topological sort)
│ Resolution       │  5. Execute yield deps → setup
└────────┬─────────┘

┌──────────────────┐
│ Pydantic Valid.  │  6. Parse body (lazy), validate → 422 nếu fail
└────────┬─────────┘

┌──────────────────┐
│ Endpoint         │  7. Business logic → return value
└────────┬─────────┘

┌──────────────────┐
│ Response +       │  8. Serialize → JSON
│ Background Tasks │  9. Gửi response → chạy background tasks
└────────┬─────────┘

┌──────────────────┐
│ Dep Cleanup      │  10. Yield deps → finally (reverse order)
└──────────────────┘

Performance Tuning

python
# Uvicorn workers: (2 × CPU cores) + 1 → 4 cores = 9 workers

# Connection pooling — KHÔNG tạo connection mới mỗi request
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20, max_overflow=10, pool_timeout=30,
    pool_recycle=3600, pool_pre_ping=True,
)

# CPU-bound → ProcessPoolExecutor (không chạy trên event loop)
from concurrent.futures import ProcessPoolExecutor
import asyncio
process_pool = ProcessPoolExecutor(max_workers=4)

async def handle_heavy(data: bytes):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(process_pool, heavy_compute, data)

Bảng trade-offs

Quyết địnhLựa chọn ALựa chọn BKhuyến nghị
Sync vs Asyncdef — threadpoolasync def — event loopAsync cho I/O-bound, sync cho blocking libs
Pydantic v1 vs v2v1 — stablev2 — nhanh 5-50x, Rust corev2 cho project mới
Background Tasks vs CeleryBuilt-in, trong processDistributed, persistentTasks cho việc nhẹ, Celery cho việc nặng
Uvicorn vs Gunicorn+UvicornĐơn giảnMulti-worker managementGunicorn+Uvicorn cho production
SQLAlchemy sync vs asyncEcosystem lớnNon-blocking DBAsync nếu DB là bottleneck

Checklist ghi nhớ

✅ Checklist triển khai

Khởi tạo Project

  • [ ] Dùng asynccontextmanager lifespan thay on_event (deprecated)
  • [ ] Cấu hình CORS với whitelist domain cụ thể
  • [ ] Thiết lập global exception handler cho error format thống nhất
  • [ ] Thêm health check endpoint cho load balancer

Pydantic & Validation

  • [ ] Tách riêng Request schema và Response schema
  • [ ] Dùng Field() với constraints (gt, min_length, max_length)
  • [ ] Bật from_attributes = True khi dùng với ORM
  • [ ] Viết field_validator cho business rules phức tạp

Dependency Injection

  • [ ] Dùng yield dependency cho resource cleanup (DB session)
  • [ ] Dùng Annotated[Type, Depends()] thay vì param = Depends()
  • [ ] Factory functions cho parameterized deps (require_role("admin"))

Performance & Security

  • [ ] Không blocking I/O trong async def
  • [ ] Cấu hình connection pool cho database
  • [ ] Validate state transitions thay vì set giá trị tuỳ ý
  • [ ] Log structured data với request_id
  • [ ] Secrets từ environment variables, không hardcode

Bài tập luyện tập

Bài 1: Product CRUD API

🧠 Quiz

Đề bài: Xây dựng CRUD API cho quản lý sản phẩm:

  • GET /products — danh sách với pagination
  • POST /products — tạo mới (name, price, category, stock)
  • GET /products/{id} — chi tiết
  • DELETE /products/{id} — soft delete

Yêu cầu: Pydantic models tách request/response, validation (price > 0, name 2-200 ký tự), proper HTTP status codes.

Xem lời giải
python
from fastapi import FastAPI, HTTPException, Query, status
from pydantic import BaseModel, Field
from decimal import Decimal
from datetime import datetime

app = FastAPI(title="Product API")

class ProductCreate(BaseModel):
    name: str = Field(..., min_length=2, max_length=200)
    price: Decimal = Field(..., gt=0, max_digits=12, decimal_places=2)
    category: str = Field(..., min_length=1, max_length=100)
    stock: int = Field(..., ge=0)

class ProductResponse(BaseModel):
    id: int
    name: str
    price: Decimal
    category: str
    stock: int
    is_deleted: bool
    created_at: datetime

_products: dict[int, dict] = {}
_next_id: int = 1

@app.post("/products", response_model=ProductResponse, status_code=201)
async def create_product(data: ProductCreate):
    global _next_id
    product = {"id": _next_id, **data.model_dump(),
               "is_deleted": False, "created_at": datetime.utcnow()}
    _products[_next_id] = product
    _next_id += 1
    return product

@app.get("/products/{pid}", response_model=ProductResponse)
async def get_product(pid: int):
    p = _products.get(pid)
    if not p or p["is_deleted"]:
        raise HTTPException(status_code=404, detail="Sản phẩm không tồn tại")
    return p

@app.delete("/products/{pid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_product(pid: int):
    p = _products.get(pid)
    if not p or p["is_deleted"]:
        raise HTTPException(status_code=404, detail="Sản phẩm không tồn tại")
    p["is_deleted"] = True

Bài 2: Rate Limiting Middleware

🧠 Quiz

Đề bài: Viết middleware rate limiting theo IP:

  • 100 requests / 60 giây mỗi IP
  • HTTP 429 khi vượt limit
  • Header X-RateLimit-Remaining
  • Sliding window algorithm

Gợi ý: defaultdict + deque track timestamps. Xoá timestamps cũ mỗi request.

Xem lời giải
python
import time
from collections import defaultdict, deque
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

RATE_LIMIT, WINDOW = 100, 60

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app):
        super().__init__(app)
        self._requests: dict[str, deque] = defaultdict(deque)

    async def dispatch(self, request: Request, call_next):
        ip = request.client.host if request.client else "unknown"
        now = time.time()
        ts = self._requests[ip]
        while ts and ts[0] < now - WINDOW:
            ts.popleft()
        remaining = RATE_LIMIT - len(ts)
        if remaining <= 0:
            return JSONResponse(
                status_code=429,
                content={"error": "Quá nhiều requests"},
                headers={"X-RateLimit-Remaining": "0", "Retry-After": str(WINDOW)},
            )
        ts.append(now)
        response = await call_next(request)
        response.headers["X-RateLimit-Remaining"] = str(remaining - 1)
        return response

Bài 3: WebSocket Notification

🧠 Quiz

Đề bài: WebSocket endpoint /ws/notify/{user_id}:

  • Server push notification khi có đơn hàng mới
  • Client gửi {"type": "ping"} → server trả {"type": "pong"}
  • Xử lý disconnect gracefully
  • API endpoint POST /notify/{user_id} để trigger notification

Gợi ý: Dùng ConnectionManager từ phần cốt lõi, thêm HTTP endpoint gọi manager.send_to_user().

Xem lời giải
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

app = FastAPI()

class NotifyManager:
    def __init__(self):
        self._ws: dict[int, WebSocket] = {}

    async def connect(self, uid: int, ws: WebSocket):
        await ws.accept()
        self._ws[uid] = ws

    def disconnect(self, uid: int):
        self._ws.pop(uid, None)

    async def notify(self, uid: int, data: dict) -> bool:
        ws = self._ws.get(uid)
        if not ws:
            return False
        try:
            await ws.send_json(data)
            return True
        except Exception:
            self.disconnect(uid)
            return False

mgr = NotifyManager()

@app.websocket("/ws/notify/{user_id}")
async def ws_notify(websocket: WebSocket, user_id: int):
    await mgr.connect(user_id, websocket)
    try:
        while True:
            msg = json.loads(await websocket.receive_text())
            if msg.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        mgr.disconnect(user_id)

@app.post("/notify/{user_id}")
async def send_notification(user_id: int, message: str = "Đơn hàng mới"):
    sent = await mgr.notify(user_id, {"type": "notification", "message": message})
    return {"status": "sent" if sent else "user_offline"}

Liên kết học tiếp