ColtAllen/mle-deployment-example

Customer Group Profitability Prediction Pipeline

Test Coverage Python 3.13 Deployed

A deployed production ML pipeline to target customer groups for marketing campaigns. Built on GCP with scale-to-zero architecture.

ML Challenge

Business Impact: CatBoost model generates 191% profit lift over uniform random baseline targeting.


Key Metrics

| Metric | Value |
| --- | --- |
| Model Accuracy | 62% |
| Profit Lift | 191% vs random baseline |
| Targeting Accuracy | 78% on profitable campaigns |
| Monthly Infrastructure Cost | ~€15 |
| API Latency | <500ms |

See notebooks/eda.ipynb and notebooks/model_tuning.ipynb for detailed analyses.


Engineering Challenge

Requirements

Project Structure

├── src/
│   ├── common/              # Shared config, GCS client, CLI utils
│   ├── data_etl/            # Data simulation and BigQuery writes
│   ├── serving/             # Litestar API application
│   └── training/            # CatBoost training pipeline
├── dbt/
│   └── models/              # Staging and mart SQL transformations
├── terraform/               # GCP infrastructure as code
├── dockerfiles/             # Container images (api, ml, data)
├── tests/                   # Unit and integration tests
├── notebooks/               # EDA and model tuning analysis
├── docs/                    # PRD, architecture, code metrics
└── config.yaml              # Central configuration

Architecture Diagram

flowchart TB
    subgraph Orchestration["Orchestration"]
        SCHED[Cloud Scheduler<br/>Weekly Triggers]
    end

    subgraph Jobs["Cloud Run Jobs"]
        SIM[Data Sim Job<br/>30% Spurious]
        DBT[dbt Job<br/>Transforms]
        TRAIN[Training Job<br/>CatBoost]
    end

    subgraph Data["BigQuery"]
        RAW[(raw)]
        STG[(staging)]
        MARTS[(marts)]
    end

    subgraph Storage["Cloud Storage"]
        PARQUET[raw.parquet<br/>Bootstrap Source]
        MODELS[catboost/<br/>Models]
    end

    subgraph MLOps["Vertex AI"]
        EXP[Experiments]
        REG[Model Registry]
    end

    subgraph Serving["Cloud Run Service"]
        API[Litestar API<br/>Scale-to-Zero]
    end

    SCHED --> SIM
    SCHED --> DBT
    SCHED --> TRAIN

    PARQUET --> SIM
    SIM --> RAW
    RAW --> DBT
    DBT --> STG
    STG --> MARTS

    MARTS --> TRAIN
    TRAIN -->|Conditional| EXP
    TRAIN -->|Conditional| REG
    TRAIN -->|Conditional| MODELS

    MODELS --> API

Event Sequence Diagram

sequenceDiagram
    participant Scheduler as Cloud Scheduler
    participant DataJob as Data Sim Job
    participant BQ as BigQuery
    participant dbt as dbt Models
    participant TrainJob as Training Job
    participant VAI as Vertex AI
    participant GCS as Cloud Storage
    participant API as Prediction API
    participant Client as API Client

    Scheduler->>DataJob: Trigger weekly
    Scheduler->>TrainJob: Trigger weekly
    GCS->>DataJob: Load raw parquet for bootstrapping
    DataJob->>BQ: Write synthetic data (bq_raw)

    Note over dbt: dbt run
    dbt->>BQ: Transform to marts
    dbt->>BQ: Calculate drift metrics (dim_feature_metadata)

    alt RETRAIN_FLAG = TRUE
        dbt->>TrainJob: Trigger training
        BQ->>TrainJob: Load mart_features, mart_targets
        TrainJob->>TrainJob: K-fold CV + final training
        TrainJob->>VAI: Log metrics to Experiments
        TrainJob->>VAI: Register model version
        TrainJob->>GCS: Upload model.cbm
    end

    Client->>API: POST /predict
    GCS->>API: Load model (on startup)
    API->>Client: Return prediction

1. ETL Layer

Data Simulation (scheduled via Cloud Scheduler)
   ├── Load raw.parquet from Cloud Storage
   ├── Bootstrap random samples with 30% feature drift probability
   └── Write to BigQuery bq_raw
dbt Transform (triggered after data sim)
   ├── stg_raw: Clean and type-cast
   ├── mart_features: 67 pre-campaign features
   ├── mart_targets: Target + post-campaign
   └── dim_feature_metadata: Calculate RETRAIN_FLAG

BigQuery Tables

| Dataset | Purpose |
| --- | --- |
| bq_raw | Ingested customer group data |
| staging | Type-cast data staged for dbt |
| marts | Tested features and targets for training |

dbt Models

models/
├── staging/
│   ├── stg_raw.sql               # Type casting, cleaning
│   └── schema.yml                # Source definitions
└── marts/
    ├── mart_features.sql         # 67 pre-campaign features
    ├── mart_targets.sql          # Target variable + post-campaign
    ├── dim_feature_metadata.sql  # Drift metrics, RETRAIN_FLAG
    └── schema.yml                # Data quality tests

Monitoring & Drift Detection

dbt Data Quality Tests

  • Schema tests: unique, not_null, accepted_values
  • Custom tests for data integrity

dbt-Based Drift Metrics

  • Calculated in dim_feature_metadata model
  • Per-feature statistics: mean, std, min, max, median
  • Z-score threshold for feature drift detection
  • Label distribution tracking
  • Sets RETRAIN_FLAG for training job

Conditional Retraining (RETRAIN_FLAG)

The dim_feature_metadata model calculates per-batch statistics and sets RETRAIN_FLAG = TRUE when:

  • First batch (no prior model exists)
  • Feature drift detected (Z-score > threshold)
  • Label distribution shift > 10%
  • Out-of-range or outlier feature values (beyond the previous column min/max)
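
The conditions above can be sketched in Python for illustration. The real logic lives in the dim_feature_metadata dbt model; the function name `needs_retrain`, the default Z-score threshold of 3.0, and the choice to compare the batch mean against the reference mean and standard deviation are all assumptions, not the project's actual SQL.

```python
from statistics import mean, stdev


def needs_retrain(
    reference: dict[str, list[float]],
    batch: dict[str, list[float]],
    ref_label_share: dict[int, float],
    batch_label_share: dict[int, float],
    z_threshold: float = 3.0,        # assumed value; the README does not state it
    label_shift_limit: float = 0.10,  # "Label distribution shift > 10%"
    has_prior_model: bool = True,
) -> tuple[bool, list[str]]:
    """Return (retrain_flag, reasons) for one incoming batch."""
    reasons: list[str] = []
    if not has_prior_model:
        reasons.append("first_batch")

    for name, ref_values in reference.items():
        new_values = batch[name]
        ref_mean, ref_std = mean(ref_values), stdev(ref_values)  # needs >= 2 values
        # Z-score of the batch mean relative to the reference distribution
        z = abs(mean(new_values) - ref_mean) / ref_std if ref_std > 0 else 0.0
        if z > z_threshold:
            reasons.append(f"drift:{name}")
        # Values outside the previously observed column min/max
        if min(new_values) < min(ref_values) or max(new_values) > max(ref_values):
            reasons.append(f"out_of_range:{name}")

    for label, ref_share in ref_label_share.items():
        if abs(batch_label_share.get(label, 0.0) - ref_share) > label_shift_limit:
            reasons.append(f"label_shift:{label}")

    return bool(reasons), reasons
```

Returning the list of reasons alongside the flag makes it easier to log why a retrain was triggered.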

2. Training Layer

Training (triggered on RETRAIN_FLAG)
   ├── Check RETRAIN_FLAG, exit if FALSE
   ├── Load from marts
   ├── K-fold CV for optimal iterations
   ├── Train final CatBoost model
   ├── Log metrics to Vertex AI
   └── Upload model to GCS, update latest_uri.txt
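
A minimal sketch of the early-exit and pointer-file steps listed above, using a local directory as a stand-in for the GCS bucket so it runs without cloud credentials. The function names, the `catboost/<version>/model.cbm` layout, and the `train_fn` callable are hypothetical; only `model.cbm` and `latest_uri.txt` come from this README.

```python
from pathlib import Path
from typing import Callable, Optional


def publish_model(bucket_dir: Path, model_bytes: bytes, version: str) -> str:
    """Write the model artifact, then repoint latest_uri.txt at it."""
    model_path = bucket_dir / "catboost" / version / "model.cbm"  # assumed layout
    model_path.parent.mkdir(parents=True, exist_ok=True)
    model_path.write_bytes(model_bytes)
    # The pointer is updated last so a reader never sees a URI
    # for an artifact that has not been written yet.
    uri = model_path.as_posix()
    (bucket_dir / "latest_uri.txt").write_text(uri)
    return uri


def run_training_job(
    retrain_flag: bool,
    bucket_dir: Path,
    train_fn: Callable[[], bytes],
    version: str,
) -> Optional[str]:
    """Exit early when RETRAIN_FLAG is FALSE; otherwise train and publish."""
    if not retrain_flag:
        return None
    return publish_model(bucket_dir, train_fn(), version)
```

With a pointer file like this, the serving side only needs to read one small object to find the current model.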

Training Pipeline

flowchart TB
    subgraph Trigger["Trigger"]
        FLAG{RETRAIN_FLAG<br/>= TRUE?}
    end

    subgraph Training["Training Job"]
        FETCH[Fetch Features + Targets<br/>from BigQuery marts]
        CV[K-Fold Cross Validation]
        TRAIN[Train Final CatBoost]
        EVAL[Evaluate Metrics]
    end

    subgraph Outputs["Outputs"]
        GCS[Upload model.cbm<br/>to GCS]
        VAI[Log Metrics<br/>to Vertex AI]
        PTR[Update latest_uri.txt]
    end

    FLAG -->|Yes| FETCH
    FLAG -->|No| EXIT[Exit Early]
    FETCH --> SPLIT
    SPLIT --> CV
    CV --> TRAIN
    TRAIN --> EVAL
    EVAL --> GCS
    EVAL --> VAI
    GCS --> PTR

3. Serving Layer

Live API: https://code-challenge-api-887551700802.europe-west3.run.app

Frontend Structure

Litestar Application
├── Lifespan
│   └── Load model from GCS on startup (app.state.model)
├── Routes
│   ├── /predict/batch         # POST - Authenticated (1-100 samples)
│   ├── /predict/demo          # GET - Public (random data)
│   ├── /predict/example-request  # GET - Public (payload template)
│   ├── /health                # GET - Public
│   └── /schema/rapidoc        # GET - Interactive docs
└── Exception Handlers
    └── PredictionError with structured responses

Inference Data Flow

1. Request received at /predict/batch
2. Validate features (msgspec schemas)
3. Load model from app.state (cached at startup)
4. Generate prediction (class label)
5. Return JSON: prediction, interpretation, model_version, timestamp
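
The flow above could look roughly like the following. This is a framework-free sketch: plain dataclasses stand in for the msgspec schemas, the `Model` protocol stands in for the cached CatBoost model, and the `group_<label>` interpretation string is inferred from the single example response in this README, so treat it as an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Protocol

MAX_BATCH = 100  # /predict/batch accepts 1-100 samples


class Model(Protocol):
    def predict(self, rows: list[dict[str, float]]) -> list[int]: ...


class PredictionError(ValueError):
    """Raised when a request cannot be scored."""


@dataclass(frozen=True)
class PredictionResponse:
    prediction: int
    interpretation: str
    model_version: Optional[str]
    timestamp: str


def predict_batch(
    model: Model,
    rows: list[dict[str, float]],
    model_version: Optional[str] = None,
) -> list[PredictionResponse]:
    # Step 2: validate the request before touching the model
    if not 1 <= len(rows) <= MAX_BATCH:
        raise PredictionError(f"batch size must be 1-{MAX_BATCH}, got {len(rows)}")
    # Steps 3-4: the model is passed in already loaded (cached at startup)
    labels = model.predict(rows)
    # Step 5: one response object per sample
    now = datetime.now(timezone.utc).isoformat()
    return [
        PredictionResponse(int(label), f"group_{int(label)}", model_version, now)
        for label in labels
    ]
```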

API Architecture

flowchart LR
    subgraph Client["API Client"]
        REQ[POST /predict/batch]
    end

    subgraph API["Litestar API"]
        VAL[Validate Features<br/>msgspec]
        PREDICT[CatBoost Predict]
        FORMAT[Format Response]
    end

    subgraph State["App State"]
        MODEL[Cached Model]
    end

    REQ --> VAL
    VAL --> PREDICT
    MODEL --> PREDICT
    PREDICT --> FORMAT
    FORMAT --> RESP[JSON Response]

ML Lifecycle Management

Extract, Transform, and Load (ETL) Raw Data (src/data_etl/)

The ETL job simulates weekly raw production data with configurable drift:

# Generates synthetic samples via bootstrap sampling
# 30% probability of introducing data drift (nulls, distribution shifts)
python -m src.data_etl.main --config config.yaml --n-samples 1000

| Component | Purpose |
| --- | --- |
| data_generation.py | Bootstrap sampling from original dataset |
| bigquery.py | BigQuery client with CRUD operations |
| main.py | Cloud Run Job entry point |
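
As a rough illustration of bootstrap sampling with a drift probability, the sketch below resamples rows with replacement and, with probability `drift_prob`, perturbs one feature. The function name, the per-batch interpretation of the 30% probability, the 10% null rate, and the 1.5x scale factor are assumptions; data_generation.py may work differently.

```python
import random
from typing import Optional


def simulate_batch(
    source_rows: list[dict[str, float]],
    n_samples: int,
    drift_prob: float = 0.30,
    seed: Optional[int] = None,
) -> tuple[list[dict[str, Optional[float]]], bool]:
    """Bootstrap n_samples rows and optionally inject drift into one feature."""
    rng = random.Random(seed)
    # Bootstrap: draw rows with replacement, copying so the source stays intact
    batch = [dict(rng.choice(source_rows)) for _ in range(n_samples)]

    drifted = rng.random() < drift_prob
    if drifted:
        feature = rng.choice(sorted(batch[0]))
        for row in batch:
            if rng.random() < 0.1:
                row[feature] = None  # injected null
            elif row[feature] is not None:
                row[feature] = row[feature] * 1.5  # distribution shift
    return batch, drifted
```

Passing a fixed `seed` makes a simulated batch reproducible, which helps when testing the drift checks downstream.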

dbt Transformation Layer (dbt/)

dbt transforms raw data into ML-ready features with quality testing:

bq_raw (ingested) → stg_raw (cleaned) → mart_features + mart_targets (training)
                                      → dim_feature_metadata (drift metrics)

| Model | Type | Purpose |
| --- | --- | --- |
| stg_raw | View | Type casting, column standardization |
| mart_features | Table | 67 pre-campaign features for training |
| mart_targets | Table | Target labels + post-campaign data |
| dim_feature_metadata | Table | Per-batch statistics, drift detection, RETRAIN_FLAG |

Conditional Retraining

The training job checks dim_feature_metadata.RETRAIN_FLAG before training:

-- RETRAIN_FLAG = TRUE when:
-- 1. First batch (no prior model)
-- 2. Feature drift detected (Z-score > threshold)
-- 3. Outlier feature values exceeding previous column max/min values
-- 4. Label distribution shift > 10%

Why conditional retraining? Avoids unnecessary compute costs by only training when data drift is detected. This pattern scales to production without wasting resources on redundant model updates.

Why dbt over Python for feature transformation? SQL-based transformations are accessible to analysts, data quality tests are declarative, and lineage is automatic. This makes the pipeline maintainable by a broader team.

Hyperparameter Optimization

Hyperparameter tuning was tested in the experiment notebook but not prioritized for production because:

  1. Marginal Gains: Early experiments showed <2% accuracy improvement at 10x the compute cost.
  2. Streamlined Production Deployment: Rapid deployment of a low-cost architecture was prioritized over model tuning, since an imperfect model can still add considerable business value.

For future consideration, Vertex AI Hyperparameter Tuning provides distributed search with early stopping and native integration with Experiments for run tracking.

Vertex AI Model Registry and Tracking

Vertex AI provides the MLOps backbone for experiment tracking and model governance:

| Component | Purpose | Status |
| --- | --- | --- |
| Experiments | Track training runs, metrics, and hyperparameters | Implemented |
| Model Registry | Model artifact versioning with lineage tracking | Implemented |
| Hyperparameter Tuning | Distributed search with early stopping | On Hold |
| Model Monitoring | Concept drift and prediction distribution tracking | Future |

Why Vertex AI? The managed GCP tracking and registry services provide enterprise-grade experiment logging, model lineage, and audit trails for production ML systems.


ML Serving API

Litestar live API: https://code-challenge-api-887551700802.europe-west3.run.app

| Endpoint | Method | Auth | Description |
| --- | --- | --- | --- |
| /predict/batch | POST | API Key | Batch predictions (1-100 samples) |
| /predict/demo | GET | None | Demo with random data |
| /predict/example-request | GET | None | Example request payload |
| /health | GET | None | Health check |
| /schema/rapidoc | GET | None | Interactive API docs |

Why Litestar over FastAPI? Litestar provides built-in dependency injection and msgspec integration, reducing boilerplate while maintaining full OpenAPI compatibility. The lifespan management pattern provides cleaner model loading semantics.

Why msgspec over Pydantic? msgspec can be up to roughly 10x faster at serialization, supports immutable structs via frozen=True, and produces cleaner validation errors. For high-throughput APIs, this performance difference matters.

Example Request

# Demo prediction (no auth required)
curl -s https://code-challenge-api-887551700802.europe-west3.run.app/predict/demo

# Batch prediction
curl -X POST "https://code-challenge-api-887551700802.europe-west3.run.app/predict/batch" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{"predictions": [{"g1_1": 45.2, "g1_2": 12.8, ..., "c_27": 33.1}]}'

Example Response

{
  "prediction": 2,
  "interpretation": "group_2",
  "model_version": null,
  "timestamp": "2026-01-12T00:49:29.603876+00:00"
}
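
For callers working in Python rather than curl, a standard-library client might look like the sketch below. The helper names are hypothetical, and the sample payload only includes the feature names visible in the curl example; a real request needs the full feature set. The parser assumes the single-object response shown above, which may differ for multi-sample batches.

```python
import json
import urllib.request

BASE_URL = "https://code-challenge-api-887551700802.europe-west3.run.app"


def build_batch_request(samples: list[dict], api_key: str) -> urllib.request.Request:
    """Build (but do not send) a POST /predict/batch request."""
    body = json.dumps({"predictions": samples}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/predict/batch",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
    )


def parse_prediction(raw: bytes) -> tuple[int, str]:
    """Extract the class label and its interpretation from a response body."""
    payload = json.loads(raw)
    return payload["prediction"], payload["interpretation"]


def send(request: urllib.request.Request, timeout: float = 10.0) -> tuple[int, str]:
    """Send the request and parse the response (performs a network call)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_prediction(response.read())
```

Keeping request construction separate from sending makes the client easy to unit test without network access.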

Coding Design Patterns

  • Single Responsibility: Separate modules for ETL (src/data_etl/), training (src/training/), serving (src/serving/)
  • Type Safety: Full mypy strict mode enforcement across all modules
  • Immutability: msgspec Struct with frozen semantics for configuration objects
  • Linting & Formatting: ruff checks run on every commit
  • Configuration as Code: All settings in version-controlled config.yaml

Configuration Management

Single source of truth. config.yaml contains all project configuration settings:

# config.yaml (excerpt)
gcp:
  project_id: colt-allen-code-challenge
  region: europe-west3

training:
  categorical_features: [c_5, c_6, c_7, c_8, c_24]
  k_folds_cv: 5
  metric_payoff:
    correct_target: 100
    wrong_target: -50
    wrong_no_target: -100

catboost:
  loss_function: MultiClass
  early_stopping_rounds: 20
  langevin: true
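
To show how a payoff table like `metric_payoff` could feed a profit metric, here is a sketch under stated assumptions: class `0` is taken to mean "do not target" (hypothetical), a correct no-target decision pays 0, `wrong_no_target` is read as skipping a group that should have been targeted, and lift is computed relative to a baseline profit. The project's actual metric may be defined differently.

```python
PAYOFF = {"correct_target": 100, "wrong_target": -50, "wrong_no_target": -100}
NO_TARGET = 0  # hypothetical label for "do not target"


def campaign_profit(
    y_true: list[int],
    y_pred: list[int],
    payoff: dict[str, int] = PAYOFF,
    no_target: int = NO_TARGET,
) -> int:
    """Sum the payoff of each targeting decision."""
    total = 0
    for actual, predicted in zip(y_true, y_pred):
        if predicted == actual:
            # Correctly skipping a group is assumed to be profit-neutral
            total += 0 if predicted == no_target else payoff["correct_target"]
        elif predicted == no_target:
            total += payoff["wrong_no_target"]  # missed a group worth targeting
        else:
            total += payoff["wrong_target"]  # targeted the wrong group
    return total


def profit_lift(model_profit: float, baseline_profit: float) -> float:
    """Relative improvement over a baseline, e.g. 1.91 for a 191% lift."""
    return (model_profit - baseline_profit) / abs(baseline_profit)
```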

CLI Commands

All pipeline components support CLI arguments that override parameter values in config.yaml. The arguments are auto-generated from msgspec structs for type-safe validation:

# Train with custom config
python -m src.training.main --config config.yaml

# Override specific values
python -m src.training.main --k-folds-cv 10 --langevin --no-verbose

# Generate 500 new rows of data for ETL
python -m src.data_etl.main --config config.yaml --n-samples 500
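
One way to derive flags such as `--k-folds-cv` and `--no-verbose` from a typed config object is sketched below with the standard library. A frozen dataclass stands in for the msgspec struct, and the `TrainingConfig` fields, the `verbose` default, and the helper names are assumptions rather than the project's CLI utilities.

```python
import argparse
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class TrainingConfig:
    k_folds_cv: int = 5
    langevin: bool = True
    verbose: bool = True  # assumed default


def build_parser(config_cls: type) -> argparse.ArgumentParser:
    """Create one CLI flag per config field, typed from the annotation."""
    parser = argparse.ArgumentParser()
    for field in fields(config_cls):
        flag = "--" + field.name.replace("_", "-")
        if field.type is bool:
            # Generates both --flag and --no-flag
            parser.add_argument(
                flag, action=argparse.BooleanOptionalAction, default=field.default
            )
        else:
            parser.add_argument(flag, type=field.type, default=field.default)
    return parser


def parse_config(argv: list[str]) -> TrainingConfig:
    args = build_parser(TrainingConfig).parse_args(argv)
    return TrainingConfig(**vars(args))
```

Because the parser is built from the field annotations, adding a field to the config adds the matching flag without touching the CLI code.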

Code Quality Metrics

This project started with AI-generated scaffolding, and was refactored for production readiness. Radon complexity metrics quantify the human improvements made to the training module:

| Metric | AI-Generated | After Refactoring | Improvement |
| --- | --- | --- | --- |
| Lines of Code | 1,321 | 680 | -48% |
| Cyclomatic Complexity (main.py) | 18 | 1 | -94% |

Full analysis: docs/code_metrics/


Testing Structure

tests/
├── unit/                    # Fast, isolated tests
│   ├── test_common.py       # Config loading, GCS client
│   ├── test_training.py     # Model training functions
│   └── test_serving.py      # API routes, prediction logic
├── integration/             # End-to-end pipeline tests
├── fixtures/                # Sample data, mock configs
└── conftest.py              # Shared fixtures, mock factories

Testing Patterns

| Pattern | Example | Location |
| --- | --- | --- |
| Fixtures | sample_features_df, mock_gcs_client | conftest.py |
| Mocking | @patch for GCS, BigQuery, Vertex AI | test_training.py |
| Parametrized | Multiple input scenarios | test_serving.py |
| Isolation | No external dependencies in unit tests | All unit tests |
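
As an illustration of the mocking and isolation patterns, the sketch below tests a small GCS download helper against a `MagicMock` instead of a real client. `load_model_bytes`, the bucket name, and the blob path are hypothetical; the chained `bucket()`, `blob()`, and `download_as_bytes()` calls follow the google-cloud-storage client API. In the repository this mock would more likely come from a shared fixture in conftest.py.

```python
from unittest.mock import MagicMock


def load_model_bytes(client, bucket_name: str, blob_name: str) -> bytes:
    """Download a model artifact using an injected storage client."""
    return client.bucket(bucket_name).blob(blob_name).download_as_bytes()


def test_load_model_bytes_uses_client() -> None:
    # The mock replaces the GCS client, so no network or credentials are needed
    mock_gcs_client = MagicMock()
    blob = mock_gcs_client.bucket.return_value.blob.return_value
    blob.download_as_bytes.return_value = b"cbm"

    result = load_model_bytes(mock_gcs_client, "models", "catboost/model.cbm")

    assert result == b"cbm"
    mock_gcs_client.bucket.assert_called_once_with("models")
    mock_gcs_client.bucket.return_value.blob.assert_called_once_with(
        "catboost/model.cbm"
    )
```

Injecting the client as an argument is what keeps the unit test free of external dependencies.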

Running Tests

# Unit tests only
pytest tests/unit -v

# Integration tests
pytest tests/integration -v

# With coverage report
pytest --cov=src --cov-report=html

# Specific markers
pytest -m "not slow" tests/

Basic Commands

Prerequisites

  • Python 3.13+
  • uv (fast Python package manager)
  • Docker (optional, for containerized runs)
  • GCP account (optional, for cloud deployment)
  • Terraform 1.5+ (optional, for managing cloud infrastructure as code)

Litestar API Serving

# Clone or download repo and go to project folder
cd code-challenge

# Create virtual environment and install dependencies with uv
uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"

# Start API locally in CLI
litestar --app src.serving.app:app run

dbt ETL

cd dbt
dbt deps              # Install packages
dbt compile           # Generate executable SQL
dbt run               # Run transformations
dbt test              # Data quality tests
dbt docs generate     # Generate documentation
dbt docs serve --port 8080  # Serve docs locally (includes lineage graphs)

GCP Deployment

cd terraform

# Initialize and plan
terraform init
terraform plan

# Deploy infrastructure
terraform apply

# Build and push the API image to Artifact Registry for Cloud Run
docker build -t europe-west3-docker.pkg.dev/<project>/code-challenge/api:latest -f dockerfiles/api.Dockerfile .
docker push europe-west3-docker.pkg.dev/<project>/code-challenge/api:latest

# Run terraform again to sync state with cloud env
terraform apply

GCP Cost Analysis

| Service | Usage | Monthly Cost |
| --- | --- | --- |
| Cloud Run (API) | Scale 0-10 instances, ~1hr/day active | ~€3 |
| Cloud Run (Jobs) | 2 jobs, ~5min/day each | ~€2 |
| BigQuery | 10GB storage, 1TB queries | ~€5 |
| Cloud Storage | 100MB models + data | <€1 |
| Artifact Registry | 3 images, 5 versions retained | ~€2 |
| Cloud Scheduler | 2 triggers | <€1 |
| Vertex AI | Experiments + Model Registry | ~€1 |
| Total | | ~€15/month |

Why scale-to-zero? Cloud Run's pay-per-use model keeps costs at ~€15/month while maintaining production SLAs. For a demo this is ideal; in production with higher traffic, costs scale linearly with usage.

About

GCP data science pipeline deployment.
