Skip to content

Thực hành: Training Pipeline

🎯 Mục tiêu

🎯 Sau bài thực hành này, bạn sẽ:

  • Xây dựng pipeline hoàn chỉnh từ preprocessing đến model training
  • Sử dụng sklearn Pipeline để đảm bảo reproducibility
  • Implement proper data splitting và experiment tracking

Mô tả bài tập

Xây dựng production-ready training pipeline cho bài toán dự đoán giá nhà. Pipeline cần xử lý cả numerical và categorical features, có proper validation, và export model artifact.

Yêu cầu

Bài 1: Preprocessing Pipeline

Tạo sklearn Pipeline xử lý mixed data types.

python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import pandas as pd
import numpy as np

numerical_features = ['area', 'rooms', 'age', 'distance_to_center']
categorical_features = ['district', 'type', 'condition']

def build_preprocessor(numerical_features, categorical_features):
    """Tạo ColumnTransformer xử lý numerical và categorical features.
    Numerical: impute median + standard scaling
    Categorical: impute most_frequent + one-hot encoding
    """
    # TODO: Implement
    pass

Bài 2: Full Training Pipeline với Validation

Kết hợp preprocessing + model thành pipeline hoàn chỉnh, đánh giá đúng cách.

python
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import joblib

def train_pipeline(X, y, numerical_features, categorical_features):
    """
    1. Split train/test (80/20)
    2. Build pipeline: preprocessor + model
    3. Cross-validate trên train set
    4. Final evaluation trên test set
    5. Return: pipeline, cv_scores, test_metrics
    """
    # TODO: Implement
    pass

Bài 3: Experiment Logging

Implement simple experiment tracking (không cần MLflow).

python
import json
from datetime import datetime

class ExperimentTracker:
    """Simple experiment tracker lưu kết quả vào JSON file."""

    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        self.runs = []

    def log_run(self, model_name, params, metrics, artifacts=None):
        """Log một experiment run."""
        # TODO: Implement

    def get_best_run(self, metric='test_mae', minimize=True):
        """Trả về run có metric tốt nhất."""
        # TODO: Implement

    def save(self, filepath='experiments.json'):
        """Save tất cả runs ra file."""
        # TODO: Implement

Gợi ý

💡 Xem gợi ý
  • Bài 1: Dùng ColumnTransformer với transformers list. Mỗi transformer là tuple (name, Pipeline, columns).
  • Bài 2: Pipeline([('preprocessor', preprocessor), ('model', model)]) kết hợp tất cả. Split trước khi fit để tránh leakage.
  • Bài 3: Mỗi run cần: timestamp, model_name, params, metrics, duration. Dùng json.dumps với default=str cho datetime.

Lời giải

✅ Xem lời giải
python
# Bài 1
def build_preprocessor(numerical_features, categorical_features):
    num_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])
    cat_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ])
    return ColumnTransformer([
        ('num', num_pipeline, numerical_features),
        ('cat', cat_pipeline, categorical_features),
    ])

# Bài 2
def train_pipeline(X, y, numerical_features, categorical_features):
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    preprocessor = build_preprocessor(numerical_features, categorical_features)
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('model', GradientBoostingRegressor(n_estimators=200, random_state=42)),
    ])
    cv_scores = cross_val_score(
        pipeline, X_train, y_train, cv=5,
        scoring='neg_mean_absolute_error'
    )
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    test_metrics = {
        'mae': mean_absolute_error(y_test, y_pred),
        'r2': r2_score(y_test, y_pred),
    }
    joblib.dump(pipeline, 'model_pipeline.joblib')
    return pipeline, -cv_scores, test_metrics

# Bài 3
class ExperimentTracker:
    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        self.runs = []

    def log_run(self, model_name, params, metrics, artifacts=None):
        run = {
            'run_id': len(self.runs) + 1,
            'timestamp': datetime.now().isoformat(),
            'model_name': model_name,
            'params': params,
            'metrics': metrics,
            'artifacts': artifacts or [],
        }
        self.runs.append(run)
        return run

    def get_best_run(self, metric='test_mae', minimize=True):
        if not self.runs:
            return None
        return sorted(
            self.runs,
            key=lambda r: r['metrics'].get(metric, float('inf')),
            reverse=not minimize
        )[0]

    def save(self, filepath='experiments.json'):
        data = {
            'experiment': self.experiment_name,
            'total_runs': len(self.runs),
            'runs': self.runs,
        }
        with open(filepath, 'w') as f:
            json.dump(data, f, indent=2, default=str)

Liên kết liên quan