sqlalchemy-2-async

SQLAlchemy 2.0 Async Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant:

Install skill "sqlalchemy-2-async" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-sqlalchemy-2-async

SQLAlchemy 2.0 Async Patterns

Modern async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.

Overview

Use this skill when:

  • Building async FastAPI applications with database access

  • Implementing async repository patterns

  • Configuring async connection pooling

  • Running concurrent database queries

  • Avoiding N+1 queries in async context

Quick Reference

Engine and Session Factory

```python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)
```

Create async engine - ONE per application

```python
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
    echo=False,          # Set True for SQL logging in dev
)
```

Session factory - use this to create sessions

```python
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevent lazy load issues
    autoflush=False,         # Explicit flush control
)
```

FastAPI Dependency Injection

```python
from typing import AsyncGenerator

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
```

```python
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides an async database session."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```

Usage in route

```python
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    db: AsyncSession = Depends(get_db),
) -> UserResponse:
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(404, "User not found")
    return UserResponse.model_validate(user)
```

Async Model Definition

```python
import uuid
from datetime import datetime, timezone

from sqlalchemy import String, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
```

```python
class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc)
    )

    # Relationship with explicit lazy loading strategy
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Prevent accidental lazy loads - MUST use selectinload
    )


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]

    user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")
```

Eager Loading (Avoid N+1)

```python
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
```

```python
async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
    """Load a user with orders eagerly - selectinload adds one batched IN query, no N+1."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))  # Eager load orders
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()
```

```python
async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
    """Load multiple users with their orders efficiently."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .limit(limit)
    )
    return list(result.scalars().all())
```
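To make the saving concrete: lazy loading costs one query per parent row, while a selectinload-style batch costs one extra query total. A database-free toy that just counts round-trips (the fake tables and counter are illustrative, not SQLAlchemy API):

```python
# Toy illustration: lazy loading issues one query per parent (N+1);
# a selectinload-style batch issues a single extra IN query.
queries: list[str] = []

USERS = [1, 2, 3]

def lazy_load(user_ids):
    queries.append("SELECT users")
    for uid in user_ids:  # one round-trip per user -> N+1
        queries.append(f"SELECT orders WHERE user_id={uid}")

def selectin_load(user_ids):
    queries.append("SELECT users")
    queries.append("SELECT orders WHERE user_id IN (...)")  # one batched query

lazy_load(USERS)
lazy_count = len(queries)   # 1 + N queries
queries.clear()
selectin_load(USERS)
batch_count = len(queries)  # always 2, however many users
```

With three users the lazy path issues four queries; the batched path stays at two no matter how many parents are loaded.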

Bulk Operations (Optimized)

```python
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
    """Efficient bulk insert - SQLAlchemy 2.0 batches into a multi-value INSERT."""
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.flush()  # Get IDs without committing
    return len(users)
```

```python
async def bulk_insert_chunked(
    db: AsyncSession,
    items: list[dict],
    chunk_size: int = 1000,
) -> int:
    """Insert large datasets in chunks to manage memory."""
    total = 0
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        db.add_all([Item(**data) for data in chunk])
        await db.flush()
        total += len(chunk)
    return total
```
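The chunking arithmetic above is independent of SQLAlchemy; a minimal standalone sketch of the same slicing, handy for checking chunk sizes in isolation (`chunked` is a hypothetical helper, not part of the library):

```python
def chunked(items: list, chunk_size: int = 1000) -> list[list]:
    """Split items into consecutive slices of at most chunk_size elements."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
```

For example, `chunked(list(range(5)), 2)` produces three slices: two full pairs and a final one-element remainder.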

Repository Pattern

```python
from typing import Generic, TypeVar

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T", bound=Base)


class AsyncRepository(Generic[T]):
    """Generic async repository for CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_many(self, ids: list[UUID]) -> list[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()
```
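The call pattern looks the same regardless of the backing store. A sketch with an in-memory stand-in (plain dicts, no SQLAlchemy) so it runs without a database; `InMemoryRepository` and `demo` are illustrative names, not part of the skill:

```python
from __future__ import annotations

import asyncio
import uuid


class InMemoryRepository:
    """Database-free stand-in mirroring the AsyncRepository call pattern."""

    def __init__(self):
        self._rows: dict[uuid.UUID, dict] = {}

    async def create(self, **kwargs) -> dict:
        row = {"id": uuid.uuid4(), **kwargs}
        self._rows[row["id"]] = row
        return row

    async def get(self, id: uuid.UUID) -> dict | None:
        return self._rows.get(id)

    async def delete(self, row: dict) -> None:
        self._rows.pop(row["id"], None)


async def demo():
    repo = InMemoryRepository()
    user = await repo.create(email="a@example.com")
    fetched = await repo.get(user["id"])
    assert fetched["email"] == "a@example.com"
    await repo.delete(user)
    return await repo.get(user["id"])  # None once deleted


missing = asyncio.run(demo())
```

Swapping the in-memory class for `AsyncRepository(session, User)` keeps call sites unchanged, which is the point of the pattern.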

Concurrent Queries with TaskGroup

```python
import asyncio

from sqlalchemy import func


async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
    """Run several queries for one request - an AsyncSession must not be shared across tasks."""
    # WRONG: don't share one AsyncSession across concurrent tasks
    # async with asyncio.TaskGroup() as tg:
    #     tg.create_task(db.execute(...))  # NOT SAFE

    # CORRECT: sequential queries with the same session
    user = await db.get(User, user_id)
    orders_result = await db.execute(
        select(Order).where(Order.user_id == user_id).limit(10)
    )
    stats_result = await db.execute(
        select(func.count(Order.id)).where(Order.user_id == user_id)
    )

    return {
        "user": user,
        "recent_orders": list(orders_result.scalars().all()),
        "total_orders": stats_result.scalar(),
    }
```

```python
async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
    """Concurrent queries - each task gets its own session."""
    async def fetch_user(user_id: UUID) -> dict:
        async with async_session_factory() as session:
            user = await session.get(User, user_id)
            return {"id": user_id, "email": user.email if user else None}

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]

    return [t.result() for t in tasks]
```
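The one-session-per-task rule holds equally with `asyncio.gather`, the pre-3.11 alternative to TaskGroup. A runnable sketch with a plain dict standing in for the database, since the point is the task structure, not the queries (`FAKE_DB` and `fetch_user` here are simulated):

```python
import asyncio

FAKE_DB = {1: "a@example.com", 2: "b@example.com"}  # stand-in for real sessions


async def fetch_user(user_id):
    # Real code would open its own session here:
    #   async with async_session_factory() as session: ...
    await asyncio.sleep(0)  # yield control, mimicking I/O
    return {"id": user_id, "email": FAKE_DB.get(user_id)}


async def get_many(user_ids):
    # One coroutine (and, in real code, one session) per user id
    return await asyncio.gather(*(fetch_user(uid) for uid in user_ids))


results = asyncio.run(get_many([1, 2, 3]))
```

`gather` preserves input order, so `results[0]` corresponds to user 1 and missing users come back with `email=None` rather than raising.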

Key Decisions

| Decision | Recommendation | Rationale |
| --- | --- | --- |
| Session scope | One AsyncSession per task/request | SQLAlchemy docs: "AsyncSession per task" |
| Scoped sessions | Avoid for async | Maintainers discourage them for async code |
| Lazy loading | Use lazy="raise" + explicit loads | Prevents accidental N+1 in async |
| Eager loading | selectinload for collections | Better than joinedload for async |
| expire_on_commit | Set to False | Prevents lazy load errors after commit |
| Connection pool | pool_pre_ping=True | Validates connections before use |
| Bulk inserts | Chunk 1000-10000 rows | Memory management for large inserts |

Anti-Patterns (FORBIDDEN)

NEVER share AsyncSession across tasks

```python
# RACE CONDITION: one session shared by many tasks
async with asyncio.TaskGroup() as tg:
    tg.create_task(session.execute(...))
```

NEVER use sync Session in async code

```python
from sqlalchemy.orm import Session

session = Session(engine)  # BLOCKS THE EVENT LOOP
```

NEVER access lazy-loaded relationships without eager loading

```python
user = await session.get(User, user_id)
orders = user.orders  # RAISES if lazy="raise"; otherwise fails with MissingGreenlet in async
```

NEVER use scoped_session with async

```python
from sqlalchemy.orm import scoped_session

ScopedSession = scoped_session(session_factory)  # WRONG for async
```

NEVER forget to handle session lifecycle

```python
session = async_session_factory()
result = await session.execute(...)
# MISSING: await session.close() - connection leak!
```
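The fix is to let `async with` own the lifecycle, so cleanup runs even when a query raises. A SQLAlchemy-free sketch of that guarantee (`FakeSession` is a stand-in for AsyncSession, recording whether close ran):

```python
import asyncio


class FakeSession:
    """Stand-in for AsyncSession that records whether close() ran."""
    closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        await self.close()  # always runs, even on error

    async def close(self):
        FakeSession.closed = True


async def main():
    try:
        async with FakeSession() as session:
            raise RuntimeError("query failed")  # simulated failing query
    except RuntimeError:
        pass


asyncio.run(main())
```

After `main()` returns, `FakeSession.closed` is True: the context manager closed the session despite the exception, which is exactly what `async with async_session_factory() as session:` buys you.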

NEVER use create_async_engine without pool_pre_ping in production

```python
engine = create_async_engine(url)  # May hand out stale connections
```

Related Skills

  • asyncio-advanced - TaskGroup and structured concurrency patterns

  • alembic-migrations - Database migration with async support

  • fastapi-advanced - Full FastAPI integration patterns

  • database-schema-designer - Schema design best practices

Capability Details

async-session

Keywords: AsyncSession, async_sessionmaker, session factory, connection

Solves:

  • How do I create async database sessions?

  • Configure async connection pooling

  • Session lifecycle management

fastapi-integration

Keywords: Depends, dependency injection, get_db, request scope

Solves:

  • How do I integrate SQLAlchemy with FastAPI?

  • Request-scoped database sessions

  • Automatic commit/rollback handling

eager-loading

Keywords: selectinload, joinedload, eager load, N+1, relationship

Solves:

  • How do I avoid N+1 queries in async?

  • Load relationships efficiently

  • Configure lazy loading behavior

bulk-operations

Keywords: bulk insert, batch, chunk, add_all, performance

Solves:

  • How do I insert many rows efficiently?

  • Chunk large inserts for memory

  • SQLAlchemy 2.0 bulk optimizations

repository-pattern

Keywords: repository, CRUD, generic, base repository

Solves:

  • How do I implement repository pattern?

  • Generic async CRUD operations

  • Clean architecture with SQLAlchemy

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
