Monitoring & Observability

Production monitoring with Prometheus, Grafana, structured logging, and data quality observability.

Quick Start

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import structlog
import time

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger()

# Prometheus metrics
RECORDS_PROCESSED = Counter('records_processed_total', 'Total records processed', ['pipeline', 'status'])
PROCESSING_TIME = Histogram('processing_duration_seconds', 'Processing duration', ['pipeline'])
QUEUE_SIZE = Gauge('queue_size', 'Current queue size', ['queue_name'])

def process_batch(batch: list, pipeline_name: str):
    start_time = time.time()

    try:
        for record in batch:
            # Process record...
            RECORDS_PROCESSED.labels(pipeline=pipeline_name, status='success').inc()

        duration = time.time() - start_time
        PROCESSING_TIME.labels(pipeline=pipeline_name).observe(duration)

        logger.info("batch_processed",
            pipeline=pipeline_name,
            count=len(batch),
            duration_seconds=duration
        )

    except Exception as e:
        RECORDS_PROCESSED.labels(pipeline=pipeline_name, status='error').inc()
        logger.error("batch_failed", pipeline=pipeline_name, error=str(e))
        raise

# Start metrics server
start_http_server(8000)
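
With the exporter running, the metrics above are exposed in Prometheus text format at http://localhost:8000/metrics, ready for a Prometheus server to scrape.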

Core Concepts

1. Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge, Summary

# Counter: monotonically increasing value
http_requests = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
http_requests.labels(method='GET', endpoint='/api/data', status='200').inc()

# Histogram: distribution of values (latency, sizes)
request_latency = Histogram(
    'request_latency_seconds',
    'Request latency in seconds',
    ['endpoint'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

with request_latency.labels(endpoint='/api/data').time():
    # Process request
    pass

# Gauge: value that can go up and down
active_connections = Gauge('active_connections', 'Active connections')
active_connections.inc()  # Connection opened
active_connections.dec()  # Connection closed

# Summary: tracks count and sum of observations (the Python client does not compute quantiles)
response_size = Summary('response_size_bytes', 'Response size', ['endpoint'])
response_size.labels(endpoint='/api/data').observe(1024)

2. Grafana Dashboard (JSON)

{
  "title": "Data Pipeline Dashboard",
  "panels": [
    {
      "title": "Records Processed",
      "type": "stat",
      "targets": [{
        "expr": "sum(rate(records_processed_total[5m]))",
        "legendFormat": "Records/sec"
      }]
    },
    {
      "title": "Processing Latency P95",
      "type": "graph",
      "targets": [{
        "expr": "histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m]))",
        "legendFormat": "P95 Latency"
      }]
    },
    {
      "title": "Error Rate",
      "type": "gauge",
      "targets": [{
        "expr": "sum(rate(records_processed_total{status='error'}[5m])) / sum(rate(records_processed_total[5m])) * 100",
        "legendFormat": "Error %"
      }]
    }
  ]
}
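
Dashboards are usually provisioned programmatically rather than clicked together by hand. Below is a minimal sketch that pushes the JSON above through Grafana's /api/dashboards/db endpoint; the Grafana URL, the GRAFANA_TOKEN environment variable, and the file name are assumptions for your environment.

import json
import os
import requests

GRAFANA_URL = "http://localhost:3000"       # assumed local Grafana instance
TOKEN = os.environ["GRAFANA_TOKEN"]         # service-account token, assumed set

with open("pipeline_dashboard.json") as f:  # the JSON above, saved to a file (name is illustrative)
    dashboard = json.load(f)

payload = {
    "dashboard": {**dashboard, "id": None, "uid": None},  # let Grafana assign identifiers
    "overwrite": True,                                    # replace a same-titled dashboard
}

resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=payload,
    timeout=10,
)
resp.raise_for_status()
print("dashboard at:", resp.json()["url"])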

3. Structured Logging

import structlog
from datetime import datetime

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

# Usage with context
log = logger.bind(service="etl-pipeline", environment="production")

def process_order(order_id: str, user_id: str):
    order_log = log.bind(order_id=order_id, user_id=user_id)

    order_log.info("processing_started")

    try:
        # Process...
        order_log.info("processing_completed", duration_ms=150)
    except Exception as e:
        order_log.error("processing_failed", error=str(e), exc_info=True)
        raise
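
For request-scoped fields such as correlation IDs, structlog's contextvars helpers avoid threading a bound logger through every call. A minimal sketch, assuming merge_contextvars is placed first in the processor chain:

import uuid
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pull in contextvar-bound fields
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
logger = structlog.get_logger()

def handle_request(payload: dict):
    clear_contextvars()                           # drop leftovers from the previous request
    bind_contextvars(request_id=str(uuid.uuid4()))
    logger.info("request_received", size=len(payload))  # request_id attached automatically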

4. Alerting Rules (Prometheus)

# alerting_rules.yml
groups:
  - name: data-pipeline-alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(records_processed_total{status="error"}[5m]))
          / sum(rate(records_processed_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate in data pipeline"
          description: "Error rate is {{ $value | humanizePercentage }}"

      - alert: PipelineStalled
        expr: |
          sum(rate(records_processed_total[10m])) == 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data pipeline is not processing records"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High processing latency detected"
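
Before committing a rule, it helps to evaluate its expression against live data. A minimal sketch using Prometheus's /api/v1/query endpoint; the server URL is an assumption for your environment:

import requests

PROMETHEUS_URL = "http://localhost:9090"  # assumed local Prometheus

error_rate_expr = (
    'sum(rate(records_processed_total{status="error"}[5m]))'
    ' / sum(rate(records_processed_total[5m]))'
)

resp = requests.get(
    f"{PROMETHEUS_URL}/api/v1/query",
    params={"query": error_rate_expr},
    timeout=10,
)
resp.raise_for_status()
for result in resp.json()["data"]["result"]:
    # Each result is a labeled series with an instant [timestamp, value] sample
    print(result["metric"], result["value"][1])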

Tools & Technologies

Tool          | Purpose             | Version (2025)
------------- | ------------------- | --------------
Prometheus    | Metrics collection  | 2.50+
Grafana       | Visualization       | 10.3+
Loki          | Log aggregation     | 2.9+
Alertmanager  | Alert routing       | 0.27+
OpenTelemetry | Tracing standard    | 1.24+
Datadog       | Full observability  | Latest
Monte Carlo   | Data observability  | Latest
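
The table lists OpenTelemetry as the tracing standard, but the examples above cover only metrics and logs. A minimal tracing sketch, assuming the opentelemetry-sdk package with a console exporter for local inspection:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Wire a tracer provider that prints finished spans to stdout
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("etl-pipeline")

def extract(source: str) -> list:
    with tracer.start_as_current_span("extract") as span:
        span.set_attribute("source", source)
        return []  # placeholder for real extraction logic

with tracer.start_as_current_span("pipeline_run"):
    extract("s3://bucket/input")  # nested span, parented automatically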

Troubleshooting Guide

Issue            | Symptoms          | Root Cause               | Fix
---------------- | ----------------- | ------------------------ | --------------------------------------
Missing metrics  | Gaps in graphs    | Scrape failure           | Check scrape targets and network
High cardinality | Prometheus OOM    | Too many label values    | Reduce distinct label values
Alert fatigue    | Too many alerts   | Oversensitive thresholds | Tune thresholds, add a for: duration
Log volume       | High storage cost | Verbose logging          | Adjust log levels
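
The high-cardinality row is worth expanding: every distinct label combination becomes its own time series, so unbounded values like raw URL paths can exhaust Prometheus memory. One mitigation is normalizing raw values into a small fixed set before labeling; a sketch with illustrative route patterns:

import re
from prometheus_client import Counter

# Mirrors the counter from the metrics section above
http_requests = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

# Collapse unbounded raw paths into a bounded set of route templates
ROUTE_PATTERNS = [
    (re.compile(r"^/api/orders/\d+$"), "/api/orders/{id}"),
    (re.compile(r"^/api/users/\d+$"), "/api/users/{id}"),
]

def normalize_endpoint(path: str) -> str:
    for pattern, template in ROUTE_PATTERNS:
        if pattern.match(path):
            return template
    return "other"  # bucket everything unknown to cap cardinality

http_requests.labels(
    method="GET",
    endpoint=normalize_endpoint("/api/orders/12345"),  # -> /api/orders/{id}
    status="200",
).inc()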

Best Practices

# ✅ DO: Use appropriate metric types
# Counter for totals, Histogram for latency

# ✅ DO: Add meaningful labels (but limit cardinality)
REQUESTS.labels(method='GET', status='200', endpoint='/api').inc()

# ✅ DO: Include correlation IDs in logs
logger.info("request_completed", request_id=request_id)

# ✅ DO: Set up dashboards for key metrics

# ❌ DON'T: Use high-cardinality labels (user_id or request_id as label values)
# ❌ DON'T: Log sensitive data
# ❌ DON'T: Alert on every error
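
The "don't log sensitive data" rule is easiest to enforce centrally rather than at every call site. A custom structlog processor can redact flagged keys before rendering; a minimal sketch with an illustrative deny-list:

import structlog

SENSITIVE_KEYS = {"password", "token", "api_key", "ssn"}  # illustrative deny-list

def redact_sensitive(logger, method_name, event_dict):
    # Processors receive and return the event dict; mask flagged keys in place
    for key in SENSITIVE_KEYS & event_dict.keys():
        event_dict[key] = "[REDACTED]"
    return event_dict

structlog.configure(
    processors=[
        redact_sensitive,  # runs before rendering
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)

structlog.get_logger().info("login_attempt", user="alice", password="hunter2")
# -> {"user": "alice", "password": "[REDACTED]", "event": "login_attempt", ...}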

Skill Certification Checklist:

  • Can instrument applications with Prometheus metrics
  • Can create Grafana dashboards
  • Can implement structured logging
  • Can set up alerting rules
  • Can troubleshoot observability issues
