fastapi background tasks

FastAPI Background Task Processing

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "fastapi background tasks" with this command: npx skills add lobbi-docs/claude/lobbi-docs-claude-fastapi-background-tasks

FastAPI Background Task Processing

This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.

ARQ (Async Redis Queue) - Recommended

Installation

pip install arq

Configuration

app/workers/config.py

from arq.connections import RedisSettings from app.config import get_settings

settings = get_settings()

class WorkerSettings: redis_settings = RedisSettings( host=settings.redis_host, port=settings.redis_port, password=settings.redis_password, database=1 # Separate from cache )

# Job settings
max_jobs = 10
job_timeout = 300  # 5 minutes
keep_result = 3600  # 1 hour
queue_name = "default"

# Cron jobs
cron_jobs = []

Task Definitions

app/workers/tasks.py

from arq import cron from typing import Dict, Any import asyncio

async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str): """Send email asynchronously.""" email_service = ctx.get("email_service") await email_service.send(to=to, subject=subject, body=body) return {"status": "sent", "to": to}

async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str): """Process uploaded file (resize, convert, etc.).""" storage = ctx.get("storage") file_data = await storage.get(file_id)

# Process file
processed = await process_file(file_data)

# Save processed file
await storage.put(f"processed/{file_id}", processed)

return {"status": "processed", "file_id": file_id}

async def cleanup_expired(ctx: Dict[str, Any]): """Periodic cleanup of expired data.""" db = ctx.get("db") result = await db.delete_expired() return {"deleted": result.deleted_count}

Cron job example

@cron(hour=2, minute=0) # Run at 2 AM daily async def daily_report(ctx: Dict[str, Any]): """Generate daily report.""" report_service = ctx.get("report_service") await report_service.generate_daily()

Worker Entry Point

app/workers/main.py

from arq import create_pool from arq.connections import RedisSettings from app.workers.config import WorkerSettings from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report from app.infrastructure.database import init_database from app.services.email import EmailService

async def startup(ctx: Dict[str, Any]): """Worker startup - initialize services.""" await init_database() ctx["email_service"] = EmailService() ctx["db"] = get_db()

async def shutdown(ctx: Dict[str, Any]): """Worker shutdown - cleanup.""" await close_database()

class WorkerSettings(WorkerSettings): functions = [send_email, process_upload, cleanup_expired] cron_jobs = [daily_report] on_startup = startup on_shutdown = shutdown

Run with: arq app.workers.main.WorkerSettings

Enqueueing Tasks from FastAPI

app/dependencies.py

from arq import ArqRedis, create_pool from arq.connections import RedisSettings

async def get_task_queue() -> ArqRedis: return await create_pool(RedisSettings())

app/routes/users.py

from fastapi import Depends from arq import ArqRedis

@router.post("/users/{user_id}/welcome") async def send_welcome_email( user_id: str, queue: ArqRedis = Depends(get_task_queue) ): user = await get_user(user_id)

# Enqueue background task
job = await queue.enqueue_job(
    "send_email",
    to=user.email,
    subject="Welcome!",
    body="Thanks for signing up."
)

return {"job_id": job.job_id, "status": "queued"}

@router.post("/uploads") async def upload_file( file: UploadFile, user: User = Depends(get_current_user), queue: ArqRedis = Depends(get_task_queue) ): # Save file file_id = await save_file(file)

# Enqueue processing
await queue.enqueue_job(
    "process_upload",
    file_id=file_id,
    user_id=str(user.id),
    _defer_by=5  # Delay 5 seconds
)

return {"file_id": file_id, "status": "processing"}

Celery (Battle-Tested)

Configuration

app/workers/celery_app.py

from celery import Celery from app.config import get_settings

settings = get_settings()

celery_app = Celery( "worker", broker=settings.celery_broker_url, backend=settings.celery_result_backend, include=["app.workers.celery_tasks"] )

celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="UTC", enable_utc=True, task_track_started=True, task_time_limit=300, worker_prefetch_multiplier=1, )

Periodic tasks (Celery Beat)

celery_app.conf.beat_schedule = { "cleanup-every-hour": { "task": "app.workers.celery_tasks.cleanup_expired", "schedule": 3600.0, }, "daily-report": { "task": "app.workers.celery_tasks.generate_daily_report", "schedule": crontab(hour=2, minute=0), }, }

Celery Tasks

app/workers/celery_tasks.py

from app.workers.celery_app import celery_app import asyncio

def run_async(coro): """Helper to run async code in sync Celery tasks.""" loop = asyncio.get_event_loop() return loop.run_until_complete(coro)

@celery_app.task(bind=True, max_retries=3) def send_email(self, to: str, subject: str, body: str): try: run_async(_send_email_async(to, subject, body)) return {"status": "sent", "to": to} except Exception as exc: self.retry(exc=exc, countdown=60)

@celery_app.task def process_upload(file_id: str, user_id: str): run_async(_process_upload_async(file_id, user_id)) return {"status": "processed", "file_id": file_id}

Dramatiq (Modern Celery Alternative)

Configuration

app/workers/dramatiq_app.py

import dramatiq from dramatiq.brokers.redis import RedisBroker from dramatiq.results import Results from dramatiq.results.backends import RedisBackend

redis_broker = RedisBroker(url="redis://localhost:6379/0") result_backend = RedisBackend(url="redis://localhost:6379/1")

redis_broker.add_middleware(Results(backend=result_backend)) dramatiq.set_broker(redis_broker)

Dramatiq Tasks

app/workers/dramatiq_tasks.py

import dramatiq

@dramatiq.actor(max_retries=3, min_backoff=1000) def send_email(to: str, subject: str, body: str): # Sync implementation return {"status": "sent", "to": to}

@dramatiq.actor(time_limit=300000) # 5 min timeout def process_upload(file_id: str, user_id: str): return {"status": "processed", "file_id": file_id}

FastAPI Built-in Background Tasks

For simple fire-and-forget tasks (no persistence):

from fastapi import BackgroundTasks

async def write_log(message: str): with open("log.txt", "a") as f: f.write(f"{message}\n")

@router.post("/log") async def create_log(message: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_log, message) return {"status": "logged"}

Additional Resources

Reference Files

For detailed patterns:

  • references/arq-advanced.md

  • ARQ advanced patterns, retries, priorities

  • references/celery-patterns.md

  • Celery best practices, chains, groups

  • references/monitoring.md

  • Flower, task monitoring

Example Files

Working examples in examples/ :

  • examples/arq_worker.py

  • Complete ARQ worker

  • examples/celery_app.py

  • Celery configuration

  • examples/task_service.py

  • Task enqueueing service

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

vision-multimodal

No summary provided by upstream source.

Repository SourceNeeds Review
General

design-system

No summary provided by upstream source.

Repository SourceNeeds Review
General

kanban

No summary provided by upstream source.

Repository SourceNeeds Review
General

complex-reasoning

No summary provided by upstream source.

Repository SourceNeeds Review