durable-task-python

Durable Task Python SDK with Durable Task Scheduler

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "durable-task-python" with this command: npx skills add azure-samples/durable-task-scheduler/azure-samples-durable-task-scheduler-durable-task-python

Durable Task Python SDK with Durable Task Scheduler

Build fault-tolerant, stateful workflows in Python applications using the Durable Task SDK connected to Azure Durable Task Scheduler.

Quick Start

Required Packages

pip install durabletask durabletask-azuremanaged azure-identity

Or add to requirements.txt :

durabletask durabletask-azuremanaged azure-identity

Minimal Worker + Client Setup

import os from azure.identity import DefaultAzureCredential from durabletask import task from durabletask.client import OrchestrationStatus from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

Activity function

def hello(ctx: task.ActivityContext, name: str) -> str: return f"Hello {name}!"

Orchestrator function

def my_orchestration(ctx: task.OrchestrationContext, input: str): result = yield ctx.call_activity(hello, input=input) return result

Configuration - defaults to local emulator

taskhub = os.getenv("TASKHUB", "default") endpoint = os.getenv("ENDPOINT", "http://localhost:8080") secure_channel = endpoint != "http://localhost:8080" credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()

Start worker and run orchestration

with DurableTaskSchedulerWorker( host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub, token_credential=credential ) as worker: worker.add_orchestrator(my_orchestration) worker.add_activity(hello) worker.start()

# Create client and schedule orchestration
dts_client = DurableTaskSchedulerClient(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub,
    token_credential=credential
)

instance_id = dts_client.schedule_new_orchestration(my_orchestration, input="World")
state = dts_client.wait_for_orchestration_completion(instance_id, timeout=60)

if state and state.runtime_status == OrchestrationStatus.COMPLETED:
    print(f"Result: {state.serialized_output}")

Pattern Selection Guide

Pattern Use When

Function Chaining Sequential steps where each depends on the previous

Fan-Out/Fan-In Parallel processing with aggregated results

Human Interaction Workflow pauses for external input/approval

Durable Entities Stateful objects with operations (counters, accounts)

Sub-Orchestrations Reusable workflow components or version isolation

Eternal Orchestrations Long-running background processes with continue_as_new

Monitoring Periodic polling with configurable timeouts

See references/patterns.md for detailed implementations.

Orchestration Structure

Basic Orchestrator

def my_orchestration(ctx: task.OrchestrationContext, input: str): """Orchestrator function - MUST be deterministic""" # Call activities sequentially step1 = yield ctx.call_activity(step1_activity, input=input) step2 = yield ctx.call_activity(step2_activity, input=step1) return step2

Basic Activity

def my_activity(ctx: task.ActivityContext, input: str) -> str: """Activity function - can have side effects, I/O, non-determinism""" # Perform actual work here print(f"Processing: {input}") return f"Processed: {input}"

Registering with Worker

with DurableTaskSchedulerWorker(...) as worker: worker.add_orchestrator(my_orchestration) worker.add_activity(step1_activity) worker.add_activity(step2_activity) worker.start()

Critical Rules

Orchestration Determinism

Orchestrations replay from history - all code MUST be deterministic. When an orchestration resumes, it replays all previous code to rebuild state. Non-deterministic code produces different results on replay, causing failures.

NEVER do inside orchestrations:

  • datetime.now() , datetime.utcnow() → Use ctx.current_utc_datetime

  • uuid.uuid4() → Use ctx.new_uuid()

  • random.random() → Pass random values from activities

  • Direct I/O, HTTP calls, database access → Move to activities

  • time.sleep() , asyncio.sleep() → Use ctx.create_timer()

  • Environment variables that may change → Pass as input or use activities

  • Global mutable state → Pass state through activity results

ALWAYS use:

  • yield ctx.call_activity()

  • Call activities

  • yield ctx.call_sub_orchestrator()

  • Call sub-orchestrations

  • yield ctx.create_timer()

  • Durable delays

  • yield ctx.wait_for_external_event()

  • Wait for events

  • ctx.current_utc_datetime

  • Current time

  • ctx.new_uuid()

  • Generate GUIDs

  • ctx.set_custom_status()

  • Set status

Non-Determinism Patterns (WRONG vs CORRECT)

Getting Current Time

WRONG - datetime.now() returns different value on replay

def bad_orchestration(ctx: task.OrchestrationContext, _): current_time = datetime.now() # Non-deterministic! if current_time.hour < 12: yield ctx.call_activity(morning_activity)

CORRECT - ctx.current_utc_datetime is replayed consistently

def good_orchestration(ctx: task.OrchestrationContext, _): current_time = ctx.current_utc_datetime # Deterministic if current_time.hour < 12: yield ctx.call_activity(morning_activity)

Generating UUIDs/Random Values

WRONG - uuid4() generates different value on replay

def bad_orchestration(ctx: task.OrchestrationContext, _): order_id = str(uuid.uuid4()) # Non-deterministic! yield ctx.call_activity(create_order, input=order_id)

CORRECT - ctx.new_uuid() replays the same value

def good_orchestration(ctx: task.OrchestrationContext, _): order_id = str(ctx.new_uuid()) # Deterministic yield ctx.call_activity(create_order, input=order_id)

Random Numbers

WRONG - random produces different values on replay

def bad_orchestration(ctx: task.OrchestrationContext, _): delay = random.randint(1, 10) # Non-deterministic! yield ctx.create_timer(timedelta(seconds=delay))

CORRECT - generate random in activity, pass to orchestrator

def get_random_delay(ctx: task.ActivityContext, _) -> int: return random.randint(1, 10) # OK in activity

def good_orchestration(ctx: task.OrchestrationContext, _): delay = yield ctx.call_activity(get_random_delay) # Deterministic yield ctx.create_timer(timedelta(seconds=delay))

Sleeping/Delays

WRONG - time.sleep blocks and doesn't persist

def bad_orchestration(ctx: task.OrchestrationContext, _): yield ctx.call_activity(step1) time.sleep(60) # Non-durable! Lost on restart yield ctx.call_activity(step2)

CORRECT - ctx.create_timer is durable

def good_orchestration(ctx: task.OrchestrationContext, _): yield ctx.call_activity(step1) yield ctx.create_timer(timedelta(seconds=60)) # Durable timer yield ctx.call_activity(step2)

HTTP Calls and I/O

WRONG - HTTP call in orchestrator is non-deterministic

def bad_orchestration(ctx: task.OrchestrationContext, url: str): import requests response = requests.get(url) # Non-deterministic! return response.json()

CORRECT - move I/O to activity

def fetch_data(ctx: task.ActivityContext, url: str) -> dict: import requests response = requests.get(url) # OK in activity return response.json()

def good_orchestration(ctx: task.OrchestrationContext, url: str): data = yield ctx.call_activity(fetch_data, input=url) # Deterministic return data

Database Access

WRONG - database query in orchestrator

def bad_orchestration(ctx: task.OrchestrationContext, user_id: str): import sqlite3 conn = sqlite3.connect('db.sqlite') # Non-deterministic! cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,)) user = cursor.fetchone() # ...

CORRECT - database access in activity

def get_user(ctx: task.ActivityContext, user_id: str) -> dict: import sqlite3 conn = sqlite3.connect('db.sqlite') # OK in activity cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,)) return dict(cursor.fetchone())

def good_orchestration(ctx: task.OrchestrationContext, user_id: str): user = yield ctx.call_activity(get_user, input=user_id) # ...

Environment Variables

WRONG - env var might change between replays

def bad_orchestration(ctx: task.OrchestrationContext, _): api_endpoint = os.getenv("API_ENDPOINT") # Could change! yield ctx.call_activity(call_api, input=api_endpoint)

CORRECT - pass config as input or read in activity

def good_orchestration(ctx: task.OrchestrationContext, config: dict): api_endpoint = config["api_endpoint"] # From input, deterministic yield ctx.call_activity(call_api, input=api_endpoint)

ALSO CORRECT - read env var in activity

def call_api(ctx: task.ActivityContext, _) -> str: api_endpoint = os.getenv("API_ENDPOINT") # OK in activity # make the call...

Conditional Logic Based on External State

WRONG - file existence can change between replays

def bad_orchestration(ctx: task.OrchestrationContext, path: str): if os.path.exists(path): # Non-deterministic! yield ctx.call_activity(process_file, input=path)

CORRECT - check in activity

def check_file_exists(ctx: task.ActivityContext, path: str) -> bool: return os.path.exists(path) # OK in activity

def good_orchestration(ctx: task.OrchestrationContext, path: str): exists = yield ctx.call_activity(check_file_exists, input=path) if exists: # Deterministic - based on activity result yield ctx.call_activity(process_file, input=path)

Dictionary/Set Iteration Order

POTENTIALLY WRONG - dict iteration order may vary (Python < 3.7)

def risky_orchestration(ctx: task.OrchestrationContext, items: dict): for key in items: # Order might not be guaranteed yield ctx.call_activity(process, input=key)

CORRECT - use sorted keys for deterministic order

def good_orchestration(ctx: task.OrchestrationContext, items: dict): for key in sorted(items.keys()): # Guaranteed order yield ctx.call_activity(process, input=key)

Thread-Local or Global State

WRONG - global state can change

counter = 0

def bad_orchestration(ctx: task.OrchestrationContext, _): global counter counter += 1 # Non-deterministic across replays! yield ctx.call_activity(process, input=counter)

CORRECT - pass state through orchestration input/output

def good_orchestration(ctx: task.OrchestrationContext, counter: int): counter += 1 # Local variable, deterministic yield ctx.call_activity(process, input=counter) # If continuing, pass counter forward ctx.continue_as_new(counter)

Using yield

In Python, orchestrator functions use yield to await durable operations:

CORRECT - use yield

result = yield ctx.call_activity(my_activity, input="data")

WRONG - will not work

result = ctx.call_activity(my_activity, input="data") # Missing yield!

Error Handling

def orchestrator_with_error_handling(ctx: task.OrchestrationContext, input: str): try: result = yield ctx.call_activity(risky_activity, input=input) return result except task.TaskFailedError as e: # Activity failed - implement compensation ctx.set_custom_status({"error": str(e)}) yield ctx.call_activity(compensation_activity, input=input) return "Compensated"

Retry Policies

from durabletask.task import RetryPolicy

retry_policy = RetryPolicy( first_retry_interval=5, # seconds max_number_of_attempts=3, backoff_coefficient=2.0, max_retry_interval=60, # seconds retry_timeout=300 # seconds )

def orchestrator(ctx: task.OrchestrationContext, _): result = yield ctx.call_activity( unreliable_activity, input="data", retry_policy=retry_policy ) return result

Working with Custom Types

The SDK supports dataclasses, namedtuples, and custom classes:

from dataclasses import dataclass

@dataclass class Order: product: str quantity: int cost: float

def process_order(ctx: task.ActivityContext, order: Order) -> str: return f"Processed {order.quantity}x {order.product}"

def order_workflow(ctx: task.OrchestrationContext, order: Order): result = yield ctx.call_activity(process_order, input=order) return result

Connection & Authentication

Local Emulator (Default)

No authentication required

taskhub = "default" endpoint = "http://localhost:8080" credential = None secure_channel = False

Azure with DefaultAzureCredential

from azure.identity import DefaultAzureCredential

taskhub = "my-taskhub" endpoint = "https://my-scheduler.region.durabletask.io" credential = DefaultAzureCredential() secure_channel = True

Authentication Helper

def get_connection_config(): endpoint = os.getenv("ENDPOINT", "http://localhost:8080") taskhub = os.getenv("TASKHUB", "default")

is_local = endpoint == "http://localhost:8080"

return {
    "host_address": endpoint,
    "taskhub": taskhub,
    "secure_channel": not is_local,
    "token_credential": None if is_local else DefaultAzureCredential()
}

config = get_connection_config() worker = DurableTaskSchedulerWorker(**config) client = DurableTaskSchedulerClient(**config)

Local Development with Emulator

Pull and run the emulator

docker pull mcr.microsoft.com/dts/dts-emulator:latest docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest

Dashboard available at http://localhost:8082

Client Operations

Schedule new orchestration

instance_id = client.schedule_new_orchestration(my_orchestration, input="data")

Schedule with custom instance ID

instance_id = client.schedule_new_orchestration( my_orchestration, input="data", instance_id="my-custom-id" )

Wait for completion

state = client.wait_for_orchestration_completion(instance_id, timeout=60)

Get current status

state = client.get_orchestration_state(instance_id)

Raise external event

client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)

Terminate orchestration

client.terminate_orchestration(instance_id, output="User cancelled")

Suspend/Resume

client.suspend_orchestration(instance_id) client.resume_orchestration(instance_id)

References

  • patterns.md - Detailed pattern implementations (Fan-Out/Fan-In, Human Interaction, Entities, Sub-Orchestrations)

  • setup.md - Azure Durable Task Scheduler provisioning and deployment

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

durable-functions-dotnet

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

durable-task-dotnet

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

durable-task-java

No summary provided by upstream source.

Repository SourceNeeds Review