temporal-io

Temporal.io Workflow Orchestration

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "temporal-io" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-temporal-io

Durable execution engine for reliable distributed applications.

Overview

  • Long-running business processes (days/weeks/months)

  • Saga patterns requiring compensation/rollback

  • Microservice orchestration with retries

  • Systems requiring effectively-once workflow execution (activities run at least once, so they should be idempotent)

  • Complex state machines with human-in-the-loop

  • Scheduled and recurring workflows

Workflow Definition

import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# OrderInput, OrderResult, PaymentInput, InventoryInput, CancelInput and the
# activity functions are assumed to be defined elsewhere in the project.

@workflow.defn
class OrderWorkflow:
    def __init__(self) -> None:
        self._status = "pending"
        self._order_id: str | None = None

    @workflow.run
    async def run(self, order_data: OrderInput) -> OrderResult:
        self._order_id = await workflow.execute_activity(
            create_order, order_data,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=1)),
        )
        self._status = "processing"

        # Parallel activities
        payment, inventory = await asyncio.gather(
            workflow.execute_activity(process_payment, PaymentInput(order_id=self._order_id), start_to_close_timeout=timedelta(minutes=5)),
            workflow.execute_activity(reserve_inventory, InventoryInput(order_id=self._order_id), start_to_close_timeout=timedelta(minutes=2)),
        )

        self._status = "completed"
        return OrderResult(order_id=self._order_id, payment_id=payment.id)

    # Queries are read-only and must not mutate workflow state
    @workflow.query
    def get_status(self) -> str:
        return self._status

    # Signals may mutate state and schedule activities
    @workflow.signal
    async def cancel_order(self, reason: str) -> None:
        self._status = "cancelling"
        await workflow.execute_activity(cancel_order_activity, CancelInput(order_id=self._order_id), start_to_close_timeout=timedelta(seconds=30))
        self._status = "cancelled"

Activity Definition

import httpx
from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
    activity.logger.info(f"Processing payment for order {input.order_id}")
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://payments.example.com/charge",
                json={"order_id": input.order_id, "amount": input.amount},
            )
            response.raise_for_status()
            return PaymentResult(**response.json())
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 402:
            # A declined payment will not succeed on retry, so fail without retrying
            raise ApplicationError("Payment declined", non_retryable=True, type="PaymentDeclined")
        raise  # Other HTTP errors are retried according to the retry policy

@activity.defn
async def send_notification(input: NotificationInput) -> None:
    for i, recipient in enumerate(input.recipients):
        # Heartbeat during long operations so a stalled activity can be detected
        activity.heartbeat(f"Sending {i+1}/{len(input.recipients)}")
        await send_email(recipient, input.subject, input.body)
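Because an activity such as process_payment can be retried, it may run more than once for the same order. The following is a minimal sketch of one way to make the side effect idempotent, keyed on order_id. It uses an in-memory SQLite table instead of a real payment store; the table layout and the record_payment helper are hypothetical and not part of the skill or of Temporal.

```python
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    """Create an in-memory table keyed by order_id (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE payments (order_id TEXT PRIMARY KEY, payment_id TEXT, amount INTEGER)"
    )
    return conn


def record_payment(conn: sqlite3.Connection, order_id: str, amount: int) -> str:
    """Record a payment at most once per order_id and return the stored payment id."""
    # ON CONFLICT ... DO NOTHING needs SQLite 3.24 or newer.
    conn.execute(
        """
        INSERT INTO payments (order_id, payment_id, amount)
        VALUES (?, ?, ?)
        ON CONFLICT(order_id) DO NOTHING
        """,
        (order_id, str(uuid.uuid4()), amount),
    )
    conn.commit()
    row = conn.execute(
        "SELECT payment_id FROM payments WHERE order_id = ?", (order_id,)
    ).fetchone()
    return row[0]


conn = make_db()
first = record_payment(conn, "order-42", 100)
retry = record_payment(conn, "order-42", 100)  # simulated activity retry
row_count = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
print(first == retry, row_count)
```

The retry returns the payment id written by the first attempt instead of creating a second row, so running the activity twice has the same effect as running it once.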

Worker and Client

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="order-processing",
        workflows=[OrderWorkflow],
        activities=[create_order, process_payment, reserve_inventory, cancel_order_activity],
    )
    await worker.run()

async def start_order_workflow(order_data: OrderInput) -> str:
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        OrderWorkflow.run,
        order_data,
        id=f"order-{order_data.order_id}",
        task_queue="order-processing",
    )
    return handle.id

async def get_order_status(workflow_id: str) -> str:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    return await handle.query(OrderWorkflow.get_status)

Saga Pattern with Compensation

from datetime import timedelta
from typing import Any, Callable

from temporalio import workflow

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order: OrderInput) -> OrderResult:
        # Each entry pairs a compensating activity with the argument it needs
        compensations: list[tuple[Callable, Any]] = []

        try:
            reservation = await workflow.execute_activity(reserve_inventory, order.items, start_to_close_timeout=timedelta(minutes=2))
            compensations.append((release_inventory, reservation.id))

            payment = await workflow.execute_activity(charge_payment, PaymentInput(order_id=order.id), start_to_close_timeout=timedelta(minutes=5))
            compensations.append((refund_payment, payment.id))

            shipment = await workflow.execute_activity(create_shipment, ShipmentInput(order_id=order.id), start_to_close_timeout=timedelta(minutes=3))
            return OrderResult(order_id=order.id, payment_id=payment.id, shipment_id=shipment.id)

        except Exception:
            workflow.logger.warning(f"Saga failed, running {len(compensations)} compensations")
            # Undo completed steps in reverse order
            for compensate_fn, compensate_arg in reversed(compensations):
                try:
                    await workflow.execute_activity(compensate_fn, compensate_arg, start_to_close_timeout=timedelta(minutes=2))
                except Exception as e:
                    # Log and continue so one failed compensation does not block the rest
                    workflow.logger.error(f"Compensation failed: {e}")
            raise
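The compensation ordering can be checked without a Temporal server. The sketch below reproduces the same register-then-unwind logic in plain asyncio, with the third step forced to fail. The step functions (reserve, charge, ship, release, refund) are hypothetical stand-ins for the activities, and nothing here is durable the way a real workflow would be.

```python
import asyncio
from typing import Awaitable, Callable

log: list[str] = []  # records the order in which steps run


async def reserve(order_id: str) -> str:
    log.append("reserve")
    return f"res-{order_id}"


async def charge(order_id: str) -> str:
    log.append("charge")
    return f"pay-{order_id}"


async def ship(order_id: str) -> str:
    log.append("ship")
    raise RuntimeError("carrier unavailable")  # simulated failure


async def release(reservation_id: str) -> None:
    log.append(f"release {reservation_id}")


async def refund(payment_id: str) -> None:
    log.append(f"refund {payment_id}")


async def run_saga(order_id: str) -> str:
    compensations: list[tuple[Callable[[str], Awaitable[None]], str]] = []
    try:
        reservation_id = await reserve(order_id)
        compensations.append((release, reservation_id))
        payment_id = await charge(order_id)
        compensations.append((refund, payment_id))
        return await ship(order_id)
    except Exception:
        # Undo completed steps, most recent first, then re-raise
        for compensate, arg in reversed(compensations):
            await compensate(arg)
        raise


try:
    asyncio.run(run_saga("42"))
except RuntimeError:
    pass

print(log)
```

The refund runs before the inventory release because compensations are applied in the reverse of the order in which their forward steps completed.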

Timers and Scheduling

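import asyncio
from datetime import timedelta

from temporalio import workflow

@workflow.defn
class TimeoutWorkflow:
    def __init__(self) -> None:
        # Stays None until an approve signal arrives
        self._approved: bool | None = None

    @workflow.run
    async def run(self, input: TaskInput) -> TaskResult:
        try:
            await workflow.wait_condition(lambda: self._approved is not None, timeout=timedelta(hours=24))
        except asyncio.TimeoutError:
            # No decision within 24 hours
            return TaskResult(status="auto_rejected")
        return TaskResult(status="approved" if self._approved else "rejected")

    @workflow.signal
    async def approve(self, approved: bool) -> None:
        self._approved = approved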
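The three outcomes of this wait-with-timeout pattern (approved, rejected, auto-rejected) can be illustrated with a plain asyncio analogy. This sketch does not use Temporal: a future stands in for the signal, and the timeout is milliseconds rather than 24 hours. The names wait_for_decision and demo are hypothetical.

```python
import asyncio


async def wait_for_decision(decision: "asyncio.Future[bool]", timeout: float) -> str:
    """Wait for a decision; fall back to auto-rejection when the timeout expires."""
    try:
        approved = await asyncio.wait_for(decision, timeout)
    except asyncio.TimeoutError:
        return "auto_rejected"
    return "approved" if approved else "rejected"


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    results = []
    # True and False simulate a signal arriving; None simulates no signal at all
    for signal in (True, False, None):
        decision = loop.create_future()
        if signal is not None:
            loop.call_later(0.01, decision.set_result, signal)
        results.append(await wait_for_decision(decision, timeout=0.05))
    return results


outcomes = asyncio.run(demo())
print(outcomes)
```

In a real workflow the timer is durable, so the 24-hour wait survives worker restarts; the asyncio version only mirrors the control flow.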

Testing

import pytest
import pytest_asyncio
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

# pytest_asyncio.fixture works in both strict and auto pytest-asyncio modes
@pytest_asyncio.fixture
async def workflow_env():
    async with await WorkflowEnvironment.start_local() as env:
        yield env

@pytest.mark.asyncio
async def test_order_workflow(workflow_env):
    async with Worker(
        workflow_env.client,
        task_queue="test",
        workflows=[OrderWorkflow],
        # Register every activity that OrderWorkflow.run calls
        activities=[create_order, process_payment, reserve_inventory],
    ):
        result = await workflow_env.client.execute_workflow(
            OrderWorkflow.run,
            OrderInput(id="test-1", total=100),
            id="test-order-1",
            task_queue="test",
        )
        assert result.order_id == "test-1"

Key Decisions

Decision          Recommendation
Workflow ID       Business-meaningful, idempotent (e.g., order-{order_id})
Task queue        Per-service or per-workflow-type
Activity timeout  start_to_close for most cases
Retry policy      3 attempts default, exponential backoff
Heartbeating      Required for activities > 60s
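To make the retry recommendation concrete, the sketch below computes the wait before each retry under exponential backoff. It assumes a backoff coefficient of 2.0 and a maximum interval of 100 times the initial interval, which to my understanding are the Temporal defaults; retry_delays is a hypothetical helper, not an SDK function.

```python
from datetime import timedelta
from typing import Optional


def retry_delays(
    initial_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,               # assumed default
    maximum_interval: Optional[timedelta] = None,   # assumed: 100x initial when unset
) -> list[timedelta]:
    """Return the wait before each retry; N attempts means N - 1 retries."""
    cap = maximum_interval or initial_interval * 100
    delays = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, cap))
    return delays


delays = retry_delays(timedelta(seconds=1), maximum_attempts=3)
print([d.total_seconds() for d in delays])
```

With 3 attempts and a 1-second initial interval, the activity is retried twice, after roughly 1 and 2 seconds, before the failure is reported to the workflow.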

Anti-Patterns (FORBIDDEN)

# NEVER do non-deterministic operations in workflows
if random.random() > 0.5:      # Different on replay!
    ...
if datetime.now() > deadline:  # Different on replay!
    ...

# CORRECT: Use workflow APIs
if workflow.random().random() > 0.5:  # workflow.random() returns a deterministically seeded random.Random
    ...
if workflow.now() > deadline:
    ...

# NEVER make network calls directly in workflows
response = httpx.get("https://api.example.com")  # WRONG!

# CORRECT: Use activities for I/O
response = await workflow.execute_activity(fetch_data, ...)

# NEVER ignore activity idempotency: use upsert with order_id as key
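Why a seeded generator is safe on replay can be shown with the standard library alone. This is a sketch of the idea only: the seed value is hypothetical, and the assumption (not stated in the source) is that Temporal seeds its workflow random generator deterministically so a replay takes the same branches.

```python
import random


def decide(rng: random.Random) -> list[bool]:
    """Make five branch decisions from the supplied generator."""
    return [rng.random() > 0.5 for _ in range(5)]


seed = 12345  # hypothetical per-workflow seed
first_run = decide(random.Random(seed))
replay = decide(random.Random(seed))  # same seed, so the same decisions

print(first_run == replay)
```

A call to the module-level random.random() has no such guarantee, which is why a replayed workflow could take a different branch than the original execution did.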

Related Skills

  • saga-patterns: Distributed transaction patterns

  • message-queues: Event-driven integration

  • resilience-patterns: Retry and circuit breaker patterns

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
