Skip to content

Thực hành: Deployment Pipeline

🎯 Mục tiêu

Sau bài thực hành này, bạn sẽ:

  • Export và serialize ML model đúng cách
  • Xây dựng prediction service đơn giản
  • Implement health checks và input validation cho model serving

Mô tả bài tập

Bạn đã train xong model và cần deploy lên production. Bài tập này cover từ model export đến tạo serving layer với validation và monitoring hooks.

Yêu cầu

Bài 1: Model Export & Versioning

Tạo model artifact với metadata đầy đủ.

python
import joblib
import json
from datetime import datetime

class ModelArtifact:
    """Package a trained model together with deployment metadata.

    Attributes:
        model: the trained estimator to serialize.
        metadata: dict describing the artifact (name, version, metrics,
            creation timestamp, runtime/dependency placeholders).
    """

    def __init__(self, model, model_name: str, version: str, metrics: dict):
        self.model = model
        # Metadata travels with the model so the serving side can check
        # provenance and compatibility before loading.
        self.metadata = {
            'model_name': model_name,
            'version': version,
            'metrics': metrics,
            'created_at': datetime.now().isoformat(),
            'python_version': None,  # exercise: fill in (hint: sys.version)
            'dependencies': {},      # exercise: fill in package versions
        }

    def save(self, directory):
        """Save the model artifact and metadata into *directory*.
        Files: model.joblib, metadata.json
        """
        # TODO: Implement

    @classmethod
    def load(cls, directory):
        """Load a model artifact from *directory*."""
        # TODO: Implement

Bài 2: Input Validation

Implement validation layer cho prediction requests.

python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class PredictionRequest:
    """A single prediction request: mapping of feature name -> raw value."""
    features: dict

@dataclass
class ValidationResult:
    """Outcome of input validation; is_valid is True only when errors is empty."""
    is_valid: bool
    errors: List[str]

class InputValidator:
    """Validate input features against a schema before predicting."""

    def __init__(self, feature_schema: dict):
        """feature_schema: dict mapping feature_name -> {type, min, max, required}"""
        self.schema = feature_schema

    def validate(self, request: PredictionRequest) -> ValidationResult:
        """Validate request features against the schema.
        Checks: required fields, types, value ranges.
        """
        # TODO: Implement
        pass

Bài 3: Simple Prediction Service

Tạo prediction service class tích hợp validation và basic logging.

python
import logging
from collections import defaultdict

class PredictionService:
    """Production prediction service with validation and monitoring."""

    def __init__(self, model_artifact, validator):
        self.model = model_artifact      # ModelArtifact wrapper, not the raw estimator
        self.validator = validator       # InputValidator (or compatible) instance
        self.request_count = 0           # total predict() calls seen
        self.error_count = 0             # validation failures + exceptions
        self.prediction_log = []         # per-request records for monitoring
        self.logger = logging.getLogger('prediction_service')

    def predict(self, request):
        """
        1. Validate input
        2. Transform features
        3. Run prediction
        4. Log result
        Return: prediction result or error
        """
        # TODO: Implement

    def health_check(self):
        """Return service health status."""
        # TODO: Implement

    def get_metrics(self):
        """Return service metrics: request_count, error_rate, avg_latency."""
        # TODO: Implement

Gợi ý

💡 Xem gợi ý
  • Bài 1: Dùng joblib.dump/load cho model, json.dump/load cho metadata. Thêm sys.version và pkg_resources cho dependency tracking.
  • Bài 2: Iterate qua schema, check required, isinstance cho type, so sánh min/max cho ranges. Collect tất cả errors.
  • Bài 3: Wrap prediction trong try-except, log mỗi request với timestamp và duration. Health check kiểm tra model loaded + error rate.

Lời giải

✅ Xem lời giải
python
# Bài 1
import os, sys

class ModelArtifact:
    """Bundle a trained model with the metadata required for deployment."""

    def __init__(self, model, model_name, version, metrics):
        self.model = model
        # Capture provenance at construction time.
        self.metadata = {
            'model_name': model_name,
            'version': version,
            'metrics': metrics,
            'created_at': datetime.now().isoformat(),
            'python_version': sys.version,
            'dependencies': {},
        }

    def save(self, directory):
        """Write model.joblib and metadata.json into *directory*."""
        os.makedirs(directory, exist_ok=True)
        model_path = os.path.join(directory, 'model.joblib')
        meta_path = os.path.join(directory, 'metadata.json')
        joblib.dump(self.model, model_path)
        with open(meta_path, 'w') as fh:
            json.dump(self.metadata, fh, indent=2)

    @classmethod
    def load(cls, directory):
        """Recreate an artifact from a directory produced by save()."""
        restored_model = joblib.load(os.path.join(directory, 'model.joblib'))
        with open(os.path.join(directory, 'metadata.json')) as fh:
            meta = json.load(fh)
        artifact = cls(restored_model, meta['model_name'],
                       meta['version'], meta['metrics'])
        # Keep the persisted metadata (original created_at etc.), not the
        # freshly generated one from __init__.
        artifact.metadata = meta
        return artifact

# Bài 2
class InputValidator:
    """Validate request features against a schema before prediction.

    Schema format: {feature_name: {'type': <class>, 'min': x, 'max': y,
    'required': bool}} — each rule key is optional.
    """

    def __init__(self, feature_schema):
        self.schema = feature_schema

    def validate(self, request):
        """Check required fields, types, and value ranges.

        Collects every error (instead of stopping at the first) and
        returns a ValidationResult.
        """
        errors = []
        for name, rules in self.schema.items():
            value = request.features.get(name)
            if rules.get('required', False) and value is None:
                errors.append(f"Missing required feature: {name}")
                continue
            if value is None:
                # Optional feature not supplied — nothing to check.
                continue
            expected_type = rules.get('type')
            if expected_type and not isinstance(value, expected_type):
                errors.append(f"{name}: expected {expected_type.__name__}, got {type(value).__name__}")
                # Bug fix: skip range checks on a wrong-typed value.
                # Comparing e.g. a str against a numeric min/max would raise
                # TypeError and crash the service instead of returning a
                # ValidationResult.
                continue
            if 'min' in rules and value < rules['min']:
                errors.append(f"{name}: value {value} below minimum {rules['min']}")
            if 'max' in rules and value > rules['max']:
                errors.append(f"{name}: value {value} above maximum {rules['max']}")
        return ValidationResult(is_valid=len(errors) == 0, errors=errors)

# Bài 3
import time

class PredictionService:
    """Production prediction service with validation and monitoring hooks.

    Tracks request/error counts and per-request latency so that
    health_check() and get_metrics() can report service status.
    """

    def __init__(self, model_artifact, validator):
        self.model = model_artifact      # ModelArtifact (estimator lives at .model)
        self.validator = validator       # exposes validate(request) -> ValidationResult
        self.request_count = 0
        self.error_count = 0
        # NOTE(review): grows without bound; consider a bounded deque for a
        # long-running service.
        self.latencies = []              # seconds per successful prediction
        self.logger = logging.getLogger('prediction_service')

    def predict(self, request):
        """Validate, predict, and log a single request.

        Returns {'prediction': ..., 'latency_ms': ...} on success, or
        {'error': ...} (with 'details' for validation failures) otherwise.
        """
        self.request_count += 1
        start = time.time()
        # Bug fix: the lazy import now sits OUTSIDE the try-block. A missing
        # pandas is a deployment problem and should surface as ImportError,
        # not be silently returned to the client as a prediction error.
        import pandas as pd
        try:
            validation = self.validator.validate(request)
            if not validation.is_valid:
                self.error_count += 1
                return {'error': 'Validation failed', 'details': validation.errors}
            features_df = pd.DataFrame([request.features])
            prediction = self.model.model.predict(features_df)
            duration = time.time() - start
            self.latencies.append(duration)
            # Lazy %-style args avoid formatting cost when INFO is disabled.
            self.logger.info("Prediction: %s, latency: %.4fs", prediction[0], duration)
            return {'prediction': prediction[0], 'latency_ms': round(duration * 1000, 2)}
        except Exception as e:
            self.error_count += 1
            self.logger.error("Prediction failed: %s", e)
            return {'error': str(e)}

    def health_check(self):
        """Report overall health; 'degraded' when the error rate is >= 10%."""
        error_rate = self.error_count / max(self.request_count, 1)
        return {
            'status': 'healthy' if error_rate < 0.1 else 'degraded',
            'model_loaded': self.model is not None,
            'total_requests': self.request_count,
            'error_rate': round(error_rate, 4),
        }

    def get_metrics(self):
        """Return request_count, error_count/rate, avg and p99 latency (ms)."""
        return {
            'request_count': self.request_count,
            'error_count': self.error_count,
            'error_rate': round(self.error_count / max(self.request_count, 1), 4),
            'avg_latency_ms': round(
                sum(self.latencies) / max(len(self.latencies), 1) * 1000, 2
            ),
            # int(len * 0.99) is always <= len - 1, so the index is safe.
            'p99_latency_ms': round(
                sorted(self.latencies)[int(len(self.latencies) * 0.99)] * 1000, 2
            ) if self.latencies else 0,
        }

Liên kết liên quan