Skip to content

SQLAlchemy & Databases Backend

Database toolkit mạnh nhất cho Python - từ raw SQL đến full ORM

Learning Outcomes

Sau khi hoàn thành trang này, bạn sẽ:

  • 🎯 Hiểu sự khác biệt giữa ORM vs Core và khi nào dùng cái nào
  • 🎯 Implement async sessions cho high-performance APIs
  • 🎯 Master Alembic migrations cho database schema evolution
  • 🎯 Configure connection pooling cho production workloads
  • 🎯 Tránh các Production Pitfalls phổ biến

SQLAlchemy Architecture

┌─────────────────────────────────────────────────────────┐
│                    SQLAlchemy                           │
├─────────────────────────────────────────────────────────┤
│  ORM Layer (High-level)                                 │
│  - Declarative Models                                   │
│  - Session, Query                                       │
│  - Relationships                                        │
├─────────────────────────────────────────────────────────┤
│  Core Layer (Low-level)                                 │
│  - Table, Column, MetaData                              │
│  - select(), insert(), update()                         │
│  - Connection, Engine                                   │
├─────────────────────────────────────────────────────────┤
│  DBAPI (Database Driver)                                │
│  - psycopg2, asyncpg, pymysql, sqlite3                  │
└─────────────────────────────────────────────────────────┘

ORM vs Core

SQLAlchemy Core (SQL Expression Language)

python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select

# Engine và MetaData
engine = create_engine("postgresql://user:pass@localhost/db")
metadata = MetaData()

# Define table
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(100)),
    Column("email", String(255), unique=True),
)

# Create tables
metadata.create_all(engine)

# === CRUD với Core ===

# INSERT
with engine.connect() as conn:
    result = conn.execute(
        users.insert().values(name="HPN", email="hpn@test.com")
    )
    conn.commit()
    print(f"Inserted ID: {result.inserted_primary_key[0]}")

# SELECT
with engine.connect() as conn:
    stmt = select(users).where(users.c.name == "HPN")
    result = conn.execute(stmt)
    for row in result:
        print(row.id, row.name, row.email)

# UPDATE
with engine.connect() as conn:
    stmt = users.update().where(users.c.id == 1).values(name="Updated")
    conn.execute(stmt)
    conn.commit()

# DELETE
with engine.connect() as conn:
    stmt = users.delete().where(users.c.id == 1)
    conn.execute(stmt)
    conn.commit()
)

SQLAlchemy ORM (Object Relational Mapper)

from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

Define model

class User(Base): tablename = "users"

id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255), unique=True)

def __repr__(self):
    return f"<User(id={self.id}, name={self.name})>"

Engine và Session

engine = create_engine("postgresql://user:pass@localhost/db") Base.metadata.create_all(engine) SessionLocal = sessionmaker(bind=engine)

=== CRUD với ORM ===

CREATE

with SessionLocal() as session: user = User(name="HPN", email="hpn@test.com") session.add(user) session.commit() session.refresh(user) # Get generated ID print(f"Created: {user}")

READ

with SessionLocal() as session: # Get by ID user = session.get(User, 1)

# Query
users = session.query(User).filter(User.name == "HPN").all()

# Modern select() syntax (SQLAlchemy 2.0)
from sqlalchemy import select
stmt = select(User).where(User.name == "HPN")
users = session.scalars(stmt).all()

UPDATE

with SessionLocal() as session: user = session.get(User, 1) user.name = "Updated" session.commit()

DELETE

with SessionLocal() as session: user = session.get(User, 1) session.delete(user) session.commit() ) session.commit()


### Khi nào dùng ORM vs Core?

| Use Case | ORM | Core |
|----------|-----|------|
| CRUD đơn giản | ✅ | ⚠️ |
| Complex queries | ⚠️ | ✅ |
| Bulk operations | ❌ Slow | ✅ Fast |
| Relationships | ✅ Easy | ⚠️ Manual |
| Raw performance | ⚠️ | ✅ |
| Type safety | ✅ | ⚠️ |
| Learning curve | Steeper | Easier |

```python
# ✅ ORM: CRUD, relationships, business logic
user = session.get(User, 1)
user.posts.append(Post(title="New Post"))
session.commit()

# ✅ Core: Bulk operations, complex queries, performance
stmt = users.insert().values([
    {"name": "User1", "email": "u1@test.com"},
    {"name": "User2", "email": "u2@test.com"},
    # ... thousands of rows
])
conn.execute(stmt)

SQLAlchemy 2.0 Style

SQLAlchemy 2.0 introduces new patterns - use these for new projects.

Declarative Models (2.0 Style)

python
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import List, Optional

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    
    # Mapped columns với type hints
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
    age: Mapped[Optional[int]] = mapped_column(default=None)
    
    # Relationship
    posts: Mapped[List["Post"]] = relationship(back_populates="author")

class Post(Base):
    __tablename__ = "posts"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    
    author: Mapped["User"] = relationship(back_populates="posts")

Select Statements (2.0 Style)

python
from sqlalchemy import select, and_, or_, func
from sqlalchemy.orm import Session

# Basic select
stmt = select(User).where(User.name == "HPN")
users = session.scalars(stmt).all()

# Single result
user = session.scalars(stmt).first()
user = session.scalars(stmt).one()  # Raises if not exactly one

# Filter với multiple conditions
stmt = select(User).where(
    and_(
        User.age >= 18,
        User.email.like("%@gmail.com")
    )
)

# OR conditions
stmt = select(User).where(
    or_(
        User.name == "HPN",
        User.name == "Admin"
    )
)

# Order by
stmt = select(User).order_by(User.name.asc())

# Limit và offset
stmt = select(User).limit(10).offset(20)

# Aggregations
stmt = select(func.count(User.id))
count = session.scalar(stmt)

# Group by
stmt = select(User.age, func.count(User.id)).group_by(User.age)
results = session.execute(stmt).all()

Joins (2.0 Style)

python
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

# Implicit join
stmt = select(User, Post).join(Post)

# Explicit join condition
stmt = select(User).join(Post, User.id == Post.author_id)

# Left outer join
stmt = select(User).outerjoin(Post)

# Eager loading - joinedload (single query với JOIN)
stmt = select(User).options(joinedload(User.posts))
users = session.scalars(stmt).unique().all()

# Eager loading - selectinload (separate SELECT IN query)
stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).all()

Async SQLAlchemy

Setup Async Engine

python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

# Async engine (note: asyncpg driver)
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=True,  # Log SQL statements
    pool_size=5,
    max_overflow=10,
)

# Async session factory
async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

class Base(DeclarativeBase):
    pass

Async CRUD Operations

python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def create_user(session: AsyncSession, name: str, email: str) -> User:
    user = User(name=name, email=email)
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user

async def get_user(session: AsyncSession, user_id: int) -> User | None:
    return await session.get(User, user_id)

async def list_users(session: AsyncSession) -> list[User]:
    stmt = select(User).order_by(User.id)
    result = await session.scalars(stmt)
    return result.all()

async def update_user(session: AsyncSession, user_id: int, name: str) -> User:
    user = await session.get(User, user_id)
    if user:
        user.name = name
        await session.commit()
        await session.refresh(user)
    return user

async def delete_user(session: AsyncSession, user_id: int) -> bool:
    user = await session.get(User, user_id)
    if user:
        await session.delete(user)
        await session.commit()
        return True
    return False

FastAPI Integration

python
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

# Dependency để get async session
async def get_db() -> AsyncSession:
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

@app.get("/users/{user_id}")
async def get_user_endpoint(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    user = await get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users")
async def create_user_endpoint(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    user = await create_user(db, user_data.name, user_data.email)
    return user

Async Relationships (Lazy Loading Issue)

python
from sqlalchemy.orm import selectinload

# ❌ BUG: Lazy loading không work với async
async def get_user_with_posts_wrong(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    # user.posts  # MissingGreenlet error!
    return user

# ✅ FIX: Eager loading với selectinload
async def get_user_with_posts(session: AsyncSession, user_id: int):
    stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
    result = await session.scalars(stmt)
    return result.first()

Alembic Migrations

Alembic là migration tool chính thức cho SQLAlchemy.

Setup Alembic

bash
# Install
pip install alembic

# Initialize
alembic init alembic

Configure alembic.ini

ini
# alembic.ini
[alembic]
script_location = alembic
sqlalchemy.url = postgresql://user:pass@localhost/db

Configure env.py

python
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models
from app.models import Base

config = context.config
target_metadata = Base.metadata

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Migration Commands

bash
# Auto-generate migration từ model changes
alembic revision --autogenerate -m "Add users table"

# Create empty migration
alembic revision -m "Custom migration"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123

# Show current revision
alembic current

# Show migration history
alembic history

Migration Script Example

python
# alembic/versions/abc123_add_users_table.py
"""Add users table

Revision ID: abc123
Revises: 
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(100), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email')
    )
    op.create_index('ix_users_email', 'users', ['email'])

def downgrade():
    op.drop_index('ix_users_email', 'users')
    op.drop_table('users')

Async Alembic

python
# alembic/env.py for async
from sqlalchemy.ext.asyncio import create_async_engine

async def run_migrations_online():
    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url"),
        poolclass=pool.NullPool,
    )
    
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    
    await connectable.dispose()

def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )
    with context.begin_transaction():
        context.run_migrations()

import asyncio
asyncio.run(run_migrations_online())

Connection Pooling

Pool Configuration

python
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    
    # Pool size
    pool_size=5,           # Number of persistent connections
    max_overflow=10,       # Extra connections when pool exhausted
    
    # Timeouts
    pool_timeout=30,       # Seconds to wait for connection
    pool_recycle=1800,     # Recycle connections after 30 minutes
    pool_pre_ping=True,    # Test connection before using
    
    # Echo SQL (for debugging)
    echo=False,
    echo_pool=False,
)

Pool Types

from sqlalchemy import create_engine from sqlalchemy.pool import QueuePool, NullPool, StaticPool

QueuePool (default) - Connection pooling

engine = create_engine(url, poolclass=QueuePool)

NullPool - No pooling (new connection each time)

Good for: serverless, testing

engine = create_engine(url, poolclass=NullPool)

StaticPool - Single connection reused

Good for: SQLite in-memory, testing

engine = create_engine("sqlite:///:memory:", poolclass=StaticPool) )

Good for: SQLite in-memory, testing

engine = create_engine("sqlite:///:memory:", poolclass=StaticPool)


### Pool Events

```python
from sqlalchemy import event

@event.listens_for(engine, "connect")
def on_connect(dbapi_connection, connection_record):
    """Called when a new connection is created."""
    print("New connection created")

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_connection, connection_record, connection_proxy):
    """Called when a connection is checked out from pool."""
    print("Connection checked out")

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_connection, connection_record):
    """Called when a connection is returned to pool."""
    print("Connection returned to pool")

Production Pool Settings

python
# Production configuration
engine = create_engine(
    DATABASE_URL,
    
    # Pool sizing (tune based on your workload)
    pool_size=10,          # Base connections
    max_overflow=20,       # Burst capacity
    
    # Health checks
    pool_pre_ping=True,    # Verify connection is alive
    pool_recycle=3600,     # Recycle after 1 hour
    
    # Timeouts
    pool_timeout=30,       # Wait max 30s for connection
    connect_args={
        "connect_timeout": 10,  # Connection timeout
        "options": "-c statement_timeout=30000"  # Query timeout 30s
    }
)

Relationships

One-to-Many

python
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy import ForeignKey
from typing import List

class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    
    # One user has many posts
    posts: Mapped[List["Post"]] = relationship(back_populates="author")

class Post(Base):
    __tablename__ = "posts"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    
    # Many posts belong to one user
    author: Mapped["User"] = relationship(back_populates="posts")

# Usage
user = User(name="HPN")
user.posts.append(Post(title="First Post"))
session.add(user)
session.commit()

Many-to-Many

python
from sqlalchemy import Table, Column, ForeignKey

# Association table
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)

class Post(Base):
    __tablename__ = "posts"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    
    tags: Mapped[List["Tag"]] = relationship(
        secondary=post_tags,
        back_populates="posts"
    )

class Tag(Base):
    __tablename__ = "tags"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    
    posts: Mapped[List["Post"]] = relationship(
        secondary=post_tags,
        back_populates="tags"
    )

# Usage
post = Post(title="SQLAlchemy Guide")
post.tags.append(Tag(name="python"))
post.tags.append(Tag(name="database"))

One-to-One

python
class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    
    # One-to-one với uselist=False
    profile: Mapped["Profile"] = relationship(
        back_populates="user",
        uselist=False
    )

class Profile(Base):
    __tablename__ = "profiles"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
    bio: Mapped[str] = mapped_column(String(500))
    
    user: Mapped["User"] = relationship(back_populates="profile")

Production Pitfalls

Pitfall 1: N+1 Query Problem

python
# ❌ BUG: N+1 queries
users = session.query(User).all()
for user in users:
    print(user.posts)  # Each access = 1 query!

# ✅ FIX: Eager loading
from sqlalchemy.orm import selectinload

stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).all()
for user in users:
    print(user.posts)  # No additional queries

Pitfall 2: Session Scope Issues

python
# ❌ BUG: Using object outside session
def get_user():
    with SessionLocal() as session:
        user = session.get(User, 1)
        return user  # Session closed!

user = get_user()
print(user.posts)  # DetachedInstanceError!

# ✅ FIX 1: Eager load before returning
def get_user():
    with SessionLocal() as session:
        stmt = select(User).options(selectinload(User.posts)).where(User.id == 1)
        return session.scalars(stmt).first()

# ✅ FIX 2: expire_on_commit=False
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)

Pitfall 3: Forgetting to Commit

python
# ❌ BUG: Changes not persisted
with SessionLocal() as session:
    user = User(name="HPN")
    session.add(user)
    # Forgot session.commit()!

# ✅ FIX: Always commit or use context manager
with SessionLocal() as session:
    user = User(name="HPN")
    session.add(user)
    session.commit()

# Or use begin() context manager
with SessionLocal.begin() as session:
    user = User(name="HPN")
    session.add(user)
    # Auto-commit on exit

Pitfall 4: Connection Leaks

python
# ❌ BUG: Connection not returned to pool
session = SessionLocal()
user = session.get(User, 1)
# session.close() never called!

# ✅ FIX: Always use context manager
with SessionLocal() as session:
    user = session.get(User, 1)
# Auto-closed

# Or try/finally
session = SessionLocal()
try:
    user = session.get(User, 1)
finally:
    session.close()

Pitfall 5: Async Lazy Loading

python
# ❌ BUG: Lazy loading in async context
async def get_user(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    return user.posts  # MissingGreenlet error!

# ✅ FIX: Always eager load in async
async def get_user(session: AsyncSession, user_id: int):
    stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
    result = await session.scalars(stmt)
    user = result.first()
    return user.posts if user else []

from sqlalchemy import create_engine, select, and_, or_, func from sqlalchemy.orm import Session, sessionmaker, selectinload from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

=== ENGINE ===

engine = create_engine("postgresql://user:pass@localhost/db") async_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

=== SESSION ===

SessionLocal = sessionmaker(bind=engine) with SessionLocal() as session: pass

=== SELECT ===

stmt = select(User).where(User.id == 1) user = session.scalars(stmt).first()

=== FILTER ===

stmt = select(User).where(and_(User.age >= 18, User.active == True)) stmt = select(User).where(or_(User.role == "admin", User.role == "mod"))

=== EAGER LOADING ===

stmt = select(User).options(selectinload(User.posts))

=== AGGREGATION ===

stmt = select(func.count(User.id)) count = session.scalar(stmt)

=== ASYNC ===

async with async_session() as session: result = await session.scalars(stmt) users = result.all() )) async with async_session() as session: result = await session.scalars(stmt) users = result.all()


---

## Cross-links

- **Prerequisites**: [SQL Fundamentals](/sql/), [Asyncio](/python/concurrency/asyncio)
- **Related**: [FastAPI Deep Dive](/python/backend/fastapi) - API integration
- **See Also**: [API Design Patterns](/python/backend/api-design) - REST patterns
- **See Also**: [Production Deployment](/python/backend/deployment) - Database in production

---

<style>
.hero-subtitle {
  font-size: 1.3rem;
  color: var(--vp-c-text-2);
  margin-bottom: 2rem;
  font-weight: 300;
}
</style>