SQLAlchemy 2.0 Async Patterns
Modern async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
Overview
- Building async FastAPI applications with database access
- Implementing async repository patterns
- Configuring async connection pooling
- Running concurrent database queries
- Avoiding N+1 queries in async context
Quick Reference
Engine and Session Factory
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

# Create async engine - ONE per application
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
    echo=False,          # Set True for SQL logging in dev
)

# Session factory - use this to create sessions
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevent lazy load issues
    autoflush=False,         # Explicit flush control
)
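Because the engine is created once per process, the pool limits above apply per process rather than per deployment. As a rough sizing check, the sketch below multiplies the per-process ceiling (`pool_size + max_overflow`) by a worker count and compares the result with the database's connection limit. The helper name `max_connections_needed` and the worker count of 4 are assumptions for illustration; 100 is PostgreSQL's default `max_connections`.

```python
def max_connections_needed(pool_size: int, max_overflow: int, workers: int) -> int:
    """Upper bound on connections opened across all worker processes."""
    return (pool_size + max_overflow) * workers


# Values from the engine configuration above
per_process = max_connections_needed(pool_size=20, max_overflow=10, workers=1)

# Hypothetical deployment: 4 worker processes, each with its own engine
total = max_connections_needed(pool_size=20, max_overflow=10, workers=4)

POSTGRES_DEFAULT_MAX_CONNECTIONS = 100
fits = total <= POSTGRES_DEFAULT_MAX_CONNECTIONS

print(per_process, total, fits)  # 30 120 False
```

If the total exceeds the server limit, either the pool settings or the worker count would need to come down, or a connection pooler would have to sit in front of the database.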
FastAPI Dependency Injection
from typing import AsyncGenerator
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter()

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides async database session."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Usage in route (User and UserResponse are defined elsewhere)
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    db: AsyncSession = Depends(get_db),
) -> UserResponse:
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(404, "User not found")
    # model_validate on an ORM object needs from_attributes=True on UserResponse
    return UserResponse.model_validate(user)
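The commit-on-success, rollback-on-error behavior of `get_db` can be checked without a database. The sketch below is an illustration only: `FakeSession` and `simulate_request` are hypothetical names, and `contextlib.asynccontextmanager` stands in for whatever the framework uses to drive a generator dependency.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.calls.append("close")
        return False  # never swallow the exception

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def get_db():
    """Same shape as the dependency above, but built on FakeSession."""
    async with FakeSession() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def simulate_request(fail: bool) -> list[str]:
    """Drive get_db as a context manager, optionally failing inside the body."""
    seen: list[FakeSession] = []
    try:
        async with asynccontextmanager(get_db)() as session:
            seen.append(session)
            if fail:
                raise ValueError("handler failed")
    except ValueError:
        pass
    return seen[0].calls


ok_calls = asyncio.run(simulate_request(fail=False))
failed_calls = asyncio.run(simulate_request(fail=True))
print(ok_calls)      # ['commit', 'close']
print(failed_calls)  # ['rollback', 'close']
```

In both paths the session is closed by the `async with` block, which is why the dependency never calls `close()` itself.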
Async Model Definition
import uuid
from datetime import datetime, timezone

from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    # Timezone-aware column so asyncpg accepts the aware default value
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    # Relationship with explicit lazy loading strategy
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Prevent accidental lazy loads - MUST use selectinload
    )

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]
    user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")
Eager Loading (Avoid N+1)
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, joinedload

async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
    """Load user with orders in two queries (SELECT plus SELECT ... IN) - NO N+1."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))  # Eager load orders
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
    """Load multiple users with orders efficiently."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .limit(limit)
    )
    return list(result.scalars().all())
Bulk Operations (Optimized)
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
    """Efficient bulk insert - SQLAlchemy 2.0 batches rows into multi-row INSERTs."""
    # On supported backends, SQLAlchemy 2.0 batches pending rows into
    # INSERT statements with multiple VALUES clauses at flush time
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.flush()  # Get IDs without committing
    return len(users)

async def bulk_insert_chunked(
    db: AsyncSession,
    items: list[dict],
    chunk_size: int = 1000,
) -> int:
    """Insert large datasets in chunks to manage memory."""
    # Item is assumed to be a mapped model defined elsewhere
    total = 0
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        db.add_all([Item(**data) for data in chunk])
        await db.flush()
        total += len(chunk)
    return total
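The chunked insert above slices a list that is already fully in memory. When rows arrive from a generator or a file, a lazy chunker keeps only one chunk alive at a time. The helper below is a minimal sketch using only the standard library; the name `chunked` is an assumption, not part of SQLAlchemy.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls up to `size` items; an empty list ends the loop
    while chunk := list(islice(iterator, size)):
        yield chunk


# Rows produced lazily, for example while reading a large file
rows = ({"email": f"user{i}@example.com"} for i in range(5))
sizes = [len(chunk) for chunk in chunked(rows, 2)]
print(sizes)  # [2, 2, 1]
```

Each yielded chunk could then be passed to `db.add_all(...)` followed by `await db.flush()`, in the same way as the loop body above.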
Repository Pattern
from typing import Generic, TypeVar
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Base is the DeclarativeBase subclass from the model definitions
T = TypeVar("T", bound=Base)

class AsyncRepository(Generic[T]):
    """Generic async repository for CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_many(self, ids: list[UUID]) -> list[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()  # Flush only; the caller owns the commit
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()
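One design point worth making explicit is that the repository flushes but never commits, so the transaction boundary stays with the caller (for example the `get_db` dependency). The sketch below illustrates this with a trimmed copy of `create()` and a hypothetical `RecordingSession` that stands in for `AsyncSession`; it runs without a database and is only meant to show which session calls the repository makes.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class RecordingSession:
    """Hypothetical stand-in for AsyncSession that records repository calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self.pending: list[Any] = []

    def add(self, instance: Any) -> None:
        self.calls.append("add")
        self.pending.append(instance)

    async def flush(self) -> None:
        self.calls.append("flush")


class AsyncRepository(Generic[T]):
    """Trimmed copy of the repository above: create() only."""

    def __init__(self, session: RecordingSession, model: type[T]) -> None:
        self.session = session
        self.model = model

    async def create(self, **kwargs: Any) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance


@dataclass
class User:
    """Plain dataclass standing in for the mapped User model."""

    email: str


async def main() -> tuple[User, list[str]]:
    session = RecordingSession()
    users = AsyncRepository(session, User)
    user = await users.create(email="a@example.com")
    return user, session.calls


user, calls = asyncio.run(main())
print(user.email, calls)  # a@example.com ['add', 'flush']
```

Since no commit appears in the recorded calls, several repository operations can share one transaction and be committed or rolled back together by the caller.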
Concurrent Queries with TaskGroup
import asyncio
from uuid import UUID

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
    """Run dashboard queries sequentially - one AsyncSession is NOT safe to share across concurrent tasks."""
    # WRONG: Don't share AsyncSession across tasks
    # async with asyncio.TaskGroup() as tg:
    #     tg.create_task(db.execute(...))  # NOT SAFE

    # CORRECT: Sequential queries with same session
    user = await db.get(User, user_id)
    orders_result = await db.execute(
        select(Order).where(Order.user_id == user_id).limit(10)
    )
    stats_result = await db.execute(
        select(func.count(Order.id)).where(Order.user_id == user_id)
    )
    return {
        "user": user,
        "recent_orders": list(orders_result.scalars().all()),
        "total_orders": stats_result.scalar(),
    }

async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
    """Concurrent queries - each task gets its own session."""
    async def fetch_user(user_id: UUID) -> dict:
        async with async_session_factory() as session:
            user = await session.get(User, user_id)
            return {"id": user_id, "email": user.email if user else None}

    # asyncio.TaskGroup requires Python 3.11+
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
    return [t.result() for t in tasks]
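When every task opens its own session, a large fan-out can request more connections than the pool allows, and the surplus tasks then wait on the pool. One common way to keep the fan-out bounded is an `asyncio.Semaphore`. The sketch below is illustrative only: `fetch_user` sleeps instead of querying a database, `MAX_CONCURRENT` is a hypothetical cap, and `asyncio.gather` is used so the example also runs on Python versions before 3.11.

```python
import asyncio

MAX_CONCURRENT = 3  # hypothetical cap; keep it at or below pool_size + max_overflow


async def fetch_user(user_id: int, limiter: asyncio.Semaphore, stats: dict) -> dict:
    """Stand-in for a task that opens its own session and runs one query."""
    async with limiter:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # placeholder for the real database round trip
        stats["active"] -= 1
        return {"id": user_id}


async def fetch_all(user_ids: list[int]) -> tuple[list[dict], int]:
    """Run all fetches concurrently, but never more than MAX_CONCURRENT at once."""
    limiter = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fetch_user(uid, limiter, stats) for uid in user_ids)
    )
    return list(results), stats["peak"]


results, peak = asyncio.run(fetch_all(list(range(10))))
print(len(results), peak)  # 10 3
```

In a real version the body of `fetch_user` would be the `async with async_session_factory() as session:` block shown earlier, wrapped in the same `async with limiter:`.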
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Session scope | One AsyncSession per task/request | SQLAlchemy docs: "AsyncSession per task" |
| Scoped sessions | Avoid for async | Maintainers discourage for async code |
| Lazy loading | Use lazy="raise" with explicit loads | Prevents accidental N+1 in async |
| Eager loading | selectinload for collections | Avoids the duplicated parent rows that joinedload produces for collections |
| expire_on_commit | Set to False | Prevents lazy load errors after commit |
| Connection pool | pool_pre_ping=True | Validates connections before use |
| Bulk inserts | Chunk 1000-10000 rows | Memory management for large inserts |
Anti-Patterns (FORBIDDEN)
# NEVER share AsyncSession across tasks
async with asyncio.TaskGroup() as tg:
    tg.create_task(session.execute(...))  # RACE CONDITION

# NEVER use sync Session in async code
from sqlalchemy.orm import Session
session = Session(engine)  # BLOCKS EVENT LOOP

# NEVER access lazy-loaded relationships without eager loading
user = await session.get(User, user_id)
orders = user.orders  # RAISES InvalidRequestError if lazy="raise", MissingGreenlet otherwise

# NEVER use scoped_session with async
from sqlalchemy.orm import scoped_session
ScopedSession = scoped_session(session_factory)  # WRONG for async

# NEVER forget to handle session lifecycle
session = async_session_factory()
result = await session.execute(...)
# MISSING: await session.close() - connection leak!

# NEVER use create_async_engine without pool_pre_ping in production
engine = create_async_engine(url)  # May use stale connections
Related Skills
- asyncio-advanced - TaskGroup and structured concurrency patterns
- alembic-migrations - Database migration with async support
- fastapi-advanced - Full FastAPI integration patterns
- database-schema-designer - Schema design best practices
Capability Details
async-session
Keywords: AsyncSession, async_sessionmaker, session factory, connection
Solves:
- How do I create async database sessions?
- Configure async connection pooling
- Session lifecycle management
fastapi-integration
Keywords: Depends, dependency injection, get_db, request scope
Solves:
- How do I integrate SQLAlchemy with FastAPI?
- Request-scoped database sessions
- Automatic commit/rollback handling
eager-loading
Keywords: selectinload, joinedload, eager load, N+1, relationship
Solves:
- How do I avoid N+1 queries in async?
- Load relationships efficiently
- Configure lazy loading behavior
bulk-operations
Keywords: bulk insert, batch, chunk, add_all, performance
Solves:
- How do I insert many rows efficiently?
- Chunk large inserts for memory
- SQLAlchemy 2.0 bulk optimizations
repository-pattern
Keywords: repository, CRUD, generic, base repository
Solves:
- How do I implement repository pattern?
- Generic async CRUD operations
- Clean architecture with SQLAlchemy