Giao diện
Thực hành: Deployment Pipeline
🎯 Mục tiêu
Sau bài thực hành này, bạn sẽ:
- Export và serialize ML model đúng cách
- Xây dựng prediction service đơn giản
- Implement health checks và input validation cho model serving
Mô tả bài tập
Bạn đã train xong model và cần deploy lên production. Bài tập này cover từ model export đến tạo serving layer với validation và monitoring hooks.
Yêu cầu
Bài 1: Model Export & Versioning
Tạo model artifact với metadata đầy đủ.
python
import joblib
import json
from datetime import datetime
class ModelArtifact:
    """Package a trained model together with deployment metadata."""

    def __init__(self, model, model_name, version, metrics):
        self.model = model
        # Metadata travels with the model so a deployed artifact is self-describing.
        self.metadata = {
            'model_name': model_name,
            'version': version,
            'metrics': metrics,
            'created_at': datetime.now().isoformat(),
            'python_version': None,   # to be filled in (e.g. sys.version)
            'dependencies': {},       # to be filled in (package -> version)
        }

    def save(self, directory):
        """Save model artifact and metadata into directory.

        Files: model.joblib, metadata.json
        """
        # TODO: Implement

    @classmethod
    def load(cls, directory):
        """Load model artifact from directory."""
        # TODO: Implement
Bài 2: Input Validation
Implement validation layer cho prediction requests.
python
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class PredictionRequest:
    """A single prediction request: mapping of feature name -> raw value."""
    features: dict
@dataclass
class ValidationResult:
    """Outcome of input validation: overall flag plus collected error messages."""
    is_valid: bool
    errors: List[str]
class InputValidator:
    """Validate input features before running predict."""

    def __init__(self, feature_schema):
        """feature_schema: dict mapping feature_name -> {type, min, max, required}"""
        self.schema = feature_schema

    # String annotations: PredictionRequest/ValidationResult are defined in an
    # earlier cell; quoting avoids evaluating the names at class-definition time.
    def validate(self, request: "PredictionRequest") -> "ValidationResult":
        """Validate request features against the schema.

        Check: required fields, types, ranges.
        """
        # TODO: Implement
        pass
Bài 3: Simple Prediction Service
Tạo prediction service class tích hợp validation và basic logging.
python
import logging
from collections import defaultdict
class PredictionService:
    """Production prediction service with validation and monitoring hooks."""

    def __init__(self, model_artifact, validator):
        self.model = model_artifact
        self.validator = validator
        self.request_count = 0    # total requests seen
        self.error_count = 0      # requests that failed validation or prediction
        self.prediction_log = []  # per-request records for monitoring
        self.logger = logging.getLogger('prediction_service')

    def predict(self, request):
        """
        1. Validate input
        2. Transform features
        3. Run prediction
        4. Log result
        Return: prediction result or error
        """
        # TODO: Implement

    def health_check(self):
        """Return service health status."""
        # TODO: Implement

    def get_metrics(self):
        """Return service metrics: request_count, error_rate, avg_latency."""
        # TODO: Implement
Gợi ý
💡 Xem gợi ý
- Bài 1: Dùng `joblib.dump/load` cho model, `json.dump/load` cho metadata. Thêm `sys.version` và `pkg_resources` cho dependency tracking.
- Bài 2: Iterate qua schema, check `required`, `isinstance` cho type, so sánh min/max cho ranges. Collect tất cả errors.
- Bài 3: Wrap prediction trong try-except, log mỗi request với timestamp và duration. Health check kiểm tra model loaded + error rate.
Lời giải
✅ Xem lời giải
python
# Bài 1
import os, sys
class ModelArtifact:
    """Package a trained model together with deployment metadata."""

    def __init__(self, model, model_name, version, metrics):
        self.model = model
        self.metadata = {
            'model_name': model_name,
            'version': version,
            'metrics': metrics,
            'created_at': datetime.now().isoformat(),
            'python_version': sys.version,  # record the interpreter used for training
            'dependencies': {},
        }

    def save(self, directory):
        """Write model.joblib and metadata.json into *directory* (created if missing)."""
        os.makedirs(directory, exist_ok=True)
        joblib.dump(self.model, os.path.join(directory, 'model.joblib'))
        with open(os.path.join(directory, 'metadata.json'), 'w') as f:
            json.dump(self.metadata, f, indent=2)

    @classmethod
    def load(cls, directory):
        """Recreate a ModelArtifact from a directory produced by save()."""
        model = joblib.load(os.path.join(directory, 'model.joblib'))
        with open(os.path.join(directory, 'metadata.json')) as f:
            metadata = json.load(f)
        artifact = cls(model, metadata['model_name'],
                       metadata['version'], metadata['metrics'])
        # Replace the freshly generated metadata with the saved copy so
        # created_at / python_version reflect the original training run.
        artifact.metadata = metadata
        return artifact
# Bài 2
class InputValidator:
    """Validate request features against a schema before prediction."""

    def __init__(self, feature_schema):
        # feature_schema: dict mapping feature_name -> {type, min, max, required}
        self.schema = feature_schema

    def validate(self, request):
        """Return a ValidationResult collecting every schema violation.

        Checks, per feature: required presence, type, and min/max range.
        """
        errors = []
        for name, rules in self.schema.items():
            value = request.features.get(name)
            if rules.get('required', False) and value is None:
                errors.append(f"Missing required feature: {name}")
                continue
            if value is None:
                # Optional feature not supplied — nothing further to check.
                continue
            expected_type = rules.get('type')
            if expected_type and not isinstance(value, expected_type):
                errors.append(f"{name}: expected {expected_type.__name__}, got {type(value).__name__}")
                # Skip range checks: comparing a wrongly-typed value against
                # min/max could raise an unhandled TypeError (e.g. str < int).
                continue
            if 'min' in rules and value < rules['min']:
                errors.append(f"{name}: value {value} below minimum {rules['min']}")
            if 'max' in rules and value > rules['max']:
                errors.append(f"{name}: value {value} above maximum {rules['max']}")
        return ValidationResult(is_valid=len(errors) == 0, errors=errors)
# Bài 3
import time
class PredictionService:
    """Production prediction service with validation, logging, and metrics."""

    def __init__(self, model_artifact, validator):
        self.model = model_artifact  # ModelArtifact wrapper; estimator lives at .model
        self.validator = validator
        self.request_count = 0
        self.error_count = 0
        # Per-request durations in seconds. NOTE(review): grows without bound
        # in a long-running service — consider a capped deque in production.
        self.latencies = []
        self.logger = logging.getLogger('prediction_service')

    def predict(self, request):
        """Validate, predict, and log a single request.

        Returns a dict with 'prediction' + 'latency_ms' on success, or
        'error' (+ optional 'details') on validation/prediction failure.
        """
        self.request_count += 1
        start = time.time()
        try:
            validation = self.validator.validate(request)
            if not validation.is_valid:
                self.error_count += 1
                return {'error': 'Validation failed', 'details': validation.errors}
            # Local import: pandas is only needed once predictions are served.
            import pandas as pd
            features_df = pd.DataFrame([request.features])
            prediction = self.model.model.predict(features_df)
            duration = time.time() - start
            self.latencies.append(duration)
            self.logger.info(f"Prediction: {prediction[0]}, latency: {duration:.4f}s")
            return {'prediction': prediction[0], 'latency_ms': round(duration * 1000, 2)}
        except Exception as e:
            # Top-level service boundary: never let an exception escape; log and report.
            self.error_count += 1
            self.logger.error(f"Prediction failed: {e}")
            return {'error': str(e)}

    def health_check(self):
        """Report 'healthy' while the error rate stays below 10%."""
        error_rate = self.error_count / max(self.request_count, 1)
        return {
            'status': 'healthy' if error_rate < 0.1 else 'degraded',
            'model_loaded': self.model is not None,
            'total_requests': self.request_count,
            'error_rate': round(error_rate, 4),
        }

    def get_metrics(self):
        """Return request counts, error rate, and latency aggregates in ms."""
        return {
            'request_count': self.request_count,
            'error_count': self.error_count,
            'error_rate': round(self.error_count / max(self.request_count, 1), 4),
            'avg_latency_ms': round(
                sum(self.latencies) / max(len(self.latencies), 1) * 1000, 2
            ),
            # int(n * 0.99) is always <= n - 1, so the index is safe for any n >= 1.
            'p99_latency_ms': round(
                sorted(self.latencies)[int(len(self.latencies) * 0.99)] * 1000, 2
            ) if self.latencies else 0,
        }