Airflow Human-in-the-Loop Operators
Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.
Implementation Checklist
Execute steps in order. Prefer deferrable HITL operators over custom sensors or polling loops.
CRITICAL: Requires Airflow 3.1+. Not available in Airflow 2.x.
All HITL operators are deferrable and release their worker slot while waiting for input.
UI Location: Browse -> Required Actions in Airflow UI. Respond via the task instance Required Actions tab or the REST API.
Step 1: Choose operator
| Operator | Human action | Outcome |
|---|---|---|
| ApprovalOperator | Approve or reject | Reject causes downstream tasks to be skipped (the approval task itself succeeds) |
| HITLOperator | Select option(s) plus form | Returns selections |
| HITLBranchOperator | Select downstream task(s) | Runs selected, skips others |
| HITLEntryOperator | Submit form | Returns form data |
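The table above can be read as a small decision rule. The sketch below encodes it in plain Python for illustration; `choose_operator` and its flag names are hypothetical helpers, not part of Airflow, and it only returns the operator class name as a string.

```python
def choose_operator(
    *,
    approve_or_reject: bool = False,
    picks_downstream_tasks: bool = False,
    picks_options: bool = False,
) -> str:
    """Return the HITL operator name that matches the required human action.

    Hypothetical helper: it mirrors the selection table row by row and
    falls back to the form-only operator when no choice is involved.
    """
    if approve_or_reject:
        return "ApprovalOperator"
    if picks_downstream_tasks:
        return "HITLBranchOperator"
    if picks_options:
        return "HITLOperator"
    return "HITLEntryOperator"


print(choose_operator(picks_downstream_tasks=True))
```

The checks are ordered from the most specific human action to the most general, so a plain form submission is the fallback.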
Step 2: Implement operator
ApprovalOperator
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        # Templated: shows the value returned by prepare() to the reviewer.
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        # Used only if an execution_timeout elapses with no response.
        defaults="Approve",
        # Optional form field shown next to the Approve/Reject choice.
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        # Skipped when the reviewer rejects.
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)


approval_example()
HITLOperator
Required parameters: subject and options.
from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, Param
from datetime import timedelta
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],
        defaults=["ACH"],  # applied if the timeout elapses with no response
        multiple=False,  # the human may pick exactly one option
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        # chosen_options is a list even when multiple=False.
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)


hitl_example()
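The downstream task receives a dictionary with the selection and the form values. The sketch below shows one way to unpack it defensively in plain Python. It assumes the payload carries at least the `chosen_options` and `params_input` keys used in the example above; `unpack_payment_response` is a hypothetical helper, and the positive-amount check is an illustrative business rule, not something the operator enforces.

```python
from typing import Any


def unpack_payment_response(result: dict[str, Any]) -> tuple[str, float]:
    """Return (payment_method, amount) from a HITL response payload.

    Assumes the payload shape used in the example:
    {"chosen_options": [...], "params_input": {"amount": ...}}.
    """
    chosen = result["chosen_options"]
    # With multiple=False exactly one option is expected, still wrapped in a list.
    if len(chosen) != 1:
        raise ValueError(f"expected exactly one option, got {chosen!r}")
    amount = float(result["params_input"]["amount"])
    if amount <= 0:
        raise ValueError("amount must be positive")
    return chosen[0], amount


sample = {"chosen_options": ["Wire"], "params_input": {"amount": 2500}}
method, amount = unpack_payment_response(sample)
print(method, amount)
```

Inside the DAG, the body of `process` could call this helper on `result` before handing the values to the payment logic.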
HITLBranchOperator
Options can either match downstream task IDs directly or use options_mapping.
from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime
DEPTS = ["marketing", "engineering", "sales"]
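@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        # Labels shown to the human.
        options=[f"Fund {d}" for d in DEPTS],
        # Maps each label to the task_id of the downstream task to run.
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:
        # One downstream task per department; task_id must match the mapping.
        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # The default argument binds the current dept at definition time.
            print(f"Processing {dept_name}")

        chain(branch, handle())


branch_example()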
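To make the role of `options_mapping` concrete, the sketch below models the label-to-task resolution in plain Python. It is an illustration of the behavior described above, not the operator's internal code; `resolve_task_ids` and `skipped_task_ids` are hypothetical helpers.

```python
DEPTS = ["marketing", "engineering", "sales"]
OPTIONS_MAPPING = {f"Fund {d}": d for d in DEPTS}


def resolve_task_ids(chosen_options, options_mapping):
    """Translate chosen labels into downstream task IDs.

    A label without a mapping entry is treated as a task ID itself,
    which models the "options match task IDs directly" case.
    """
    return [options_mapping.get(label, label) for label in chosen_options]


def skipped_task_ids(chosen_options, options_mapping, downstream_task_ids):
    """Return the downstream task IDs that were not selected."""
    selected = set(resolve_task_ids(chosen_options, options_mapping))
    return [t for t in downstream_task_ids if t not in selected]


chosen = ["Fund marketing", "Fund sales"]
print(resolve_task_ids(chosen, OPTIONS_MAPPING))
print(skipped_task_ids(chosen, OPTIONS_MAPPING, DEPTS))
```

Keeping human-readable labels separate from task IDs lets the prompt text change without renaming tasks.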
HITLEntryOperator
from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        # Each Param becomes a form field; the first argument is its default.
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        # Submitted form values arrive under params_input.
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)


entry_example()
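Form values are free text, so the downstream task may want to normalize them before use. The following is a minimal sketch under stated assumptions: `clean_entry` is a hypothetical helper, and the allowed priority set is invented for illustration (only `"p3"` appears in the example above).

```python
# Hypothetical allowed values; only "p3" appears in the example form.
ALLOWED_PRIORITIES = ("p1", "p2", "p3")


def clean_entry(params_input: dict) -> dict:
    """Normalize and validate the submitted form values."""
    response = str(params_input.get("response", "")).strip()
    if not response:
        raise ValueError("response must not be empty")
    # Fall back to the form's default priority when the field is missing.
    priority = str(params_input.get("priority", "p3")).strip().lower()
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(f"unknown priority: {priority!r}")
    return {"response": response, "priority": priority}


print(clean_entry({"response": "  ship it ", "priority": "P1"}))
```

Raising on invalid input makes the downstream task fail visibly instead of continuing with an empty response.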
Step 3: Optional features
Notifiers
from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator
class MyNotifier(BaseNotifier):
    # Fields listed here are rendered with Jinja before notify() runs.
    template_fields = ("message",)

    def __init__(self, message=""):
        super().__init__()
        self.message = message

    def notify(self, context: Context):
        # Only act while the task instance is running.
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(
                context, base_url="https://airflow.example.com"
            )
            self.log.info(f"Action needed: {url}")


hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])
Restrict respondents
Format depends on your auth manager:
| Auth Manager | Format | Example |
|---|---|---|
| SimpleAuthManager | Username | ["admin", "manager"] |
| FabAuthManager | Email | ["manager@example.com"] |
| Astro | Astro ID | ["cl1a2b3cd456789ef1gh2ijkl3"] |
hitl = HITLOperator(..., respondents=["manager@example.com"])
Timeout behavior
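Set execution_timeout to bound how long the task waits for a response. If the timeout elapses and defaults is set, the task succeeds with the default option(s); if defaults is not set, the task fails.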
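The timeout rule can be modeled in plain Python. This is a sketch of the decision logic only, not the operator's implementation: `resolve_on_timeout` and `HumanInputTimeout` are hypothetical names, and it assumes that a timeout with no `defaults` configured fails the task.

```python
from typing import Optional, Sequence


class HumanInputTimeout(Exception):
    """Hypothetical stand-in for the failure when nobody responds in time."""


def resolve_on_timeout(
    response: Optional[Sequence[str]],
    defaults: Optional[Sequence[str]],
) -> list:
    """Return the options the task resolves with once the wait is over.

    response: options chosen by a human, or None if the timeout elapsed first.
    defaults: the operator's configured defaults, or None if not set.
    """
    if response is not None:
        return list(response)  # a human answered in time
    if defaults:
        return list(defaults)  # timeout: fall back to the defaults
    raise HumanInputTimeout("no response and no defaults configured")


print(resolve_on_timeout(None, ["ACH"]))
```

In this model, configuring `defaults` is what turns a timeout into an automatic decision rather than a failure, so choose a default that is safe to apply unattended.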