cqrs

Command Query Responsibility Segregation for scalable architectures.

Install skill "cqrs" with this command: npx skills add eyadsibai/ltk/eyadsibai-ltk-cqrs

CQRS Implementation

Architecture

          ┌────────────┴────────────┐
          │                         │
          ▼                         ▼
   ┌─────────────┐          ┌─────────────┐
   │  Commands   │          │   Queries   │
   │    API      │          │    API      │
   └──────┬──────┘          └──────┬──────┘
          │                         │
          ▼                         ▼
   ┌─────────────┐          ┌─────────────┐
   │   Write     │─────────►│    Read     │
   │   Model     │  Events  │   Model     │
   └─────────────┘          └─────────────┘

Command Infrastructure

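import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Generic, Type, TypeVar


# kw_only=True (Python 3.10+) lets subclasses add required fields
# after the defaulted fields declared here.
@dataclass(kw_only=True)
class Command:
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware UTC timestamp; datetime.utcnow is deprecated since Python 3.12
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass(kw_only=True)
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict


T = TypeVar("T", bound=Command)


class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        ...


class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler) -> None:
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            # Fail with a clear message instead of an AttributeError on None
            raise LookupError(f"No handler registered for {type(command).__name__}")
        return await handler.handle(command)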
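
To make the dispatch flow concrete, here is a minimal usage sketch. It is an illustration under stated assumptions, not the upstream implementation: `CreateOrderHandler`, the dict used as a write store, and the `order-N` ID scheme are hypothetical, and `Command` is reduced to a marker class to keep the example short.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Type


class Command:
    """Marker base class; the listing's version also carries an id and timestamp."""


@dataclass(frozen=True)
class CreateOrder(Command):
    customer_id: str
    items: List[str]


class CommandBus:
    def __init__(self) -> None:
        self._handlers: Dict[Type[Command], Any] = {}

    def register(self, command_type: Type[Command], handler: Any) -> None:
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise LookupError(f"No handler registered for {type(command).__name__}")
        return await handler.handle(command)


class CreateOrderHandler:
    """Hypothetical handler: validates, then stores the order in a dict."""

    def __init__(self, orders: Dict[str, dict]) -> None:
        self.orders = orders

    async def handle(self, command: CreateOrder) -> str:
        # Validate before any state change.
        if not command.items:
            raise ValueError("An order needs at least one item")
        order_id = f"order-{len(self.orders) + 1}"
        self.orders[order_id] = {
            "customer_id": command.customer_id,
            "items": list(command.items),
        }
        return order_id


def run_demo() -> str:
    """Register one handler, dispatch one command, return the new order ID."""
    orders: Dict[str, dict] = {}
    bus = CommandBus()
    bus.register(CreateOrder, CreateOrderHandler(orders))
    return asyncio.run(bus.dispatch(CreateOrder(customer_id="c-1", items=["book"])))
```

The bus looks handlers up by the exact command type, so each command class needs its own registration; dispatching an unregistered command raises `LookupError` in this sketch.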

Query Infrastructure

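from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Query and QueryHandler are not defined in this listing; they are assumed
# to mirror Command and CommandHandler from the previous section.


@dataclass
class GetOrderById(Query):
    order_id: str


@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    created_at: datetime


class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
    def __init__(self, read_db):
        # read_db is assumed to be an asyncpg connection or pool
        # (fetchrow with $1-style placeholders)
        self.read_db = read_db

    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        # Select only the columns OrderView declares, so extra columns
        # on the table cannot break construction
        row = await self.read_db.fetchrow(
            "SELECT order_id, customer_id, status, total_amount, created_at "
            "FROM order_views WHERE order_id = $1",
            query.order_id,
        )
        return OrderView(**dict(row)) if row else None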

FastAPI Integration

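from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()


# Command endpoints (POST, PUT, DELETE)
@app.post("/orders")
async def create_order(
    request: CreateOrderRequest,
    # Depends() with no argument builds a new CommandBus per request; production
    # code would inject a shared bus with its handlers already registered.
    command_bus: CommandBus = Depends(),
):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        # CreateOrder requires shipping_address; assumes CreateOrderRequest carries it
        shipping_address=request.shipping_address,
    )
    order_id = await command_bus.dispatch(command)
    return {"order_id": order_id}


# Query endpoints (GET)
@app.get("/orders/{order_id}")
async def get_order(order_id: str, query_bus: QueryBus = Depends()):
    query = GetOrderById(order_id=order_id)
    order = await query_bus.dispatch(query)
    if order is None:
        # The handler returns None for unknown IDs; surface that as a 404
        raise HTTPException(status_code=404, detail="Order not found")
    return order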

Read Model Synchronization

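class ReadModelSynchronizer:
    # Assumes self.event_store, self.projections (name -> projection), and the
    # _get_checkpoint / _save_checkpoint helpers are provided elsewhere.

    async def sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)
        events = await self.event_store.read_all(from_position=checkpoint)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)
            # Advance the checkpoint for every event, handled or not,
            # so skipped events are not re-read on the next run
            await self._save_checkpoint(projection.name, event.position)

    async def rebuild_projection(self, projection_name: str):
        projection = self.projections[projection_name]
        await projection.clear()
        await self._save_checkpoint(projection_name, 0)
        # Rebuild from the beginning by replaying the full event log
        await self.sync_projection(projection)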
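
The checkpointing logic can be exercised end to end with in-memory stand-ins. This is a sketch under assumptions: `Event`, `InMemoryEventStore`, `OrderCountProjection`, and the dict-based checkpoint store are hypothetical, and `read_all` is assumed to return events strictly after the checkpoint position.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass(frozen=True)
class Event:
    position: int  # 1-based position in the global event log
    event_type: str
    data: dict


class InMemoryEventStore:
    def __init__(self, events: List[Event]) -> None:
        self._events = events

    async def read_all(self, from_position: int) -> List[Event]:
        # Return events strictly after the checkpoint so none is applied twice.
        return [e for e in self._events if e.position > from_position]


class OrderCountProjection:
    """Hypothetical read model: number of orders per customer."""

    name = "order_counts"

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}

    def handles(self) -> Set[str]:
        return {"OrderCreated"}

    async def apply(self, event: Event) -> None:
        customer = event.data["customer_id"]
        self.counts[customer] = self.counts.get(customer, 0) + 1

    async def clear(self) -> None:
        self.counts.clear()


class Synchronizer:
    def __init__(self, event_store: InMemoryEventStore) -> None:
        self.event_store = event_store
        self.checkpoints: Dict[str, int] = {}

    async def sync_projection(self, projection: OrderCountProjection) -> None:
        checkpoint = self.checkpoints.get(projection.name, 0)
        for event in await self.event_store.read_all(from_position=checkpoint):
            if event.event_type in projection.handles():
                await projection.apply(event)
            self.checkpoints[projection.name] = event.position

    async def rebuild_projection(self, projection: OrderCountProjection) -> None:
        await projection.clear()
        self.checkpoints[projection.name] = 0
        await self.sync_projection(projection)


async def demo() -> Tuple[Dict[str, int], Dict[str, int], int]:
    log = [
        Event(1, "OrderCreated", {"customer_id": "c-1"}),
        Event(2, "OrderShipped", {"customer_id": "c-1"}),
        Event(3, "OrderCreated", {"customer_id": "c-1"}),
    ]
    projection = OrderCountProjection()
    sync = Synchronizer(InMemoryEventStore(log))
    await sync.sync_projection(projection)
    await sync.sync_projection(projection)  # second run finds nothing new
    first = dict(projection.counts)
    await sync.rebuild_projection(projection)
    return first, dict(projection.counts), sync.checkpoints[projection.name]
```

Running the sync twice leaves the counts unchanged because the checkpoint has already moved past every event, and a rebuild arrives at the same state by replaying the log from position 0.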

Eventual Consistency

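import asyncio
import time


async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
    """Read-your-writes consistency: wait until the projection reaches expected_version."""
    # A monotonic clock is unaffected by system clock adjustments
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        projection_version = await self._get_projection_version(stream_id)
        if projection_version >= expected_version:
            # Same envelope as the timeout path, so callers handle one shape
            return {"data": await self.execute_query(query)}
        await asyncio.sleep(0.1)

    # Timed out: return whatever the read model has, flagged as possibly stale
    return {"data": await self.execute_query(query), "_warning": "May be stale"}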
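
The polling approach can be tried against a simulated lagging read model. This is a minimal sketch, assuming a hypothetical `LaggingReadModel` whose version catches up after a fixed delay; the standalone function and the `stale` flag are illustrative choices, not the listing's API.

```python
import asyncio
import time


class LaggingReadModel:
    """Hypothetical read model whose version catches up after a delay."""

    def __init__(self, lag_seconds: float) -> None:
        self._ready_at = time.monotonic() + lag_seconds
        self._old = {"version": 1, "status": "pending"}
        self._new = {"version": 2, "status": "paid"}

    def snapshot(self) -> dict:
        return self._new if time.monotonic() >= self._ready_at else self._old


async def query_after_command(read_model, expected_version, timeout=1.0, poll=0.01):
    """Poll until the read model reaches expected_version or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        view = read_model.snapshot()
        if view["version"] >= expected_version:
            return {"data": view, "stale": False}
        await asyncio.sleep(poll)
    # Timed out: return what is available, flagged as possibly stale.
    return {"data": read_model.snapshot(), "stale": True}


def demo():
    # The projection catches up within the timeout: fresh data is returned.
    fresh = asyncio.run(
        query_after_command(LaggingReadModel(0.05), expected_version=2)
    )
    # The projection lags longer than the timeout: stale data, flagged.
    stale = asyncio.run(
        query_after_command(LaggingReadModel(10.0), expected_version=2, timeout=0.05)
    )
    return fresh, stale
```

Polling is the simplest option but holds the request open for up to `timeout` seconds, so the timeout should be chosen to match the lag the system is expected to tolerate.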

Best Practices

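  • Separate command and query models - Writes and reads have different optimization needs

  • Accept eventual consistency - Define the acceptable lag between a write and its appearance in the read model

  • Validate in command handlers - Reject invalid commands before any state change

  • Denormalize read models - Shape each view for the queries it serves

  • Version your events - Needed for schema evolution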
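
One common way to act on the event-versioning advice is upcasting: old events are upgraded to the current schema when they are read, so projections only handle the latest version. The sketch below is an assumption-laden illustration: the `OrderCreated` v1 to v2 change (a flat `address` string becoming a structured `shipping_address`) and the `UPCASTERS` registry are hypothetical.

```python
from typing import Callable, Dict, Tuple

Upcaster = Callable[[dict], dict]


def _order_created_v1_to_v2(payload: dict) -> dict:
    # Hypothetical schema change: v2 replaces the flat "address" string
    # with a structured "shipping_address" dict.
    upgraded = dict(payload)  # copy so the stored event is not mutated
    upgraded["shipping_address"] = {"line1": upgraded.pop("address")}
    return upgraded


# Maps (event_type, from_version) to the function producing the next version.
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("OrderCreated", 1): _order_created_v1_to_v2,
}


def upcast(event_type: str, version: int, payload: dict) -> Tuple[int, dict]:
    """Apply upcasters step by step until no newer version is registered."""
    while (event_type, version) in UPCASTERS:
        payload = UPCASTERS[(event_type, version)](payload)
        version += 1
    return version, payload
```

For example, `upcast("OrderCreated", 1, {"order_id": "o-1", "address": "1 Main St"})` yields version 2 with a `shipping_address` dict, while an event already at version 2 passes through unchanged.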

When to Use CQRS

Good for:

  • Different read/write scaling needs

  • Complex query requirements

  • Event-sourced systems

  • High-performance reporting

Not for:

  • Simple CRUD applications

  • No scaling requirements

  • Small data sets
