
Week 2: ML Pipelines, Reproducibility and Experimentation

Learning objectives

  • Refactor a Jupyter notebook into a reproducible ML pipeline
  • Version the artifacts of an ML pipeline in remote storage
  • Iterate over a large number of ML experiments in a disciplined way

Steps

Refactor the Jupyter notebook into a DVC pipeline

  • Docs: https://dvc.org/doc/start/data-pipelines

  • Create the following files so that the pipeline reads its parameter values from params.yaml

    params.yaml

      base:
          project: bank_customer_churn
          raw_data_dir: data/raw
          countries:
          - France
          - Spain
          feat_cols:
          - CreditScore
          - Age
          - Tenure
          - Balance
          - NumOfProducts
          - HasCrCard
          - IsActiveMember
          - EstimatedSalary
          targ_col: Exited
          random_state: 42
    
      data_split:
          test_size: 0.25
          processed_data_dir: data/processed
    
      train:
          model_type: randomforest
          model_dir: models
          model_path: models/clf-model.joblib
          params:
              n_estimators: 200
              max_depth: 20
    

    src/utils/load_params.py

      import yaml
      from box import ConfigBox
    
      def load_params(params_path):
          with open(params_path, "r") as f:
              params = yaml.safe_load(f)
              params = ConfigBox(params)
          return params
    

  • Verify that the load_params function works as expected

    # run the code below in a Python/IPython shell
    from src.utils.load_params import load_params
    params = load_params('params.yaml')
    params
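
  • Note: ConfigBox (from the python-box package) wraps the parsed YAML so that nested values can be read with dot notation; as a quick check from the repository root, the one-liner below should print randomforest (the value of train.model_type in params.yaml)

    python -c "from src.utils.load_params import load_params; print(load_params('params.yaml').train.model_type)"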
    
  • Commit the files related to reading parameters

    git add src/utils/load_params.py params.yaml
    git commit -m "add params and utils"
    
  • Refactor the notebook into multiple stages (data_split, train, eval)

    src/stages/data_split.py
      import sys
      from pathlib import Path
      src_path = Path(__file__).parent.parent.resolve()
      sys.path.append(str(src_path))
    
      import argparse
      import pandas as pd
      from sklearn.model_selection import train_test_split
      from utils.load_params import load_params
    
      def data_split(params):
          raw_data_dir = Path(params.base.raw_data_dir)
          feat_cols = params.base.feat_cols
          countries = params.base.countries
          targ_col = params.base.targ_col
          random_state = params.base.random_state
          test_size = params.data_split.test_size
          processed_data_dir = Path(params.data_split.processed_data_dir)
          processed_data_dir.mkdir(exist_ok=True)
          data_file_paths = [raw_data_dir/f'Churn_Modelling_{country}.csv' for country in countries]
          df = pd.concat([pd.read_csv(fpath) for fpath in data_file_paths])
          X, y = df[feat_cols], df[targ_col]
          X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
          X_train.to_pickle(processed_data_dir/'X_train.pkl')
          X_test.to_pickle(processed_data_dir/'X_test.pkl')
          y_train.to_pickle(processed_data_dir/'y_train.pkl')
          y_test.to_pickle(processed_data_dir/'y_test.pkl')
    
      if __name__ == '__main__':
          args_parser = argparse.ArgumentParser()
          args_parser.add_argument('--config', dest='config', required=True)
          args = args_parser.parse_args()
    
          params = load_params(params_path=args.config)
          data_split(params)
    
    src/stages/train.py
      import sys
      from pathlib import Path
    
      src_path = Path(__file__).parent.parent.resolve()
      sys.path.append(str(src_path))
    
      import argparse
    
      import pandas as pd
      from joblib import dump
      from sklearn.compose import ColumnTransformer
      from sklearn.ensemble import RandomForestClassifier
      from sklearn.impute import SimpleImputer
      from sklearn.pipeline import Pipeline
      from sklearn.preprocessing import StandardScaler
      from utils.load_params import load_params
      from xgboost import XGBClassifier
    
      def train(params):
          processed_data_dir = Path(params.data_split.processed_data_dir)
          random_state = params.base.random_state
          feat_cols = params.base.feat_cols
          model_type = params.train.model_type
          model_dir = Path(params.train.model_dir)
          model_dir.mkdir(exist_ok=True)
          model_path = Path(params.train.model_path)
          train_params = params.train.params
    
          X_train = pd.read_pickle(processed_data_dir/'X_train.pkl')
          y_train = pd.read_pickle(processed_data_dir/'y_train.pkl')
    
          if model_type == "randomforest":
              clf = RandomForestClassifier(random_state=random_state, **train_params)
          elif model_type == "xgboost":
              clf = XGBClassifier(random_state=random_state, **train_params)
          else:
              raise ValueError(f"Unknown model_type: {model_type}")
    
          numeric_transformer = Pipeline(
              steps=[
                  ("imputer", SimpleImputer()),
                  ("scaler", StandardScaler())
                  ]
              )
    
          preprocessor = ColumnTransformer(
              transformers=[("num", numeric_transformer, feat_cols)]
          )
          model = Pipeline(
              steps=[("preprocessor", preprocessor), ("clf", clf)]
              )
    
          model.fit(X_train, y_train)
          dump(model, model_path)
    
      if __name__ == '__main__':
          args_parser = argparse.ArgumentParser()
          args_parser.add_argument('--config', dest='config', required=True)
          args = args_parser.parse_args()
          params = load_params(params_path=args.config)
          train(params)
    
    src/stages/eval.py
      import sys
      from pathlib import Path
    
      src_path = Path(__file__).parent.parent.resolve()
      sys.path.append(str(src_path))
    
      import argparse
      import json
    
      import matplotlib.pyplot as plt
      import pandas as pd
      import seaborn as sns
      from eli5.sklearn import PermutationImportance
      from joblib import load
      from sklearn.metrics import (confusion_matrix, f1_score, make_scorer,
                                   roc_auc_score)
      from utils.load_params import load_params
    
      def eval(params):
          processed_data_dir = Path(params.data_split.processed_data_dir)
          model_path = Path(params.train.model_path)
          feat_cols = params.base.feat_cols
          random_state = params.base.random_state
    
          X_test = pd.read_pickle(processed_data_dir/'X_test.pkl')
          y_test = pd.read_pickle(processed_data_dir/'y_test.pkl')
          model = load(model_path)
          y_prob = model.predict_proba(X_test).astype(float)
          y_prob = y_prob[:, 1]
          y_pred = y_prob >= 0.5
    
          f1 = f1_score(y_test, y_pred)
          roc_auc = roc_auc_score(y_test, y_prob)
          metrics = {
              'f1': f1,
              'roc_auc': roc_auc
          }
          with open('metrics.json', 'w') as f:
              json.dump(obj=metrics, fp=f, indent=4, sort_keys=True)
    
          reports_dir = Path('reports')
          reports_dir.mkdir(exist_ok=True)
          fig_dir = reports_dir/'figures'
          fig_dir.mkdir(exist_ok=True)
    
          cm = confusion_matrix(y_test, y_pred, normalize='true')
          sns.heatmap(cm, annot=True, cmap=plt.cm.Blues)
          plt.savefig(fig_dir/'cm.png')
    
          out_feat_names = model[:-1].get_feature_names_out(feat_cols)
          preprocessor = model.named_steps['preprocessor']
          clf = model.named_steps['clf']
          X_test_transformed = preprocessor.transform(X_test)
    
          perm = PermutationImportance(clf, scoring=make_scorer(f1_score), random_state=random_state).fit(X_test_transformed, y_test)
    
          feat_imp = zip(out_feat_names, perm.feature_importances_)
          df_feat_imp = pd.DataFrame(feat_imp, columns=[
              'feature',
              'importance'])
          df_feat_imp = df_feat_imp.sort_values(by='importance', ascending=False)
          feat_importance_fpath = reports_dir/'feat_imp.csv'
          df_feat_imp.to_csv(feat_importance_fpath, index=False, float_format='%.2f')
    
      if __name__ == '__main__':
          args_parser = argparse.ArgumentParser()
          args_parser.add_argument('--config', dest='config', required=True)
          args = args_parser.parse_args()
          params = load_params(params_path=args.config)
          eval(params)
    
  • Verify that each stage runs without errors

    python src/stages/data_split.py --config params.yaml
    python src/stages/train.py --config params.yaml
    python src/stages/eval.py --config params.yaml
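
  • (Optional) As a sanity check, confirm that each stage produced its artifacts (the paths follow from params.yaml and the scripts above)

    ls data/processed        # X_train.pkl, X_test.pkl, y_train.pkl, y_test.pkl
    ls models                # clf-model.joblib
    ls reports/figures       # cm.png
    ls metrics.json reports/feat_imp.csv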
    
  • Commit the stages to Git

    git add src/stages/
    git commit -m "add pipeline stages"
    
Define the DVC pipeline in dvc.yaml

  • Create dvc.yaml at the project root with the following stage definitions

    dvc.yaml

      stages:
          data_split:
              cmd: python src/stages/data_split.py --config=params.yaml
              deps:
              - src/stages/data_split.py
              - data/raw
              params:
              - base
              - data_split
              outs:
              - data/processed
          train:
              cmd: python src/stages/train.py --config=params.yaml
              deps:
              - src/stages/train.py
              - data/processed
              params:
              - base
              - train
              outs:
              - models/clf-model.joblib
          eval:
              cmd: python src/stages/eval.py --config=params.yaml
              deps:
              - src/stages/eval.py
              - data/processed
              - models/clf-model.joblib
              params:
              - base
              - data_split
              - train
              outs:
              - reports/feat_imp.csv:
                  cache: false
              metrics:
              - metrics.json:
                  cache: false
              plots:
              - reports/figures/cm.png:
                  cache: false

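  • (Optional) Check that DVC has picked up the three stages before running them; dvc dag prints the stage dependency graph and dvc status reports which stages are out of date

    dvc dag
    dvc status
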
  • Run the pipeline

    dvc exp run
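
  • (Optional) Inspect what the run recorded; dvc metrics show prints the metrics tracked in metrics.json, and dvc exp show lists the experiments DVC has captured

    dvc metrics show
    dvc exp show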
    
  • Commit the resulting files (including the dvc.lock generated by the run) to Git

    git add .
    git commit -m "add DVC pipeline"
    
  • Push resulting artifacts to DVC remote
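
  • The commands below are a minimal sketch: the remote name storage and the s3:// URL are placeholders for whatever storage is available to you (S3, GCS, Azure, SSH, a local directory, ...); dvc remote add records the remote in .dvc/config, and dvc push uploads the DVC-cached outputs (Docs: https://dvc.org/doc/command-reference/remote/add)

    # replace the placeholder remote name and URL with your own
    dvc remote add -d storage s3://<bucket>/<path>
    git add .dvc/config
    git commit -m "configure DVC remote"

    # upload the DVC-cached artifacts (processed data, trained model)
    dvc push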

Experiment Management