AI Architect Expert

Expert guidance for designing AI systems, MLOps architecture, scalable ML infrastructure, and AI platform engineering.

Safety Notice

This listing is imported from the skills.sh public index metadata. Review the upstream SKILL.md and the repository's scripts before running anything.


Install the "ai-architect-expert" skill with: npx skills add personamanagmentlayer/pcl/personamanagmentlayer-pcl-ai-architect-expert


Core Concepts

AI System Architecture

  • Model serving architectures

  • Real-time vs batch inference

  • Feature stores

  • Model registries

  • Training pipelines

  • Data versioning

MLOps Infrastructure

  • CI/CD for ML

  • Model monitoring and observability

  • A/B testing frameworks (traffic-split sketch after this list)

  • Model retraining automation

  • Resource orchestration

  • Cost optimization
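
To make the A/B testing item concrete, here is a minimal sketch of sticky traffic splitting; assign_variant, the hash choice, and the version names are illustrative, not tied to any particular framework.

import hashlib

def assign_variant(user_id: str, treatment_share: float = 0.1) -> str:
    """Deterministically bucket a user into control or treatment.

    Hashing the user id keeps assignment sticky across requests
    without storing any state.
    """
    bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
    return "treatment" if bucket < treatment_share * 100 else "control"

# Route the treatment group to the candidate model (names illustrative)
variant = assign_variant("user-123")
model_version = "v2-candidate" if variant == "treatment" else "v1-production"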

Scalability Patterns

  • Distributed training

  • Model parallelism

  • Data parallelism

  • Inference optimization (micro-batching sketch after this list)

  • Caching strategies

  • Load balancing
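
One common inference optimization from the list above is micro-batching: collecting concurrent requests into a small batch before hitting the model, which amortizes per-call overhead on GPU-backed models. A minimal asyncio sketch, assuming a synchronous infer_batch callable that maps a list of inputs to a list of outputs (all names and the default sizes are illustrative):

import asyncio
from typing import Any, Callable, List

class MicroBatcher:
    """Group concurrent requests into batches before inference."""

    def __init__(self, infer_batch: Callable[[List[Any]], List[Any]],
                 max_batch: int = 32, max_wait_ms: float = 5.0):
        self.infer_batch = infer_batch
        self.max_batch = max_batch
        self.max_wait = max_wait_ms / 1000.0
        self.queue: asyncio.Queue = asyncio.Queue()

    async def predict(self, item: Any) -> Any:
        """Enqueue one request and await its result."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def run(self):
        """Background task: drain the queue into batches."""
        while True:
            item, future = await self.queue.get()
            items, futures = [item], [future]
            deadline = asyncio.get_running_loop().time() + self.max_wait

            # Fill the batch until it is full or the deadline passes
            while len(items) < self.max_batch:
                timeout = deadline - asyncio.get_running_loop().time()
                if timeout <= 0:
                    break
                try:
                    item, future = await asyncio.wait_for(
                        self.queue.get(), timeout)
                except asyncio.TimeoutError:
                    break
                items.append(item)
                futures.append(future)

            for future, result in zip(futures, self.infer_batch(items)):
                future.set_result(result)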

ML Platform Architecture

from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

class ModelStage(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"

@dataclass
class ModelMetadata:
    name: str
    version: str
    framework: str
    stage: ModelStage
    metrics: Dict[str, float]
    created_at: str
    updated_at: str

class ModelRegistry:
    """Central model registry for ML platform"""

    def __init__(self):
        self.models: Dict[str, List[ModelMetadata]] = {}

    def register_model(self, model: ModelMetadata) -> str:
        """Register new model version"""
        if model.name not in self.models:
            self.models[model.name] = []

        self.models[model.name].append(model)
        return f"{model.name}:{model.version}"

    def promote_model(self, name: str, version: str, stage: ModelStage) -> bool:
        """Promote model to different stage"""
        for model in self.models.get(name, []):
            if model.version == version:
                if stage == ModelStage.PRODUCTION:
                    # Keep at most one production version per model name
                    current = self.get_production_model(name)
                    if current is not None and current.version != version:
                        current.stage = ModelStage.ARCHIVED
                model.stage = stage
                return True
        return False

    def get_model(self, name: str, version: str) -> Optional[ModelMetadata]:
        """Get a specific model version"""
        for model in self.models.get(name, []):
            if model.version == version:
                return model
        return None

    def get_production_model(self, name: str) -> Optional[ModelMetadata]:
        """Get current production model"""
        for model in self.models.get(name, []):
            if model.stage == ModelStage.PRODUCTION:
                return model
        return None
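
A quick usage sketch of the registry (the model name, metrics, and dates are made up):

registry = ModelRegistry()
registry.register_model(ModelMetadata(
    name="churn", version="1.0.0", framework="pytorch",
    stage=ModelStage.DEVELOPMENT, metrics={"auc": 0.91},
    created_at="2024-01-01", updated_at="2024-01-01",
))
registry.promote_model("churn", "1.0.0", ModelStage.PRODUCTION)
assert registry.get_production_model("churn").version == "1.0.0"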

class FeatureStore:
    """Feature store for ML features"""

    def __init__(self):
        self.features: Dict[str, Dict] = {}
        self.feature_groups: Dict[str, List[str]] = {}

    def register_feature(self, name: str, dtype: str, description: str,
                         transformation: Optional[str] = None):
        """Register feature definition"""
        self.features[name] = {
            "dtype": dtype,
            "description": description,
            "transformation": transformation
        }

    def create_feature_group(self, group_name: str, feature_names: List[str]):
        """Create feature group for reuse"""
        self.feature_groups[group_name] = feature_names

    def get_features(self, entity_id: str, feature_names: List[str]) -> Dict:
        """Retrieve feature values for entity"""
        return {name: self._fetch_feature(entity_id, name)
                for name in feature_names}

    def _fetch_feature(self, entity_id: str, name: str):
        # In production, this would query the online store (low-latency
        # key-value lookup) or the offline store (batch/backfill reads)
        raise NotImplementedError
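
A usage sketch with illustrative feature names; get_features only works once _fetch_feature is wired to a real store:

store = FeatureStore()
store.register_feature("days_since_signup", "int",
                       "Days since the user created an account")
store.register_feature("avg_session_minutes", "float",
                       "Mean session length over the last 30 days",
                       transformation="mean(sessions.minutes, 30d)")
store.create_feature_group("engagement_v1",
                           ["days_since_signup", "avg_session_minutes"])

# Once _fetch_feature is wired to a real online store:
# store.get_features("user-123", store.feature_groups["engagement_v1"])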

Training Pipeline Architecture

from abc import ABC, abstractmethod

import torch.distributed as dist

class TrainingPipeline(ABC):
    """Base training pipeline"""

    def __init__(self, config: Dict):
        self.config = config
        self.experiment_tracker = None
        self.checkpointer = None

    @abstractmethod
    def prepare_data(self):
        """Data preparation step"""

    @abstractmethod
    def train(self):
        """Training step"""

    @abstractmethod
    def evaluate(self) -> Dict[str, float]:
        """Evaluation step; returns metric name -> value"""

    def log_metrics(self, metrics: Dict[str, float]):
        """Send metrics to the experiment tracker, if one is configured"""
        if self.experiment_tracker is not None:
            self.experiment_tracker.log(metrics)

    def run(self):
        """Execute full pipeline"""
        self.prepare_data()
        self.train()
        metrics = self.evaluate()
        self.log_metrics(metrics)
        return metrics

class DistributedTrainingPipeline(TrainingPipeline):
    """Distributed training with DDP"""

    def __init__(self, config: Dict, world_size: int, rank: int):
        super().__init__(config)
        self.world_size = world_size
        self.rank = rank
        self.setup_distributed()

    def setup_distributed(self):
        """Initialize distributed training"""
        dist.init_process_group(
            backend='nccl',
            world_size=self.world_size,
            rank=self.rank
        )

    def prepare_data(self):
        """Distribute data across workers"""
        from torch.utils.data.distributed import DistributedSampler

        # self.dataset (and later self.model, self.dataloader,
        # self.train_step, self.log_loss) are expected to be provided
        # by a concrete subclass or set up from self.config
        self.sampler = DistributedSampler(
            self.dataset,
            num_replicas=self.world_size,
            rank=self.rank
        )

    def train(self):
        """Distributed training loop"""
        from torch.nn.parallel import DistributedDataParallel as DDP

        model = DDP(self.model, device_ids=[self.rank])

        for epoch in range(self.config['epochs']):
            # Reshuffle shards each epoch so workers see different data
            self.sampler.set_epoch(epoch)

            for batch in self.dataloader:
                loss = self.train_step(model, batch)

                # Only rank 0 logs, to avoid duplicate metrics
                if self.rank == 0:
                    self.log_loss(loss)
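
In practice, world_size and rank come from the launcher rather than hand-written code: torchrun sets the RANK and WORLD_SIZE environment variables for each worker process. A minimal entry-point sketch, assuming a hypothetical concrete subclass MyPipeline that implements evaluate and wires up the model and data:

# train.py -- launch with: torchrun --nproc_per_node=4 train.py
import os

def main():
    pipeline = MyPipeline(                      # hypothetical concrete subclass
        config={"epochs": 10},                  # illustrative config
        world_size=int(os.environ["WORLD_SIZE"]),
        rank=int(os.environ["RANK"]),
    )
    pipeline.run()

if __name__ == "__main__":
    main()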

Model Serving Architecture

import asyncio
import hashlib
import json

from fastapi import FastAPI
from prometheus_client import Counter, Histogram

# Metrics
prediction_counter = Counter('predictions_total', 'Total predictions')
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')

class ModelServer:
    """Production model serving"""

    def __init__(self, model_registry: ModelRegistry):
        self.registry = model_registry
        self.loaded_models = {}
        # Unbounded dict for illustration; use an LRU/TTL cache in production
        self.prediction_cache = {}

    async def load_model(self, name: str, version: str = "production"):
        """Load model into memory"""
        if version == "production":
            model_metadata = self.registry.get_production_model(name)
        else:
            model_metadata = self.registry.get_model(name, version)

        if not model_metadata:
            raise ValueError(f"Model {name}:{version} not found")

        # Load model from storage
        model = await self._load_from_storage(model_metadata)
        self.loaded_models[f"{name}:{version}"] = model

        return model

    @prediction_latency.time()
    async def predict(self, model_name: str, features: Dict) -> Dict:
        """Make prediction with caching"""
        prediction_counter.inc()

        # Check cache
        cache_key = self._generate_cache_key(model_name, features)
        if cache_key in self.prediction_cache:
            return self.prediction_cache[cache_key]

        # Get model (load_model stores it under "<name>:production" by default)
        model = self.loaded_models.get(f"{model_name}:production")
        if not model:
            model = await self.load_model(model_name)

        # Predict
        result = await self._run_inference(model, features)

        # Cache result
        self.prediction_cache[cache_key] = result

        return result

    async def predict_batch(self, model_name: str,
                            batch_features: List[Dict]) -> List[Dict]:
        """Batch prediction for efficiency"""
        tasks = [self.predict(model_name, features)
                 for features in batch_features]
        return await asyncio.gather(*tasks)

    def _generate_cache_key(self, model_name: str, features: Dict) -> str:
        """Stable key derived from the request payload"""
        payload = json.dumps(features, sort_keys=True)
        return f"{model_name}:{hashlib.sha256(payload.encode()).hexdigest()}"

    async def _load_from_storage(self, metadata: ModelMetadata):
        # Framework-specific: pull weights from object storage and deserialize
        raise NotImplementedError

    async def _run_inference(self, model, features: Dict) -> Dict:
        # Framework-specific forward pass
        raise NotImplementedError

app = FastAPI()
model_server = ModelServer(model_registry=ModelRegistry())

@app.post("/predict/{model_name}")
async def predict_endpoint(model_name: str, features: Dict):
    return await model_server.predict(model_name, features)
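
Once the storage and inference stubs are implemented, this runs like any FastAPI app (e.g. uvicorn serving:app --port 8000, assuming the file is named serving.py). A client sketch; the model name and payload are illustrative:

import httpx

resp = httpx.post("http://localhost:8000/predict/churn",
                  json={"days_since_signup": 42})
print(resp.json())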

Monitoring and Observability

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import numpy as np

@dataclass
class PredictionLog:
    timestamp: datetime
    model_name: str
    model_version: str
    features: Dict
    prediction: Any
    latency_ms: float
    input_hash: str

class ModelMonitor:
    """Monitor model performance in production"""

    def __init__(self):
        self.logs: List[PredictionLog] = []
        self.metrics = {}

    def log_prediction(self, log: PredictionLog):
        """Log prediction for monitoring"""
        self.logs.append(log)

        # Update metrics
        self.update_latency_metrics(log)
        self.check_data_drift(log)

    def update_latency_metrics(self, log: PredictionLog):
        """Track prediction latency"""
        model_key = f"{log.model_name}:{log.model_version}"

        if model_key not in self.metrics:
            self.metrics[model_key] = {
                "latencies": [],
                "predictions": 0
            }

        self.metrics[model_key]["latencies"].append(log.latency_ms)
        self.metrics[model_key]["predictions"] += 1

    def check_data_drift(self, log: PredictionLog):
        """Detect data drift in input features"""
        # Compare current feature distributions with training data
        # and alert if significant drift is detected (see the PSI
        # sketch below for one concrete approach)
        pass

    def get_model_health(self, model_key: str) -> Dict:
        """Get health metrics for a "name:version" model key"""
        model_metrics = self.metrics.get(model_key, {})

        latencies = model_metrics.get("latencies", [])

        return {
            "total_predictions": model_metrics.get("predictions", 0),
            "avg_latency_ms": np.mean(latencies) if latencies else 0,
            "p95_latency_ms": np.percentile(latencies, 95) if latencies else 0,
            "p99_latency_ms": np.percentile(latencies, 99) if latencies else 0
        }
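
check_data_drift is left as a stub above; one concrete, widely used option is the population stability index (PSI), computed per numeric feature between a training-time sample and recent traffic. A minimal numpy sketch (the 0.1/0.25 thresholds are a commonly cited rule of thumb, not a universal standard):

def population_stability_index(expected: np.ndarray,
                               observed: np.ndarray,
                               bins: int = 10) -> float:
    """PSI between training-time and live feature distributions.

    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate drift,
    > 0.25 significant drift worth alerting on.
    """
    edges = np.histogram_bin_edges(expected, bins=bins)
    expected_counts, _ = np.histogram(expected, bins=edges)
    observed_counts, _ = np.histogram(observed, bins=edges)
    # Floor the fractions to avoid division by zero and log(0)
    expected_frac = np.clip(expected_counts / expected_counts.sum(), 1e-6, None)
    observed_frac = np.clip(observed_counts / observed_counts.sum(), 1e-6, None)
    return float(np.sum((observed_frac - expected_frac)
                        * np.log(observed_frac / expected_frac)))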

Best Practices

Architecture Design

  • Separate training and serving infrastructure

  • Use feature stores for consistency

  • Implement model versioning from day one

  • Design for horizontal scalability

  • Plan for model rollback capability (sketch after this list)

  • Build monitoring into the architecture
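
Rollback can lean directly on the ModelRegistry defined earlier: since promote_model archives whatever version is currently in production, rolling back is a single promotion of a known-good version (version numbers illustrative):

def rollback(registry: ModelRegistry, name: str, known_good_version: str) -> bool:
    """Re-point production at a known-good model version."""
    return registry.promote_model(name, known_good_version,
                                  ModelStage.PRODUCTION)

# e.g. after a bad deploy of 1.1.0:
rollback(registry, "churn", "1.0.0")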

MLOps

  • Automate model retraining pipelines

  • Implement CI/CD for models

  • Version everything (data, code, models)

  • Monitor model performance metrics

  • Track data drift and model decay

  • Implement gradual rollout (canary/blue-green; sketch below)
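
A canary rollout can start as a per-request weighted split between the stable and candidate versions; the names and fractions below are illustrative, and the hash-based splitter shown earlier can replace random.random() when users must stay pinned to one version:

import random

def pick_model_version(stable: str, canary: str,
                       canary_fraction: float = 0.05) -> str:
    """Send a small, adjustable slice of traffic to the canary.

    Ramp canary_fraction up (0.05 -> 0.25 -> 1.0) as monitoring
    confirms the new version is healthy; set it to 0.0 to roll back.
    """
    return canary if random.random() < canary_fraction else stable

version = pick_model_version("churn:1.0.0", "churn:1.1.0")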

Infrastructure

  • Use GPU efficiently (batching, mixed precision; sketch after this list)

  • Implement caching for repeated predictions

  • Consider model compression (quantization, pruning)

  • Plan for disaster recovery

  • Optimize costs (spot instances, autoscaling)

  • Use managed services where appropriate
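
For the GPU-efficiency item above, a minimal mixed-precision training step with PyTorch's AMP utilities might look like this; model, optimizer, loss_fn, and batch are whatever the training pipeline already provides:

import torch
from torch.cuda.amp import GradScaler, autocast

scaler = GradScaler()

def train_step(model, batch, optimizer, loss_fn):
    inputs, targets = batch
    optimizer.zero_grad()
    # Forward pass runs in float16 where safe; weights stay float32
    with autocast():
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
    # Scale the loss to avoid float16 gradient underflow
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
    return loss.item()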

Anti-Patterns

❌ No model versioning or registry

❌ Training and serving environment mismatch

❌ No monitoring or alerting

❌ Manual model deployment process

❌ Ignoring data drift

❌ No rollback strategy

❌ Over-engineering for the initial MVP
