Giao diện
SQLAlchemy & Databases Backend
Database toolkit mạnh nhất cho Python - từ raw SQL đến full ORM
Learning Outcomes
Sau khi hoàn thành trang này, bạn sẽ:
- 🎯 Hiểu sự khác biệt giữa ORM vs Core và khi nào dùng cái nào
- 🎯 Implement async sessions cho high-performance APIs
- 🎯 Master Alembic migrations cho database schema evolution
- 🎯 Configure connection pooling cho production workloads
- 🎯 Tránh các Production Pitfalls phổ biến
SQLAlchemy Architecture
┌─────────────────────────────────────────────────────────┐
│ SQLAlchemy │
├─────────────────────────────────────────────────────────┤
│ ORM Layer (High-level) │
│ - Declarative Models │
│ - Session, Query │
│ - Relationships │
├─────────────────────────────────────────────────────────┤
│ Core Layer (Low-level) │
│ - Table, Column, MetaData │
│ - select(), insert(), update() │
│ - Connection, Engine │
├─────────────────────────────────────────────────────────┤
│ DBAPI (Database Driver) │
│ - psycopg2, asyncpg, pymysql, sqlite3 │
└─────────────────────────────────────────────────────────┘ORM vs Core
SQLAlchemy Core (SQL Expression Language)
python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select
# Engine và MetaData
engine = create_engine("postgresql://user:pass@localhost/db")
metadata = MetaData()
# Define table
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(100)),
Column("email", String(255), unique=True),
)
# Create tables
metadata.create_all(engine)
# === CRUD với Core ===
# INSERT
with engine.connect() as conn:
result = conn.execute(
users.insert().values(name="HPN", email="hpn@test.com")
)
conn.commit()
print(f"Inserted ID: {result.inserted_primary_key[0]}")
# SELECT
with engine.connect() as conn:
stmt = select(users).where(users.c.name == "HPN")
result = conn.execute(stmt)
for row in result:
print(row.id, row.name, row.email)
# UPDATE
with engine.connect() as conn:
stmt = users.update().where(users.c.id == 1).values(name="Updated")
conn.execute(stmt)
conn.commit()
# DELETE
with engine.connect() as conn:
stmt = users.delete().where(users.c.id == 1)
conn.execute(stmt)
conn.commit()
)SQLAlchemy ORM (Object Relational Mapper)
from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
Define model
class User(Base): tablename = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255), unique=True)
def __repr__(self):
return f"<User(id={self.id}, name={self.name})>"
Engine và Session
engine = create_engine("postgresql://user:pass@localhost/db") Base.metadata.create_all(engine) SessionLocal = sessionmaker(bind=engine)
=== CRUD với ORM ===
CREATE
with SessionLocal() as session: user = User(name="HPN", email="hpn@test.com") session.add(user) session.commit() session.refresh(user) # Get generated ID print(f"Created: {user}")
READ
with SessionLocal() as session: # Get by ID user = session.get(User, 1)
# Query
users = session.query(User).filter(User.name == "HPN").all()
# Modern select() syntax (SQLAlchemy 2.0)
from sqlalchemy import select
stmt = select(User).where(User.name == "HPN")
users = session.scalars(stmt).all()
UPDATE
with SessionLocal() as session: user = session.get(User, 1) user.name = "Updated" session.commit()
DELETE
with SessionLocal() as session: user = session.get(User, 1) session.delete(user) session.commit() ) session.commit()
### Khi nào dùng ORM vs Core?
| Use Case | ORM | Core |
|----------|-----|------|
| CRUD đơn giản | ✅ | ⚠️ |
| Complex queries | ⚠️ | ✅ |
| Bulk operations | ❌ Slow | ✅ Fast |
| Relationships | ✅ Easy | ⚠️ Manual |
| Raw performance | ⚠️ | ✅ |
| Type safety | ✅ | ⚠️ |
| Learning curve | Steeper | Easier |
```python
# ✅ ORM: CRUD, relationships, business logic
user = session.get(User, 1)
user.posts.append(Post(title="New Post"))
session.commit()
# ✅ Core: Bulk operations, complex queries, performance
stmt = users.insert().values([
{"name": "User1", "email": "u1@test.com"},
{"name": "User2", "email": "u2@test.com"},
# ... thousands of rows
])
conn.execute(stmt)SQLAlchemy 2.0 Style
SQLAlchemy 2.0 introduces new patterns - use these for new projects.
Declarative Models (2.0 Style)
python
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import List, Optional
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
# Mapped columns với type hints
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(255), unique=True)
age: Mapped[Optional[int]] = mapped_column(default=None)
# Relationship
posts: Mapped[List["Post"]] = relationship(back_populates="author")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")Select Statements (2.0 Style)
python
from sqlalchemy import select, and_, or_, func
from sqlalchemy.orm import Session
# Basic select
stmt = select(User).where(User.name == "HPN")
users = session.scalars(stmt).all()
# Single result
user = session.scalars(stmt).first()
user = session.scalars(stmt).one() # Raises if not exactly one
# Filter với multiple conditions
stmt = select(User).where(
and_(
User.age >= 18,
User.email.like("%@gmail.com")
)
)
# OR conditions
stmt = select(User).where(
or_(
User.name == "HPN",
User.name == "Admin"
)
)
# Order by
stmt = select(User).order_by(User.name.asc())
# Limit và offset
stmt = select(User).limit(10).offset(20)
# Aggregations
stmt = select(func.count(User.id))
count = session.scalar(stmt)
# Group by
stmt = select(User.age, func.count(User.id)).group_by(User.age)
results = session.execute(stmt).all()Joins (2.0 Style)
python
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload
# Implicit join
stmt = select(User, Post).join(Post)
# Explicit join condition
stmt = select(User).join(Post, User.id == Post.author_id)
# Left outer join
stmt = select(User).outerjoin(Post)
# Eager loading - joinedload (single query với JOIN)
stmt = select(User).options(joinedload(User.posts))
users = session.scalars(stmt).unique().all()
# Eager loading - selectinload (separate SELECT IN query)
stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).all()Async SQLAlchemy
Setup Async Engine
python
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase
# Async engine (note: asyncpg driver)
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=True, # Log SQL statements
pool_size=5,
max_overflow=10,
)
# Async session factory
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
class Base(DeclarativeBase):
passAsync CRUD Operations
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(session: AsyncSession, name: str, email: str) -> User:
user = User(name=name, email=email)
session.add(user)
await session.commit()
await session.refresh(user)
return user
async def get_user(session: AsyncSession, user_id: int) -> User | None:
return await session.get(User, user_id)
async def list_users(session: AsyncSession) -> list[User]:
stmt = select(User).order_by(User.id)
result = await session.scalars(stmt)
return result.all()
async def update_user(session: AsyncSession, user_id: int, name: str) -> User:
user = await session.get(User, user_id)
if user:
user.name = name
await session.commit()
await session.refresh(user)
return user
async def delete_user(session: AsyncSession, user_id: int) -> bool:
user = await session.get(User, user_id)
if user:
await session.delete(user)
await session.commit()
return True
return FalseFastAPI Integration
python
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
# Dependency để get async session
async def get_db() -> AsyncSession:
async with async_session() as session:
try:
yield session
finally:
await session.close()
@app.get("/users/{user_id}")
async def get_user_endpoint(
user_id: int,
db: AsyncSession = Depends(get_db)
):
user = await get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users")
async def create_user_endpoint(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
user = await create_user(db, user_data.name, user_data.email)
return userAsync Relationships (Lazy Loading Issue)
python
from sqlalchemy.orm import selectinload
# ❌ BUG: Lazy loading không work với async
async def get_user_with_posts_wrong(session: AsyncSession, user_id: int):
user = await session.get(User, user_id)
# user.posts # MissingGreenlet error!
return user
# ✅ FIX: Eager loading với selectinload
async def get_user_with_posts(session: AsyncSession, user_id: int):
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
result = await session.scalars(stmt)
return result.first()Alembic Migrations
Alembic là migration tool chính thức cho SQLAlchemy.
Setup Alembic
bash
# Install
pip install alembic
# Initialize
alembic init alembicConfigure alembic.ini
ini
# alembic.ini
[alembic]
script_location = alembic
sqlalchemy.url = postgresql://user:pass@localhost/dbConfigure env.py
python
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# Import your models
from app.models import Base
config = context.config
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()Migration Commands
bash
# Auto-generate migration từ model changes
alembic revision --autogenerate -m "Add users table"
# Create empty migration
alembic revision -m "Custom migration"
# Apply migrations
alembic upgrade head
# Rollback one step
alembic downgrade -1
# Rollback to specific revision
alembic downgrade abc123
# Show current revision
alembic current
# Show migration history
alembic historyMigration Script Example
python
# alembic/versions/abc123_add_users_table.py
"""Add users table
Revision ID: abc123
Revises:
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(100), nullable=False),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email')
)
op.create_index('ix_users_email', 'users', ['email'])
def downgrade():
op.drop_index('ix_users_email', 'users')
op.drop_table('users')Async Alembic
python
# alembic/env.py for async
from sqlalchemy.ext.asyncio import create_async_engine
async def run_migrations_online():
connectable = create_async_engine(
config.get_main_option("sqlalchemy.url"),
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
import asyncio
asyncio.run(run_migrations_online())Connection Pooling
Pool Configuration
python
from sqlalchemy import create_engine
engine = create_engine(
"postgresql://user:pass@localhost/db",
# Pool size
pool_size=5, # Number of persistent connections
max_overflow=10, # Extra connections when pool exhausted
# Timeouts
pool_timeout=30, # Seconds to wait for connection
pool_recycle=1800, # Recycle connections after 30 minutes
pool_pre_ping=True, # Test connection before using
# Echo SQL (for debugging)
echo=False,
echo_pool=False,
)Pool Types
from sqlalchemy import create_engine from sqlalchemy.pool import QueuePool, NullPool, StaticPool
QueuePool (default) - Connection pooling
engine = create_engine(url, poolclass=QueuePool)
NullPool - No pooling (new connection each time)
Good for: serverless, testing
engine = create_engine(url, poolclass=NullPool)
StaticPool - Single connection reused
Good for: SQLite in-memory, testing
engine = create_engine("sqlite:///:memory:", poolclass=StaticPool) )
Good for: SQLite in-memory, testing
engine = create_engine("sqlite:///:memory:", poolclass=StaticPool)
### Pool Events
```python
from sqlalchemy import event
@event.listens_for(engine, "connect")
def on_connect(dbapi_connection, connection_record):
"""Called when a new connection is created."""
print("New connection created")
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_connection, connection_record, connection_proxy):
"""Called when a connection is checked out from pool."""
print("Connection checked out")
@event.listens_for(engine, "checkin")
def on_checkin(dbapi_connection, connection_record):
"""Called when a connection is returned to pool."""
print("Connection returned to pool")Production Pool Settings
python
# Production configuration
engine = create_engine(
DATABASE_URL,
# Pool sizing (tune based on your workload)
pool_size=10, # Base connections
max_overflow=20, # Burst capacity
# Health checks
pool_pre_ping=True, # Verify connection is alive
pool_recycle=3600, # Recycle after 1 hour
# Timeouts
pool_timeout=30, # Wait max 30s for connection
connect_args={
"connect_timeout": 10, # Connection timeout
"options": "-c statement_timeout=30000" # Query timeout 30s
}
)Relationships
One-to-Many
python
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy import ForeignKey
from typing import List
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
# One user has many posts
posts: Mapped[List["Post"]] = relationship(back_populates="author")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# Many posts belong to one user
author: Mapped["User"] = relationship(back_populates="posts")
# Usage
user = User(name="HPN")
user.posts.append(Post(title="First Post"))
session.add(user)
session.commit()Many-to-Many
python
from sqlalchemy import Table, Column, ForeignKey
# Association table
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
tags: Mapped[List["Tag"]] = relationship(
secondary=post_tags,
back_populates="posts"
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[List["Post"]] = relationship(
secondary=post_tags,
back_populates="tags"
)
# Usage
post = Post(title="SQLAlchemy Guide")
post.tags.append(Tag(name="python"))
post.tags.append(Tag(name="database"))One-to-One
python
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
# One-to-one với uselist=False
profile: Mapped["Profile"] = relationship(
back_populates="user",
uselist=False
)
class Profile(Base):
__tablename__ = "profiles"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
bio: Mapped[str] = mapped_column(String(500))
user: Mapped["User"] = relationship(back_populates="profile")Production Pitfalls
Pitfall 1: N+1 Query Problem
python
# ❌ BUG: N+1 queries
users = session.query(User).all()
for user in users:
print(user.posts) # Each access = 1 query!
# ✅ FIX: Eager loading
from sqlalchemy.orm import selectinload
stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).all()
for user in users:
print(user.posts) # No additional queriesPitfall 2: Session Scope Issues
python
# ❌ BUG: Using object outside session
def get_user():
with SessionLocal() as session:
user = session.get(User, 1)
return user # Session closed!
user = get_user()
print(user.posts) # DetachedInstanceError!
# ✅ FIX 1: Eager load before returning
def get_user():
with SessionLocal() as session:
stmt = select(User).options(selectinload(User.posts)).where(User.id == 1)
return session.scalars(stmt).first()
# ✅ FIX 2: expire_on_commit=False
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)Pitfall 3: Forgetting to Commit
python
# ❌ BUG: Changes not persisted
with SessionLocal() as session:
user = User(name="HPN")
session.add(user)
# Forgot session.commit()!
# ✅ FIX: Always commit or use context manager
with SessionLocal() as session:
user = User(name="HPN")
session.add(user)
session.commit()
# Or use begin() context manager
with SessionLocal.begin() as session:
user = User(name="HPN")
session.add(user)
# Auto-commit on exitPitfall 4: Connection Leaks
python
# ❌ BUG: Connection not returned to pool
session = SessionLocal()
user = session.get(User, 1)
# session.close() never called!
# ✅ FIX: Always use context manager
with SessionLocal() as session:
user = session.get(User, 1)
# Auto-closed
# Or try/finally
session = SessionLocal()
try:
user = session.get(User, 1)
finally:
session.close()Pitfall 5: Async Lazy Loading
python
# ❌ BUG: Lazy loading in async context
async def get_user(session: AsyncSession, user_id: int):
user = await session.get(User, user_id)
return user.posts # MissingGreenlet error!
# ✅ FIX: Always eager load in async
async def get_user(session: AsyncSession, user_id: int):
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
result = await session.scalars(stmt)
user = result.first()
return user.posts if user else []from sqlalchemy import create_engine, select, and_, or_, func from sqlalchemy.orm import Session, sessionmaker, selectinload from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
=== ENGINE ===
engine = create_engine("postgresql://user:pass@localhost/db") async_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
=== SESSION ===
SessionLocal = sessionmaker(bind=engine) with SessionLocal() as session: pass
=== SELECT ===
stmt = select(User).where(User.id == 1) user = session.scalars(stmt).first()
=== FILTER ===
stmt = select(User).where(and_(User.age >= 18, User.active == True)) stmt = select(User).where(or_(User.role == "admin", User.role == "mod"))
=== EAGER LOADING ===
stmt = select(User).options(selectinload(User.posts))
=== AGGREGATION ===
stmt = select(func.count(User.id)) count = session.scalar(stmt)
=== ASYNC ===
async with async_session() as session: result = await session.scalars(stmt) users = result.all() )) async with async_session() as session: result = await session.scalars(stmt) users = result.all()
---
## Cross-links
- **Prerequisites**: [SQL Fundamentals](/sql/), [Asyncio](/python/concurrency/asyncio)
- **Related**: [FastAPI Deep Dive](/python/backend/fastapi) - API integration
- **See Also**: [API Design Patterns](/python/backend/api-design) - REST patterns
- **See Also**: [Production Deployment](/python/backend/deployment) - Database in production
---
<style>
.hero-subtitle {
font-size: 1.3rem;
color: var(--vp-c-text-2);
margin-bottom: 2rem;
font-weight: 300;
}
</style>