# Medallion Architecture Skill

## Overview

The medallion architecture (also called multi-hop architecture) is a design pattern for organizing data in a lakehouse using three progressive layers:

- **Bronze (Raw)**: Ingested data in its original format
- **Silver (Refined)**: Cleansed and conformed data
- **Gold (Curated)**: Business-level aggregates and features
## When to Use This Skill

Use this skill when you need to:

- Design a new data pipeline with proper layering
- Migrate from traditional ETL to lakehouse architecture
- Implement incremental processing patterns
- Build a scalable data platform
- Ensure data quality at each layer
## Architecture Principles

### Bronze Layer (Raw)

**Purpose**: Store raw data exactly as received from source systems

**Characteristics**:

- Immutable historical record
- Schema-on-read approach
- Metadata enrichment (`_ingested_at`, `_source_file`)
- Minimal transformations
- Full audit trail

**Use Cases**:

- Data recovery
- Reprocessing requirements (see the sketch after this list)
- Audit compliance
- Debugging data issues
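Because Bronze is an immutable Delta table, the recovery and reprocessing use cases usually lean on Delta time travel. A minimal sketch, assuming a Bronze table named `bronze.raw_events` (a placeholder) and an active `spark` session:

```python
# Re-read Bronze as of an earlier version or timestamp (Delta time travel),
# e.g. to rebuild Silver after a bad transformation was deployed.
df_as_of_version = spark.read.option("versionAsOf", 3).table("bronze.raw_events")
df_as_of_time = (spark.read
                 .option("timestampAsOf", "2024-01-01")
                 .table("bronze.raw_events"))
```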
### Silver Layer (Refined)

**Purpose**: Cleansed, validated, and standardized data

**Characteristics**:

- Schema enforcement
- Data quality checks
- Deduplication
- Standardization
- Type conversions
- Business rules applied

**Use Cases**:

- Downstream analytics
- Feature engineering
- Data science modeling
- Operational reporting
### Gold Layer (Curated)

**Purpose**: Business-level aggregates optimized for consumption

**Characteristics**:

- Highly aggregated
- Optimized for queries
- Business KPIs
- Feature tables
- Production-ready datasets

**Use Cases**:

- Dashboards and BI
- ML model serving
- Real-time applications
- Executive reporting
## Implementation Patterns

### Pattern 1: Batch Processing

**Bronze Layer:**
```python
from pyspark.sql.functions import current_timestamp, input_file_name


def ingest_to_bronze(source_path: str, target_table: str):
    """Ingest raw data to the Bronze layer."""
    # Auto Loader ("cloudFiles") is streaming-only; for a batch read,
    # load the files directly and tag each row with ingestion metadata.
    df = (spark.read
          .format("parquet")
          .load(source_path)
          .withColumn("_ingested_at", current_timestamp())
          .withColumn("_source_file", input_file_name()))

    (df.write
       .format("delta")
       .mode("append")
       .option("mergeSchema", "true")
       .saveAsTable(target_table))
```
**Silver Layer:**
```python
from pyspark.sql.functions import col, lower, trim, to_date, when


def process_to_silver(bronze_table: str, silver_table: str):
    """Transform Bronze to Silver with quality checks."""
    bronze_df = spark.read.table(bronze_table)

    silver_df = (bronze_df
                 .dropDuplicates(["id"])
                 .filter(col("id").isNotNull())
                 .withColumn("email", lower(trim(col("email"))))
                 .withColumn("created_date", to_date(col("created_at")))
                 .withColumn("quality_score",
                             when(col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$"), 1.0)
                             .otherwise(0.5)))

    (silver_df.write
       .format("delta")
       .mode("overwrite")
       .saveAsTable(silver_table))
```
**Gold Layer:**
```python
from pyspark.sql.functions import avg, count, current_timestamp
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the builtin


def aggregate_to_gold(silver_table: str, gold_table: str):
    """Aggregate Silver to Gold business metrics."""
    silver_df = spark.read.table(silver_table)

    gold_df = (silver_df
               .groupBy("customer_segment", "region")
               .agg(
                   count("*").alias("customer_count"),
                   spark_sum("lifetime_value").alias("total_ltv"),
                   avg("quality_score").alias("avg_quality"))
               .withColumn("updated_at", current_timestamp()))

    (gold_df.write
       .format("delta")
       .mode("overwrite")
       .saveAsTable(gold_table))
```
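A minimal end-to-end run simply chains the three steps. The path and table names below are placeholders:

```python
RAW_PATH = "s3://my-bucket/raw/customers/"  # hypothetical source location

ingest_to_bronze(RAW_PATH, "bronze.customers")
process_to_silver("bronze.customers", "silver.customers")
aggregate_to_gold("silver.customers", "gold.customer_metrics")
```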
### Pattern 2: Incremental Processing

**Bronze (Streaming):**
```python
from pyspark.sql.functions import current_timestamp

(spark.readStream
   .format("cloudFiles")
   .option("cloudFiles.format", "json")
   # Auto Loader needs a schema location when inferring the schema
   .option("cloudFiles.schemaLocation", schema_path)
   .load(source_path)
   .withColumn("_ingested_at", current_timestamp())
   .writeStream
   .format("delta")
   .option("checkpointLocation", checkpoint_path)
   .trigger(availableNow=True)
   .toTable(bronze_table))
```
**Silver (Incremental Merge):**
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col


def incremental_silver_merge(bronze_table: str, silver_table: str, watermark: str):
    """Incrementally merge new Bronze data into Silver."""
    # Get new records since the last watermark
    new_records = (spark.read.table(bronze_table)
                   .filter(col("_ingested_at") > watermark))

    # Transform (reusing the Silver transformation logic)
    transformed = transform_to_silver(new_records)

    # Merge into Silver
    silver = DeltaTable.forName(spark, silver_table)
    (silver.alias("target")
       .merge(
           transformed.alias("source"),
           "target.id = source.id")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())
```
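The function above assumes the caller tracks the watermark between runs. A minimal sketch of that bookkeeping, assuming a small Delta table `etl.watermarks` (a hypothetical name) with one row appended per successful run:

```python
from pyspark.sql.functions import col
from pyspark.sql.functions import max as spark_max


def get_last_watermark(pipeline: str) -> str:
    """Return the last processed timestamp, or an epoch default on the first run."""
    row = (spark.read.table("etl.watermarks")
           .filter(col("pipeline") == pipeline)
           .agg(spark_max("watermark").alias("wm"))
           .collect()[0])
    return row["wm"] or "1970-01-01 00:00:00"


def save_watermark(pipeline: str, watermark: str):
    """Record the new watermark after a successful merge."""
    (spark.createDataFrame([(pipeline, watermark)], ["pipeline", "watermark"])
       .write.format("delta").mode("append").saveAsTable("etl.watermarks"))

# Usage: incremental_silver_merge(bronze_tbl, silver_tbl, get_last_watermark("customers"))
```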
## Data Quality Patterns

### Quality Checks at Each Layer

**Bronze:**

- File completeness check
- Row count validation
- Schema drift detection

**Silver:**

- Null value checks
- Data type validation
- Business rule validation
- Referential integrity
- Duplicate detection

**Gold:**

- Aggregate accuracy
- KPI threshold checks
- Trend anomaly detection
- Completeness validation
### Quality Check Implementation
```python
from typing import Dict

from pyspark.sql.functions import col, current_date


def validate_silver_quality(table_name: str) -> Dict[str, bool]:
    """Run quality checks on a Silver table."""
    df = spark.read.table(table_name)

    checks = {
        "no_null_ids": df.filter(col("id").isNull()).count() == 0,
        "valid_emails": df.filter(
            ~col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$")
        ).count() == 0,
        "no_duplicates": df.count() == df.select("id").distinct().count(),
        "within_date_range": df.filter(
            (col("created_date") < "2020-01-01") |
            (col("created_date") > current_date())
        ).count() == 0,
    }

    return checks
```
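The implementation above covers Silver. A comparable sketch for the Bronze checks listed earlier (row count and schema drift), assuming you supply the `expected_schema` promised by the source contract:

```python
from typing import Dict

from pyspark.sql.types import StructType


def validate_bronze_quality(table_name: str, expected_schema: StructType,
                            min_rows: int = 1) -> Dict[str, bool]:
    """Basic Bronze checks: non-empty table and no schema drift."""
    df = spark.read.table(table_name)
    # Exclude the Bronze metadata columns (_ingested_at, _source_file)
    # before comparing column names and types against the expected schema.
    actual = sorted((f.name, f.dataType.simpleString())
                    for f in df.schema.fields if not f.name.startswith("_"))
    expected = sorted((f.name, f.dataType.simpleString())
                      for f in expected_schema.fields)
    return {
        "row_count_ok": df.count() >= min_rows,
        "no_schema_drift": actual == expected,
    }
```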
## Optimization Strategies

### Bronze Layer Optimization

```sql
-- Partition by ingestion date
CREATE TABLE bronze.raw_events
USING delta
PARTITIONED BY (ingestion_date)
AS SELECT *, current_date() AS ingestion_date FROM source;

-- Enable auto-optimize
ALTER TABLE bronze.raw_events SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
```
### Silver Layer Optimization

```sql
-- Z-ORDER for common filters
OPTIMIZE silver.customers
ZORDER BY (customer_segment, region, created_date);

-- Enable Change Data Feed
ALTER TABLE silver.customers SET TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true'
);
```
### Gold Layer Optimization

```sql
-- Liquid clustering for query performance
CREATE TABLE gold.customer_metrics
USING delta
CLUSTER BY (customer_segment, date)
AS SELECT * FROM aggregated_metrics;

-- Optimize and vacuum (168 hours = 7 days of retention)
OPTIMIZE gold.customer_metrics;
VACUUM gold.customer_metrics RETAIN 168 HOURS;
```
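The same maintenance can be scheduled from the Python pipelines via `spark.sql`; a minimal sketch using the table name above:

```python
def maintain_gold_table(table: str = "gold.customer_metrics"):
    """Compact small files, then remove files older than the retention window."""
    spark.sql(f"OPTIMIZE {table}")
    spark.sql(f"VACUUM {table} RETAIN 168 HOURS")
```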
## Complete Example

See `/templates/bronze-silver-gold/` for a complete implementation including:

- Project structure
- Bronze ingestion scripts
- Silver transformation logic
- Gold aggregation queries
- Data quality tests
- Deployment configuration
## Best Practices

- **Idempotency**: Ensure pipelines can be re-run safely
- **Incrementality**: Process only new/changed data
- **Quality Gates**: Block bad data from progressing (see the sketch after this list)
- **Schema Evolution**: Handle schema changes gracefully
- **Monitoring**: Track pipeline health and data quality
- **Documentation**: Document data lineage and transformations
- **Testing**: Unit test transformations, integration test pipelines
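As a sketch of a quality gate, the `validate_silver_quality` checks from earlier can block promotion to Gold. The orchestration below is illustrative, not prescriptive:

```python
def promote_if_clean(silver_table: str, gold_table: str):
    """Only build Gold when every Silver quality check passes."""
    checks = validate_silver_quality(silver_table)
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        raise ValueError(f"Quality gate failed for {silver_table}: {failed}")
    aggregate_to_gold(silver_table, gold_table)
```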
## Common Pitfalls to Avoid

❌ **Don't:**

- Mix transformation logic across layers
- Skip the Bronze layer to "save storage"
- Over-aggregate too early
- Ignore data quality in Silver
- Hard-code business logic in Bronze

✅ **Do:**

- Keep Bronze immutable
- Enforce quality in Silver
- Optimize Gold for consumption
- Use incremental processing
- Implement proper monitoring
## Related Skills

- `delta-live-tables`: Declarative pipeline orchestration
- `data-quality`: Great Expectations integration
- `testing-patterns`: Pipeline testing strategies
- `cicd-workflows`: Deployment automation
## References

- Databricks Medallion Architecture
- Delta Lake Best Practices