SQLAlchemy 2.0+ Skill
Quick Start
Basic Setup
import asyncio

from sqlalchemy import String, select, update
from sqlalchemy.ext.asyncio import (
    AsyncAttrs,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
Base class for models
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base class shared by the ORM models in this guide.

    Combines ``AsyncAttrs`` with ``DeclarativeBase`` and adds no members of
    its own.
    """
Async engine
# Engine for the example PostgreSQL database; the URL selects the asyncpg
# driver. The credentials in the URL are placeholders.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
)
Session factory
# Factory producing AsyncSession objects bound to ``engine``.
# expire_on_commit=False keeps already-loaded attributes readable after
# commit without triggering a new (implicit) database round trip.
async_session = async_sessionmaker(
    engine,
    expire_on_commit=False,
)
Example model
class User(Base):
    """Example ORM model mapped to the ``users`` table."""

    # Declarative mapping reads the table name from the dunder attribute;
    # a plain ``tablename`` attribute is not recognised.
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100))
Basic CRUD Operations
async def create_user(name: str, email: str) -> User:
    """Insert a new ``User`` row and return the ORM object.

    The work runs inside ``session.begin()``, so the transaction is
    committed when the block exits without an exception.
    """
    async with async_session() as db, db.begin():
        new_user = User(name=name, email=email)
        db.add(new_user)
        # Flush so the INSERT is emitted now and the primary key is populated.
        await db.flush()
        return new_user
async def get_user(user_id: int) -> User | None:
    """Fetch the user with primary key ``user_id``, or ``None`` if absent."""
    query = select(User).where(User.id == user_id)
    async with async_session() as db:
        rows = await db.scalars(query)
        return rows.one_or_none()
async def update_user_email(user_id: int, new_email: str) -> bool:
    """Set the email address of the user identified by ``user_id``.

    Returns:
        ``True`` when the UPDATE matched at least one row, ``False``
        otherwise.
    """
    stmt = update(User).where(User.id == user_id).values(email=new_email)
    async with async_session() as db:
        outcome = await db.execute(stmt)
        await db.commit()
        matched_any = outcome.rowcount > 0
        return matched_any
Common Patterns
Models
Annotated Type-Safe Models (Recommended)
from typing_extensions import Annotated from typing import List, Optional
Reusable column types
# Each alias pairs a Python type with the mapped_column() settings that
# should apply wherever the alias is used inside Mapped[...].

# Integer primary key.
intpk = Annotated[int, mapped_column(primary_key=True)]

# String column limited to 50 characters.
str50 = Annotated[str, mapped_column(String(50))]

# Timestamp whose insert-time default is the SQL now() function.
created_at = Annotated[datetime, mapped_column(insert_default=func.now())]
class Post(Base):
    """Post model declared with reusable ``Annotated`` column aliases."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "posts"

    id: Mapped[intpk]
    title: Mapped[str50]
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created: Mapped[created_at]

    # Relationships
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[List["Tag"]] = relationship(secondary="post_tags")
Classic Style Models
class Post(Base):
    """Post model in the classic style: columns without ``Mapped`` annotations."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "posts"

    id = mapped_column(Integer, primary_key=True)
    title = mapped_column(String(50))
    content = mapped_column(Text)
    author_id = mapped_column(ForeignKey("users.id"))

    author = relationship("User", back_populates="posts")
Relationships
One-to-Many
class User(Base):
    """Parent side of the one-to-many User -> Post relationship."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)

    # "all, delete-orphan" cascades operations on the user to its posts and
    # deletes posts that are removed from this collection.
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )
class Post(Base):
    """Child side of the one-to-many User -> Post relationship."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")
Many-to-Many
# Association table for the many-to-many link between posts and tags.
# Both foreign-key columns are flagged primary_key, forming a composite key.
association_table = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)
class Post(Base):
    """Post side of the many-to-many Post <-> Tag relationship."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)

    tags: Mapped[List["Tag"]] = relationship(
        secondary=association_table,
        back_populates="posts",
    )
class Tag(Base):
    """Tag side of the many-to-many Post <-> Tag relationship."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)

    posts: Mapped[List["Post"]] = relationship(
        secondary=association_table,
        back_populates="tags",
    )
Queries
Basic Select
from sqlalchemy import select, and_, or_
Get all users
async def get_all_users():
    """Return every ``User`` row as ORM objects."""
    async with async_session() as db:
        users = await db.scalars(select(User))
        return users.all()
Filter with conditions
async def get_users_by_name(name: str):
    """Return users whose name contains ``name``, ignoring case.

    ``name`` is wrapped in ``%`` wildcards and passed to ``ilike``, so any
    ``%`` or ``_`` inside it also acts as a LIKE wildcard.
    """
    pattern = f"%{name}%"
    query = select(User).where(User.name.ilike(pattern))
    async with async_session() as db:
        matches = await db.execute(query)
        return matches.scalars().all()
Complex conditions
async def search_users(name: str | None = None, email: str | None = None):
    """Search users by optional, case-insensitive substring filters.

    Args:
        name: Substring to look for in ``User.name``; skipped when falsy.
        email: Substring to look for in ``User.email``; skipped when falsy.

    Returns:
        The users matching every supplied filter, or all users when no
        filter is supplied.
    """
    conditions = [
        column.ilike(f"%{term}%")
        for column, term in ((User.name, name), (User.email, email))
        if term
    ]

    stmt = select(User)
    if conditions:
        # where() joins multiple criteria with AND, so and_() is unnecessary.
        stmt = stmt.where(*conditions)

    async with async_session() as session:
        result = await session.execute(stmt)
        return result.scalars().all()
Relationship Loading
from sqlalchemy.orm import selectinload, joinedload
Eager load relationships
async def get_posts_with_author():
    """Return all posts with ``Post.author`` eagerly loaded via ``selectinload``."""
    query = select(Post).options(selectinload(Post.author))
    async with async_session() as db:
        loaded = await db.execute(query)
        return loaded.scalars().all()
Joined loading for single relationships
async def get_post_with_tags(post_id: int):
    """Fetch one post by id with its author and tags loaded up front.

    The author is loaded with ``joinedload`` and the tags with
    ``selectinload``. Returns the ``Post`` or ``None`` when no row matches.
    """
    eager_options = (joinedload(Post.author), selectinload(Post.tags))
    query = select(Post).options(*eager_options).where(Post.id == post_id)
    async with async_session() as db:
        found = await db.execute(query)
        return found.scalar_one_or_none()
Pagination
async def get_posts_paginated(page: int, size: int):
    """Return one page of posts, newest first.

    Args:
        page: 1-based page number.
        size: Maximum number of posts on the page.

    Returns:
        The posts on the requested page.

    Raises:
        ValueError: If ``page`` is smaller than 1 or ``size`` is negative.
    """
    # Validate before touching the database: page < 1 would produce a
    # negative OFFSET and a negative size a negative LIMIT.
    if page < 1:
        raise ValueError(f"page must be >= 1, got {page}")
    if size < 0:
        raise ValueError(f"size must be >= 0, got {size}")

    stmt = (
        select(Post)
        # Post.id breaks ties between equal timestamps so that rows do not
        # move between pages from one query to the next.
        .order_by(Post.created.desc(), Post.id.desc())
        .offset((page - 1) * size)
        .limit(size)
    )
    async with async_session() as session:
        result = await session.execute(stmt)
        return result.scalars().all()
Aggregations
from sqlalchemy import func
async def get_user_post_count():
    """Return ``(name, post_count)`` rows ordered by post count, highest first.

    Users are joined to posts with a plain ``join()``, so users without any
    post do not appear in the result.
    """
    post_total = func.count(Post.id)
    query = (
        select(User.name, post_total.label("post_count"))
        .join(Post)
        .group_by(User.id, User.name)
        .order_by(post_total.desc())
    )
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.all()
Sessions Management
Context Manager Pattern
async def create_post(title: str, content: str, author_id: int):
    """Create a ``Post`` inside a ``session.begin()`` transaction and return it."""
    async with async_session() as db, db.begin():
        new_post = Post(title=title, content=content, author_id=author_id)
        db.add(new_post)
        return new_post
Dependency Injection (FastAPI)
from fastapi import Depends
async def get_db_session():
    """Yield an ``AsyncSession`` for use as a FastAPI dependency.

    The ``async with`` block closes the session when the generator is
    finalized, including when the consumer raises, so an additional
    ``try``/``finally`` calling ``session.close()`` would only close it a
    second time.
    """
    async with async_session() as session:
        yield session
async def create_user_endpoint(
    user_data: UserCreate,
    session: AsyncSession = Depends(get_db_session),
):
    """Persist a ``User`` built from the request payload and return it."""
    # NOTE(review): ``UserCreate`` is not defined in this document; it is
    # presumably a Pydantic model, where ``.dict()`` is the v1 spelling -
    # confirm against the Pydantic version in use.
    payload = user_data.dict()
    new_user = User(**payload)
    session.add(new_user)
    await session.commit()
    # Reload the row so the returned object reflects what was stored.
    await session.refresh(new_user)
    return new_user
Scoped Sessions
from sqlalchemy.ext.asyncio import async_scoped_session import asyncio
Create scoped session
# Registry handing out one session per scope; scopefunc=asyncio.current_task
# makes the currently running asyncio task the scope key.
async_session_scope = async_scoped_session(
    async_sessionmaker(engine, expire_on_commit=False),
    scopefunc=asyncio.current_task,
)
Use in application
async def some_function():
    """Run a unit of work on the task-scoped session, then release it.

    The scoped registry is keyed by the current asyncio task and does not
    discard entries by itself, so the session is removed explicitly once
    the work is finished - also when the work raises.
    """
    session = async_session_scope()
    try:
        # Use session normally
        await session.commit()
    finally:
        # Closes the session and drops it from the registry; without this
        # every finished task would leave its session behind.
        await async_session_scope.remove()
Advanced Patterns
Write-Only Relationships (Memory Efficient)
from sqlalchemy.orm import WriteOnlyMapped
class User(Base):
    """User model whose ``posts`` collection is declared write-only."""

    # Must be the dunder name; a plain ``tablename`` attribute is not
    # recognised by declarative mapping.
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)

    # WriteOnlyMapped: the collection is not loaded into memory on access;
    # its rows are read through explicit SELECT statements.
    posts: WriteOnlyMapped["Post"] = relationship()
async def get_user_posts(user_id: int):
    """Return the posts written by a user, or ``[]`` if the user is missing."""
    async with async_session() as db:
        owner = await db.get(User, user_id)
        if not owner:
            return []

        # Explicit select for collection
        query = select(Post).where(Post.author_id == user_id)
        rows = await db.execute(query)
        return rows.scalars().all()
Custom Session Classes
class AsyncSessionWithDefaults(AsyncSession):
    """``AsyncSession`` subclass with a hook for default execution options."""

    async def execute_with_defaults(self, statement, **kwargs):
        """Execute ``statement``, forwarding ``kwargs`` to ``execute()``.

        As written this is a plain pass-through; default options would be
        merged in here.
        """
        # Add default options
        outcome = await self.execute(statement, **kwargs)
        return outcome
Use custom session
# Session factory that instantiates the custom session class instead of the
# stock AsyncSession.
async_session = async_sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSessionWithDefaults,
)
Connection Routing
class RoutingSession(Session):
    """Sync session that chooses an engine based on the mapped class."""

    def get_bind(self, mapper=None, clause=None, **kw):
        """Return ``read_engine`` for ``ReadOnlyModel`` subclasses, else ``write_engine``.

        ``write_engine`` is also returned when no mapper is supplied.
        """
        # NOTE(review): read_engine / write_engine / ReadOnlyModel are not
        # defined in this document. If the engines are AsyncEngine objects,
        # a sync Session presumably needs their ``.sync_engine`` - confirm.
        reads_only = mapper and issubclass(mapper.class_, ReadOnlyModel)
        return read_engine if reads_only else write_engine
class AsyncRoutingSession(AsyncSession):
    """Async session that uses ``RoutingSession`` as its sync session class."""

    sync_session_class = RoutingSession
Raw SQL
from sqlalchemy import text
async def run_raw_sql():
    """Return the number of rows in ``users`` using a textual SQL statement."""
    query = text("SELECT COUNT(*) FROM users")
    async with async_session() as db:
        outcome = await db.execute(query)
        return outcome.scalar()
async def run_parameterized_query(user_id: int):
    """Return all ``posts`` rows for an author using a bound parameter.

    ``user_id`` is supplied as the ``:user_id`` bind value rather than being
    formatted into the SQL string.
    """
    query = text("SELECT * FROM posts WHERE author_id = :user_id")
    bind_values = {"user_id": user_id}
    async with async_session() as db:
        rows = await db.execute(query, bind_values)
        return rows.fetchall()
Performance Tips
- Use selectinload for collections: More efficient than lazy loading
- Batch operations: Use add_all() for bulk inserts
- Connection pooling: Configure pool size based on load
- Index columns: Add indexes for frequently queried columns
- Use streaming: For large result sets, use stream()
Streaming large results
async def process_all_users():
    """Stream every ``User`` and hand each one to ``process_user``.

    Rows are consumed through ``session.stream()`` one at a time instead of
    being collected into a list first.
    """
    async with async_session() as db:
        row_stream = await db.stream(select(User))
        user_stream = row_stream.scalars()
        async for current_user in user_stream:
            # Process user without loading all into memory
            await process_user(current_user)
Requirements
# Quote the extra so shells that glob on square brackets pass it through.
pip install "sqlalchemy[asyncio]"  # Core SQLAlchemy
pip install asyncpg  # PostgreSQL async driver
# or
pip install aiosqlite  # SQLite async driver
# or
pip install aiomysql  # MySQL async driver
Database URLs
- PostgreSQL: postgresql+asyncpg://user:pass@localhost/db
- SQLite: sqlite+aiosqlite:///database.db
- MySQL: mysql+aiomysql://user:pass@localhost/db
Migration Integration
Use Alembic for database migrations:
# Generate migration
alembic revision --autogenerate --message "Add users table"

# Apply migrations
alembic upgrade head