
Delta Live Tables Skill

Overview

Delta Live Tables (DLT) is a declarative framework for building reliable, maintainable, and testable data processing pipelines. It automatically manages infrastructure, error handling, data quality, and monitoring.

Key Benefits:

  • Declarative pipeline definitions

  • Automatic dependency resolution

  • Built-in data quality checks

  • Real-time monitoring and lineage

  • Simplified error recovery

  • Automatic schema evolution

When to Use This Skill

Use Delta Live Tables when you need to:

  • Build production data pipelines with minimal operational overhead

  • Implement complex data quality rules

  • Create streaming and batch pipelines with unified syntax

  • Track data lineage automatically

  • Simplify pipeline maintenance and debugging

  • Enforce SLAs with expectations

Core Concepts

  1. Tables vs Views

Streaming Tables: Process data incrementally using structured streaming

@dlt.table( comment="Raw sensor events ingested in real-time", table_properties={"quality": "bronze"} ) def sensor_events_raw(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .load("/mnt/source/sensors/") )

Views: Computed on demand from queries over other tables and views in the pipeline (declared with @dlt.view; used for intermediate results that are not persisted to the target schema)

@dlt.view(
    comment="Cleaned sensor events with quality checks"
)
def sensor_events_cleaned():
    return (
        dlt.read_stream("sensor_events_raw")
            .filter(col("sensor_id").isNotNull())
            .withColumn("timestamp", to_timestamp(col("event_time")))
    )

  2. Expectations (Data Quality)

Three enforcement levels:

warn: Log violations but continue processing

@dlt.table @dlt.expect("valid_sensor_id", "sensor_id IS NOT NULL") def sensor_data(): return dlt.read("sensor_events_cleaned")

drop: Drop violating records (dropped counts are still reported in the pipeline's data quality metrics)

@dlt.table @dlt.expect_or_drop("valid_temperature", "temperature BETWEEN -50 AND 150") def valid_sensor_readings(): return dlt.read("sensor_data")

fail: Stop pipeline on violations

@dlt.table @dlt.expect_or_fail("no_null_ids", "sensor_id IS NOT NULL") def critical_sensor_data(): return dlt.read("sensor_events_cleaned")

  3. Incremental Processing

Streaming Tables automatically handle incremental processing:

@dlt.table
def orders_incremental():
    return (
        dlt.read_stream("orders_raw")
            .select("order_id", "customer_id", "amount", "order_date")
    )

Apply Changes for CDC (Change Data Capture):

dlt.create_streaming_table("customers_current")

dlt.apply_changes( target="customers_current", source="customers_cdc", keys=["customer_id"], sequence_by="updated_at", stored_as_scd_type=2 # Slowly Changing Dimension Type 2 )

Implementation Patterns

Pattern 1: Multi-Hop Pipeline (Bronze-Silver-Gold)

Bronze Layer (Raw Ingestion):

import dlt
from pyspark.sql.functions import *

@dlt.table( name="bronze_sales_raw", comment="Raw sales data ingested from cloud storage", table_properties={ "quality": "bronze", "pipelines.autoOptimize.managed": "true" } ) def bronze_sales(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", "/mnt/schemas/sales") .load("/mnt/landing/sales/") .withColumn("ingestion_timestamp", current_timestamp()) .withColumn("source_file", input_file_name()) )

Silver Layer (Cleaned & Validated):

@dlt.table( name="silver_sales_validated", comment="Validated and cleaned sales data", table_properties={"quality": "silver"} ) @dlt.expect_or_drop("valid_sale_id", "sale_id IS NOT NULL") @dlt.expect_or_drop("positive_amount", "amount > 0") @dlt.expect("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}$'") def silver_sales(): return ( dlt.read_stream("bronze_sales_raw") .select( col("sale_id"), col("customer_id"), col("amount").cast("decimal(10,2)"), lower(trim(col("email"))).alias("email"), to_timestamp(col("sale_timestamp")).alias("sale_timestamp"), col("ingestion_timestamp") ) .dropDuplicates(["sale_id"]) )

Gold Layer (Business Aggregates):

@dlt.table( name="gold_daily_sales_summary", comment="Daily sales aggregates for reporting", table_properties={"quality": "gold"} ) @dlt.expect_or_fail("valid_date", "sale_date IS NOT NULL") def gold_daily_sales(): return ( dlt.read("silver_sales_validated") .groupBy( to_date(col("sale_timestamp")).alias("sale_date"), col("customer_id") ) .agg( count("*").alias("transaction_count"), sum("amount").alias("total_amount"), avg("amount").alias("avg_amount"), min("sale_timestamp").alias("first_transaction"), max("sale_timestamp").alias("last_transaction") ) )

Pattern 2: Change Data Capture (CDC)

import dlt
from pyspark.sql.functions import *

Bronze: Ingest CDC events

@dlt.table( name="bronze_customer_cdc", comment="Customer CDC events from upstream system" ) def customer_cdc_bronze(): return ( spark.readStream .format("delta") .option("readChangeFeed", "true") .option("startingVersion", 0) .table("source.customers") )

Silver: Apply changes with SCD Type 2

dlt.create_streaming_table(
    name="silver_customers_current",
    comment="Current customer records with history",
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)

dlt.apply_changes( target="silver_customers_current", source="bronze_customer_cdc", keys=["customer_id"], sequence_by="updated_timestamp", apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "source_timestamp"], stored_as_scd_type=2 )

Gold: Active customers only

@dlt.table( name="gold_active_customers", comment="Currently active customer records" ) def active_customers(): return ( dlt.read("silver_customers_current") .filter(col("__END_AT").isNull()) # SCD Type 2: current records .filter(col("status") == "active") .select( "customer_id", "name", "email", "segment", "lifetime_value", col("__START_AT").alias("valid_from") ) )

Pattern 3: Complex Data Quality Rules

import dlt
from pyspark.sql.functions import *

@dlt.table( name="silver_orders_validated", comment="Orders with comprehensive quality checks" )

Basic expectations

@dlt.expect("valid_order_id", "order_id IS NOT NULL") @dlt.expect_or_drop("positive_amount", "total_amount > 0") @dlt.expect_or_drop("valid_quantity", "quantity > 0 AND quantity < 1000")

Email format validation

@dlt.expect("valid_email_format", "customer_email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}$'" )

Date range validation

@dlt.expect_or_drop("order_date_in_range", "order_date >= '2020-01-01' AND order_date <= current_date()" )

Referential integrity

@dlt.expect_or_fail("valid_customer", "customer_id IN (SELECT customer_id FROM LIVE.silver_customers_current)" )

Business rules

@dlt.expect("reasonable_unit_price", "unit_price >= 0.01 AND unit_price <= 100000" )

Composite validation

@dlt.expect("amount_matches_calculation", "ABS(total_amount - (quantity * unit_price)) < 0.01" )

def orders_validated(): return ( dlt.read_stream("bronze_orders_raw") .select( "order_id", "customer_id", "customer_email", col("total_amount").cast("decimal(10,2)"), col("quantity").cast("int"), col("unit_price").cast("decimal(10,2)"), to_date(col("order_date")).alias("order_date"), current_timestamp().alias("validated_at") ) )

Pattern 4: Streaming Joins

@dlt.table( name="gold_enriched_transactions", comment="Transactions enriched with customer and product data" ) def enriched_transactions(): transactions = dlt.read_stream("silver_transactions") customers = dlt.read("silver_customers_current") products = dlt.read("silver_products")

return (
    transactions
        .join(
            customers,
            transactions.customer_id == customers.customer_id,
            "left"
        )
        .join(
            products,
            transactions.product_id == products.product_id,
            "left"
        )
        .select(
            transactions["*"],
            customers["customer_name"],
            customers["customer_segment"],
            products["product_name"],
            products["category"]
        )
)
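
The joins above are stream-static: the streaming transaction feed is joined against batch reads of the dimension tables. If both sides were streams, Spark would require watermarks to bound the join state. A minimal sketch under that assumption (silver_payments, silver_shipments, and their timestamp columns are hypothetical):

@dlt.table(
    name="silver_payments_with_shipments",
    comment="Stream-stream join bounded by watermarks (illustrative)"
)
def payments_with_shipments():
    payments = (
        dlt.read_stream("silver_payments")        # hypothetical streaming source
            .withWatermark("payment_time", "1 hour")
    )
    shipments = (
        dlt.read_stream("silver_shipments")       # hypothetical streaming source
            .withWatermark("shipment_time", "2 hours")
    )
    return (
        payments.alias("p")
        .join(
            shipments.alias("s"),
            expr("""
                p.order_id = s.order_id AND
                s.shipment_time BETWEEN p.payment_time AND p.payment_time + INTERVAL 1 HOUR
            """),
            "inner"
        )
    )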

Pipeline Configuration

DLT Pipeline Settings (YAML)

# databricks.yml or pipeline configuration
name: sales_pipeline
target: production_db
storage: /mnt/dlt/sales_pipeline

clusters:
  - label: default
    num_workers: 4
    node_type_id: i3.xlarge
    spark_conf:
      spark.databricks.delta.preview.enabled: "true"
      spark.databricks.delta.properties.defaults.enableChangeDataFeed: "true"

libraries:
  - notebook:
      path: /Workspace/pipelines/bronze_ingestion
  - notebook:
      path: /Workspace/pipelines/silver_transformation
  - notebook:
      path: /Workspace/pipelines/gold_aggregation

configuration:
  source_path: /mnt/landing/sales
  checkpoint_path: /mnt/checkpoints/sales

continuous: false   # Set to true for continuous processing
development: false  # Set to true for development mode

notifications:
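
The same settings can also be applied programmatically. A minimal sketch using the Databricks SDK for Python (paths and names are the placeholders from the YAML above; parameter names should be verified against your databricks-sdk version):

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import NotebookLibrary, PipelineLibrary

w = WorkspaceClient()

created = w.pipelines.create(
    name="sales_pipeline",
    target="production_db",
    storage="/mnt/dlt/sales_pipeline",
    configuration={
        "source_path": "/mnt/landing/sales",
        "checkpoint_path": "/mnt/checkpoints/sales"
    },
    libraries=[
        PipelineLibrary(notebook=NotebookLibrary(path="/Workspace/pipelines/bronze_ingestion")),
        PipelineLibrary(notebook=NotebookLibrary(path="/Workspace/pipelines/silver_transformation")),
        PipelineLibrary(notebook=NotebookLibrary(path="/Workspace/pipelines/gold_aggregation"))
    ],
    continuous=False,
    development=False
)

print(created.pipeline_id)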

Runtime Configuration

# Access pipeline configuration in notebooks
source_path = spark.conf.get("source_path")
checkpoint_path = spark.conf.get("checkpoint_path")

@dlt.table
def configured_ingestion():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load(source_path)
    )

Monitoring and Observability

Event Log Queries

# Query the DLT event log
event_log_path = f"{storage_location}/system/events"

events_df = (
    spark.read
        .format("delta")
        .load(event_log_path)
)

# Data quality metrics
quality_metrics = (
    events_df
    .filter(col("event_type") == "flow_progress")
    .select(
        col("timestamp"),
        col("details.flow_progress.metrics.num_output_rows").alias("output_rows"),
        col("details.flow_progress.data_quality.dropped_records").alias("dropped_records"),
        col("details.flow_progress.data_quality.expectations").alias("expectations")
    )
)

quality_metrics.show()
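
To break the expectations column out into per-rule pass/fail counts, the array can be exploded. A minimal sketch that assumes the nested field names used in the query above (they may vary across DLT releases):

from pyspark.sql.functions import col, explode

# One row per expectation evaluation, with pass/fail counts
expectation_metrics = (
    quality_metrics
    .select("timestamp", explode("expectations").alias("expectation"))
    .select(
        "timestamp",
        col("expectation.name").alias("expectation_name"),
        col("expectation.dataset").alias("dataset"),
        col("expectation.passed_records").alias("passed_records"),
        col("expectation.failed_records").alias("failed_records")
    )
)

expectation_metrics.show()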

Custom Metrics

@dlt.table
def monitored_pipeline():
    # Use a batch read here: count() is not supported on a streaming DataFrame
    df = dlt.read("source_data")

    # Log custom metrics
    row_count = df.count()
    spark.conf.set("pipeline.custom_metric.row_count", str(row_count))

    return df

Testing Strategies

Unit Testing

tests/test_dlt_transformations.py

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def test_silver_transformation():
    spark = SparkSession.builder.getOrCreate()

    # Create test data
    test_data = [
        ("1", "customer@example.com", 100.50),
        ("2", "INVALID_EMAIL", 200.00),
        ("3", None, -50.00)  # Invalid data
    ]

    df = spark.createDataFrame(test_data, ["sale_id", "email", "amount"])

    # Apply transformation logic (extracted from DLT notebook)
    result = transform_to_silver(df)

    # Assertions
    assert result.count() == 1  # Only valid records
    assert result.filter(col("email").contains("@")).count() == 1
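
The test above assumes the cleansing logic lives in a plain function that can be imported outside of DLT. A minimal sketch of such a helper (transform_to_silver is hypothetical here; its rules mirror the silver-layer checks from Pattern 1):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lower, trim

def transform_to_silver(df: DataFrame) -> DataFrame:
    # Hypothetical helper: keep only rows that would pass the silver-layer expectations
    return (
        df
        .filter(col("sale_id").isNotNull())
        .filter(col("amount") > 0)
        .filter(col("email").rlike(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"))
        .withColumn("email", lower(trim(col("email"))))
        .dropDuplicates(["sale_id"])
    )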

Integration Testing

tests/test_dlt_pipeline.py

def test_pipeline_expectations():
    """Test that expectations are properly defined."""
    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    pipeline = w.pipelines.get(pipeline_id="your-pipeline-id")
    update = w.pipelines.start_update(pipeline_id=pipeline.pipeline_id)

    # Check the update status (in a real test, poll until the update reaches a terminal state)
    update = w.pipelines.get_update(
        pipeline_id=pipeline.pipeline_id,
        update_id=update.update_id
    )

    # Verify expectations
    assert update.state == "COMPLETED"
    assert update.data_quality_metrics.passed_records > 0

Best Practices

  1. Pipeline Organization

/pipelines/
├── bronze/
│   ├── ingest_sales.py
│   └── ingest_customers.py
├── silver/
│   ├── validate_sales.py
│   └── validate_customers.py
└── gold/
    ├── daily_aggregates.py
    └── customer_features.py

  2. Naming Conventions
  • Use descriptive table names: bronze_sales_raw, silver_sales_validated

  • Add a layer prefix: bronze_, silver_, gold_

  • Include the domain: sales, customers, products

  3. Data Quality Strategy
  • Bronze: Minimal quality checks (schema validation)

  • Silver: Comprehensive validation and cleansing

  • Gold: Business rule validation and aggregate checks

  4. Error Handling

@dlt.table
def resilient_processing():
    return (
        dlt.read_stream("source")
        .withColumn(
            "processing_status",
            when(col("id").isNull(), "invalid")
            .when(col("amount") < 0, "invalid")
            .otherwise("valid")
        )
    )

Separate invalid records for review

@dlt.table
def invalid_records():
    return (
        dlt.read("resilient_processing")
        .filter(col("processing_status") == "invalid")
    )

  5. Performance Optimization

@dlt.table(
    table_properties={
        "pipelines.autoOptimize.zOrderCols": "customer_id,order_date",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    }
)
def optimized_table():
    return dlt.read_stream("source")

Complete Examples

See /examples/ directory for complete implementations:

  • complete-dlt-pipeline/: Full Bronze-Silver-Gold pipeline

  • cdc-pipeline/: Change Data Capture with SCD Type 2

  • streaming-aggregation/: Real-time windowed aggregations

Common Pitfalls to Avoid

❌ Don't:

  • Mix streaming and batch reads without understanding implications

  • Forget to set appropriate expectation levels (warn/drop/fail)

  • Ignore data quality metrics in event logs

  • Use SELECT * in production pipelines

  • Hard-code paths and configurations

✅ Do:

  • Use parameterized configurations

  • Implement comprehensive data quality checks

  • Monitor pipeline metrics and SLAs

  • Test transformations independently

  • Document expectations and business rules

  • Use appropriate clustering strategies

Related Skills

  • medallion-architecture: Layer design patterns

  • data-quality: Great Expectations integration

  • testing-patterns: Pipeline testing strategies

  • cicd-workflows: Automated deployment

References

  • Delta Live Tables Documentation

  • DLT Expectations

  • DLT API Reference

  • DLT Best Practices
