MLOps Workflows with MLflow

A comprehensive guide to production-grade MLOps workflows covering the complete machine learning lifecycle from experimentation to production deployment and monitoring.

Table of Contents

  • MLflow Components Overview

  • Experiment Tracking

  • Model Registry

  • Deployment Patterns

  • Monitoring and Observability

  • A/B Testing

  • Feature Stores

  • CI/CD for ML

  • Model Versioning

  • Production Best Practices

MLflow Components Overview

MLflow consists of four primary components for managing the ML lifecycle:

  1. MLflow Tracking

Track experiments, parameters, metrics, and artifacts during model development.

import mlflow

# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")

# Create or set experiment
mlflow.set_experiment("production-models")

# Start a run
with mlflow.start_run(run_name="baseline-model"):
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("batch_size", 32)

    # Log metrics
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("loss", 0.05)

    # Log artifacts
    mlflow.log_artifact("model_plot.png")

  2. MLflow Projects

Package ML code in a reusable, reproducible format.

# MLproject file
name: my-ml-project
conda_env: conda.yaml

entry_points:
  main:
    parameters:
      learning_rate: {type: float, default: 0.01}
      epochs: {type: int, default: 100}
    command: "python train.py --lr {learning_rate} --epochs {epochs}"

  evaluate:
    parameters:
      model_uri: {type: string}
    command: "python evaluate.py --model-uri {model_uri}"

  3. MLflow Models

Package models in a standard format for deployment across platforms.

import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Train model
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Log model with signature
from mlflow.models import infer_signature
signature = infer_signature(X_train, model.predict(X_train))

mlflow.sklearn.log_model(
    sk_model=model,
    name="random-forest-model",
    signature=signature,
    input_example=X_train[:5],
    registered_model_name="ProductionClassifier"
)

  4. MLflow Registry

Centralized model store for managing model lifecycle and versioning.

import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# Register model
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="CustomerChurnModel"
)

# Set model alias for deployment
client.set_registered_model_alias(
    name="CustomerChurnModel",
    alias="production",
    version=registered_model.version
)

Experiment Tracking

Basic Experiment Tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

# Configure MLflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("house-price-prediction")

# Load and prepare data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Training with MLflow tracking
with mlflow.start_run(run_name="rf-baseline"):
    # Define parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }

    # Train model
    model = RandomForestRegressor(**params)
    model.fit(X_train, y_train)

    # Evaluate
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)

    # Log everything
    mlflow.log_params(params)
    mlflow.log_metrics({
        "mse": mse,
        "r2": r2,
        "rmse": mse ** 0.5
    })

    # Log model
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        registered_model_name="HousePricePredictor"
    )

Autologging

MLflow provides automatic logging for popular frameworks:

import mlflow
from sklearn.ensemble import RandomForestClassifier

# Enable autologging for scikit-learn
mlflow.sklearn.autolog()

# Your training code - everything is logged automatically
with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100, max_depth=5)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
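Autologging can also be enabled globally rather than per framework. A minimal sketch using MLflow's generic autologging entry point, which activates whichever supported libraries are installed:

import mlflow

# Enable autologging for every supported framework that is installed
mlflow.autolog()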

Nested Runs for Hyperparameter Tuning

import mlflow
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import GradientBoostingClassifier

mlflow.set_experiment("hyperparameter-tuning")

# Parent run for the entire tuning process
with mlflow.start_run(run_name="grid-search-parent"):
    param_grid = {
        'learning_rate': [0.01, 0.1, 0.3],
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 7]
    }

    # Log parent parameters
    mlflow.log_param("tuning_method", "grid_search")
    mlflow.log_param("cv_folds", 5)

    best_score = 0
    best_params = None

    # Nested runs for each parameter combination
    for lr in param_grid['learning_rate']:
        for n_est in param_grid['n_estimators']:
            for depth in param_grid['max_depth']:
                with mlflow.start_run(nested=True, run_name=f"lr{lr}_n{n_est}_d{depth}"):
                    params = {
                        'learning_rate': lr,
                        'n_estimators': n_est,
                        'max_depth': depth
                    }

                    model = GradientBoostingClassifier(**params)
                    model.fit(X_train, y_train)
                    score = model.score(X_test, y_test)

                    mlflow.log_params(params)
                    mlflow.log_metric("accuracy", score)

                    if score > best_score:
                        best_score = score
                        best_params = params

    # Log best results in parent run
    mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
    mlflow.log_metric("best_accuracy", best_score)

Tracking Multiple Metrics Over Time

import mlflow
import numpy as np

with mlflow.start_run():
    # Log metrics at different steps (epochs)
    for epoch in range(100):
        train_loss = np.random.random() * (1 - epoch/100)
        val_loss = np.random.random() * (1 - epoch/100) + 0.1

        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)
        mlflow.log_metric("learning_rate", 0.01 * (0.95 ** epoch), step=epoch)

Logging Artifacts

import mlflow
import matplotlib.pyplot as plt
import pandas as pd

with mlflow.start_run():
    # Log plot
    plt.figure(figsize=(10, 6))
    plt.plot(history['loss'], label='Training Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.legend()
    plt.savefig("loss_curve.png")
    mlflow.log_artifact("loss_curve.png")

    # Log dataframe as CSV
    feature_importance = pd.DataFrame({
        'feature': feature_names,
        'importance': model.feature_importances_
    })
    feature_importance.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")

    # Log entire directory
    mlflow.log_artifacts("output_dir/", artifact_path="outputs")

Model Registry

Registering Models

from mlflow import MlflowClient
import mlflow.sklearn

client = MlflowClient()

# Method 1: Register during model logging
with mlflow.start_run():
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        registered_model_name="CustomerSegmentationModel"
    )

# Method 2: Register an existing model
run_id = "abc123"
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="CustomerSegmentationModel"
)

Model Versioning and Aliases

import mlflow.sklearn
from mlflow import MlflowClient

client = MlflowClient()

# Create registered model
client.create_registered_model(
    name="FraudDetectionModel",
    description="ML model for detecting fraudulent transactions"
)

# Register version 1
model_uri_v1 = "runs:/run1/model"
mv1 = client.create_model_version(
    name="FraudDetectionModel",
    source=model_uri_v1,
    run_id="run1"
)

# Set aliases for deployment management
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="champion",  # Production model
    version="1"
)

client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="challenger",  # A/B testing model
    version="2"
)

# Load model by alias
champion_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@champion")
challenger_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@challenger")

Model Lifecycle Management

import mlflow
from mlflow import MlflowClient
from mlflow.entities import LoggedModelStatus

client = MlflowClient()

# Initialize model in PENDING state
model = mlflow.initialize_logged_model(
    name="neural_network_classifier",
    model_type="neural_network",
    tags={"architecture": "resnet", "dataset": "imagenet"}
)

try:
    # Training and validation
    train_model()
    validate_model()

    # Log model artifacts
    mlflow.pytorch.log_model(
        pytorch_model=model_instance,
        name="model",
        model_id=model.model_id
    )

    # Mark as ready
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.READY)

except Exception as e:
    # Mark as failed
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.FAILED)
    raise

Model Metadata and Tags

from mlflow import MlflowClient

client = MlflowClient()

# Set registered model tags
client.set_registered_model_tag(
    name="RecommendationModel",
    key="task",
    value="collaborative_filtering"
)

client.set_registered_model_tag(
    name="RecommendationModel",
    key="business_unit",
    value="ecommerce"
)

# Set model version tags
client.set_model_version_tag(
    name="RecommendationModel",
    version="3",
    key="validation_status",
    value="approved"
)

client.set_model_version_tag(
    name="RecommendationModel",
    version="3",
    key="approval_date",
    value="2024-01-15"
)

# Update model description
client.update_registered_model(
    name="RecommendationModel",
    description="Collaborative filtering model for product recommendations. Trained on user-item interaction data."
)

Searching and Filtering Models

from mlflow import MlflowClient

client = MlflowClient()

# Search registered models
models = client.search_registered_models(
    filter_string="name LIKE 'Production%'",
    max_results=10
)

# Search model versions
versions = client.search_model_versions(
    filter_string="name='CustomerChurnModel' AND tags.validation_status='approved'"
)

# Get specific model version
model_version = client.get_model_version(
    name="CustomerChurnModel",
    version="5"
)

# Get model by alias
champion = client.get_model_version_by_alias(
    name="CustomerChurnModel",
    alias="champion"
)

Deployment Patterns

Local Model Serving

import mlflow.pyfunc

# Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Make predictions
predictions = model.predict(data)

REST API Deployment

# Serve model as REST API
mlflow models serve \
  --model-uri models:/CustomerChurnModel@production \
  --host 0.0.0.0 \
  --port 5001 \
  --workers 4

# Client code to call the REST API
import requests
import json

url = "http://localhost:5001/invocations"
headers = {"Content-Type": "application/json"}

data = {
    "dataframe_split": {
        "columns": ["feature1", "feature2", "feature3"],
        "data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
    }
}

response = requests.post(url, headers=headers, data=json.dumps(data))
predictions = response.json()

Docker Deployment

# Build Docker image
mlflow models build-docker \
  --model-uri models:/CustomerChurnModel@production \
  --name customer-churn-model

# Run container
docker run -p 8080:8080 customer-churn-model

AWS SageMaker Deployment

import mlflow.sagemaker

# Deploy to SageMaker
mlflow.sagemaker.deploy(
    app_name="customer-churn-predictor",
    model_uri="models:/CustomerChurnModel@production",
    region_name="us-east-1",
    mode="create",
    execution_role_arn="arn:aws:iam::123456789:role/SageMakerRole",
    instance_type="ml.m5.xlarge",
    instance_count=2
)

Azure ML Deployment

import mlflow.azureml
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice

# Configure workspace
ws = Workspace.from_config()

# Deploy to Azure Container Instance
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=2,
    memory_gb=4,
    tags={"model": "churn-predictor"},
    description="Customer churn prediction model"
)

mlflow.azureml.deploy(
    model_uri="models:/CustomerChurnModel@production",
    workspace=ws,
    deployment_config=aci_config,
    service_name="churn-predictor-service"
)

GCP Vertex AI Deployment

from google.cloud import aiplatform
import mlflow

# Initialize Vertex AI
aiplatform.init(project="my-project", location="us-central1")

# Register the model in the MLflow registry
model = mlflow.register_model(
    model_uri="runs:/run-id/model",
    name="CustomerChurnModel"
)

# Create Vertex AI endpoint
endpoint = aiplatform.Endpoint.create(display_name="churn-prediction-endpoint")

# Deploy model
# Note: endpoint.deploy() expects an aiplatform.Model, so in practice the MLflow
# model artifacts are first uploaded to Vertex AI (e.g. via aiplatform.Model.upload)
# and that uploaded model is what gets deployed here.
endpoint.deploy(
    model=model,
    deployed_model_display_name="churn-v1",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5
)
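Once deployed, the endpoint can be queried with the Vertex AI SDK. A minimal sketch; the instance payload below is illustrative and must match the input format expected by the serving container:

# Query the deployed endpoint (illustrative payload)
response = endpoint.predict(
    instances=[{"feature1": 1.0, "feature2": 2.0, "feature3": 3.0}]
)
print(response.predictions)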

Batch Inference

import mlflow
import pandas as pd

# Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Load batch data
batch_data = pd.read_csv("customer_batch.csv")

# Process in chunks
chunk_size = 1000
predictions = []

for i in range(0, len(batch_data), chunk_size):
    chunk = batch_data[i:i+chunk_size]
    chunk_predictions = model.predict(chunk)
    predictions.extend(chunk_predictions)

# Save results
results = pd.DataFrame({
    'customer_id': batch_data['customer_id'],
    'churn_probability': predictions
})
results.to_csv("churn_predictions.csv", index=False)

Monitoring and Observability

Model Performance Monitoring

import mlflow
from datetime import datetime
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score

class ModelMonitor:
    def __init__(self, model_name, tracking_uri):
        self.model_name = model_name
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(f"{model_name}-monitoring")

    def log_prediction_metrics(self, y_true, y_pred, timestamp=None):
        """Log prediction metrics for monitoring"""
        if timestamp is None:
            timestamp = datetime.now()

        with mlflow.start_run(run_name=f"monitoring-{timestamp}"):
            # Calculate metrics
            metrics = {
                "accuracy": accuracy_score(y_true, y_pred),
                "precision": precision_score(y_true, y_pred, average='weighted'),
                "recall": recall_score(y_true, y_pred, average='weighted')
            }

            # Log metrics
            mlflow.log_metrics(metrics)
            mlflow.log_param("timestamp", timestamp.isoformat())
            mlflow.log_param("num_predictions", len(y_pred))

            # Check for drift
            if metrics["accuracy"] < 0.85:
                mlflow.set_tag("alert", "performance_degradation")

    def log_data_drift(self, reference_data, current_data):
        """Monitor for data drift"""
        with mlflow.start_run(run_name="data-drift-check"):
            # Calculate distribution statistics
            for col in reference_data.columns:
                ref_mean = reference_data[col].mean()
                curr_mean = current_data[col].mean()
                drift_percent = abs((curr_mean - ref_mean) / ref_mean) * 100

                mlflow.log_metric(f"{col}_drift_percent", drift_percent)

                if drift_percent > 20:
                    mlflow.set_tag(f"{col}_drift_alert", "high")

# Usage
monitor = ModelMonitor("CustomerChurnModel", "http://localhost:5000")
monitor.log_prediction_metrics(y_true, y_pred)
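The drift check can be wired up the same way. A minimal sketch, assuming reference_df is a snapshot of the training-time features and current_df holds the latest production features:

# Data drift check (reference_df and current_df are placeholders from your own pipeline)
monitor.log_data_drift(reference_data=reference_df, current_data=current_df)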

Prediction Logging

import mlflow
from datetime import datetime
import json

def log_predictions(model_name, inputs, predictions, metadata=None):
    """Log predictions for auditing and monitoring"""
    mlflow.set_experiment(f"{model_name}-predictions")

    with mlflow.start_run(run_name=f"prediction-{datetime.now().isoformat()}"):
        # Log prediction data
        mlflow.log_param("num_predictions", len(predictions))
        mlflow.log_param("model_name", model_name)

        # Log metadata
        if metadata:
            mlflow.log_params(metadata)

        # Log input/output samples
        sample_data = {
            "inputs": inputs[:5].tolist() if hasattr(inputs, 'tolist') else inputs[:5],
            "predictions": predictions[:5].tolist() if hasattr(predictions, 'tolist') else predictions[:5]
        }

        with open("prediction_sample.json", "w") as f:
            json.dump(sample_data, f)
        mlflow.log_artifact("prediction_sample.json")

Model Explainability Tracking

import mlflow
import shap
import matplotlib.pyplot as plt

def log_model_explanations(model, X_test, feature_names):
    """Log SHAP explanations for model interpretability"""
    with mlflow.start_run():
        # Calculate SHAP values
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test)

        # Create summary plot
        plt.figure()
        shap.summary_plot(shap_values, X_test, feature_names=feature_names, show=False)
        plt.savefig("shap_summary.png", bbox_inches='tight')
        mlflow.log_artifact("shap_summary.png")

        # Log feature importance
        feature_importance = dict(zip(feature_names, model.feature_importances_))
        mlflow.log_params({f"importance_{k}": v for k, v in feature_importance.items()})

A/B Testing

A/B Test Framework

import mlflow
import numpy as np
from datetime import datetime

class ABTestFramework:
    def __init__(self, model_a_uri, model_b_uri, traffic_split=0.5):
        self.model_a = mlflow.pyfunc.load_model(model_a_uri)
        self.model_b = mlflow.pyfunc.load_model(model_b_uri)
        self.traffic_split = traffic_split

        mlflow.set_experiment("ab-testing")

    def predict(self, data, user_id=None):
        """Route traffic between models and log results"""
        # Determine which model to use
        if user_id is None or hash(user_id) % 100 < self.traffic_split * 100:
            model_name = "model_a"
            prediction = self.model_a.predict(data)
        else:
            model_name = "model_b"
            prediction = self.model_b.predict(data)

        # Log the prediction
        with mlflow.start_run(run_name=f"ab-test-{datetime.now().isoformat()}"):
            mlflow.log_param("model_variant", model_name)
            mlflow.log_param("user_id", user_id)
            mlflow.log_metric("prediction", float(prediction[0]))

        return prediction

    def evaluate_test(self, results_a, results_b):
        """Evaluate A/B test results"""
        with mlflow.start_run(run_name="ab-test-evaluation"):
            # Calculate metrics for both variants
            metrics_a = {
                "mean_a": np.mean(results_a),
                "std_a": np.std(results_a),
                "count_a": len(results_a)
            }

            metrics_b = {
                "mean_b": np.mean(results_b),
                "std_b": np.std(results_b),
                "count_b": len(results_b)
            }

            # Statistical test
            from scipy import stats
            t_stat, p_value = stats.ttest_ind(results_a, results_b)

            mlflow.log_metrics({**metrics_a, **metrics_b})
            mlflow.log_metric("t_statistic", t_stat)
            mlflow.log_metric("p_value", p_value)

            # Determine winner
            if p_value < 0.05:
                winner = "model_a" if np.mean(results_a) > np.mean(results_b) else "model_b"
                mlflow.set_tag("winner", winner)
                mlflow.set_tag("significant", "yes")
            else:
                mlflow.set_tag("significant", "no")

# Usage
ab_test = ABTestFramework(
    model_a_uri="models:/CustomerChurnModel@champion",
    model_b_uri="models:/CustomerChurnModel@challenger",
    traffic_split=0.5
)

prediction = ab_test.predict(customer_data, user_id="user123")

Multi-Armed Bandit Testing

import mlflow
import numpy as np
from scipy.stats import beta

class MultiArmedBandit:
    def __init__(self, model_uris):
        self.models = [mlflow.pyfunc.load_model(uri) for uri in model_uris]
        self.successes = [1] * len(model_uris)  # Prior
        self.failures = [1] * len(model_uris)   # Prior

        mlflow.set_experiment("mab-testing")

    def select_model(self):
        """Thompson sampling to select model"""
        samples = [
            np.random.beta(s, f)
            for s, f in zip(self.successes, self.failures)
        ]
        return np.argmax(samples)

    def predict_and_update(self, data, actual_outcome=None):
        """Make prediction and update model performance"""
        model_idx = self.select_model()
        prediction = self.models[model_idx].predict(data)

        with mlflow.start_run(run_name="mab-prediction"):
            mlflow.log_param("selected_model", model_idx)
            mlflow.log_metric("prediction", float(prediction[0]))

            # Update based on outcome
            if actual_outcome is not None:
                if actual_outcome == prediction[0]:
                    self.successes[model_idx] += 1
                else:
                    self.failures[model_idx] += 1

                mlflow.log_metric("success_rate",
                    self.successes[model_idx] / (self.successes[model_idx] + self.failures[model_idx]))

        return prediction
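A minimal usage sketch; the model URIs, input data, and observed outcome below are placeholders:

# Usage (illustrative)
bandit = MultiArmedBandit(model_uris=[
    "models:/CustomerChurnModel/1",
    "models:/CustomerChurnModel/2",
    "models:/CustomerChurnModel/3"
])

prediction = bandit.predict_and_update(customer_data, actual_outcome=1)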

Feature Stores

Feature Store Integration

import mlflow
import numpy as np
from datetime import datetime
import pandas as pd

class FeatureStore:
    def __init__(self, storage_path):
        self.storage_path = storage_path
        mlflow.set_experiment("feature-store")

    def create_feature_set(self, name, df, description=None):
        """Create and version a feature set"""
        with mlflow.start_run(run_name=f"feature-set-{name}"):
            # Save features
            feature_path = f"{self.storage_path}/{name}_{datetime.now().isoformat()}.parquet"
            df.to_parquet(feature_path)

            # Log metadata
            mlflow.log_param("feature_set_name", name)
            mlflow.log_param("num_features", len(df.columns))
            mlflow.log_param("num_samples", len(df))
            mlflow.log_param("description", description or "")

            # Log feature statistics
            stats = df.describe().to_dict()
            mlflow.log_dict(stats, "feature_stats.json")

            # Log artifact
            mlflow.log_artifact(feature_path)

            return feature_path

    def get_features(self, run_id):
        """Retrieve feature set by run ID"""
        client = mlflow.MlflowClient()
        run = client.get_run(run_id)
        artifact_uri = run.info.artifact_uri

        # Download and load features
        local_path = mlflow.artifacts.download_artifacts(artifact_uri)
        df = pd.read_parquet(local_path)

        return df

# Usage
store = FeatureStore("s3://my-bucket/features")

# Create features
features = pd.DataFrame({
    'customer_id': range(1000),
    'lifetime_value': np.random.rand(1000) * 1000,
    'avg_purchase': np.random.rand(1000) * 100,
    'days_since_last_purchase': np.random.randint(0, 365, 1000)
})

feature_path = store.create_feature_set(
    name="customer_features",
    df=features,
    description="Customer behavioral features for churn prediction"
)
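Previously logged feature sets can be read back through get_features. A minimal sketch; the run ID below is a placeholder for the run that logged the feature set:

# Retrieve a feature set by the MLflow run ID that logged it (illustrative ID)
historical_features = store.get_features(run_id="abc123def456")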

Feature Engineering Pipeline

import mlflow
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def feature_engineering_pipeline(data, run_name="feature-engineering"):
    """Log feature engineering steps"""
    with mlflow.start_run(run_name=run_name):
        # Original features
        mlflow.log_param("original_features", len(data.columns))

        # Scaling
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)
        mlflow.sklearn.log_model(scaler, "scaler")

        # Dimensionality reduction
        pca = PCA(n_components=0.95)
        transformed_data = pca.fit_transform(scaled_data)
        mlflow.sklearn.log_model(pca, "pca")

        mlflow.log_param("final_features", transformed_data.shape[1])
        mlflow.log_metric("variance_explained", pca.explained_variance_ratio_.sum())

        return transformed_data

# Usage
transformed_features = feature_engineering_pipeline(raw_data)

CI/CD for ML

Training Pipeline

import mlflow
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def training_pipeline(data_path, model_params, validation_threshold=0.85):
    """Automated training pipeline with validation gates"""

    mlflow.set_experiment("production-training-pipeline")

    with mlflow.start_run(run_name="pipeline-run"):
        # Load data
        data = pd.read_csv(data_path)
        X = data.drop('target', axis=1)
        y = data['target']

        # Log data version
        mlflow.log_param("data_version", data_path.split('/')[-1])
        mlflow.log_param("data_samples", len(data))

        # Train model
        model = RandomForestClassifier(**model_params)
        model.fit(X, y)

        # Cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5)
        mean_cv_score = cv_scores.mean()

        mlflow.log_params(model_params)
        mlflow.log_metric("cv_score_mean", mean_cv_score)
        mlflow.log_metric("cv_score_std", cv_scores.std())

        # Validation gate
        if mean_cv_score >= validation_threshold:
            # Log model
            mlflow.sklearn.log_model(
                sk_model=model,
                name="model",
                registered_model_name="ProductionModel"
            )
            mlflow.set_tag("status", "passed")
            return True
        else:
            mlflow.set_tag("status", "failed")
            mlflow.set_tag("failure_reason", "below_threshold")
            return False

# Usage in CI/CD
success = training_pipeline(
    data_path="data/training_data_v2.csv",
    model_params={'n_estimators': 100, 'max_depth': 10},
    validation_threshold=0.85
)

if not success:
    raise ValueError("Model did not meet validation criteria")

Model Promotion Pipeline

from datetime import datetime
from mlflow import MlflowClient

def promote_model_to_production(model_name, version, validation_results):
    """Promote model through stages with validation"""

    client = MlflowClient()

    # Validation checks
    required_metrics = ['accuracy', 'precision', 'recall']
    for metric in required_metrics:
        if metric not in validation_results:
            raise ValueError(f"Missing required metric: {metric}")
        if validation_results[metric] < 0.8:
            raise ValueError(f"{metric} below threshold: {validation_results[metric]}")

    # Set tags
    for metric, value in validation_results.items():
        client.set_model_version_tag(
            name=model_name,
            version=version,
            key=f"validation_{metric}",
            value=str(value)
        )

    # Promote to production
    client.set_registered_model_alias(
        name=model_name,
        alias="production",
        version=version
    )

    # Tag with promotion metadata
    client.set_model_version_tag(
        name=model_name,
        version=version,
        key="promoted_at",
        value=datetime.now().isoformat()
    )

    return True

# Usage
validation_results = {
    'accuracy': 0.92,
    'precision': 0.89,
    'recall': 0.91
}

promote_model_to_production(
    model_name="FraudDetectionModel",
    version="5",
    validation_results=validation_results
)

Automated Model Retraining

import mlflow
import schedule
import time
from datetime import datetime
from mlflow import MlflowClient

class AutomatedRetrainer:
    def __init__(self, model_name, data_source, schedule_interval="daily"):
        self.model_name = model_name
        self.data_source = data_source
        self.schedule_interval = schedule_interval

        mlflow.set_experiment(f"{model_name}-retraining")

    def retrain(self):
        """Retrain model with latest data"""
        with mlflow.start_run(run_name=f"retrain-{datetime.now().isoformat()}"):
            # Load latest data
            data = self.load_latest_data()

            # Get current production model
            client = MlflowClient()
            current_model = client.get_model_version_by_alias(
                self.model_name, "production"
            )

            # Load and evaluate current model
            # (X_test / y_test come from your held-out evaluation dataset)
            current_model_obj = mlflow.sklearn.load_model(
                f"models:/{self.model_name}@production"
            )
            current_score = current_model_obj.score(X_test, y_test)

            mlflow.log_metric("current_production_score", current_score)

            # Train new model
            new_model = self.train_model(data)
            new_score = new_model.score(X_test, y_test)

            mlflow.log_metric("new_model_score", new_score)

            # Compare and promote if better
            if new_score > current_score:
                mlflow.sklearn.log_model(
                    sk_model=new_model,
                    name="model",
                    registered_model_name=self.model_name
                )
                mlflow.set_tag("status", "promoted")
            else:
                mlflow.set_tag("status", "not_promoted")

    def start_scheduled_retraining(self):
        """Start scheduled retraining"""
        if self.schedule_interval == "daily":
            schedule.every().day.at("02:00").do(self.retrain)
        elif self.schedule_interval == "weekly":
            schedule.every().monday.at("02:00").do(self.retrain)

        while True:
            schedule.run_pending()
            time.sleep(3600)

# Usage
retrainer = AutomatedRetrainer(
    model_name="CustomerChurnModel",
    data_source="s3://bucket/data",
    schedule_interval="daily"
)
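To actually start the schedule, call the blocking loop, typically in a dedicated worker process or container:

# Start the blocking scheduling loop
retrainer.start_scheduled_retraining()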

Production Best Practices

Model Signatures

from mlflow.models import infer_signature, ModelSignature
from mlflow.types import Schema, ColSpec
import mlflow.sklearn
import numpy as np

# Method 1: Infer signature from data
signature = infer_signature(X_train, model.predict(X_train))

# Method 2: Define explicit signature
input_schema = Schema([
    ColSpec("double", "age"),
    ColSpec("double", "income"),
    ColSpec("string", "customer_segment")
])

output_schema = Schema([ColSpec("double")])

signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Log model with signature
mlflow.sklearn.log_model(
    sk_model=model,
    name="model",
    signature=signature,
    input_example=X_train[:5]
)

Model Validation Framework

import mlflow
from sklearn.metrics import classification_report
import json

class ModelValidator:
    def __init__(self, thresholds):
        self.thresholds = thresholds

    def validate(self, model, X_test, y_test):
        """Comprehensive model validation"""
        results = {}

        with mlflow.start_run(run_name="model-validation"):
            # Performance metrics
            predictions = model.predict(X_test)
            report = classification_report(y_test, predictions, output_dict=True)

            # Check thresholds
            passed = True
            for metric, threshold in self.thresholds.items():
                value = report['weighted avg'][metric]
                results[metric] = value
                mlflow.log_metric(metric, value)

                if value < threshold:
                    passed = False
                    mlflow.set_tag(f"{metric}_failed", "true")

            # Detailed report
            with open("validation_report.json", "w") as f:
                json.dump(report, f, indent=2)
            mlflow.log_artifact("validation_report.json")

            mlflow.set_tag("validation_passed", str(passed))

            return passed, results

# Usage
validator = ModelValidator(thresholds={
    'precision': 0.85,
    'recall': 0.80,
    'f1-score': 0.82
})

passed, results = validator.validate(model, X_test, y_test)

Error Handling and Logging

import mlflow
import logging
from functools import wraps

def mlflow_error_handler(func):
    """Decorator for MLflow error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        with mlflow.start_run(run_name=f"{func.__name__}"):
            try:
                result = func(*args, **kwargs)
                mlflow.set_tag("status", "success")
                return result

            except Exception as e:
                # Log error
                mlflow.set_tag("status", "failed")
                mlflow.set_tag("error_type", type(e).__name__)
                mlflow.set_tag("error_message", str(e))

                # Log traceback
                import traceback
                tb = traceback.format_exc()
                with open("error_traceback.txt", "w") as f:
                    f.write(tb)
                mlflow.log_artifact("error_traceback.txt")

                logging.error(f"Error in {func.__name__}: {str(e)}")
                raise

    return wrapper

@mlflow_error_handler
def train_model_with_error_handling(data):
    # Training code
    model = RandomForestClassifier()
    model.fit(X, y)
    return model

Model Performance Baseline

import mlflow
from sklearn.dummy import DummyClassifier

def establish_baseline(X_train, y_train, X_test, y_test):
    """Establish baseline model performance"""
    mlflow.set_experiment("baseline-models")

    strategies = ['most_frequent', 'stratified', 'uniform']

    for strategy in strategies:
        with mlflow.start_run(run_name=f"baseline-{strategy}"):
            baseline = DummyClassifier(strategy=strategy)
            baseline.fit(X_train, y_train)
            score = baseline.score(X_test, y_test)

            mlflow.log_param("strategy", strategy)
            mlflow.log_metric("accuracy", score)

            mlflow.sklearn.log_model(
                sk_model=baseline,
                name="baseline_model",
                registered_model_name=f"Baseline-{strategy}"
            )

# Usage
establish_baseline(X_train, y_train, X_test, y_test)

Summary

This comprehensive guide covers production-grade MLOps workflows using MLflow:

  • Experiment Tracking: Log parameters, metrics, and artifacts systematically

  • Model Registry: Centralized model versioning and lifecycle management

  • Deployment: Multiple deployment patterns for various platforms

  • Monitoring: Track model performance and data drift in production

  • A/B Testing: Compare model variants in production

  • Feature Stores: Version and manage feature engineering

  • CI/CD: Automated training, validation, and promotion pipelines

  • Best Practices: Signatures, validation, error handling, and baselines

These patterns enable teams to build robust, scalable ML systems from experimentation through production deployment and monitoring.
