cqrs-patterns

Separate read and write concerns for optimized data access.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "cqrs-patterns" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-cqrs-patterns

CQRS Patterns

Separate read and write concerns for optimized data access.

Overview

  • Read-heavy workloads with complex queries

  • Different scaling requirements for reads vs writes

  • Event sourcing implementations

  • Multiple read model representations of same data

  • Complex domain models with simple read requirements

When NOT to Use

  • Simple CRUD applications

  • Strong consistency requirements everywhere

  • Small datasets with simple queries

Architecture Overview

┌─────────────────┐ ┌─────────────────┐ │ Write Side │ │ Read Side │ ├─────────────────┤ ├─────────────────┤ │ ┌───────────┐ │ │ ┌───────────┐ │ │ │ Commands │ │ │ │ Queries │ │ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │ ┌─────▼─────┐ │ │ ┌─────▼─────┐ │ │ │ Aggregate │ │ │ │Read Model │ │ │ └─────┬─────┘ │ │ └───────────┘ │ │ ┌─────▼─────┐ │ │ ▲ │ │ │ Events │──┼─────────┼────────┘ │ │ └───────────┘ │ Publish │ Project │ └─────────────────┘ └─────────────────┘

Command Side (Write Model)

Command and Handler

from pydantic import BaseModel, Field from uuid import UUID, uuid4 from datetime import datetime, timezone from abc import ABC, abstractmethod

class Command(BaseModel): """Base command with metadata.""" command_id: UUID = Field(default_factory=uuid4) timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) user_id: UUID | None = None

class CreateOrder(Command): customer_id: UUID items: list[OrderItem] shipping_address: Address

class CommandHandler(ABC): @abstractmethod async def handle(self, command: Command) -> list["DomainEvent"]: pass

class CreateOrderHandler(CommandHandler): def init(self, order_repo, inventory_service): self.order_repo = order_repo self.inventory = inventory_service

async def handle(self, command: CreateOrder) -> list[DomainEvent]:
    for item in command.items:
        if not await self.inventory.check_availability(item.product_id, item.quantity):
            raise InsufficientInventoryError(item.product_id)

    order = Order.create(
        customer_id=command.customer_id,
        items=command.items,
        shipping_address=command.shipping_address,
    )
    await self.order_repo.save(order)
    return order.pending_events

class CommandBus: def init(self): self._handlers: dict[type, CommandHandler] = {}

def register(self, command_type: type, handler: CommandHandler):
    self._handlers[command_type] = handler

async def dispatch(self, command: Command) -> list[DomainEvent]:
    handler = self._handlers.get(type(command))
    if not handler:
        raise NoHandlerFoundError(type(command))
    events = await handler.handle(command)
    for event in events:
        await self.event_publisher.publish(event)
    return events

Write Model (Aggregate)

from dataclasses import dataclass, field

@dataclass class Order: id: UUID customer_id: UUID items: list[OrderItem] status: OrderStatus _pending_events: list[DomainEvent] = field(default_factory=list)

@classmethod
def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
    order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
    for item in items:
        order.items.append(item)
        order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
    order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
    return order

def cancel(self, reason: str):
    if self.status == OrderStatus.SHIPPED:
        raise InvalidOperationError("Cannot cancel shipped order")
    self.status = OrderStatus.CANCELLED
    self._raise_event(OrderCancelled(order_id=self.id, reason=reason))

def _raise_event(self, event: DomainEvent):
    self._pending_events.append(event)

@property
def pending_events(self) -> list[DomainEvent]:
    events = self._pending_events.copy()
    self._pending_events.clear()
    return events

Query Side (Read Model)

Query Handler

class Query(BaseModel): pass

class GetOrderById(Query): order_id: UUID

class GetOrdersByCustomer(Query): customer_id: UUID status: OrderStatus | None = None page: int = 1 page_size: int = 20

class GetOrderByIdHandler: def init(self, read_db): self.db = read_db

async def handle(self, query: GetOrderById) -> OrderView | None:
    row = await self.db.fetchrow(
        "SELECT * FROM order_summary WHERE id = $1", query.order_id
    )
    return OrderView(**row) if row else None

class OrderView(BaseModel): """Denormalized read model for orders.""" id: UUID customer_id: UUID customer_name: str # Denormalized status: str total_amount: float item_count: int created_at: datetime

Projections

class OrderProjection: """Projects events to read models.""" def init(self, read_db, customer_service): self.db = read_db self.customers = customer_service

async def handle(self, event: DomainEvent):
    match event:
        case OrderCreated():
            await self._on_order_created(event)
        case OrderItemAdded():
            await self._on_item_added(event)
        case OrderCancelled():
            await self._on_order_cancelled(event)

async def _on_order_created(self, event: OrderCreated):
    customer = await self.customers.get(event.customer_id)
    await self.db.execute(
        """INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
           VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
           ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
        event.order_id, event.customer_id, customer.name, event.timestamp,
    )

async def _on_item_added(self, event: OrderItemAdded):
    subtotal = event.quantity * event.unit_price
    await self.db.execute(
        "UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
        subtotal, event.order_id,
    )

async def _on_order_cancelled(self, event: OrderCancelled):
    await self.db.execute(
        "UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
    )

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

@app.post("/api/v1/orders", status_code=201) async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)): command = CreateOrder( customer_id=request.customer_id, items=request.items, shipping_address=request.shipping_address, ) try: events = await bus.dispatch(command) return {"order_id": events[0].order_id} except InsufficientInventoryError as e: raise HTTPException(400, f"Insufficient inventory: {e}")

@app.get("/api/v1/orders/{order_id}") async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)): order = await bus.dispatch(GetOrderById(order_id=order_id)) if not order: raise HTTPException(404, "Order not found") return order

Key Decisions

Decision Recommendation

Consistency Eventual consistency between write and read models

Event storage Event store for write side, denormalized tables for read

Projection lag Monitor and alert on projection delay

Read model count Start with one, add more for specific query needs

Rebuild strategy Ability to rebuild projections from events

Anti-Patterns (FORBIDDEN)

NEVER query write model for reads

order = await aggregate_repo.get(order_id) # WRONG

CORRECT: Use read model

order = await query_bus.dispatch(GetOrderById(order_id=order_id))

NEVER modify read model directly

await read_db.execute("UPDATE orders SET status = $1", status) # WRONG

CORRECT: Dispatch command, let projection update

await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))

NEVER skip projection idempotency - use UPSERT

await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")

Related Skills

  • event-sourcing

  • Event-sourced write models

  • saga-patterns

  • Cross-aggregate transactions

  • database-schema-designer

  • Read model schema design

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

responsive-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

domain-driven-design

No summary provided by upstream source.

Repository SourceNeeds Review
General

dashboard-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

rag-retrieval

No summary provided by upstream source.

Repository SourceNeeds Review