data-engineering

Data engineering patterns for ETL pipelines, data warehousing, Apache Spark, and data quality validation

Safety Notice

This listing is imported from the skills.sh public index metadata. Review the upstream SKILL.md and repository scripts before running anything.

Install skill "data-engineering" with this command: npx skills add rohitg00/awesome-claude-code-toolkit/rohitg00-awesome-claude-code-toolkit-data-engineering

Data Engineering

ETL Pipeline Pattern

from datetime import datetime
from dataclasses import dataclass


def get_fiscal_quarter(dt: datetime) -> str:
    # Simple calendar-quarter helper used by the transform step below;
    # adjust if your fiscal year does not start in January.
    return f"Q{(dt.month - 1) // 3 + 1} {dt.year}"

@dataclass
class PipelineResult:
    records_extracted: int
    records_transformed: int
    records_loaded: int
    errors: list[str]
    duration_seconds: float

class OrderPipeline:
    def __init__(self, source_db, warehouse_db):
        self.source = source_db
        self.warehouse = warehouse_db

    def extract(self, since: datetime) -> list[dict]:
        # Incremental extract: pull only rows updated since the last watermark.
        query = """
            SELECT o.*, c.name as customer_name, c.segment
            FROM orders o
            JOIN customers c ON o.customer_id = c.id
            WHERE o.updated_at > %s
        """
        return self.source.fetch_all(query, [since])

    def transform(self, records: list[dict]) -> list[dict]:
        # Flatten and enrich each order row into the warehouse fact shape.
        transformed = []
        for record in records:
            transformed.append({
                "order_id": record["id"],
                "customer_name": record["customer_name"],
                "segment": record["segment"].upper(),
                "total_amount": float(record["total"]),
                "order_date": record["created_at"].date(),
                "fiscal_quarter": get_fiscal_quarter(record["created_at"]),
                "is_high_value": float(record["total"]) > 1000,
                "loaded_at": datetime.utcnow(),
            })
        return transformed

    def load(self, records: list[dict]) -> int:
        # Idempotent load: upsert on order_id so re-runs do not create duplicates.
        return self.warehouse.upsert_batch(
            table="fact_orders",
            records=records,
            conflict_keys=["order_id"],
            batch_size=5000,
        )

    def run(self, since: datetime) -> PipelineResult:
        start = datetime.utcnow()
        raw = self.extract(since)
        clean = self.transform(raw)
        loaded = self.load(clean)
        return PipelineResult(
            records_extracted=len(raw),
            records_transformed=len(clean),
            records_loaded=loaded,
            errors=[],
            duration_seconds=(datetime.utcnow() - start).total_seconds(),
        )
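
A minimal usage sketch, assuming source and warehouse connections that expose the fetch_all and upsert_batch methods used above; load_watermark and save_watermark are hypothetical helpers that persist the last successful run time so each execution stays incremental.

from datetime import datetime, timedelta

def run_incremental(source_db, warehouse_db) -> PipelineResult:
    pipeline = OrderPipeline(source_db, warehouse_db)
    # Fall back to a 1-day lookback on the very first run (hypothetical helpers).
    since = load_watermark("orders") or datetime.utcnow() - timedelta(days=1)
    result = pipeline.run(since)
    if not result.errors:
        save_watermark("orders", datetime.utcnow())
    return result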

Apache Spark Processing

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("sales-analytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

orders = spark.read.parquet("s3://data-lake/orders/")
customers = spark.read.parquet("s3://data-lake/customers/")  # dimension table, available for enrichment joins

# Daily revenue per category with a trailing 7-day rolling average.
daily_revenue = (
    orders
    .filter(F.col("status") == "completed")
    .withColumn("order_date", F.to_date("created_at"))
    .groupBy("order_date", "product_category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.count("id").alias("order_count"),
        F.avg("total_amount").alias("avg_order_value"),
    )
    .withColumn(
        "revenue_7d_avg",
        # Trailing 7-row window: the current day plus the previous 6 days.
        F.avg("revenue").over(
            Window.partitionBy("product_category")
            .orderBy("order_date")
            .rowsBetween(-6, 0)
        )
    )
)

daily_revenue.write \
    .partitionBy("order_date") \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/daily_revenue/")

Data Quality Checks

from dataclasses import dataclass

@dataclass
class QualityCheck:
    name: str
    query: str
    threshold: float
    severity: str

CHECKS = [
    QualityCheck(
        name="null_customer_ids",
        query="SELECT COUNT(*) FROM fact_orders WHERE customer_id IS NULL",
        threshold=0,
        severity="critical",
    ),
    QualityCheck(
        name="negative_amounts",
        query="SELECT COUNT(*) FROM fact_orders WHERE total_amount < 0",
        threshold=0,
        severity="critical",
    ),
    QualityCheck(
        name="duplicate_orders",
        query="SELECT COUNT(*) - COUNT(DISTINCT order_id) FROM fact_orders",
        threshold=0,
        severity="warning",
    ),
    QualityCheck(
        name="freshness",
        query="SELECT EXTRACT(EPOCH FROM NOW() - MAX(loaded_at))/3600 FROM fact_orders",
        threshold=2.0,
        severity="warning",
    ),
]

class DataQualityError(Exception):
    """Raised when a critical data quality check fails."""


def run_quality_checks(db, checks: list[QualityCheck]) -> list[dict]:
    # Evaluate each check; abort on critical failures, report everything else.
    results = []
    for check in checks:
        value = db.fetch_scalar(check.query)
        passed = value <= check.threshold
        results.append({
            "name": check.name,
            "value": value,
            "threshold": check.threshold,
            "passed": passed,
            "severity": check.severity,
        })
        if not passed and check.severity == "critical":
            raise DataQualityError(f"Critical check failed: {check.name} = {value}")
    return results
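
A sketch of how these checks might be wired in after the load stage, assuming the warehouse connection exposes the fetch_scalar method used above; run_with_checks is a hypothetical wrapper, and critical failures abort via DataQualityError before downstream consumers read the table.

def run_with_checks(pipeline: OrderPipeline, warehouse_db, since: datetime) -> PipelineResult:
    result = pipeline.run(since)
    # Raises DataQualityError on any critical failure; warnings flow into the result.
    check_results = run_quality_checks(warehouse_db, CHECKS)
    result.errors.extend(
        f"{c['name']}={c['value']}" for c in check_results if not c["passed"]
    )
    return result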

Data Warehouse Schema (Star Schema)

CREATE TABLE dim_customers (
    customer_key    BIGINT PRIMARY KEY,
    customer_id     VARCHAR(50) NOT NULL,
    name            VARCHAR(200),
    segment         VARCHAR(50),
    country         VARCHAR(100),
    valid_from      TIMESTAMP NOT NULL,
    valid_to        TIMESTAMP,
    is_current      BOOLEAN DEFAULT TRUE
);

CREATE TABLE dim_products (
    product_key     BIGINT PRIMARY KEY,
    product_id      VARCHAR(50) NOT NULL,
    name            VARCHAR(200),
    category        VARCHAR(100),
    subcategory     VARCHAR(100)
);

CREATE TABLE fact_orders (
    order_key       BIGINT PRIMARY KEY,
    order_id        VARCHAR(50) UNIQUE NOT NULL,
    customer_key    BIGINT REFERENCES dim_customers(customer_key),
    product_key     BIGINT REFERENCES dim_products(product_key),
    order_date_key  INT,
    quantity        INT,
    unit_price      DECIMAL(10,2),
    total_amount    DECIMAL(12,2),
    loaded_at       TIMESTAMP DEFAULT NOW()
);
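
The valid_from, valid_to, and is_current columns on dim_customers support Type 2 slowly changing dimensions. A minimal sketch of applying a changed customer record under that model, assuming the warehouse connection exposes an execute method (hypothetical) and that customer_key comes from a sequence or surrogate-key generator:

from datetime import datetime

def apply_customer_change(warehouse_db, customer: dict, now: datetime) -> None:
    # Close out the current version of this customer's dimension row.
    warehouse_db.execute(
        """
        UPDATE dim_customers
        SET valid_to = %s, is_current = FALSE
        WHERE customer_id = %s AND is_current
        """,
        [now, customer["customer_id"]],
    )
    # Insert the new version as the current row.
    warehouse_db.execute(
        """
        INSERT INTO dim_customers
            (customer_key, customer_id, name, segment, country, valid_from, is_current)
        VALUES (%s, %s, %s, %s, %s, %s, TRUE)
        """,
        [customer["customer_key"], customer["customer_id"], customer["name"],
         customer["segment"], customer["country"], now],
    )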

Anti-Patterns

  • Processing data row-by-row instead of in batches or sets
  • Not partitioning large tables by date or category
  • Missing data quality checks between pipeline stages
  • Loading raw data directly into the warehouse without transformation
  • Using full table scans when incremental loads would suffice
  • Not tracking data lineage (where data came from, when it was processed)

Checklist

  • Pipelines follow Extract-Transform-Load with clear stage separation
  • Incremental processing based on watermarks or change data capture
  • Data quality checks run after each pipeline stage
  • Warehouse uses star or snowflake schema with dimension and fact tables
  • Spark jobs use adaptive query execution and appropriate partitioning
  • Idempotent loads (re-running produces the same result)
  • Data freshness monitored with automated alerts
  • Schema evolution handled gracefully (additive changes preferred); see the sketch after this list
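
For the schema-evolution item above, a minimal PySpark sketch: additive column changes across Parquet files can be reconciled at read time with schema merging (assuming the spark session from the earlier example), at the cost of a more expensive read.

orders = (
    spark.read
    .option("mergeSchema", "true")  # merge additive column differences across Parquet files
    .parquet("s3://data-lake/orders/")
)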

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals (no summaries provided by the upstream source):

  • golang-idioms (Coding)
  • devops-automation (Coding)
  • database-optimization (Coding)
  • redis-patterns (Coding)