
Week 4: Data Drift Monitoring for ML Projects

Learning Objectives

  • Distinguish between application monitoring and ML monitoring
  • Use Alibi Detect framework to detect data drift

Steps

Introduction to Data Drift Monitoring

  • What’s data drift and why do we need to monitor for it?

  • Intro to Alibi Detect (a minimal usage sketch follows this list)

  • Add Churn_Modelling_Germany.csv to data/more_data/


  • Add /more_data entry to data/.gitignore

  • Create and explore notebooks/DriftDetection.ipynb

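Before wiring anything into the pipeline, it helps to see the detector in isolation. Below is a minimal, self-contained sketch of TabularDrift on synthetic data (the numbers are illustrative); the notebook applies the same idea to the churn data.

    import numpy as np
    from alibi_detect.cd import TabularDrift

    rng = np.random.default_rng(0)

    # Reference data: 500 rows, two continuous features.
    x_ref = rng.normal(size=(500, 2))

    # New batch with the first feature shifted, simulating drift.
    x_new = rng.normal(size=(500, 2))
    x_new[:, 0] += 1.0

    cd = TabularDrift(x_ref, p_val=.05)
    preds = cd.predict(x_new)
    print(preds['data']['is_drift'])  # 1 => drift detected at the 5% level
    print(preds['data']['p_val'])     # per-feature p-values (K-S test for continuous features)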

Incorporate drift detection into the DVC pipeline

Create src/stages/drift_detector.py
import sys
from pathlib import Path

# Make src/ importable so the utils package resolves when the script is run directly.
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

import argparse

import pandas as pd
from alibi_detect.cd import TabularDrift
from alibi_detect.saving import save_detector
from joblib import load
from utils.load_params import load_params


def train_drift_detector(params):
    processed_data_dir = Path(params.data_split.processed_data_dir)
    model_dir = Path(params.train.model_dir)
    model_path = Path(params.train.model_path)
    model = load(model_path)

    # Use all available data (train + test) as the reference distribution.
    X_test = pd.read_pickle(processed_data_dir / 'X_test.pkl')
    X_train = pd.read_pickle(processed_data_dir / 'X_train.pkl')
    X = pd.concat([X_test, X_train])

    feat_names = X.columns.tolist()
    # Every pipeline step except the final estimator, i.e. the preprocessor.
    preprocessor = model[:-1]
    # Columns prefixed 'cat__' are treated as categorical; a value of None lets
    # the detector infer the possible categories from the reference data.
    categories_per_feature = {i: None for i, k in enumerate(feat_names)
                              if k.startswith('cat__')}
    cd = TabularDrift(X,
                      p_val=.05,
                      preprocess_fn=preprocessor.transform,
                      categories_per_feature=categories_per_feature)

    detector_path = model_dir / 'drift_detector'
    save_detector(cd, detector_path)


if __name__ == '__main__':
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('--config', dest='config', required=True)
    args = args_parser.parse_args()
    params = load_params(params_path=args.config)
    train_drift_detector(params)
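
Before adding the stage, you can run the script once (python src/stages/drift_detector.py --config=params.yaml) and sanity-check the saved detector from a Python session. A minimal sketch, assuming the output paths match the defaults used above:

    from pathlib import Path

    import pandas as pd
    from alibi_detect.saving import load_detector

    detector = load_detector(Path('models') / 'drift_detector')
    X_test = pd.read_pickle(Path('data/processed') / 'X_test.pkl')

    # Data drawn from the reference distribution should not be flagged.
    preds = detector.predict(X_test)
    print(preds['data']['is_drift'])  # expected: 0
    print(preds['data']['p_val'])     # per-feature p-values
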
  • Add train_drift_detector stage to dvc.yaml

    train_drift_detector:
        cmd: python src/stages/drift_detector.py --config=params.yaml
        deps:
          - src/stages/drift_detector.py
          - models/clf-model.joblib
          - data/processed
        params:
          - base
          - train
          - data_split
        outs:
          - models/drift_detector
    
  • Execute pipeline: dvc exp run

  • Push new files with git and DVC
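
    The exact commands depend on your remote setup, but a typical sequence is:

        git add dvc.yaml dvc.lock src/stages/drift_detector.py
        git commit -m "Add drift detector stage"
        dvc push
        git push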

Update web app to store data drift metrics

Update src/app/main.py
import datetime
import json
import os
import sys
import warnings
from pathlib import Path
from typing import List

import pandas as pd
import uvicorn
from alibi_detect.saving import load_detector
from fastapi import BackgroundTasks, Body, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from joblib import load
from pydantic import BaseModel
from sqlalchemy import create_engine

# Make src/ importable so the utils package resolves when running this module.
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

from utils.load_params import load_params

app = FastAPI()
# https://fastapi.tiangolo.com/tutorial/cors/#use-corsmiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# SQLAlchemy expects the 'postgresql://' scheme; some providers (e.g. Heroku)
# hand out URLs with the legacy 'postgres://' prefix.
DATABASE_URL = os.environ['DATABASE_URL'].replace('postgres://', 'postgresql://')

params = load_params(params_path='params.yaml')
model_path = params.train.model_path
feat_cols = params.base.feat_cols
min_batch_size = params.drift_detect.min_batch_size

cd = load_detector(Path('models')/'drift_detector')
model = load(filename=model_path)

class Customer(BaseModel):
    CreditScore: int
    Age: int
    Tenure: int
    Balance: float
    NumOfProducts: int
    HasCrCard: int
    IsActiveMember: int
    EstimatedSalary: float

# Request body schema: a batch of customers.
class Request(BaseModel):
    data: List[Customer]

@app.post("/predict")
async def predict(background_tasks: BackgroundTasks, info: Request = Body(..., example={
    "data": [
        {
            "CreditScore": 619,
            "Age": 42,
            "Tenure": 2,
            "Balance": 0,
            "NumOfProducts": 1,
            "HasCrCard": 1,
            "IsActiveMember": 1,
            "EstimatedSalary": 101348.88
        },
        {
            "CreditScore": 699,
            "Age": 39,
            "Tenure": 21,
            "Balance": 0,
            "NumOfProducts": 2,
            "HasCrCard": 0,
            "IsActiveMember": 0,
            "EstimatedSalary": 93826.63
        }
    ]
})):
    json_list = json.loads(info.json())
    data = json_list['data']
    try:
        # Queue the batch for drift detection; it runs after the response is sent.
        background_tasks.add_task(collect_batch, json_list)
    except Exception:
        warnings.warn("Unable to process batch data for drift detection")
    input_data = pd.DataFrame(data)
    # predict_proba returns one column per class; column 0 is kept here.
    probs = model.predict_proba(input_data)[:, 0]
    return probs.tolist()

@app.get("/drift_data")
async def get_drift_data():
    engine = create_engine(DATABASE_URL)
    with engine.connect() as conn:
        sql_query = "SELECT * FROM p_val_table"
        df_p_val = pd.read_sql(sql_query, con=conn)
    engine.dispose()
    # df.to_json() yields a JSON string; parse it so FastAPI returns a JSON
    # object rather than a double-encoded string.
    return json.loads(df_p_val.to_json())

def collect_batch(json_list, batch_size_thres=min_batch_size, batch=[]):
    # NOTE: the mutable default `batch` is deliberate: it acts as an in-process
    # accumulator that persists across requests (it resets on restart and is
    # not shared between worker processes).
    batch.extend(json_list['data'])
    if len(batch) >= batch_size_thres:
        X = pd.DataFrame.from_records(batch)
        preds = cd.predict(X)
        p_val = preds['data']['p_val']
        now = datetime.datetime.now()
        data = [[now] + p_val.tolist()]
        columns = ['time'] + feat_cols
        df_p_val = pd.DataFrame(data=data, columns=columns)
        print('Writing to database')
        engine = create_engine(DATABASE_URL)
        with engine.connect() as conn:
            df_p_val.to_sql('p_val_table', con=conn, if_exists='append', index=False)
        engine.dispose()
        batch.clear()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
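
If you want to inspect the stored metrics outside the app, the p_val_table can be read back with the same SQLAlchemy/pandas combination the endpoint uses. A minimal sketch, assuming DATABASE_URL is set as in the local test below:

    import os

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine(os.environ['DATABASE_URL'])
    with engine.connect() as conn:
        df = pd.read_sql("SELECT * FROM p_val_table", con=conn)
    engine.dispose()
    print(df.tail())  # one row per processed batch: timestamp + per-feature p-values
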
  • Add drift_detect section to params.yaml

    drift_detect:
      min_batch_size: 50
    
  • Run the pipeline: dvc exp run

  • Push new files to DVC remote and git

  • Test a new version of the application

    • Create DATABASE_URL environment variable

      export DATABASE_URL=sqlite:///testdb.sqlite
      
    • Launch web app locally

      uvicorn src.app.main:app --host 0.0.0.0 --port 8080
      
    • Create, explore and run notebooks/TestAPI.ipynb (its core request loop is sketched after this list)


    • Optionally, check the contents of testdb.sqlite with a SQL client (e.g. DBeaver)

    • Delete testdb.sqlite
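
The essence of the test notebook is a loop that posts enough requests to cross drift_detect.min_batch_size and then reads the stored p-values back. A minimal sketch, assuming the app is running locally on port 8080 as above (the notebook's exact contents may differ):

    import requests

    customer = {
        "CreditScore": 619, "Age": 42, "Tenure": 2, "Balance": 0.0,
        "NumOfProducts": 1, "HasCrCard": 1, "IsActiveMember": 1,
        "EstimatedSalary": 101348.88,
    }

    # Send more rows than min_batch_size (50) so the background task
    # computes p-values and appends them to p_val_table.
    for _ in range(60):
        resp = requests.post("http://localhost:8080/predict", json={"data": [customer]})
        resp.raise_for_status()

    # Read back the accumulated drift metrics.
    print(requests.get("http://localhost:8080/drift_data").json())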

Redeploy a new version of the web app

  • In .github/workflows/deploy-api.yaml, replace the line

        dvc pull models/clf-model.joblib

    with

        dvc pull models/clf-model.joblib models/drift_detector
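
    In context, the updated workflow step might look like the following; the step name and surrounding YAML are illustrative, only the dvc pull command comes from this guide:

        - name: Pull model and drift detector
          run: dvc pull models/clf-model.joblib models/drift_detector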

  • Push new files with git and DVC

  • Create and push a new git tag version

    git tag deploy-v1.0.0
    git push origin deploy-v1.0.0