python:temporal

Temporal workflow orchestration in Python. Use when designing workflows, implementing activities, handling retries, managing workflow state, or building durable distributed systems.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "python:temporal" with this command: npx skills add martinffx/claude-code-atelier/martinffx-claude-code-atelier-python-temporal

Temporal Workflow Orchestration

Temporal SDK patterns for building durable, distributed workflows in Python.

Worker Setup

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    await worker.run()

Workflow Definition

from temporalio import workflow
from datetime import timedelta

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        """Workflow run method"""
        # Execute activity
        result = await workflow.execute_activity(
            my_activity,
            name,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Hello {result}"

Activity Implementation

from temporalio import activity

@activity.defn
async def my_activity(name: str) -> str:
    """Activity - can fail and retry"""
    # Do work (database, API, etc.)
    return name.upper()

Starting Workflows

from temporalio.client import Client

async def start_workflow():
    client = await Client.connect("localhost:7233")

    handle = await client.start_workflow(
        MyWorkflow.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue",
    )

    result = await handle.result()
    print(result)  # "Hello WORLD"

Error Handling

from temporalio.exceptions import ActivityError

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        try:
            result = await workflow.execute_activity(
                risky_activity,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
        except ActivityError as e:
            # Handle failure after retries exhausted
            return "Failed"

        return result

Signals and Queries

@workflow.defn
class OrderWorkflow:
    def __init__(self):
        self.status = "pending"

    @workflow.run
    async def run(self, order_id: str) -> str:
        await workflow.wait_condition(lambda: self.status == "approved")
        return "Order processed"

    @workflow.signal
    def approve(self):
        """Signal to approve order"""
        self.status = "approved"

    @workflow.query
    def get_status(self) -> str:
        """Query current status"""
        return self.status

See references/ for testing patterns and common workflow patterns.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

python:architecture

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python:build-tools

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python:monorepo

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python:modern-python

No summary provided by upstream source.

Repository SourceNeeds Review