background-jobs

Background Job Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "background-jobs" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-background-jobs

Offload long-running tasks with async job queues.

Overview

  • Long-running tasks (report generation, data processing)

  • Email/notification sending

  • Scheduled/periodic tasks

  • Webhook processing

  • Data export/import pipelines

  • Non-LLM async operations (use LangGraph for LLM workflows)

Tool Selection

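| Tool     | Language       | Best For                       | Complexity |
|----------|----------------|--------------------------------|------------|
| ARQ      | Python (async) | FastAPI, simple jobs           | Low        |
| Celery   | Python         | Complex workflows, enterprise  | High       |
| RQ       | Python         | Simple Redis queues            | Low        |
| Dramatiq | Python         | Reliable messaging             | Medium     |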

ARQ (Async Redis Queue)

Setup

    # backend/app/workers/arq_worker.py
    import httpx
    from arq import create_pool
    from arq.connections import RedisSettings

    # send_email, generate_report and process_webhook are the task coroutines
    # (see "Task Definition"); create_db_pool() is the application's own helper.

    async def startup(ctx: dict):
        """Initialize worker resources."""
        ctx["db"] = await create_db_pool()
        ctx["http"] = httpx.AsyncClient()

    async def shutdown(ctx: dict):
        """Cleanup worker resources."""
        await ctx["db"].close()
        await ctx["http"].aclose()

    class WorkerSettings:
        redis_settings = RedisSettings(host="redis", port=6379)
        functions = [
            send_email,
            generate_report,
            process_webhook,
        ]
        on_startup = startup
        on_shutdown = shutdown
        max_jobs = 10      # maximum jobs running concurrently in this worker
        job_timeout = 300  # 5 minutes
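To make the effect of `max_jobs` and `job_timeout` concrete, here is a minimal standard-library sketch of the same two limits: a concurrency cap and a per-job timeout. It is an illustration only, not ARQ's implementation; `run_with_limits`, `quick`, and `slow` are hypothetical names.

```python
import asyncio


async def run_with_limits(jobs, max_jobs: int, job_timeout: float) -> list:
    """Run coroutine factories with a concurrency cap and a per-job timeout."""
    semaphore = asyncio.Semaphore(max_jobs)

    async def run_one(factory):
        async with semaphore:  # at most max_jobs job bodies run at once
            try:
                return await asyncio.wait_for(factory(), timeout=job_timeout)
            except asyncio.TimeoutError:
                return "timed out"

    # gather() returns results in the same order as the submitted jobs
    return await asyncio.gather(*(run_one(job) for job in jobs))


async def quick():
    await asyncio.sleep(0.01)
    return "done"


async def slow():
    await asyncio.sleep(1.0)
    return "done"


results = asyncio.run(
    run_with_limits([quick, slow, quick], max_jobs=2, job_timeout=0.1)
)
print(results)  # ['done', 'timed out', 'done']
```

The slow job is cancelled once it exceeds the timeout, and the third job only starts after one of the first two releases its slot.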

Task Definition

    from arq import func

    # SENDGRID_KEY and render_pdf() are provided elsewhere in the application.

    async def send_email(
        ctx: dict,
        to: str,
        subject: str,
        body: str,
    ) -> dict:
        """Send email task."""
        http = ctx["http"]  # shared httpx.AsyncClient created in startup()
        # Simplified payload for illustration; the real SendGrid v3 schema is more nested.
        response = await http.post(
            "https://api.sendgrid.com/v3/mail/send",
            json={"to": to, "subject": subject, "html": body},
            headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
        )
        return {"status": response.status_code, "to": to}

    async def generate_report(
        ctx: dict,
        report_id: str,
        format: str = "pdf",
    ) -> dict:
        """Generate report asynchronously."""
        db = ctx["db"]
        data = await db.fetch_report_data(report_id)
        pdf_bytes = await render_pdf(data)
        await db.save_report_file(report_id, pdf_bytes)
        # Return a small summary; the file itself lives in the database.
        return {"report_id": report_id, "size": len(pdf_bytes)}

Enqueue from FastAPI

    from arq import create_pool
    from arq.connections import ArqRedis, RedisSettings
    from arq.jobs import Job, JobStatus
    from fastapi import Depends

    # router, service and ReportRequest come from the surrounding application.

    # Dependency
    async def get_arq_pool() -> ArqRedis:
        # Opens a new pool per request; prefer one shared pool created at startup.
        return await create_pool(RedisSettings(host="redis"))

    @router.post("/api/v1/reports")
    async def create_report(
        data: ReportRequest,
        arq: ArqRedis = Depends(get_arq_pool),
    ):
        report = await service.create_report(data)

        # Enqueue background job
        job = await arq.enqueue_job(
            "generate_report",
            report.id,
            format=data.format,
        )

        return {"report_id": report.id, "job_id": job.job_id}

    @router.get("/api/v1/jobs/{job_id}")
    async def get_job_status(
        job_id: str,
        arq: ArqRedis = Depends(get_arq_pool),
    ):
        job = Job(job_id, arq)
        status = await job.status()
        # Only fetch the result once the job has finished.
        result = await job.result() if status == JobStatus.complete else None
        return {"job_id": job_id, "status": status, "result": result}

Celery (Enterprise)

Setup

    # backend/app/workers/celery_app.py
    from celery import Celery

    celery_app = Celery(
        "orchestkit",
        broker="redis://redis:6379/0",
        backend="redis://redis:6379/1",
    )

    celery_app.conf.update(
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        timezone="UTC",
        task_track_started=True,
        task_time_limit=600,            # 10 minutes hard limit
        task_soft_time_limit=540,       # 9 minutes soft limit
        worker_prefetch_multiplier=1,   # Fair distribution
        task_acks_late=True,            # Acknowledge after completion
        task_reject_on_worker_lost=True,
    )
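With `task_acks_late=True` and `task_reject_on_worker_lost=True`, a task whose worker dies mid-run is delivered again, so a task body may execute more than once. The sketch below shows one common way to make a side effect safe under redelivery, using an idempotency key. It is plain Python, not Celery code: `processed_keys`, `sent`, and `send_email_once` are hypothetical, and the in-memory set stands in for a durable store such as a database table with a unique key.

```python
processed_keys = set()  # stand-in for a durable store (assumption)
sent = []               # records the side effect for the demo


def send_email_once(idempotency_key: str, to: str) -> str:
    """Perform the side effect only the first time a given key is seen."""
    if idempotency_key in processed_keys:
        return "skipped"
    sent.append(to)                      # the actual side effect
    processed_keys.add(idempotency_key)  # mark as done after success
    return "sent"


first = send_email_once("welcome:user-1", "a@example.com")
second = send_email_once("welcome:user-1", "a@example.com")  # simulated redelivery
print(first, second, len(sent))  # sent skipped 1
```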

Task Definition

    import requests
    from celery import shared_task
    from celery.utils.log import get_task_logger

    logger = get_task_logger(__name__)

    # SENDGRID_KEY, fetch_report_data(), render_pdf() and save_report()
    # are provided elsewhere in the application.

    @shared_task(
        bind=True,
        max_retries=3,
        default_retry_delay=60,
        autoretry_for=(ConnectionError, TimeoutError),
    )
    def send_email(self, to: str, subject: str, body: str) -> dict:
        """Send email with automatic retry."""
        try:
            response = requests.post(
                "https://api.sendgrid.com/v3/mail/send",
                json={"to": to, "subject": subject, "html": body},
                headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
                timeout=30,
            )
            response.raise_for_status()
            return {"status": "sent", "to": to}
        except Exception as exc:
            logger.error("Email failed: %s", exc)
            # Re-queue the task; raises the original error once max_retries is hit.
            raise self.retry(exc=exc)

    @shared_task(bind=True)
    def generate_report(self, report_id: str) -> dict:
        """Long-running report generation."""
        self.update_state(state="PROGRESS", meta={"step": "fetching"})
        data = fetch_report_data(report_id)

        self.update_state(state="PROGRESS", meta={"step": "rendering"})
        pdf = render_pdf(data)

        self.update_state(state="PROGRESS", meta={"step": "saving"})
        save_report(report_id, pdf)

        return {"report_id": report_id, "size": len(pdf)}

Chains and Groups

    from celery import chain, group, chord

    # Sequential execution: each task receives the previous task's result
    workflow = chain(
        extract_data.s(source_id),
        transform_data.s(),
        load_data.s(destination_id),
    )
    result = workflow.apply_async()

    # Parallel execution
    parallel = group(
        process_chunk.s(chunk) for chunk in chunks
    )
    result = parallel.apply_async()

    # Parallel with callback: aggregate_results receives the list of chunk results
    chord_workflow = chord(
        [process_chunk.s(chunk) for chunk in chunks],
        aggregate_results.s(),
    )
    result = chord_workflow.apply_async()
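The data flow of the three primitives can be sketched in plain Python. This is an analogy for how results move between steps, not Celery code: `run_chain`, `run_group`, `run_chord`, and the small `extract` / `transform` / `process_chunk` functions are hypothetical stand-ins, and threads replace distributed workers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_chain(first_arg, *steps):
    """Chain: each step receives the previous step's return value."""
    return reduce(lambda value, step: step(value), steps, first_arg)


def run_group(task, items):
    """Group: run the same task on every item in parallel, keeping input order."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(task, items))


def run_chord(task, items, callback):
    """Chord: a group whose collected results feed a single callback."""
    return callback(run_group(task, items))


# Hypothetical stand-ins for the tasks named in the text.
def extract(source_id):
    return [source_id, source_id + 1]


def transform(rows):
    return [row * 10 for row in rows]


def process_chunk(chunk):
    return sum(chunk)


chained = run_chain(1, extract, transform)
grouped = run_group(process_chunk, [[1, 2], [3, 4]])
chorded = run_chord(process_chunk, [[1, 2], [3, 4]], sum)
print(chained, grouped, chorded)  # [10, 20] [3, 7] 10
```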

Periodic Tasks (Celery Beat)

    from celery.schedules import crontab

    celery_app.conf.beat_schedule = {
        "cleanup-expired-sessions": {
            "task": "app.workers.tasks.cleanup_sessions",
            "schedule": crontab(minute=0, hour="*/6"),  # Every 6 hours
        },
        "generate-daily-report": {
            "task": "app.workers.tasks.daily_report",
            "schedule": crontab(minute=0, hour=2),  # 2 AM daily
        },
        "sync-external-data": {
            "task": "app.workers.tasks.sync_data",
            "schedule": 300.0,  # Every 5 minutes
        },
    }
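To check what a schedule such as `minute=0, hour="*/6"` means in practice, the following standard-library sketch expands the field and finds the next matching time. It handles only the three field forms used above (`*`, `*/n`, and a single number) and is not Celery's scheduler; `expand_field` and `next_run` are hypothetical helpers.

```python
from datetime import datetime, timedelta


def expand_field(field: str, upper: int) -> list:
    """Expand '*', '*/n' or a single number into matching values in [0, upper)."""
    if field == "*":
        return list(range(upper))
    if field.startswith("*/"):
        return list(range(0, upper, int(field[2:])))
    return [int(field)]


def next_run(after: datetime, minute: str, hour: str) -> datetime:
    """Return the first whole minute strictly after `after` matching both fields."""
    minutes = set(expand_field(minute, 60))
    hours = set(expand_field(hour, 24))
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while candidate.minute not in minutes or candidate.hour not in hours:
        candidate += timedelta(minutes=1)
    return candidate


now = datetime(2024, 1, 1, 7, 30)
print(expand_field("*/6", 24))                # [0, 6, 12, 18]
print(next_run(now, minute="0", hour="*/6"))  # 2024-01-01 12:00:00
print(next_run(now, minute="0", hour="2"))    # 2024-01-02 02:00:00
```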

FastAPI Integration

    from fastapi import BackgroundTasks

    @router.post("/api/v1/users")
    async def create_user(
        data: UserCreate,
        background_tasks: BackgroundTasks,
    ):
        user = await service.create_user(data)

        # Simple background task (in-process)
        background_tasks.add_task(send_welcome_email, user.email)

        return user

    # For distributed tasks, use ARQ/Celery
    @router.post("/api/v1/exports")
    async def create_export(
        data: ExportRequest,
        arq: ArqRedis = Depends(get_arq_pool),
    ):
        # data.dict() is the Pydantic v1 spelling; use data.model_dump() on Pydantic v2
        job = await arq.enqueue_job("export_data", data.dict())
        return {"job_id": job.job_id}

Job Status Tracking

    from enum import Enum

    from celery.result import AsyncResult

    class JobStatus(Enum):
        PENDING = "pending"
        STARTED = "started"
        PROGRESS = "progress"
        SUCCESS = "success"
        FAILURE = "failure"
        REVOKED = "revoked"

    @router.get("/api/v1/jobs/{job_id}")
    async def get_job(job_id: str):
        # Celery
        result = AsyncResult(job_id, app=celery_app)
        return {
            "job_id": job_id,
            "status": result.status,
            # On FAILURE, result.result is the raised exception (not JSON-serializable),
            # so only return it for successful jobs.
            "result": result.result if result.successful() else None,
            "progress": result.info if result.status == "PROGRESS" else None,
        }
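The mapping from task state to API response can be tested without a broker. The sketch below builds a JSON-safe payload from any object exposing `status` and `info`, the two `AsyncResult` attributes used above. `FakeResult` and `job_payload` are hypothetical names, and the separate `error` field is an assumption about how a failure might be reported.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeResult:
    """Hypothetical stand-in exposing the AsyncResult attributes used below."""
    status: str
    info: Any = None  # return value, progress meta, or the raised exception


def job_payload(job_id: str, result) -> dict:
    """Build a JSON-safe status response from a Celery-like result object."""
    payload = {
        "job_id": job_id,
        "status": result.status,
        "result": None,
        "progress": None,
        "error": None,
    }
    if result.status == "SUCCESS":
        payload["result"] = result.info
    elif result.status == "PROGRESS":
        payload["progress"] = result.info
    elif result.status == "FAILURE":
        payload["error"] = str(result.info)  # exceptions are not JSON-serializable
    return payload


in_progress = job_payload("a1", FakeResult("PROGRESS", {"step": "rendering"}))
failed = job_payload("a2", FakeResult("FAILURE", ValueError("bad report id")))
print(in_progress["progress"], failed["error"])
```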

Anti-Patterns (FORBIDDEN)

    # NEVER run long tasks synchronously
    @router.post("/api/v1/reports")
    async def create_report(data: ReportRequest):
        pdf = await generate_pdf(data)  # Holds the request open for minutes!
        return pdf

    # NEVER lose jobs on failure
    @shared_task
    def risky_task():
        do_work()  # No retry, no error handling

    # NEVER store large results in Redis
    @shared_task
    def process_file(file_id: str) -> bytes:
        return large_file_bytes  # Store in S3/DB instead!

    # NEVER use BackgroundTasks for distributed work
    background_tasks.add_task(long_running_job)  # Lost if server restarts
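As a counterpart to the "large results" anti-pattern, here is a minimal sketch of a task-shaped function that writes its output to external storage and returns only a small reference. A temporary directory stands in for S3 or a database; `store_blob` and the placeholder payload are hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path


def store_blob(data: bytes, directory: Path) -> dict:
    """Persist bytes outside the result backend and return a small reference."""
    digest = hashlib.sha256(data).hexdigest()
    path = directory / f"{digest}.bin"
    path.write_bytes(data)
    return {"path": str(path), "sha256": digest, "size": len(data)}


def process_file(file_id: str, directory: Path) -> dict:
    """Return a reference to the output, not the output itself."""
    large_file_bytes = file_id.encode() * 100_000  # placeholder for real work
    return {"file_id": file_id, **store_blob(large_file_bytes, directory)}


with tempfile.TemporaryDirectory() as tmp:
    ref = process_file("report-42", Path(tmp))
    stored_size = Path(ref["path"]).stat().st_size

print(ref["size"], stored_size)  # 900000 900000
```

The returned dictionary stays a few hundred bytes regardless of file size, so it is cheap to keep in the result backend.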

Key Decisions

| Decision          | Recommendation                   |
|-------------------|----------------------------------|
| Simple async      | ARQ (native async)               |
| Complex workflows | Celery (chains, chords)          |
| In-process quick  | FastAPI BackgroundTasks          |
| LLM workflows     | LangGraph (not Celery)           |
| Result storage    | Redis for status, S3/DB for data |
| Retry strategy    | Exponential backoff with jitter  |
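The retry recommendation can be sketched with the standard library alone. The version below uses "full jitter" (a uniform delay between zero and the capped exponential ceiling), which is one common variant and an assumption here; `backoff_delay`, `call_with_retries`, and `flaky` are hypothetical names, and the `sleep` parameter is injectable so the demo runs instantly.

```python
import random
import time
from typing import Callable, Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * 2 ** attempt)
    return rng.uniform(0.0, ceiling)


def call_with_retries(func: Callable, max_retries: int = 3,
                      sleep: Callable[[float], None] = time.sleep):
    """Call func, retrying on ConnectionError with jittered exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the failure
            sleep(backoff_delay(attempt))


calls = {"count": 0}


def flaky():
    """Fail twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


delays = []
outcome = call_with_retries(flaky, sleep=delays.append)  # record instead of sleeping
print(outcome, len(delays))  # ok 2
```

Jitter spreads retries from many workers over time, so a recovering service is not hit by all of them at the same instant.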

Related Skills

  • langgraph-checkpoints: LLM workflow persistence

  • resilience-patterns: Retry and fallback

  • observability-monitoring: Job metrics

Capability Details

arq-tasks

Keywords: arq, async queue, redis queue, background task

Solves:

  • How to run async background tasks in FastAPI?

  • Simple Redis job queue

celery-tasks

Keywords: celery, task queue, distributed tasks, worker

Solves:

  • Enterprise task queue

  • Complex job workflows

celery-workflows

Keywords: chain, group, chord, celery workflow

Solves:

  • Sequential task execution

  • Parallel task processing

periodic-tasks

Keywords: periodic, scheduled, cron, celery beat

Solves:

  • Run tasks on schedule

  • Cron-like job scheduling

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
