- Model Deployment
- Overview
- Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.
- When to Use
- When productionizing trained models for real-world inference and predictions
- When building REST APIs or web services for model serving
- When scaling predictions to serve multiple users or applications
- When deploying models to cloud platforms, edge devices, or containers
- When implementing CI/CD pipelines for ML model updates
- When creating batch processing systems for large-scale predictions
- Deployment Approaches
- REST APIs: Flask, FastAPI for synchronous inference
- Batch Processing: scheduled jobs for large-scale predictions
- Real-time Streaming: Kafka, Spark Streaming for continuous data
- Serverless: AWS Lambda, Google Cloud Functions
- Edge Deployment: TensorFlow Lite, ONNX for edge devices
- Model Serving: TensorFlow Serving, Seldon Core, BentoML
- Key Considerations
- Model Format: Pickle, SavedModel, ONNX, PMML
- Scalability: load balancing, auto-scaling
- Latency: response time requirements
- Monitoring: model drift, performance metrics
- Versioning: multiple model versions in production
- Python Implementation

```python
import numpy as np
import pandas as pd
import pickle
import json
import logging
import time
from typing import List, Dict

import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification

# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn

# For model serving
import mlflow.pyfunc
import mlflow.sklearn

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save Model ===")

# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save model and preprocessing objects
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")
```
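Before wiring a saved artifact into a service, it is worth confirming the serialization round-trips cleanly. A minimal sketch using only the standard pickle module (joblib uses the same protocol, with extra efficiency for NumPy arrays; the plain dict here stands in for a trained estimator):

```python
import pickle

# A plain dict stands in for the trained model/scaler in this sketch;
# any picklable estimator works the same way.
artifact = {"weights": [0.2, 0.5, 0.3], "n_features": 20}

# Serialize to bytes, then restore and compare.
blob = pickle.dumps(artifact)
restored = pickle.loads(blob)

assert restored == artifact
print("round-trip OK, n_features =", restored["n_features"])
```

In production the same check is typically done against a file on disk (`pickle.load(open(path, 'rb'))`) as part of the deployment pipeline.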
```python
# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")

class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)
            self.predictions_count += len(X)
            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }

# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)
```
```python
# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    features: List[List[float]] = Field(..., example=[[1.0, 2.0, 3.0]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest],
                        background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)
    result = predictor.predict(all_features)
    background_tasks.add_task(
        logger.info, f"Batch prediction processed: {result['count']} samples"
    )
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }
```
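Pydantic handles type coercion, but shape checks (every row matching the model's expected feature count) still need explicit logic. A framework-independent sketch; the `validate_features` helper and `EXPECTED_N_FEATURES` constant are illustrative, not part of the API above:

```python
from typing import List

EXPECTED_N_FEATURES = 20  # must match the training data

def validate_features(features: List[List[float]]) -> None:
    """Raise ValueError if the payload cannot be fed to the model."""
    if not features:
        raise ValueError("empty feature list")
    for i, row in enumerate(features):
        if len(row) != EXPECTED_N_FEATURES:
            raise ValueError(
                f"row {i} has {len(row)} features, expected {EXPECTED_N_FEATURES}"
            )

# A malformed request is rejected before it reaches the model.
try:
    validate_features([[1.0, 2.0]])
except ValueError as e:
    print("rejected:", e)

validate_features([[0.0] * EXPECTED_N_FEATURES])  # passes silently
```

Rejecting bad shapes at the boundary turns opaque model errors into clear 400 responses.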
```python
# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")
dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print("Dockerfile content:")
print(dockerfile_content)
```
```python
# 5. Requirements file
print("\n=== 5. Requirements.txt ===")
requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""
print("Requirements:")
print(requirements)
```
```python
# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")
docker_compose = '''version: '3.8'
services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''
print("Docker Compose content:")
print(docker_compose)
```
```python
# 7. Testing the API
print("\n=== 7. Testing the API ===")

def test_predictor():
    # Test single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                      1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]
    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")

test_predictor()
```
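The test above checks correctness; latency should be measured the same way before committing to response-time requirements. A minimal timing sketch using only the standard library (the dummy `predict_fn` stands in for `predictor.predict`):

```python
import time
import statistics

def predict_fn(features):
    # Stand-in for a real model call.
    return [0 for _ in features]

latencies = []
for _ in range(50):
    start = time.perf_counter()
    predict_fn([[0.0] * 20])
    latencies.append(time.perf_counter() - start)

avg_ms = statistics.mean(latencies) * 1000
p95_ms = sorted(latencies)[int(0.95 * len(latencies))] * 1000
print(f"avg={avg_ms:.3f}ms p95={p95_ms:.3f}ms")
```

Percentiles matter more than the mean here: tail latency is what users notice behind a load balancer.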
```python
# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")

# Log model to MLflow
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.95)
    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"Model logged to MLflow: {model_uri}")
```
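MLflow's registry tracks versions, stages, and metadata; its core bookkeeping can be sketched in plain Python to make the idea concrete. The `ModelRegistry` class below is illustrative only and not part of MLflow's API:

```python
class ModelRegistry:
    """Minimal in-memory model registry: versioned artifacts per name."""

    def __init__(self):
        self._models = {}  # name -> list of (version, artifact, metadata)

    def register(self, name, artifact, **metadata):
        versions = self._models.setdefault(name, [])
        version = len(versions) + 1
        versions.append((version, artifact, metadata))
        return version

    def latest(self, name):
        version, artifact, metadata = self._models[name][-1]
        return version, artifact, metadata

registry = ModelRegistry()
registry.register("classifier", "model-v1.pkl", accuracy=0.93)
v = registry.register("classifier", "model-v2.pkl", accuracy=0.95)
version, artifact, meta = registry.latest("classifier")
print(version, artifact, meta["accuracy"])  # 2 model-v2.pkl 0.95
```

Keeping every version (rather than overwriting) is what makes the rollback procedures in the checklist below possible.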
```python
# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, features, prediction, latency):
        # Raw latencies feed the percentile stats in get_stats()
        self.latencies.append(latency)
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        if len(self.predictions) < 100:
            return {'drift_detected': False}
        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean(
            [p['prediction'] for p in self.predictions[:-100]]
        )
        recent_mean = np.mean(recent_predictions)
        drift = abs(recent_mean - historical_mean) > 0.1
        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}
        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }

monitor = ModelMonitor()

print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")
```
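The drift check compares recent and historical prediction means against a fixed threshold; the same logic can be exercised standalone with synthetic data (the values below are illustrative):

```python
from statistics import mean

historical = [0, 1] * 100        # balanced past predictions, mean 0.5
recent = [1] * 90 + [0] * 10     # recent window skews positive, mean 0.9

# Same rule as ModelMonitor.check_model_drift: flag when the
# window means diverge by more than the 0.1 threshold.
drift = abs(mean(recent) - mean(historical)) > 0.1
print("drift detected:", drift)  # drift detected: True
```

A mean-shift check like this catches gross prediction drift cheaply; subtler input drift usually needs distribution tests (e.g. KS or PSI) on the features themselves.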
- Deployment Checklist
- Model format and serialization
- Input/output validation
- Error handling and logging
- Authentication and security
- Rate limiting and throttling
- Health check endpoints
- Monitoring and alerting
- Version management
- Rollback procedures
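Of these checklist items, rate limiting is easy to prototype. A minimal token-bucket sketch; the `TokenBucket` class is illustrative, and production deployments usually delegate this to an API gateway or middleware:

```python
import time

class TokenBucket:
    """Allow up to `capacity` requests, refilled at `rate` tokens/second."""

    def __init__(self, capacity: int, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(capacity=3, rate=1.0)
results = [bucket.allow() for _ in range(5)]
print(results)  # first 3 allowed, later calls throttled until tokens refill
```

Per-client buckets (keyed by API key or IP) turn this into the throttling behavior the checklist calls for.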
- Cloud Deployment Options
- AWS: SageMaker, Lambda, EC2
- GCP: Vertex AI, Cloud Run, App Engine
- Azure: Azure Machine Learning, App Service
- Kubernetes: self-managed or on-premises clusters
- Performance Optimization
- Model quantization for smaller size
- Caching predictions
- Batch processing
- GPU acceleration
- Request pooling
- Deliverables
- Deployed model endpoint
- API documentation
- Docker configuration
- Monitoring dashboard
- Deployment guide
- Performance benchmarks
- Scaling recommendations
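Caching predictions pays off when identical inputs recur. A minimal sketch using functools.lru_cache keyed on hashable feature tuples; `cached_predict` and `run_model` are illustrative stand-ins for a real inference call:

```python
from functools import lru_cache

calls = {"model": 0}

def run_model(features):
    calls["model"] += 1          # stands in for an expensive inference call
    return sum(features) > 10.0

@lru_cache(maxsize=1024)
def cached_predict(features: tuple) -> bool:
    # lru_cache requires hashable arguments, hence tuples instead of lists.
    return run_model(features)

cached_predict((1.0, 2.0, 3.0))
cached_predict((1.0, 2.0, 3.0))   # served from cache, model not re-invoked
cached_predict((5.0, 6.0, 7.0))
print("model invocations:", calls["model"])  # model invocations: 2
```

For multi-replica deployments the same idea is usually implemented with a shared cache such as Redis, keyed on a hash of the input payload.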