Skip to content

SQLAlchemy — Database Toolkit cho Python

Năm 2023, một startup fintech tại TP.HCM phải rollback toàn bộ release lúc 2 giờ sáng vì một migration Alembic thiếu down_revision. Hệ thống xử lý 50.000 giao dịch/ngày bị lock toàn bộ bảng transactions trong 12 phút — đủ để mất hàng trăm triệu đồng doanh thu. Nguyên nhân gốc không phải bug logic mà là thiếu hiểu biết về cách SQLAlchemy quản lý connection, session, và schema evolution.

SQLAlchemy không chỉ là một ORM (Object-Relational Mapper). Nó là một database toolkit hoàn chỉnh với hai tầng kiến trúc riêng biệt: Core cho SQL expression thuần và ORM cho object mapping. Sai lầm phổ biến nhất của developer Python là chỉ dùng ORM mà không hiểu Core bên dưới — giống như lái xe số tự động mà không biết nguyên lý hộp số. Khi gặp vấn đề hiệu năng với hàng triệu records, bạn sẽ không biết phải tối ưu từ đâu.

Bài viết này đi sâu vào kiến trúc thực sự của SQLAlchemy 2.0, từ connection pooling đến identity map, từ Unit of Work pattern đến async session — những kiến thức mà production code đòi hỏi nhưng tutorial thông thường bỏ qua.

Bức tranh tư duy

Hãy tưởng tượng SQLAlchemy như một nhà hàng có hệ thống bếp chuyên nghiệp. Tầng ORM giống như thực đơn — bạn gọi "Phở bò tái" và nhà bếp lo mọi thứ. Tầng Core giống như bạn vào thẳng bếp, tự chọn nguyên liệu, tự nấu — kiểm soát hoàn toàn nhưng phải hiểu quy trình. Connection Pool (nhóm kết nối) là đội ngũ phục vụ — có giới hạn, nếu hết bàn thì khách mới phải đợi. Session giống như một bữa ăn — bắt đầu khi ngồi xuống, kết thúc khi thanh toán, và mọi thay đổi (gọi thêm món, hủy món) chỉ được xác nhận khi commit.

┌────────────────────────────────────────────────┐
│        Application Code (FastAPI, Flask)         │
├────────────────────────────────────────────────┤
│  ┌─────────────────┐   ┌──────────────────┐     │
│  │  ORM Layer       │   │  Core Layer      │     │
│  │  Mapped Classes  │   │  select/insert   │     │
│  │  Session (UoW)  ─│───│─ Connection      │     │
│  └────────┬────────┘   └───────┬──────────┘     │
│           └──────┬─────────────┘                 │
│          ┌───────▼───────────┐                   │
│          │ Engine + QueuePool│                   │
│          └───────┬───────────┘                   │
├──────────────────┼───────────────────────────────┤
│          ┌───────▼───────────┐                   │
│          │ DBAPI (asyncpg)   │                   │
│          └───────┬───────────┘                   │
│          ┌───────▼───────────┐                   │
│          │ PostgreSQL/MySQL  │                   │
│          └───────────────────┘                   │
└────────────────────────────────────────────────┘

Nguyên tắc cốt lõi: ORM xây dựng trên Core, không thay thế Core. Trong production, bạn sẽ dùng cả hai — ORM cho business logic thông thường, Core cho batch operations và queries phức tạp cần kiểm soát hiệu năng.

Cốt lõi kỹ thuật

ORM vs Core — Khi nào dùng gì?

Câu hỏi đầu tiên mọi team đều đặt ra: dùng ORM hay viết SQL thuần? Câu trả lời là cả hai, nhưng ở đúng ngữ cảnh.

Core — SQL Expression Language — cho phép xây dựng câu SQL bằng Python objects mà không cần mapping sang class:

python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")
metadata = MetaData()

users_table = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255), nullable=False, unique=True),
    Column("display_name", String(100), nullable=False),
)

stmt = select(users_table.c.email).where(users_table.c.id > 1000).limit(50)
with engine.connect() as conn:
    for row in conn.execute(stmt):
        print(row.email)

ORM — mapping Python class sang database table, tự động tracking thay đổi:

python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True)
    display_name: Mapped[str] = mapped_column(String(100))

with Session(engine) as session:
    user = session.get(User, 42)
    if user:
        user.display_name = "Nguyễn Văn A"
        session.commit()
Tiêu chíORMCore
CRUD đơn giản✅ Nhanh, ít codeNhiều code hơn
Batch insert/update triệu recordsChậm do tracking overhead✅ Nhanh gấp 5-10x
Business logic phức tạp✅ Relationship, lazy loadingPhải JOIN thủ công
Reporting / analytics queriesOverhead không cần thiết✅ Kiểm soát SQL chính xác
Type safety (an toàn kiểu dữ liệu)✅ Mapped classes + mypyYếu hơn

SQLAlchemy 2.0 — Declarative Models hiện đại

SQLAlchemy 2.0 thay đổi hoàn toàn cách khai báo model bằng Mapped type annotations — bắt lỗi ngay lúc viết code thay vì runtime:

python
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(), onupdate=func.now(),
    )


class User(TimestampMixin, Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    full_name: Mapped[str] = mapped_column(String(150))
    is_active: Mapped[bool] = mapped_column(default=True)
    bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    orders: Mapped[list["Order"]] = relationship(
        back_populates="user", cascade="all, delete-orphan",
    )


class Order(TimestampMixin, Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
    total_amount: Mapped[int] = mapped_column(comment="Đơn vị: VNĐ")
    status: Mapped[str] = mapped_column(String(20), default="pending", index=True)

    user: Mapped["User"] = relationship(back_populates="orders")

Tại sao Mapped quan trọng? Vì nó cho phép mypy và IDE phát hiện lỗi kiểu dữ liệu trước khi chạy code. Mapped[Optional[str]] nói rõ field này có thể None, còn Mapped[str] đảm bảo luôn có giá trị.

Async Sessions — Bất đồng bộ với database

Khi dùng FastAPI hoặc bất kỳ async framework nào, bắt buộc phải dùng async engine để tránh block event loop (vòng lặp sự kiện):

python
from sqlalchemy.ext.asyncio import (
    AsyncSession, async_sessionmaker, create_async_engine,
)
from contextlib import asynccontextmanager
from typing import AsyncGenerator

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/mydb",
    pool_size=20,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)

AsyncSessionLocal = async_sessionmaker(
    bind=async_engine, class_=AsyncSession, expire_on_commit=False,
)


@asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    session = AsyncSessionLocal()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

Lưu ý quan trọng: Async session không hỗ trợ lazy loading mặc định. Mọi relationship phải được load eager hoặc qua selectinload — đây là nguồn gốc của rất nhiều bug trong async code.

Relationship Loading Strategies — Chiến lược tải quan hệ

Đây là phần quyết định hiệu năng nhất khi dùng ORM. Mỗi strategy phù hợp với ngữ cảnh khác nhau:

python
from sqlalchemy.orm import selectinload, joinedload, raiseload
from sqlalchemy import select

# JOINED LOAD — JOIN trong cùng một query (tốt cho one-to-one)
stmt = (
    select(User)
    .options(joinedload(User.orders))
    .where(User.is_active == True)
)

# SELECTIN LOAD — query riêng với IN clause ✅ KHUYÊN DÙNG cho async
stmt = (
    select(User)
    .options(selectinload(User.orders))
    .where(User.is_active == True)
)

# RAISE LOAD — ném exception nếu lazy load ✅ BẮT BUỘC trong async
stmt = select(User).options(selectinload(User.orders), raiseload("*"))
StrategySố queriesBộ nhớDùng khi
lazyloadN+1 (tệ nhất)ThấpChỉ khi chắc chắn không loop
joinedload1Cao (dữ liệu trùng lặp)One-to-one, collection nhỏ
selectinload2-3Trung bình✅ Mặc định tốt nhất
subqueryload2Trung bìnhDataset rất lớn
raiseload0 (raise error)Debug, async safety

Alembic Migrations — Quản lý schema evolution

Alembic là công cụ migration chính thức cho SQLAlchemy — giống như Git nhưng cho database schema. Mỗi thay đổi schema phải qua migration, không bao giờ chạy CREATE TABLE thủ công trên production.

bash
# Khởi tạo Alembic
alembic init alembic

# Tạo migration tự động từ model changes
alembic revision --autogenerate -m "add_users_and_orders_tables"

# Áp dụng migration
alembic upgrade head

# Rollback một bước
alembic downgrade -1

File migration production-grade:

python
"""add_users_table

Revision ID: a1b2c3d4e5f6
Revises: None
"""
from alembic import op
import sqlalchemy as sa

revision: str = "a1b2c3d4e5f6"
down_revision: str | None = None


def upgrade() -> None:
    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("email", sa.String(255), nullable=False),
        sa.Column("full_name", sa.String(150), nullable=False),
        sa.Column("is_active", sa.Boolean(), server_default="true"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_users_email", "users", ["email"], unique=True)


def downgrade() -> None:
    op.drop_index("ix_users_email", table_name="users")
    op.drop_table("users")

Quy tắc vàng cho migrations:

  • Luôn viết hàm downgrade() — không bao giờ để trống
  • Đặt tên index rõ ràng thay vì để autogenerate
  • Test migration trên bản sao database trước khi chạy production
  • Migrations phải idempotent (chạy lại không lỗi) khi có thể

Thực chiến

Tình huống: Thiết kế database layer cho hệ thống e-commerce với hàng triệu records

Yêu cầu: API xử lý 2.000 request/giây, database PostgreSQL với 5 triệu users, 20 triệu orders. Cần connection pooling, error handling đúng chuẩn, và monitoring.

Database configuration module

python
"""database.py — Production database configuration."""
import logging
from typing import AsyncGenerator
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import (
    AsyncSession, async_sessionmaker, create_async_engine, AsyncEngine,
)
from sqlalchemy import text

logger = logging.getLogger(__name__)


class DatabaseConfig:
    def __init__(self, database_url: str, *, debug: bool = False) -> None:
        self._engine: AsyncEngine = create_async_engine(
            database_url,
            pool_size=20,
            max_overflow=10,
            pool_timeout=30,
            pool_recycle=1800,
            pool_pre_ping=True,
            echo=debug,
        )
        self._session_factory = async_sessionmaker(
            bind=self._engine, class_=AsyncSession, expire_on_commit=False,
        )

    @asynccontextmanager
    async def session(self) -> AsyncGenerator[AsyncSession, None]:
        async with self._session_factory() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                logger.exception("Database session error")
                raise

    async def check_health(self) -> bool:
        try:
            async with self._engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
            return True
        except Exception:
            return False

    async def dispose(self) -> None:
        await self._engine.dispose()


db = DatabaseConfig("postgresql+asyncpg://user:pass@localhost:5432/ecommerce")

Repository pattern cho business logic

python
"""repositories.py — Data access layer tách biệt business logic khỏi ORM."""
from typing import Sequence, Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models import User, Order


class UserRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, user_id: int) -> Optional[User]:
        stmt = (
            select(User)
            .options(selectinload(User.orders))
            .where(User.id == user_id)
        )
        result = await self._session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> Optional[User]:
        result = await self._session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_active(self, *, offset: int = 0, limit: int = 20) -> Sequence[User]:
        stmt = (
            select(User)
            .where(User.is_active == True)
            .order_by(User.created_at.desc())
            .offset(offset)
            .limit(limit)
        )
        return (await self._session.execute(stmt)).scalars().all()

    async def create(self, *, email: str, full_name: str) -> User:
        user = User(email=email, full_name=full_name)
        self._session.add(user)
        await self._session.flush()
        return user

Tích hợp với FastAPI

python
"""main.py — FastAPI application với database lifecycle."""
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from database import db
from repositories import UserRepository


@asynccontextmanager
async def lifespan(app: FastAPI):
    health = await db.check_health()
    if not health:
        raise RuntimeError("Cannot connect to database")
    yield
    await db.dispose()


app = FastAPI(lifespan=lifespan)


async def get_session():
    async with db.session() as session:
        yield session


@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    session: AsyncSession = Depends(get_session),
):
    repo = UserRepository(session)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return {"id": user.id, "email": user.email, "full_name": user.full_name}

Sai lầm điển hình

1. N+1 Query — Kẻ giết hiệu năng thầm lặng

SAI — Mỗi user trong vòng lặp tạo thêm 1 query:

python
# ❌ N+1 queries — 1 query lấy users + N queries lấy orders
result = await session.execute(select(User))
users = result.scalars().all()
for user in users:
    print(f"{user.email}: {len(user.orders)} orders")  # mỗi lần = 1 query

ĐÚNG — Dùng selectinload để gộp thành 2 queries:

python
# ✅ Chỉ 2 queries bất kể bao nhiêu users
stmt = select(User).options(selectinload(User.orders)).where(User.is_active == True)
result = await session.execute(stmt)
for user in result.scalars().all():
    print(f"{user.email}: {len(user.orders)} orders")

2. Session leak — Quên đóng session

SAI — Session không được đóng khi có exception:

python
# ❌ Session leak — exception → connection bị giữ → pool cạn
order = Order(**data)
session.add(order)
await session.commit()

ĐÚNG — Luôn dùng context manager:

python
# ✅ Context manager đảm bảo cleanup
async with db.session() as session:
    order = Order(**data)
    session.add(order)
    await session.flush()
    return order.id

3. Thiếu index — Query chậm gấp 100x

SAI — Foreign key không có index:

python
# ❌ Không index → full table scan 20 triệu rows
class Order(Base):
    __tablename__ = "orders"
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    status: Mapped[str] = mapped_column(String(20))

ĐÚNG — Index mọi cột dùng trong WHERE, JOIN, ORDER BY:

python
# ✅ Index cho các cột truy vấn thường xuyên
class Order(Base):
    __tablename__ = "orders"
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
    status: Mapped[str] = mapped_column(String(20), index=True)

4. Lazy loading trong async — Lỗi runtime bất ngờ

SAI — Truy cập relationship chưa được load trong async:

python
# ❌ MissingGreenlet error trong async
user = await session.get(User, user_id)
order_count = len(user.orders)  # 💥 MissingGreenlet!

ĐÚNG — Luôn eager load hoặc dùng raiseload:

python
# ✅ Eager load trước khi truy cập
stmt = (
    select(User)
    .options(selectinload(User.orders), raiseload("*"))
    .where(User.id == user_id)
)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if user:
    order_count = len(user.orders)

5. Commit trong vòng lặp — Chậm gấp 100x

SAI — Mỗi record một lần commit:

python
# ❌ 10.000 commits = 10.000 round trips
for data in users_data:
    session.add(User(**data))
    await session.commit()

ĐÚNG — Batch insert với Core:

python
# ✅ Batch insert — một lần commit duy nhất
from sqlalchemy import insert

batch_size = 1000
for i in range(0, len(users_data), batch_size):
    await session.execute(insert(User), users_data[i : i + batch_size])
await session.commit()

Under the Hood

Connection Pool — Bể chứa kết nối

SQLAlchemy dùng QueuePool mặc định — duy trì một tập hợp connections sẵn sàng thay vì tạo mới cho mỗi request. Hiểu cơ chế này giúp tránh hai vấn đề production phổ biến nhất: connection exhaustion (cạn kết nối) và stale connections (kết nối cũ).

Request đến → Pool có connection rảnh?
    ├── CÓ → Cấp connection (checkout)
    │         └── Request dùng xong → Trả về pool (checkin)

    └── KHÔNG → Pool đã đầy (pool_size + max_overflow)?
                ├── CHƯA → Tạo connection mới (overflow)
                └── RỒI → Đợi pool_timeout giây
                          ├── Có connection trả về → dùng
                          └── Timeout → raise TimeoutError

pool_pre_ping=True giải quyết vấn đề "stale connection" — trước mỗi lần cấp connection, SQLAlchemy gửi SELECT 1 để kiểm tra. Chi phí: ~1ms mỗi request. Đáng đánh đổi so với việc nhận OperationalError: server closed the connection unexpectedly lúc 3 giờ sáng.

Session Lifecycle — Vòng đời phiên làm việc

Session là trung tâm của ORM, implement hai pattern quan trọng:

  1. Identity Map (bản đồ định danh): Mỗi session giữ một dictionary {(class, primary_key): instance}. Nếu bạn query cùng một User hai lần, session trả về cùng một object — đảm bảo consistency.

  2. Unit of Work (đơn vị công việc): Session theo dõi mọi thay đổi (new, dirty, deleted) và flush tất cả trong một transaction khi commit(). Thứ tự flush được tính toán tự động dựa trên foreign key dependencies.

session = Session()


    [TRANSIENT]     object mới tạo, chưa add vào session
        │ session.add()

    [PENDING]       đã add, chưa flush — chưa có trong DB
        │ session.flush() hoặc auto-flush

    [PERSISTENT]    đã có trong DB, session đang track
        │ session.commit()

    [DETACHED]      commit xong, object tách khỏi session
                    (nếu expire_on_commit=True, attributes bị expired)

Bảng tối ưu hiệu năng

Tham sốMặc địnhProduction khuyến nghịGiải thích
pool_size515-30Số connection thường trực. Rule: (CPU cores * 2) + disk spindles
max_overflow1010-20Connection tạm khi peak. Tổng max = pool_size + max_overflow
pool_timeout3030Giây chờ lấy connection. Quá thấp → lỗi giả, quá cao → request treo
pool_recycle-1 (vô hạn)1800Tái tạo connection sau N giây. Tránh timeout từ DB/firewall
pool_pre_pingFalseTrueKiểm tra connection sống. Chi phí ~1ms, tránh stale connection
echoFalseFalseLog toàn bộ SQL. Chỉ bật khi debug, tắt trên production
expire_on_commitTrueFalse (async)Khi True, attributes bị expired sau commit → trigger lazy load

Công thức tính pool_size: Đo số request đồng thời trung bình (concurrent requests), nhân 1.5 làm buffer. Ví dụ: 20 concurrent requests → pool_size=30. Dùng monitoring để điều chỉnh — nếu thấy nhiều overflow connections, tăng pool_size.

Checklist ghi nhớ

✅ Checklist triển khai

Cấu hình Engine

  • [ ] Dùng create_async_engine cho async frameworks (FastAPI, Starlette)
  • [ ] Đặt pool_pre_ping=True để tránh stale connections
  • [ ] Cấu hình pool_recycle=1800 để tương thích với firewall/DB timeout
  • [ ] Tắt echo=True trên production

Models & Relationships

  • [ ] Dùng Mapped[] type annotations (SQLAlchemy 2.0 style)
  • [ ] Đặt index=True trên mọi foreign key và cột dùng trong WHERE/ORDER BY
  • [ ] Dùng selectinload làm loading strategy mặc định cho async
  • [ ] Thêm raiseload("*") trong async code để phát hiện missing eager loads
  • [ ] Tạo TimestampMixin cho created_at / updated_at

Session Management

  • [ ] Luôn dùng context manager cho session (async with)
  • [ ] Đặt expire_on_commit=False cho async sessions
  • [ ] Commit một lần cuối transaction, không commit trong vòng lặp
  • [ ] Dùng flush() thay commit() khi cần ID mà chưa muốn kết thúc transaction

Migrations

  • [ ] Mọi thay đổi schema qua Alembic migration — không chạy DDL thủ công
  • [ ] Luôn viết hàm downgrade() đầy đủ
  • [ ] Đặt tên index/constraint rõ ràng, không dùng tên tự sinh
  • [ ] Test migration trên bản sao database trước khi áp dụng production

Bài tập luyện tập

🧠 Quiz — Bài 1: Phân tích N+1 Query

Đoạn code sau chạy bao nhiêu queries khi có 500 users trong database?

python
async def get_report(session: AsyncSession):
    result = await session.execute(select(User))
    users = result.scalars().all()

    report = []
    for user in users:
        stmt = select(func.count()).select_from(Order).where(
            Order.user_id == user.id
        )
        count_result = await session.execute(stmt)
        order_count = count_result.scalar_one()
        report.append({"email": user.email, "orders": order_count})

    return report

A. 1 query B. 2 queries C. 500 queries D. 501 queries

📝 Đáp án và giải thích

Đáp án: D. 501 queries

  • 1 query đầu: SELECT * FROM users
  • 500 queries trong vòng lặp: mỗi user 1 lần SELECT count(*) FROM orders WHERE user_id = ?

Cách tối ưu: Dùng một query duy nhất với GROUP BY:

python
async def get_report(session: AsyncSession):
    stmt = (
        select(
            User.email,
            func.count(Order.id).label("order_count"),
        )
        .outerjoin(Order, User.id == Order.user_id)
        .group_by(User.id, User.email)
    )
    result = await session.execute(stmt)
    return [
        {"email": row.email, "orders": row.order_count}
        for row in result.all()
    ]

Từ 501 queries → 1 query duy nhất. Với 500 users, thời gian giảm từ ~2.5 giây xuống ~50ms.

🧠 Quiz — Bài 2: Thiết kế Repository

Viết một ProductRepository với các method sau:

  • get_by_id(product_id) — trả về product kèm category
  • search(keyword, min_price, max_price) — tìm kiếm với phân trang
  • bulk_update_prices(product_ids, discount_percent) — giảm giá hàng loạt dùng Core

Model:

python
class Product(Base):
    __tablename__ = "products"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(200))
    price: Mapped[int] = mapped_column(comment="VNĐ")
    category_id: Mapped[int] = mapped_column(
        ForeignKey("categories.id"), index=True,
    )
    category: Mapped["Category"] = relationship()
📝 Lời giải tham khảo
python
class ProductRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, product_id: int) -> Optional[Product]:
        stmt = select(Product).options(selectinload(Product.category)).where(Product.id == product_id)
        return (await self._session.execute(stmt)).scalar_one_or_none()

    async def search(
        self, *, keyword: Optional[str] = None,
        min_price: Optional[int] = None, max_price: Optional[int] = None,
        offset: int = 0, limit: int = 20,
    ) -> tuple[Sequence[Product], int]:
        conditions = []
        if keyword:
            conditions.append(Product.name.ilike(f"%{keyword}%"))
        if min_price is not None:
            conditions.append(Product.price >= min_price)
        if max_price is not None:
            conditions.append(Product.price <= max_price)
        where_clause = and_(*conditions) if conditions else True
        total = (await self._session.execute(
            select(func.count()).select_from(Product).where(where_clause)
        )).scalar_one()
        result = await self._session.execute(
            select(Product).options(selectinload(Product.category))
            .where(where_clause).order_by(Product.name).offset(offset).limit(limit)
        )
        return result.scalars().all(), total

    async def bulk_update_prices(self, product_ids: list[int], discount_percent: int) -> int:
        if not product_ids or not (1 <= discount_percent <= 99):
            return 0
        multiplier = (100 - discount_percent) / 100
        stmt = update(Product).where(Product.id.in_(product_ids)).values(
            price=func.round(Product.price * multiplier)
        )
        return (await self._session.execute(stmt)).rowcount

Điểm chính: get_by_id dùng selectinload, search trả về (data, count) cho phân trang, bulk_update_prices dùng Core update() — 1 query thay vì N queries.

🧠 Quiz — Bài 3: Debug Connection Pool

Production monitoring báo cáo: pool_size=20, nhưng thường xuyên thấy 30+ connections active và timeout errors. Phân tích nguyên nhân và đề xuất giải pháp.

📝 Phân tích và giải pháp

Nguyên nhân có thể:

  1. Session leak — code path không đóng session (thiếu context manager)
  2. Long-running transactions — query giữ session quá lâu
  3. max_overflow quá cao — cho phép tạo quá nhiều connection tạm

Giải pháp:

python
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20, max_overflow=10, pool_timeout=30,
    echo_pool="debug",
)

@event.listens_for(engine.sync_engine, "connect")
def set_connection_timeout(dbapi_conn, connection_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("SET statement_timeout = '30s'")
    cursor.close()

async def get_pool_stats() -> dict:
    pool = engine.pool
    return {"pool_size": pool.size(), "checked_out": pool.checkedout()}

Checklist: Bật echo_pool="debug", tìm session không trong context manager, đặt statement_timeout, monitor pool.checkedout().

Liên kết học tiếp