FastAPI Background Task Processing
This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.
ARQ (Async Redis Queue) - Recommended
Installation
pip install arq
Configuration
app/workers/config.py
from arq.connections import RedisSettings from app.config import get_settings
settings = get_settings()
class WorkerSettings:
    """Base arq worker configuration (connection, job limits, queue name)."""

    # Redis connection built from application settings; database 1 keeps
    # job data separate from the cache.
    redis_settings = RedisSettings(
        host=settings.redis_host,
        port=settings.redis_port,
        password=settings.redis_password,
        database=1,
    )

    # Job settings
    max_jobs = 10
    job_timeout = 5 * 60  # 5 minutes
    keep_result = 60 * 60  # 1 hour
    queue_name = "default"

    # Cron jobs (none registered in the base configuration)
    cron_jobs = []
Task Definitions
app/workers/tasks.py
from arq import cron from typing import Dict, Any import asyncio
async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
    """Send one email through the service stored in the worker context.

    Looks up ``ctx["email_service"]``, awaits its ``send`` with the given
    recipient, subject and body, and returns a status dict naming the
    recipient.
    """
    mailer = ctx.get("email_service")
    message = {"to": to, "subject": subject, "body": body}
    await mailer.send(**message)
    return {"status": "sent", "to": to}
async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
    """Process an uploaded file (resize, convert, etc.) and store the result.

    Fetches the file from ``ctx["storage"]``, runs it through
    ``process_file`` and writes the output under ``processed/<file_id>``.
    ``user_id`` is accepted but not used by this function.
    """
    store = ctx.get("storage")
    original = await store.get(file_id)

    # Transform the raw upload.
    converted = await process_file(original)

    # Persist the result next to the original, under the "processed/" prefix.
    target_key = f"processed/{file_id}"
    await store.put(target_key, converted)

    return {"status": "processed", "file_id": file_id}
async def cleanup_expired(ctx: Dict[str, Any]):
    """Periodic cleanup: delete expired data and report how much was removed.

    Calls ``delete_expired()`` on ``ctx["db"]`` and returns the result's
    ``deleted_count`` under the ``"deleted"`` key.
    """
    database = ctx.get("db")
    outcome = await database.delete_expired()
    removed = outcome.deleted_count
    return {"deleted": removed}
Cron job example
async def _generate_daily_report(ctx: Dict[str, Any]):
    """Generate daily report.

    Awaits ``generate_daily()`` on the service stored under
    ``ctx["report_service"]``.
    """
    report_service = ctx.get("report_service")
    await report_service.generate_daily()


# arq's cron() is a factory that takes the coroutine as its first argument and
# returns a CronJob; it cannot be used as a decorator. ``daily_report`` is the
# CronJob object to list in ``WorkerSettings.cron_jobs``.
daily_report = cron(
    _generate_daily_report,
    name="cron:daily_report",
    hour=2,  # Run at 2 AM daily
    minute=0,
)
Worker Entry Point
app/workers/main.py
from arq import create_pool from arq.connections import RedisSettings from app.workers.config import WorkerSettings from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report from app.infrastructure.database import init_database from app.services.email import EmailService
async def startup(ctx: Dict[str, Any]):
    """Worker startup hook: initialize the database and populate ``ctx``.

    After ``init_database()`` completes, stores an ``EmailService`` under
    ``"email_service"`` and the result of ``get_db()`` under ``"db"``.
    """
    # NOTE(review): Dict, Any and get_db are not imported in the visible
    # snippet, and nothing here sets "storage" or "report_service" - confirm
    # where those come from.
    await init_database()
    mailer = EmailService()
    ctx["email_service"] = mailer
    database = get_db()
    ctx["db"] = database
async def shutdown(ctx: Dict[str, Any]):
    """Worker shutdown hook: close the database.

    ``ctx`` is accepted because arq passes it to the hook; it is not used.
    """
    # NOTE(review): close_database is not imported in the visible snippet -
    # confirm it lives next to init_database.
    await close_database()
    return None
class WorkerSettings(WorkerSettings):
    """Runnable worker settings: base configuration plus tasks and hooks.

    Deliberately reuses the name of the imported base class; the base is
    resolved before this class statement rebinds the name.
    """

    # Coroutines the worker can execute by name.
    functions = [
        send_email,
        process_upload,
        cleanup_expired,
    ]

    # Scheduled jobs.
    cron_jobs = [daily_report]

    # Lifecycle hooks.
    on_startup = startup
    on_shutdown = shutdown
Run with: arq app.workers.main.WorkerSettings
Enqueueing Tasks from FastAPI
app/dependencies.py
from arq import ArqRedis, create_pool from arq.connections import RedisSettings
# Process-wide pool, created on first use and then shared by every request.
_task_queue = None


async def get_task_queue() -> ArqRedis:
    """Return the shared arq Redis pool used to enqueue jobs.

    The pool connects with the worker's own ``redis_settings`` and uses the
    worker's ``queue_name`` as the default queue, so enqueued jobs land on
    the database and queue the worker reads. Connecting with a bare
    ``RedisSettings()`` would target a different database and arq's default
    queue, and the jobs would never be picked up.

    The pool is created once and reused; creating one per request would open
    a new Redis connection pool each time without ever closing it.

    Returns:
        The shared ``ArqRedis`` pool.
    """
    global _task_queue
    if _task_queue is None:
        # Imported here so this module only needs worker config when a queue
        # is actually requested.
        from app.workers.config import WorkerSettings

        _task_queue = await create_pool(
            WorkerSettings.redis_settings,
            default_queue_name=WorkerSettings.queue_name,
        )
    return _task_queue
app/routes/users.py
from fastapi import Depends from arq import ArqRedis
@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
    user_id: str,
    queue: ArqRedis = Depends(get_task_queue),
):
    """Queue a welcome email for the given user and return the job id."""
    user = await get_user(user_id)

    # Enqueue background task; the worker resolves "send_email" by name.
    email_kwargs = {
        "to": user.email,
        "subject": "Welcome!",
        "body": "Thanks for signing up.",
    }
    queued = await queue.enqueue_job("send_email", **email_kwargs)

    return {"job_id": queued.job_id, "status": "queued"}
@router.post("/uploads")
async def upload_file(
    file: UploadFile,
    user: User = Depends(get_current_user),
    queue: ArqRedis = Depends(get_task_queue),
):
    """Save an uploaded file and queue it for background processing."""
    # Save file
    file_id = await save_file(file)

    # Enqueue processing, deferred by 5 seconds.
    job_kwargs = {"file_id": file_id, "user_id": str(user.id)}
    await queue.enqueue_job("process_upload", _defer_by=5, **job_kwargs)

    return {"file_id": file_id, "status": "processing"}
Celery (Battle-Tested)
Configuration
app/workers/celery_app.py
from celery import Celery from app.config import get_settings
settings = get_settings()
# Celery application: broker and result backend come from application
# settings; task modules are listed in ``include``.
celery_app = Celery(
    "worker",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["app.workers.celery_tasks"],
)

# JSON-only serialization, UTC timestamps, started-state tracking, a
# 300-second task time limit and a prefetch multiplier of 1.
_celery_options = {
    "task_serializer": "json",
    "accept_content": ["json"],
    "result_serializer": "json",
    "timezone": "UTC",
    "enable_utc": True,
    "task_track_started": True,
    "task_time_limit": 300,
    "worker_prefetch_multiplier": 1,
}
celery_app.conf.update(**_celery_options)
Periodic tasks (Celery Beat)
# crontab must be imported explicitly; without it the schedule below raises
# NameError when this module is imported.
from celery.schedules import crontab

# Periodic tasks run by Celery Beat.
celery_app.conf.beat_schedule = {
    # Every hour (interval given in seconds).
    "cleanup-every-hour": {
        "task": "app.workers.celery_tasks.cleanup_expired",
        "schedule": 3600.0,
    },
    # Every day at 02:00.
    "daily-report": {
        "task": "app.workers.celery_tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),
    },
}
Celery Tasks
app/workers/celery_tasks.py
from app.workers.celery_app import celery_app import asyncio
def run_async(coro):
    """Run a coroutine to completion from synchronous code and return its result.

    Helper for calling async code from sync Celery tasks. Uses
    ``asyncio.run`` rather than ``asyncio.get_event_loop()``: the latter is
    deprecated when no loop is running and raises ``RuntimeError`` in
    threads other than the main one.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        RuntimeError: If called while an event loop is already running in
            the current thread.
    """
    # Each call gets a fresh event loop that is closed afterwards, so objects
    # bound to a loop must not be shared between calls.
    return asyncio.run(coro)
@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
    """Send an email, retrying on failure.

    Runs ``_send_email_async`` via ``run_async``. On any exception the task
    is rescheduled after 60 seconds, up to ``max_retries`` (3) times.

    Returns:
        A status dict naming the recipient.
    """
    try:
        run_async(_send_email_async(to, subject, body))
    except Exception as exc:
        # retry() raises celery's Retry exception itself; the explicit
        # ``raise`` makes it obvious that execution never continues past here.
        raise self.retry(exc=exc, countdown=60)
    return {"status": "sent", "to": to}
@celery_app.task
def process_upload(file_id: str, user_id: str):
    """Process an uploaded file by running ``_process_upload_async`` to completion."""
    pending = _process_upload_async(file_id, user_id)
    run_async(pending)
    outcome = {"status": "processed", "file_id": file_id}
    return outcome
Dramatiq (Modern Celery Alternative)
Configuration
app/workers/dramatiq_app.py
import dramatiq from dramatiq.brokers.redis import RedisBroker from dramatiq.results import Results from dramatiq.results.backends import RedisBackend
# Messages go through Redis database 0; task results are kept in database 1.
redis_broker = RedisBroker(
    url="redis://localhost:6379/0",
)
result_backend = RedisBackend(
    url="redis://localhost:6379/1",
)

# Attach the Results middleware, then make this the global broker.
results_middleware = Results(backend=result_backend)
redis_broker.add_middleware(results_middleware)
dramatiq.set_broker(redis_broker)
Dramatiq Tasks
app/workers/dramatiq_tasks.py
import dramatiq
@dramatiq.actor(max_retries=3, min_backoff=1000)
def send_email(to: str, subject: str, body: str):
    """Placeholder email actor: returns a status dict naming the recipient."""
    # Sync implementation
    outcome = {"status": "sent", "to": to}
    return outcome
@dramatiq.actor(time_limit=300000)  # 5 min timeout
def process_upload(file_id: str, user_id: str):
    """Placeholder upload actor: returns a status dict naming the file."""
    outcome = {"status": "processed", "file_id": file_id}
    return outcome
FastAPI Built-in Background Tasks
For simple fire-and-forget tasks (no persistence):
from fastapi import BackgroundTasks
async def write_log(message: str):
    """Append ``message`` as one line to ``log.txt`` without blocking the event loop.

    FastAPI awaits ``async def`` background tasks on the event loop, so a
    blocking file write done directly here would stall other requests. The
    write is handed to the default executor instead. The file is opened as
    UTF-8 so the output does not depend on the platform's locale encoding.

    Args:
        message: Text to append; a trailing newline is added.
    """
    import asyncio

    def _append_line() -> None:
        with open("log.txt", "a", encoding="utf-8") as f:
            f.write(f"{message}\n")

    await asyncio.get_running_loop().run_in_executor(None, _append_line)
@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
    """Schedule ``write_log(message)`` as a background task and return immediately."""
    background_tasks.add_task(write_log, message)
    response = {"status": "logged"}
    return response
Additional Resources
Reference Files
For detailed patterns:
- references/arq-advanced.md - ARQ advanced patterns, retries, priorities
- references/celery-patterns.md - Celery best practices, chains, groups
- references/monitoring.md - Flower, task monitoring
Example Files
Working examples in examples/ :
- examples/arq_worker.py - Complete ARQ worker
- examples/celery_app.py - Celery configuration
- examples/task_service.py - Task enqueueing service