data-engineering

Data pipeline architecture, ETL/ELT patterns, data modeling, and production data platform design

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

To learn this skill, send your AI assistant the install command:

npx skills add pluginagentmarketplace/custom-plugin-data-engineer/pluginagentmarketplace-custom-plugin-data-engineer-data-engineering

Data Engineering Fundamentals

Core data engineering concepts, patterns, and practices for building production data platforms.

Quick Start

# Production Data Pipeline Pattern
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    source: str
    destination: str
    batch_size: int = 10000
    retry_count: int = 3

class DataPipeline:
    """Production-ready data pipeline with error handling."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    def extract(self) -> Generator[list[dict], None, None]:
        """Extract data in batches from source."""
        logger.info(f"Extracting from {self.config.source}")
        offset = 0
        while True:
            batch = self._fetch_batch(offset, self.config.batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            offset += self.config.batch_size

    def transform(self, batch: list[dict]) -> list[dict]:
        """Apply transformations with validation."""
        transformed = []
        for record in batch:
            try:
                cleaned = self._clean_record(record)
                enriched = self._enrich_record(cleaned)
                transformed.append(enriched)
            except Exception as e:
                logger.warning(f"Transform error: {e}")
                self.metrics["errors"] += 1
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Load to destination with retry logic."""
        for attempt in range(self.config.retry_count):
            try:
                self._write_batch(batch)
                self.metrics["loaded"] += len(batch)
                return
            except Exception as e:
                if attempt == self.config.retry_count - 1:
                    raise
                logger.warning(f"Load attempt {attempt + 1} failed: {e}")

    def run(self) -> dict:
        """Execute full ETL pipeline."""
        start_time = datetime.now()
        logger.info("Pipeline started")

        for batch in self.extract():
            transformed = self.transform(batch)
            if transformed:
                self.load(transformed)

        duration = (datetime.now() - start_time).total_seconds()
        self.metrics["duration_seconds"] = duration
        logger.info(f"Pipeline completed: {self.metrics}")
        return self.metrics
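
The private helpers are intentionally left abstract above. As a hedged usage sketch (the in-memory source, field names, and values here are invented for illustration), a concrete pipeline might look like this:

class InMemoryPipeline(DataPipeline):
    """Toy pipeline over an in-memory list, for demonstration only."""

    def __init__(self, config: PipelineConfig, rows: list[dict]):
        super().__init__(config)
        self.rows = rows

    def _fetch_batch(self, offset: int, size: int) -> list[dict]:
        return self.rows[offset:offset + size]

    def _clean_record(self, record: dict) -> dict:
        # Drop null fields; real cleaning logic goes here
        return {k: v for k, v in record.items() if v is not None}

    def _enrich_record(self, record: dict) -> dict:
        return {**record, "processed_at": datetime.now().isoformat()}

    def _write_batch(self, batch: list[dict]) -> None:
        pass  # replace with a real sink: warehouse, S3, etc.

pipeline = InMemoryPipeline(
    PipelineConfig(source="memory", destination="noop", batch_size=2),
    rows=[{"id": 1}, {"id": 2}, {"id": 3}],
)
metrics = pipeline.run()  # e.g. {"extracted": 3, "transformed": 3, "loaded": 3, ...}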

Core Concepts

1. Data Architecture Patterns

┌─────────────────────────────────────────────────────────────────┐
│                    Modern Data Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Sources          Ingestion        Storage         Consumption  │
│  ────────         ─────────        ───────         ───────────  │
│  ┌──────┐        ┌─────────┐      ┌───────┐       ┌──────────┐ │
│  │ APIs │───────▶│ Airbyte │─────▶│ Raw   │──────▶│ BI Tools │ │
│  │ DBs  │        │ Fivetran│      │ Layer │       │Dashboards│ │
│  │ Files│        │ Kafka   │      │ (S3)  │       └──────────┘ │
│  │ SaaS │        └─────────┘      └───┬───┘                    │
│  └──────┘                             │                         │
│                                       ▼                         │
│                               ┌───────────────┐                 │
│                               │  Transform    │                 │
│                               │  (dbt/Spark)  │                 │
│                               └───────┬───────┘                 │
│                                       │                         │
│                                       ▼                         │
│                               ┌───────────────┐   ┌──────────┐ │
│                               │   Warehouse   │──▶│ ML/AI    │ │
│                               │  (Snowflake)  │   │ Pipelines│ │
│                               └───────────────┘   └──────────┘ │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
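
An orchestrator typically drives this flow. Below is a minimal Airflow TaskFlow sketch of the ingest, transform, publish path (task names and bodies are invented; Airflow 2.4+ syntax):

from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2025, 1, 1), catchup=False)
def ingest_transform_publish():
    @task
    def ingest_to_raw():
        ...  # land source extracts in the raw layer (e.g. S3)

    @task
    def run_transformations():
        ...  # dbt/Spark models over the raw layer

    @task
    def publish():
        ...  # refresh BI extracts / hand off to ML pipelines

    ingest_to_raw() >> run_transformations() >> publish()

ingest_transform_publish()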

2. ETL vs ELT

# ETL Pattern (Transform before load)
# Best for: Sensitive data, complex transformations, limited storage

def etl_pipeline():
    raw_data = extract_from_source()
    cleaned_data = transform_and_clean(raw_data)  # Transform first
    load_to_destination(cleaned_data)

# ELT Pattern (Load then transform)
# Best for: Cloud warehouses, large scale, exploratory analysis

def elt_pipeline():
    raw_data = extract_from_source()
    load_to_staging(raw_data)  # Load raw first
    # Transform in warehouse with SQL/dbt
    run_dbt_models()
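
To make the ELT half concrete, here is a minimal sketch assuming a pandas DataFrame source, a SQLAlchemy engine, and a dbt project; the DSN, schema, table, and model selector are placeholders:

import subprocess

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@host/db")  # placeholder DSN

def elt_example(raw_records: list[dict]) -> None:
    # 1. Load raw, untransformed records into a staging table
    pd.DataFrame(raw_records).to_sql(
        "raw_events", engine, schema="staging", if_exists="append", index=False
    )
    # 2. Transform inside the warehouse with dbt
    subprocess.run(["dbt", "run", "--select", "staging+"], check=True)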

3. Data Quality Framework

from dataclasses import dataclass
from typing import Callable
import pandas as pd

@dataclass
class DataQualityCheck:
    name: str
    check_fn: Callable[[pd.DataFrame], bool]
    severity: str  # "error" or "warning"

class DataQualityValidator:
    def __init__(self, checks: list[DataQualityCheck]):
        self.checks = checks
        self.results = []

    def validate(self, df: pd.DataFrame) -> bool:
        self.results = []  # reset so repeated validate() calls don't accumulate
        all_passed = True
        for check in self.checks:
            passed = check.check_fn(df)
            self.results.append({
                "check": check.name,
                "passed": passed,
                "severity": check.severity
            })
            if not passed and check.severity == "error":
                all_passed = False
        return all_passed

# Define checks
checks = [
    DataQualityCheck(
        name="no_nulls_in_id",
        check_fn=lambda df: df["id"].notna().all(),
        severity="error"
    ),
    DataQualityCheck(
        name="positive_amounts",
        check_fn=lambda df: (df["amount"] > 0).all(),
        severity="error"
    ),
    DataQualityCheck(
        name="valid_dates",
        check_fn=lambda df: pd.to_datetime(df["date"], errors="coerce").notna().all(),
        severity="warning"
    ),
]

# `df` is the DataFrame under validation, produced earlier in the pipeline
validator = DataQualityValidator(checks)
if not validator.validate(df):
    raise ValueError(f"Data quality checks failed: {validator.results}")
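
A quick smoke test on a tiny, invented frame shows the error/warning split: the malformed date only trips the "valid_dates" warning, so validate() still returns True.

sample = pd.DataFrame({
    "id": [1, 2, 3],
    "amount": [9.99, 42.00, 3.50],
    "date": ["2025-01-01", "2025-01-02", "not-a-date"],
})
smoke = DataQualityValidator(checks)
print(smoke.validate(sample))  # True: only the "valid_dates" warning fails
print(smoke.results)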

4. Idempotent Operations

from datetime import date

# `db` below is assumed to be a DB-API connection or SQLAlchemy engine created
# elsewhere; table names are interpolated into the SQL, so they must come from
# trusted code, never user input.

def idempotent_load(df: pd.DataFrame, table: str, partition_date: date):
    """
    Idempotent load: can be re-run safely without duplicates.
    Uses the delete-then-insert pattern.
    """
    # Delete existing data for this partition
    db.execute(f"""
        DELETE FROM {table}
        WHERE partition_date = %(date)s
    """, {"date": partition_date})

    # Insert new data
    df["partition_date"] = partition_date
    df.to_sql(table, db, if_exists="append", index=False)

# Alternative: MERGE/UPSERT pattern
def upsert_records(df: pd.DataFrame, table: str, key_columns: list[str]):
    """Upsert: Update existing, insert new."""
    temp_table = f"{table}_staging"
    df.to_sql(temp_table, db, if_exists="replace", index=False)

    key_match = " AND ".join([f"t.{k} = s.{k}" for k in key_columns])

    # Column assignment lists are elided; fill in the UPDATE SET / INSERT
    # clauses per table
    db.execute(f"""
        MERGE INTO {table} t
        USING {temp_table} s ON {key_match}
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT ...
    """)

Tools & Technologies

Tool                 Purpose                  Version (2025)
Python               Pipeline development     3.12+
SQL                  Data transformation      -
Airflow              Orchestration            2.8+
dbt                  SQL transformations      1.7+
Spark                Large-scale processing   3.5+
Airbyte              Data integration         0.55+
Great Expectations   Data quality             0.18+

Troubleshooting Guide

Issue            Symptoms           Root Cause              Fix
Duplicate Data   Count mismatch     Non-idempotent load     Use MERGE, add dedup
Schema Drift     Pipeline failure   Source schema changed   Schema validation, alerts
Data Freshness   Stale data         Pipeline delays         Monitor SLAs, alerting
Memory Error     OOM in pipeline    Large batch size        Chunked processing (sketch below)
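
The chunked-processing fix in the last row is often just streaming the source instead of materializing it all at once; a pandas sketch (the file name and process() helper are hypothetical):

import pandas as pd

for chunk in pd.read_csv("events.csv", chunksize=10_000):
    process(chunk)  # hypothetical per-chunk transform + load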

Best Practices

# ✅ DO: Make pipelines idempotent
delete_and_insert(partition_date)

# ✅ DO: Add observability
logger.info(f"Processed {count} records", extra={"metric": "records_processed"})

# ✅ DO: Validate data at boundaries
validate_schema(df, expected_schema)
validate_constraints(df)

# ✅ DO: Use incremental processing (see the watermark sketch below)
#   WHERE updated_at > last_run_timestamp

# ❌ DON'T: Process all data every run
# ❌ DON'T: Skip data quality checks
# ❌ DON'T: Ignore schema changes
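
The incremental-processing predicate above only works with a persisted watermark to compare against. A hedged sketch, where get_last_watermark and save_watermark are hypothetical helpers backed by a metadata table:

import pandas as pd

def incremental_extract(conn, table: str) -> pd.DataFrame:
    last_run = get_last_watermark(table)  # hypothetical helper
    frame = pd.read_sql(
        f"SELECT * FROM {table} WHERE updated_at > ?",  # table name must be trusted
        conn,
        params=(last_run,),
    )
    if not frame.empty:
        save_watermark(table, frame["updated_at"].max())  # hypothetical helper
    return frame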

Skill Certification Checklist:

  • Can design end-to-end data pipelines
  • Can implement idempotent data loads
  • Can set up data quality validation
  • Can choose appropriate ETL vs ELT patterns
  • Can monitor and troubleshoot pipelines

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals. No summaries were provided by the upstream source; every entry carries the "Repository Source" and "Needs Review" trust labels.

  • machine learning (Coding)
  • python-programming (Coding)
  • deep-learning (Automation)