Giao diện
SQLAlchemy — Database Toolkit cho Python
Năm 2023, một startup fintech tại TP.HCM phải rollback toàn bộ release lúc 2 giờ sáng vì một migration Alembic thiếu down_revision. Hệ thống xử lý 50.000 giao dịch/ngày bị lock toàn bộ bảng transactions trong 12 phút — đủ để mất hàng trăm triệu đồng doanh thu. Nguyên nhân gốc không phải bug logic mà là thiếu hiểu biết về cách SQLAlchemy quản lý connection, session, và schema evolution.
SQLAlchemy không chỉ là một ORM (Object-Relational Mapper). Nó là một database toolkit hoàn chỉnh với hai tầng kiến trúc riêng biệt: Core cho SQL expression thuần và ORM cho object mapping. Sai lầm phổ biến nhất của developer Python là chỉ dùng ORM mà không hiểu Core bên dưới — giống như lái xe số tự động mà không biết nguyên lý hộp số. Khi gặp vấn đề hiệu năng với hàng triệu records, bạn sẽ không biết phải tối ưu từ đâu.
Bài viết này đi sâu vào kiến trúc thực sự của SQLAlchemy 2.0, từ connection pooling đến identity map, từ Unit of Work pattern đến async session — những kiến thức mà production code đòi hỏi nhưng tutorial thông thường bỏ qua.
Bức tranh tư duy
Hãy tưởng tượng SQLAlchemy như một nhà hàng có hệ thống bếp chuyên nghiệp. Tầng ORM giống như thực đơn — bạn gọi "Phở bò tái" và nhà bếp lo mọi thứ. Tầng Core giống như bạn vào thẳng bếp, tự chọn nguyên liệu, tự nấu — kiểm soát hoàn toàn nhưng phải hiểu quy trình. Connection Pool (nhóm kết nối) là đội ngũ phục vụ — có giới hạn, nếu hết bàn thì khách mới phải đợi. Session giống như một bữa ăn — bắt đầu khi ngồi xuống, kết thúc khi thanh toán, và mọi thay đổi (gọi thêm món, hủy món) chỉ được xác nhận khi commit.
┌────────────────────────────────────────────────┐
│ Application Code (FastAPI, Flask) │
├────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌──────────────────┐ │
│ │ ORM Layer │ │ Core Layer │ │
│ │ Mapped Classes │ │ select/insert │ │
│ │ Session (UoW) ─│───│─ Connection │ │
│ └────────┬────────┘ └───────┬──────────┘ │
│ └──────┬─────────────┘ │
│ ┌───────▼───────────┐ │
│ │ Engine + QueuePool│ │
│ └───────┬───────────┘ │
├──────────────────┼───────────────────────────────┤
│ ┌───────▼───────────┐ │
│ │ DBAPI (asyncpg) │ │
│ └───────┬───────────┘ │
│ ┌───────▼───────────┐ │
│ │ PostgreSQL/MySQL │ │
│ └───────────────────┘ │
└────────────────────────────────────────────────┘Nguyên tắc cốt lõi: ORM xây dựng trên Core, không thay thế Core. Trong production, bạn sẽ dùng cả hai — ORM cho business logic thông thường, Core cho batch operations và queries phức tạp cần kiểm soát hiệu năng.
Cốt lõi kỹ thuật
ORM vs Core — Khi nào dùng gì?
Câu hỏi đầu tiên mọi team đều đặt ra: dùng ORM hay viết SQL thuần? Câu trả lời là cả hai, nhưng ở đúng ngữ cảnh.
Core — SQL Expression Language — cho phép xây dựng câu SQL bằng Python objects mà không cần mapping sang class:
python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select
engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/mydb")
metadata = MetaData()
users_table = Table(
"users", metadata,
Column("id", Integer, primary_key=True),
Column("email", String(255), nullable=False, unique=True),
Column("display_name", String(100), nullable=False),
)
stmt = select(users_table.c.email).where(users_table.c.id > 1000).limit(50)
with engine.connect() as conn:
for row in conn.execute(stmt):
print(row.email)ORM — mapping Python class sang database table, tự động tracking thay đổi:
python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
display_name: Mapped[str] = mapped_column(String(100))
with Session(engine) as session:
user = session.get(User, 42)
if user:
user.display_name = "Nguyễn Văn A"
session.commit()| Tiêu chí | ORM | Core |
|---|---|---|
| CRUD đơn giản | ✅ Nhanh, ít code | Nhiều code hơn |
| Batch insert/update triệu records | Chậm do tracking overhead | ✅ Nhanh gấp 5-10x |
| Business logic phức tạp | ✅ Relationship, lazy loading | Phải JOIN thủ công |
| Reporting / analytics queries | Overhead không cần thiết | ✅ Kiểm soát SQL chính xác |
| Type safety (an toàn kiểu dữ liệu) | ✅ Mapped classes + mypy | Yếu hơn |
SQLAlchemy 2.0 — Declarative Models hiện đại
SQLAlchemy 2.0 thay đổi hoàn toàn cách khai báo model bằng Mapped type annotations — bắt lỗi ngay lúc viết code thay vì runtime:
python
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now(),
)
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
full_name: Mapped[str] = mapped_column(String(150))
is_active: Mapped[bool] = mapped_column(default=True)
bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
orders: Mapped[list["Order"]] = relationship(
back_populates="user", cascade="all, delete-orphan",
)
class Order(TimestampMixin, Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
total_amount: Mapped[int] = mapped_column(comment="Đơn vị: VNĐ")
status: Mapped[str] = mapped_column(String(20), default="pending", index=True)
user: Mapped["User"] = relationship(back_populates="orders")Tại sao Mapped quan trọng? Vì nó cho phép mypy và IDE phát hiện lỗi kiểu dữ liệu trước khi chạy code. Mapped[Optional[str]] nói rõ field này có thể None, còn Mapped[str] đảm bảo luôn có giá trị.
Async Sessions — Bất đồng bộ với database
Khi dùng FastAPI hoặc bất kỳ async framework nào, bắt buộc phải dùng async engine để tránh block event loop (vòng lặp sự kiện):
python
from sqlalchemy.ext.asyncio import (
AsyncSession, async_sessionmaker, create_async_engine,
)
from contextlib import asynccontextmanager
from typing import AsyncGenerator
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost:5432/mydb",
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, class_=AsyncSession, expire_on_commit=False,
)
@asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
session = AsyncSessionLocal()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()Lưu ý quan trọng: Async session không hỗ trợ lazy loading mặc định. Mọi relationship phải được load eager hoặc qua selectinload — đây là nguồn gốc của rất nhiều bug trong async code.
Relationship Loading Strategies — Chiến lược tải quan hệ
Đây là phần quyết định hiệu năng nhất khi dùng ORM. Mỗi strategy phù hợp với ngữ cảnh khác nhau:
python
from sqlalchemy.orm import selectinload, joinedload, raiseload
from sqlalchemy import select
# JOINED LOAD — JOIN trong cùng một query (tốt cho one-to-one)
stmt = (
select(User)
.options(joinedload(User.orders))
.where(User.is_active == True)
)
# SELECTIN LOAD — query riêng với IN clause ✅ KHUYÊN DÙNG cho async
stmt = (
select(User)
.options(selectinload(User.orders))
.where(User.is_active == True)
)
# RAISE LOAD — ném exception nếu lazy load ✅ BẮT BUỘC trong async
stmt = select(User).options(selectinload(User.orders), raiseload("*"))| Strategy | Số queries | Bộ nhớ | Dùng khi |
|---|---|---|---|
lazyload | N+1 (tệ nhất) | Thấp | Chỉ khi chắc chắn không loop |
joinedload | 1 | Cao (dữ liệu trùng lặp) | One-to-one, collection nhỏ |
selectinload | 2-3 | Trung bình | ✅ Mặc định tốt nhất |
subqueryload | 2 | Trung bình | Dataset rất lớn |
raiseload | 0 (raise error) | — | Debug, async safety |
Alembic Migrations — Quản lý schema evolution
Alembic là công cụ migration chính thức cho SQLAlchemy — giống như Git nhưng cho database schema. Mỗi thay đổi schema phải qua migration, không bao giờ chạy CREATE TABLE thủ công trên production.
bash
# Khởi tạo Alembic
alembic init alembic
# Tạo migration tự động từ model changes
alembic revision --autogenerate -m "add_users_and_orders_tables"
# Áp dụng migration
alembic upgrade head
# Rollback một bước
alembic downgrade -1File migration production-grade:
python
"""add_users_table
Revision ID: a1b2c3d4e5f6
Revises: None
"""
from alembic import op
import sqlalchemy as sa
revision: str = "a1b2c3d4e5f6"
down_revision: str | None = None
def upgrade() -> None:
op.create_table(
"users",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("email", sa.String(255), nullable=False),
sa.Column("full_name", sa.String(150), nullable=False),
sa.Column("is_active", sa.Boolean(), server_default="true"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
op.create_index("ix_users_email", "users", ["email"], unique=True)
def downgrade() -> None:
op.drop_index("ix_users_email", table_name="users")
op.drop_table("users")Quy tắc vàng cho migrations:
- Luôn viết hàm
downgrade()— không bao giờ để trống - Đặt tên index rõ ràng thay vì để autogenerate
- Test migration trên bản sao database trước khi chạy production
- Migrations phải idempotent (chạy lại không lỗi) khi có thể
Thực chiến
Tình huống: Thiết kế database layer cho hệ thống e-commerce với hàng triệu records
Yêu cầu: API xử lý 2.000 request/giây, database PostgreSQL với 5 triệu users, 20 triệu orders. Cần connection pooling, error handling đúng chuẩn, và monitoring.
Database configuration module
python
"""database.py — Production database configuration."""
import logging
from typing import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
AsyncSession, async_sessionmaker, create_async_engine, AsyncEngine,
)
from sqlalchemy import text
logger = logging.getLogger(__name__)
class DatabaseConfig:
def __init__(self, database_url: str, *, debug: bool = False) -> None:
self._engine: AsyncEngine = create_async_engine(
database_url,
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
echo=debug,
)
self._session_factory = async_sessionmaker(
bind=self._engine, class_=AsyncSession, expire_on_commit=False,
)
@asynccontextmanager
async def session(self) -> AsyncGenerator[AsyncSession, None]:
async with self._session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
logger.exception("Database session error")
raise
async def check_health(self) -> bool:
try:
async with self._engine.connect() as conn:
await conn.execute(text("SELECT 1"))
return True
except Exception:
return False
async def dispose(self) -> None:
await self._engine.dispose()
db = DatabaseConfig("postgresql+asyncpg://user:pass@localhost:5432/ecommerce")Repository pattern cho business logic
python
"""repositories.py — Data access layer tách biệt business logic khỏi ORM."""
from typing import Sequence, Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models import User, Order
class UserRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, user_id: int) -> Optional[User]:
stmt = (
select(User)
.options(selectinload(User.orders))
.where(User.id == user_id)
)
result = await self._session.execute(stmt)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> Optional[User]:
result = await self._session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def list_active(self, *, offset: int = 0, limit: int = 20) -> Sequence[User]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(offset)
.limit(limit)
)
return (await self._session.execute(stmt)).scalars().all()
async def create(self, *, email: str, full_name: str) -> User:
user = User(email=email, full_name=full_name)
self._session.add(user)
await self._session.flush()
return userTích hợp với FastAPI
python
"""main.py — FastAPI application với database lifecycle."""
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from database import db
from repositories import UserRepository
@asynccontextmanager
async def lifespan(app: FastAPI):
health = await db.check_health()
if not health:
raise RuntimeError("Cannot connect to database")
yield
await db.dispose()
app = FastAPI(lifespan=lifespan)
async def get_session():
async with db.session() as session:
yield session
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
session: AsyncSession = Depends(get_session),
):
repo = UserRepository(session)
user = await repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return {"id": user.id, "email": user.email, "full_name": user.full_name}Sai lầm điển hình
1. N+1 Query — Kẻ giết hiệu năng thầm lặng
SAI — Mỗi user trong vòng lặp tạo thêm 1 query:
python
# ❌ N+1 queries — 1 query lấy users + N queries lấy orders
result = await session.execute(select(User))
users = result.scalars().all()
for user in users:
print(f"{user.email}: {len(user.orders)} orders") # mỗi lần = 1 queryĐÚNG — Dùng selectinload để gộp thành 2 queries:
python
# ✅ Chỉ 2 queries bất kể bao nhiêu users
stmt = select(User).options(selectinload(User.orders)).where(User.is_active == True)
result = await session.execute(stmt)
for user in result.scalars().all():
print(f"{user.email}: {len(user.orders)} orders")2. Session leak — Quên đóng session
SAI — Session không được đóng khi có exception:
python
# ❌ Session leak — exception → connection bị giữ → pool cạn
order = Order(**data)
session.add(order)
await session.commit()ĐÚNG — Luôn dùng context manager:
python
# ✅ Context manager đảm bảo cleanup
async with db.session() as session:
order = Order(**data)
session.add(order)
await session.flush()
return order.id3. Thiếu index — Query chậm gấp 100x
SAI — Foreign key không có index:
python
# ❌ Không index → full table scan 20 triệu rows
class Order(Base):
__tablename__ = "orders"
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
status: Mapped[str] = mapped_column(String(20))ĐÚNG — Index mọi cột dùng trong WHERE, JOIN, ORDER BY:
python
# ✅ Index cho các cột truy vấn thường xuyên
class Order(Base):
__tablename__ = "orders"
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), index=True)
status: Mapped[str] = mapped_column(String(20), index=True)4. Lazy loading trong async — Lỗi runtime bất ngờ
SAI — Truy cập relationship chưa được load trong async:
python
# ❌ MissingGreenlet error trong async
user = await session.get(User, user_id)
order_count = len(user.orders) # 💥 MissingGreenlet!ĐÚNG — Luôn eager load hoặc dùng raiseload:
python
# ✅ Eager load trước khi truy cập
stmt = (
select(User)
.options(selectinload(User.orders), raiseload("*"))
.where(User.id == user_id)
)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if user:
order_count = len(user.orders)5. Commit trong vòng lặp — Chậm gấp 100x
SAI — Mỗi record một lần commit:
python
# ❌ 10.000 commits = 10.000 round trips
for data in users_data:
session.add(User(**data))
await session.commit()ĐÚNG — Batch insert với Core:
python
# ✅ Batch insert — một lần commit duy nhất
from sqlalchemy import insert
batch_size = 1000
for i in range(0, len(users_data), batch_size):
await session.execute(insert(User), users_data[i : i + batch_size])
await session.commit()Under the Hood
Connection Pool — Bể chứa kết nối
SQLAlchemy dùng QueuePool mặc định — duy trì một tập hợp connections sẵn sàng thay vì tạo mới cho mỗi request. Hiểu cơ chế này giúp tránh hai vấn đề production phổ biến nhất: connection exhaustion (cạn kết nối) và stale connections (kết nối cũ).
Request đến → Pool có connection rảnh?
├── CÓ → Cấp connection (checkout)
│ └── Request dùng xong → Trả về pool (checkin)
│
└── KHÔNG → Pool đã đầy (pool_size + max_overflow)?
├── CHƯA → Tạo connection mới (overflow)
└── RỒI → Đợi pool_timeout giây
├── Có connection trả về → dùng
└── Timeout → raise TimeoutErrorpool_pre_ping=True giải quyết vấn đề "stale connection" — trước mỗi lần cấp connection, SQLAlchemy gửi SELECT 1 để kiểm tra. Chi phí: ~1ms mỗi request. Đáng đánh đổi so với việc nhận OperationalError: server closed the connection unexpectedly lúc 3 giờ sáng.
Session Lifecycle — Vòng đời phiên làm việc
Session là trung tâm của ORM, implement hai pattern quan trọng:
Identity Map (bản đồ định danh): Mỗi session giữ một dictionary
{(class, primary_key): instance}. Nếu bạn query cùng một User hai lần, session trả về cùng một object — đảm bảo consistency.Unit of Work (đơn vị công việc): Session theo dõi mọi thay đổi (new, dirty, deleted) và flush tất cả trong một transaction khi
commit(). Thứ tự flush được tính toán tự động dựa trên foreign key dependencies.
session = Session()
│
▼
[TRANSIENT] object mới tạo, chưa add vào session
│ session.add()
▼
[PENDING] đã add, chưa flush — chưa có trong DB
│ session.flush() hoặc auto-flush
▼
[PERSISTENT] đã có trong DB, session đang track
│ session.commit()
▼
[DETACHED] commit xong, object tách khỏi session
(nếu expire_on_commit=True, attributes bị expired)Bảng tối ưu hiệu năng
| Tham số | Mặc định | Production khuyến nghị | Giải thích |
|---|---|---|---|
pool_size | 5 | 15-30 | Số connection thường trực. Rule: (CPU cores * 2) + disk spindles |
max_overflow | 10 | 10-20 | Connection tạm khi peak. Tổng max = pool_size + max_overflow |
pool_timeout | 30 | 30 | Giây chờ lấy connection. Quá thấp → lỗi giả, quá cao → request treo |
pool_recycle | -1 (vô hạn) | 1800 | Tái tạo connection sau N giây. Tránh timeout từ DB/firewall |
pool_pre_ping | False | True | Kiểm tra connection sống. Chi phí ~1ms, tránh stale connection |
echo | False | False | Log toàn bộ SQL. Chỉ bật khi debug, tắt trên production |
expire_on_commit | True | False (async) | Khi True, attributes bị expired sau commit → trigger lazy load |
Công thức tính pool_size: Đo số request đồng thời trung bình (concurrent requests), nhân 1.5 làm buffer. Ví dụ: 20 concurrent requests → pool_size=30. Dùng monitoring để điều chỉnh — nếu thấy nhiều overflow connections, tăng pool_size.
Checklist ghi nhớ
✅ Checklist triển khai
Cấu hình Engine
- [ ] Dùng
create_async_enginecho async frameworks (FastAPI, Starlette) - [ ] Đặt
pool_pre_ping=Trueđể tránh stale connections - [ ] Cấu hình
pool_recycle=1800để tương thích với firewall/DB timeout - [ ] Tắt
echo=Truetrên production
Models & Relationships
- [ ] Dùng
Mapped[]type annotations (SQLAlchemy 2.0 style) - [ ] Đặt
index=Truetrên mọi foreign key và cột dùng trong WHERE/ORDER BY - [ ] Dùng
selectinloadlàm loading strategy mặc định cho async - [ ] Thêm
raiseload("*")trong async code để phát hiện missing eager loads - [ ] Tạo
TimestampMixinchocreated_at/updated_at
Session Management
- [ ] Luôn dùng context manager cho session (
async with) - [ ] Đặt
expire_on_commit=Falsecho async sessions - [ ] Commit một lần cuối transaction, không commit trong vòng lặp
- [ ] Dùng
flush()thaycommit()khi cần ID mà chưa muốn kết thúc transaction
Migrations
- [ ] Mọi thay đổi schema qua Alembic migration — không chạy DDL thủ công
- [ ] Luôn viết hàm
downgrade()đầy đủ - [ ] Đặt tên index/constraint rõ ràng, không dùng tên tự sinh
- [ ] Test migration trên bản sao database trước khi áp dụng production
Bài tập luyện tập
🧠 Quiz — Bài 1: Phân tích N+1 Query
Đoạn code sau chạy bao nhiêu queries khi có 500 users trong database?
python
async def get_report(session: AsyncSession):
result = await session.execute(select(User))
users = result.scalars().all()
report = []
for user in users:
stmt = select(func.count()).select_from(Order).where(
Order.user_id == user.id
)
count_result = await session.execute(stmt)
order_count = count_result.scalar_one()
report.append({"email": user.email, "orders": order_count})
return reportA. 1 query B. 2 queries C. 500 queries D. 501 queries
📝 Đáp án và giải thích
Đáp án: D. 501 queries
- 1 query đầu:
SELECT * FROM users - 500 queries trong vòng lặp: mỗi user 1 lần
SELECT count(*) FROM orders WHERE user_id = ?
Cách tối ưu: Dùng một query duy nhất với GROUP BY:
python
async def get_report(session: AsyncSession):
stmt = (
select(
User.email,
func.count(Order.id).label("order_count"),
)
.outerjoin(Order, User.id == Order.user_id)
.group_by(User.id, User.email)
)
result = await session.execute(stmt)
return [
{"email": row.email, "orders": row.order_count}
for row in result.all()
]Từ 501 queries → 1 query duy nhất. Với 500 users, thời gian giảm từ ~2.5 giây xuống ~50ms.
🧠 Quiz — Bài 2: Thiết kế Repository
Viết một ProductRepository với các method sau:
get_by_id(product_id)— trả về product kèm categorysearch(keyword, min_price, max_price)— tìm kiếm với phân trangbulk_update_prices(product_ids, discount_percent)— giảm giá hàng loạt dùng Core
Model:
python
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(200))
price: Mapped[int] = mapped_column(comment="VNĐ")
category_id: Mapped[int] = mapped_column(
ForeignKey("categories.id"), index=True,
)
category: Mapped["Category"] = relationship()📝 Lời giải tham khảo
python
class ProductRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, product_id: int) -> Optional[Product]:
stmt = select(Product).options(selectinload(Product.category)).where(Product.id == product_id)
return (await self._session.execute(stmt)).scalar_one_or_none()
async def search(
self, *, keyword: Optional[str] = None,
min_price: Optional[int] = None, max_price: Optional[int] = None,
offset: int = 0, limit: int = 20,
) -> tuple[Sequence[Product], int]:
conditions = []
if keyword:
conditions.append(Product.name.ilike(f"%{keyword}%"))
if min_price is not None:
conditions.append(Product.price >= min_price)
if max_price is not None:
conditions.append(Product.price <= max_price)
where_clause = and_(*conditions) if conditions else True
total = (await self._session.execute(
select(func.count()).select_from(Product).where(where_clause)
)).scalar_one()
result = await self._session.execute(
select(Product).options(selectinload(Product.category))
.where(where_clause).order_by(Product.name).offset(offset).limit(limit)
)
return result.scalars().all(), total
async def bulk_update_prices(self, product_ids: list[int], discount_percent: int) -> int:
if not product_ids or not (1 <= discount_percent <= 99):
return 0
multiplier = (100 - discount_percent) / 100
stmt = update(Product).where(Product.id.in_(product_ids)).values(
price=func.round(Product.price * multiplier)
)
return (await self._session.execute(stmt)).rowcountĐiểm chính: get_by_id dùng selectinload, search trả về (data, count) cho phân trang, bulk_update_prices dùng Core update() — 1 query thay vì N queries.
🧠 Quiz — Bài 3: Debug Connection Pool
Production monitoring báo cáo: pool_size=20, nhưng thường xuyên thấy 30+ connections active và timeout errors. Phân tích nguyên nhân và đề xuất giải pháp.
📝 Phân tích và giải pháp
Nguyên nhân có thể:
- Session leak — code path không đóng session (thiếu context manager)
- Long-running transactions — query giữ session quá lâu
- max_overflow quá cao — cho phép tạo quá nhiều connection tạm
Giải pháp:
python
engine = create_async_engine(
DATABASE_URL,
pool_size=20, max_overflow=10, pool_timeout=30,
echo_pool="debug",
)
@event.listens_for(engine.sync_engine, "connect")
def set_connection_timeout(dbapi_conn, connection_record):
cursor = dbapi_conn.cursor()
cursor.execute("SET statement_timeout = '30s'")
cursor.close()
async def get_pool_stats() -> dict:
pool = engine.pool
return {"pool_size": pool.size(), "checked_out": pool.checkedout()}Checklist: Bật echo_pool="debug", tìm session không trong context manager, đặt statement_timeout, monitor pool.checkedout().