celery-advanced

Advanced Celery Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "celery-advanced" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-celery-advanced

Enterprise-grade task orchestration beyond basic background jobs.

Overview

  • Complex multi-step task workflows (ETL pipelines, order processing)

  • Priority-based task processing (premium vs standard users)

  • Rate-limited external API calls (API quotas, throttling)

  • Multi-queue routing (dedicated workers per task type)

  • Production monitoring and observability

  • Task result aggregation and fan-out patterns

Canvas Workflows

Signatures (Task Invocation)

from celery import signature, chain, group, chord

Create a reusable task signature

sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

Immutable signature (won't receive results from previous task)

sig = process_order.si(order_id)

Partial signature (curry arguments)

partial_sig = send_email.s(subject="Order Update")

Later: partial_sig.delay(to="user@example.com", body="...")

Chains (Sequential Execution)

from celery import chain

Tasks execute sequentially, passing results

workflow = chain(
    extract_data.s(source_id),      # Returns raw_data
    transform_data.s(),             # Receives raw_data, returns clean_data
    load_data.s(destination_id),    # Receives clean_data
)
result = workflow.apply_async()

Access intermediate results

chain_result = result.get()            # Final result
parent_result = result.parent.get()    # Previous task result

Error handling in chains

@celery_app.task(bind=True)
def transform_data(self, raw_data):
    try:
        return do_transform(raw_data)
    except TransformError as exc:
        # Retries this step after 60s; later tasks run only once it succeeds.
        # If the retries are exhausted, the chain stops here.
        raise self.retry(exc=exc, countdown=60)
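The way a chain threads results can be easy to misread, so here is a minimal plain-Python model of it. It does not use Celery; `run_chain` is a hypothetical helper, and the three functions are stand-ins for the tasks above. It only illustrates that each step after the first receives the previous return value as its first positional argument, ahead of the arguments given in its own signature.

```python
def run_chain(steps):
    """Run (func, args) pairs in order, prepending the previous result.

    This mirrors how a chain passes results along: the first step gets
    only its own args, every later step gets (previous_result, *args).
    """
    result = None
    for index, (func, args) in enumerate(steps):
        result = func(*args) if index == 0 else func(result, *args)
    return result


def extract_data(source_id):
    return [f"{source_id}-row-{n}" for n in range(3)]


def transform_data(raw_data):
    return [row.upper() for row in raw_data]


def load_data(clean_data, destination_id):
    # clean_data comes from the previous step, destination_id from the signature
    return {"destination": destination_id, "rows": len(clean_data)}


summary = run_chain([
    (extract_data, ("src",)),
    (transform_data, ()),
    (load_data, ("warehouse",)),
])
print(summary)
```

This is why `load_data` has to declare the piped value first and `destination_id` second.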

Groups (Parallel Execution)

from celery import group

Execute tasks in parallel

parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()

Wait for all to complete

results = group_result.get() # List of results

Check completion status

group_result.ready()       # All completed?
group_result.successful()  # All succeeded?
group_result.failed()      # Any failed?

Report the results that have already finished (checked in submission order, without blocking)

for result in group_result.results:
    if result.ready():
        print(f"Completed: {result.get()}")

Chords (Parallel + Callback)

from celery import chord

Parallel execution with callback when all complete

workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s(),  # Receives list of all results
)
result = workflow.apply_async()

Chord with header and body

header = group(fetch_data.s(url) for url in urls)
body = combine_data.s()
workflow = chord(header, body)

Error handling: if any header task fails, body won't run

@celery_app.task(bind=True)
def aggregate_results(self, results):
    # results = [result1, result2, ...]
    return sum(results)
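The fan-out and fan-in behaviour of a chord can be modelled without a broker. The sketch below is a plain-Python analogy built on `concurrent.futures`, not Celery code; `run_chord` is a hypothetical helper. It shows the two properties described above: the body receives the header results as one list in header order, and the body does not run if a header task raises.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def run_chord(header, body):
    """Run zero-argument header callables in parallel, then call body(results)."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(task) for task in header]
        # result() re-raises any header exception, so body is never reached
        results = [future.result() for future in futures]
    return body(results)


chunks = [[1, 2], [3, 4], [5]]
total = run_chord([partial(sum, chunk) for chunk in chunks], sum)
print(total)
```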

Map and Starmap

Map: apply same task to each item

workflow = process_item.map([item1, item2, item3])

Starmap: unpack args for each call

workflow = send_email.starmap([
    ("user1@example.com", "Subject 1"),
    ("user2@example.com", "Subject 2"),
])

Chunks: split large list into batches

workflow = process_item.chunks([(item,) for item in items], 100)  # Takes an iterable of argument tuples and the chunk size, both positional
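To make the batching concrete, here is a small plain-Python sketch of the splitting step alone. `split_into_chunks` is a hypothetical helper, not a Celery function; it only illustrates how a list of argument tuples is cut into batches of at most `n`, with the last batch possibly shorter.

```python
from itertools import islice


def split_into_chunks(arg_tuples, n):
    """Yield successive lists of at most n argument tuples."""
    iterator = iter(arg_tuples)
    while True:
        batch = list(islice(iterator, n))
        if not batch:
            return
        yield batch


items = ["a", "b", "c", "d", "e"]
batches = list(split_into_chunks([(item,) for item in items], 2))
print(batches)
```

Each batch then becomes one task message, which trades scheduling overhead against how evenly the work spreads across workers.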

Priority Queues

Queue Configuration

celery_config.py

from kombu import Queue

celery_app.conf.task_queues = (
    Queue("high", routing_key="high"),
    Queue("default", routing_key="default"),
    Queue("low", routing_key="low"),
)

celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_routing_key = "default"

Priority within queue (requires Redis 5+)

celery_app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),  # 0-9 priority levels
    "sep": ":",
    "queue_order_strategy": "priority",
}
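As far as I understand the Redis transport, it emulates priorities by spreading each queue over several Redis lists, one per priority step, and polling the lower-numbered lists first. The sketch below is an approximate model of that naming scheme, written from memory rather than copied from the library; `queue_name_for_priority` is a hypothetical function and the exact behaviour should be checked against the installed kombu version.

```python
from bisect import bisect


def queue_name_for_priority(queue, priority, steps, sep=":"):
    """Map a message priority to the list name it would land in.

    The priority is rounded down to the nearest configured step; step 0
    uses the bare queue name, other steps get a "<sep><step>" suffix.
    """
    step = steps[bisect(steps, priority) - 1]
    return f"{queue}{sep}{step}" if step else queue


ten_steps = list(range(10))
print(queue_name_for_priority("high", 0, ten_steps))      # bare queue name
print(queue_name_for_priority("high", 3, ten_steps))      # suffixed with step 3
print(queue_name_for_priority("high", 5, [0, 3, 6, 9]))   # rounded down to step 3
```

Under this model, fewer steps mean fewer lists to poll, at the cost of coarser priority levels.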

Task Routing

Route by task name

celery_app.conf.task_routes = {
    "tasks.critical_task": {"queue": "high"},
    "tasks.bulk_*": {"queue": "low"},         # Glob pattern
    "tasks.default_*": {"queue": "default"},  # Glob pattern
}

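Route dynamically at call time

# Priority direction depends on the broker: on the Redis transport 0 is the
# highest priority, while RabbitMQ treats larger numbers as more urgent.
# The values below assume larger numbers are more urgent; invert them for Redis.
critical_task.apply_async(args=[data], queue="high", priority=9)
bulk_task.apply_async(args=[data], queue="low", priority=1)

Route by task attribute

@celery_app.task(queue="high", priority=8)
def premium_user_task(user_id):
    pass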

Worker Configuration

Start workers for specific queues

celery -A app worker -Q high -c 4 --prefetch-multiplier=1
celery -A app worker -Q default -c 8
celery -A app worker -Q low -c 2 --prefetch-multiplier=4

Rate Limiting

Per-Task Rate Limits

import requests

@celery_app.task(rate_limit="100/m")  # 100 per minute, enforced per worker
def call_external_api(endpoint):
    # Return JSON-serializable data, not the Response object
    return requests.get(endpoint, timeout=10).json()

@celery_app.task(rate_limit="10/s")  # 10 per second
def send_notification(user_id):
    pass

@celery_app.task(rate_limit="1000/h")  # 1000 per hour
def bulk_email(batch):
    pass
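Because `rate_limit` applies to each worker instance rather than to the cluster as a whole, it helps to translate the rate strings into numbers before sizing a deployment. The following is a small stand-alone sketch of that arithmetic; `tasks_per_second` and `cluster_ceiling` are hypothetical helpers, and the parsing only covers the `N`, `N/s`, `N/m` and `N/h` forms used above.

```python
_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def tasks_per_second(rate_limit):
    """Convert a rate string such as "100/m" into tasks per second."""
    count, _, unit = rate_limit.partition("/")
    return float(count) / _SECONDS_PER_UNIT[unit or "s"]


def cluster_ceiling(rate_limit, worker_count):
    """Upper bound on cluster throughput when every worker enforces the limit."""
    return tasks_per_second(rate_limit) * worker_count


for limit in ("100/m", "10/s", "1000/h"):
    print(limit, round(tasks_per_second(limit), 3))
print("3 workers at 100/m:", round(cluster_ceiling("100/m", 3), 3), "per second")
```

If an upstream API quota is global, the per-task limit would need to be divided by the number of workers consuming that task, or replaced by a shared limiter.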

Dynamic Rate Limiting

from celery import current_app

Change rate limit at runtime

current_app.control.rate_limit(
    "tasks.call_external_api",
    "50/m",  # Reduce during high load
    destination=["worker1@hostname"],
)

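Custom rate limiter (a fixed one-call-per-second window, simpler than a true token bucket)

class RateLimitedTask(celery_app.Task):
    _rate_limit_key = "api:rate_limit"

    def __call__(self, *args, **kwargs):
        if not self._acquire_token():
            # retry() raises Retry, so the task body is skipped this time
            raise self.retry(countdown=self._get_backoff())
        return super().__call__(*args, **kwargs)

    def _acquire_token(self):
        # redis_client is assumed to be a configured redis.Redis instance.
        # SET NX EX succeeds only for the first caller in each window.
        return redis_client.set(
            self._rate_limit_key,
            "1",
            nx=True,
            ex=1,  # 1 second window
        )

    def _get_backoff(self):
        return 1  # Wait out the current window before retrying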
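For comparison, a token bucket allows short bursts up to a capacity and then refills at a steady rate. The sketch below is a minimal in-process version, assuming a single process and an injectable clock for testing; `TokenBucket` and its parameters are hypothetical names. Sharing one bucket across workers would need the state kept in a store such as Redis and updated atomically, which this example does not attempt.

```python
import time


class TokenBucket:
    """In-process token bucket: bursts up to capacity, steady refill after."""

    def __init__(self, capacity, refill_per_second, clock=time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._clock = clock
        self._tokens = float(capacity)  # start full
        self._updated = clock()

    def try_acquire(self, cost=1.0):
        """Take `cost` tokens if available; return True on success."""
        now = self._clock()
        elapsed = now - self._updated
        # Refill for the time that has passed, never above capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_per_second)
        self._updated = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Demonstration with a fake clock so the outcome does not depend on timing
fake_now = [0.0]
bucket = TokenBucket(capacity=2, refill_per_second=1.0, clock=lambda: fake_now[0])
burst = [bucket.try_acquire() for _ in range(3)]   # two succeed, third is refused
fake_now[0] = 1.0                                  # one second later: one token back
after_refill = bucket.try_acquire()
print(burst, after_refill)
```

A task base class could call `try_acquire()` in place of the Redis lock and retry with a countdown when it returns False.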

Multi-Queue Routing

Router Classes

class TaskRouter:
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith("tasks.premium"):
            return {"queue": "premium", "priority": 8}
        elif task.startswith("tasks.analytics"):
            return {"queue": "analytics"}
        elif kwargs and kwargs.get("urgent"):
            return {"queue": "high"}
        return {"queue": "default"}

celery_app.conf.task_routes = (TaskRouter(),)
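Celery also accepts a plain function as a router, and because routing decisions are ordinary Python they can be checked without a broker. The sketch below expresses the same rules in function form and exercises them directly; the task names passed in (`tasks.premium_export` and so on) are made up for illustration.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Function-style router applying the same rules as the class above."""
    if name.startswith("tasks.premium"):
        return {"queue": "premium", "priority": 8}
    if name.startswith("tasks.analytics"):
        return {"queue": "analytics"}
    if kwargs and kwargs.get("urgent"):
        return {"queue": "high"}
    return {"queue": "default"}


# Hypothetical task names, used only to exercise each branch
decisions = {
    "premium": route_task("tasks.premium_export", (), {}, {}),
    "analytics": route_task("tasks.analytics_rollup", (), {}, {}),
    "urgent": route_task("tasks.send_email", (), {"urgent": True}, {}),
    "fallback": route_task("tasks.send_email", (), {}, {}),
}
for label, route in decisions.items():
    print(label, route)
```

It would be registered the same way as the class, for example `celery_app.conf.task_routes = (route_task,)`. Note that the name-prefix rules win over the `urgent` flag because they are checked first.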

Content-Based Routing

@celery_app.task(bind=True)
def process_order(self, order):
    # Route based on order value
    if order["total"] > 1000:
        self.update_state(state="ROUTING", meta={"queue": "premium"})
        workflow = chain(
            verify_inventory.s(order).set(queue="high"),
            process_payment.s().set(queue="high"),
            notify_customer.s().set(queue="notifications"),
        )
        # Return the workflow id; an AsyncResult cannot be serialized as a task result
        return workflow.apply_async().id
    else:
        return standard_workflow(order)

Production Monitoring

Flower Dashboard

Install and run Flower

pip install flower
celery -A app flower --port=5555 --basic_auth=admin:password

With persistent storage

celery -A app flower --persistent=True --db=flower.db

Custom Events

from celery import signals

@signals.task_prerun.connect
def on_task_start(sender, task_id, task, args, kwargs, **_):
    metrics.counter("task_started", tags={"task": task.name})

@signals.task_postrun.connect
def on_task_complete(sender, task_id, task, args, kwargs, retval, state, **_):
    metrics.counter("task_completed", tags={"task": task.name, "state": state})

@signals.task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **_):
    alerting.send_alert(
        f"Task {sender.name} failed: {exception}",
        severity="error",
    )

Health Checks

from celery import current_app

def celery_health_check():
    try:
        # Check broker connection (the context manager releases it afterwards)
        with current_app.connection() as conn:
            conn.ensure_connection(max_retries=3)

        # Check workers responding
        inspector = current_app.control.inspect()
        active_workers = inspector.active()

        if not active_workers:
            return {"status": "unhealthy", "reason": "No active workers"}

        return {
            "status": "healthy",
            "workers": list(active_workers.keys()),
            "active_tasks": sum(len(tasks) for tasks in active_workers.values()),
        }
    except Exception as e:
        return {"status": "unhealthy", "reason": str(e)}

Custom Task States

from celery import states

Define custom states

VALIDATING = "VALIDATING"
PROCESSING = "PROCESSING"
UPLOADING = "UPLOADING"

@celery_app.task(bind=True)
def long_running_task(self, data):
    self.update_state(state=VALIDATING, meta={"step": 1, "total": 3})
    validate(data)

    self.update_state(state=PROCESSING, meta={"step": 2, "total": 3})
    result = process(data)

    self.update_state(state=UPLOADING, meta={"step": 3, "total": 3})
    upload(result)

    return {"status": "complete", "url": result.url}

Query task progress

from celery.result import AsyncResult

result = AsyncResult(task_id)
if result.state == PROCESSING:
    print(f"Step {result.info['step']}/{result.info['total']}")
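When polling progress, `result.info` holds the `meta` dict only while the task is in one of the custom states; once the task fails it holds the exception, and once it succeeds it holds the return value. A small formatting helper that guards against this might look like the sketch below. `describe_progress` is a hypothetical function that takes the state and info as plain values, so it can be tested without a result backend.

```python
def describe_progress(state, info):
    """Render a progress line, falling back to the bare state name.

    info is only treated as progress metadata when it is a dict that
    carries both "step" and "total".
    """
    if isinstance(info, dict) and "step" in info and "total" in info:
        percent = round(100 * info["step"] / info["total"])
        return f"{state}: step {info['step']}/{info['total']} ({percent}%)"
    return state


print(describe_progress("PROCESSING", {"step": 2, "total": 3}))
print(describe_progress("FAILURE", ValueError("upload failed")))
print(describe_progress("PENDING", None))
```

In application code it would be called as `describe_progress(result.state, result.info)`.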

Base Tasks and Inheritance

from celery import Task

import requests

class DatabaseTask(Task):
    """Base task with database session management."""
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = create_session()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self._db:
            self._db.close()
            self._db = None

class RetryableTask(Task):
    """Base task with exponential backoff retry."""
    # requests raises its own exception types, not the builtin ones
    autoretry_for = (requests.ConnectionError, requests.Timeout)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

@celery_app.task(base=DatabaseTask, bind=True)
def query_database(self, query):
    return self.db.execute(query)

@celery_app.task(base=RetryableTask)
def call_flaky_api(endpoint):
    # Return JSON-serializable data, not the Response object
    return requests.get(endpoint, timeout=30).json()
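To see what the backoff settings imply in seconds, the schedule can be computed by hand. The sketch below models exponential backoff with a cap and optional full jitter, which is my understanding of how `retry_backoff`, `retry_backoff_max` and `retry_jitter` combine; `backoff_delay` is a hypothetical helper, not a Celery function, and the defaults mirror the values in `RetryableTask`.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Delay in seconds before the next attempt.

    retries is the number of retries already made (0 for the first retry).
    With jitter, a random delay between 0 and the computed value is used.
    """
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        return rng.randrange(delay + 1)
    return delay


schedule = [backoff_delay(n) for n in range(5)]   # max_retries = 5
print("without jitter:", schedule)
print("capped:", backoff_delay(20))
print("with jitter:", [backoff_delay(n, jitter=True) for n in range(5)])
```

With five retries the un-jittered waits add up to about half a minute, so the 600 second cap only matters if `max_retries` is raised well beyond 5.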

Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Workflow type | Chain for sequential, Group for parallel, Chord for fan-in |
| Priority queues | 3 queues (high/default/low) for most use cases |
| Rate limiting | Per-task rate_limit for simple, token bucket for complex |
| Monitoring | Flower + custom signals for production |
| Task routing | Content-based router for dynamic routing needs |
| Worker scaling | Separate workers per queue, autoscale with HPA |
| Error handling | Base task with retry + dead letter queue |

Anti-Patterns (FORBIDDEN)

NEVER block on results in tasks

@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks worker, causes deadlock!

NEVER use synchronous I/O without timeout

requests.get(url) # Can hang forever

NEVER ignore task acknowledgment

celery_app.conf.task_acks_late = False # Default loses tasks on crash

NEVER skip idempotency for retried tasks

@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!
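One common way to make such a task safe to retry is to key the write on an idempotency key supplied by the caller. The sketch below uses an in-memory dict in place of a database, so it is an illustration of the idea only; `OrderStore`, `create_once` and the `order-42` key are hypothetical. A real implementation would typically rely on a unique constraint or an atomic upsert so that concurrent retries cannot both insert.

```python
class OrderStore:
    """In-memory stand-in for a table with a unique idempotency key."""

    def __init__(self):
        self._orders = {}

    def create_once(self, idempotency_key, order):
        """Create the order unless the key was seen; return (record, created)."""
        if idempotency_key in self._orders:
            return self._orders[idempotency_key], False
        self._orders[idempotency_key] = dict(order)
        return self._orders[idempotency_key], True

    def __len__(self):
        return len(self._orders)


store = OrderStore()
order = {"sku": "ABC", "quantity": 1}

first, created_first = store.create_once("order-42", order)
retry, created_retry = store.create_once("order-42", order)  # simulated retry
print(created_first, created_retry, len(store))
```

The task body then becomes a call to `create_once` with a key derived from the request, so a retry returns the existing record instead of creating a second one.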

Use immutable signatures for a chord callback only when it must ignore the header results

chord([task.s(x) for x in items], callback.si())  # si() drops the header results; use .s() to receive them

References

For detailed implementation patterns, see:

  • references/canvas-workflows.md: Deep dive on chain/group/chord with error handling

  • references/priority-queue-setup.md: Redis priority queue configuration

  • references/rate-limiting-patterns.md: Per-task and dynamic rate limiting

  • references/celery-beat-scheduling.md: Periodic task configuration

Templates

Production-ready code templates:

  • scripts/celery-config-template.py: Complete production Celery configuration

  • scripts/canvas-workflow-template.py: ETL pipeline using canvas patterns

  • scripts/priority-worker-template.py: Multi-queue worker with per-user rate limiting

Checklists

  • checklists/celery-production-checklist.md: Production deployment verification

Examples

  • examples/order-processing-pipeline.md: Real-world e-commerce order processing

Related Skills

  • background-jobs: Basic Celery and ARQ patterns

  • message-queues: RabbitMQ/Kafka integration

  • resilience-patterns: Circuit breakers, retries

  • observability-monitoring: Metrics and alerting

Capability Details

canvas-workflows

Keywords: chain, group, chord, signature, canvas, workflow

Solves:

  • Complex multi-step task pipelines

  • Parallel task execution with aggregation

  • Sequential task dependencies

priority-queues

Keywords: priority, queue, routing, high priority, low priority

Solves:

  • Premium user task prioritization

  • Urgent vs batch task handling

  • Multi-queue worker deployment

rate-limiting

Keywords: rate limit, throttle, quota, api limit

Solves:

  • External API rate limiting

  • Per-task execution limits

  • Dynamic rate adjustment

task-monitoring

Keywords: flower, monitoring, health check, task state

Solves:

  • Production task monitoring

  • Worker health checks

  • Custom task state tracking
