fastapi-async-patterns

FastAPI Async Patterns

Install skill "fastapi-async-patterns" with this command: npx skills add thebushidocollective/han/thebushidocollective-han-fastapi-async-patterns

Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.

Basic Async Route Handlers

Understanding async vs sync endpoints in FastAPI.

from fastapi import FastAPI
import time
import asyncio
import httpx

app = FastAPI()

Sync endpoint (FastAPI runs it in a threadpool, so it ties up a worker thread rather than the event loop)

@app.get('/sync')
def sync_endpoint():
    time.sleep(1)  # Blocks one worker thread; the same call inside an async def would stall the event loop
    return {'message': 'Completed after 1 second'}

Async endpoint (non-blocking)

@app.get('/async')
async def async_endpoint():
    await asyncio.sleep(1)  # Other requests can be handled
    return {'message': 'Completed after 1 second'}

CPU-bound work (use sync)

@app.get('/cpu-intensive')
def cpu_intensive():
    result = sum(i * i for i in range(10000000))
    return {'result': result}

I/O-bound work (use async)

@app.get('/io-intensive')
async def io_intensive():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/data')
        return response.json()
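When a blocking call has no async equivalent but has to run inside an async def handler, one option is to hand it to a worker thread. The following is a minimal sketch using asyncio.to_thread (Python 3.9+); blocking_lookup, handle_request, and handle_many are hypothetical names used only for illustration.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical stand-in for a blocking library call."""
    time.sleep(0.1)
    return key.upper()


async def handle_request(key: str) -> str:
    # Run the blocking function in a worker thread so the event loop stays free
    return await asyncio.to_thread(blocking_lookup, key)


async def handle_many(keys: list[str]) -> list[str]:
    # The lookups overlap because each one waits in its own thread
    return await asyncio.gather(*(handle_request(k) for k in keys))
```

Calling asyncio.run(handle_many(['a', 'b', 'c'])) returns the results in input order, because asyncio.gather preserves the order of its arguments.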

Async Database Operations

Async database patterns with popular ORMs and libraries.

from typing import AsyncGenerator

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise

app = FastAPI()

SQLAlchemy async setup

DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# User is assumed to be a mapped SQLAlchemy model defined elsewhere in the application
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail='User not found')
    return user

Direct asyncpg (lower level, faster)

async def get_asyncpg_pool():
    # Note: used as a per-request dependency, this opens and closes a whole pool on every request.
    # In practice, create the pool once at startup (see Connection Pooling) and share it.
    pool = await asyncpg.create_pool(
        'postgresql://user:pass@localhost/db',
        min_size=10,
        max_size=20
    )
    try:
        yield pool
    finally:
        await pool.close()

@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1', user_id
        )
        if not row:
            raise HTTPException(status_code=404, detail='User not found')
        return dict(row)

MongoDB with Motor

mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
db = mongo_client.mydatabase

@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
    # Assumes documents are stored with string _id values; ObjectId keys would need bson.ObjectId(doc_id)
    document = await db.collection.find_one({'_id': doc_id})
    if not document:
        raise HTTPException(status_code=404, detail='Document not found')
    return document

@app.post('/documents')
async def create_document(data: dict):
    result = await db.collection.insert_one(data)
    return {'id': str(result.inserted_id)}

Tortoise ORM async

register_tortoise(
    app,
    db_url='postgres://user:pass@localhost/db',
    modules={'models': ['app.models']},
    generate_schemas=True,
    add_exception_handlers=True,
)

from tortoise.models import Model
from tortoise import fields

class UserModel(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=255)
    email = fields.CharField(max_length=255)

@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
    user = await UserModel.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail='User not found')
    return user

Background Tasks

Fire-and-forget tasks without blocking the response.

from fastapi import BackgroundTasks, FastAPI, UploadFile
import asyncio
import os
from datetime import datetime

app = FastAPI()

Simple background task

async def send_email(email: str, message: str):
    await asyncio.sleep(2)  # Simulate email sending
    print(f'Email sent to {email}: {message}')

@app.post('/send-email')
async def send_email_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_email, email, message)
    return {'status': 'Email will be sent in background'}

Multiple background tasks

async def log_activity(user_id: int, action: str):
    await asyncio.sleep(0.5)
    print(f'[{datetime.now()}] User {user_id} performed: {action}')

async def update_analytics(action: str):
    await asyncio.sleep(1)
    print(f'Analytics updated for action: {action}')

@app.post('/users/{user_id}/action')
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    # Add multiple tasks
    background_tasks.add_task(log_activity, user_id, action)
    background_tasks.add_task(update_analytics, action)
    return {'status': 'Action logged'}

Background cleanup

async def cleanup_temp_files(file_path: str):
    await asyncio.sleep(60)  # Wait before cleanup
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f'Cleaned up: {file_path}')

@app.post('/upload')
async def upload_file(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    # Keep only the base name so a crafted filename cannot escape /tmp
    safe_name = os.path.basename(file.filename or 'upload')
    temp_path = f'/tmp/{safe_name}'
    content = await file.read()
    with open(temp_path, 'wb') as f:
        f.write(content)

    # Schedule cleanup
    background_tasks.add_task(cleanup_temp_files, temp_path)
    return {'filename': safe_name, 'path': temp_path}

WebSocket Handling

Real-time bidirectional communication patterns.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, WebSocketException, Depends, status
from typing import List

app = FastAPI()

Simple WebSocket

@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'Echo: {data}')
    except WebSocketDisconnect:
        print('Client disconnected')

WebSocket with authentication

# verify_token and decode_token are assumed to be provided by the application's auth code
async def get_current_user_ws(websocket: WebSocket):
    token = websocket.query_params.get('token')
    if not token or not verify_token(token):
        # HTTPException does not apply to WebSocket routes; close with the policy-violation code instead
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return decode_token(token)

@app.websocket('/ws/authenticated')
async def authenticated_websocket(
    websocket: WebSocket,
    user = Depends(get_current_user_ws)
):
    await websocket.accept()
    try:
        await websocket.send_text(f'Welcome {user["name"]}')
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'{user["name"]}: {data}')
    except WebSocketDisconnect:
        print(f'User {user["name"]} disconnected')

Broadcasting to multiple connections

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy so a disconnect during a send cannot mutate the list mid-loop
        for connection in list(self.active_connections):
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    await manager.broadcast(f'Client {client_id} joined the chat')
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f'Client {client_id}: {data}')
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f'Client {client_id} left the chat')

WebSocket with JSON messages

@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            message_type = data.get('type')

            if message_type == 'ping':
                await websocket.send_json({'type': 'pong'})
            elif message_type == 'message':
                await websocket.send_json({
                    'type': 'response',
                    'data': f'Received: {data.get("content")}'
                })
    except WebSocketDisconnect:
        print('Client disconnected')
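A broadcast loop can fail partway through if one client has gone away without a clean disconnect. The sketch below shows one way to tolerate that: catch the send error and drop the dead connection. FakeConnection and SafeConnectionManager are hypothetical names; FakeConnection only imitates the send_text method of a WebSocket so the idea can be run without a server.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a WebSocket; records messages or fails on send."""

    def __init__(self, broken: bool = False):
        self.broken = broken
        self.sent: list[str] = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError('connection closed')
        self.sent.append(message)


class SafeConnectionManager:
    def __init__(self):
        self.active_connections = []

    async def broadcast(self, message: str) -> None:
        # Iterate over a copy so removals do not disturb the loop
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Drop connections that can no longer be written to
                self.active_connections.remove(connection)


async def demo() -> tuple[int, list[str]]:
    manager = SafeConnectionManager()
    good, bad = FakeConnection(), FakeConnection(broken=True)
    manager.active_connections.extend([bad, good])
    await manager.broadcast('hello')
    return len(manager.active_connections), good.sent
```

Catching a broad Exception keeps the sketch short; a real handler would likely narrow it to the errors its WebSocket library raises on a closed socket.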

Server-Sent Events (SSE)

One-way streaming from server to client.

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
from datetime import datetime

app = FastAPI()

@app.get('/sse')
async def sse_endpoint():
    async def event_generator():
        for i in range(10):
            await asyncio.sleep(1)
            yield {
                'event': 'message',
                'data': f'Message {i}'
            }

    return EventSourceResponse(event_generator())

SSE with real-time updates

# fetch_latest_update is assumed to be defined by the application
@app.get('/sse/updates')
async def sse_updates():
    async def update_generator():
        while True:
            # Simulate fetching updates
            await asyncio.sleep(2)
            update = await fetch_latest_update()
            yield {
                'event': 'update',
                'data': json.dumps(update)
            }

    return EventSourceResponse(update_generator())

SSE with heartbeat

@app.get('/sse/heartbeat')
async def sse_heartbeat():
    async def heartbeat_generator():
        try:
            while True:
                await asyncio.sleep(30)
                yield {
                    'event': 'heartbeat',
                    'data': datetime.now().isoformat()
                }
        except asyncio.CancelledError:
            print('SSE connection closed')
            raise  # Re-raise so the cancellation propagates

    return EventSourceResponse(heartbeat_generator())

Streaming Responses

Stream large files or generated content.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import csv
import io
import os

app = FastAPI()

Stream large file

@app.get('/download/{filename}')
async def download_file(filename: str):
    # Keep only the base name so the path cannot escape /data
    safe_name = os.path.basename(filename)

    # A plain (sync) generator: StreamingResponse iterates it in a threadpool,
    # so the blocking file reads do not stall the event loop
    def file_stream():
        with open(f'/data/{safe_name}', 'rb') as f:
            while chunk := f.read(8192):
                yield chunk

    return StreamingResponse(
        file_stream(),
        media_type='application/octet-stream',
        headers={'Content-Disposition': f'attachment; filename="{safe_name}"'}
    )

Stream generated CSV

# fetch_users_batch is assumed to be defined by the application
@app.get('/export/users')
async def export_users():
    async def csv_stream():
        output = io.StringIO()
        writer = csv.writer(output)

        # Write header
        writer.writerow(['ID', 'Name', 'Email'])
        yield output.getvalue()
        output.truncate(0)
        output.seek(0)

        # Stream users in batches
        offset = 0
        batch_size = 100
        while True:
            users = await fetch_users_batch(offset, batch_size)
            if not users:
                break

            for user in users:
                writer.writerow([user.id, user.name, user.email])
                yield output.getvalue()
                output.truncate(0)
                output.seek(0)

            offset += batch_size

    return StreamingResponse(
        csv_stream(),
        media_type='text/csv',
        headers={'Content-Disposition': 'attachment; filename=users.csv'}
    )

Stream generated content

# fetch_section_data is assumed to be defined by the application
@app.get('/generate/report')
async def generate_report():
    async def report_stream():
        yield b'<html><body><h1>Report</h1>'

        for section in ['users', 'orders', 'analytics']:
            await asyncio.sleep(0.5)  # Simulate processing
            data = await fetch_section_data(section)
            yield f'<h2>{section.title()}</h2>'.encode()
            yield f'<pre>{data}</pre>'.encode()

        yield b'</body></html>'

    return StreamingResponse(report_stream(), media_type='text/html')
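The CSV export above emits one chunk per row. A variation is to emit one chunk per batch, which means fewer and larger writes to the client. The sketch below shows that batching logic in isolation, assuming a hypothetical fetch_rows_batch data source with five rows; csv_chunks and collect are also illustrative names.

```python
import asyncio
import csv
import io


async def fetch_rows_batch(offset: int, size: int) -> list[tuple[int, str]]:
    """Hypothetical data source holding five rows in total."""
    rows = [(i, f'user{i}') for i in range(5)]
    return rows[offset:offset + size]


async def csv_chunks(batch_size: int = 2):
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(['ID', 'Name'])  # The header travels with the first chunk
    offset = 0
    while True:
        batch = await fetch_rows_batch(offset, batch_size)
        if not batch:
            break
        writer.writerows(batch)
        # Emit one chunk per batch, then reset the buffer for the next one
        yield buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        offset += batch_size
    if buffer.tell():
        # Only reached with data when there were no rows: send the header alone
        yield buffer.getvalue()


async def collect() -> list[str]:
    return [chunk async for chunk in csv_chunks()]
```

An async generator like csv_chunks could be passed to StreamingResponse in the same way as csv_stream above.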

Concurrent Request Handling

Parallel processing patterns for multiple operations.

from fastapi import FastAPI
from sqlalchemy import select, func
import asyncio
import httpx

app = FastAPI()

Parallel API calls

@app.get('/aggregate/user/{user_id}')
async def aggregate_user_data(user_id: int):
    async with httpx.AsyncClient() as client:
        # Fetch from multiple sources in parallel
        profile_task = client.get(f'https://api.example.com/users/{user_id}')
        posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')
        comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')

        profile, posts, comments = await asyncio.gather(
            profile_task,
            posts_task,
            comments_task
        )

        return {
            'profile': profile.json(),
            'posts': posts.json(),
            'comments': comments.json()
        }

Parallel database queries

# A single AsyncSession does not allow concurrent operations, so each parallel
# query opens its own session. AsyncSessionLocal, User, and Order are assumed to
# come from the SQLAlchemy setup shown under Async Database Operations.
async def fetch_all(stmt):
    async with AsyncSessionLocal() as session:
        result = await session.execute(stmt)
        return result.scalars().all()

async def fetch_scalar(stmt):
    async with AsyncSessionLocal() as session:
        result = await session.execute(stmt)
        return result.scalar()

@app.get('/dashboard')
async def get_dashboard():
    # Execute multiple queries in parallel
    users, orders, total_users = await asyncio.gather(
        fetch_all(select(User).limit(10)),
        fetch_all(select(Order).limit(10)),
        fetch_scalar(select(func.count(User.id)))
    )

    return {
        'users': users,
        'orders': orders,
        'total_users': total_users
    }

Racing requests (first to complete wins)

@app.get('/fastest-price/{product_id}')
async def get_fastest_price(product_id: str):
    async with httpx.AsyncClient() as client:
        # asyncio.wait needs Task objects; bare coroutines are rejected since Python 3.11
        tasks = [
            asyncio.create_task(client.get(f'https://store1.com/price/{product_id}')),
            asyncio.create_task(client.get(f'https://store2.com/price/{product_id}')),
            asyncio.create_task(client.get(f'https://store3.com/price/{product_id}'))
        ]

        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel pending requests
        for task in pending:
            task.cancel()

        result = done.pop().result()
        return result.json()
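asyncio.gather starts every awaitable at once, which can overwhelm a downstream service when the input list is long. One common way to cap the fan-out is an asyncio.Semaphore. The following is a minimal sketch; fetch, fetch_all, and the stats dictionary are hypothetical and exist only to show that the cap holds.

```python
import asyncio


async def fetch(item: int, limiter: asyncio.Semaphore, stats: dict) -> int:
    async with limiter:
        # At most max_concurrency coroutines are inside this block at a time
        stats['running'] += 1
        stats['peak'] = max(stats['peak'], stats['running'])
        await asyncio.sleep(0.01)  # Stand-in for an HTTP or database call
        stats['running'] -= 1
        return item * 2


async def fetch_all(items: list[int], max_concurrency: int = 3):
    limiter = asyncio.Semaphore(max_concurrency)
    stats = {'running': 0, 'peak': 0}
    results = await asyncio.gather(*(fetch(i, limiter, stats) for i in items))
    return results, stats['peak']
```

The results still come back in input order; only the number of calls in flight at any moment is limited.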

Async Context Managers

Resource management with async context managers.

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio

Async context manager for lifespan events

# create_db_pool and create_redis_client are assumed to be defined by the application
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print('Starting up...')
    db_pool = await create_db_pool()
    redis_client = await create_redis_client()

    # Store in app state
    app.state.db_pool = db_pool
    app.state.redis = redis_client

    yield

    # Shutdown
    print('Shutting down...')
    await db_pool.close()
    await redis_client.close()

app = FastAPI(lifespan=lifespan)

Custom async context manager

class AsyncDatabaseSession:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Always return the connection to the pool. If this is an asyncpg pool (conn.fetch
        # suggests so), connections have no rollback() method; wrap the work in
        # conn.transaction() when rollback-on-error is needed.
        await self.pool.release(self.conn)
        self.conn = None
        return False  # Do not suppress exceptions

@app.get('/data')
async def get_data():
    async with AsyncDatabaseSession(app.state.db_pool) as conn:
        result = await conn.fetch('SELECT * FROM data')
        return result
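The class-based context manager above can also be written as a generator with contextlib.asynccontextmanager, which keeps acquire and release next to each other. This is a minimal sketch under stated assumptions: FakePool is a hypothetical pool that only counts checked-out connections, and pooled_connection is an illustrative name.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical pool that only counts checked-out connections."""

    def __init__(self):
        self.in_use = 0

    async def acquire(self) -> str:
        self.in_use += 1
        return 'conn'

    async def release(self, conn: str) -> None:
        self.in_use -= 1


@asynccontextmanager
async def pooled_connection(pool: FakePool):
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        # Runs on success and on error, so the connection is always returned
        await pool.release(conn)


async def demo() -> tuple[int, int]:
    pool = FakePool()
    async with pooled_connection(pool):
        during = pool.in_use
    try:
        async with pooled_connection(pool):
            raise ValueError('query failed')
    except ValueError:
        pass
    return during, pool.in_use
```

The demo checks out one connection, then forces an error inside a second block; the pool count returns to zero in both cases.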

Connection Pooling

Efficient connection management for databases and HTTP clients.

from fastapi import FastAPI, Depends
import asyncpg
import httpx
from typing import AsyncGenerator

app = FastAPI()

Database connection pool

class DatabasePool:
    def __init__(self):
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            'postgresql://user:pass@localhost/db',
            min_size=10,
            max_size=20,
            command_timeout=60,
            max_queries=50000,
            max_inactive_connection_lifetime=300
        )

    async def close_pool(self):
        await self.pool.close()

    async def get_connection(self):
        async with self.pool.acquire() as connection:
            yield connection

db_pool = DatabasePool()

# on_event is deprecated in recent FastAPI releases; the lifespan handler shown
# under Async Context Managers is the current alternative
@app.on_event('startup')
async def startup():
    await db_pool.create_pool()

@app.on_event('shutdown')
async def shutdown():
    await db_pool.close_pool()
    await http_pool.close()  # Also close the shared HTTP client defined below

@app.get('/users')
async def get_users(conn = Depends(db_pool.get_connection)):
    rows = await conn.fetch('SELECT * FROM users')
    return [dict(row) for row in rows]

HTTP client pool

class HTTPClientPool:
    def __init__(self):
        self.client = None

    async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        if self.client is None:
            self.client = httpx.AsyncClient(
                limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
                timeout=httpx.Timeout(10.0)
            )
        yield self.client

    async def close(self):
        if self.client:
            await self.client.aclose()

http_pool = HTTPClientPool()

@app.get('/external-api')
async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):
    response = await client.get('https://api.example.com/data')
    return response.json()

Performance Optimization

Async patterns for optimal performance.

from typing import List

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
import asyncio

app = FastAPI()

Cache expensive async operations

from aiocache import Cache
from aiocache.serializers import JsonSerializer

cache = Cache(Cache.MEMORY, serializer=JsonSerializer())

# compute_expensive_result is assumed to be defined by the application
@app.get('/expensive-data/{key}')
async def get_expensive_data(key: str):
    # Check cache first (compare against None so falsy values such as 0 or '' still count as hits)
    cached = await cache.get(key)
    if cached is not None:
        return {'data': cached, 'cached': True}

    # Expensive operation
    await asyncio.sleep(2)
    data = compute_expensive_result(key)

    # Store in cache
    await cache.set(key, data, ttl=300)
    return {'data': data, 'cached': False}

Batch operations

# UserCreate (Pydantic schema), User (ORM model), and get_db are assumed to come from the application
@app.post('/users/batch')
async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
    # Create users in batch (more efficient than one-by-one)
    user_objects = [User(**user.dict()) for user in users]  # With Pydantic v2, use user.model_dump()
    db.add_all(user_objects)
    await db.flush()
    return user_objects

Debouncing with asyncio

class Debouncer:
    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    def debounce(self, func, *args):
        # Takes a coroutine function plus arguments rather than a coroutine object,
        # so a superseded call never leaves an un-awaited coroutine behind
        if self.task is not None and not self.task.done():
            self.task.cancel()

        async def delayed():
            await asyncio.sleep(self.delay)
            await func(*args)

        # Schedule without awaiting, so callers are not blocked for the delay
        self.task = asyncio.create_task(delayed())

debouncer = Debouncer(delay=1.0)
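To see the debouncing idea in action, the sketch below runs a self-contained variant: three calls arrive in quick succession and only the last one executes. CallDebouncer, save, and demo are hypothetical names, and this variant schedules the call without awaiting it, which is one possible design rather than the only one.

```python
import asyncio


class CallDebouncer:
    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    def call(self, func, *args) -> None:
        # Cancel the previously scheduled call if it has not run yet
        if self.task is not None and not self.task.done():
            self.task.cancel()

        async def delayed():
            await asyncio.sleep(self.delay)
            await func(*args)

        self.task = asyncio.create_task(delayed())


async def demo() -> list[str]:
    saved: list[str] = []

    async def save(text: str) -> None:
        saved.append(text)

    debouncer = CallDebouncer(delay=0.05)
    for text in ['h', 'he', 'hello']:
        debouncer.call(save, text)
        await asyncio.sleep(0.01)  # Calls arrive faster than the delay
    await debouncer.task  # Wait for the last scheduled call
    return saved
```

Each new call cancels the pending one before its delay elapses, so only the final 'hello' is saved.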

Prefetching related data

@app.get('/posts/{post_id}')
async def get_post_with_relations(post_id: int, db = Depends(get_db)):
    # A single AsyncSession does not allow concurrent operations, so the queries
    # run one after another here; use one session per query to run them in parallel
    post = await db.get(Post, post_id)
    if post is None:
        raise HTTPException(status_code=404, detail='Post not found')

    comments_result = await db.execute(
        select(Comment).where(Comment.post_id == post_id)
    )
    author_result = await db.execute(
        select(User).where(User.id == post.author_id)
    )

    return {
        'post': post,
        'comments': comments_result.scalars().all(),
        'author': author_result.scalar_one()
    }

When to Use This Skill

Use fastapi-async-patterns when:

  • Building high-throughput APIs that handle many concurrent requests

  • Working with I/O-bound operations (database, external APIs, file operations)

  • Implementing real-time features (WebSockets, SSE)

  • Processing multiple operations in parallel

  • Streaming large datasets or files

  • Building microservices that communicate with other services

  • Optimizing API response times and resource usage

  • Handling background tasks without blocking responses

FastAPI Async Best Practices

  • Use async for I/O - Always use async for database, HTTP requests, and file operations

  • Avoid blocking calls - Never use blocking calls in async functions (time.sleep, requests library)

  • Connection pooling - Use connection pools for databases and HTTP clients

  • Proper cleanup - Always clean up resources with try/finally or async context managers

  • Concurrent operations - Use asyncio.gather for parallel operations when possible

  • Background tasks - Use BackgroundTasks for fire-and-forget operations

  • Stream large data - Use StreamingResponse for large files or generated content

  • Timeout handling - Set timeouts on all external calls to prevent hanging

  • Error propagation - Handle exceptions properly in async code

  • Monitor performance - Use tools like aiomonitor to debug async issues
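The timeout guidance in the list above can be sketched with asyncio.wait_for, which cancels the awaited call once the deadline passes. The names slow_service and call_with_timeout are hypothetical, and the 'fallback' value stands in for whatever default or error response the application prefers.

```python
import asyncio


async def slow_service() -> str:
    """Hypothetical external call that takes too long."""
    await asyncio.sleep(1)
    return 'fresh'


async def call_with_timeout(timeout: float = 0.05) -> str:
    try:
        return await asyncio.wait_for(slow_service(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the slow call before raising, so nothing is left running
        return 'fallback'
```

HTTP clients such as httpx have their own timeout settings, as in the Connection Pooling section; wait_for is useful for awaitables that offer none.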

FastAPI Async Common Pitfalls

  • Blocking the event loop - Using synchronous I/O in async functions kills performance

  • Missing await - Forgetting await on async functions causes coroutine warnings

  • Creating too many tasks - Spawning unlimited tasks can exhaust resources

  • Not closing connections - Resource leaks from unclosed database/HTTP connections

  • Mixing sync and async - Incorrect mixing causes event loop issues

  • Race conditions - Shared state in async code without proper locking

  • Timeout issues - No timeouts on external calls can hang the server

  • Memory leaks - Background tasks that never complete accumulate

  • Error swallowing - Silent failures in background tasks and event handlers

  • Deadlocks - Circular waits in async dependencies or locks
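The race-condition pitfall in the list above is easy to reproduce: any await between reading and writing shared state lets another task interleave. A minimal sketch with a hypothetical Counter class shows the lost updates and how an asyncio.Lock prevents them.

```python
import asyncio


class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def unsafe_increment(self) -> None:
        current = self.value
        await asyncio.sleep(0)  # Another task may run here and read the same value
        self.value = current + 1

    async def safe_increment(self) -> None:
        # The lock makes the read-modify-write sequence atomic with respect to other tasks
        async with self.lock:
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1


async def run(method_name: str, n: int = 50) -> int:
    counter = Counter()
    await asyncio.gather(*(getattr(counter, method_name)() for _ in range(n)))
    return counter.value
```

With the lock, 50 increments yield 50; without it, updates are lost because many tasks read the same starting value.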
