
Apache Airflow Skill

Master Apache Airflow for workflow orchestration, data pipeline automation, and scheduled task management. This skill covers DAG authoring, operators, sensors, hooks, XComs, variables, connections, and deployment patterns.

When to Use This Skill

USE when:

  • Building complex data pipelines with task dependencies

  • Orchestrating ETL/ELT workflows

  • Scheduling recurring batch jobs

  • Managing workflows with retries and error handling

  • Coordinating tasks across multiple systems

  • Gaining visibility into workflow execution history

  • Maintaining audit trails and lineage tracking

  • Building ML pipeline orchestration

DON'T USE when:

  • Real-time streaming data (use Kafka, Flink)

  • Simple cron jobs (use systemd timers, crontab)

  • CI/CD pipelines (use GitHub Actions, Jenkins)

  • Low-latency requirements (Airflow has scheduler overhead)

  • Simple single-task automation (overkill)

  • Visual workflow design for non-developers (use n8n)

Prerequisites

Installation Options

Option 1: pip (Development)

Create virtual environment

python -m venv airflow-env
source airflow-env/bin/activate

Set Airflow home

export AIRFLOW_HOME=~/airflow

Install Airflow with constraints

AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Initialize database

airflow db init

Create admin user

airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com \
  --password admin

Start services

airflow webserver --port 8080 &
airflow scheduler &

Option 2: Docker Compose (Recommended)

Download official docker-compose

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'

Create required directories

mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

Initialize

docker compose up airflow-init

Start services

docker compose up -d

Access the UI at http://localhost:8080 (default login: airflow/airflow)

Option 3: Kubernetes with Helm

Add Airflow Helm repo

helm repo add apache-airflow https://airflow.apache.org
helm repo update

Install Airflow

helm install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  --set executor=KubernetesExecutor

Fetch the webserver secret key (the chart's default UI login is admin/admin)

kubectl get secret --namespace airflow airflow-webserver-secret -o jsonpath="{.data.webserver-secret-key}" | base64 --decode

Development Setup

Install development dependencies

pip install "apache-airflow[postgres,celery,cncf.kubernetes]"

Install testing tools

pip install pytest pytest-airflow

Install linting

pip install ruff

Core Capabilities

  1. Basic DAG Structure

dags/basic_dag.py

""" Basic DAG demonstrating core Airflow concepts. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator

Default arguments for all tasks

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
}

DAG definition

with DAG(
    dag_id='basic_etl_pipeline',
    default_args=default_args,
    description='Basic ETL pipeline demonstrating core patterns',
    schedule_interval='0 6 * * *',  # Daily at 6 AM UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production'],
    doc_md="""
    ## Basic ETL Pipeline

This DAG demonstrates:
- Task dependencies
- Python and Bash operators
- Error handling with retries
- Task documentation

**Owner**: data-team
**Schedule**: Daily at 6 AM UTC
""",

) as dag:

# Start marker
start = EmptyOperator(
    task_id='start',
    doc='Pipeline start marker',
)

# Extract task
def extract_data(**context):
    """Extract data from source systems."""
    import logging
    logger = logging.getLogger(__name__)

    # Access execution context
    execution_date = context['ds']
    logger.info(f"Extracting data for {execution_date}")

    # Simulated extraction
    data = {
        'records': 1000,
        'source': 'database',
        'execution_date': execution_date,
    }

    # Return value available via XCom
    return data

extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    doc='Extract data from source database',
)

# Transform task
def transform_data(**context):
    """Transform extracted data."""
    import logging
    logger = logging.getLogger(__name__)

    # Pull data from previous task via XCom
    ti = context['ti']
    extracted_data = ti.xcom_pull(task_ids='extract_data')

    logger.info(f"Transforming {extracted_data['records']} records")

    # Simulated transformation
    transformed = {
        **extracted_data,
        'records_transformed': extracted_data['records'],
        'quality_score': 0.95,
    }

    return transformed

transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    doc='Apply transformations to extracted data',
)

# Load task using Bash
load = BashOperator(
    task_id='load_data',
    bash_command='''
        echo "Loading data to warehouse"
        echo "Execution date: {{ ds }}"
        echo "Previous task output: {{ ti.xcom_pull(task_ids='transform_data') }}"
    ''',
    doc='Load transformed data to data warehouse',
)

# End marker
end = EmptyOperator(
    task_id='end',
    doc='Pipeline end marker',
    trigger_rule='all_success',
)

# Define dependencies
start >> extract >> transform >> load >> end

2. Advanced Operators

dags/advanced_operators.py

""" DAG demonstrating advanced operator patterns. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='advanced_operators_demo',
    default_args=default_args,
    schedule_interval=None,  # Manual trigger only
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['demo', 'advanced'],
) as dag:

# BranchPythonOperator for conditional logic
def choose_branch(**context):
    """Decide which branch to execute based on data."""
    import random

    # Simulated decision logic
    data_volume = random.randint(0, 1000)
    context['ti'].xcom_push(key='data_volume', value=data_volume)

    if data_volume > 500:
        return 'process_large_dataset'
    else:
        return 'process_small_dataset'

branch_task = BranchPythonOperator(
    task_id='branch_on_data_volume',
    python_callable=choose_branch,
)

process_large = PythonOperator(
    task_id='process_large_dataset',
    python_callable=lambda: print("Processing large dataset with parallel workers"),
)

process_small = PythonOperator(
    task_id='process_small_dataset',
    python_callable=lambda: print("Processing small dataset directly"),
)

# Join branches - triggered when any upstream succeeds
join = EmptyOperator(
    task_id='join_branches',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# TaskGroup for logical grouping
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='validation_tasks') as validation_group:
    validate_schema = PythonOperator(
        task_id='validate_schema',
        python_callable=lambda: print("Validating data schema"),
    )

    validate_quality = PythonOperator(
        task_id='validate_quality',
        python_callable=lambda: print("Validating data quality"),
    )

    validate_completeness = PythonOperator(
        task_id='validate_completeness',
        python_callable=lambda: print("Validating data completeness"),
    )

    # No dependencies between these three tasks, so they run in parallel

# Dynamic task mapping (Airflow 2.3+)
def generate_partitions(**context):
    """Generate partition list for dynamic mapping."""
    return ['partition_a', 'partition_b', 'partition_c', 'partition_d']

get_partitions = PythonOperator(
    task_id='get_partitions',
    python_callable=generate_partitions,
)

def process_partition(partition: str):
    """Process a single partition."""
    print(f"Processing {partition}")
    return f"Processed {partition}"

# Map over partitions dynamically
process_partitions = PythonOperator.partial(
    task_id='process_partition',
    python_callable=process_partition,  # required inside partial()
).expand(op_args=get_partitions.output.map(lambda x: [x]))

# Final aggregation
def aggregate_results(**context):
    """Aggregate results from all partitions."""
    ti = context['ti']
    results = ti.xcom_pull(task_ids='process_partition', key='return_value')
    print(f"Aggregated {len(results)} partition results")

aggregate = PythonOperator(
    task_id='aggregate_results',
    python_callable=aggregate_results,
)

# Dependencies
branch_task >> [process_large, process_small] >> join
join >> validation_group >> get_partitions >> process_partitions >> aggregate
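
For comparison, the TaskFlow API (Airflow 2.3+) expresses the same fan-out more directly than the classic `partial()`/`expand()` form above. A minimal standalone sketch with illustrative DAG and partition names:

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2026, 1, 1), catchup=False)
def taskflow_mapping_demo():

    @task
    def get_partitions():
        return ['partition_a', 'partition_b', 'partition_c']

    @task
    def process_partition(partition):
        print(f"Processing {partition}")
        return f"Processed {partition}"

    @task
    def aggregate(results):
        # `results` collects the output of all mapped instances
        print(f"Aggregated {len(results)} partition results")

    # expand() creates one mapped task instance per partition
    aggregate(process_partition.expand(partition=get_partitions()))


taskflow_mapping_demo()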

3. Sensors for Event-Driven Workflows

dags/sensor_patterns.py

""" DAG demonstrating sensor patterns for event-driven workflows. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.sensors.filesystem import FileSensor from airflow.sensors.external_task import ExternalTaskSensor from airflow.sensors.python import PythonSensor from airflow.providers.http.sensors.http import HttpSensor from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

default_args = {
    'owner': 'data-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

with DAG(
    dag_id='sensor_patterns_demo',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['sensors', 'event-driven'],
) as dag:

# FileSensor - Wait for file to appear
wait_for_file = FileSensor(
    task_id='wait_for_data_file',
    filepath='/data/incoming/{{ ds }}/data.csv',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Timeout after 1 hour
    mode='poke',  # 'poke' or 'reschedule'
    soft_fail=False,  # Fail task if timeout
)

# S3KeySensor - Wait for S3 object
wait_for_s3 = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='my-data-bucket',
    bucket_key='incoming/{{ ds }}/data.parquet',
    aws_conn_id='aws_default',
    poke_interval=120,
    timeout=7200,
    mode='reschedule',  # Release worker while waiting
)

# HttpSensor - Wait for API endpoint
wait_for_api = HttpSensor(
    task_id='wait_for_api_ready',
    http_conn_id='api_connection',
    endpoint='/health',
    request_params={},
    response_check=lambda response: response.json().get('status') == 'ready',
    poke_interval=30,
    timeout=600,
)

# ExternalTaskSensor - Wait for another DAG
wait_for_upstream_dag = ExternalTaskSensor(
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_data_pipeline',
    external_task_id='final_task',
    execution_delta=timedelta(hours=0),  # Same execution date
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
)

# PythonSensor - Custom condition
def check_data_quality(**context):
    """Custom sensor logic to check data quality."""
    import random
    # Simulated quality check
    quality_score = random.random()
    print(f"Quality score: {quality_score}")
    return quality_score > 0.8  # Return True when condition met

wait_for_quality = PythonSensor(
    task_id='wait_for_data_quality',
    python_callable=check_data_quality,
    poke_interval=120,
    timeout=1800,
    mode='poke',
)

# Process after all sensors pass
def process_data(**context):
    """Process data after all conditions are met."""
    print("All sensors passed, processing data...")

process = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
)

# Dependencies - all sensors must pass
[wait_for_file, wait_for_s3, wait_for_api, wait_for_upstream_dag, wait_for_quality] >> process

4. Hooks for External System Integration

dags/hooks_demo.py

""" DAG demonstrating hook patterns for external system integration. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.http.hooks.http import HttpHook from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hooks_integration_demo',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['hooks', 'integration'],
) as dag:

# PostgresHook for database operations
def extract_from_postgres(**context):
    """Extract data from PostgreSQL using hook."""
    hook = PostgresHook(postgres_conn_id='postgres_warehouse')

    # Jinja is not rendered inside python_callable code, so take the
    # data interval from the task context instead of {{ ds }} templates
    sql = """
        SELECT id, name, value, created_at
        FROM source_table
        WHERE created_at >= %(start)s
        AND created_at < %(end)s
    """
    params = {'start': context['ds'], 'end': context['next_ds']}

    # Execute directly via a cursor...
    connection = hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql, params)
    results = cursor.fetchall()

    # ...or load straight into pandas
    df = hook.get_pandas_df(sql, parameters=params)
    print(f"Extracted {len(df)} rows")

    # Store record count
    context['ti'].xcom_push(key='record_count', value=len(df))

    return df.to_dict()

extract_postgres = PythonOperator(
    task_id='extract_from_postgres',
    python_callable=extract_from_postgres,
)

# S3Hook for file operations
def upload_to_s3(**context):
    """Upload processed data to S3."""
    import json
    from io import BytesIO

    hook = S3Hook(aws_conn_id='aws_default')

    # Pull data from previous task
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract_from_postgres')

    # Convert to JSON and upload
    json_data = json.dumps(data)
    key = f"processed/{{ ds }}/data.json"

    hook.load_string(
        string_data=json_data,
        key=key,
        bucket_name='my-data-bucket',
        replace=True,
    )

    print(f"Uploaded to s3://my-data-bucket/{key}")
    return key

upload_s3 = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_to_s3,
)

# HttpHook for API calls
def call_external_api(**context):
    """Make API call using HTTP hook."""
    hook = HttpHook(http_conn_id='api_connection', method='POST')

    # Prepare payload
    payload = {
        'execution_date': context['ds'],
        'dag_id': context['dag'].dag_id,
        'task_id': context['task'].task_id,
    }

    # Make request
    response = hook.run(
        endpoint='/api/v1/notify',
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
    )

    return response.json()

api_call = PythonOperator(
    task_id='call_external_api',
    python_callable=call_external_api,
)

# SlackWebhookHook for notifications
def send_slack_notification(**context):
    """Send completion notification to Slack."""
    hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')

    ti = context['ti']
    record_count = ti.xcom_pull(task_ids='extract_from_postgres', key='record_count')

    message = f"""
    :white_check_mark: *Pipeline Completed Successfully*

    *DAG*: {context['dag'].dag_id}
    *Execution Date*: {context['ds']}
    *Records Processed*: {record_count}
    """

    hook.send(text=message)

notify_slack = PythonOperator(
    task_id='send_slack_notification',
    python_callable=send_slack_notification,
)

# Dependencies
extract_postgres >> upload_s3 >> api_call >> notify_slack

5. XCom for Task Communication

dags/xcom_patterns.py

""" DAG demonstrating XCom patterns for inter-task communication. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import XCom

default_args = {
    'owner': 'data-team',
    'retries': 1,
}

with DAG(
    dag_id='xcom_patterns_demo',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['xcom', 'communication'],
) as dag:

# Basic XCom push via return value
def produce_data(**context):
    """Produce data - return value auto-pushed to XCom."""
    data = {
        'records': [
            {'id': 1, 'value': 100},
            {'id': 2, 'value': 200},
            {'id': 3, 'value': 300},
        ],
        'metadata': {
            'source': 'api',
            'timestamp': str(datetime.now()),
        }
    }
    return data  # Automatically pushed to XCom

producer = PythonOperator(
    task_id='produce_data',
    python_callable=produce_data,
)

# Consume XCom from previous task
def consume_data(**context):
    """Consume data from previous task."""
    ti = context['ti']

    # Pull return value (default key)
    data = ti.xcom_pull(task_ids='produce_data')
    print(f"Received {len(data['records'])} records")

    # Process and return
    total = sum(r['value'] for r in data['records'])
    return {'total': total, 'count': len(data['records'])}

consumer = PythonOperator(
    task_id='consume_data',
    python_callable=consume_data,
)

# Multiple XCom values with custom keys
def produce_multiple(**context):
    """Push multiple XCom values with different keys."""
    ti = context['ti']

    # Push individual values
    ti.xcom_push(key='status', value='success')
    ti.xcom_push(key='row_count', value=1000)
    ti.xcom_push(key='quality_score', value=0.95)
    ti.xcom_push(key='metadata', value={
        'source': 'database',
        'table': 'transactions',
    })

multi_producer = PythonOperator(
    task_id='produce_multiple_xcoms',
    python_callable=produce_multiple,
)

def consume_multiple(**context):
    """Pull multiple XCom values."""
    ti = context['ti']

    # Pull specific keys
    status = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='status')
    row_count = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='row_count')
    quality = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='quality_score')
    metadata = ti.xcom_pull(task_ids='produce_multiple_xcoms', key='metadata')

    print(f"Status: {status}")
    print(f"Rows: {row_count}, Quality: {quality}")
    print(f"Source: {metadata['source']}")

multi_consumer = PythonOperator(
    task_id='consume_multiple_xcoms',
    python_callable=consume_multiple,
)

# Cross-DAG XCom (use with caution)
def cross_dag_pull(**context):
    """Pull XCom from another DAG run."""
    ti = context['ti']

    # Pull from specific DAG
    value = ti.xcom_pull(
        dag_id='other_dag_id',
        task_ids='other_task_id',
        key='shared_value',
        include_prior_dates=True,  # Look at previous runs
    )
    print(f"Value from other DAG: {value}")

# Template-based XCom access
template_task = PythonOperator(
    task_id='template_xcom_access',
    python_callable=lambda **ctx: print(ctx['templates_dict']),
    templates_dict={
        'data': "{{ ti.xcom_pull(task_ids='produce_data') }}",
        'status': "{{ ti.xcom_pull(task_ids='produce_multiple_xcoms', key='status') }}",
    },
)

# Dependencies
producer >> consumer
multi_producer >> multi_consumer
[consumer, multi_consumer] >> template_task

6. Variables and Connections

dags/config_management.py

""" DAG demonstrating Variables and Connections for configuration. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import Variable from airflow.hooks.base import BaseHook

default_args = {
    'owner': 'data-team',
    'retries': 1,
}

with DAG(
    dag_id='config_management_demo',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['config', 'variables'],
) as dag:

# Using Airflow Variables
def use_variables(**context):
    """Access Airflow Variables."""
    import json

    # Get simple variable
    environment = Variable.get('environment', default_var='development')
    print(f"Environment: {environment}")

    # Get JSON variable (auto-deserialize)
    config = Variable.get('pipeline_config', deserialize_json=True)
    print(f"Config: {config}")

    # Set variable programmatically
    Variable.set('last_run', str(datetime.now()))

    # Get with default
    threshold = Variable.get('quality_threshold', default_var='0.9')

    return {
        'environment': environment,
        'config': config,
        'threshold': float(threshold),
    }

var_task = PythonOperator(
    task_id='use_variables',
    python_callable=use_variables,
)

# Using Connections
def use_connections(**context):
    """Access Airflow Connections."""
    # Get connection object
    conn = BaseHook.get_connection('postgres_warehouse')

    print(f"Host: {conn.host}")
    print(f"Port: {conn.port}")
    print(f"Schema: {conn.schema}")
    print(f"Login: {conn.login}")
    # Password accessible but don't log it: conn.password

    # Get extra fields (JSON)
    extra = conn.extra_dejson
    print(f"Extra config: {extra}")

    # Build connection URI
    uri = conn.get_uri()

    return {
        'host': conn.host,
        'schema': conn.schema,
    }

conn_task = PythonOperator(
    task_id='use_connections',
    python_callable=use_connections,
)

# Template-based variable access
def template_vars(config, env, **context):
    """Access variables rendered via templated op_kwargs."""
    print(config)  # rendered from {{ var.json.pipeline_config }}
    print(env)     # rendered from {{ var.value.environment }}

template_task = PythonOperator(
    task_id='template_variable_access',
    python_callable=template_vars,
    op_kwargs={
        # Access variable in template
        'config': "{{ var.json.pipeline_config }}",
        'env': "{{ var.value.environment }}",
    },
)

# Environment-specific configuration pattern
def environment_config(**context):
    """Load environment-specific configuration."""
    import json

    env = Variable.get('environment', default_var='development')

    # Load environment-specific config
    config_key = f'config_{env}'
    config = Variable.get(config_key, deserialize_json=True, default_var={})

    # Merge with defaults
    defaults = {
        'batch_size': 1000,
        'timeout': 300,
        'retry_count': 3,
    }

    final_config = {**defaults, **config}
    print(f"Final config for {env}: {final_config}")

    return final_config

env_config = PythonOperator(
    task_id='load_environment_config',
    python_callable=environment_config,
)

# Dependencies
[var_task, conn_task] >> template_task >> env_config

7. Error Handling and Callbacks

dags/error_handling.py

""" DAG demonstrating error handling and callback patterns. """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.utils.trigger_rule import TriggerRule

def on_success_callback(context):
    """Callback executed on task success."""
    print(f"Task {context['task_instance'].task_id} succeeded!")
    # Send success notification, update metrics, etc.

def on_failure_callback(context):
    """Callback executed on task failure."""
    import logging
    logger = logging.getLogger(__name__)

    ti = context['task_instance']
    dag_id = ti.dag_id
    task_id = ti.task_id
    execution_date = context['execution_date']
    exception = context.get('exception')

    error_msg = f"""
    Task Failed!
    DAG: {dag_id}
    Task: {task_id}
    Execution Date: {execution_date}
    Exception: {exception}
    """

    logger.error(error_msg)

    # Send alert (Slack, PagerDuty, email, etc.)
    # slack_hook.send(text=error_msg)

def on_retry_callback(context):
    """Callback executed on task retry."""
    ti = context['task_instance']
    print(f"Task {ti.task_id} retrying (attempt {ti.try_number})")

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Callback when an SLA is missed."""
    print(f"SLA missed for tasks: {task_list}")
    # Send SLA alert

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'on_success_callback': on_success_callback,
    'on_failure_callback': on_failure_callback,
    'on_retry_callback': on_retry_callback,
}

with DAG(
    dag_id='error_handling_demo',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    sla_miss_callback=sla_miss_callback,
    tags=['error-handling', 'callbacks'],
) as dag:

# Task with potential failure
def potentially_failing_task(**context):
    """Task that might fail."""
    import random
    if random.random() < 0.3:
        raise ValueError("Random failure for demonstration")
    return "Success!"

risky_task = PythonOperator(
    task_id='risky_task',
    python_callable=potentially_failing_task,
    sla=timedelta(minutes=30),  # SLA deadline
)

# Cleanup task - always runs
cleanup = BashOperator(
    task_id='cleanup',
    bash_command='echo "Cleaning up resources..."',
    trigger_rule=TriggerRule.ALL_DONE,  # Run regardless of upstream status
)

# Error handler task - only runs on upstream failure
def handle_error(**context):
    """Handle upstream failures."""
    ti = context['ti']
    upstream_tasks = ti.get_dagrun().get_task_instances()

    failed_tasks = [t for t in upstream_tasks if t.state == 'failed']
    if failed_tasks:
        print(f"Failed tasks: {[t.task_id for t in failed_tasks]}")
        # Implement recovery logic

error_handler = PythonOperator(
    task_id='handle_errors',
    python_callable=handle_error,
    trigger_rule=TriggerRule.ONE_FAILED,  # Run if any upstream fails
)

# Success notification - only runs if all succeeded
success_notify = BashOperator(
    task_id='success_notification',
    bash_command='echo "All tasks completed successfully!"',
    trigger_rule=TriggerRule.ALL_SUCCESS,
)

# Dependencies
risky_task >> [cleanup, error_handler, success_notify]

8. Docker and Kubernetes Deployment

docker-compose.yml - Production-ready Airflow deployment

version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.1
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # Production settings
    AIRFLOW__CORE__PARALLELISM: 32
    AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 16
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 3
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
    AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./config:/opt/airflow/config
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    image: redis:7
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        airflow db init
        airflow users create \
          --username admin \
          --password admin \
          --firstname Admin \
          --lastname User \
          --role Admin \
          --email admin@example.com
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
    user: "0:0"

volumes:
  postgres-db-volume:

kubernetes/values.yaml - Helm chart values for Kubernetes deployment

executor: KubernetesExecutor

Airflow configuration

config:
  core:
    dags_are_paused_at_creation: 'true'
    load_examples: 'false'
    parallelism: 32
    max_active_tasks_per_dag: 16
  scheduler:
    parsing_processes: 4
  kubernetes:
    delete_worker_pods: 'true'
    delete_worker_pods_on_failure: 'false'

Web server

webserver:
  replicas: 2
  resources:
    requests:
      memory: "1Gi"
      cpu: "500m"
    limits:
      memory: "2Gi"
      cpu: "1000m"

Scheduler

scheduler:
  replicas: 2
  resources:
    requests:
      memory: "2Gi"
      cpu: "1000m"
    limits:
      memory: "4Gi"
      cpu: "2000m"

Worker pod template

workers:
  resources:
    requests:
      memory: "1Gi"
      cpu: "500m"
    limits:
      memory: "2Gi"
      cpu: "1000m"

DAGs configuration

dags:
  persistence:
    enabled: true
    size: 10Gi
  gitSync:
    enabled: true
    repo: https://github.com/org/airflow-dags.git
    branch: main
    wait: 60

Logs

logs:
  persistence:
    enabled: true
    size: 50Gi

Database

postgresql:
  enabled: true
  persistence:
    enabled: true
    size: 20Gi

Redis (for Celery if using)

redis:
  enabled: false

Ingress

ingress:
  enabled: true
  web:
    annotations:
      kubernetes.io/ingress.class: nginx
      cert-manager.io/cluster-issuer: letsencrypt-prod
    hosts:
      - name: airflow.example.com
        tls:
          enabled: true
          secretName: airflow-tls

Integration Examples

Integration with AWS Services

dags/aws_integration.py

""" DAG integrating with AWS services. """ from datetime import datetime, timedelta from airflow import DAG from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.providers.amazon.aws.operators.glue import GlueJobOperator from airflow.providers.amazon.aws.operators.athena import AthenaOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
}

with DAG(
    dag_id='aws_integration_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['aws', 'integration'],
) as dag:

# Upload to S3
upload_to_s3 = LocalFilesystemToS3Operator(
    task_id='upload_to_s3',
    filename='/data/output/{{ ds }}/data.parquet',
    dest_key='raw/{{ ds }}/data.parquet',
    dest_bucket='my-data-lake',
    aws_conn_id='aws_default',
    replace=True,
)

# Run Glue ETL job
run_glue_job = GlueJobOperator(
    task_id='run_glue_etl',
    job_name='my-etl-job',
    script_args={
        '--input_path': 's3://my-data-lake/raw/{{ ds }}/',
        '--output_path': 's3://my-data-lake/processed/{{ ds }}/',
    },
    aws_conn_id='aws_default',
    wait_for_completion=True,
)

# Query with Athena
run_athena_query = AthenaOperator(
    task_id='run_athena_analysis',
    query="""
        SELECT date, COUNT(*) as count, SUM(value) as total
        FROM processed_data
        WHERE partition_date = '{{ ds }}'
        GROUP BY date
    """,
    database='analytics',
    output_location='s3://my-data-lake/athena-results/',
    aws_conn_id='aws_default',
)

# Load to Redshift
load_to_redshift = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='public',
    table='fact_daily_metrics',
    s3_bucket='my-data-lake',
    s3_key='processed/{{ ds }}/',
    redshift_conn_id='redshift_warehouse',
    aws_conn_id='aws_default',
    copy_options=['FORMAT AS PARQUET'],
)

upload_to_s3 >> run_glue_job >> run_athena_query >> load_to_redshift

Best Practices

  1. DAG Design Principles

Use meaningful DAG and task IDs

dag_id='sales_daily_etl_pipeline'  # Good
dag_id='dag1'                      # Bad

Set appropriate concurrency limits

max_active_runs=1     # For data pipelines with dependencies
max_active_tasks=16   # Limit resource usage

Use tags for organization

tags=['production', 'etl', 'sales']

Always set catchup=False unless backfill needed

catchup=False

Use execution_timeout to prevent stuck tasks

execution_timeout=timedelta(hours=2)

  2. Task Best Practices

Keep tasks atomic and idempotent

def process_partition(partition_date: str):
    """Idempotent: can be safely re-run."""
    # Delete existing data for this partition
    delete_partition(partition_date)
    # Process and insert new data
    insert_data(partition_date)

Use retries with exponential backoff

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

Avoid heavy processing in sensors

Bad: sensor does complex computation

Good: sensor checks a simple condition; heavy processing lives in a separate task, as sketched below
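
A minimal sketch of this split, assuming a hypothetical /data/incoming/ready.flag marker file: the sensor only tests for existence, and the downstream task does the actual work.

import os

from airflow.sensors.python import PythonSensor
from airflow.operators.python import PythonOperator

def flag_exists():
    """Cheap check only: no parsing, no heavy computation."""
    return os.path.exists('/data/incoming/ready.flag')

wait_for_flag = PythonSensor(
    task_id='wait_for_flag',
    python_callable=flag_exists,
    poke_interval=60,
    mode='reschedule',  # Release the worker slot between pokes
)

def heavy_processing(**context):
    """All expensive work happens here, after the sensor passes."""
    print("Parsing and loading the incoming file...")

process = PythonOperator(
    task_id='heavy_processing',
    python_callable=heavy_processing,
)

wait_for_flag >> process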

  3. Configuration Management

Use Variables for configuration, not hardcoded values

batch_size = Variable.get('batch_size', default_var=1000)

Use Connections for credentials

conn = BaseHook.get_connection('my_database')

Environment-specific configuration

env = Variable.get('environment')
config = Variable.get(f'config_{env}', deserialize_json=True)

  4. Testing DAGs

tests/test_dags.py

import pytest

from airflow.models import DagBag

def test_dag_loads():
    """Test that DAGs load without errors."""
    dagbag = DagBag()
    assert len(dagbag.import_errors) == 0

def test_dag_structure():
    """Test DAG has expected structure."""
    dagbag = DagBag()
    dag = dagbag.get_dag('my_pipeline')

    assert dag is not None
    assert len(dag.tasks) == 5
    assert dag.schedule_interval == '@daily'
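
One more guard worth adding, sketched under the assumption that your team's policy requires retries on every DAG:

def test_all_dags_have_retries():
    """Assumed policy check: every DAG configures at least one retry."""
    dagbag = DagBag()
    for dag_id, dag in dagbag.dags.items():
        retries = (dag.default_args or {}).get('retries', 0)
        assert retries >= 1, f"DAG {dag_id} has no retries configured"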

Troubleshooting

Common Issues

Issue: DAG not appearing in UI

Check for import errors

airflow dags list-import-errors

Validate DAG file

python dags/my_dag.py

Check scheduler logs

docker logs airflow-scheduler-1 | grep -i error

Issue: Tasks stuck in queued state

Check worker status

airflow celery status

Verify executor configuration

airflow config get-value core executor

Check for resource constraints

kubectl top pods -n airflow

Issue: XCom size limits

Use external storage for large data

def store_large_result(**context):
    # Store in S3 instead of XCom; `large_data` stands in for the oversized payload
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.load_string(large_data, key='results/data.json', bucket_name='my-bucket')
    return 's3://my-bucket/results/data.json'  # Return reference only

Issue: Scheduler performance

Tune scheduler settings

AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: 30
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 60

Debugging Tips

Add detailed logging

import logging

logger = logging.getLogger(__name__)

def my_task(**context):
    logger.info(f"Starting task with context: {context}")
    # ... task logic producing `result`
    logger.debug(f"Intermediate result: {result}")

Test specific task

airflow tasks test my_dag my_task 2026-01-15

Clear task state for re-run

airflow tasks clear my_dag -t my_task -s 2026-01-15 -e 2026-01-15

Trigger DAG run

airflow dags trigger my_dag --conf '{"key": "value"}'
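
Inside a task, the --conf payload is available as dag_run.conf; a minimal sketch reading the `key` parameter from the trigger command above:

def read_trigger_conf(**context):
    """Read parameters passed via `airflow dags trigger --conf`."""
    conf = context['dag_run'].conf or {}
    value = conf.get('key', 'default')
    print(f"Triggered with key={value}")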

Version History

| Version | Date       | Changes                                               |
|---------|------------|-------------------------------------------------------|
| 1.0.0   | 2026-01-17 | Initial release with comprehensive workflow patterns  |

Resources

  • Apache Airflow Documentation

  • Airflow Best Practices

  • Airflow Helm Chart

  • Astronomer Guides

  • Airflow Providers

This skill provides production-ready patterns for Apache Airflow workflow orchestration, tested across enterprise data pipelines.
