cqrs-implementation

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "cqrs-implementation" with this command: npx skills add wshobson/agents/wshobson-agents-cqrs-implementation

CQRS Implementation

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.

When to Use This Skill

  • Separating read and write concerns

  • Scaling reads independently from writes

  • Building event-sourced systems

  • Optimizing complex query scenarios

  • Different read/write data models needed

  • High-performance reporting requirements

Core Concepts

  1. CQRS Architecture

                 ┌─────────────┐
                 │   Client    │
                 └──────┬──────┘
                        │
           ┌────────────┴────────────┐
           │                         │
           ▼                         ▼
    ┌─────────────┐          ┌─────────────┐
    │  Commands   │          │   Queries   │
    │    API      │          │    API      │
    └──────┬──────┘          └──────┬──────┘
           │                         │
           ▼                         ▼
    ┌─────────────┐          ┌─────────────┐
    │  Command    │          │   Query     │
    │  Handlers   │          │  Handlers   │
    └──────┬──────┘          └──────┬──────┘
           │                         │
           ▼                         ▼
    ┌─────────────┐          ┌─────────────┐
    │   Write     │─────────►│    Read     │
    │   Model     │  Events  │   Model     │
    └─────────────┘          └─────────────┘
    
  2. Key Components

Component Responsibility

Command Intent to change state

Command Handler Validates and executes commands

Event Record of state change

Query Request for data

Query Handler Retrieves data from read model

Projector Updates read model from events

Templates

Template 1: Command Infrastructure

from abc import ABC, abstractmethod from dataclasses import dataclass from typing import TypeVar, Generic, Dict, Any, Type from datetime import datetime import uuid

Command base

@dataclass class Command: command_id: str = None timestamp: datetime = None

def __post_init__(self):
    self.command_id = self.command_id or str(uuid.uuid4())
    self.timestamp = self.timestamp or datetime.utcnow()

Concrete commands

@dataclass class CreateOrder(Command): customer_id: str items: list shipping_address: dict

@dataclass class AddOrderItem(Command): order_id: str product_id: str quantity: int price: float

@dataclass class CancelOrder(Command): order_id: str reason: str

Command handler base

T = TypeVar('T', bound=Command)

class CommandHandler(ABC, Generic[T]): @abstractmethod async def handle(self, command: T) -> Any: pass

Command bus

class CommandBus: def init(self): self._handlers: Dict[Type[Command], CommandHandler] = {}

def register(self, command_type: Type[Command], handler: CommandHandler):
    self._handlers[command_type] = handler

async def dispatch(self, command: Command) -> Any:
    handler = self._handlers.get(type(command))
    if not handler:
        raise ValueError(f"No handler for {type(command).__name__}")
    return await handler.handle(command)

Command handler implementation

class CreateOrderHandler(CommandHandler[CreateOrder]): def init(self, order_repository, event_store): self.order_repository = order_repository self.event_store = event_store

async def handle(self, command: CreateOrder) -> str:
    # Validate
    if not command.items:
        raise ValueError("Order must have at least one item")

    # Create aggregate
    order = Order.create(
        customer_id=command.customer_id,
        items=command.items,
        shipping_address=command.shipping_address
    )

    # Persist events
    await self.event_store.append_events(
        stream_id=f"Order-{order.id}",
        stream_type="Order",
        events=order.uncommitted_events
    )

    return order.id

Template 2: Query Infrastructure

from abc import ABC, abstractmethod from dataclasses import dataclass from typing import TypeVar, Generic, List, Optional

Query base

@dataclass class Query: pass

Concrete queries

@dataclass class GetOrderById(Query): order_id: str

@dataclass class GetCustomerOrders(Query): customer_id: str status: Optional[str] = None page: int = 1 page_size: int = 20

@dataclass class SearchOrders(Query): query: str filters: dict = None sort_by: str = "created_at" sort_order: str = "desc"

Query result types

@dataclass class OrderView: order_id: str customer_id: str status: str total_amount: float item_count: int created_at: datetime shipped_at: Optional[datetime] = None

@dataclass class PaginatedResult(Generic[T]): items: List[T] total: int page: int page_size: int

@property
def total_pages(self) -> int:
    return (self.total + self.page_size - 1) // self.page_size

Query handler base

T = TypeVar('T', bound=Query) R = TypeVar('R')

class QueryHandler(ABC, Generic[T, R]): @abstractmethod async def handle(self, query: T) -> R: pass

Query bus

class QueryBus: def init(self): self._handlers: Dict[Type[Query], QueryHandler] = {}

def register(self, query_type: Type[Query], handler: QueryHandler):
    self._handlers[query_type] = handler

async def dispatch(self, query: Query) -> Any:
    handler = self._handlers.get(type(query))
    if not handler:
        raise ValueError(f"No handler for {type(query).__name__}")
    return await handler.handle(query)

Query handler implementation

class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]): def init(self, read_db): self.read_db = read_db

async def handle(self, query: GetOrderById) -> Optional[OrderView]:
    async with self.read_db.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT order_id, customer_id, status, total_amount,
                   item_count, created_at, shipped_at
            FROM order_views
            WHERE order_id = $1
            """,
            query.order_id
        )
        if row:
            return OrderView(**dict(row))
        return None

class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]): def init(self, read_db): self.read_db = read_db

async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
    async with self.read_db.acquire() as conn:
        # Build query with optional status filter
        where_clause = "customer_id = $1"
        params = [query.customer_id]

        if query.status:
            where_clause += " AND status = $2"
            params.append(query.status)

        # Get total count
        total = await conn.fetchval(
            f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
            *params
        )

        # Get paginated results
        offset = (query.page - 1) * query.page_size
        rows = await conn.fetch(
            f"""
            SELECT order_id, customer_id, status, total_amount,
                   item_count, created_at, shipped_at
            FROM order_views
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
            """,
            *params, query.page_size, offset
        )

        return PaginatedResult(
            items=[OrderView(**dict(row)) for row in rows],
            total=total,
            page=query.page,
            page_size=query.page_size
        )

Template 3: FastAPI CQRS Application

from fastapi import FastAPI, HTTPException, Depends from pydantic import BaseModel from typing import List, Optional

app = FastAPI()

Request/Response models

class CreateOrderRequest(BaseModel): customer_id: str items: List[dict] shipping_address: dict

class OrderResponse(BaseModel): order_id: str customer_id: str status: str total_amount: float item_count: int created_at: datetime

Dependency injection

def get_command_bus() -> CommandBus: return app.state.command_bus

def get_query_bus() -> QueryBus: return app.state.query_bus

Command endpoints (POST, PUT, DELETE)

@app.post("/orders", response_model=dict) async def create_order( request: CreateOrderRequest, command_bus: CommandBus = Depends(get_command_bus) ): command = CreateOrder( customer_id=request.customer_id, items=request.items, shipping_address=request.shipping_address ) order_id = await command_bus.dispatch(command) return {"order_id": order_id}

@app.post("/orders/{order_id}/items") async def add_item( order_id: str, product_id: str, quantity: int, price: float, command_bus: CommandBus = Depends(get_command_bus) ): command = AddOrderItem( order_id=order_id, product_id=product_id, quantity=quantity, price=price ) await command_bus.dispatch(command) return {"status": "item_added"}

@app.delete("/orders/{order_id}") async def cancel_order( order_id: str, reason: str, command_bus: CommandBus = Depends(get_command_bus) ): command = CancelOrder(order_id=order_id, reason=reason) await command_bus.dispatch(command) return {"status": "cancelled"}

Query endpoints (GET)

@app.get("/orders/{order_id}", response_model=OrderResponse) async def get_order( order_id: str, query_bus: QueryBus = Depends(get_query_bus) ): query = GetOrderById(order_id=order_id) result = await query_bus.dispatch(query) if not result: raise HTTPException(status_code=404, detail="Order not found") return result

@app.get("/customers/{customer_id}/orders") async def get_customer_orders( customer_id: str, status: Optional[str] = None, page: int = 1, page_size: int = 20, query_bus: QueryBus = Depends(get_query_bus) ): query = GetCustomerOrders( customer_id=customer_id, status=status, page=page, page_size=page_size ) return await query_bus.dispatch(query)

@app.get("/orders/search") async def search_orders( q: str, sort_by: str = "created_at", query_bus: QueryBus = Depends(get_query_bus) ): query = SearchOrders(query=q, sort_by=sort_by) return await query_bus.dispatch(query)

Template 4: Read Model Synchronization

class ReadModelSynchronizer: """Keeps read models in sync with events."""

def __init__(self, event_store, read_db, projections: List[Projection]):
    self.event_store = event_store
    self.read_db = read_db
    self.projections = {p.name: p for p in projections}

async def run(self):
    """Continuously sync read models."""
    while True:
        for name, projection in self.projections.items():
            await self._sync_projection(projection)
        await asyncio.sleep(0.1)

async def _sync_projection(self, projection: Projection):
    checkpoint = await self._get_checkpoint(projection.name)

    events = await self.event_store.read_all(
        from_position=checkpoint,
        limit=100
    )

    for event in events:
        if event.event_type in projection.handles():
            try:
                await projection.apply(event)
            except Exception as e:
                # Log error, possibly retry or skip
                logger.error(f"Projection error: {e}")
                continue

        await self._save_checkpoint(projection.name, event.global_position)

async def rebuild_projection(self, projection_name: str):
    """Rebuild a projection from scratch."""
    projection = self.projections[projection_name]

    # Clear existing data
    await projection.clear()

    # Reset checkpoint
    await self._save_checkpoint(projection_name, 0)

    # Rebuild
    while True:
        checkpoint = await self._get_checkpoint(projection_name)
        events = await self.event_store.read_all(checkpoint, 1000)

        if not events:
            break

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)

        await self._save_checkpoint(
            projection_name,
            events[-1].global_position
        )

Template 5: Eventual Consistency Handling

class ConsistentQueryHandler: """Query handler that can wait for consistency."""

def __init__(self, read_db, event_store):
    self.read_db = read_db
    self.event_store = event_store

async def query_after_command(
    self,
    query: Query,
    expected_version: int,
    stream_id: str,
    timeout: float = 5.0
):
    """
    Execute query, ensuring read model is at expected version.
    Used for read-your-writes consistency.
    """
    start_time = time.time()

    while time.time() - start_time < timeout:
        # Check if read model is caught up
        projection_version = await self._get_projection_version(stream_id)

        if projection_version >= expected_version:
            return await self.execute_query(query)

        # Wait a bit and retry
        await asyncio.sleep(0.1)

    # Timeout - return stale data with warning
    return {
        "data": await self.execute_query(query),
        "_warning": "Data may be stale"
    }

async def _get_projection_version(self, stream_id: str) -> int:
    """Get the last processed event version for a stream."""
    async with self.read_db.acquire() as conn:
        return await conn.fetchval(
            "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
            stream_id
        ) or 0

Best Practices

Do's

  • Separate command and query models - Different needs

  • Use eventual consistency - Accept propagation delay

  • Validate in command handlers - Before state change

  • Denormalize read models - Optimize for queries

  • Version your events - For schema evolution

Don'ts

  • Don't query in commands - Use only for writes

  • Don't couple read/write schemas - Independent evolution

  • Don't over-engineer - Start simple

  • Don't ignore consistency SLAs - Define acceptable lag

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

tailwind-design-system

Tailwind Design System (v4)

Repository Source
31.3K19K
wshobson
Automation

api-design-principles

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

nodejs-backend-patterns

No summary provided by upstream source.

Repository SourceNeeds Review