Outbox Pattern ()
Ensure atomic state changes and event publishing by writing both to a database transaction, then publishing asynchronously.
Overview
-
Ensuring database writes and event publishing are atomic
-
Building reliable event-driven microservices
-
Implementing exactly-once message delivery semantics
-
Avoiding dual-write problems (DB + message broker)
-
Decoupling domain logic from message infrastructure
-
High-throughput systems needing CDC-based publishing
Quick Reference
Outbox Table Schema
CREATE TABLE outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), aggregate_type VARCHAR(100) NOT NULL, aggregate_id UUID NOT NULL, event_type VARCHAR(100) NOT NULL, payload JSONB NOT NULL, idempotency_key VARCHAR(255) UNIQUE, -- For consumer deduplication created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), published_at TIMESTAMPTZ, retry_count INT DEFAULT 0, last_error TEXT );
-- Index for polling unpublished messages CREATE INDEX idx_outbox_unpublished ON outbox(created_at) WHERE published_at IS NULL;
-- Index for aggregate ordering CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_id, created_at);
-- Index for idempotency key lookups CREATE INDEX idx_outbox_idempotency ON outbox(idempotency_key) WHERE idempotency_key IS NOT NULL;
SQLAlchemy Model
from sqlalchemy.dialects.postgresql import UUID, JSONB import hashlib
class OutboxMessage(Base): tablename = "outbox"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
aggregate_type = Column(String(100), nullable=False)
aggregate_id = Column(UUID(as_uuid=True), nullable=False, index=True)
event_type = Column(String(100), nullable=False)
payload = Column(JSONB, nullable=False)
idempotency_key = Column(String(255), unique=True, nullable=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
published_at = Column(DateTime, nullable=True)
retry_count = Column(Integer, default=0)
last_error = Column(Text, nullable=True)
@staticmethod
def generate_idempotency_key(aggregate_id: str, event_type: str, payload: dict) -> str:
"""Generate deterministic idempotency key for deduplication."""
content = f"{aggregate_id}:{event_type}:{json.dumps(payload, sort_keys=True)}"
return hashlib.sha256(content.encode()).hexdigest()[:32]
Write to Outbox in Transaction
from sqlalchemy.ext.asyncio import AsyncSession
class OrderService: def init(self, db: AsyncSession): self.db = db
async def create_order(self, order_data: OrderCreate) -> Order:
"""Create order AND outbox message in single transaction."""
# Create the order
order = Order(**order_data.model_dump())
self.db.add(order)
# Create outbox message in SAME transaction
outbox_msg = OutboxMessage(
aggregate_type="Order",
aggregate_id=order.id,
event_type="OrderCreated",
payload={
"order_id": str(order.id),
"customer_id": str(order.customer_id),
"total": order.total,
},
idempotency_key=OutboxMessage.generate_idempotency_key(
str(order.id), "OrderCreated", {"total": order.total}
),
)
self.db.add(outbox_msg)
await self.db.flush() # Both written atomically
return order
Polling Publisher
class OutboxPublisher: """Polls outbox and publishes to message broker."""
def __init__(self, session_factory, producer):
self.session_factory = session_factory
self.producer = producer
async def publish_pending(self, batch_size: int = 100) -> int:
async with self.session_factory() as session:
stmt = (
select(OutboxMessage)
.where(OutboxMessage.published_at.is_(None))
.order_by(OutboxMessage.created_at)
.limit(batch_size)
.with_for_update(skip_locked=True) # Prevent duplicate processing
)
result = await session.execute(stmt)
messages = result.scalars().all()
published = 0
for msg in messages:
try:
await self.producer.publish(
topic=f"{msg.aggregate_type.lower()}-events",
key=str(msg.aggregate_id),
value={
"type": msg.event_type,
"idempotency_key": msg.idempotency_key,
**msg.payload
},
)
msg.published_at = datetime.now(timezone.utc)
published += 1
except Exception as e:
msg.retry_count += 1
msg.last_error = str(e)
await session.commit()
return published
CDC with Debezium (High-Throughput)
docker-compose.yml - Debezium connector
version: '3.8' services: debezium: image: debezium/connect:2.5 environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: outbox-connector CONFIG_STORAGE_TOPIC: connect-configs OFFSET_STORAGE_TOPIC: connect-offsets
Register outbox connector
POST http://debezium:8083/connectors
{ "name": "outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "debezium", "database.password": "secret", "database.dbname": "app", "table.include.list": "public.outbox", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.id": "id", "transforms.outbox.table.field.event.key": "aggregate_id", "transforms.outbox.table.field.event.payload": "payload", "transforms.outbox.route.topic.replacement": "${routedByValue}-events" } }
Idempotent Consumer
class IdempotentConsumer: """Consumer with deduplication using idempotency keys."""
def __init__(self, db: AsyncSession, redis: Redis):
self.db = db
self.redis = redis
async def process(self, event: dict, handler) -> bool:
"""Process event idempotently - returns False if duplicate."""
idempotency_key = event.get("idempotency_key")
if not idempotency_key:
# No key = always process (but risky)
await handler(event)
return True
# Check Redis cache first (fast path)
if await self.redis.exists(f"processed:{idempotency_key}"):
return False # Already processed
# Check database (slow path, but durable)
exists = await self.db.execute(
select(ProcessedEvent)
.where(ProcessedEvent.idempotency_key == idempotency_key)
)
if exists.scalar_one_or_none():
# Cache for future fast lookups
await self.redis.setex(f"processed:{idempotency_key}", 86400, "1")
return False
# Process and record
async with self.db.begin():
await handler(event)
self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
await self.db.flush()
# Cache the processed key
await self.redis.setex(f"processed:{idempotency_key}", 86400, "1")
return True
class ProcessedEvent(Base): """Track processed events for idempotency.""" tablename = "processed_events"
idempotency_key = Column(String(255), primary_key=True)
processed_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
Dapr Outbox Integration
Using Dapr's built-in outbox support
from dapr.clients import DaprClient
async def create_order_with_dapr(order_data: dict): """Dapr handles outbox automatically.""" async with DaprClient() as client: # Single call - Dapr ensures atomicity await client.save_state( store_name="statestore", key=f"order-{order_data['id']}", value=order_data, state_metadata={ "outbox.publish": "true", "outbox.topic": "orders", } )
Key Decisions
Decision Option A Option B Recommendation
Delivery Polling CDC (Debezium) Polling for simplicity, CDC for > 10K msg/s
Batch size Small (10-50) Large (100-500) Start with 100, tune based on latency
Retry Fixed delay Exponential backoff Exponential with max 5 retries
Cleanup Delete published Archive to cold storage Archive for audit, delete after 30 days
Ordering Per-aggregate Global Per-aggregate via partition key
Idempotency Consumer-side Built-in key Always include idempotency key
Tool Custom Dapr Dapr if K8s, custom otherwise
Outbox vs CDC Trade-offs
┌────────────────────────────────────────────────────────────────────────┐ │ POLLING vs CDC COMPARISON │ ├────────────────────────────────────────────────────────────────────────┤ │ │ │ POLLING (OutboxPublisher) CDC (Debezium) │ │ ────────────────────────── ──────────────── │ │ ✓ Simple to implement ✓ Higher throughput (100K+ msg/s) │ │ ✓ No extra infrastructure ✓ Lower latency (sub-second) │ │ ✓ Easy to debug ✓ No polling overhead │ │ ✗ Polling overhead ✗ Complex infrastructure │ │ ✗ Higher latency (1-5s) ✗ Harder to debug │ │ ✗ Limited throughput (~10K/s) ✗ Requires Kafka Connect │ │ │ │ USE WHEN: USE WHEN: │ │ - Starting out - High throughput required │ │ - Simple architecture - Sub-second latency needed │ │ - < 10K events/second - Already using Kafka │ │ │ └────────────────────────────────────────────────────────────────────────┘
Anti-Patterns (FORBIDDEN)
NEVER publish before commit - dual-write problem
await producer.publish(event) # May succeed but commit may fail! await session.commit()
NEVER delete without publishing - events lost
await session.execute(delete(OutboxMessage).where(...))
NEVER process without locking - causes duplicates
messages = await session.execute(select(OutboxMessage)) # No lock!
NEVER store large payloads - use URL references instead
OutboxMessage(payload={"file": large_binary_data})
NEVER ignore ordering - use aggregate_id as partition key
await producer.publish(event_a) # May arrive out of order!
NEVER skip idempotency keys
OutboxMessage(payload=event_data) # No idempotency_key = duplicate risk
NEVER process without idempotency check on consumer
async def handle(event): await process(event) # Duplicate processing on retry!
Related Skills
-
message-queues
-
Kafka, RabbitMQ, Redis Streams integration
-
event-sourcing
-
Full event-sourced architecture patterns
-
database-schema-designer
-
Schema design and migrations
-
sqlalchemy-2-async
-
Async database session patterns
Capability Details
outbox-schema
Keywords: outbox table, transactional outbox, event table, schema, idempotency Solves:
-
How do I design an outbox table?
-
What indexes are needed for outbox?
-
Outbox table best practices
-
Idempotency key generation
polling-publisher
Keywords: polling, publish outbox, background worker, relay Solves:
-
How do I publish from the outbox?
-
Batch publishing patterns
-
Handling publish failures
-
FOR UPDATE SKIP LOCKED pattern
cdc-debezium
Keywords: cdc, debezium, change data capture, kafka connect Solves:
-
When to use CDC vs polling?
-
Debezium connector configuration
-
High-throughput event publishing
-
Outbox event routing
idempotent-consumer
Keywords: idempotent, deduplication, exactly-once, processed events Solves:
-
How to handle duplicate messages?
-
Idempotency key patterns
-
Consumer deduplication strategies
-
Redis + DB deduplication
atomic-operations
Keywords: atomic, transaction, dual-write, consistency Solves:
-
How to avoid dual-write problems?
-
Ensuring atomicity between DB and events
-
Exactly-once delivery patterns