Testing Patterns
Pytest templates for comprehensive ETL pipeline testing.
Unit Tests - Transform Functions
# tests/test_transforms.py
import pytest import pandas as pd from pipeline.transforms import clean_email, calculate_total, categorize_customer
class TestCleanEmail:
    """Checks for ``clean_email`` on mixed-case, padded, malformed and missing input."""

    def test_lowercase(self):
        """A mixed-case address is expected back entirely lower-cased."""
        cleaned = clean_email("John@Example.COM")
        assert cleaned == "john@example.com"

    def test_strip_whitespace(self):
        """Surrounding spaces are expected to be removed."""
        cleaned = clean_email(" john@example.com ")
        assert cleaned == "john@example.com"

    def test_invalid_returns_none(self):
        """A string that is not an email address is expected to yield ``None``."""
        cleaned = clean_email("not-an-email")
        assert cleaned is None

    def test_null_input(self):
        """``None`` in is expected to give ``None`` out."""
        cleaned = clean_email(None)
        assert cleaned is None
class TestCalculateTotal:
    """Checks for ``calculate_total`` on a small order-items frame and on an empty one."""

    @pytest.fixture
    def order_items(self):
        """Three line items: two for order 1 and one for order 2."""
        columns = {
            'order_id': [1, 1, 2],
            'quantity': [2, 3, 1],
            'unit_price': [10.0, 5.0, 100.0],
        }
        return pd.DataFrame(columns)

    def test_sums_correctly(self, order_items):
        """Order 1 is expected to total 35.0 (2 * 10.0 + 3 * 5.0 in the fixture)."""
        totals = calculate_total(order_items)
        is_order_one = totals['order_id'] == 1
        order_one_total = totals.loc[is_order_one, 'total'].values[0]
        assert order_one_total == 35.0

    def test_handles_empty(self):
        """A frame with the input columns but no rows is expected to give an empty result."""
        no_rows = pd.DataFrame(columns=['order_id', 'quantity', 'unit_price'])
        totals = calculate_total(no_rows)
        assert len(totals) == 0
class TestCategorizeCustomer:
    """Boundary checks for the tier name returned by ``categorize_customer``."""

    @pytest.mark.parametrize(
        "total_spent,expected",
        [
            (0, 'bronze'),
            (99, 'bronze'),
            (100, 'silver'),
            (999, 'silver'),
            (1000, 'gold'),
            (9999, 'gold'),
            (10000, 'platinum'),
        ],
    )
    def test_tiers(self, total_spent, expected):
        """Each spend amount must map to the tier listed beside it."""
        tier = categorize_customer(total_spent)
        assert tier == expected
Integration Tests - Full Pipeline
# tests/test_pipeline.py
import pandas as pd
import pytest

from pipeline import OrdersPipeline
from tests.fixtures import generate_orders_fixture
class TestOrdersPipeline:
    """End-to-end checks that run ``OrdersPipeline`` on a CSV written under ``tmp_path``."""

    @pytest.fixture
    def pipeline(self, tmp_path):
        """Build a pipeline whose source and target live in the per-test temp directory."""
        return OrdersPipeline(
            source_path=tmp_path / "source",
            target_path=tmp_path / "target",
        )

    @pytest.fixture
    def source_data(self, tmp_path):
        """Write 100 generated orders to ``source/orders.csv`` and return the frame."""
        df = generate_orders_fixture(100)
        path = tmp_path / "source" / "orders.csv"
        # Tolerate an existing directory: if OrdersPipeline creates its source
        # path on construction, a plain mkdir would raise FileExistsError.
        path.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(path, index=False)
        return df

    @staticmethod
    def _run_and_load(pipeline):
        """Run the pipeline once and return its ``orders.parquet`` output as a DataFrame."""
        pipeline.run()
        return pd.read_parquet(pipeline.target_path / "orders.parquet")

    def test_row_count_preserved(self, pipeline, source_data):
        """Verify no rows lost in transformation."""
        result = self._run_and_load(pipeline)
        assert len(result) == len(source_data)

    def test_all_columns_present(self, pipeline, source_data):
        """Verify output has expected columns."""
        result = self._run_and_load(pipeline)
        expected_columns = ['order_id', 'customer_id', 'total', 'tier', 'processed_at']
        # Collect the missing names so a failure says which columns are absent.
        missing = [col for col in expected_columns if col not in result.columns]
        assert not missing, f"Missing columns in output: {missing}"

    def test_no_null_required_fields(self, pipeline, source_data):
        """Verify required fields are populated."""
        result = self._run_and_load(pipeline)
        assert result['order_id'].notna().all()
        assert result['customer_id'].notna().all()

    def test_idempotent(self, pipeline, source_data):
        """Running twice produces same result."""
        # NOTE(review): if ``processed_at`` is stamped with the run time, the two
        # frames will differ on that column -- confirm against the pipeline.
        first_result = self._run_and_load(pipeline)
        second_result = self._run_and_load(pipeline)
        pd.testing.assert_frame_equal(first_result, second_result)
Data Quality Tests (dbt-style)
# tests/test_data_quality.py
import pytest from sqlalchemy import create_engine, text
@pytest.fixture
def db_connection():
    """Yield an open SQLAlchemy connection, then release the engine.

    The connection URL is a placeholder and must be replaced with the
    database under test.
    """
    engine = create_engine("postgresql://...")
    try:
        with engine.connect() as conn:
            yield conn
    finally:
        # Leaving the ``with`` block only returns the connection to the pool;
        # dispose the engine so pooled connections are closed after each test.
        engine.dispose()
class TestOrdersTable:
    """Data-quality assertions executed as SQL against the ``orders`` table."""

    def test_unique_order_id(self, db_connection):
        """Fail if any ``order_id`` appears on more than one row."""
        result = db_connection.execute(text("""
            SELECT order_id, COUNT(*) as cnt
            FROM orders
            GROUP BY order_id
            HAVING COUNT(*) > 1
        """))
        duplicates = result.fetchall()
        # Only the first five offenders are shown to keep the message short.
        assert len(duplicates) == 0, f"Found duplicate order_ids: {duplicates[:5]}"

    def test_valid_status(self, db_connection):
        """Fail if any row carries a status outside the allowed set."""
        # Rows whose status is NULL are not returned by NOT IN, so they are
        # not reported by this check.
        result = db_connection.execute(text("""
            SELECT DISTINCT status
            FROM orders
            WHERE status NOT IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')
        """))
        invalid = result.fetchall()
        assert len(invalid) == 0, f"Found invalid statuses: {invalid}"

    def test_positive_amounts(self, db_connection):
        """Fail if any order has a negative ``total`` (zero is accepted)."""
        result = db_connection.execute(text("""
            SELECT COUNT(*) FROM orders WHERE total < 0
        """))
        negative_count = result.scalar()
        assert negative_count == 0, f"Found {negative_count} orders with negative totals"
Golden File Testing
def test_transform_matches_golden(self):
    """Run ``transform`` on the fixture input and compare it with the golden CSV."""
    source_frame = pd.read_csv("tests/fixtures/input.csv")
    golden_frame = pd.read_csv("tests/golden/expected_output.csv")

    actual_frame = transform(source_frame)

    # Uses assert_frame_equal's default checks (values, dtypes, index, columns).
    pd.testing.assert_frame_equal(actual_frame, golden_frame)
Snapshot Testing
def test_schema_snapshot(self, snapshot):
    """Compare the output's column-to-dtype mapping with the stored ``schema.json``."""
    # NOTE(review): ``input_df``, ``transform`` and ``json`` are not defined in
    # this snippet; they must be provided by the surrounding module.
    frame = transform(input_df)

    dtype_names = {}
    for column, dtype in frame.dtypes.items():
        dtype_names[column] = str(dtype)

    serialized = json.dumps(dtype_names, indent=2)
    snapshot.assert_match(serialized, "schema.json")
Property-Based Testing
from hypothesis import given, strategies as st
@given(st.floats(min_value=0, max_value=1e9))
def test_total_always_positive(amount):
    """``calculate_tax`` must not return a negative value for amounts in [0, 1e9]."""
    tax = calculate_tax(amount)
    assert tax >= 0
@given(st.lists(st.integers(min_value=1, max_value=100), min_size=1))
def test_sum_equals_parts(values):
    """``aggregate`` over an ``amount`` column must equal the plain sum of the inputs."""
    frame = pd.DataFrame({'amount': values})
    aggregated = aggregate(frame)
    expected_total = sum(values)
    assert aggregated == expected_total