routing-strategies

Celery Task Routing Strategies Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "routing-strategies" with this command: npx skills add vanman2024/ai-dev-marketplace/vanman2024-ai-dev-marketplace-routing-strategies

This skill provides comprehensive templates, scripts, and patterns for implementing advanced task routing and queue management in Celery applications, including priority queues, topic-based routing, and worker-specific queue assignments.

Overview

Effective task routing is crucial for:

  • Performance Optimization - Route compute-intensive tasks to dedicated workers

  • Priority Management - High-priority tasks bypass slower queues

  • Resource Isolation - Separate critical operations from background jobs

  • Scalability - Independent scaling of different task types

This skill covers routing with RabbitMQ, Redis, and custom broker configurations.

Available Scripts

  1. Test Routing Configuration

Script: scripts/test-routing.sh <config-file>

Purpose: Validates routing configuration and tests queue connectivity

Checks:

  • Broker connectivity (RabbitMQ/Redis)

  • Queue declarations

  • Exchange configurations

  • Routing key patterns

  • Worker queue bindings

  • Priority queue setup

Usage:

# Test routing configuration
./scripts/test-routing.sh ./celery_config.py

# Test with custom broker URL
BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py

# Verbose output
VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py

Exit Codes:

  • 0 : All routing tests passed

  • 1 : Configuration errors detected

  • 2 : Broker connection failed
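The exit codes above make the script easy to call from a deployment check. The following is a minimal sketch, assuming the script lives at `./scripts/test-routing.sh`; the function names `describe_exit_code` and `run_routing_test` are hypothetical and not part of the skill.

```python
import subprocess

# Meanings copied from the exit-code list above.
EXIT_MEANINGS = {
    0: "all routing tests passed",
    1: "configuration errors detected",
    2: "broker connection failed",
}


def describe_exit_code(code: int) -> str:
    """Translate a test-routing.sh exit code into a readable message."""
    return EXIT_MEANINGS.get(code, f"unexpected exit code {code}")


def run_routing_test(config_path: str,
                     script: str = "./scripts/test-routing.sh") -> str:
    """Run the script (path is an assumption) and describe its result."""
    result = subprocess.run([script, config_path], check=False)
    return describe_exit_code(result.returncode)
```

Calling `run_routing_test("./celery_config.py")` would then return one of the three messages, or a fallback for any code the list does not cover.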

  2. Validate Queue Configuration

Script: scripts/validate-queues.sh <project-dir>

Purpose: Validates queue setup across application code

Checks:

  • Task decorators use valid queues

  • No hardcoded queue names (use config)

  • All queues defined in routing configuration

  • Priority settings are valid (0-255)

  • Exchange types match routing patterns

  • Worker configurations reference valid queues

Usage:

# Validate current project
./scripts/validate-queues.sh .

# Validate specific directory
./scripts/validate-queues.sh /path/to/celery-app

# Generate detailed report
REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md

Exit Codes:

  • 0 : Validation passed

  • 1 : Validation failed (must fix issues)
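Two of the checks listed above (routes must point at declared queues, and priorities must fall in 0-255) can be sketched in a few lines of Python. This is an illustration only, not the script's actual implementation; `find_route_problems` and the sample `myapp.tasks.broken` route are hypothetical.

```python
def find_route_problems(routes, queue_names, max_priority=255):
    """Return human-readable problems found in a task_routes-style dict."""
    problems = []
    for task_name, route in routes.items():
        queue = route.get("queue")
        if queue is not None and queue not in queue_names:
            problems.append(f"{task_name}: unknown queue '{queue}'")
        priority = route.get("priority")
        if priority is not None and not (
            isinstance(priority, int) and 0 <= priority <= max_priority
        ):
            problems.append(
                f"{task_name}: priority {priority!r} outside 0-{max_priority}"
            )
    return problems


routes = {
    "myapp.tasks.send_email": {"queue": "emails", "routing_key": "email.send"},
    "myapp.tasks.urgent_task": {"queue": "high_priority", "priority": 9},
    # Deliberately invalid entry, present only to trigger both checks.
    "myapp.tasks.broken": {"queue": "missing", "priority": 300},
}
problems = find_route_problems(routes, {"default", "emails", "high_priority"})
for problem in problems:
    print(problem)
```

Only the deliberately broken route is reported, once for the undeclared queue and once for the out-of-range priority.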

Available Templates

  1. Basic Queue Configuration

Template: templates/queue-config.py

Features:

  • Default queue setup

  • Named queues for different task types

  • Queue-to-exchange bindings

  • Priority settings

  • Worker routing configuration

Usage:

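from celery import Celery
# Assumes templates/queue-config.py has been copied to an importable module
# named queue_config.py (hyphens are not valid in Python module names).
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES

app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES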

Configuration Example:

from kombu import Exchange, Queue

# The 'emails' and 'reports' exchanges are declared as topic exchanges so that
# the wildcard bindings match keys such as 'email.send' and 'report.generate'.
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    Queue('emails', Exchange('emails', type='topic'), routing_key='email.#'),
    Queue('reports', Exchange('reports', type='topic'), routing_key='report.#'),
)

CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}

  2. Dynamic Routing Rules

Template: templates/routing-rules.py

Features:

  • Pattern-based routing

  • Conditional routing logic

  • Dynamic queue selection

  • Routing by task name patterns

  • Routing by task arguments

Key Functions:

def route_task(name, args, kwargs, options, task=None, **kw):
    """Dynamic routing based on task name or arguments."""
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}

    # kwargs may be None when a task is sent without keyword arguments
    if (kwargs or {}).get('priority') == 'high':
        return {'queue': 'high_priority'}

    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}

    return {'queue': 'default'}

app.conf.task_routes = (route_task,)
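Because a router is a plain callable, its rules can be checked without a broker or a running worker. The sketch below restates the router (with a guard for `kwargs` being `None`) and exercises each branch; the task names in `cases` are hypothetical and exist only for the test.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Routing rules restated from the template so this block is self-contained."""
    if name.startswith("urgent."):
        return {"queue": "high_priority", "priority": 9}
    if (kwargs or {}).get("priority") == "high":
        return {"queue": "high_priority"}
    if name.startswith("email."):
        return {"queue": "emails", "exchange": "emails"}
    return {"queue": "default"}


# (task name, kwargs, expected route); names are illustrative only.
cases = [
    ("urgent.rebuild_index", {}, {"queue": "high_priority", "priority": 9}),
    ("reports.build", {"priority": "high"}, {"queue": "high_priority"}),
    ("email.send_welcome", {}, {"queue": "emails", "exchange": "emails"}),
    ("reports.build", None, {"queue": "default"}),
]

for name, kwargs, expected in cases:
    assert route_task(name, (), kwargs, {}) == expected, name
```

Note that rule order matters: a task named `urgent.email.x` would hit the first branch and never reach the email rule.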

  3. Priority Queue Setup

Template: templates/priority-queues.py

Features:

  • Multi-level priority queues (0-255)

  • Priority inheritance

  • Default priority configuration

  • Queue priority enforcement

Priority Levels:

from kombu import Exchange, Queue

# Priority queue configuration
CELERY_QUEUES = (
    Queue('critical', Exchange('tasks'), routing_key='critical',
          queue_arguments={'x-max-priority': 10}),
    Queue('high', Exchange('tasks'), routing_key='high',
          queue_arguments={'x-max-priority': 10}),
    Queue('normal', Exchange('tasks'), routing_key='normal',
          queue_arguments={'x-max-priority': 10}),
    Queue('low', Exchange('tasks'), routing_key='low',
          queue_arguments={'x-max-priority': 10}),
)

# Task priority mapping
PRIORITY_LEVELS = {
    'critical': 10,  # Highest priority
    'high': 7,
    'normal': 5,
    'low': 2,
}

# Apply priority to task
@app.task(priority=PRIORITY_LEVELS['high'])
def urgent_processing():
    pass
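A priority can also be supplied at call time, and it helps to resolve it against the queue's maximum first. The sketch below assumes the RabbitMQ behaviour in which a message priority above a queue's `x-max-priority` is treated as the maximum; `effective_priority` is a hypothetical helper, not part of the template.

```python
PRIORITY_LEVELS = {"critical": 10, "high": 7, "normal": 5, "low": 2}
X_MAX_PRIORITY = 10  # mirrors queue_arguments={'x-max-priority': 10}


def effective_priority(level_or_value, max_priority=X_MAX_PRIORITY):
    """Resolve a named level or an integer to the priority the queue applies."""
    value = PRIORITY_LEVELS.get(level_or_value, level_or_value)
    if not isinstance(value, int):
        raise ValueError(f"unknown priority level: {level_or_value!r}")
    if value < 0:
        raise ValueError("priority must be non-negative")
    # Values above the queue maximum are capped at the maximum.
    return min(value, max_priority)


print(effective_priority("high"))
print(effective_priority(200))
```

The result can be passed at runtime, for example `urgent_processing.apply_async(priority=effective_priority("critical"))`.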

  4. Topic Exchange Routing

Template: templates/topic-exchange.py

Features:

  • Topic-based routing patterns

  • Wildcard routing keys

  • Multi-queue routing

  • Pattern matching

Topic Patterns:

from kombu import Exchange, Queue

# Topic exchange setup
task_exchange = Exchange('tasks', type='topic', durable=True)

CELERY_QUEUES = (
    # Match specific patterns
    Queue('user.notifications', exchange=task_exchange,
          routing_key='user.notification.*'),

    # Match all email types
    Queue('emails', exchange=task_exchange,
          routing_key='email.#'),

    # Match processing tasks
    Queue('processing', exchange=task_exchange,
          routing_key='*.processing.*'),

    # Match report keys of the form report.<word>
    Queue('reports', exchange=task_exchange,
          routing_key='report.*'),
)

# Routing configuration
CELERY_ROUTES = {
    'myapp.tasks.send_welcome_email': {
        'exchange': 'tasks',
        'routing_key': 'email.welcome.send',
    },
    'myapp.tasks.notify_user': {
        'exchange': 'tasks',
        'routing_key': 'user.notification.send',
    },
}
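To see which queue a given routing key would reach under the bindings above, the topic matching rules (`*` for exactly one word, `#` for zero or more words) can be reproduced in pure Python. This is a sketch of the matching semantics for experimentation, not broker code; `topic_matches` and `queues_for` are hypothetical names.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            return not k  # pattern exhausted: key must be exhausted too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


# Queue name -> binding pattern, copied from the configuration above.
BINDINGS = {
    "user.notifications": "user.notification.*",
    "emails": "email.#",
    "processing": "*.processing.*",
    "reports": "report.*",
}


def queues_for(routing_key: str):
    """List the queues whose binding matches the routing key."""
    return sorted(q for q, pat in BINDINGS.items()
                  if topic_matches(pat, routing_key))


print(queues_for("email.welcome.send"))      # ['emails']
print(queues_for("user.notification.send"))  # ['user.notifications']
print(queues_for("report.monthly.pdf"))      # []
```

The last call shows a common surprise: `report.*` matches only two-word keys, so a three-word report key reaches no queue unless the binding uses `report.#`.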

  5. Worker-Specific Routing

Template: templates/worker-routing.py

Features:

  • Dedicated worker pools

  • Worker-specific queues

  • CPU vs I/O task separation

  • Geographic routing

  • Resource-based routing

Worker Configuration:

# Worker pool definitions
WORKER_POOLS = {
    'cpu_intensive': {
        'queues': ['ml_training', 'video_processing', 'data_analysis'],
        'concurrency': 4,
        'prefetch_multiplier': 1,
    },
    'io_intensive': {
        'queues': ['api_calls', 'file_uploads', 'emails'],
        'concurrency': 50,
        'prefetch_multiplier': 10,
    },
    'general': {
        'queues': ['default', 'background'],
        'concurrency': 10,
        'prefetch_multiplier': 4,
    },
}

# Start workers (shell), one process per pool
celery -A myapp worker --queues=ml_training,video_processing,data_analysis -c 4 -n cpu_worker@%h
celery -A myapp worker --queues=api_calls,file_uploads,emails -c 50 -n io_worker@%h
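Keeping the pool definitions and the start commands in sync by hand is error-prone, so one option is to derive the commands from the dictionary. The sketch below is an assumption about how that could look; `worker_command` is a hypothetical helper, and the pool data is restated so the block runs on its own.

```python
WORKER_POOLS = {
    "cpu_intensive": {
        "queues": ["ml_training", "video_processing", "data_analysis"],
        "concurrency": 4,
        "prefetch_multiplier": 1,
    },
    "io_intensive": {
        "queues": ["api_calls", "file_uploads", "emails"],
        "concurrency": 50,
        "prefetch_multiplier": 10,
    },
}


def worker_command(app_name, pool_name, pool):
    """Build the argv list for one `celery worker` process."""
    return [
        "celery", "-A", app_name, "worker",
        "--queues=" + ",".join(pool["queues"]),
        f"--concurrency={pool['concurrency']}",
        f"--prefetch-multiplier={pool['prefetch_multiplier']}",
        f"--hostname={pool_name}@%h",
    ]


for pool_name, pool in WORKER_POOLS.items():
    print(" ".join(worker_command("myapp", pool_name, pool)))
```

Returning a list rather than a string keeps the result safe to hand to `subprocess.run` without shell quoting.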

Available Examples

  1. Priority Queue Setup Guide

Example: examples/priority-queue-setup.md

Demonstrates:

  • Configuring RabbitMQ priority queues

  • Setting task priorities

  • Priority inheritance patterns

  • Testing priority routing

  • Monitoring priority queue performance

Key Concepts:

  • Priority range: 0 (lowest) to 255 (highest) on RabbitMQ, capped per queue by its x-max-priority value

  • RabbitMQ x-max-priority argument

  • Priority at task definition vs runtime

  • Queue argument configuration

  2. Topic-Based Routing Implementation

Example: examples/topic-routing.md

Demonstrates:

  • Topic exchange setup

  • Routing key patterns (* and # wildcards)

  • Multi-queue routing

  • Pattern matching strategies

  • Consumer binding patterns

Routing Key Patterns:

  • * matches exactly one word

  • # matches zero or more words

  • Example: email.*.send matches email.welcome.send, email.notification.send

  • Example: user.# matches user.create, user.update.profile

  3. Worker Queue Assignment Strategy

Example: examples/worker-queue-assignment.md

Demonstrates:

  • CPU-bound vs I/O-bound task separation

  • Worker pool configuration

  • Queue-to-worker mapping

  • Scaling strategies per worker type

  • Resource allocation patterns

Worker Types:

# CPU-intensive workers (low concurrency)
celery -A myapp worker -Q ml_training,video_processing -c 4 -n cpu@%h

# I/O-intensive workers (high concurrency)
celery -A myapp worker -Q api_calls,emails -c 50 -n io@%h

# General purpose workers
celery -A myapp worker -Q default,background -c 10 -n general@%h

Routing Strategies Comparison

  1. Direct Exchange (Default)
  • Use Case: Simple queue-to-task mapping

  • Pros: Simple, predictable, fast

  • Cons: Limited flexibility

  • Example: Each task type goes to one specific queue

  2. Topic Exchange
  • Use Case: Pattern-based routing, hierarchical task categories

  • Pros: Flexible, supports wildcards, multi-queue routing

  • Cons: More complex configuration

  • Example: email.*.send routes all email types to email queue

  3. Fanout Exchange
  • Use Case: Broadcasting tasks to multiple queues

  • Pros: Simple broadcast mechanism

  • Cons: No routing logic, all queues receive all messages

  • Example: Notifications sent to multiple consumer types

  4. Headers Exchange
  • Use Case: Complex routing based on message headers

  • Pros: Very flexible, metadata-based routing

  • Cons: Performance overhead, complex configuration

  • Example: Route by priority=high and region=us-east

Performance Considerations

  1. Queue Configuration
  • Durable queues: Messages persist across broker restarts (use for critical tasks)

  • Transient queues: Faster but messages lost on restart (use for disposable tasks)

  • Queue length limits: Prevent memory issues with x-max-length

  2. Prefetch Settings
  • CPU-bound tasks: Low prefetch (1-2) to prevent blocking

  • I/O-bound tasks: High prefetch (10+) to keep workers busy

  • Configure per worker: celery worker --prefetch-multiplier=4
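The prefetch multiplier is applied per worker process, so the number of messages a worker may reserve at once is its concurrency times the multiplier. A small worked calculation, using pool sizes that are illustrative assumptions (`prefetch_limit` is a hypothetical helper):

```python
def prefetch_limit(concurrency: int, prefetch_multiplier: int) -> int:
    """Messages one worker may reserve at once: concurrency x multiplier."""
    return concurrency * prefetch_multiplier


# (concurrency, prefetch_multiplier) per pool; values are examples only.
pools = {
    "cpu_intensive": (4, 1),
    "io_intensive": (50, 10),
    "general": (10, 4),
}

for name, (concurrency, multiplier) in pools.items():
    print(f"{name}: up to {prefetch_limit(concurrency, multiplier)} reserved messages")
```

With these numbers the CPU pool holds at most 4 messages, the general pool 40, and the I/O pool 500, which is why a long task behind a large reservation can wait even while other workers are idle.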

  3. Priority Queue Performance
  • RabbitMQ: Native priority support, efficient

  • Redis: Priority emulation via separate queues, less efficient

  • Trade-off: Priority checking adds overhead, use only when needed
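For Redis, priority emulation is configured through broker transport options rather than queue arguments. The fragment below is a sketch based on the transport options described in the Celery routing documentation; treat the exact values as assumptions to verify against the Celery version in use. Note that with the Redis transport the ordering is reversed relative to RabbitMQ: 0 is the highest priority.

```python
# Candidate value for app.conf.broker_transport_options with a Redis broker.
REDIS_PRIORITY_OPTIONS = {
    "priority_steps": list(range(10)),   # ten emulated priority levels, 0-9
    "sep": ":",                          # separator used in per-priority list names
    "queue_order_strategy": "priority",  # consume higher-priority lists first
}

# With a Celery app object this would be applied as:
#   app.conf.broker_transport_options = REDIS_PRIORITY_OPTIONS
print(REDIS_PRIORITY_OPTIONS["priority_steps"])
```

Fewer steps (for example three) reduce the number of Redis lists each worker has to poll, at the cost of coarser priorities.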

Security Compliance

This skill follows strict security rules:

  • All code examples use placeholder values only

  • No real broker credentials, passwords, or secrets

  • Environment variable references in all code (BROKER_URL, BACKEND_URL)

  • .gitignore protection documented

  • Broker URLs use placeholder format: amqp://user:password@localhost:5672//

Never hardcode:

❌ WRONG

BROKER_URL = 'amqp://myuser:secretpass123@rabbitmq.example.com:5672//'

✅ CORRECT

import os
BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')

Best Practices

  • Use Environment-Based Configuration - Different queues for dev/staging/prod

  • Separate Critical Tasks - High-priority queue for time-sensitive operations

  • Match Worker to Task Type - CPU workers for compute, I/O workers for network/disk

  • Monitor Queue Lengths - Alert on queue buildup indicating bottlenecks

  • Use Topic Exchanges for Hierarchical Tasks - Cleaner routing than multiple direct exchanges

  • Test Routing Configuration - Validate routes before deploying to production

  • Document Routing Logic - Especially for complex pattern-based routing

  • Use Priority Sparingly - Overuse defeats the purpose and adds overhead

  • Configure Prefetch Per Worker Type - Optimize based on task characteristics

  • Plan for Scaling - Design routing to allow independent queue scaling

Requirements

  • Celery 5.0+

  • Message broker (RabbitMQ 3.8+ or Redis 6.0+)

  • Python 3.8+

  • kombu library (included with Celery)

  • Environment variables: CELERY_BROKER_URL (required), CELERY_RESULT_BACKEND (optional)

  • For RabbitMQ priority queues: RabbitMQ 3.5+

  • For testing scripts: netcat/telnet for connectivity checks
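Since `CELERY_BROKER_URL` is required and `CELERY_RESULT_BACKEND` is optional, an application can fail fast at startup when the broker URL is missing. A minimal sketch, in which `load_celery_settings` is a hypothetical helper and the returned keys follow Celery's lowercase setting names:

```python
import os


def load_celery_settings(env=None):
    """Read broker settings from the environment, failing fast if required ones are missing."""
    env = os.environ if env is None else env
    broker_url = env.get("CELERY_BROKER_URL")
    if not broker_url:
        raise RuntimeError("CELERY_BROKER_URL is required")
    return {
        "broker_url": broker_url,
        "result_backend": env.get("CELERY_RESULT_BACKEND"),  # optional, may be None
    }


# Placeholder URL only; real credentials belong in the environment.
settings = load_celery_settings(
    {"CELERY_BROKER_URL": "amqp://guest:guest@localhost:5672//"}
)
print(settings["result_backend"])
```

Passing a mapping instead of reading `os.environ` directly keeps the function easy to test.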

Progressive Disclosure

For advanced routing patterns, see:

  • examples/priority-queue-setup.md - Priority queue implementation

  • examples/topic-routing.md - Topic exchange patterns

  • examples/worker-queue-assignment.md - Worker pool strategies

Troubleshooting

Queue Not Receiving Tasks

  • Check routing configuration matches task name

  • Verify queue declaration in CELERY_QUEUES

  • Ensure workers are listening to correct queues

  • Check broker connectivity with test script

Priority Not Working

  • Verify x-max-priority set on queue (RabbitMQ only)

  • Check tasks are setting priority correctly

  • Confirm workers consuming from priority queue

  • Redis: Implement separate high/low priority queues

Worker Not Processing Tasks

  • Verify worker queue list matches routed queues

  • Check prefetch_multiplier: too high a value lets one worker reserve tasks while other workers sit idle

  • Ensure no task failures blocking queue

  • Monitor worker logs for errors
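A frequent cause of stuck tasks is a queue that routes point to but no worker consumes. The check below is a sketch that compares the two sets; `unconsumed_queues` is a hypothetical helper, and the worker-to-queue mapping is hard-coded here, whereas in a live system it would come from the worker start commands or from inspecting running workers.

```python
def unconsumed_queues(routes, worker_queues):
    """Return queues that tasks are routed to but no worker listens on."""
    routed = {route["queue"] for route in routes.values() if "queue" in route}
    consumed = {q for queues in worker_queues.values() for q in queues}
    return sorted(routed - consumed)


routes = {
    "myapp.tasks.send_email": {"queue": "emails"},
    "myapp.tasks.generate_report": {"queue": "reports"},
    "myapp.tasks.urgent_task": {"queue": "high_priority"},
}

# Worker name -> queues passed to -Q; host names are placeholders.
workers = {
    "io@host": ["api_calls", "emails"],
    "general@host": ["default", "background"],
}

print(unconsumed_queues(routes, workers))  # ['high_priority', 'reports']
```

Any queue in the output needs either a worker started with that queue in its `-Q` list or a route change.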

Plugin: celery Version: 1.0.0 Last Updated: 2025-11-16

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
