Slides 🖼️
Week 2: ML Pipelines, Reproducibility and Experimentation
Learning objectives
- Refactor a Jupyter notebook into a reproducible ML pipeline
- Version artifacts of an ML pipeline in a remote storage
- Iterate over a large number of ML experiments in a disciplined way
Steps
Refactor Jupyter notebook in a DVC pipeline
Create the following files to read parameter values from a file
params.yaml
# Settings shared by all pipeline stages.
base:
  project: bank_customer_churn
  raw_data_dir: data/raw
  # One raw file per country is read: Churn_Modelling_<country>.csv
  countries:
    - France
    - Spain
  feat_cols:
    - CreditScore
    - Age
    - Tenure
    - Balance
    - NumOfProducts
    - HasCrCard
    - IsActiveMember
    - EstimatedSalary
  targ_col: Exited
  random_state: 42

# Settings for src/stages/data_split.py
data_split:
  test_size: 0.25
  processed_data_dir: data/processed

# Settings for src/stages/train.py
train:
  # Supported values: randomforest, xgboost
  model_type: randomforest
  model_dir: models
  model_path: models/clf-model.joblib
  # Passed as keyword arguments to the classifier constructor.
  params:
    n_estimators: 200
    max_depth: 20
src/utils/load_params.py
import yaml
from box import ConfigBox


def load_params(params_path):
    """Load a YAML parameters file into a ConfigBox.

    Args:
        params_path: Path to the YAML file to read.

    Returns:
        A ConfigBox wrapping the parsed YAML content, so nested values can be
        read with attribute access (e.g. ``params.base.random_state``).
    """
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(params_path, "r", encoding="utf-8") as f:
        params = yaml.safe_load(f)
    return ConfigBox(params)
Verify that the load_params function works as expected
# run the below code in python/ipython shell
from src.utils.load_params import load_params
params = load_params('params.yaml')
params
Push files related to reading in parameters
# Stage the parameter file and its loader, then commit them.
git add src/utils/load_params.py params.yaml
git commit -m "add params and utils"
Refactor notebook into multiple stages (data_split, train, eval)
src/stages/data_split.py
import os
import sys
from pathlib import Path

# Put the ``src`` directory on sys.path so ``utils`` can be imported when this
# file is executed directly as a script.
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

import argparse

import pandas as pd
from sklearn.model_selection import train_test_split
from utils.load_params import load_params


def data_split(params):
    """Split the raw per-country data into train and test sets.

    Reads one ``Churn_Modelling_<country>.csv`` file per configured country
    from the raw data directory, concatenates them, splits features and target
    with ``train_test_split`` and pickles the four resulting objects
    (``X_train``, ``X_test``, ``y_train``, ``y_test``) into the processed
    data directory.

    Args:
        params: Configuration object exposing ``base.raw_data_dir``,
            ``base.feat_cols``, ``base.countries``, ``base.targ_col``,
            ``base.random_state``, ``data_split.test_size`` and
            ``data_split.processed_data_dir``.
    """
    raw_data_dir = Path(params.base.raw_data_dir)
    feat_cols = params.base.feat_cols
    countries = params.base.countries
    targ_col = params.base.targ_col
    random_state = params.base.random_state
    test_size = params.data_split.test_size
    processed_data_dir = Path(params.data_split.processed_data_dir)

    # parents=True: the configured directory may be nested (e.g. data/processed)
    # and its parent is not guaranteed to exist yet.
    processed_data_dir.mkdir(parents=True, exist_ok=True)

    data_file_paths = [
        raw_data_dir/f'Churn_Modelling_{country}.csv' for country in countries
    ]
    df = pd.concat([pd.read_csv(fpath) for fpath in data_file_paths])

    X, y = df[feat_cols], df[targ_col]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    X_train.to_pickle(processed_data_dir/'X_train.pkl')
    X_test.to_pickle(processed_data_dir/'X_test.pkl')
    y_train.to_pickle(processed_data_dir/'y_train.pkl')
    y_test.to_pickle(processed_data_dir/'y_test.pkl')


if __name__ == '__main__':
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('--config', dest='config', required=True)
    args = args_parser.parse_args()
    params = load_params(params_path=args.config)
    data_split(params)
src/stages/train.py
import sys
from pathlib import Path

# Put the ``src`` directory on sys.path so ``utils`` can be imported when this
# file is executed directly as a script.
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

import argparse

import pandas as pd
from joblib import dump
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from utils.load_params import load_params
from xgboost import XGBClassifier


def train(params):
    """Fit the preprocessing + classifier pipeline and save it to disk.

    Loads ``X_train.pkl`` and ``y_train.pkl`` from the processed data
    directory, builds a pipeline that imputes and scales the feature columns
    before the configured classifier, fits it and dumps the fitted pipeline
    to ``train.model_path`` with joblib.

    Args:
        params: Configuration object exposing
            ``data_split.processed_data_dir``, ``base.random_state``,
            ``base.feat_cols``, ``train.model_type``, ``train.model_dir``,
            ``train.model_path`` and ``train.params`` (keyword arguments for
            the classifier constructor).

    Raises:
        ValueError: If ``train.model_type`` is neither ``"randomforest"`` nor
            ``"xgboost"``.
    """
    processed_data_dir = Path(params.data_split.processed_data_dir)
    random_state = params.base.random_state
    feat_cols = params.base.feat_cols
    model_type = params.train.model_type
    model_dir = Path(params.train.model_dir)
    model_path = Path(params.train.model_path)
    train_params = params.train.params

    model_dir.mkdir(parents=True, exist_ok=True)
    # model_path is configured independently of model_dir, so make sure the
    # directory that will actually receive the file exists as well.
    model_path.parent.mkdir(parents=True, exist_ok=True)

    X_train = pd.read_pickle(processed_data_dir/'X_train.pkl')
    y_train = pd.read_pickle(processed_data_dir/'y_train.pkl')

    if model_type == "randomforest":
        clf = RandomForestClassifier(random_state=random_state, **train_params)
    elif model_type == "xgboost":
        clf = XGBClassifier(random_state=random_state, **train_params)
    else:
        # Without this branch ``clf`` would be unbound and the failure would
        # surface later as an unrelated UnboundLocalError.
        raise ValueError(
            f"Unsupported model_type {model_type!r}; "
            "expected 'randomforest' or 'xgboost'"
        )

    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer()),
            ("scaler", StandardScaler()),
        ]
    )
    preprocessor = ColumnTransformer(
        transformers=[("num", numeric_transformer, feat_cols)]
    )
    model = Pipeline(
        steps=[("preprocessor", preprocessor), ("clf", clf)]
    )

    model.fit(X_train, y_train)
    dump(model, model_path)


if __name__ == '__main__':
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('--config', dest='config', required=True)
    args = args_parser.parse_args()
    params = load_params(params_path=args.config)
    train(params)
src/stages/eval.py
import sys
from pathlib import Path

# Put the ``src`` directory on sys.path so ``utils`` can be imported when this
# file is executed directly as a script.
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

import argparse
import json

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from eli5.sklearn import PermutationImportance
from joblib import load
from sklearn.metrics import (confusion_matrix, f1_score, make_scorer,
                             roc_auc_score)
from utils.load_params import load_params


# NOTE: the name shadows the built-in ``eval``; it is kept so existing callers
# of this stage keep working.
def eval(params):
    """Evaluate the trained model on the test set and write the reports.

    Produces three artifacts relative to the current working directory:

    * ``metrics.json`` with the F1 score (predictions thresholded at 0.5)
      and the ROC AUC,
    * ``reports/figures/cm.png`` with a heatmap of the confusion matrix
      normalized over the true labels,
    * ``reports/feat_imp.csv`` with permutation importances (scored with F1)
      sorted from most to least important.

    Args:
        params: Configuration object exposing
            ``data_split.processed_data_dir``, ``train.model_path``,
            ``base.feat_cols`` and ``base.random_state``.
    """
    processed_data_dir = Path(params.data_split.processed_data_dir)
    model_path = Path(params.train.model_path)
    feat_cols = params.base.feat_cols
    random_state = params.base.random_state

    X_test = pd.read_pickle(processed_data_dir/'X_test.pkl')
    y_test = pd.read_pickle(processed_data_dir/'y_test.pkl')
    model = load(model_path)

    # Keep only the second probability column and threshold it at 0.5.
    y_prob = model.predict_proba(X_test).astype(float)
    y_prob = y_prob[:, 1]
    y_pred = y_prob >= 0.5

    f1 = f1_score(y_test, y_pred)
    roc_auc = roc_auc_score(y_test, y_prob)
    metrics = {
        'f1': f1,
        'roc_auc': roc_auc
    }
    # Context manager so the file is flushed and closed deterministically.
    with open('metrics.json', 'w', encoding='utf-8') as metrics_file:
        json.dump(obj=metrics, fp=metrics_file, indent=4, sort_keys=True)

    reports_dir = Path('reports')
    reports_dir.mkdir(exist_ok=True)
    fig_dir = reports_dir/'figures'
    fig_dir.mkdir(parents=True, exist_ok=True)

    cm = confusion_matrix(y_test, y_pred, normalize='true')
    sns.heatmap(cm, annot=True, cmap=plt.cm.Blues)
    plt.savefig(fig_dir/'cm.png')
    # Close the figure so a later call in the same process does not draw on
    # top of this heatmap.
    plt.close()

    # Feature names as they come out of the preprocessing steps (everything
    # in the pipeline except the final classifier).
    out_feat_names = model[:-1].get_feature_names_out(feat_cols)
    preprocessor = model.named_steps['preprocessor']
    clf = model.named_steps['clf']

    # The importance is computed for the bare classifier, so it has to be fed
    # the already-transformed features.
    X_test_transformed = preprocessor.transform(X_test)
    perm = PermutationImportance(
        clf, scoring=make_scorer(f1_score), random_state=random_state
    ).fit(X_test_transformed, y_test)

    feat_imp = zip(out_feat_names, perm.feature_importances_)
    df_feat_imp = pd.DataFrame(feat_imp, columns=['feature', 'importance'])
    df_feat_imp = df_feat_imp.sort_values(by='importance', ascending=False)

    feat_importance_fpath = reports_dir/'feat_imp.csv'
    df_feat_imp.to_csv(feat_importance_fpath, index=False, float_format='%.2f')


if __name__ == '__main__':
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('--config', dest='config', required=True)
    args = args_parser.parse_args()
    params = load_params(params_path=args.config)
    eval(params)
Verify that each stage runs without errors
# Run in this order: train reads the split data, eval reads the saved model.
python src/stages/data_split.py --config params.yaml
python src/stages/train.py --config params.yaml
python src/stages/eval.py --config params.yaml
Push stages to git
# Stage the stage scripts, then commit them.
git add src/stages/
git commit -m "add pipeline stages"
Define DVC pipeline dvc.yaml
stages:
  # Splits the raw CSV files into pickled train/test sets.
  data_split:
    cmd: python src/stages/data_split.py --config=params.yaml
    deps:
      - src/stages/data_split.py
      - data/raw
    params:
      - base
      - data_split
    outs:
      - data/processed

  # Fits the model pipeline on the training split and saves it.
  train:
    cmd: python src/stages/train.py --config=params.yaml
    deps:
      - src/stages/train.py
      - data/processed
    params:
      - base
      - train
    outs:
      - models/clf-model.joblib

  # Scores the saved model on the test split and writes the reports.
  eval:
    cmd: python src/stages/eval.py --config=params.yaml
    deps:
      - src/stages/eval.py
      - data/processed
      - models/clf-model.joblib
    params:
      - base
      - data_split
      - train
    outs:
      - reports/feat_imp.csv:
          cache: false
    metrics:
      - metrics.json:
          cache: false
    plots:
      - reports/figures/cm.png:
          cache: false
Run the pipeline
dvc exp run
Push resulting files to Git
# Stage everything in the workspace, then commit it.
git add .
git commit -m "add DVC pipeline"
Push resulting artifacts to DVC remote
Experiment Management
Experiment management via command line
Experiment management via DVC extension for VS Code
Run a few experiments manually
Create grid search script
- Create
grid_search.sh
#!/bin/sh
# Queue one experiment per combination of the swept values of
# train.params.n_estimators and train.params.max_depth.
# NOTE: --queue only enqueues the experiments; they still have to be started
# afterwards (e.g. with `dvc queue start`).
dvc exp run -S 'train.params.n_estimators=range(50, 200, 30)' \
            -S 'train.params.max_depth=range(10, 25, 5)' --queue
- Push it to git
# Stage the grid search script, then commit it.
git add grid_search.sh
git commit -m "add grid search script"
- Create
Run grid search:
bash grid_search.sh
Review Experiment Tracking tab in DVC extension
Apply the best performing experiment to workspace
Push artifacts and code to DVC remote and GitHub:
# Upload the DVC-tracked artifacts, then commit and push the code.
dvc push
git add .
git commit -m "tune hyperparams"
git push