event-driven-architecture

Event-Driven Architecture

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "event-driven-architecture" with this command: npx skills add doanchienthangdev/omgkit/doanchienthangdev-omgkit-event-driven-architecture

Event-Driven Architecture

Implement event-driven systems with event sourcing, CQRS, and message queues. This skill covers distributed patterns for scalable, resilient applications.

Purpose

Build loosely coupled, scalable systems:

  • Implement event sourcing for audit trails

  • Apply CQRS for read/write optimization

  • Use message queues for async processing

  • Handle distributed transactions with sagas

  • Ensure eventual consistency

  • Build replay and recovery capabilities

Features

  1. Event Sourcing

// Event definitions interface DomainEvent { eventId: string; eventType: string; aggregateId: string; aggregateType: string; timestamp: Date; version: number; data: Record<string, any>; metadata: { userId?: string; correlationId?: string; causationId?: string; }; }

// Order aggregate events type OrderEvent = | { type: 'OrderCreated'; data: { customerId: string; items: OrderItem[] } } | { type: 'OrderItemAdded'; data: { item: OrderItem } } | { type: 'OrderItemRemoved'; data: { itemId: string } } | { type: 'OrderSubmitted'; data: { submittedAt: Date } } | { type: 'PaymentReceived'; data: { paymentId: string; amount: number } } | { type: 'OrderShipped'; data: { trackingNumber: string; carrier: string } } | { type: 'OrderDelivered'; data: { deliveredAt: Date } } | { type: 'OrderCancelled'; data: { reason: string } };

// Event store class EventStore { async append( aggregateId: string, events: DomainEvent[], expectedVersion: number ): Promise<void> { // Optimistic concurrency check const currentVersion = await this.getVersion(aggregateId);

if (currentVersion !== expectedVersion) {
  throw new ConcurrencyError(
    `Expected version ${expectedVersion}, but found ${currentVersion}`
  );
}

// Append events atomically
await db.$transaction(async (tx) => {
  for (let i = 0; i &#x3C; events.length; i++) {
    await tx.event.create({
      data: {
        ...events[i],
        version: expectedVersion + i + 1,
      },
    });
  }
});

// Publish to event bus
for (const event of events) {
  await eventBus.publish(event);
}

}

async getEvents( aggregateId: string, fromVersion?: number ): Promise<DomainEvent[]> { return db.event.findMany({ where: { aggregateId, version: fromVersion ? { gt: fromVersion } : undefined, }, orderBy: { version: 'asc' }, }); }

async getVersion(aggregateId: string): Promise<number> { const lastEvent = await db.event.findFirst({ where: { aggregateId }, orderBy: { version: 'desc' }, });

return lastEvent?.version ?? 0;

} }

// Aggregate with event sourcing class OrderAggregate { private id: string; private state: OrderState; private version: number = 0; private uncommittedEvents: OrderEvent[] = [];

static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> { const aggregate = new OrderAggregate(id); const events = await eventStore.getEvents(id);

for (const event of events) {
  aggregate.apply(event, false);
}

return aggregate;

}

// Command handlers create(customerId: string, items: OrderItem[]): void { if (this.state) { throw new Error('Order already exists'); }

this.applyChange({
  type: 'OrderCreated',
  data: { customerId, items },
});

}

addItem(item: OrderItem): void { this.ensureState(['draft']);

this.applyChange({
  type: 'OrderItemAdded',
  data: { item },
});

}

submit(): void { this.ensureState(['draft']);

if (this.state.items.length === 0) {
  throw new Error('Cannot submit empty order');
}

this.applyChange({
  type: 'OrderSubmitted',
  data: { submittedAt: new Date() },
});

}

// Event application private apply(event: OrderEvent, isNew: boolean): void { switch (event.type) { case 'OrderCreated': this.state = { status: 'draft', customerId: event.data.customerId, items: event.data.items, total: this.calculateTotal(event.data.items), }; break;

  case 'OrderItemAdded':
    this.state.items.push(event.data.item);
    this.state.total = this.calculateTotal(this.state.items);
    break;

  case 'OrderSubmitted':
    this.state.status = 'submitted';
    this.state.submittedAt = event.data.submittedAt;
    break;

  // ... other event handlers
}

this.version++;

if (isNew) {
  this.uncommittedEvents.push(event);
}

}

private applyChange(event: OrderEvent): void { this.apply(event, true); }

async save(eventStore: EventStore): Promise<void> { const domainEvents = this.uncommittedEvents.map((e, i) => ({ eventId: uuid(), eventType: e.type, aggregateId: this.id, aggregateType: 'Order', timestamp: new Date(), version: this.version - this.uncommittedEvents.length + i + 1, data: e.data, metadata: {}, }));

await eventStore.append(
  this.id,
  domainEvents,
  this.version - this.uncommittedEvents.length
);

this.uncommittedEvents = [];

} }

  1. CQRS Pattern

// Command side (writes) interface Command { type: string; payload: any; metadata: { userId: string; timestamp: Date; correlationId: string; }; }

class CommandBus { private handlers = new Map<string, CommandHandler>();

register(commandType: string, handler: CommandHandler): void { this.handlers.set(commandType, handler); }

async dispatch(command: Command): Promise<void> { const handler = this.handlers.get(command.type);

if (!handler) {
  throw new Error(`No handler for command: ${command.type}`);
}

await handler.handle(command);

} }

// Command handler class CreateOrderHandler implements CommandHandler { constructor( private eventStore: EventStore, private orderRepository: OrderRepository ) {}

async handle(command: CreateOrderCommand): Promise<void> { const order = new OrderAggregate(uuid()); order.create(command.payload.customerId, command.payload.items); await order.save(this.eventStore); } }

// Query side (reads) interface Query { type: string; params: any; }

class QueryBus { private handlers = new Map<string, QueryHandler>();

register(queryType: string, handler: QueryHandler): void { this.handlers.set(queryType, handler); }

async execute<T>(query: Query): Promise<T> { const handler = this.handlers.get(query.type);

if (!handler) {
  throw new Error(`No handler for query: ${query.type}`);
}

return handler.handle(query);

} }

// Read model projection class OrderReadModel { async project(event: DomainEvent): Promise<void> { switch (event.eventType) { case 'OrderCreated': await db.orderView.create({ data: { id: event.aggregateId, customerId: event.data.customerId, status: 'draft', itemCount: event.data.items.length, total: event.data.total, createdAt: event.timestamp, }, }); break;

  case 'OrderSubmitted':
    await db.orderView.update({
      where: { id: event.aggregateId },
      data: {
        status: 'submitted',
        submittedAt: event.data.submittedAt,
      },
    });
    break;

  case 'OrderShipped':
    await db.orderView.update({
      where: { id: event.aggregateId },
      data: {
        status: 'shipped',
        trackingNumber: event.data.trackingNumber,
      },
    });
    break;
}

}

// Rebuild projection from events async rebuild(): Promise<void> { // Clear existing read model await db.orderView.deleteMany();

// Replay all events
const events = await eventStore.getAllEvents();

for (const event of events) {
  await this.project(event);
}

} }

  1. Message Queues with RabbitMQ

import amqp from 'amqplib';

class RabbitMQBroker { private connection: amqp.Connection; private channel: amqp.Channel;

async connect(): Promise<void> { this.connection = await amqp.connect(process.env.RABBITMQ_URL!); this.channel = await this.connection.createChannel();

// Setup exchanges
await this.channel.assertExchange('events', 'topic', { durable: true });
await this.channel.assertExchange('commands', 'direct', { durable: true });
await this.channel.assertExchange('dlx', 'fanout', { durable: true });

}

async publish(exchange: string, routingKey: string, message: any): Promise<void> { const content = Buffer.from(JSON.stringify(message));

this.channel.publish(exchange, routingKey, content, {
  persistent: true,
  contentType: 'application/json',
  messageId: uuid(),
  timestamp: Date.now(),
});

}

async subscribe( queue: string, exchange: string, routingKey: string, handler: (message: any) => Promise<void> ): Promise<void> { // Setup queue with dead letter exchange await this.channel.assertQueue(queue, { durable: true, deadLetterExchange: 'dlx', deadLetterRoutingKey: ${queue}.dlq, });

await this.channel.bindQueue(queue, exchange, routingKey);

// Consume messages
await this.channel.consume(queue, async (msg) => {
  if (!msg) return;

  try {
    const content = JSON.parse(msg.content.toString());
    await handler(content);
    this.channel.ack(msg);
  } catch (error) {
    console.error('Message processing failed:', error);

    // Retry or dead-letter
    const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;

    if (retryCount &#x3C; 3) {
      // Retry with exponential backoff
      setTimeout(() => {
        this.channel.publish(exchange, routingKey, msg.content, {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            'x-retry-count': retryCount,
          },
        });
        this.channel.ack(msg);
      }, Math.pow(2, retryCount) * 1000);
    } else {
      // Send to dead letter queue
      this.channel.reject(msg, false);
    }
  }
});

} }

// Event publishing class EventPublisher { constructor(private broker: RabbitMQBroker) {}

async publish(event: DomainEvent): Promise<void> { const routingKey = ${event.aggregateType}.${event.eventType}; await this.broker.publish('events', routingKey, event); } }

// Event consumer class OrderEventConsumer { constructor( private broker: RabbitMQBroker, private readModel: OrderReadModel ) {}

async start(): Promise<void> { await this.broker.subscribe( 'order-projector', 'events', 'Order.*', async (event) => { await this.readModel.project(event); } ); } }

  1. Saga Pattern for Distributed Transactions

// Saga orchestrator interface SagaStep { name: string; execute: (context: SagaContext) => Promise<void>; compensate: (context: SagaContext) => Promise<void>; }

class SagaOrchestrator { private steps: SagaStep[] = []; private executedSteps: SagaStep[] = [];

addStep(step: SagaStep): this { this.steps.push(step); return this; }

async execute(context: SagaContext): Promise<void> { try { for (const step of this.steps) { console.log(Executing step: ${step.name}); await step.execute(context); this.executedSteps.push(step); } } catch (error) { console.error('Saga failed, compensating...', error); await this.compensate(context); throw error; } }

private async compensate(context: SagaContext): Promise<void> { // Execute compensation in reverse order for (const step of this.executedSteps.reverse()) { try { console.log(Compensating step: ${step.name}); await step.compensate(context); } catch (error) { console.error(Compensation failed for ${step.name}:, error); // Log for manual intervention await this.logCompensationFailure(step, context, error); } } } }

// Order saga example const createOrderSaga = new SagaOrchestrator() .addStep({ name: 'Reserve Inventory', execute: async (ctx) => { const reservation = await inventoryService.reserve(ctx.items); ctx.reservationId = reservation.id; }, compensate: async (ctx) => { if (ctx.reservationId) { await inventoryService.releaseReservation(ctx.reservationId); } }, }) .addStep({ name: 'Process Payment', execute: async (ctx) => { const payment = await paymentService.charge(ctx.customerId, ctx.total); ctx.paymentId = payment.id; }, compensate: async (ctx) => { if (ctx.paymentId) { await paymentService.refund(ctx.paymentId); } }, }) .addStep({ name: 'Create Order', execute: async (ctx) => { const order = await orderService.create({ customerId: ctx.customerId, items: ctx.items, paymentId: ctx.paymentId, reservationId: ctx.reservationId, }); ctx.orderId = order.id; }, compensate: async (ctx) => { if (ctx.orderId) { await orderService.cancel(ctx.orderId); } }, }) .addStep({ name: 'Send Confirmation', execute: async (ctx) => { await notificationService.sendOrderConfirmation(ctx.orderId); }, compensate: async (ctx) => { // No compensation needed for notifications }, });

// Execute saga async function handleCreateOrder(command: CreateOrderCommand): Promise<void> { const context: SagaContext = { customerId: command.customerId, items: command.items, total: calculateTotal(command.items), };

await createOrderSaga.execute(context); }

  1. Kafka Streaming

import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';

class KafkaService { private kafka: Kafka; private producer: Producer; private consumers: Map<string, Consumer> = new Map();

constructor() { this.kafka = new Kafka({ clientId: process.env.SERVICE_NAME, brokers: (process.env.KAFKA_BROKERS || '').split(','), }); }

async connect(): Promise<void> { this.producer = this.kafka.producer({ idempotent: true, maxInFlightRequests: 5, });

await this.producer.connect();

}

async publish(topic: string, messages: KafkaMessage[]): Promise<void> { await this.producer.send({ topic, messages: messages.map(m => ({ key: m.key, value: JSON.stringify(m.value), headers: m.headers, partition: m.partition, })), }); }

async subscribe( groupId: string, topics: string[], handler: (payload: EachMessagePayload) => Promise<void> ): Promise<void> { const consumer = this.kafka.consumer({ groupId, sessionTimeout: 30000, heartbeatInterval: 3000, });

await consumer.connect();
await consumer.subscribe({ topics, fromBeginning: false });

await consumer.run({
  eachMessage: async (payload) => {
    try {
      await handler(payload);
    } catch (error) {
      console.error('Message processing failed:', error);
      // Implement retry/DLQ logic
    }
  },
});

this.consumers.set(groupId, consumer);

}

async disconnect(): Promise<void> { await this.producer.disconnect(); for (const consumer of this.consumers.values()) { await consumer.disconnect(); } } }

// Stream processing class OrderStreamProcessor { constructor(private kafka: KafkaService) {}

async start(): Promise<void> { await this.kafka.subscribe( 'order-processor', ['order-events'], async ({ topic, partition, message }) => { const event = JSON.parse(message.value?.toString() || '{}');

    switch (event.type) {
      case 'OrderCreated':
        await this.handleOrderCreated(event);
        break;
      case 'OrderCompleted':
        await this.handleOrderCompleted(event);
        break;
    }
  }
);

}

private async handleOrderCreated(event: any): Promise<void> { // Update analytics await analyticsService.recordOrder(event.data);

// Trigger downstream processes
await this.kafka.publish('inventory-commands', [{
  key: event.aggregateId,
  value: {
    type: 'ReserveInventory',
    orderId: event.aggregateId,
    items: event.data.items,
  },
}]);

} }

Use Cases

  1. Order Processing System

// Complete order workflow async function processOrder(orderId: string): Promise<void> { const saga = new SagaOrchestrator() .addStep(reserveInventoryStep) .addStep(processPaymentStep) .addStep(createShipmentStep) .addStep(sendNotificationStep);

await saga.execute({ orderId }); }

  1. Real-time Analytics

// Stream aggregation const orderTotalsStream = kafka.subscribe( 'analytics-aggregator', ['order-events'], async (event) => { await updateDailySales(event.data.total); await updateProductMetrics(event.data.items); } );

Best Practices

Do's

  • Design events as facts - Immutable, past-tense naming

  • Implement idempotent handlers - Handle duplicates gracefully

  • Plan for event versioning - Schema evolution

  • Use dead letter queues - Handle failures

  • Monitor queue depths - Alert on backlogs

  • Test with chaos - Simulate failures

Don'ts

  • Don't couple services through shared databases

  • Don't ignore message ordering requirements

  • Don't skip compensation logic

  • Don't forget about exactly-once semantics

  • Don't over-engineer for simple use cases

  • Don't ignore backpressure

Related Skills

  • redis - Pub/sub and caching

  • real-time-systems - WebSocket integration

  • backend-development - Service architecture

Reference Resources

  • Event Sourcing Pattern

  • CQRS Pattern

  • RabbitMQ Documentation

  • Apache Kafka Documentation

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

postgresql

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

building-nestjs-apis

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

real-time-systems

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

managing-databases

No summary provided by upstream source.

Repository SourceNeeds Review