Async SQLAlchemy Patterns
Purpose: Implement production-ready async SQLAlchemy 2.0+ patterns in FastAPI with proper session management, migrations, and performance optimization.
Activation Triggers:
-
Database model implementation
-
Async session configuration
-
Alembic migration setup
-
Query performance optimization
-
Relationship loading issues
-
Connection pool configuration
-
Transaction management
-
Database schema migrations
Key Resources:
-
scripts/setup-alembic.sh
-
Initialize Alembic with async support
-
scripts/generate-migration.sh
-
Create migrations from model changes
-
templates/base_model.py
-
Base model with common patterns
-
templates/session_manager.py
-
Async session factory and dependency
-
templates/alembic.ini
-
Alembic configuration for async
-
examples/user_model.py
-
Complete model with relationships
-
examples/async_context_examples.py
-
Session usage patterns
Core Patterns
- Async Engine and Session Setup
Database Configuration:
app/core/database.py
from sqlalchemy.ext.asyncio import ( AsyncSession, create_async_engine, async_sessionmaker ) from sqlalchemy.orm import declarative_base from app.core.config import settings
Create async engine
engine = create_async_engine( settings.DATABASE_URL, echo=settings.DEBUG, pool_pre_ping=True, # Verify connections before using pool_size=5, # Base number of connections max_overflow=10, # Additional connections when pool is full pool_recycle=3600, # Recycle connections after 1 hour pool_timeout=30, # Wait 30s for available connection )
Create async session factory
AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, # Keep objects usable after commit autoflush=False, # Manual flush control autocommit=False, # Explicit commits only )
Base = declarative_base()
Dependency Injection Pattern:
app/core/deps.py
from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import AsyncSessionLocal
async def get_db() -> AsyncGenerator[AsyncSession, None]: """ FastAPI dependency for database sessions. Automatically handles cleanup. """ async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close()
- Base Model Pattern
Use template from templates/base_model.py with:
-
UUID primary keys by default
-
Automatic created_at/updated_at timestamps
-
Soft delete support
-
Common query methods
Essential Mixins:
from datetime import datetime from sqlalchemy import DateTime, Boolean, func from sqlalchemy.orm import Mapped, mapped_column
class TimestampMixin: """Auto-managed timestamps""" created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False ) updated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False )
class SoftDeleteMixin: """Soft delete support""" is_deleted: Mapped[bool] = mapped_column( Boolean, default=False, nullable=False ) deleted_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True )
- Relationship Loading Strategies
Lazy Loading (Default - N+1 Problem Risk):
Bad: Causes N+1 queries
users = await session.execute(select(User)) for user in users.scalars(): print(user.posts) # Separate query per user!
Eager Loading (Recommended):
from sqlalchemy.orm import selectinload, joinedload
selectinload: Separate query, good for one-to-many
stmt = select(User).options(selectinload(User.posts)) result = await session.execute(stmt) users = result.scalars().all()
joinedload: SQL JOIN, good for many-to-one
stmt = select(Post).options(joinedload(Post.author)) result = await session.execute(stmt) posts = result.scalars().unique().all() # unique() required with joins!
subqueryload: Subquery loading
stmt = select(User).options(subqueryload(User.posts))
Relationship Configuration:
from sqlalchemy.orm import relationship
class User(Base): tablename = "users"
# One-to-many with cascade
posts: Mapped[list["Post"]] = relationship(
back_populates="author",
cascade="all, delete-orphan",
lazy="selectin" # Default eager loading
)
# Many-to-many
roles: Mapped[list["Role"]] = relationship(
secondary="user_roles",
back_populates="users",
lazy="selectin"
)
4. Query Patterns
Basic CRUD:
from sqlalchemy import select, update, delete from sqlalchemy.ext.asyncio import AsyncSession
Create
async def create_user(session: AsyncSession, user_data: dict): user = User(**user_data) session.add(user) await session.commit() await session.refresh(user) return user
Read with filters
async def get_users(session: AsyncSession, skip: int = 0, limit: int = 100): stmt = ( select(User) .where(User.is_deleted == False) .offset(skip) .limit(limit) .order_by(User.created_at.desc()) ) result = await session.execute(stmt) return result.scalars().all()
Update
async def update_user(session: AsyncSession, user_id: int, updates: dict): stmt = ( update(User) .where(User.id == user_id) .values(**updates) .returning(User) ) result = await session.execute(stmt) await session.commit() return result.scalar_one()
Delete
async def delete_user(session: AsyncSession, user_id: int): stmt = delete(User).where(User.id == user_id) await session.execute(stmt) await session.commit()
Complex Queries:
from sqlalchemy import func, and_, or_
Aggregations
stmt = ( select(User.id, func.count(Post.id)) .join(Post) .group_by(User.id) .having(func.count(Post.id) > 5) )
Subqueries
subq = ( select(func.avg(Post.views)) .where(Post.user_id == User.id) .scalar_subquery() ) stmt = select(User).where(User.id.in_( select(Post.user_id).where(Post.views > subq) ))
Window functions
from sqlalchemy import over stmt = select( Post, func.row_number().over( partition_by=Post.user_id, order_by=Post.created_at.desc() ).label('row_num') ).where(over.row_num <= 10)
- Transaction Management
Nested Transactions:
async def transfer_funds( session: AsyncSession, from_account: int, to_account: int, amount: float ): async with session.begin_nested(): # Savepoint # Debit stmt = ( update(Account) .where(Account.id == from_account) .where(Account.balance >= amount) .values(balance=Account.balance - amount) ) result = await session.execute(stmt) if result.rowcount == 0: raise ValueError("Insufficient funds")
# Credit
stmt = (
update(Account)
.where(Account.id == to_account)
.values(balance=Account.balance + amount)
)
await session.execute(stmt)
await session.commit()
Manual Transaction Control:
async def batch_operation(session: AsyncSession, items: list): try: for item in items: session.add(item) if len(session.new) >= 100: await session.flush() # Flush but don't commit
await session.commit()
except Exception as e:
await session.rollback()
raise
6. Alembic Migration Setup
Initialize Alembic with async template
./scripts/setup-alembic.sh
Creates:
- alembic/ directory
- alembic.ini configured for async
- env.py with async support
Generate Migrations:
Auto-generate from model changes
./scripts/generate-migration.sh "add user table"
Review generated migration in alembic/versions/
Always review auto-generated migrations!
Migration Best Practices:
-
Always review auto-generated migrations
-
Use batch operations for large tables
-
Add indexes in separate migrations
-
Include both upgrade and downgrade
-
Test migrations on staging first
Manual Migration Template:
alembic/versions/xxx_add_index.py
from alembic import op import sqlalchemy as sa
def upgrade() -> None: # Use batch for SQLite compatibility with op.batch_alter_table('users') as batch_op: batch_op.create_index( 'ix_users_email', ['email'], unique=True )
def downgrade() -> None: with op.batch_alter_table('users') as batch_op: batch_op.drop_index('ix_users_email')
- Connection Pool Configuration
Production Settings:
For web applications
engine = create_async_engine( DATABASE_URL, pool_size=20, # Concurrent requests max_overflow=40, # Burst capacity pool_recycle=3600, # 1 hour pool_pre_ping=True, # Verify connections pool_timeout=30, # Wait time echo_pool=False, # Pool debug logging )
For background tasks
engine = create_async_engine( DATABASE_URL, pool_size=5, # Fewer connections max_overflow=10, pool_recycle=7200, # 2 hours )
Connection Health Check:
from sqlalchemy import text
async def check_database_health(session: AsyncSession) -> bool: try: await session.execute(text("SELECT 1")) return True except Exception: return False
- Performance Optimization
Bulk Operations:
Bulk insert
async def bulk_create_users(session: AsyncSession, users: list[dict]): stmt = insert(User).values(users) await session.execute(stmt) await session.commit()
Bulk update
async def bulk_update_status(session: AsyncSession, user_ids: list[int]): stmt = ( update(User) .where(User.id.in_(user_ids)) .values(status="active") ) await session.execute(stmt) await session.commit()
Query Result Streaming:
async def stream_large_dataset(session: AsyncSession): stmt = select(User).execution_options(yield_per=100) result = await session.stream(stmt)
async for partition in result.partitions(100):
users = partition.scalars().all()
# Process batch
yield users
Index Optimization:
from sqlalchemy import Index
class User(Base): tablename = "users"
email: Mapped[str] = mapped_column(unique=True, index=True)
__table_args__ = (
Index('ix_user_status_created', 'status', 'created_at'),
Index('ix_user_email_active', 'email', postgresql_where=~is_deleted),
)
Troubleshooting
Common Issues
"Object is not bound to session":
Bad: Object expires after commit
user = await create_user(session, data) print(user.email) # Error if expire_on_commit=True
Fix: Refresh or configure session
await session.refresh(user)
Or: AsyncSessionLocal(expire_on_commit=False)
"N+1 Query Problem":
Enable SQL logging to detect
engine = create_async_engine(url, echo=True)
Fix with eager loading
stmt = select(User).options(selectinload(User.posts))
"DetachedInstanceError":
Bad: Accessing relationship outside session
async with AsyncSessionLocal() as session: user = await session.get(User, user_id) print(user.posts) # Error!
Fix: Load within session or use eager loading
stmt = select(User).options(selectinload(User.posts))
"Connection Pool Exhausted":
Increase pool size or add timeout
engine = create_async_engine( url, pool_size=20, max_overflow=40, pool_timeout=30 )
Resources
Scripts: scripts/ directory contains:
-
setup-alembic.sh
-
Initialize Alembic with async configuration
-
generate-migration.sh
-
Create migrations from model changes
-
test-connection.sh
-
Test database connectivity
-
optimize-queries.sh
-
Analyze and suggest query optimizations
Templates: templates/ directory includes:
-
base_model.py
-
Base model with UUID, timestamps, soft delete
-
session_manager.py
-
Complete session factory and dependencies
-
alembic.ini
-
Production-ready Alembic configuration
-
env.py
-
Alembic async environment template
Examples: examples/ directory provides:
-
user_model.py
-
Full model with relationships and indexes
-
async_context_examples.py
-
Session patterns and best practices
-
query_patterns.py
-
Common query optimization examples
-
migration_examples.py
-
Manual migration patterns
SQLAlchemy Version: 2.0+ Database Support: PostgreSQL, MySQL, SQLite (async drivers) Async Drivers: asyncpg (PostgreSQL), aiomysql (MySQL), aiosqlite (SQLite) Version: 1.0.0