
Workflow Canvas

Purpose: Implement Celery canvas patterns for composing complex, distributed task workflows.

Activation Triggers:

  • Sequential task execution needed (chains)

  • Parallel task processing required (groups)

  • Synchronization after parallel work (chords)

  • Complex workflow composition

  • Task result aggregation

  • Error handling in workflows

  • Nested or conditional workflows

Key Resources:

  • scripts/test-workflow.sh: Test workflow execution and validate patterns

  • scripts/validate-canvas.sh: Validate canvas structure and dependencies

  • scripts/generate-workflow.sh: Generate workflows from templates

  • templates/: Complete workflow pattern implementations

  • examples/: Real-world workflow scenarios with explanations

Canvas Primitives

  1. Signatures

Signatures are the foundation of the canvas system; they encapsulate a single task invocation, its arguments, and its execution options:

from celery import signature

Named signature

signature('tasks.add', args=(2, 2), countdown=10)

Using task method

add.signature((2, 2), countdown=10)

Shortcut syntax (most common)

add.s(2, 2)

Immutable signature (prevents result forwarding)

add.si(2, 2)

Use templates/chain-workflow.py for signature examples.

  2. Chains

Sequential task execution where each task's result becomes the next task's first argument:

from celery import chain

Explicit chain

chain(add.s(2, 2), add.s(4), add.s(8))()

Pipe syntax (recommended)

(add.s(2, 2) | add.s(4) | add.s(8))()

With immutable tasks (independent execution)

(add.si(2, 2) | process.s() | notify.si('done'))()

Key Pattern: Use .si() when a task should NOT receive the previous task's result.

See templates/chain-workflow.py for complete patterns.

  3. Groups

Execute multiple tasks in parallel; invoking a group returns a GroupResult:

from celery import group

Parallel execution

group(add.s(i, i) for i in range(10))()

With result tracking

job = group(process.s(item) for item in batch)
result = job.apply_async()
result.get()  # Wait for all tasks

Requirements:

  • Tasks must NOT ignore results

  • Result backend required

See templates/group-parallel.py for implementation.

  4. Chords

Group header + callback that executes after all header tasks complete:

from celery import chord

Basic chord

chord(add.s(i, i) for i in range(100))(tsum.s()).get()

With error handling

chord(process.s(item) for item in batch)(
    aggregate_results.s().on_error(handle_chord_error.s())
)

Critical Requirements:

  • Result backend MUST be enabled

  • Set task_ignore_result=False explicitly

  • Redis 2.2+ for proper operation

Error Handling: If a header task fails, the callback is not executed; instead the callback's result moves to the failure state with a ChordError identifying the failed task's ID and exception.

See templates/chord-pattern.py for complete implementation.

  5. Map & Starmap

Built-in tasks for sequence processing (single message, sequential execution):

Map: one call per element (the element is the task's single argument)

tsum.map([range(10), range(100)])

Starmap: unpacks each tuple into positional arguments

add.starmap(zip(range(100), range(100)))

Difference from groups: Single task message vs multiple messages.
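The two calling conventions can be sketched in plain Python; conceptually, this is what the built-in map/starmap tasks do inside their single message (add and tsum are demo functions):

```python
def add(x, y):
    return x + y

def tsum(numbers):
    return sum(numbers)

# map: each element is passed as the task's single argument
map_result = [tsum(item) for item in [range(3), range(5)]]

# starmap: each element is unpacked into positional arguments
starmap_result = [add(*args) for args in [(2, 2), (4, 4), (8, 8)]]
```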

  6. Chunks

Divide large iterables into sized batches:

Process 100 items in chunks of 10

add.chunks(zip(range(100), range(100)), 10)

Returns: Nested lists corresponding to chunk outputs.
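The chunking behavior, sketched in plain Python: the iterable is split into fixed-size parts, each part is processed with starmap semantics, and the output is one list of results per chunk (add is a demo function):

```python
def add(x, y):
    return x + y

pairs = list(zip(range(10), range(10)))
chunk_size = 5

# Split into fixed-size batches, then apply the task to each element.
chunks = [pairs[i:i + chunk_size] for i in range(0, len(pairs), chunk_size)]
nested = [[add(*args) for args in chunk] for chunk in chunks]
```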

Advanced Patterns

Partial Application

Incomplete signature

partial = add.s(2)

Complete later (arguments prepended)

partial.delay(4) # Executes add(4, 2)

Kwargs merge with precedence

partial = process.s(timeout=30)
partial.apply_async(kwargs={'timeout': 60})  # Uses 60

Nested Workflows

Combine primitives for complex logic:

Chain of chords

workflow = chain(
    chord([task1.s(), task2.s()], callback1.s()),
    chord([task3.s(), task4.s()], callback2.s()),
)

Groups in chains

workflow = (
    prepare.s()
    | group([process.s(i) for i in range(10)])
    | finalize.s()
)

See templates/nested-workflows.py for production patterns.

Error Callbacks

Task-level error handling

add.s(2, 2).on_error(log_error.s()).delay()

Chord error handling

chord(header_tasks)(callback.s().on_error(handle_error.s()))

Errback signature: (request, exc, traceback)

Execution: Synchronous in worker.

See templates/error-handling-workflow.py for comprehensive patterns.

Complex Workflow Example

from celery import chain, group, chord

Data processing pipeline

workflow = chain(
    # Stage 1: Fetch and validate
    fetch_data.s(),
    validate_data.s(),

    # Stage 2: Parallel processing
    group(transform_batch.s(i) for i in range(num_batches)),

    # Stage 3: Aggregate results (body passed as chord's second argument,
    # so the chord is composed rather than executed immediately)
    chord(
        [aggregate_batch.s(i) for i in range(num_batches)],
        finalize_report.s(),
    ),

    # Stage 4: Notify
    send_notification.si('pipeline_complete'),
)

Execute with error handling

result = workflow.apply_async(link_error=handle_pipeline_error.s())

See templates/complex-workflow.py for production-ready implementation.

Testing Workflows

Validate Canvas Structure

Check workflow composition

./scripts/validate-canvas.sh path/to/workflow.py

Validates:

- Result backend enabled

- Task ignore_result settings

- Proper signature usage

- Error callback patterns

Test Workflow Execution

Run workflow with test data

./scripts/test-workflow.sh workflow_name

Options:

--dry-run : Validate without execution

--verbose : Show detailed task flow

--timeout 30 : Set execution timeout

Best Practices

DO:

  • Enable result backend for groups/chords

  • Use .si() for independent tasks in chains

  • Implement error callbacks for critical workflows

  • Set explicit timeouts for long-running workflows

  • Use chunks for large data processing

  • Add metadata with stamping API (5.3+)

  • Make error callbacks idempotent

DON'T:

  • Have tasks wait synchronously for other tasks

  • Ignore results for tasks in groups

  • Forget to call super() in after_return() overrides

  • Use chords without Redis 2.2+

  • Create deeply nested workflows (>3 levels)

  • Mix synchronous and async task calls

Configuration Requirements

Celery Config:

Required for canvas

result_backend = 'redis://localhost:6379/0'
result_extended = True  # Store task args/kwargs

Recommended

task_track_started = True
result_expires = 3600  # 1 hour

For large workflows

worker_prefetch_multiplier = 1
task_acks_late = True

Debugging

Visualize workflow:

The dependency graph is exposed on the result, not the signature

result = workflow.apply_async()
with open('workflow.dot', 'w') as f:
    result.graph.to_dot(f)

Convert to image:

dot -Tpng workflow.dot -o workflow.png

Monitor execution:

result = workflow.apply_async()
print(f"Task ID: {result.id}")
print(f"Status: {result.status}")
print(f"Children: {result.children}")

Resources

Templates:

  • chain-workflow.py: Sequential task patterns

  • group-parallel.py: Parallel execution patterns

  • chord-pattern.py: Synchronization patterns

  • complex-workflow.py: Multi-stage pipelines

  • error-handling-workflow.py: Error callback patterns

  • nested-workflows.py: Advanced composition

Scripts:

  • test-workflow.sh: Execute and validate workflows

  • validate-canvas.sh: Static analysis of canvas patterns

  • generate-workflow.sh: Generate workflows from templates

Examples:

  • examples/chain-example.md: Real-world chain scenarios

  • examples/group-example.md: Parallel processing use cases

  • examples/chord-example.md: Synchronization patterns

  • examples/complex-workflows.md: Production workflow architectures

Security Compliance

This skill follows strict security rules:

  • All code examples use placeholder values only

  • No real API keys, passwords, or secrets

  • Environment variable references in all code

  • .gitignore protection documented

Version: 1.0.0
Celery Compatibility: 5.0+
Required Backend: Redis 2.2+ or compatible result backend
