
Production Deployment Backend

Deploy production-ready Python APIs: scalable, observable, resilient

Learning Outcomes

After completing this page, you will be able to:

  • 🎯 Configure Gunicorn/Uvicorn for production workloads
  • 🎯 Containerize applications with Docker best practices
  • 🎯 Implement health checks for orchestration
  • 🎯 Set up observability with logging, metrics, and tracing
  • 🎯 Avoid common production pitfalls

ASGI Servers

Uvicorn (Single Process)

bash
# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Production (single worker)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1
python
# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

# Programmatic startup
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable in production
        workers=1,
        log_level="info",
        access_log=True,
    )

Gunicorn + Uvicorn Workers (Production)

bash
# Install
pip install gunicorn "uvicorn[standard]"  # quotes needed in zsh

# Run with Uvicorn workers
gunicorn main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --timeout 120 \
    --keep-alive 5 \
    --access-logfile - \
    --error-logfile -

Gunicorn Configuration File

python
# gunicorn.conf.py
import multiprocessing

# Server socket
bind = "0.0.0.0:8000"
backlog = 2048

# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 120
keepalive = 5

# Restart workers after this many requests (prevent memory leaks)
max_requests = 1000
max_requests_jitter = 50

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Process naming
proc_name = "myapp"

# Server mechanics
daemon = False
pidfile = None
umask = 0
user = None
group = None
tmp_upload_dir = None

# SSL (if terminating SSL at app level)
# keyfile = "/path/to/key.pem"
# certfile = "/path/to/cert.pem"

# Hooks
def on_starting(server):
    print("Starting Gunicorn server...")

def on_exit(server):
    print("Shutting down Gunicorn server...")

def worker_exit(server, worker):
    print(f"Worker {worker.pid} exiting...")
bash
# Run with config file
gunicorn main:app -c gunicorn.conf.py

Worker Count Formula

python
# CPU-bound: workers = CPU cores + 1
# I/O-bound (typical web apps): workers = (2 * CPU cores) + 1

import multiprocessing

# For async apps (FastAPI)
workers = multiprocessing.cpu_count() * 2 + 1

# For sync apps (Flask, Django)
workers = multiprocessing.cpu_count() + 1

# Memory consideration: Each worker uses ~50-200MB
# 4GB RAM server: max ~20 workers

Docker Containerization

Basic Dockerfile

dockerfile
# Dockerfile
FROM python:3.12-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

WORKDIR /app

# Install dependencies first (better caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN adduser --disabled-password --gecos "" appuser && \
    chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8000

# Health check (note: python:3.12-slim does not ship curl; either
# install it or use a Python one-liner)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

# Run application
CMD ["gunicorn", "main:app", "-c", "gunicorn.conf.py"]

Multi-stage Build (Optimized)

dockerfile
# Dockerfile.production
# Stage 1: Build
FROM python:3.12-slim as builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Stage 2: Runtime
FROM python:3.12-slim as runtime

# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

WORKDIR /app

# Copy application code
COPY . .

# Create non-root user
RUN adduser --disabled-password --gecos "" appuser && \
    chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

CMD ["gunicorn", "main:app", "-c", "gunicorn.conf.py"]

Docker Compose

docker-compose.yml

```yaml
version: "3.9"

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile.production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379/0
      - LOG_LEVEL=info
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    healthcheck:
      # slim image has no curl, so probe with a Python one-liner
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 1G
        reservations:
          cpus: "0.5"
          memory: 256M
    restart: unless-stopped

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d mydb"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:
```


### .dockerignore

```
.git
.gitignore
.env
.env.*
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
venv/
.venv/
*.egg-info/
dist/
build/
.pytest_cache/
.coverage
htmlcov/
.mypy_cache/
.ruff_cache/
*.log
Dockerfile
docker-compose
README.md
docs/
tests/
```


---

## Health Checks

### Basic Health Endpoint

```python
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe - is the app running?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe - is the app ready to serve traffic?"""
    return {"status": "ready"}

```

### Comprehensive Health Check

```python
import asyncio

import httpx
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from redis import Redis
from sqlalchemy import text

app = FastAPI()

# async_session and REDIS_URL come from your application's own setup
async def check_database() -> dict:
    try:
        async with async_session() as session:
            await session.execute(text("SELECT 1"))
        return {"status": "healthy", "latency_ms": 0}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

async def check_redis() -> dict:
    try:
        redis = Redis.from_url(REDIS_URL)
        redis.ping()
        return {"status": "healthy"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

async def check_external_api() -> dict:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com/health", timeout=5)
        return {"status": "healthy" if response.status_code == 200 else "degraded"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

@app.get("/health")
async def health_check():
    checks = await asyncio.gather(
        check_database(),
        check_redis(),
        check_external_api(),
        return_exceptions=True,
    )

    results = {
        "database": checks[0],
        "redis": checks[1],
        "external_api": checks[2],
    }

    # Determine overall status
    all_healthy = all(
        isinstance(c, dict) and c.get("status") == "healthy"
        for c in checks
    )

    status_code = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE

    return JSONResponse(
        status_code=status_code,
        content={
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": results,
            "version": "1.0.0",
        },
    )
```


### Kubernetes Probes

```yaml
# kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  template:
    spec:
      containers:
        - name: myapp
          image: myapp:latest
          ports:
            - containerPort: 8000
          
          # Liveness: Is the container running?
          # Restart if fails
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 15
            timeoutSeconds: 5
            failureThreshold: 3
          
          # Readiness: Is the container ready to serve traffic?
          # Remove from load balancer if fails
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          
          # Startup: Is the container started?
          # Disable liveness/readiness until startup succeeds
          startupProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 0
            periodSeconds: 5
            timeoutSeconds: 5
            failureThreshold: 30  # 30 * 5s = 150s max startup time
```

Observability

Structured Logging

python
import logging
import json
import time
from datetime import datetime
from fastapi import FastAPI, Request
from uuid import uuid4

app = FastAPI()

# JSON formatter
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        
        # Add extra fields
        if hasattr(record, "request_id"):
            log_record["request_id"] = record.request_id
        if hasattr(record, "user_id"):
            log_record["user_id"] = record.user_id
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        
        return json.dumps(log_record)

# Configure logging
def setup_logging():
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

setup_logging()
logger = logging.getLogger(__name__)

# Request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    request_id = str(uuid4())
    request.state.request_id = request_id
    
    # Add request_id to log context
    logger.info(
        f"Request started: {request.method} {request.url.path}",
        extra={"request_id": request_id}
    )
    
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    logger.info(
        f"Request completed: {response.status_code} in {duration:.3f}s",
        extra={"request_id": request_id}
    )
    
    response.headers["X-Request-ID"] = request_id
    return response
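
Threading `extra={"request_id": ...}` through every call is easy to forget; a `contextvars`-based logging filter attaches it automatically to anything logged during the request. A stdlib-only sketch (names are illustrative):

```python
import logging
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Stamp the current request id onto every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

handler = logging.StreamHandler()
handler.addFilter(RequestIdFilter())
handler.setFormatter(logging.Formatter("%(request_id)s %(levelname)s %(message)s"))

logger = logging.getLogger("app")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# In the middleware: call request_id_var.set(request_id) once per request;
# every logger call in that task then carries the id without passing `extra`.
```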

Metrics with Prometheus

python
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time

app = FastAPI()

# Define metrics
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ACTIVE_REQUESTS = Gauge(
    "http_requests_active",
    "Active HTTP requests"
)

# Metrics middleware
class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        ACTIVE_REQUESTS.inc()
        
        start_time = time.time()
        response = await call_next(request)
        duration = time.time() - start_time
        
        # Record metrics
        endpoint = request.url.path
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status=response.status_code
        ).inc()
        
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=endpoint
        ).observe(duration)
        
        ACTIVE_REQUESTS.dec()
        return response

app.add_middleware(MetricsMiddleware)

# Metrics endpoint
@app.get("/metrics")
async def metrics():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
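
One caveat with the setup above: under Gunicorn each worker process keeps its own counters, so `/metrics` only reports whichever worker served the scrape. `prometheus_client` ships a multiprocess mode for this; a sketch of the Gunicorn side (the `PROMETHEUS_MULTIPROC_DIR` path is an assumption):

```python
# gunicorn.conf.py — reap a dead worker's metric files
# (requires env var PROMETHEUS_MULTIPROC_DIR, e.g. /tmp/prometheus,
#  set before prometheus_client is imported anywhere)
from prometheus_client import multiprocess

def child_exit(server, worker):
    multiprocess.mark_process_dead(worker.pid)
```

The `/metrics` endpoint then aggregates across workers by building a fresh `CollectorRegistry()`, attaching `multiprocess.MultiProcessCollector(registry)`, and serving `generate_latest(registry)`.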

Distributed Tracing with OpenTelemetry

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Setup tracing
def setup_tracing():
    # Create tracer provider
    provider = TracerProvider()
    
    # Configure exporter (Jaeger, Zipkin, etc.)
    exporter = OTLPSpanExporter(
        endpoint="http://jaeger:4317",
        insecure=True
    )
    
    # Add span processor
    provider.add_span_processor(BatchSpanProcessor(exporter))
    
    # Set global tracer provider
    trace.set_tracer_provider(provider)

setup_tracing()

# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)

# Instrument SQLAlchemy
SQLAlchemyInstrumentor().instrument(engine=engine)

# Instrument HTTP client
HTTPXClientInstrumentor().instrument()

# Manual span creation
tracer = trace.get_tracer(__name__)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    with tracer.start_as_current_span("get_user") as span:
        span.set_attribute("user.id", user_id)
        
        with tracer.start_as_current_span("database_query"):
            user = await get_user_from_db(user_id)
        
        with tracer.start_as_current_span("serialize_response"):
            response = UserResponse.from_orm(user)
        
        return response

config.py

python
import os

class Settings:
    # Application
    APP_NAME: str = "myapp"
    APP_VERSION: str = "1.0.0"
    ENVIRONMENT: str = os.getenv("ENVIRONMENT", "development")

    # Logging
    LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
    LOG_FORMAT: str = "json"  # json or text

    # Metrics
    METRICS_ENABLED: bool = os.getenv("METRICS_ENABLED", "true").lower() == "true"

    # Tracing
    TRACING_ENABLED: bool = os.getenv("TRACING_ENABLED", "true").lower() == "true"
    OTLP_ENDPOINT: str = os.getenv("OTLP_ENDPOINT", "http://localhost:4317")

    # Sampling rate (0.0 to 1.0)
    TRACE_SAMPLE_RATE: float = float(os.getenv("TRACE_SAMPLE_RATE", "1.0"))

settings = Settings()


---

## Graceful Shutdown

```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
import signal
import asyncio

# Shared state
shutdown_event = asyncio.Event()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up...")
    await init_database()
    await init_redis()
    
    yield
    
    # Shutdown
    print("Shutting down...")
    shutdown_event.set()
    
    # Wait for in-flight requests
    await asyncio.sleep(5)
    
    # Cleanup
    await close_database()
    await close_redis()
    print("Shutdown complete")

app = FastAPI(lifespan=lifespan)

# Signal handlers
def handle_sigterm(signum, frame):
    print("Received SIGTERM, initiating graceful shutdown...")
    shutdown_event.set()

signal.signal(signal.SIGTERM, handle_sigterm)

# Check shutdown in long-running tasks
@app.get("/long-task")
async def long_task():
    for i in range(100):
        if shutdown_event.is_set():
            return {"status": "cancelled", "progress": i}
        await asyncio.sleep(0.1)
    return {"status": "completed"}

Production Pitfalls

Pitfall 1: Running as Root

dockerfile
# ❌ BAD: Running as root
FROM python:3.12-slim
COPY . /app
CMD ["python", "main.py"]

# ✅ GOOD: Non-root user
FROM python:3.12-slim
RUN adduser --disabled-password --gecos "" appuser
USER appuser
COPY --chown=appuser:appuser . /app
CMD ["python", "main.py"]

Pitfall 2: No Resource Limits

yaml
# ❌ BAD: No limits
containers:
  - name: myapp
    image: myapp:latest

# ✅ GOOD: Resource limits
containers:
  - name: myapp
    image: myapp:latest
    resources:
      limits:
        cpu: "2"
        memory: "1Gi"
      requests:
        cpu: "500m"
        memory: "256Mi"

Pitfall 3: Secrets in Environment Variables

python
# ❌ BAD: secret embedded in a plain environment variable
# DATABASE_URL=postgresql://user:password@host/db
# (readable via `docker inspect` and /proc/<pid>/environ)

# ✅ GOOD: Use secrets management
# Kubernetes secrets, AWS Secrets Manager, HashiCorp Vault
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    
    class Config:
        secrets_dir = "/run/secrets"  # Docker/K8s secrets

Pitfall 4: No Graceful Shutdown

python
# ❌ BAD: Abrupt shutdown
if __name__ == "__main__":
    uvicorn.run(app)

# ✅ GOOD: Graceful shutdown with timeout
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        timeout_graceful_shutdown=30,  # Wait 30s for in-flight requests
    )

Pitfall 5: Missing Health Checks

dockerfile
# ❌ BAD: No health check
FROM python:3.12-slim
CMD ["python", "main.py"]

# ✅ GOOD: Health check
FROM python:3.12-slim
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["python", "main.py"]

Quick Reference

bash
# === UVICORN ===
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# === GUNICORN ===
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000

# === DOCKER ===
docker build -t myapp .
docker run -p 8000:8000 myapp

# === DOCKER COMPOSE ===
docker-compose up -d
docker-compose logs -f api

# === WORKER COUNT ===
# Async apps: (2 * CPU) + 1
# Sync apps: CPU + 1
python
# === HEALTH CHECK ===
@app.get("/health")
async def health():
    return {"status": "healthy"}

# === STRUCTURED LOGGING ===
logger.info("Request", extra={"request_id": "abc123"})

# === METRICS ===
REQUEST_COUNT.labels(method="GET", endpoint="/users").inc()

# === GRACEFUL SHUTDOWN ===
@asynccontextmanager
async def lifespan(app):
    yield
    await cleanup()