Event Store Design
Comprehensive guide to designing event stores for event-sourced applications.
When to Use This Skill
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling
Core Concepts
- Event Store Architecture
┌─────────────────────────────────────────────────────┐
│                     Event Store                     │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │ Stream 1    │  │ Stream 2    │  │ Stream 3    │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘
- Event Store Requirements
| Requirement   | Description                         |
|---------------|-------------------------------------|
| Append-only   | Events are immutable, only appends  |
| Ordered       | Per-stream and global ordering      |
| Versioned     | Optimistic concurrency control      |
| Subscriptions | Real-time event notifications       |
| Idempotent    | Handle duplicate writes safely      |
Technology Comparison
| Technology   | Best For                  | Limitations                      |
|--------------|---------------------------|----------------------------------|
| EventStoreDB | Pure event sourcing       | Single-purpose                   |
| PostgreSQL   | Existing Postgres stack   | Manual implementation            |
| Kafka        | High-throughput streaming | Not ideal for per-stream queries |
| DynamoDB     | Serverless, AWS-native    | Query limitations                |
| Marten       | .NET ecosystems           | .NET specific                    |
Templates
Template 1: PostgreSQL Event Store Schema
-- Events table
-- NOTE: gen_random_uuid() is built in from PostgreSQL 13; older versions need
-- the pgcrypto extension.
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    -- NOT NULL so readers can always decode the column as a JSON object.
    metadata JSONB NOT NULL DEFAULT '{}',
    version BIGINT NOT NULL,
    -- Sequence values are assigned at INSERT time, not at COMMIT time, so rows
    -- can become visible out of order under concurrent writers.
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Enforces optimistic concurrency: one row per (stream, version).
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Stream queries (WHERE stream_id = ... ORDER BY version) are served by the
-- unique index that backs unique_stream_version, so no separate
-- (stream_id, version) index is created: it would only add write overhead.

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table (one snapshot row per stream)
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscriptions checkpoint table (last processed global_position per subscriber)
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
Template 2: Python Event Store Implementation
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional, List
from uuid import UUID, uuid4

import asyncpg
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class Event:
    """A single domain event, before or after it has been persisted.

    ``version`` and ``global_position`` stay ``None`` until the store assigns
    them on append.
    """

    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    # Position within the stream; assigned by the store.
    version: Optional[int] = None
    # Position across all streams; assigned by the store.
    global_position: Optional[int] = None
    # Timezone-aware UTC. datetime.utcnow() is deprecated and returns a naive
    # value, which is ambiguous when written to a TIMESTAMPTZ column.
    created_at: datetime = field(default_factory=_utc_now)
class EventStore:
    """Event store backed by PostgreSQL through an asyncpg connection pool.

    Reads and writes the ``events`` table and keeps subscriber progress in
    ``subscription_checkpoints``.
    """

    def __init__(self, pool: asyncpg.Pool):
        """Store the connection pool used for every query."""
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency.

        Args:
            stream_id: Stream to append to.
            stream_type: Value stored in the ``stream_type`` column.
            events: Events to persist, in order. They are updated in place
                with the assigned ``version`` and ``global_position``.
            expected_version: Version the caller believes the stream is at
                (0 for a stream with no events). ``None`` skips the check.

        Returns:
            The same event objects, in insertion order.

        Raises:
            ConcurrencyError: If ``expected_version`` is given and differs
                from the stream's current version.
        """
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # A single read serves both the concurrency check and the
                # starting version for the new events.
                current = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1",
                    stream_id
                )

                if expected_version is not None and current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, got {current}"
                    )

                # NOTE(review): a writer that races past the check above is
                # presumably rejected by the database's UNIQUE
                # (stream_id, version) constraint rather than by
                # ConcurrencyError - confirm callers handle that error too.
                saved_events = []
                for version, event in enumerate(events, start=current + 1):
                    event.version = version
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                            event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream.

        Returns at most ``limit`` events whose version is >= ``from_version``
        (inclusive), ordered by version.
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events globally.

        Returns at most ``limit`` events whose global position is strictly
        greater than ``from_position``, ordered by global position.
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """Subscribe to all events from a position.

        Polls ``read_all`` forever and awaits ``handler(event)`` for each
        event. The checkpoint is saved once per batch, after every event in
        the batch was handled, so a failure mid-batch replays that batch on
        the next run.

        Args:
            subscription_id: Key of the checkpoint row.
            handler: Async callable invoked with each event.
            from_position: Start position used only when no checkpoint is
                stored for ``subscription_id``.
            batch_size: Maximum number of events fetched per poll.
        """
        # Get checkpoint
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )

        # Compare against None explicitly: a stored checkpoint of 0 is a valid
        # position and must not be replaced by from_position.
        position = from_position if checkpoint is None else checkpoint

        while True:
            events = await self.read_all(position, batch_size)

            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        """Build an Event from a row of the ``events`` table."""
        # A NULL metadata column is treated as "no metadata" instead of
        # crashing json.loads.
        raw_metadata = row['metadata']
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(raw_metadata) if raw_metadata is not None else {},
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )
class ConcurrencyError(Exception):
    """Raised when optimistic concurrency check fails."""
Template 3: EventStoreDB Usage
from esdbclient import EventStoreDBClient, NewEvent, StreamState import json
# Connect (module-level client shared by the helper functions below;
# tls=false targets a local, insecure server)
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    """Append events to an EventStoreDB stream.

    Args:
        stream_name: Stream to append to.
        events: Dicts with ``'type'`` and ``'data'`` keys and an optional
            ``'metadata'`` key; data and metadata are JSON-encoded.
        expected_revision: ``None`` appends without a version check
            (``StreamState.ANY``); ``-1`` requires that the stream does not
            exist yet (``StreamState.NO_STREAM``); any other value is passed
            through as the expected current version.

    Returns:
        The result of ``client.append_to_stream``.
    """
    def _encode(payload) -> bytes:
        return json.dumps(payload).encode()

    new_events = []
    for item in events:
        new_events.append(
            NewEvent(
                type=item['type'],
                data=_encode(item['data']),
                metadata=_encode(item.get('metadata', {})),
            )
        )

    if expected_revision is None:
        current_version = StreamState.ANY
    else:
        current_version = (
            StreamState.NO_STREAM if expected_revision == -1 else expected_revision
        )

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=current_version,
    )
# Read stream
def read_stream(stream_name: str, from_revision: int = 0, resolve_links: bool = False):
    """Read a stream and return its events as plain dicts.

    Args:
        stream_name: Stream to read.
        from_revision: Stream position to start reading from.
        resolve_links: Forwarded to ``client.get_stream``. Set it when reading
            a projection stream whose entries are links to events stored in
            other streams, so the linked events are returned.

    Returns:
        A list of dicts with ``type``, ``data``, ``metadata``,
        ``stream_position`` and ``commit_position`` keys.
    """
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision,
        resolve_links=resolve_links,
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in events
    ]


# Subscribe to all
async def subscribe_to_all(handler, from_position: int = 0):
    """Feed every event from ``from_position`` onwards to an async handler.

    ``client`` is the synchronous client, whose subscription is a blocking
    iterator and cannot be consumed with ``async for``. Each blocking
    ``next()`` call therefore runs in the default executor so the event loop
    stays free while waiting for new events.
    """
    import asyncio

    subscription = client.subscribe_to_all(commit_position=from_position)
    loop = asyncio.get_running_loop()
    iterator = iter(subscription)
    exhausted = object()  # Sentinel returned by next() when the iterator ends

    try:
        while True:
            event = await loop.run_in_executor(None, next, iterator, exhausted)
            if event is exhausted:
                break
            await handler({
                'type': event.type,
                'data': json.loads(event.data),
                'stream_id': event.stream_name,
                'position': event.commit_position
            })
    finally:
        # Release the server-side subscription when the handler fails or the
        # task is cancelled.
        subscription.stop()


# Category projection ($ce-Category)
def read_category(category: str):
    """Read all events for a category using system projection."""
    # $ce-<category> streams contain links, not the events themselves; without
    # resolving them the payload is a link reference that json.loads rejects.
    return read_stream(f"$ce-{category}", resolve_links=True)
Template 4: DynamoDB Event Store
import boto3 from boto3.dynamodb.conditions import Key from datetime import datetime import json import uuid
class DynamoEventStore:
    """Event store on a single DynamoDB table.

    Each event is one item keyed by ``PK = STREAM#<stream_id>`` and
    ``SK = VERSION#<zero-padded version>``.
    """

    def __init__(self, table_name: str):
        """Bind the store to the DynamoDB table called ``table_name``."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Append events with conditional write for concurrency.

        Args:
            stream_id: Stream to append to.
            events: Dicts with ``'type'`` and ``'data'`` keys.
            expected_version: Version the caller believes the stream is at
                (0 for a stream with no events). ``None`` appends after the
                latest stored version.

        Returns:
            The ``events`` list that was passed in.

        Raises:
            botocore.exceptions.ClientError: With error code
                ``ConditionalCheckFailedException`` when an item for one of
                the target versions already exists (another writer won).
        """
        if expected_version is None:
            expected_version = self._current_version(stream_id)

        # Batch writes cannot carry a condition, so each event gets its own
        # conditional put. The writes are not atomic as a group: a failure
        # leaves the earlier events of this call stored.
        for offset, event in enumerate(events, start=1):
            version = expected_version + offset
            # Read the clock once so GSI1SK and created_at always agree.
            timestamp = datetime.utcnow().isoformat()
            item = {
                'PK': f"STREAM#{stream_id}",
                'SK': f"VERSION#{version:020d}",
                'GSI1PK': 'EVENTS',
                'GSI1SK': timestamp,
                'event_id': str(uuid.uuid4()),
                'stream_id': stream_id,
                'event_type': event['type'],
                'event_data': json.dumps(event['data']),
                'version': version,
                'created_at': timestamp
            }
            self.table.put_item(
                Item=item,
                # Fails if an item with this PK/SK already exists.
                ConditionExpression='attribute_not_exists(PK)'
            )
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream.

        Returns every event with version >= ``from_version`` as a dict with
        ``event_type``, ``data`` and ``version`` keys, following pagination
        until the query is exhausted.
        """
        query_kwargs = {
            'KeyConditionExpression': (
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').gte(f"VERSION#{from_version:020d}")
            )
        }
        items = []
        while True:
            response = self.table.query(**query_kwargs)
            items.extend(response['Items'])
            # A query returns one page at a time; keep going until no
            # continuation key is returned.
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': item['version']
            }
            for item in items
        ]

    def _current_version(self, stream_id: str) -> int:
        """Return the highest stored version of a stream, or 0 if it is empty."""
        response = self.table.query(
            KeyConditionExpression=(
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').begins_with('VERSION#')
            ),
            ScanIndexForward=False,  # Highest sort key first
            Limit=1
        )
        items = response['Items']
        return int(items[0]['version']) if items else 0
# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
  - PK (Partition Key): String
  - SK (Sort Key): String
  - GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs
"""
Best Practices
Do's
- Use stream IDs that include aggregate type - e.g. Order-{uuid}
- Include correlation/causation IDs - For tracing
- Version events from day one - Plan for schema evolution
- Implement idempotency - Use event IDs for deduplication
- Index appropriately - For your query patterns
Don'ts
- Don't update or delete events - They're immutable facts
- Don't store large payloads - Keep events small
- Don't skip optimistic concurrency - Prevents data corruption
- Don't ignore backpressure - Handle slow consumers
Resources
- EventStoreDB
- Marten Events
- Event Sourcing Pattern