# Data Quality Frameworks

Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.
## When to Use This Skill

- Implementing data quality checks in pipelines
- Setting up Great Expectations validation
- Building comprehensive dbt test suites
- Establishing data contracts between teams
- Monitoring data quality metrics
- Automating data validation in CI/CD
## Core Concepts

### Data Quality Dimensions

| Dimension | Description | Example Check |
|---|---|---|
| Completeness | No missing values | `expect_column_values_to_not_be_null` |
| Uniqueness | No duplicates | `expect_column_values_to_be_unique` |
| Validity | Values in expected range | `expect_column_values_to_be_in_set` |
| Accuracy | Data matches reality | Cross-reference validation |
| Consistency | No contradictions | `expect_column_pair_values_A_to_be_greater_than_B` |
| Timeliness | Data is recent | `expect_column_max_to_be_between` |
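Most of these dimensions can be spot-checked without any framework. The following pandas sketch exercises four of them; the column names, values, and the one-day freshness threshold are illustrative assumptions, not part of any pattern below.

```python
import pandas as pd

# Sample data; in practice this would be a batch loaded from the warehouse
df = pd.DataFrame({
    "order_id": [1, 2, 3],
    "status": ["pending", "shipped", "delivered"],
    "created_at": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
})

# Completeness: no missing values in the key column
assert df["order_id"].notna().all()

# Uniqueness: no duplicate keys
assert df["order_id"].is_unique

# Validity: all values fall in the expected set
valid_statuses = {"pending", "processing", "shipped", "delivered", "cancelled"}
assert df["status"].isin(valid_statuses).all()

# Timeliness: newest record is no older than one day
# (fixed "now" used here so the example is deterministic)
now = pd.Timestamp("2024-01-03")
assert now - df["created_at"].max() <= pd.Timedelta(days=1)
```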
### Testing Pyramid for Data

```
        /\
       /  \       Integration Tests (cross-table)
      /────\
     /      \     Unit Tests (single column)
    /────────\
   /          \   Schema Tests (structure)
  /────────────\
```
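The three pyramid levels map to progressively broader assertions. A minimal sketch with plain pandas (table and column names are illustrative assumptions):

```python
import pandas as pd

orders = pd.DataFrame({
    "order_id": [1, 2],
    "customer_id": [10, 11],
    "amount": [9.5, 20.0],
})
customers = pd.DataFrame({"customer_id": [10, 11]})

# Schema test (structure): the expected columns exist
assert {"order_id", "customer_id", "amount"} <= set(orders.columns)

# Unit test (single column): every value satisfies a rule
assert (orders["amount"] > 0).all()

# Integration test (cross-table): every order references a known customer
assert orders["customer_id"].isin(customers["customer_id"]).all()
```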
## Quick Start

### Great Expectations Setup

```bash
# Install
pip install great_expectations

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new
```

```python
# great_expectations/checkpoints/daily_validation.yml
import great_expectations as gx

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Validate
results = context.run_checkpoint(checkpoint_name="daily_orders")
```
## Patterns

### Pattern 1: Great Expectations Suite

```python
# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite"""
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False,  # Allow additional columns
        },
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"},
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"},
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"},
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"],
        },
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True,  # amount > 0
        },
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"},
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"},
        },
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000,
        },
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500,
        },
    ))

    return suite
```
### Pattern 2: Great Expectations Checkpoint

```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1  # Latest batch
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
```

```python
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values() if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
```
### Pattern 3: dbt Data Tests

```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ["pending", "processing", "shipped", "delivered", "cancelled"]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'"
```
### Pattern 4: Custom dbt Tests

```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt
    from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }}
   or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
```

```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
```

```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule

with orders_customers as (
    select distinct customer_id
    from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id
    from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders  -- Test passes if this returns 0 rows
```
### Pattern 5: Data Contracts

```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false
    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect
    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD
    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true
    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
```
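A contract is only useful if something enforces it. Below is a hedged sketch of checking a batch against the contract's `schema` rules at ingestion time; `validate_against_contract` is a hypothetical helper written for this illustration, not part of any data-contract tooling, and the contract dict simply mirrors a subset of the YAML above.

```python
import pandas as pd

# Subset of the contract's schema section, expressed as a plain dict
contract_schema = {
    "order_id": {"required": True, "unique": True},
    "total_amount": {"minimum": 0, "maximum": 100000},
    "status": {"enum": ["pending", "processing", "shipped", "delivered", "cancelled"]},
}


def validate_against_contract(df: pd.DataFrame, schema: dict) -> list:
    """Return a list of human-readable contract violations (empty = pass)."""
    violations = []
    for column, rules in schema.items():
        if column not in df.columns:
            violations.append(f"{column}: missing column")
            continue
        if rules.get("required") and df[column].isna().any():
            violations.append(f"{column}: null values present")
        if rules.get("unique") and not df[column].is_unique:
            violations.append(f"{column}: duplicate values")
        if "minimum" in rules and (df[column] < rules["minimum"]).any():
            violations.append(f"{column}: below minimum {rules['minimum']}")
        if "maximum" in rules and (df[column] > rules["maximum"]).any():
            violations.append(f"{column}: above maximum {rules['maximum']}")
        if "enum" in rules and not df[column].isin(rules["enum"]).all():
            violations.append(f"{column}: values outside enum")
    return violations


orders = pd.DataFrame({
    "order_id": ["a", "b"],
    "total_amount": [10.0, 250000.0],  # second row violates the maximum
    "status": ["pending", "shipped"],
})
print(validate_against_contract(orders, contract_schema))
# → ['total_amount: above maximum 100000']
```

In production the rules dict would be parsed from the contract YAML so the producer and consumer share a single source of truth.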
### Pattern 6: Automated Quality Pipeline

```python
# quality_pipeline.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import great_expectations as gx


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against an expectation suite"""
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }
        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results
        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now(),
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)
        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")
            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)
```

```python
# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")
```
## Best Practices

### Do's

- Test early - Validate source data before transformations
- Test incrementally - Add tests as you find issues
- Document expectations - Clear descriptions for each test
- Alert on failures - Integrate with monitoring
- Version contracts - Track schema changes

### Don'ts

- Don't test everything - Focus on critical columns
- Don't ignore warnings - They often precede failures
- Don't skip freshness - Stale data is bad data
- Don't hardcode thresholds - Use dynamic baselines
- Don't test in isolation - Test relationships too
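On dynamic baselines: one simple alternative to hardcoded thresholds is to derive bounds from recent history and flag today's value when it falls outside them. The 3-sigma rule and the sample counts below are illustrative assumptions; in practice the history would come from a metrics store.

```python
import statistics

# Row counts observed over the last 7 days (illustrative numbers)
daily_row_counts = [10120, 9980, 10340, 10050, 10210, 9890, 10400]
today_count = 4200  # e.g. an upstream extract silently dropped half the data

mean = statistics.mean(daily_row_counts)
stdev = statistics.stdev(daily_row_counts)

# 3-sigma baseline computed from history, not hardcoded
lower = mean - 3 * stdev
upper = mean + 3 * stdev

if not (lower <= today_count <= upper):
    print(f"Row count {today_count} outside baseline [{lower:.0f}, {upper:.0f}]")
```

The same idea applies to means, null rates, and freshness lag: recompute the baseline on every run so the check tracks seasonal drift instead of going stale.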