
Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

When to Use This Skill

  • Creating data pipeline orchestration with Airflow

  • Designing DAG structures and dependencies

  • Implementing custom operators and sensors

  • Testing Airflow DAGs locally

  • Setting up Airflow in production

  • Debugging failed DAG runs

Core Concepts

  1. DAG Design Principles

Principle     Description
Idempotent    Running twice produces the same result
Atomic        Tasks succeed or fail completely
Incremental   Process only new/changed data
Observable    Logs, metrics, alerts at every step
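
A minimal sketch of these principles in one task, assuming a hypothetical warehouse client (fetch_new_rows, delete_partition, insert_rows are placeholders for your own data-access layer):

import logging

def load_partition(ds: str, warehouse) -> int:
    """Idempotent, incremental load for one schedule interval."""
    # Incremental: touch only this run's partition, keyed on the logical date.
    rows = warehouse.fetch_new_rows(partition=ds)
    # Idempotent: delete-then-insert so a retry or backfill converges
    # to the same state instead of duplicating rows.
    warehouse.delete_partition(table='facts', partition=ds)
    warehouse.insert_rows(table='facts', rows=rows, partition=ds)
    # Observable: log row counts at every step.
    logging.info("Loaded %d rows into facts/%s", len(rows), ds)
    return len(rows)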

  2. Task Dependencies

Linear

task1 >> task2 >> task3

Fan-out

task1 >> [task2, task3, task4]

Fan-in

[task1, task2, task3] >> task4

Complex

task1 >> task2 >> task4
task1 >> task3 >> task4
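
For longer graphs, Airflow also ships dependency helpers that express the same shapes more compactly; a brief sketch, assuming task1 through task4 are the operators from the snippets above:

from airflow.models.baseoperator import chain, cross_downstream

# Equivalent to task1 >> task2 >> task3
chain(task1, task2, task3)

# Every task in the first list feeds every task in the second:
# task1 and task2 each run before task3 and task4.
cross_downstream([task1, task2], [task3, task4])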

Quick Start

dags/example_dag.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
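
To smoke-test this DAG locally without a scheduler, a minimal addition to the same file, assuming Airflow 2.5+ where DAG.test() is available; running the file directly then executes one in-process DAG run:

# Local debugging: python dags/example_dag.py runs a single DAG run in-process.
if __name__ == "__main__":
    dag.test()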

Patterns

Pattern 1: TaskFlow API (Airflow 2.0+)

dags/taskflow_example.py

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str, ds=None) -> dict:
        """Extract data from source (Airflow injects ds from the context)"""
        import pandas as pd

        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds=None):
        """Load data to target"""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)

# Instantiate the DAG
taskflow_etl()
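
Note that returning df.to_dict() ships the whole DataFrame through XCom, which the default metadata-database backend handles poorly beyond small payloads. A common variant passes a storage path instead and lets each task read and write the data itself; a sketch of extract rewritten this way (the s3://bucket/staging layout is an assumption for illustration):

@task()
def extract(source: str, ds=None) -> str:
    """Write extracted data to storage; pass only its path via XCom."""
    import pandas as pd

    df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
    path = f's3://bucket/staging/{source}/{ds}.parquet'
    df.to_parquet(path)
    return path  # downstream tasks read the parquet file themselves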

Pattern 2: Dynamic DAG Generation

dags/dynamic_dag_factory.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]

def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag

# Generate DAGs
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
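
The config list can also live outside the code, for example in an Airflow Variable holding JSON, so new pipelines can be added without a deploy. A sketch, assuming a Variable named pipeline_configs; note that Variable.get at module level runs on every DAG-file parse, so keep the lookup cheap:

from airflow.models import Variable

# Falls back to the inline list above if the Variable is not set.
PIPELINE_CONFIGS = Variable.get(
    'pipeline_configs',
    default_var=PIPELINE_CONFIGS,
    deserialize_json=True,
)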

Pattern 3: Branching and Conditional Logic

dags/branching_example.py

from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()
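
On Airflow 2.3+, the BranchPythonOperator half of this pattern can stay in pure TaskFlow style with the @task.branch decorator, which receives the metrics directly instead of pulling XCom by hand. A sketch of the replacement, inside the same DAG function and using the same task_ids:

@task.branch()
def choose_branch(metrics: dict) -> str:
    """Return the task_id of the branch to follow"""
    if metrics['score'] >= 0.9:
        return 'high_quality_path'
    elif metrics['score'] >= 0.7:
        return 'medium_quality_path'
    return 'low_quality_path'

# Replaces the BranchPythonOperator wiring above:
branch = choose_branch(check_data_quality())
branch >> [high_quality, medium_quality, low_quality] >> join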

Pattern 4: Sensors and External Dependencies

dags/sensor_patterns.py

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,  # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',  # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests

        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
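
For sensors reused across many DAGs, the classic approach subclasses BaseSensorOperator and implements poke(); a minimal sketch, where table_has_rows() is a placeholder for your own warehouse check:

from airflow.sensors.base import BaseSensorOperator

class TableHasRowsSensor(BaseSensorOperator):
    """Succeeds once the target table has rows for the run's partition."""

    def __init__(self, table: str, **kwargs):
        super().__init__(**kwargs)
        self.table = table

    def poke(self, context) -> bool:
        # Return True to succeed, False to keep waiting.
        # table_has_rows() is a hypothetical helper, not an Airflow API.
        return table_has_rows(self.table, partition=context['ds'])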

Pattern 5: Error Handling and Alerts

dags/error_handling.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)

def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass

with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
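
The commented send_slack_alert() could be backed by the Slack provider's webhook hook. A sketch, assuming apache-airflow-providers-slack is installed and a connection named slack_default holds the webhook URL; hook method names have varied across provider versions, so check the version you run:

def send_slack_alert(message: str):
    # Assumes a recent apache-airflow-providers-slack release where
    # SlackWebhookHook exposes send(text=...).
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

    SlackWebhookHook(slack_webhook_conn_id='slack_default').send(text=message)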

Pattern 6: Testing DAGs

tests/test_dags.py

import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle

@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):
    """Test DAG has no cycles and is valid"""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)  # Raises if a cycle is detected in dag_id

# Test individual task logic
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
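
A common companion test enforces team conventions across every DAG in the bag, for instance that each DAG declares tags and each task has a retry policy (the thresholds here are illustrative, not Airflow requirements):

def test_dag_conventions(dagbag):
    """Every DAG should carry tags and a retry policy."""
    for dag_id, dag in dagbag.dags.items():
        assert dag.tags, f"{dag_id} has no tags"
        for task in dag.tasks:
            assert task.retries >= 1, f"{dag_id}.{task.task_id} has no retries"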

Project Structure

airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py   # Custom operators
│   │   ├── sensors.py     # Custom sensors
│   │   └── callbacks.py   # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt

Best Practices

Do's

  • Use TaskFlow API - Cleaner code, automatic XCom

  • Set timeouts - Prevent zombie tasks

  • Use mode='reschedule' for sensors - Frees up worker slots while waiting

  • Test DAGs - Unit tests and integration tests

  • Idempotent tasks - Safe to retry

Don'ts

  • Don't use depends_on_past=True - Creates scheduling bottlenecks

  • Don't hardcode dates - Use {{ ds }} macros (see the sketch after this list)

  • Don't use global state - Tasks should be stateless

  • Don't skip catchup blindly - Understand implications

  • Don't put heavy logic in DAG file - Import from modules
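
For the hardcoded-dates point above, the difference in practice: templated fields resolve {{ ds }} per run, so backfills and retries each process their own day. A sketch, where process is a placeholder command:

from airflow.operators.bash import BashOperator

# Hardcoded: every run, including backfills, processes the same day.
bad = BashOperator(task_id='bad', bash_command='process --date 2024-01-01')

# Templated: {{ ds }} resolves to each run's logical date.
good = BashOperator(task_id='good', bash_command='process --date {{ ds }}')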
