FastAPI Production Engineering
Complete methodology for building, deploying, and scaling production FastAPI applications. Not a tutorial — a production operating system.
Quick Health Check (/16)
Score 2 points each. Total < 8 = critical work needed.
| Signal | Healthy | Unhealthy |
|---|---|---|
| Type safety | Pydantic v2 models everywhere | dict returns, no validation |
| Error handling | Structured error hierarchy | Bare HTTPException strings |
| Auth | JWT + dependency injection | Manual token parsing |
| Testing | 80%+ coverage, async tests | No tests or sync-only |
| Database | Async ORM, migrations | Raw SQL, no migrations |
| Observability | Structured logging + tracing | print() debugging |
| Deployment | Multi-stage Docker, health checks | uvicorn main:app on bare metal |
| Documentation | Auto-generated, accurate OpenAPI | Default /docs untouched |
Phase 1: Project Architecture
Recommended Structure
src/
├── app/
│ ├── __init__.py
│ ├── main.py # App factory
│ ├── config.py # Pydantic Settings
│ ├── dependencies.py # Shared DI
│ ├── middleware.py # Custom middleware
│ ├── features/
│ │ ├── users/
│ │ │ ├── __init__.py
│ │ │ ├── router.py # Endpoints
│ │ │ ├── schemas.py # Pydantic models
│ │ │ ├── service.py # Business logic
│ │ │ ├── repository.py # Data access
│ │ │ ├── models.py # SQLAlchemy/SQLModel
│ │ │ ├── dependencies.py
│ │ │ └── exceptions.py
│ │ ├── auth/
│ │ ├── orders/
│ │ └── ...
│ ├── core/
│ │ ├── database.py # Engine, session factory
│ │ ├── security.py # JWT, hashing
│ │ ├── errors.py # Error hierarchy
│ │ └── logging.py # Structlog config
│ └── shared/
│ ├── pagination.py
│ ├── filters.py
│ └── responses.py
├── migrations/ # Alembic
├── tests/
│ ├── conftest.py
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── pyproject.toml
├── Dockerfile
└── docker-compose.yml
7 Architecture Rules
- Feature-based modules — group by domain, not by layer
- Router → Service → Repository — strict layering, no skipping
- Dependency injection everywhere — use
Depends()for testability - Pydantic models at boundaries — validate all input AND output
- No business logic in routers — routers are thin, services are thick
- Config via environment — Pydantic Settings with
.envsupport - Async by default — use async def for all I/O-bound operations
Framework Selection Context
# When to choose FastAPI over alternatives
fastapi_is_best_when:
- "You need auto-generated OpenAPI docs"
- "Team knows Python type hints"
- "API-first (no server-rendered HTML as primary)"
- "High concurrency with async I/O"
- "Microservice or API gateway"
consider_alternatives:
django: "Full-featured web app with admin, ORM, auth batteries"
flask: "Simple app, team prefers explicit over magic"
litestar: "Need WebSocket-heavy or more opinionated framework"
hono_or_express: "Team prefers TypeScript"
Phase 2: Configuration & Environment
Pydantic Settings Pattern
from pydantic_settings import BaseSettings
from pydantic import SecretStr, field_validator
from functools import lru_cache
class Settings(BaseSettings):
# App
app_name: str = "MyAPI"
debug: bool = False
environment: str = "production" # development | staging | production
# Server
host: str = "0.0.0.0"
port: int = 8000
workers: int = 4
# Database
database_url: SecretStr # Required — no default
db_pool_size: int = 20
db_max_overflow: int = 10
db_pool_timeout: int = 30
# Auth
jwt_secret: SecretStr # Required
jwt_algorithm: str = "HS256"
jwt_expire_minutes: int = 30
# Redis
redis_url: str = "redis://localhost:6379/0"
# CORS
cors_origins: list[str] = ["http://localhost:3000"]
@field_validator("environment")
@classmethod
def validate_environment(cls, v: str) -> str:
allowed = {"development", "staging", "production"}
if v not in allowed:
raise ValueError(f"environment must be one of {allowed}")
return v
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@lru_cache
def get_settings() -> Settings:
return Settings()
5 Configuration Rules
- Never hardcode secrets — use
SecretStrfor sensitive values - Fail fast — required fields have no defaults; app won't start without them
- Validate at startup — use
@field_validatorfor constraint checking - Cache settings —
@lru_cacheensures single parse - Type everything — no
strfor structured values; use enums, Literal types
Phase 3: Pydantic v2 Mastery
Schema Design Patterns
from pydantic import BaseModel, Field, ConfigDict
from datetime import datetime
from uuid import UUID
# Base with common config
class AppSchema(BaseModel):
model_config = ConfigDict(
from_attributes=True, # ORM mode
str_strip_whitespace=True, # Auto-strip
validate_default=True, # Validate defaults too
)
# Input schemas (what the API accepts)
class UserCreate(AppSchema):
email: str = Field(..., pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")
name: str = Field(..., min_length=1, max_length=100)
password: str = Field(..., min_length=8, max_length=128)
class UserUpdate(AppSchema):
name: str | None = Field(None, min_length=1, max_length=100)
email: str | None = Field(None, pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")
# Output schemas (what the API returns)
class UserResponse(AppSchema):
id: UUID
email: str
name: str
created_at: datetime
# Note: password is NEVER in response schema
# List response with pagination
class PaginatedResponse[T](AppSchema):
items: list[T]
total: int
page: int
page_size: int
has_next: bool
8 Pydantic Rules
- Separate Create/Update/Response schemas — never reuse input as output
- Never expose internal fields — no passwords, internal IDs, or debug info in responses
- Use Field() for constraints — min/max length, regex patterns, gt/lt for numbers
- Enable
from_attributes=True— for ORM model → schema conversion - Use generics for wrappers —
PaginatedResponse[T],ApiResponse[T] - Validate at boundaries — request body, query params, path params, headers
- Use computed fields —
@computed_fieldfor derived values - Document with examples —
model_config = {"json_schema_extra": {"examples": [...]}}
Phase 4: Error Handling Architecture
Structured Error Hierarchy
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.status import (
HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND,
HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_429_TOO_MANY_REQUESTS, HTTP_500_INTERNAL_SERVER_ERROR,
)
class AppError(Exception):
"""Base application error."""
def __init__(
self,
message: str,
code: str,
status_code: int = HTTP_500_INTERNAL_SERVER_ERROR,
details: dict | None = None,
):
self.message = message
self.code = code
self.status_code = status_code
self.details = details or {}
super().__init__(message)
class NotFoundError(AppError):
def __init__(self, resource: str, identifier: str | int):
super().__init__(
message=f"{resource} not found: {identifier}",
code="NOT_FOUND",
status_code=HTTP_404_NOT_FOUND,
details={"resource": resource, "identifier": str(identifier)},
)
class ConflictError(AppError):
def __init__(self, message: str, field: str | None = None):
super().__init__(
message=message, code="CONFLICT",
status_code=HTTP_409_CONFLICT,
details={"field": field} if field else {},
)
class AuthenticationError(AppError):
def __init__(self, message: str = "Invalid credentials"):
super().__init__(message=message, code="UNAUTHORIZED", status_code=HTTP_401_UNAUTHORIZED)
class AuthorizationError(AppError):
def __init__(self, message: str = "Insufficient permissions"):
super().__init__(message=message, code="FORBIDDEN", status_code=HTTP_403_FORBIDDEN)
class ValidationError(AppError):
def __init__(self, message: str, errors: list[dict] | None = None):
super().__init__(
message=message, code="VALIDATION_ERROR",
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
details={"errors": errors or []},
)
class RateLimitError(AppError):
def __init__(self, retry_after: int = 60):
super().__init__(
message="Rate limit exceeded", code="RATE_LIMITED",
status_code=HTTP_429_TOO_MANY_REQUESTS,
details={"retry_after": retry_after},
)
# Global error handler
async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.code,
"message": exc.message,
"details": exc.details,
}
},
)
# Register in app factory
# app.add_exception_handler(AppError, app_error_handler)
6 Error Handling Rules
- Never return bare strings — always structured
{"error": {"code", "message", "details"}} - Use domain-specific errors —
NotFoundError("User", user_id)notHTTPException(404) - Global handler catches all — register
AppErrorhandler in app factory - Log server errors, don't expose — 5xx returns generic message, logs full traceback
- Include actionable details — which field failed, what's allowed, retry-after for rate limits
- Never leak internals — no stack traces, SQL queries, or file paths in responses
Phase 5: Authentication & Authorization
JWT + Dependency Injection Pattern
from fastapi import Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone
security = HTTPBearer()
def create_access_token(user_id: str, roles: list[str], settings: Settings) -> str:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_expire_minutes)
payload = {
"sub": user_id,
"roles": roles,
"exp": expire,
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, settings.jwt_secret.get_secret_value(), algorithm=settings.jwt_algorithm)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
settings: Settings = Depends(get_settings),
db: AsyncSession = Depends(get_db),
) -> User:
try:
payload = jwt.decode(
credentials.credentials,
settings.jwt_secret.get_secret_value(),
algorithms=[settings.jwt_algorithm],
)
user_id = payload.get("sub")
if not user_id:
raise AuthenticationError("Invalid token payload")
except JWTError:
raise AuthenticationError("Invalid or expired token")
user = await db.get(User, user_id)
if not user:
raise AuthenticationError("User not found")
return user
# Role-based authorization
def require_role(*roles: str):
async def checker(user: User = Depends(get_current_user)) -> User:
if not any(r in user.roles for r in roles):
raise AuthorizationError(f"Requires one of: {', '.join(roles)}")
return user
return checker
# Usage in router
@router.get("/admin/users")
async def list_users(
admin: User = Depends(require_role("admin", "superadmin")),
service: UserService = Depends(get_user_service),
):
return await service.list_all()
10-Point Security Checklist
| # | Check | Priority |
|---|---|---|
| 1 | JWT secret ≥ 256 bits, from env | P0 |
| 2 | Token expiry ≤ 30 min for access, ≤ 7 days refresh | P0 |
| 3 | Password hashed with bcrypt/argon2 | P0 |
| 4 | CORS configured per environment | P0 |
| 5 | Rate limiting on auth endpoints | P0 |
| 6 | HTTPS enforced (redirect HTTP) | P0 |
| 7 | Security headers (HSTS, CSP, X-Frame) | P1 |
| 8 | Input validation on ALL endpoints | P1 |
| 9 | SQL injection prevented (parameterized queries) | P0 |
| 10 | Dependency scanning (safety/pip-audit) | P1 |
Phase 6: Database Patterns
Async SQLAlchemy + Repository Pattern
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select, func
from uuid import uuid4, UUID
from datetime import datetime, timezone
# Engine setup
engine = create_async_engine(
settings.database_url.get_secret_value(),
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
pool_timeout=settings.db_pool_timeout,
pool_pre_ping=True, # Check connection health
echo=settings.debug,
)
SessionFactory = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with SessionFactory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Base model with common fields
class Base(DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))
updated_at: Mapped[datetime] = mapped_column(
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
# Repository pattern
class BaseRepository[T]:
def __init__(self, session: AsyncSession, model: type[T]):
self.session = session
self.model = model
async def get_by_id(self, id: UUID) -> T | None:
return await self.session.get(self.model, id)
async def get_or_raise(self, id: UUID) -> T:
entity = await self.get_by_id(id)
if not entity:
raise NotFoundError(self.model.__name__, str(id))
return entity
async def list(
self, *, offset: int = 0, limit: int = 20, **filters
) -> tuple[list[T], int]:
query = select(self.model)
count_query = select(func.count()).select_from(self.model)
for field, value in filters.items():
if value is not None:
query = query.where(getattr(self.model, field) == value)
count_query = count_query.where(getattr(self.model, field) == value)
total = await self.session.scalar(count_query) or 0
result = await self.session.execute(
query.offset(offset).limit(limit).order_by(self.model.created_at.desc())
)
return list(result.scalars().all()), total
async def create(self, entity: T) -> T:
self.session.add(entity)
await self.session.flush()
return entity
async def delete(self, entity: T) -> None:
await self.session.delete(entity)
ORM Selection Guide
| ORM | Best For | Async | Type Safety | Learning Curve |
|---|---|---|---|---|
| SQLAlchemy 2.0 | Complex queries, enterprise | ✅ | ✅ Mapped[] | Medium |
| SQLModel | Simple CRUD, Pydantic sync | ✅ | ✅ | Low |
| Tortoise | Django-like feel | ✅ | Partial | Low |
| Piccolo | Modern, migrations built-in | ✅ | ✅ | Low |
Recommendation: SQLAlchemy 2.0 for production. SQLModel for prototypes.
Migration Strategy (Alembic)
# Setup
alembic init migrations
# Edit alembic.ini: sqlalchemy.url = from env
# Generate migration
alembic revision --autogenerate -m "add users table"
# Apply
alembic upgrade head
# Rollback
alembic downgrade -1
Migration Rules:
- Always review autogenerated migrations before applying
- Never edit applied migrations — create new ones
- Test migrations in staging before production
- Include
downgrade()for everyupgrade() - Use
batch_alter_tablefor SQLite compatibility
Phase 7: Testing Strategy
Test Pyramid
| Level | Coverage Target | Tools | Focus |
|---|---|---|---|
| Unit | 80%+ | pytest, unittest.mock | Service logic, validators |
| Integration | Key paths | pytest-asyncio, testcontainers | DB queries, external APIs |
| E2E | Critical flows | httpx.AsyncClient | Full request→response |
| Contract | API boundaries | schemathesis | OpenAPI compliance |
Test Patterns
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import create_app
@pytest.fixture
async def app():
app = create_app()
yield app
@pytest.fixture
async def client(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.fixture
async def auth_client(client, test_user):
token = create_access_token(test_user.id, test_user.roles)
client.headers["Authorization"] = f"Bearer {token}"
return client
# E2E test
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
response = await client.post("/api/users", json={
"email": "test@example.com",
"name": "Test User",
"password": "securepass123",
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "password" not in data # Never expose
# Unit test (service layer)
@pytest.mark.asyncio
async def test_user_service_duplicate_email(user_service, mock_repo):
mock_repo.get_by_email.return_value = existing_user
with pytest.raises(ConflictError, match="Email already registered"):
await user_service.create(UserCreate(email="taken@example.com", ...))
# Parametrized validation
@pytest.mark.parametrize("email,expected", [
("valid@example.com", True),
("invalid", False),
("", False),
("a@b.c", True),
])
def test_email_validation(email, expected):
if expected:
UserCreate(email=email, name="Test", password="12345678")
else:
with pytest.raises(ValidationError):
UserCreate(email=email, name="Test", password="12345678")
7 Testing Rules
- Test services, not routers — business logic lives in services
- Use fixtures for DI override — swap real DB with test DB via
app.dependency_overrides - One assertion per test — clear what broke when it fails
- Test error paths — 40% of tests should be sad-path
- Use factories for test data —
UserFactory.create()not manual dict construction - Async tests need
@pytest.mark.asyncio— or setasyncio_mode = "auto"in config - Run tests in CI — block merge if tests fail
Phase 8: Structured Logging & Observability
Structlog Setup
import structlog
from uuid import uuid4
from starlette.middleware.base import BaseHTTPMiddleware
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger()
# Request ID middleware
class RequestIDMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid4()))
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request_id,
method=request.method,
path=request.url.path,
)
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
logger.info(
"request_completed",
status_code=response.status_code,
)
return response
Health Check Endpoints
@router.get("/health")
async def health():
"""Liveness probe — is the process running?"""
return {"status": "ok"}
@router.get("/ready")
async def ready(db: AsyncSession = Depends(get_db)):
"""Readiness probe — can we serve traffic?"""
checks = {}
try:
await db.execute(text("SELECT 1"))
checks["database"] = "ok"
except Exception:
checks["database"] = "error"
all_ok = all(v == "ok" for v in checks.values())
return JSONResponse(
status_code=200 if all_ok else 503,
content={"status": "ok" if all_ok else "degraded", "checks": checks},
)
Phase 9: Performance Optimization
Priority Stack
| # | Technique | Impact | Effort |
|---|---|---|---|
| 1 | Async database queries | High | Low |
| 2 | Connection pooling (tuned) | High | Low |
| 3 | Response caching (Redis) | High | Medium |
| 4 | Background tasks for heavy work | High | Low |
| 5 | Pagination on all list endpoints | Medium | Low |
| 6 | Select only needed columns | Medium | Low |
| 7 | Eager loading (joinedload) | Medium | Medium |
| 8 | Rate limiting | Medium | Low |
Background Tasks
from fastapi import BackgroundTasks
@router.post("/users", status_code=201)
async def create_user(
user_in: UserCreate,
background_tasks: BackgroundTasks,
service: UserService = Depends(get_user_service),
):
user = await service.create(user_in)
background_tasks.add_task(send_welcome_email, user.email, user.name)
return user
Caching Pattern
from redis.asyncio import Redis
import json
class CacheService:
def __init__(self, redis: Redis):
self.redis = redis
async def get_or_set(self, key: str, factory, ttl: int = 300):
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
result = await factory()
await self.redis.setex(key, ttl, json.dumps(result, default=str))
return result
async def invalidate(self, pattern: str):
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
Phase 10: Production Deployment
Multi-Stage Dockerfile
# Build stage
FROM python:3.12-slim AS builder
WORKDIR /app
RUN pip install --no-cache-dir uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-editable
# Production stage
FROM python:3.12-slim
WORKDIR /app
RUN adduser --disabled-password --no-create-home appuser
COPY --from=builder /app/.venv /app/.venv
COPY src/ ./src/
COPY migrations/ ./migrations/
COPY alembic.ini ./
ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
USER appuser
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD ["python", "-c", "import httpx; httpx.get('http://localhost:8000/health').raise_for_status()"]
CMD ["uvicorn", "src.app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
App Factory Pattern
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("starting_up", environment=settings.environment)
await init_db()
yield
# Shutdown
logger.info("shutting_down")
await engine.dispose()
def create_app() -> FastAPI:
settings = get_settings()
app = FastAPI(
title=settings.app_name,
lifespan=lifespan,
docs_url="/docs" if settings.debug else None,
redoc_url=None,
)
# Middleware (order matters — last added = first executed)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(RequestIDMiddleware)
# Error handlers
app.add_exception_handler(AppError, app_error_handler)
# Routers
app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
app.include_router(users_router, prefix="/api/users", tags=["users"])
app.include_router(health_router, tags=["health"])
return app
app = create_app()
GitHub Actions CI
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16
env:
POSTGRES_PASSWORD: test
POSTGRES_DB: testdb
ports: ["5432:5432"]
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with: { python-version: "3.12" }
- run: pip install uv && uv sync
- run: uv run ruff check .
- run: uv run mypy src/
- run: uv run pytest --cov=src --cov-report=xml -x
env:
DATABASE_URL: postgresql+asyncpg://postgres:test@localhost:5432/testdb
JWT_SECRET: test-secret-key-at-least-32-chars
Production Checklist
P0 — Mandatory:
- All secrets from environment variables (SecretStr)
- HTTPS enforced
- CORS configured per environment
- Rate limiting on auth endpoints
- Input validation on all endpoints
- Structured error responses (no stack traces)
- Health + readiness endpoints
- Database connection pooling
- Migrations run before deploy
- Structured logging (JSON)
- Tests passing in CI
P1 — Recommended:
- OpenTelemetry tracing
- Prometheus metrics endpoint
- Background task queue (Celery/ARQ)
- Redis caching layer
- API versioning strategy
- Request/response logging
- Dependency security scanning
- Performance benchmarks established
Phase 11: Advanced Patterns
Middleware Stack Order
# Applied bottom-to-top (last added = first executed)
app.add_middleware(GZipMiddleware, minimum_size=1000) # 5. Compress
app.add_middleware(CORSMiddleware, ...) # 4. CORS
app.add_middleware(RequestIDMiddleware) # 3. Request ID
app.add_middleware(RateLimitMiddleware) # 2. Rate limit
app.add_middleware(TrustedHostMiddleware, allowed=["*"]) # 1. Host check
Pagination with Cursor-Based Option
from fastapi import Query
class PaginationParams:
def __init__(
self,
page: int = Query(1, ge=1, description="Page number"),
page_size: int = Query(20, ge=1, le=100, description="Items per page"),
):
self.offset = (page - 1) * page_size
self.limit = page_size
self.page = page
self.page_size = page_size
@router.get("/users", response_model=PaginatedResponse[UserResponse])
async def list_users(
pagination: PaginationParams = Depends(),
service: UserService = Depends(get_user_service),
):
items, total = await service.list(
offset=pagination.offset, limit=pagination.limit
)
return PaginatedResponse(
items=items, total=total,
page=pagination.page, page_size=pagination.page_size,
has_next=(pagination.offset + pagination.limit) < total,
)
WebSocket Pattern
from fastapi import WebSocket, WebSocketDisconnect
class ConnectionManager:
def __init__(self):
self.connections: dict[str, WebSocket] = {}
async def connect(self, user_id: str, ws: WebSocket):
await ws.accept()
self.connections[user_id] = ws
def disconnect(self, user_id: str):
self.connections.pop(user_id, None)
async def send(self, user_id: str, message: dict):
if ws := self.connections.get(user_id):
await ws.send_json(message)
manager = ConnectionManager()
@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
await manager.connect(user_id, websocket)
try:
while True:
data = await websocket.receive_json()
# Process message
except WebSocketDisconnect:
manager.disconnect(user_id)
File Upload Pattern
from fastapi import UploadFile, File
@router.post("/upload")
async def upload_file(
file: UploadFile = File(..., description="File to upload"),
user: User = Depends(get_current_user),
):
# Validate
if file.size and file.size > 10 * 1024 * 1024: # 10MB
raise ValidationError("File too large (max 10MB)")
allowed_types = {"image/jpeg", "image/png", "application/pdf"}
if file.content_type not in allowed_types:
raise ValidationError(f"File type not allowed: {file.content_type}")
# Save
contents = await file.read()
path = f"uploads/{user.id}/{file.filename}"
# Save to S3/local storage...
return {"filename": file.filename, "size": len(contents)}
Phase 12: Common Mistakes
| # | Mistake | Fix |
|---|---|---|
| 1 | Sync database calls in async app | Use async SQLAlchemy/databases |
| 2 | Business logic in route handlers | Move to service layer |
| 3 | No input validation | Pydantic models on every endpoint |
| 4 | Returning ORM models directly | Use response schemas (from_attributes) |
| 5 | Hardcoded config values | Pydantic Settings + env vars |
| 6 | No error handling strategy | Custom exception hierarchy + global handler |
| 7 | Missing health checks | /health + /ready endpoints |
| 8 | print() for logging | structlog with JSON output |
| 9 | No pagination on list endpoints | Default limit, max cap (100) |
| 10 | Testing against production DB | Test fixtures with separate DB |
Quality Scoring (0–100)
| Dimension | Weight | 0–25 | 50 | 75 | 100 |
|---|---|---|---|---|---|
| Type Safety | 15% | No types | Partial Pydantic | Full schemas | Strict mypy pass |
| Error Handling | 15% | Bare HTTPException | Custom errors | Full hierarchy | + monitoring |
| Testing | 15% | None | Happy path | 80%+ coverage | + contract tests |
| Security | 15% | No auth | Basic JWT | + RBAC + rate limit | + scanning + audit |
| Performance | 10% | Sync everything | Async DB | + caching | + profiling |
| Observability | 10% | print() | Structured logs | + tracing | + metrics + alerts |
| Database | 10% | Raw SQL | ORM + migrations | + repository pattern | + connection tuning |
| Deployment | 10% | Manual | Dockerfile | + CI/CD | + health + rollback |
Scoring: Your Score = Σ (dimension score × weight). < 40 = critical, 40–60 = needs work, 60–80 = solid, 80+ = production-grade.
10 Commandments of FastAPI Production
- Pydantic models at every boundary — request, response, config
- Async all the way down — one sync call blocks the event loop
- Services own business logic — routers are thin wrappers
- Dependency injection for testability —
Depends()is your best friend - Structured errors, structured logs — JSON everything
- Health checks are non-negotiable — liveness + readiness
- Test the sad paths — 40% of tests should be error cases
- Migrations before deployment — never modify schema manually
- Secrets in environment, never in code —
SecretStrenforces this - Profile before optimizing — measure, don't guess
Natural Language Commands
audit my FastAPI project→ Run health check, identify gapsset up a new FastAPI project→ Generate project structure + configadd authentication to my API→ JWT + RBAC dependency patterncreate a CRUD feature for [resource]→ Full router/service/repo/schemasoptimize my database queries→ Connection pooling + async + N+1 preventionadd structured logging→ Structlog + request ID middlewarewrite tests for [feature]→ Async test patterns + fixturesprepare for production deployment→ Dockerfile + CI + checklistadd caching to my API→ Redis caching patternset up error handling→ Custom exception hierarchy + global handleradd WebSocket support→ Connection manager patternreview my API security→ 10-point security checklist audit
⚡ Level up your FastAPI APIs → Get the AfrexAI SaaS Context Pack ($47) for complete SaaS architecture, pricing strategies, and go-to-market playbooks.
🔗 More free skills by AfrexAI:
- afrexai-python-production — Python production engineering
- afrexai-api-architecture — API design methodology
- afrexai-database-engineering — Database patterns
- afrexai-test-automation-engineering — Testing strategy
- afrexai-cicd-engineering — CI/CD pipeline design
🛒 Browse all packs → AfrexAI Storefront