# Production Deployment Backend

Deploy production-ready Python APIs: scalable, observable, resilient.

## Learning Outcomes

After completing this page, you will be able to:

- 🎯 Configure Gunicorn/Uvicorn for production workloads
- 🎯 Containerize applications with Docker best practices
- 🎯 Implement health checks for orchestration
- 🎯 Set up observability with logging, metrics, and tracing
- 🎯 Avoid common production pitfalls
---

## ASGI Servers

### Uvicorn (Single Process)

```bash
# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Production (single worker)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1
```

```python
# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

# Programmatic startup
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable in production
        workers=1,
        log_level="info",
        access_log=True,
    )
```

### Gunicorn + Uvicorn Workers (Production)
```bash
# Install
pip install gunicorn uvicorn[standard]

# Run with Uvicorn workers
gunicorn main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --keep-alive 5 \
  --access-logfile - \
  --error-logfile -
```

### Gunicorn Configuration File
```python
# gunicorn.conf.py
import multiprocessing

# Server socket
bind = "0.0.0.0:8000"
backlog = 2048

# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 120
keepalive = 5

# Restart workers after this many requests (prevent memory leaks)
max_requests = 1000
max_requests_jitter = 50

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Process naming
proc_name = "myapp"

# Server mechanics
daemon = False
pidfile = None
umask = 0
user = None
group = None
tmp_upload_dir = None

# SSL (if terminating SSL at app level)
# keyfile = "/path/to/key.pem"
# certfile = "/path/to/cert.pem"

# Hooks
def on_starting(server):
    print("Starting Gunicorn server...")

def on_exit(server):
    print("Shutting down Gunicorn server...")

def worker_exit(server, worker):
    print(f"Worker {worker.pid} exiting...")
```

```bash
# Run with config file
gunicorn main:app -c gunicorn.conf.py
```

### Worker Count Formula
```python
# CPU-bound: workers = CPU cores + 1
# I/O-bound (typical web apps): workers = (2 * CPU cores) + 1
import multiprocessing

# For async apps (FastAPI)
workers = multiprocessing.cpu_count() * 2 + 1

# For sync apps (Flask, Django)
workers = multiprocessing.cpu_count() + 1

# Memory consideration: each worker uses ~50-200MB
# 4GB RAM server: max ~20 workers
```
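The CPU formula and the memory cap can be combined into a single helper. This is a sketch; `worker_count` is a hypothetical name, not part of Gunicorn or Uvicorn:

```python
import multiprocessing

def worker_count(ram_gb: float, per_worker_mb: int = 200, io_bound: bool = True) -> int:
    """Pick a worker count from the CPU formula, capped by available RAM."""
    cpus = multiprocessing.cpu_count()
    by_cpu = cpus * 2 + 1 if io_bound else cpus + 1
    by_memory = int(ram_gb * 1024 // per_worker_mb)  # e.g. 4GB / 200MB = 20
    return max(1, min(by_cpu, by_memory))

# On a 4GB machine the memory cap is 20 workers, so the CPU formula
# usually wins; in small containers the memory cap kicks in first.
print(worker_count(ram_gb=4))
print(worker_count(ram_gb=0.5))  # 0.5GB / 200MB -> capped at 2 workers
```

Whichever limit is lower should win; a worker count that exceeds available RAM leads to OOM kills under load, which is harder to diagnose than slightly lower throughput.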
---

## Docker Containerization

### Basic Dockerfile

```dockerfile
# Dockerfile
FROM python:3.12-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

WORKDIR /app

# Install dependencies first (better caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN adduser --disabled-password --gecos "" appuser && \
    chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8000

# Health check (note: curl is not included in slim images;
# install it, or use the urllib-based check shown in the next example)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["gunicorn", "main:app", "-c", "gunicorn.conf.py"]
```

### Multi-stage Build (Optimized)
```dockerfile
# Dockerfile.production
# Stage 1: Build
FROM python:3.12-slim as builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Stage 2: Runtime
FROM python:3.12-slim as runtime

# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

WORKDIR /app

# Copy application code
COPY . .

# Create non-root user
RUN adduser --disabled-password --gecos "" appuser && \
    chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

CMD ["gunicorn", "main:app", "-c", "gunicorn.conf.py"]
```

### Docker Compose
```yaml
# docker-compose.yml
version: "3.9"

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile.production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379/0
      - LOG_LEVEL=info
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 1G
        reservations:
          cpus: "0.5"
          memory: 256M
    restart: unless-stopped

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d mydb"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:
```
### .dockerignore

```text
.git
.gitignore
.env
.env.*
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
venv/
.venv/
*.egg-info/
dist/
build/
.pytest_cache/
.coverage
htmlcov/
.mypy_cache/
.ruff_cache/
*.log
Dockerfile
docker-compose*
README.md
docs/
tests/
```
---
## Health Checks
### Basic Health Endpoint
```python
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe - is the app running?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe - is the app ready to serve traffic?"""
    return {"status": "ready"}
```

### Comprehensive Health Check
```python
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from redis import Redis
import asyncio
import httpx

# async_session and REDIS_URL are assumed to be defined
# during application setup.
app = FastAPI()

async def check_database() -> dict:
    try:
        async with async_session() as session:
            await session.execute(text("SELECT 1"))
        return {"status": "healthy", "latency_ms": 0}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

async def check_redis() -> dict:
    try:
        redis = Redis.from_url(REDIS_URL)
        redis.ping()
        return {"status": "healthy"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

async def check_external_api() -> dict:
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com/health", timeout=5)
        return {"status": "healthy" if response.status_code == 200 else "degraded"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

@app.get("/health")
async def health_check():
    checks = await asyncio.gather(
        check_database(),
        check_redis(),
        check_external_api(),
        return_exceptions=True,
    )
    results = {
        "database": checks[0],
        "redis": checks[1],
        "external_api": checks[2],
    }

    # Determine overall status
    all_healthy = all(
        isinstance(c, dict) and c.get("status") == "healthy"
        for c in checks
    )
    status_code = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE

    return JSONResponse(
        status_code=status_code,
        content={
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": results,
            "version": "1.0.0",
        },
    )
```
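The aggregation logic is easy to verify in isolation. With `return_exceptions=True`, a check that raised arrives in the results list as an exception object, and the `isinstance` guard makes it count as unhealthy:

```python
# Results as asyncio.gather(..., return_exceptions=True) might return them;
# the RuntimeError stands in for a dependency check that raised.
checks = [
    {"status": "healthy", "latency_ms": 3},
    {"status": "healthy"},
    RuntimeError("connection refused"),
]

all_healthy = all(
    isinstance(c, dict) and c.get("status") == "healthy"
    for c in checks
)
status_code = 200 if all_healthy else 503
print(status_code)  # 503 - one failed dependency marks the service unhealthy
```

Returning 503 when any dependency is down is what lets the load balancer (or Kubernetes readiness probe) pull the instance out of rotation.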
### Kubernetes Probes
```yaml
# kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  template:
    spec:
      containers:
        - name: myapp
          image: myapp:latest
          ports:
            - containerPort: 8000

          # Liveness: Is the container running?
          # Restart if fails
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 15
            timeoutSeconds: 5
            failureThreshold: 3

          # Readiness: Is the container ready to serve traffic?
          # Remove from load balancer if fails
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3

          # Startup: Is the container started?
          # Disable liveness/readiness until startup succeeds
          startupProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 0
            periodSeconds: 5
            timeoutSeconds: 5
            failureThreshold: 30  # 30 * 5s = 150s max startup time
```

---

## Observability
### Structured Logging

```python
import logging
import json
import time
from datetime import datetime
from fastapi import FastAPI, Request
from uuid import uuid4

app = FastAPI()

# JSON formatter
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Add extra fields
        if hasattr(record, "request_id"):
            log_record["request_id"] = record.request_id
        if hasattr(record, "user_id"):
            log_record["user_id"] = record.user_id
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_record)

# Configure logging
def setup_logging():
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

setup_logging()
logger = logging.getLogger(__name__)

# Request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    request_id = str(uuid4())
    request.state.request_id = request_id

    # Add request_id to log context
    logger.info(
        f"Request started: {request.method} {request.url.path}",
        extra={"request_id": request_id}
    )

    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    logger.info(
        f"Request completed: {response.status_code} in {duration:.3f}s",
        extra={"request_id": request_id}
    )

    response.headers["X-Request-ID"] = request_id
    return response
```
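To confirm the formatter emits machine-parseable lines, it can be exercised directly on a hand-built `LogRecord`. This is a self-contained copy of the formatter above, trimmed to the essential fields:

```python
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    # Trimmed copy of the formatter above, enough to test the output shape.
    def format(self, record):
        log_record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        if hasattr(record, "request_id"):
            log_record["request_id"] = record.request_id
        return json.dumps(log_record)

# Build a record by hand instead of going through a logger
record = logging.LogRecord("api", logging.INFO, __file__, 1, "Request started", None, None)
record.request_id = "abc123"
line = JSONFormatter().format(record)
print(json.loads(line)["request_id"])  # abc123
```

Every line being valid JSON is the point: log aggregators (Loki, Elasticsearch, CloudWatch) can then index `request_id` and correlate all lines of one request.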
### Metrics with Prometheus

```python
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time

app = FastAPI()

# Define metrics
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ACTIVE_REQUESTS = Gauge(
    "http_requests_active",
    "Active HTTP requests"
)

# Metrics middleware
class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        ACTIVE_REQUESTS.inc()
        start_time = time.time()

        response = await call_next(request)
        duration = time.time() - start_time

        # Record metrics
        endpoint = request.url.path
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status=response.status_code
        ).inc()
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=endpoint
        ).observe(duration)
        ACTIVE_REQUESTS.dec()

        return response

app.add_middleware(MetricsMiddleware)

# Metrics endpoint
@app.get("/metrics")
async def metrics():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
```
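What the `Histogram` actually stores is a set of cumulative bucket counters (each bucket counts observations less than or equal to its bound, plus a final +Inf bucket). A stdlib sketch of that bookkeeping makes the bucket choice above concrete; `cumulative_counts` is a hypothetical helper, not part of `prometheus_client`:

```python
import bisect

# The latency buckets configured above (seconds)
BUCKETS = [0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]

def cumulative_counts(durations):
    counts = [0] * (len(BUCKETS) + 1)  # last slot is the +Inf bucket
    for d in durations:
        # First bucket whose upper bound is >= d (Prometheus "le" semantics)
        counts[bisect.bisect_left(BUCKETS, d)] += 1
    # Prometheus exposes buckets cumulatively
    for i in range(1, len(counts)):
        counts[i] += counts[i - 1]
    return counts

# Two fast requests, one mid-range, one slower than every bucket
print(cumulative_counts([0.02, 0.02, 0.3, 12.0]))
# [0, 2, 2, 2, 3, 3, 3, 3, 3, 4]
```

This is why bucket boundaries matter: Prometheus can only report quantiles at the resolution the buckets provide, so they should straddle your latency SLO.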
### Distributed Tracing with OpenTelemetry

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Setup tracing
def setup_tracing():
    # Create tracer provider
    provider = TracerProvider()

    # Configure exporter (Jaeger, Zipkin, etc.)
    exporter = OTLPSpanExporter(
        endpoint="http://jaeger:4317",
        insecure=True
    )

    # Add span processor
    provider.add_span_processor(BatchSpanProcessor(exporter))

    # Set global tracer provider
    trace.set_tracer_provider(provider)

setup_tracing()

# app, engine, get_user_from_db and UserResponse are assumed to be
# defined elsewhere in the application.

# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)

# Instrument SQLAlchemy
SQLAlchemyInstrumentor().instrument(engine=engine)

# Instrument HTTP client
HTTPXClientInstrumentor().instrument()

# Manual span creation
tracer = trace.get_tracer(__name__)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    with tracer.start_as_current_span("get_user") as span:
        span.set_attribute("user.id", user_id)

        with tracer.start_as_current_span("database_query"):
            user = await get_user_from_db(user_id)

        with tracer.start_as_current_span("serialize_response"):
            response = UserResponse.from_orm(user)

        return response
```

### Configuration
```python
# config.py
import os

class Settings:
    # Application
    APP_NAME: str = "myapp"
    APP_VERSION: str = "1.0.0"
    ENVIRONMENT: str = os.getenv("ENVIRONMENT", "development")

    # Logging
    LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
    LOG_FORMAT: str = "json"  # json or text

    # Metrics
    METRICS_ENABLED: bool = os.getenv("METRICS_ENABLED", "true").lower() == "true"

    # Tracing
    TRACING_ENABLED: bool = os.getenv("TRACING_ENABLED", "true").lower() == "true"
    OTLP_ENDPOINT: str = os.getenv("OTLP_ENDPOINT", "http://localhost:4317")
    # Sampling rate (0.0 to 1.0)
    TRACE_SAMPLE_RATE: float = float(os.getenv("TRACE_SAMPLE_RATE", "1.0"))

settings = Settings()
```
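`TRACE_SAMPLE_RATE` is a head-sampling probability. The OpenTelemetry SDK applies it deterministically from the trace ID (its `TraceIdRatioBased` sampler), so every span in one trace gets the same keep/drop decision; the semantics boil down to this sketch:

```python
import random

def should_sample(rate: float) -> bool:
    # Keep roughly `rate` of all traces. The real SDK derives the decision
    # from the trace ID rather than a random draw, so it is consistent
    # across all spans of a trace.
    return random.random() < rate

random.seed(42)  # deterministic for the example
kept = sum(should_sample(0.25) for _ in range(10_000))
print(kept / 10_000)  # close to 0.25
```

At high traffic a rate well below 1.0 keeps tracing costs bounded while still surfacing representative latency outliers.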
---
## Graceful Shutdown
```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
import signal
import asyncio

# Shared state
shutdown_event = asyncio.Event()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up...")
    await init_database()
    await init_redis()

    yield

    # Shutdown
    print("Shutting down...")
    shutdown_event.set()

    # Wait for in-flight requests
    await asyncio.sleep(5)

    # Cleanup
    await close_database()
    await close_redis()
    print("Shutdown complete")

app = FastAPI(lifespan=lifespan)

# Signal handlers
def handle_sigterm(signum, frame):
    print("Received SIGTERM, initiating graceful shutdown...")
    shutdown_event.set()

signal.signal(signal.SIGTERM, handle_sigterm)

# Check shutdown in long-running tasks
@app.get("/long-task")
async def long_task():
    for i in range(100):
        if shutdown_event.is_set():
            return {"status": "cancelled", "progress": i}
        await asyncio.sleep(0.1)
    return {"status": "completed"}
```
---

## Production Pitfalls

### Pitfall 1: Running as Root

```dockerfile
# ❌ BAD: Running as root
FROM python:3.12-slim
COPY . /app
CMD ["python", "main.py"]

# ✅ GOOD: Non-root user
FROM python:3.12-slim
RUN adduser --disabled-password --gecos "" appuser
USER appuser
COPY --chown=appuser:appuser . /app
CMD ["python", "main.py"]
```

### Pitfall 2: No Resource Limits
```yaml
# ❌ BAD: No limits
containers:
  - name: myapp
    image: myapp:latest

# ✅ GOOD: Resource limits
containers:
  - name: myapp
    image: myapp:latest
    resources:
      limits:
        cpu: "2"
        memory: "1Gi"
      requests:
        cpu: "500m"
        memory: "256Mi"
```

### Pitfall 3: Secrets in Environment Variables
```python
# ❌ BAD: Secrets visible in process list
# DATABASE_URL=postgresql://user:password@host/db

# ✅ GOOD: Use secrets management
# (Kubernetes secrets, AWS Secrets Manager, HashiCorp Vault)
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str

    class Config:
        secrets_dir = "/run/secrets"  # Docker/K8s secrets
```
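`secrets_dir` works because Docker and Kubernetes mount each secret as a file named after its key. A plain-Python sketch of the same lookup (`read_secret` is a hypothetical helper, not pydantic API):

```python
import os
from pathlib import Path

def read_secret(name: str, secrets_dir: str = "/run/secrets") -> str:
    # One file per secret: the file name is the key, the content the value.
    path = Path(secrets_dir) / name
    if path.exists():
        return path.read_text().strip()
    # Environment fallback, for local development only.
    return os.environ[name.upper()]

# Usage in a container with a mounted secret:
# database_url = read_secret("database_url")
```

Files beat environment variables here because env vars leak through `/proc/<pid>/environ`, crash dumps, and child processes, while a mounted file is readable only by the container user.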
### Pitfall 4: No Graceful Shutdown

```python
# ❌ BAD: Abrupt shutdown
if __name__ == "__main__":
    uvicorn.run(app)

# ✅ GOOD: Graceful shutdown with timeout
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        timeout_graceful_shutdown=30,  # Wait 30s for in-flight requests
    )
```

### Pitfall 5: Missing Health Checks
```dockerfile
# ❌ BAD: No health check
FROM python:3.12-slim
CMD ["python", "main.py"]

# ✅ GOOD: Health check
FROM python:3.12-slim
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
CMD ["python", "main.py"]
```

---

## Quick Reference
```bash
# === UVICORN ===
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# === GUNICORN ===
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000

# === DOCKER ===
docker build -t myapp .
docker run -p 8000:8000 myapp

# === DOCKER COMPOSE ===
docker-compose up -d
docker-compose logs -f api

# === WORKER COUNT ===
# Async apps: (2 * CPU) + 1
# Sync apps: CPU + 1
```

```python
# === HEALTH CHECK ===
@app.get("/health")
async def health():
    return {"status": "healthy"}

# === STRUCTURED LOGGING ===
logger.info("Request", extra={"request_id": "abc123"})

# === METRICS ===
REQUEST_COUNT.labels(method="GET", endpoint="/users").inc()

# === GRACEFUL SHUTDOWN ===
@asynccontextmanager
async def lifespan(app):
    yield
    await cleanup()
```

---

## Cross-links
- Prerequisites: FastAPI Deep Dive, Docker
- Related: API Design Patterns - API best practices
- See Also: Kubernetes - Container orchestration
- See Also: Logging & Debugging - Python logging