Slides 🖼️
Week 4: Data Drift Monitoring for ML Projects
Learning Objectives
- Distinguish between application monitoring and ML monitoring
- Use Alibi Detect framework to detect data drift
Steps
Introduction to Data Drift Monitoring
What’s data drift and why do we need to monitor for it?
Intro to Alibi Detect
Add Churn_Modelling_Germany.csv to data/more_data/
Add a /more_data entry to data/.gitignore
Create and explore notebooks/DriftDetection.ipynb
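The notebook is a good place to try the Alibi Detect API on the new data before wiring it into the pipeline. A minimal sketch of what the exploration could look like follows; the CSV paths and the choice of numeric feature columns are assumptions, so adjust them to the actual project layout.

import pandas as pd
from alibi_detect.cd import TabularDrift

# Assumed paths: the original churn data as the reference sample and the
# newly added Germany-only customers as the candidate sample
df_ref = pd.read_csv('data/raw/Churn_Modelling.csv')
df_new = pd.read_csv('data/more_data/Churn_Modelling_Germany.csv')

# Numeric feature columns (mirroring the Customer schema used by the web app)
feat_cols = ['CreditScore', 'Age', 'Tenure', 'Balance',
             'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']

# Calibrate the detector on the reference data, then test the new batch
cd = TabularDrift(df_ref[feat_cols].values, p_val=0.05)
preds = cd.predict(df_new[feat_cols].values)

print('Drift detected:', bool(preds['data']['is_drift']))
for name, p in zip(feat_cols, preds['data']['p_val']):
    print(f'{name}: p-value = {p:.4f}')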
Incorporate drift detection into the DVC pipeline
Create src/stages/drift_detector.py
import sys
from pathlib import Path

# Make src/ importable so that utils.load_params resolves
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

import argparse

import pandas as pd
from alibi_detect.cd import TabularDrift
from alibi_detect.saving import save_detector
from joblib import load

from utils.load_params import load_params


def train_drift_detector(params):
    processed_data_dir = Path(params.data_split.processed_data_dir)
    model_dir = Path(params.train.model_dir)
    model_path = Path(params.train.model_path)
    model = load(model_path)

    # Use the combined train and test sets as the reference distribution
    X_test = pd.read_pickle(processed_data_dir / 'X_test.pkl')
    X_train = pd.read_pickle(processed_data_dir / 'X_train.pkl')
    X = pd.concat([X_test, X_train])
    feat_names = X.columns.tolist()

    # Reuse the fitted preprocessing steps of the model pipeline (all but the final estimator)
    preprocessor = model[:-1]
    # Mark features whose names start with 'cat__' as categorical;
    # None lets the detector infer the categories from the reference data
    categories_per_feature = {i: None for i, k in enumerate(feat_names) if k.startswith('cat__')}

    cd = TabularDrift(X,
                      p_val=.05,
                      preprocess_fn=preprocessor.transform,
                      categories_per_feature=categories_per_feature)
    detector_path = model_dir / 'drift_detector'
    save_detector(cd, detector_path)


if __name__ == '__main__':
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('--config', dest='config', required=True)
    args = args_parser.parse_args()
    params = load_params(params_path=args.config)
    train_drift_detector(params)
Add a train_drift_detector stage to dvc.yaml
train_drift_detector:
  cmd: python src/stages/drift_detector.py --config=params.yaml
  deps:
    - src/stages/drift_detector.py
    - models/clf-model.joblib
    - data/processed
  params:
    - base
    - train
    - data_split
  outs:
    - models/drift_detector
Execute pipeline:
dvc exp run
Push new files with git and DVC
Update web app to store data drift metrics
Update src/app/main.py
import json
import sys
from pathlib import Path

import uvicorn

# Make src/ importable so that utils.load_params resolves
src_path = Path(__file__).parent.parent.resolve()
sys.path.append(str(src_path))

import datetime
import os
import warnings
from typing import List

import pandas as pd
from alibi_detect.saving import load_detector
from fastapi import BackgroundTasks, Body, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from joblib import load
from pydantic import BaseModel
from sqlalchemy import create_engine

from utils.load_params import load_params

app = FastAPI()

# https://fastapi.tiangolo.com/tutorial/cors/#use-corsmiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# SQLAlchemy does not accept the legacy postgres:// scheme, so rewrite it
DATABASE_URL = os.environ['DATABASE_URL'].replace('postgres://', 'postgresql://')

params = load_params(params_path='params.yaml')
model_path = params.train.model_path
feat_cols = params.base.feat_cols
min_batch_size = params.drift_detect.min_batch_size

cd = load_detector(Path('models') / 'drift_detector')
model = load(filename=model_path)


class Customer(BaseModel):
    CreditScore: int
    Age: int
    Tenure: int
    Balance: float
    NumOfProducts: int
    HasCrCard: int
    IsActiveMember: int
    EstimatedSalary: float


class Request(BaseModel):
    data: List[Customer]


@app.post("/predict")
async def predict(background_tasks: BackgroundTasks, info: Request = Body(..., example={
    "data": [
        {
            "CreditScore": 619,
            "Age": 42,
            "Tenure": 2,
            "Balance": 0,
            "NumOfProducts": 1,
            "HasCrCard": 1,
            "IsActiveMember": 1,
            "EstimatedSalary": 101348.88
        },
        {
            "CreditScore": 699,
            "Age": 39,
            "Tenure": 21,
            "Balance": 0,
            "NumOfProducts": 2,
            "HasCrCard": 0,
            "IsActiveMember": 0,
            "EstimatedSalary": 93826.63
        }
    ]
})):
    json_list = json.loads(info.json())
    data = json_list['data']
    try:
        # Queue the rows for drift detection without delaying the response
        background_tasks.add_task(collect_batch, json_list)
    except Exception:
        warnings.warn("Unable to process batch data for drift detection")
    input_data = pd.DataFrame(data)
    probs = model.predict_proba(input_data)[:, 0]
    probs = probs.tolist()
    return probs


@app.get("/drift_data")
async def get_drift_data():
    engine = create_engine(DATABASE_URL)
    with engine.connect() as conn:
        sql_query = "SELECT * FROM p_val_table"
        df_p_val = pd.read_sql(sql_query, con=conn)
    engine.dispose()
    parsed = json.loads(df_p_val.to_json())
    return json.dumps(parsed)


def collect_batch(json_list, batch_size_thres=min_batch_size, batch=[]):
    # The mutable default `batch` deliberately acts as a buffer that persists
    # across calls; rows accumulate until the threshold is reached.
    data = json_list['data']
    for req_json in data:
        batch.append(req_json)
    if len(batch) >= batch_size_thres:
        X = pd.DataFrame.from_records(batch)
        preds = cd.predict(X)
        p_val = preds['data']['p_val']
        now = datetime.datetime.now()
        data = [[now] + p_val.tolist()]
        columns = ['time'] + feat_cols
        df_p_val = pd.DataFrame(data=data, columns=columns)
        print('Writing to database')
        engine = create_engine(DATABASE_URL)
        with engine.connect() as conn:
            df_p_val.to_sql('p_val_table', con=conn, if_exists='append', index=False)
        engine.dispose()
        batch.clear()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
Add a drift_detect section to params.yaml
drift_detect:
  min_batch_size: 50
Run pipeline
dvc exp run
Push new files to DVC remote and git
Test a new version of the application
Create the DATABASE_URL environment variable:
export DATABASE_URL=sqlite:///testdb.sqlite
Launch web app locally
uvicorn src.app.main:app --host 0.0.0.0 --port 8080
Create, explore and run notebooks/TestAPI.ipynb
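One possible shape for the notebook is sketched below: it posts a batch of customers to /predict that is large enough to exceed drift_detect.min_batch_size, so the background drift check fires, and then reads the stored p-values back from /drift_data. The customer row is copied from the /predict example payload; the rest (batch size, host, port) is illustrative and assumes the local launch command above.

import requests

# One example customer, repeated so the batch crosses min_batch_size (50)
customer = {"CreditScore": 619, "Age": 42, "Tenure": 2, "Balance": 0,
            "NumOfProducts": 1, "HasCrCard": 1, "IsActiveMember": 1,
            "EstimatedSalary": 101348.88}
payload = {"data": [customer] * 60}

resp = requests.post("http://localhost:8080/predict", json=payload)
print(resp.json())  # list of predicted probabilities

# The drift check runs as a background task; once a batch has been processed,
# the per-feature p-values can be read back as a JSON string
resp = requests.get("http://localhost:8080/drift_data")
print(resp.json())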
Optionally, check the contents of testdb.sqlite with a SQL client (e.g. DBeaver)
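If no SQL client is at hand, the same check can be done from Python; a small sketch, assuming the app has already written at least one batch of p-values:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite:///testdb.sqlite')
print(pd.read_sql('SELECT * FROM p_val_table', con=engine))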
Delete testdb.sqlite
Redeploy a new version of the web app
- In .github/workflows/deploy-api.yaml, replace the line dvc pull models/clf-model.joblib with dvc pull models/clf-model.joblib models/drift_detector
Push new files with git and DVC
Create and push a new git tag version
git tag deploy-v1.0.0
git push origin deploy-v1.0.0