Giao diện
Thực hành: Training Pipeline
🎯 Mục tiêu
🎯 Sau bài thực hành này, bạn sẽ:
- Xây dựng pipeline hoàn chỉnh từ preprocessing đến model training
- Sử dụng sklearn Pipeline để đảm bảo reproducibility
- Implement proper data splitting và experiment tracking
Mô tả bài tập
Xây dựng production-ready training pipeline cho bài toán dự đoán giá nhà. Pipeline cần xử lý cả numerical và categorical features, có proper validation, và export model artifact.
Yêu cầu
Bài 1: Preprocessing Pipeline
Tạo sklearn Pipeline xử lý mixed data types.
python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import pandas as pd
import numpy as np
numerical_features = ['area', 'rooms', 'age', 'distance_to_center']
categorical_features = ['district', 'type', 'condition']
def build_preprocessor(numerical_features, categorical_features):
"""Tạo ColumnTransformer xử lý numerical và categorical features.
Numerical: impute median + standard scaling
Categorical: impute most_frequent + one-hot encoding
"""
# TODO: Implement
passBài 2: Full Training Pipeline với Validation
Kết hợp preprocessing + model thành pipeline hoàn chỉnh, đánh giá đúng cách.
python
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import joblib
def train_pipeline(X, y, numerical_features, categorical_features):
"""
1. Split train/test (80/20)
2. Build pipeline: preprocessor + model
3. Cross-validate trên train set
4. Final evaluation trên test set
5. Return: pipeline, cv_scores, test_metrics
"""
# TODO: Implement
passBài 3: Experiment Logging
Implement simple experiment tracking (không cần MLflow).
python
import json
from datetime import datetime
class ExperimentTracker:
"""Simple experiment tracker lưu kết quả vào JSON file."""
def __init__(self, experiment_name):
self.experiment_name = experiment_name
self.runs = []
def log_run(self, model_name, params, metrics, artifacts=None):
"""Log một experiment run."""
# TODO: Implement
def get_best_run(self, metric='test_mae', minimize=True):
"""Trả về run có metric tốt nhất."""
# TODO: Implement
def save(self, filepath='experiments.json'):
"""Save tất cả runs ra file."""
# TODO: ImplementGợi ý
💡 Xem gợi ý
- Bài 1: Dùng
ColumnTransformervớitransformerslist. Mỗi transformer là tuple(name, Pipeline, columns). - Bài 2:
Pipeline([('preprocessor', preprocessor), ('model', model)])kết hợp tất cả. Split trước khi fit để tránh leakage. - Bài 3: Mỗi run cần: timestamp, model_name, params, metrics, duration. Dùng
json.dumpsvớidefault=strcho datetime.
Lời giải
✅ Xem lời giải
python
# Bài 1
def build_preprocessor(numerical_features, categorical_features):
num_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
])
cat_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])
return ColumnTransformer([
('num', num_pipeline, numerical_features),
('cat', cat_pipeline, categorical_features),
])
# Bài 2
def train_pipeline(X, y, numerical_features, categorical_features):
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
preprocessor = build_preprocessor(numerical_features, categorical_features)
pipeline = Pipeline([
('preprocessor', preprocessor),
('model', GradientBoostingRegressor(n_estimators=200, random_state=42)),
])
cv_scores = cross_val_score(
pipeline, X_train, y_train, cv=5,
scoring='neg_mean_absolute_error'
)
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
test_metrics = {
'mae': mean_absolute_error(y_test, y_pred),
'r2': r2_score(y_test, y_pred),
}
joblib.dump(pipeline, 'model_pipeline.joblib')
return pipeline, -cv_scores, test_metrics
# Bài 3
class ExperimentTracker:
def __init__(self, experiment_name):
self.experiment_name = experiment_name
self.runs = []
def log_run(self, model_name, params, metrics, artifacts=None):
run = {
'run_id': len(self.runs) + 1,
'timestamp': datetime.now().isoformat(),
'model_name': model_name,
'params': params,
'metrics': metrics,
'artifacts': artifacts or [],
}
self.runs.append(run)
return run
def get_best_run(self, metric='test_mae', minimize=True):
if not self.runs:
return None
return sorted(
self.runs,
key=lambda r: r['metrics'].get(metric, float('inf')),
reverse=not minimize
)[0]
def save(self, filepath='experiments.json'):
data = {
'experiment': self.experiment_name,
'total_runs': len(self.runs),
'runs': self.runs,
}
with open(filepath, 'w') as f:
json.dump(data, f, indent=2, default=str)