# Core Data Engineering
Use this skill for day-to-day Python data engineering decisions: dataframe processing, embedded OLAP SQL, Arrow interchange, ETL structure, and resilience patterns.
## When to use this skill
Use this skill when the task involves one or more of:
- Polars transformations and lazy query optimization
- DuckDB SQL analytics, MERGE/upsert, or Parquet querying
- PyArrow table/dataset interchange and Parquet scans
- Python ETL pipeline structure (extract/transform/load, logging, retries, idempotency)
- PostgreSQL-to-lake/warehouse ingestion patterns
If the task is primarily about cloud storage auth/connectors, use:
- @data-engineering-storage-remote-access
- @data-engineering-storage-authentication
If the task is primarily about ACID lakehouse table behavior, use:
- @data-engineering-storage-lakehouse
## Quick tool selection
| Need | Default choice | Why |
|---|---|---|
| DataFrame transformation in Python | Polars | Fast lazy engine, strong expression API |
| SQL over files/DataFrames | DuckDB | Embedded OLAP + Parquet/Arrow native |
| Interchange format between systems | PyArrow | Zero-copy table/batch ecosystem |
| OLTP source extraction | psycopg2 / postgres driver | Stable DB connectivity |
Rule of thumb:
- Start transformations in Polars lazy.
- Use DuckDB for heavy SQL/windowing/joins over files.
- Keep boundaries in Arrow/Parquet (sketched below).
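A minimal sketch of that hand-off, using an illustrative `events.parquet` input and made-up column names: Polars runs the lazy transformation, the result crosses the boundary as an Arrow table, and DuckDB queries it without copying.

```python
from datetime import datetime

import duckdb
import polars as pl

# Lazy Polars transformation: nothing executes until collect().
events = (
    pl.scan_parquet("events.parquet")  # illustrative input path
    .filter(pl.col("event_ts") >= datetime(2024, 1, 1))
    .group_by("category")
    .agg(pl.col("value").sum().alias("total_value"))
    .collect()
)

# Cross the boundary as Arrow; DuckDB scans the Arrow table without copying it.
events_arrow = events.to_arrow()
top = duckdb.sql(
    "SELECT category, total_value FROM events_arrow "
    "ORDER BY total_value DESC LIMIT 10"
).pl()  # and back to Polars on the other side of the boundary
```

The same boundary works in reverse: DuckDB results can come back as Arrow or Polars, so downstream stages never need a format-specific copy.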
## Core implementation rules
- Prefer lazy execution
  - Use pl.scan_* over pl.read_* for large inputs.
  - Chain filters/projections before collect().
  - Avoid row-wise loops.
- Push work down
  - Push filtering into file scans (predicate pushdown).
  - Select only needed columns (column pruning).
  - Partition data by query dimensions (typically date/tenant/region).
- Keep writes idempotent (see the upsert sketch after this list)
  - Prefer MERGE/upsert semantics where possible.
  - If append-only, track watermark/checkpoints.
  - Ensure retries do not duplicate side effects.
- Protect boundaries
  - Use parameterized SQL for values.
  - Treat dynamic identifiers (table/column names) separately and validate.
  - Never hardcode secrets.
- Instrument and validate (see the instrumentation sketch after this list)
  - Log row counts and stage durations.
  - Validate schema/required columns at stage boundaries.
  - Record checkpoints/watermarks for incremental flows.
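A sketch of the idempotency and boundary rules together, assuming a DuckDB target table with a primary key on `id` and a hypothetical `etl_watermarks` bookkeeping table keyed by pipeline name: the watermark is read and written with parameterized SQL, the dynamic table name is validated before interpolation, and the load uses DuckDB upsert semantics so a retried batch overwrites rather than duplicates.

```python
import re

import duckdb
import polars as pl

IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")  # allow-list for dynamic identifiers


def load_incremental(con: duckdb.DuckDBPyConnection, batch: pl.DataFrame,
                     target_table: str, pipeline: str) -> None:
    if not IDENTIFIER.match(target_table):
        raise ValueError(f"unsafe table name: {target_table!r}")

    # Values go through parameters, never f-strings.
    last = con.execute(
        "SELECT max_event_ts FROM etl_watermarks WHERE pipeline = ?", [pipeline]
    ).fetchone()
    if last and last[0] is not None:
        batch = batch.filter(pl.col("event_ts") > last[0])

    # Upsert keyed on the primary key: retries do not duplicate rows.
    con.register("batch", batch.to_arrow())
    con.execute(f"""
        INSERT INTO {target_table}
        SELECT id, event_ts, value, category FROM batch
        ON CONFLICT (id) DO UPDATE SET
            event_ts = EXCLUDED.event_ts,
            value    = EXCLUDED.value,
            category = EXCLUDED.category
    """)

    # Advance the watermark only after the load succeeds
    # (assumes etl_watermarks(pipeline PRIMARY KEY, max_event_ts)).
    new_max = batch.select(pl.col("event_ts").max()).item()
    if new_max is not None:
        con.execute(
            "INSERT OR REPLACE INTO etl_watermarks VALUES (?, ?)", [pipeline, new_max]
        )
```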
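And a small sketch of the instrumentation rule, assuming the stdlib logging module and an illustrative required-column set: each stage boundary checks the schema, logs the row count, and times the stage so regressions show up in logs rather than downstream.

```python
import logging
import time
from contextlib import contextmanager

import polars as pl

log = logging.getLogger("etl")
REQUIRED = {"id", "event_ts", "value", "category"}  # illustrative column contract


def validate_stage(name: str, df: pl.DataFrame) -> pl.DataFrame:
    """Fail fast if a stage boundary is missing required columns; log the row count."""
    missing = REQUIRED - set(df.columns)
    if missing:
        raise ValueError(f"stage {name!r} missing columns: {sorted(missing)}")
    log.info("stage=%s rows=%d", name, df.height)
    return df


@contextmanager
def timed_stage(name: str):
    """Log how long a stage took, whether it succeeded or raised."""
    start = time.monotonic()
    try:
        yield
    finally:
        log.info("stage=%s duration_s=%.2f", name, time.monotonic() - start)

# Usage (transform() and raw are placeholders for your own stage):
# with timed_stage("transform"):
#     df = validate_stage("transform", transform(raw))
```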
## Minimal safe ETL shape
```python
import polars as pl
import duckdb


def run_etl(source_path: str, target_table: str) -> None:
    lazy = (
        pl.scan_parquet(source_path)
        .filter(pl.col("value").is_not_null())
        .select(["id", "event_ts", "value", "category"])
    )
    df = lazy.collect()

    with duckdb.connect("analytics.db") as con:
        con.sql("CREATE OR REPLACE TABLE staging AS SELECT * FROM df")
        con.sql(f"""
            CREATE TABLE IF NOT EXISTS {target_table} AS
            SELECT * FROM staging WHERE 1=0
        """)
        con.sql(f"INSERT INTO {target_table} SELECT * FROM staging")
```
For production-grade structure, use:
- templates/complete_etl_pipeline.py
## Progressive disclosure (read next as needed)
- patterns/etl.md — canonical ETL pipeline structure
- patterns/incremental.md — watermark/CDC/incremental loading patterns
- templates/complete_etl_pipeline.py — full template with logging and checkpoints
- core-detailed.md — comprehensive reference (extended examples)
## Related skills
- @data-engineering-storage-lakehouse — Delta/Iceberg/Hudi behavior
- @data-engineering-storage-remote-access — fsspec/pyarrow.fs/obstore cloud access
- @data-engineering-orchestration — Prefect/Dagster/dbt orchestration
- @data-engineering-quality — Pandera / Great Expectations validation
- @data-engineering-observability — OTel + Prometheus monitoring
- @data-engineering-ai-ml — embedding/vector/RAG pipelines
## References
- Polars Documentation
- DuckDB Documentation
- PyArrow Documentation
- psycopg Documentation