Saga Patterns for Distributed Transactions
Maintain consistency across microservices without distributed locks.
Overview
- Multi-service business transactions (order -> payment -> inventory -> shipping)
- Operations that must eventually succeed or roll back completely
- Long-running business processes (minutes to days)
- Microservices avoiding 2PC (two-phase commit)
When NOT to Use
- Single database operations (use transactions)
- Real-time consistency requirements (use synchronous calls)
- When eventual consistency is unacceptable
Orchestration vs Choreography
| Aspect | Orchestration | Choreography |
|--------|---------------|--------------|
| Control | Central orchestrator | Distributed events |
| Coupling | Services depend on orchestrator | Loosely coupled |
| Visibility | Single point of observation | Requires distributed tracing |
| Best for | Complex, ordered workflows | Simple, parallel flows |
Orchestration Pattern
from enum import Enum from dataclasses import dataclass, field from typing import Callable, Any from datetime import datetime, timezone
class SagaStatus(Enum):
    """Lifecycle states used for a saga as a whole and for each of its steps."""

    # Initial state of a freshly created saga or step.
    PENDING = "pending"
    # The orchestrator has started executing the saga's steps.
    RUNNING = "running"
    # The step's action (or every step of the saga) finished successfully.
    COMPLETED = "completed"
    # A step failed and earlier steps are being undone.
    COMPENSATING = "compensating"
    # The step's compensation ran (or the saga finished rolling back).
    COMPENSATED = "compensated"
    # The step's action raised an exception.
    FAILED = "failed"
@dataclass
class SagaStep:
    """One unit of work in a saga, paired with the action that undoes it.

    Attributes:
        name: Label identifying the step.
        action: Async callable awaited with the saga's data dict; the value
            it returns is stored in ``result``.
        compensation: Async callable awaited with the saga's data dict to
            undo ``action`` once a later step has failed.
        status: Current lifecycle state of this step.
        result: Value returned by ``action``; ``None`` until it has run.
        error: Failure message recorded for this step; ``None`` while the
            step has not failed.
    """

    name: str
    action: Callable
    compensation: Callable
    status: SagaStatus = SagaStatus.PENDING
    result: Any = None
    # The orchestrator assigns ``step.error`` when a compensation raises.
    # Declaring it makes the message a real field (visible in repr/asdict
    # and to whatever persists the saga) instead of an ad-hoc attribute.
    # The annotation is quoted so it is not evaluated on Python 3.9.
    error: "str | None" = None
@dataclass
class SagaContext:
    """Mutable state of a single saga run.

    Attributes:
        saga_id: Identifier of this saga instance.
        data: Shared dict handed to every step action and compensation;
            the orchestrator merges each step's result into it.
        steps: The steps to run, in execution order.
        status: Lifecycle state of the saga as a whole.
        current_step: Index into ``steps`` of the step most recently started.
    """

    saga_id: str
    # default_factory gives every saga its own dict/list instead of a
    # mutable default shared between instances.
    data: dict = field(default_factory=dict)
    steps: list[SagaStep] = field(default_factory=list)
    status: SagaStatus = SagaStatus.PENDING
    current_step: int = 0
class SagaOrchestrator:
    """Runs a saga's steps in order and rolls back completed steps on failure.

    Saga state is written through ``saga_repository`` after every step so
    that a recovery job can pick up where a crashed run stopped.
    ``event_publisher`` is stored for callers/subclasses; the methods below
    do not publish anything themselves.
    """

    def __init__(self, saga_repository, event_publisher):
        self.repo = saga_repository
        self.publisher = event_publisher

    async def execute(self, saga: SagaContext) -> SagaContext:
        """Run every step of ``saga`` from the first one.

        Returns:
            The same ``saga`` object. Its status is COMPLETED when every step
            succeeded; otherwise it carries the outcome of the compensation
            pass (COMPENSATED or FAILED).
        """
        return await self.resume(saga, from_step=0)

    async def resume(self, saga: SagaContext, from_step: "int | None" = None) -> SagaContext:
        """Run ``saga`` starting at ``from_step``.

        Args:
            saga: The saga to continue.
            from_step: Index of the first step to run. When omitted, the
                first step whose status is not COMPLETED is used, so a step
                that was interrupted mid-flight is executed again; step
                actions therefore need to be idempotent.

        Returns:
            The same ``saga`` object, with the same outcomes as ``execute``.
        """
        if from_step is None:
            from_step = next(
                (
                    index
                    for index, step in enumerate(saga.steps)
                    if step.status != SagaStatus.COMPLETED
                ),
                len(saga.steps),
            )

        saga.status = SagaStatus.RUNNING
        await self.repo.save(saga)

        for i in range(from_step, len(saga.steps)):
            step = saga.steps[i]
            saga.current_step = i
            try:
                step.result = await step.action(saga.data)
                saga.data.update(step.result or {})
                step.status = SagaStatus.COMPLETED
            except Exception as exc:
                step.status = SagaStatus.FAILED
                # Keep the cause; it would otherwise be lost once the
                # exception is swallowed here.
                step.error = f"Action failed: {exc}"
                await self._compensate(saga, i)
                return saga
            # Persist progress after every step so a restart does not lose
            # the ids collected in saga.data or repeat completed work.
            await self.repo.save(saga)

        saga.status = SagaStatus.COMPLETED
        await self.repo.save(saga)
        return saga

    async def _compensate(self, saga: SagaContext, failed_step: int):
        """Undo, in reverse order, the COMPLETED steps before ``failed_step``.

        A compensation that raises is recorded on its step and the remaining
        steps are still compensated. The saga ends as COMPENSATED only when
        every attempted compensation succeeded; otherwise it ends as FAILED
        so the incomplete rollback is not reported as a clean one.
        """
        saga.status = SagaStatus.COMPENSATING
        rolled_back = True
        for i in range(failed_step - 1, -1, -1):
            step = saga.steps[i]
            if step.status != SagaStatus.COMPLETED:
                continue
            try:
                await step.compensation(saga.data)
            except Exception as exc:
                step.error = f"Compensation failed: {exc}"
                rolled_back = False
            else:
                step.status = SagaStatus.COMPENSATED
        saga.status = SagaStatus.COMPENSATED if rolled_back else SagaStatus.FAILED
        await self.repo.save(saga)
Order Saga Example
class OrderSaga:
    """Defines the order saga: reserve inventory, charge payment, ship.

    Each step's action returns a dict holding the id it created; the
    matching compensation reads that id back from the saga's data dict.
    """

    def __init__(self, payment_service, inventory_service, shipping_service):
        self.payment = payment_service
        self.inventory = inventory_service
        self.shipping = shipping_service

    def create_saga(self, order: Order) -> SagaContext:
        """Build the saga context for ``order``.

        The saga id is ``order-<order.id>`` and the data dict starts with the
        order under the ``"order"`` key.
        """
        # NOTE(review): order.dict() presumably comes from a pydantic-style
        # model -- confirm against the Order definition.
        return SagaContext(
            saga_id=f"order-{order.id}",
            data={"order": order.dict()},
            steps=[
                SagaStep("reserve_inventory", self._reserve_inventory, self._release_inventory),
                SagaStep("process_payment", self._process_payment, self._refund_payment),
                SagaStep("create_shipment", self._create_shipment, self._cancel_shipment),
            ],
        )

    async def _reserve_inventory(self, data: dict) -> dict:
        """Reserve the order's items and return the reservation id."""
        reservation = await self.inventory.reserve(items=data["order"]["items"])
        return {"reservation_id": reservation.id}

    async def _release_inventory(self, data: dict):
        """Release the reservation, if one was recorded."""
        # Same guard as _cancel_shipment: without an id there is nothing to
        # undo, and a compensation should not fail with a KeyError.
        if "reservation_id" in data:
            await self.inventory.release(data["reservation_id"])

    async def _process_payment(self, data: dict) -> dict:
        """Charge the order total and return the payment id."""
        payment = await self.payment.charge(amount=data["order"]["total"])
        return {"payment_id": payment.id}

    async def _refund_payment(self, data: dict):
        """Refund the payment, if one was recorded."""
        if "payment_id" in data:
            await self.payment.refund(data["payment_id"])

    async def _create_shipment(self, data: dict) -> dict:
        """Create the shipment for the order and return the shipment id."""
        shipment = await self.shipping.create(order_id=data["order"]["id"])
        return {"shipment_id": shipment.id}

    async def _cancel_shipment(self, data: dict):
        """Cancel the shipment, if one was recorded."""
        if "shipment_id" in data:
            await self.shipping.cancel(data["shipment_id"])
Choreography Pattern
class OrderChoreography:
    """Event-driven handlers that move an order saga forward.

    There is no central coordinator: each handler reacts to one event and
    either publishes the next request on the event bus or updates the order.
    """

    def __init__(self, event_bus, order_repo):
        self.bus = event_bus
        self.repo = order_repo

    async def handle_order_created(self, event):
        """Ask the inventory service to reserve the new order's items."""
        request = {
            "saga_id": event.saga_id,
            "items": event.payload["order"]["items"],
        }
        await self.bus.publish("inventory.reserve.requested", request)

    async def handle_inventory_reserved(self, event):
        """Ask the payment service to charge the amount carried by the event."""
        request = {
            "saga_id": event.saga_id,
            "amount": event.payload["amount"],
        }
        await self.bus.publish("payment.charge.requested", request)

    async def handle_payment_failed(self, event):
        """Compensate a failed payment by requesting release of the reservation."""
        request = {
            "saga_id": event.saga_id,
            "reservation_id": event.payload["reservation_id"],
        }
        await self.bus.publish("inventory.release.requested", request)

    async def handle_shipment_created(self, event):
        """Mark the order named in the event as shipped and persist it."""
        order_id = event.payload["order_id"]
        order = await self.repo.get(order_id)
        order.status = "shipped"
        await self.repo.save(order)
Timeout and Recovery
from datetime import timedelta import asyncio
class SagaRecovery:
    """Recovers sagas that stalled and retries steps that failed."""

    def __init__(self, saga_repo, orchestrator):
        self.repo = saga_repo
        self.orchestrator = orchestrator

    async def recover_stuck_sagas(self, timeout: timedelta = timedelta(hours=1)):
        """Resume every RUNNING saga older than ``timeout``.

        The cutoff is computed in UTC. A saga whose resume raises is rolled
        back from its current step instead.
        """
        cutoff = datetime.now(timezone.utc) - timeout
        stuck_sagas = await self.repo.find_by_status_and_age(SagaStatus.RUNNING, cutoff)
        for saga in stuck_sagas:
            try:
                await self.orchestrator.resume(saga)
            except Exception:
                await self.orchestrator._compensate(saga, saga.current_step)

    async def retry_failed_step(self, saga_id: str, max_retries: int = 3):
        """Retry the saga's current step, then continue or roll back.

        The step's action is attempted up to ``max_retries`` times with
        exponential backoff (1s, 2s, 4s, ...) between attempts. On success
        the result is merged into the saga data and the saga is resumed from
        the following step. When every attempt fails, the steps before the
        current one are compensated.

        Args:
            saga_id: Identifier used to load the saga from the repository.
            max_retries: Maximum number of attempts; with 0 the saga is
                compensated straight away.
        """
        saga = await self.repo.get(saga_id)
        failed_step = saga.steps[saga.current_step]

        for attempt in range(max_retries):
            # Only the action is guarded: a failure while resuming the later
            # steps must not re-run an action that has already succeeded.
            try:
                result = await failed_step.action(saga.data)
            except Exception:
                # No point in waiting after the last attempt.
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                continue

            failed_step.result = result
            # Later steps read the ids produced by this one from saga.data.
            saga.data.update(result or {})
            failed_step.status = SagaStatus.COMPLETED
            await self.orchestrator.resume(saga, from_step=saga.current_step + 1)
            return

        await self.orchestrator._compensate(saga, saga.current_step)
Idempotency
class IdempotentSagaStep:
    """Runs a step action at most once per saga by caching its result.

    Results are stored in ``idempotency_store`` under the key
    ``"<saga_id>:<step_name>"``.
    """

    def __init__(self, step_name: str, idempotency_store):
        self.step_name = step_name
        self.store = idempotency_store

    async def execute(self, saga_id: str, action: Callable, *args, **kwargs):
        """Return the stored result for this saga/step, or run ``action``.

        Args:
            saga_id: Identifier of the saga the step belongs to.
            action: Async callable to run when no result is stored yet.
            *args: Positional arguments forwarded to ``action``.
            **kwargs: Keyword arguments forwarded to ``action``.

        Returns:
            The result of ``action``, either freshly computed or read back
            from the store.
        """
        idempotency_key = f"{saga_id}:{self.step_name}"

        existing = await self.store.get(idempotency_key)
        # Truthiness (not ``is not None``) so a store that answers a missing
        # key with an empty value is treated as a miss as well.
        if existing:
            return existing["result"]

        # NOTE(review): the lookup and the write below are separate calls, so
        # two concurrent executions of the same step could both run the
        # action -- confirm whether the store or the caller serialises them.
        result = await action(*args, **kwargs)
        # The result is wrapped in a dict so that a falsy result (None, 0,
        # {}) still produces a truthy record on the next lookup.
        await self.store.set(idempotency_key, {"result": result}, ttl=timedelta(days=7))
        return result
Key Decisions
| Decision | Recommendation |
|----------|----------------|
| Pattern choice | Orchestration for complex flows, Choreography for simple |
| State storage | Persistent store (PostgreSQL) for saga state |
| Idempotency | Required for all saga steps |
| Timeouts | Per-step timeouts with recovery |
| Compensation | Always implement, test thoroughly |
| Observability | Trace saga ID across all services |
Anti-Patterns (FORBIDDEN)
NEVER skip compensation logic
# Anti-pattern (kept wrong on purpose): the step charges the customer but no
# refund is paired with it, so a later failure cannot undo the charge.
async def _process_payment(self, data: dict):
    return await self.payment.charge(data)  # Missing compensation!
NEVER rely on synchronous calls across services
# Anti-pattern (kept wrong on purpose): two services are called back to back
# inside one function with no saga step or compensation between them.
async def _reserve_and_pay(self, data: dict):
    await self.inventory.reserve(data)
    await self.payment.charge(data)  # If the charge fails, the reservation is never released!
NEVER ignore idempotency
# Anti-pattern (kept wrong on purpose): the insert is not guarded by an
# idempotency key, so running the step again inserts the order again.
async def _create_order(self, data: dict):
    return await self.db.insert(Order(**data))  # Duplicate on retry!
NEVER use in-memory saga state
# Anti-pattern (kept wrong on purpose): saga state held in a process-local
# dict instead of a persistent store.
sagas = {}  # Lost on restart!
ALWAYS persist saga state and test compensation paths
Related Skills
- temporal-io - Durable workflow execution
- event-sourcing - Event-driven state management
- idempotency-patterns - Idempotent operations