MLOps Workflows with MLflow
A comprehensive guide to production-grade MLOps workflows covering the complete machine learning lifecycle from experimentation to production deployment and monitoring.
Table of Contents

- MLflow Components Overview
- Experiment Tracking
- Model Registry
- Deployment Patterns
- Monitoring and Observability
- A/B Testing
- Feature Stores
- CI/CD for ML
- Model Versioning
- Production Best Practices
MLflow Components Overview
MLflow consists of four primary components for managing the ML lifecycle:
1. MLflow Tracking
Track experiments, parameters, metrics, and artifacts during model development.
import mlflow
Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")
Create or set experiment
mlflow.set_experiment("production-models")
Start a run
with mlflow.start_run(run_name="baseline-model"): # Log parameters mlflow.log_param("learning_rate", 0.01) mlflow.log_param("batch_size", 32)
# Log metrics
mlflow.log_metric("accuracy", 0.95)
mlflow.log_metric("loss", 0.05)
# Log artifacts
mlflow.log_artifact("model_plot.png")
2. MLflow Projects
Package ML code in a reusable, reproducible format.
MLproject file
name: my-ml-project conda_env: conda.yaml
entry_points: main: parameters: learning_rate: {type: float, default: 0.01} epochs: {type: int, default: 100} command: "python train.py --lr {learning_rate} --epochs {epochs}"
evaluate: parameters: model_uri: {type: string} command: "python evaluate.py --model-uri {model_uri}"
3. MLflow Models
Package models in a standard format for deployment across platforms.
import mlflow.sklearn from sklearn.ensemble import RandomForestClassifier
Train model
model = RandomForestClassifier() model.fit(X_train, y_train)
Log model with signature
from mlflow.models import infer_signature signature = infer_signature(X_train, model.predict(X_train))
mlflow.sklearn.log_model( sk_model=model, name="random-forest-model", signature=signature, input_example=X_train[:5], registered_model_name="ProductionClassifier" )
4. MLflow Registry
Centralized model store for managing model lifecycle and versioning.
from mlflow import MlflowClient
client = MlflowClient()
Register model
model_uri = f"runs:/{run_id}/model" registered_model = mlflow.register_model( model_uri=model_uri, name="CustomerChurnModel" )
Set model alias for deployment
client.set_registered_model_alias( name="CustomerChurnModel", alias="production", version=registered_model.version )
Experiment Tracking
Basic Experiment Tracking
import mlflow import mlflow.sklearn from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mean_squared_error, r2_score from sklearn.model_selection import train_test_split
Configure MLflow
mlflow.set_tracking_uri("http://localhost:5000") mlflow.set_experiment("house-price-prediction")
Load and prepare data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
Training with MLflow tracking
with mlflow.start_run(run_name="rf-baseline"): # Define parameters params = { "n_estimators": 100, "max_depth": 10, "min_samples_split": 5, "random_state": 42 }
# Train model
model = RandomForestRegressor(**params)
model.fit(X_train, y_train)
# Evaluate
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
r2 = r2_score(y_test, predictions)
# Log everything
mlflow.log_params(params)
mlflow.log_metrics({
"mse": mse,
"r2": r2,
"rmse": mse ** 0.5
})
# Log model
mlflow.sklearn.log_model(
sk_model=model,
name="model",
registered_model_name="HousePricePredictor"
)
Autologging
MLflow provides automatic logging for popular frameworks:
import mlflow from sklearn.ensemble import RandomForestClassifier
Enable autologging for scikit-learn
mlflow.sklearn.autolog()
Your training code - everything is logged automatically
with mlflow.start_run(): model = RandomForestClassifier(n_estimators=100, max_depth=5) model.fit(X_train, y_train) predictions = model.predict(X_test)
Nested Runs for Hyperparameter Tuning
import mlflow from sklearn.model_selection import GridSearchCV from sklearn.ensemble import GradientBoostingClassifier
mlflow.set_experiment("hyperparameter-tuning")
Parent run for the entire tuning process
with mlflow.start_run(run_name="grid-search-parent"): param_grid = { 'learning_rate': [0.01, 0.1, 0.3], 'n_estimators': [50, 100, 200], 'max_depth': [3, 5, 7] }
# Log parent parameters
mlflow.log_param("tuning_method", "grid_search")
mlflow.log_param("cv_folds", 5)
best_score = 0
best_params = None
# Nested runs for each parameter combination
for lr in param_grid['learning_rate']:
for n_est in param_grid['n_estimators']:
for depth in param_grid['max_depth']:
with mlflow.start_run(nested=True, run_name=f"lr{lr}_n{n_est}_d{depth}"):
params = {
'learning_rate': lr,
'n_estimators': n_est,
'max_depth': depth
}
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
score = model.score(X_test, y_test)
mlflow.log_params(params)
mlflow.log_metric("accuracy", score)
if score > best_score:
best_score = score
best_params = params
# Log best results in parent run
mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
mlflow.log_metric("best_accuracy", best_score)
Tracking Multiple Metrics Over Time
import mlflow import numpy as np
with mlflow.start_run(): # Log metrics at different steps (epochs) for epoch in range(100): train_loss = np.random.random() * (1 - epoch/100) val_loss = np.random.random() * (1 - epoch/100) + 0.1
mlflow.log_metric("train_loss", train_loss, step=epoch)
mlflow.log_metric("val_loss", val_loss, step=epoch)
mlflow.log_metric("learning_rate", 0.01 * (0.95 ** epoch), step=epoch)
Logging Artifacts
import mlflow import matplotlib.pyplot as plt import pandas as pd
with mlflow.start_run(): # Log plot plt.figure(figsize=(10, 6)) plt.plot(history['loss'], label='Training Loss') plt.plot(history['val_loss'], label='Validation Loss') plt.legend() plt.savefig("loss_curve.png") mlflow.log_artifact("loss_curve.png")
# Log dataframe as CSV
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': model.feature_importances_
})
feature_importance.to_csv("feature_importance.csv", index=False)
mlflow.log_artifact("feature_importance.csv")
# Log entire directory
mlflow.log_artifacts("output_dir/", artifact_path="outputs")
Model Registry
Registering Models
from mlflow import MlflowClient import mlflow.sklearn
client = MlflowClient()
Method 1: Register during model logging
with mlflow.start_run(): mlflow.sklearn.log_model( sk_model=model, name="model", registered_model_name="CustomerSegmentationModel" )
Method 2: Register an existing model
run_id = "abc123" model_uri = f"runs:/{run_id}/model" registered_model = mlflow.register_model( model_uri=model_uri, name="CustomerSegmentationModel" )
Model Versioning and Aliases
from mlflow import MlflowClient
client = MlflowClient()
Create registered model
client.create_registered_model( name="FraudDetectionModel", description="ML model for detecting fraudulent transactions" )
Register version 1
model_uri_v1 = "runs:/run1/model" mv1 = client.create_model_version( name="FraudDetectionModel", source=model_uri_v1, run_id="run1" )
Set aliases for deployment management
client.set_registered_model_alias( name="FraudDetectionModel", alias="champion", # Production model version="1" )
client.set_registered_model_alias( name="FraudDetectionModel", alias="challenger", # A/B testing model version="2" )
Load model by alias
champion_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@champion") challenger_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@challenger")
Model Lifecycle Management
from mlflow import MlflowClient from mlflow.entities import LoggedModelStatus
client = MlflowClient()
Initialize model in PENDING state
model = mlflow.initialize_logged_model( name="neural_network_classifier", model_type="neural_network", tags={"architecture": "resnet", "dataset": "imagenet"} )
try: # Training and validation train_model() validate_model()
# Log model artifacts
mlflow.pytorch.log_model(
pytorch_model=model_instance,
name="model",
model_id=model.model_id
)
# Mark as ready
mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.READY)
except Exception as e: # Mark as failed mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.FAILED) raise
Model Metadata and Tags
from mlflow import MlflowClient
client = MlflowClient()
Set registered model tags
client.set_registered_model_tag( name="RecommendationModel", key="task", value="collaborative_filtering" )
client.set_registered_model_tag( name="RecommendationModel", key="business_unit", value="ecommerce" )
Set model version tags
client.set_model_version_tag( name="RecommendationModel", version="3", key="validation_status", value="approved" )
client.set_model_version_tag( name="RecommendationModel", version="3", key="approval_date", value="2024-01-15" )
Update model description
client.update_registered_model( name="RecommendationModel", description="Collaborative filtering model for product recommendations. Trained on user-item interaction data." )
Searching and Filtering Models
from mlflow import MlflowClient
client = MlflowClient()
Search registered models
models = client.search_registered_models( filter_string="name LIKE 'Production%'", max_results=10 )
Search model versions
versions = client.search_model_versions( filter_string="name='CustomerChurnModel' AND tags.validation_status='approved'" )
Get specific model version
model_version = client.get_model_version( name="CustomerChurnModel", version="5" )
Get model by alias
champion = client.get_model_version_by_alias( name="CustomerChurnModel", alias="champion" )
Deployment Patterns
Local Model Serving
import mlflow.pyfunc
Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")
Make predictions
predictions = model.predict(data)
REST API Deployment
Serve model as REST API
mlflow models serve
--model-uri models:/CustomerChurnModel@production
--host 0.0.0.0
--port 5001
--workers 4
Client code to call the REST API
import requests import json
url = "http://localhost:5001/invocations" headers = {"Content-Type": "application/json"}
data = { "dataframe_split": { "columns": ["feature1", "feature2", "feature3"], "data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]] } }
response = requests.post(url, headers=headers, data=json.dumps(data)) predictions = response.json()
Docker Deployment
Build Docker image
mlflow models build-docker
--model-uri models:/CustomerChurnModel@production
--name customer-churn-model
Run container
docker run -p 8080:8080 customer-churn-model
AWS SageMaker Deployment
import mlflow.sagemaker
Deploy to SageMaker
mlflow.sagemaker.deploy( app_name="customer-churn-predictor", model_uri="models:/CustomerChurnModel@production", region_name="us-east-1", mode="create", execution_role_arn="arn:aws:iam::123456789:role/SageMakerRole", instance_type="ml.m5.xlarge", instance_count=2 )
Azure ML Deployment
import mlflow.azureml from azureml.core import Workspace from azureml.core.webservice import AciWebservice
Configure workspace
ws = Workspace.from_config()
Deploy to Azure Container Instance
aci_config = AciWebservice.deploy_configuration( cpu_cores=2, memory_gb=4, tags={"model": "churn-predictor"}, description="Customer churn prediction model" )
mlflow.azureml.deploy( model_uri="models:/CustomerChurnModel@production", workspace=ws, deployment_config=aci_config, service_name="churn-predictor-service" )
GCP Vertex AI Deployment
from google.cloud import aiplatform import mlflow
Initialize Vertex AI
aiplatform.init(project="my-project", location="us-central1")
Deploy to Vertex AI
model = mlflow.register_model( model_uri="runs:/run-id/model", name="CustomerChurnModel" )
Create Vertex AI endpoint
endpoint = aiplatform.Endpoint.create(display_name="churn-prediction-endpoint")
Deploy model
endpoint.deploy( model=model, deployed_model_display_name="churn-v1", machine_type="n1-standard-4", min_replica_count=1, max_replica_count=5 )
Batch Inference
import mlflow import pandas as pd
Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")
Load batch data
batch_data = pd.read_csv("customer_batch.csv")
Process in chunks
chunk_size = 1000 predictions = []
for i in range(0, len(batch_data), chunk_size): chunk = batch_data[i:i+chunk_size] chunk_predictions = model.predict(chunk) predictions.extend(chunk_predictions)
Save results
results = pd.DataFrame({ 'customer_id': batch_data['customer_id'], 'churn_probability': predictions }) results.to_csv("churn_predictions.csv", index=False)
Monitoring and Observability
Model Performance Monitoring
import mlflow from datetime import datetime import pandas as pd from sklearn.metrics import accuracy_score, precision_score, recall_score
class ModelMonitor: def init(self, model_name, tracking_uri): self.model_name = model_name mlflow.set_tracking_uri(tracking_uri) mlflow.set_experiment(f"{model_name}-monitoring")
def log_prediction_metrics(self, y_true, y_pred, timestamp=None):
"""Log prediction metrics for monitoring"""
if timestamp is None:
timestamp = datetime.now()
with mlflow.start_run(run_name=f"monitoring-{timestamp}"):
# Calculate metrics
metrics = {
"accuracy": accuracy_score(y_true, y_pred),
"precision": precision_score(y_true, y_pred, average='weighted'),
"recall": recall_score(y_true, y_pred, average='weighted')
}
# Log metrics
mlflow.log_metrics(metrics)
mlflow.log_param("timestamp", timestamp.isoformat())
mlflow.log_param("num_predictions", len(y_pred))
# Check for drift
if metrics["accuracy"] < 0.85:
mlflow.set_tag("alert", "performance_degradation")
def log_data_drift(self, reference_data, current_data):
"""Monitor for data drift"""
with mlflow.start_run(run_name="data-drift-check"):
# Calculate distribution statistics
for col in reference_data.columns:
ref_mean = reference_data[col].mean()
curr_mean = current_data[col].mean()
drift_percent = abs((curr_mean - ref_mean) / ref_mean) * 100
mlflow.log_metric(f"{col}_drift_percent", drift_percent)
if drift_percent > 20:
mlflow.set_tag(f"{col}_drift_alert", "high")
Usage
monitor = ModelMonitor("CustomerChurnModel", "http://localhost:5000") monitor.log_prediction_metrics(y_true, y_pred)
Prediction Logging
import mlflow from datetime import datetime import json
def log_predictions(model_name, inputs, predictions, metadata=None): """Log predictions for auditing and monitoring""" mlflow.set_experiment(f"{model_name}-predictions")
with mlflow.start_run(run_name=f"prediction-{datetime.now().isoformat()}"):
# Log prediction data
mlflow.log_param("num_predictions", len(predictions))
mlflow.log_param("model_name", model_name)
# Log metadata
if metadata:
mlflow.log_params(metadata)
# Log input/output samples
sample_data = {
"inputs": inputs[:5].tolist() if hasattr(inputs, 'tolist') else inputs[:5],
"predictions": predictions[:5].tolist() if hasattr(predictions, 'tolist') else predictions[:5]
}
with open("prediction_sample.json", "w") as f:
json.dump(sample_data, f)
mlflow.log_artifact("prediction_sample.json")
Model Explainability Tracking
import mlflow import shap import matplotlib.pyplot as plt
def log_model_explanations(model, X_test, feature_names): """Log SHAP explanations for model interpretability""" with mlflow.start_run(): # Calculate SHAP values explainer = shap.TreeExplainer(model) shap_values = explainer.shap_values(X_test)
# Create summary plot
plt.figure()
shap.summary_plot(shap_values, X_test, feature_names=feature_names, show=False)
plt.savefig("shap_summary.png", bbox_inches='tight')
mlflow.log_artifact("shap_summary.png")
# Log feature importance
feature_importance = dict(zip(feature_names, model.feature_importances_))
mlflow.log_params({f"importance_{k}": v for k, v in feature_importance.items()})
A/B Testing
A/B Test Framework
import mlflow import numpy as np from datetime import datetime
class ABTestFramework: def init(self, model_a_uri, model_b_uri, traffic_split=0.5): self.model_a = mlflow.pyfunc.load_model(model_a_uri) self.model_b = mlflow.pyfunc.load_model(model_b_uri) self.traffic_split = traffic_split
mlflow.set_experiment("ab-testing")
def predict(self, data, user_id=None):
"""Route traffic between models and log results"""
# Determine which model to use
if user_id is None or hash(user_id) % 100 < self.traffic_split * 100:
model_name = "model_a"
prediction = self.model_a.predict(data)
else:
model_name = "model_b"
prediction = self.model_b.predict(data)
# Log the prediction
with mlflow.start_run(run_name=f"ab-test-{datetime.now().isoformat()}"):
mlflow.log_param("model_variant", model_name)
mlflow.log_param("user_id", user_id)
mlflow.log_metric("prediction", float(prediction[0]))
return prediction
def evaluate_test(self, results_a, results_b):
"""Evaluate A/B test results"""
with mlflow.start_run(run_name="ab-test-evaluation"):
# Calculate metrics for both variants
metrics_a = {
"mean_a": np.mean(results_a),
"std_a": np.std(results_a),
"count_a": len(results_a)
}
metrics_b = {
"mean_b": np.mean(results_b),
"std_b": np.std(results_b),
"count_b": len(results_b)
}
# Statistical test
from scipy import stats
t_stat, p_value = stats.ttest_ind(results_a, results_b)
mlflow.log_metrics({**metrics_a, **metrics_b})
mlflow.log_metric("t_statistic", t_stat)
mlflow.log_metric("p_value", p_value)
# Determine winner
if p_value < 0.05:
winner = "model_a" if np.mean(results_a) > np.mean(results_b) else "model_b"
mlflow.set_tag("winner", winner)
mlflow.set_tag("significant", "yes")
else:
mlflow.set_tag("significant", "no")
Usage
ab_test = ABTestFramework( model_a_uri="models:/CustomerChurnModel@champion", model_b_uri="models:/CustomerChurnModel@challenger", traffic_split=0.5 )
prediction = ab_test.predict(customer_data, user_id="user123")
Multi-Armed Bandit Testing
import mlflow import numpy as np from scipy.stats import beta
class MultiArmedBandit: def init(self, model_uris): self.models = [mlflow.pyfunc.load_model(uri) for uri in model_uris] self.successes = [1] * len(model_uris) # Prior self.failures = [1] * len(model_uris) # Prior
mlflow.set_experiment("mab-testing")
def select_model(self):
"""Thompson sampling to select model"""
samples = [
np.random.beta(s, f)
for s, f in zip(self.successes, self.failures)
]
return np.argmax(samples)
def predict_and_update(self, data, actual_outcome=None):
"""Make prediction and update model performance"""
model_idx = self.select_model()
prediction = self.models[model_idx].predict(data)
with mlflow.start_run(run_name=f"mab-prediction"):
mlflow.log_param("selected_model", model_idx)
mlflow.log_metric("prediction", float(prediction[0]))
# Update based on outcome
if actual_outcome is not None:
if actual_outcome == prediction[0]:
self.successes[model_idx] += 1
else:
self.failures[model_idx] += 1
mlflow.log_metric("success_rate",
self.successes[model_idx] / (self.successes[model_idx] + self.failures[model_idx]))
return prediction
Feature Stores
Feature Store Integration
import mlflow from datetime import datetime import pandas as pd
class FeatureStore: def init(self, storage_path): self.storage_path = storage_path mlflow.set_experiment("feature-store")
def create_feature_set(self, name, df, description=None):
"""Create and version a feature set"""
with mlflow.start_run(run_name=f"feature-set-{name}"):
# Save features
feature_path = f"{self.storage_path}/{name}_{datetime.now().isoformat()}.parquet"
df.to_parquet(feature_path)
# Log metadata
mlflow.log_param("feature_set_name", name)
mlflow.log_param("num_features", len(df.columns))
mlflow.log_param("num_samples", len(df))
mlflow.log_param("description", description or "")
# Log feature statistics
stats = df.describe().to_dict()
mlflow.log_dict(stats, "feature_stats.json")
# Log artifact
mlflow.log_artifact(feature_path)
return feature_path
def get_features(self, run_id):
"""Retrieve feature set by run ID"""
client = mlflow.MlflowClient()
run = client.get_run(run_id)
artifact_uri = run.info.artifact_uri
# Download and load features
local_path = mlflow.artifacts.download_artifacts(artifact_uri)
df = pd.read_parquet(local_path)
return df
Usage
store = FeatureStore("s3://my-bucket/features")
Create features
features = pd.DataFrame({ 'customer_id': range(1000), 'lifetime_value': np.random.rand(1000) * 1000, 'avg_purchase': np.random.rand(1000) * 100, 'days_since_last_purchase': np.random.randint(0, 365, 1000) })
feature_path = store.create_feature_set( name="customer_features", df=features, description="Customer behavioral features for churn prediction" )
Feature Engineering Pipeline
import mlflow from sklearn.preprocessing import StandardScaler from sklearn.decomposition import PCA
def feature_engineering_pipeline(data, run_name="feature-engineering"): """Log feature engineering steps""" with mlflow.start_run(run_name=run_name): # Original features mlflow.log_param("original_features", len(data.columns))
# Scaling
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data)
mlflow.sklearn.log_model(scaler, "scaler")
# Dimensionality reduction
pca = PCA(n_components=0.95)
transformed_data = pca.fit_transform(scaled_data)
mlflow.sklearn.log_model(pca, "pca")
mlflow.log_param("final_features", transformed_data.shape[1])
mlflow.log_metric("variance_explained", pca.explained_variance_ratio_.sum())
return transformed_data
Usage
transformed_features = feature_engineering_pipeline(raw_data)
CI/CD for ML
Training Pipeline
import mlflow from sklearn.model_selection import cross_val_score from sklearn.ensemble import RandomForestClassifier
def training_pipeline(data_path, model_params, validation_threshold=0.85): """Automated training pipeline with validation gates"""
mlflow.set_experiment("production-training-pipeline")
with mlflow.start_run(run_name="pipeline-run"):
# Load data
data = pd.read_csv(data_path)
X = data.drop('target', axis=1)
y = data['target']
# Log data version
mlflow.log_param("data_version", data_path.split('/')[-1])
mlflow.log_param("data_samples", len(data))
# Train model
model = RandomForestClassifier(**model_params)
model.fit(X, y)
# Cross-validation
cv_scores = cross_val_score(model, X, y, cv=5)
mean_cv_score = cv_scores.mean()
mlflow.log_params(model_params)
mlflow.log_metric("cv_score_mean", mean_cv_score)
mlflow.log_metric("cv_score_std", cv_scores.std())
# Validation gate
if mean_cv_score >= validation_threshold:
# Log model
mlflow.sklearn.log_model(
sk_model=model,
name="model",
registered_model_name="ProductionModel"
)
mlflow.set_tag("status", "passed")
return True
else:
mlflow.set_tag("status", "failed")
mlflow.set_tag("failure_reason", "below_threshold")
return False
Usage in CI/CD
success = training_pipeline( data_path="data/training_data_v2.csv", model_params={'n_estimators': 100, 'max_depth': 10}, validation_threshold=0.85 )
if not success: raise ValueError("Model did not meet validation criteria")
Model Promotion Pipeline
from mlflow import MlflowClient
def promote_model_to_production(model_name, version, validation_results): """Promote model through stages with validation"""
client = MlflowClient()
# Validation checks
required_metrics = ['accuracy', 'precision', 'recall']
for metric in required_metrics:
if metric not in validation_results:
raise ValueError(f"Missing required metric: {metric}")
if validation_results[metric] < 0.8:
raise ValueError(f"{metric} below threshold: {validation_results[metric]}")
# Set tags
for metric, value in validation_results.items():
client.set_model_version_tag(
name=model_name,
version=version,
key=f"validation_{metric}",
value=str(value)
)
# Promote to production
client.set_registered_model_alias(
name=model_name,
alias="production",
version=version
)
# Tag with promotion metadata
client.set_model_version_tag(
name=model_name,
version=version,
key="promoted_at",
value=datetime.now().isoformat()
)
return True
Usage
validation_results = { 'accuracy': 0.92, 'precision': 0.89, 'recall': 0.91 }
promote_model_to_production( model_name="FraudDetectionModel", version="5", validation_results=validation_results )
Automated Model Retraining
import mlflow import schedule import time
class AutomatedRetrainer: def init(self, model_name, data_source, schedule_interval="daily"): self.model_name = model_name self.data_source = data_source self.schedule_interval = schedule_interval
mlflow.set_experiment(f"{model_name}-retraining")
def retrain(self):
"""Retrain model with latest data"""
with mlflow.start_run(run_name=f"retrain-{datetime.now().isoformat()}"):
# Load latest data
data = self.load_latest_data()
# Get current production model
client = MlflowClient()
current_model = client.get_model_version_by_alias(
self.model_name, "production"
)
# Load and evaluate current model
current_model_obj = mlflow.sklearn.load_model(
f"models:/{self.model_name}@production"
)
current_score = current_model_obj.score(X_test, y_test)
mlflow.log_metric("current_production_score", current_score)
# Train new model
new_model = self.train_model(data)
new_score = new_model.score(X_test, y_test)
mlflow.log_metric("new_model_score", new_score)
# Compare and promote if better
if new_score > current_score:
mlflow.sklearn.log_model(
sk_model=new_model,
name="model",
registered_model_name=self.model_name
)
mlflow.set_tag("status", "promoted")
else:
mlflow.set_tag("status", "not_promoted")
def start_scheduled_retraining(self):
"""Start scheduled retraining"""
if self.schedule_interval == "daily":
schedule.every().day.at("02:00").do(self.retrain)
elif self.schedule_interval == "weekly":
schedule.every().monday.at("02:00").do(self.retrain)
while True:
schedule.run_pending()
time.sleep(3600)
Usage
retrainer = AutomatedRetrainer( model_name="CustomerChurnModel", data_source="s3://bucket/data", schedule_interval="daily" )
Production Best Practices
Model Signatures
from mlflow.models import infer_signature, ModelSignature from mlflow.types import Schema, ColSpec import mlflow.sklearn import numpy as np
Method 1: Infer signature from data
signature = infer_signature(X_train, model.predict(X_train))
Method 2: Define explicit signature
input_schema = Schema([ ColSpec("double", "age"), ColSpec("double", "income"), ColSpec("string", "customer_segment") ])
output_schema = Schema([ColSpec("double")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
Log model with signature
mlflow.sklearn.log_model( sk_model=model, name="model", signature=signature, input_example=X_train[:5] )
Model Validation Framework
import mlflow from sklearn.metrics import classification_report import json
class ModelValidator: def init(self, thresholds): self.thresholds = thresholds
def validate(self, model, X_test, y_test):
"""Comprehensive model validation"""
results = {}
with mlflow.start_run(run_name="model-validation"):
# Performance metrics
predictions = model.predict(X_test)
report = classification_report(y_test, predictions, output_dict=True)
# Check thresholds
passed = True
for metric, threshold in self.thresholds.items():
value = report['weighted avg'][metric]
results[metric] = value
mlflow.log_metric(metric, value)
if value < threshold:
passed = False
mlflow.set_tag(f"{metric}_failed", "true")
# Detailed report
with open("validation_report.json", "w") as f:
json.dump(report, f, indent=2)
mlflow.log_artifact("validation_report.json")
mlflow.set_tag("validation_passed", str(passed))
return passed, results
Usage
validator = ModelValidator(thresholds={ 'precision': 0.85, 'recall': 0.80, 'f1-score': 0.82 })
passed, results = validator.validate(model, X_test, y_test)
Error Handling and Logging
import mlflow import logging from functools import wraps
def mlflow_error_handler(func): """Decorator for MLflow error handling""" @wraps(func) def wrapper(*args, **kwargs): with mlflow.start_run(run_name=f"{func.name}"): try: result = func(*args, **kwargs) mlflow.set_tag("status", "success") return result
except Exception as e:
# Log error
mlflow.set_tag("status", "failed")
mlflow.set_tag("error_type", type(e).__name__)
mlflow.set_tag("error_message", str(e))
# Log traceback
import traceback
tb = traceback.format_exc()
with open("error_traceback.txt", "w") as f:
f.write(tb)
mlflow.log_artifact("error_traceback.txt")
logging.error(f"Error in {func.__name__}: {str(e)}")
raise
return wrapper
@mlflow_error_handler def train_model_with_error_handling(data): # Training code model = RandomForestClassifier() model.fit(X, y) return model
Model Performance Baseline
import mlflow from sklearn.dummy import DummyClassifier
def establish_baseline(X_train, y_train, X_test, y_test): """Establish baseline model performance""" mlflow.set_experiment("baseline-models")
strategies = ['most_frequent', 'stratified', 'uniform']
for strategy in strategies:
with mlflow.start_run(run_name=f"baseline-{strategy}"):
baseline = DummyClassifier(strategy=strategy)
baseline.fit(X_train, y_train)
score = baseline.score(X_test, y_test)
mlflow.log_param("strategy", strategy)
mlflow.log_metric("accuracy", score)
mlflow.sklearn.log_model(
sk_model=baseline,
name="baseline_model",
registered_model_name=f"Baseline-{strategy}"
)
Usage
establish_baseline(X_train, y_train, X_test, y_test)
Summary
This comprehensive guide covers production-grade MLOps workflows using MLflow:
- Experiment Tracking: Log parameters, metrics, and artifacts systematically
- Model Registry: Centralized model versioning and lifecycle management
- Deployment: Multiple deployment patterns for various platforms
- Monitoring: Track model performance and data drift in production
- A/B Testing: Compare model variants in production
- Feature Stores: Version and manage feature engineering
- CI/CD: Automated training, validation, and promotion pipelines
- Best Practices: Signatures, validation, error handling, and baselines
These patterns enable teams to build robust, scalable ML systems from experimentation through production deployment and monitoring.