Delta Live Tables Skill
Overview
Delta Live Tables (DLT) is a declarative framework for building reliable, maintainable, and testable data processing pipelines. It automatically manages infrastructure, error handling, data quality, and monitoring.
Key Benefits:
- Declarative pipeline definitions
- Automatic dependency resolution
- Built-in data quality checks
- Real-time monitoring and lineage
- Simplified error recovery
- Automatic schema evolution
When to Use This Skill
Use Delta Live Tables when you need to:
- Build production data pipelines with minimal operational overhead
- Implement complex data quality rules
- Create streaming and batch pipelines with unified syntax
- Track data lineage automatically
- Simplify pipeline maintenance and debugging
- Enforce SLAs with expectations
Core Concepts
Tables vs Views
Streaming Tables: Process data incrementally using structured streaming
```python
@dlt.table(
    comment="Raw sensor events ingested in real-time",
    table_properties={"quality": "bronze"}
)
def sensor_events_raw():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/source/sensors/")
    )
```
Views: Defined by queries on other tables/views; useful as intermediate datasets
```python
@dlt.view(
    comment="Cleaned sensor events with quality checks"
)
def sensor_events_cleaned():
    return (
        dlt.read_stream("sensor_events_raw")
        .filter(col("sensor_id").isNotNull())
        .withColumn("timestamp", to_timestamp(col("event_time")))
    )
```
Expectations (Data Quality)
Three enforcement levels:
warn: Log violations but continue processing
```python
@dlt.table
@dlt.expect("valid_sensor_id", "sensor_id IS NOT NULL")
def sensor_data():
    return dlt.read("sensor_events_cleaned")
```
drop: Drop violating records (drop counts are still reported in pipeline metrics)
```python
@dlt.table
@dlt.expect_or_drop("valid_temperature", "temperature BETWEEN -50 AND 150")
def valid_sensor_readings():
    return dlt.read("sensor_data")
```
fail: Stop pipeline on violations
```python
@dlt.table
@dlt.expect_or_fail("no_null_ids", "sensor_id IS NOT NULL")
def critical_sensor_data():
    return dlt.read("sensor_events_cleaned")
```
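When several rules apply to one table, they can be collected in a plain dict and attached at once with the expect_all variants (`@dlt.expect_all`, `@dlt.expect_all_or_drop`, `@dlt.expect_all_or_fail`). A minimal sketch — the rule names and the guarded import are illustrative; the guard just lets the module load outside the pipeline runtime (e.g. in unit tests):

```python
try:
    import dlt  # only available inside a DLT pipeline runtime
except ImportError:
    dlt = None

# Plain dict of {expectation_name: SQL condition}; importable by tests
SENSOR_RULES = {
    "valid_sensor_id": "sensor_id IS NOT NULL",
    "valid_temperature": "temperature BETWEEN -50 AND 150",
}

if dlt is not None:
    @dlt.table
    @dlt.expect_all_or_drop(SENSOR_RULES)  # drop rows violating any rule
    def sensor_readings_bulk_validated():
        return dlt.read("sensor_events_cleaned")
```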
Incremental Processing
Streaming Tables automatically handle incremental processing:
```python
@dlt.table
def orders_incremental():
    return (
        dlt.read_stream("orders_raw")
        .select("order_id", "customer_id", "amount", "order_date")
    )
```
Apply Changes for CDC (Change Data Capture):
```python
dlt.create_streaming_table("customers_current")

dlt.apply_changes(
    target="customers_current",
    source="customers_cdc",
    keys=["customer_id"],
    sequence_by="updated_at",
    stored_as_scd_type=2  # Slowly Changing Dimension Type 2
)
```
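The SCD Type 2 bookkeeping that apply_changes performs can be illustrated with a small pure-Python sketch: each change event closes the currently open version of its key and opens a new one. This is only a model (the `__START_AT`/`__END_AT` column names match DLT's output, but the function itself is hypothetical and ignores deletes):

```python
def apply_scd2(rows, changes, key, seq):
    """Apply ordered change events to a list of SCD Type 2 row dicts."""
    for change in sorted(changes, key=lambda c: c[seq]):
        for row in rows:
            # Close the currently open version of this key, if any
            if row[key] == change[key] and row["__END_AT"] is None:
                row["__END_AT"] = change[seq]
        new_row = dict(change)
        new_row["__START_AT"] = new_row.pop(seq)
        new_row["__END_AT"] = None  # new version is the open one
        rows.append(new_row)
    return rows

history = apply_scd2(
    rows=[],
    changes=[
        {"customer_id": 1, "segment": "bronze", "updated_at": "2024-01-01"},
        {"customer_id": 1, "segment": "gold", "updated_at": "2024-03-01"},
    ],
    key="customer_id",
    seq="updated_at",
)
# Both versions are kept; only the latest has __END_AT still open (None)
```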
Implementation Patterns
Pattern 1: Multi-Hop Pipeline (Bronze-Silver-Gold)
Bronze Layer (Raw Ingestion):
```python
import dlt
from pyspark.sql.functions import *

@dlt.table(
    name="bronze_sales_raw",
    comment="Raw sales data ingested from cloud storage",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_sales():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/schemas/sales")
        .load("/mnt/landing/sales/")
        .withColumn("ingestion_timestamp", current_timestamp())
        .withColumn("source_file", input_file_name())
    )
```
Silver Layer (Cleaned & Validated):
```python
@dlt.table(
    name="silver_sales_validated",
    comment="Validated and cleaned sales data",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_sale_id", "sale_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'")
def silver_sales():
    return (
        dlt.read_stream("bronze_sales_raw")
        .select(
            col("sale_id"),
            col("customer_id"),
            col("amount").cast("decimal(10,2)"),
            lower(trim(col("email"))).alias("email"),
            to_timestamp(col("sale_timestamp")).alias("sale_timestamp"),
            col("ingestion_timestamp")
        )
        .dropDuplicates(["sale_id"])
    )
```
Gold Layer (Business Aggregates):
```python
@dlt.table(
    name="gold_daily_sales_summary",
    comment="Daily sales aggregates for reporting",
    table_properties={"quality": "gold"}
)
@dlt.expect_or_fail("valid_date", "sale_date IS NOT NULL")
def gold_daily_sales():
    return (
        dlt.read("silver_sales_validated")
        .groupBy(
            to_date(col("sale_timestamp")).alias("sale_date"),
            col("customer_id")
        )
        .agg(
            count("*").alias("transaction_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount"),
            min("sale_timestamp").alias("first_transaction"),
            max("sale_timestamp").alias("last_transaction")
        )
    )
```
Pattern 2: Change Data Capture (CDC)
```python
import dlt
from pyspark.sql.functions import *
```
Bronze: Ingest CDC events
```python
@dlt.table(
    name="bronze_customer_cdc",
    comment="Customer CDC events from upstream system"
)
def customer_cdc_bronze():
    return (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .table("source.customers")
    )
```
Silver: Apply changes with SCD Type 2
```python
dlt.create_streaming_table(
    name="silver_customers_current",
    comment="Current customer records with history",
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)

dlt.apply_changes(
    target="silver_customers_current",
    source="bronze_customer_cdc",
    keys=["customer_id"],
    sequence_by="updated_timestamp",
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "source_timestamp"],
    stored_as_scd_type=2
)
```
Gold: Active customers only
```python
@dlt.table(
    name="gold_active_customers",
    comment="Currently active customer records"
)
def active_customers():
    return (
        dlt.read("silver_customers_current")
        .filter(col("__END_AT").isNull())  # SCD Type 2: current records
        .filter(col("status") == "active")
        .select(
            "customer_id",
            "name",
            "email",
            "segment",
            "lifetime_value",
            col("__START_AT").alias("valid_from")
        )
    )
```
Pattern 3: Complex Data Quality Rules
```python
import dlt
from pyspark.sql.functions import *

@dlt.table(
    name="silver_orders_validated",
    comment="Orders with comprehensive quality checks"
)
# Basic expectations
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "total_amount > 0")
@dlt.expect_or_drop("valid_quantity", "quantity > 0 AND quantity < 1000")
# Email format validation
@dlt.expect("valid_email_format", "customer_email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'")
# Date range validation
@dlt.expect_or_drop("order_date_in_range", "order_date >= '2020-01-01' AND order_date <= current_date()")
# Referential integrity
@dlt.expect_or_fail("valid_customer", "customer_id IN (SELECT customer_id FROM LIVE.silver_customers_current)")
# Business rules
@dlt.expect("reasonable_unit_price", "unit_price >= 0.01 AND unit_price <= 100000")
# Composite validation
@dlt.expect("amount_matches_calculation", "ABS(total_amount - (quantity * unit_price)) < 0.01")
def orders_validated():
    return (
        dlt.read_stream("bronze_orders_raw")
        .select(
            "order_id",
            "customer_id",
            "customer_email",
            col("total_amount").cast("decimal(10,2)"),
            col("quantity").cast("int"),
            col("unit_price").cast("decimal(10,2)"),
            to_date(col("order_date")).alias("order_date"),
            current_timestamp().alias("validated_at")
        )
    )
```
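The email rule is easy to get subtly wrong: a character class like `[A-Z|a-z]` also matches a literal `|`, so `[A-Za-z]` is the intended form. A quick way to sanity-check the pattern outside Spark (RLIKE uses Java regex, but this particular pattern behaves the same under Python's re):

```python
import re

# The anchored email pattern used in the expectations above
EMAIL_RULE = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"

def is_valid_email(value):
    """Return True if value is a non-null string matching the email rule."""
    return value is not None and re.search(EMAIL_RULE, value) is not None
```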
Pattern 4: Streaming Joins
```python
@dlt.table(
    name="gold_enriched_transactions",
    comment="Transactions enriched with customer and product data"
)
def enriched_transactions():
    transactions = dlt.read_stream("silver_transactions")
    customers = dlt.read("silver_customers_current")
    products = dlt.read("silver_products")

    return (
        transactions
        .join(
            customers,
            transactions.customer_id == customers.customer_id,
            "left"
        )
        .join(
            products,
            transactions.product_id == products.product_id,
            "left"
        )
        .select(
            transactions["*"],
            customers["customer_name"],
            customers["customer_segment"],
            products["product_name"],
            products["category"]
        )
    )
```
Pipeline Configuration
DLT Pipeline Settings (YAML)
```yaml
# databricks.yml or pipeline configuration
name: sales_pipeline
target: production_db
storage: /mnt/dlt/sales_pipeline

clusters:
  - label: default
    num_workers: 4
    node_type_id: i3.xlarge
    spark_conf:
      spark.databricks.delta.preview.enabled: "true"
      spark.databricks.delta.properties.defaults.enableChangeDataFeed: "true"

libraries:
  - notebook:
      path: /Workspace/pipelines/bronze_ingestion
  - notebook:
      path: /Workspace/pipelines/silver_transformation
  - notebook:
      path: /Workspace/pipelines/gold_aggregation

configuration:
  source_path: /mnt/landing/sales
  checkpoint_path: /mnt/checkpoints/sales

continuous: false   # Set to true for continuous processing
development: false  # Set to true for development mode

notifications:
  - email_recipients:
      - data-team@company.com
    on_failure: true
    on_success: false
```
Runtime Configuration
```python
# Access pipeline configuration in notebooks
source_path = spark.conf.get("source_path")
checkpoint_path = spark.conf.get("checkpoint_path")

@dlt.table
def configured_ingestion():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(source_path)
    )
```
Monitoring and Observability
Event Log Queries
```python
# Query the DLT event log
event_log_path = f"{storage_location}/system/events"

events_df = (
    spark.read
    .format("delta")
    .load(event_log_path)
)

# Data quality metrics
quality_metrics = (
    events_df
    .filter(col("event_type") == "flow_progress")
    .select(
        col("timestamp"),
        col("details.flow_progress.metrics.num_output_rows").alias("output_rows"),
        col("details.flow_progress.data_quality.dropped_records").alias("dropped_records"),
        col("details.flow_progress.data_quality.expectations").alias("expectations")
    )
)

quality_metrics.show()
```
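Each flow_progress event reports per-expectation counters (name, dataset, passed_records, failed_records in the event log schema) as a JSON-encoded array. Once collected out of Spark, it can be summarized with plain Python — the sample payload below is made up for illustration:

```python
import json

# Hypothetical expectations payload from one flow_progress event
expectations_json = json.dumps([
    {"name": "valid_sale_id", "dataset": "silver_sales_validated",
     "passed_records": 9800, "failed_records": 0},
    {"name": "positive_amount", "dataset": "silver_sales_validated",
     "passed_records": 9650, "failed_records": 150},
])

def failure_rates(payload):
    """Return {expectation_name: failure_rate} for one event payload."""
    rates = {}
    for exp in json.loads(payload):
        total = exp["passed_records"] + exp["failed_records"]
        rates[exp["name"]] = exp["failed_records"] / total if total else 0.0
    return rates

rates = failure_rates(expectations_json)
```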
Custom Metrics
```python
@dlt.table
def monitored_pipeline():
    # Use a batch read here: count() is not supported on a streaming DataFrame
    df = dlt.read("source_data")

    # Log a custom metric (conf values must be strings)
    row_count = df.count()
    spark.conf.set("pipeline.custom_metric.row_count", str(row_count))

    return df
```
Testing Strategies
Unit Testing
```python
# tests/test_dlt_transformations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def test_silver_transformation():
    spark = SparkSession.builder.getOrCreate()

    # Create test data
    test_data = [
        ("1", "customer@example.com", 100.50),
        ("2", "INVALID_EMAIL", 200.00),
        ("3", None, -50.00),  # Invalid data
    ]
    df = spark.createDataFrame(test_data, ["sale_id", "email", "amount"])

    # Apply transformation logic (extracted from the DLT notebook)
    result = transform_to_silver(df)

    # Assertions
    assert result.count() == 1  # Only valid records
    assert result.filter(col("email").contains("@")).count() == 1
```
Integration Testing
```python
# tests/test_dlt_pipeline.py
import time
from databricks.sdk import WorkspaceClient

def test_pipeline_update_completes():
    """Run a pipeline update and verify it completes successfully."""
    w = WorkspaceClient()

    pipeline = w.pipelines.get(pipeline_id="your-pipeline-id")
    start = w.pipelines.start_update(pipeline_id=pipeline.pipeline_id)

    # Poll until the update reaches a terminal state
    while True:
        update = w.pipelines.get_update(
            pipeline_id=pipeline.pipeline_id,
            update_id=start.update_id
        ).update
        if update.state.value in ("COMPLETED", "FAILED", "CANCELED"):
            break
        time.sleep(30)

    assert update.state.value == "COMPLETED"
    # Data quality results can then be verified via the pipeline event log
```
Best Practices
Pipeline Organization

```
/pipelines/
├── bronze/
│   ├── ingest_sales.py
│   └── ingest_customers.py
├── silver/
│   ├── validate_sales.py
│   └── validate_customers.py
└── gold/
    ├── daily_aggregates.py
    └── customer_features.py
```
Naming Conventions
- Use descriptive table names: `bronze_sales_raw`, `silver_sales_validated`
- Add a layer prefix: `bronze_`, `silver_`, `gold_`
- Include the domain: `sales`, `customers`, `products`
Data Quality Strategy
- Bronze: Minimal quality checks (schema validation)
- Silver: Comprehensive validation and cleansing
- Gold: Business rule validation and aggregate checks
Error Handling

```python
@dlt.table
def resilient_processing():
    return (
        dlt.read_stream("source")
        .withColumn(
            "processing_status",
            when(col("id").isNull(), "invalid")
            .when(col("amount") < 0, "invalid")
            .otherwise("valid")
        )
    )
```
```python
# Separate invalid records for review
@dlt.table
def invalid_records():
    return (
        dlt.read("resilient_processing")
        .filter(col("processing_status") == "invalid")
    )
```
Performance Optimization

```python
@dlt.table(
    table_properties={
        "pipelines.autoOptimize.zOrderCols": "customer_id,order_date",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
def optimized_table():
    return dlt.read_stream("source")
```
Complete Examples
See /examples/ directory for complete implementations:
- `complete-dlt-pipeline/`: Full Bronze-Silver-Gold pipeline
- `cdc-pipeline/`: Change Data Capture with SCD Type 2
- `streaming-aggregation/`: Real-time windowed aggregations
Common Pitfalls to Avoid
❌ Don't:
- Mix streaming and batch reads without understanding the implications
- Forget to set appropriate expectation levels (warn/drop/fail)
- Ignore data quality metrics in event logs
- Use SELECT * in production pipelines
- Hard-code paths and configurations
✅ Do:
- Use parameterized configurations
- Implement comprehensive data quality checks
- Monitor pipeline metrics and SLAs
- Test transformations independently
- Document expectations and business rules
- Use appropriate clustering strategies
Related Skills
- `medallion-architecture`: Layer design patterns
- `data-quality`: Great Expectations integration
- `testing-patterns`: Pipeline testing strategies
- `cicd-workflows`: Automated deployment
References
- Delta Live Tables Documentation
- DLT Expectations
- DLT API Reference
- DLT Best Practices