LangGraph Architecture Decisions
When to Use LangGraph
Use LangGraph When You Need:
- Stateful conversations: multi-turn interactions with memory
- Human-in-the-loop: approval gates, corrections, interventions
- Complex control flow: loops, branches, conditional routing
- Multi-agent coordination: multiple LLMs working together
- Persistence: resume from checkpoints, time-travel debugging
- Streaming: real-time token streaming, progress updates
- Reliability: retries, error recovery, durability guarantees
Consider Alternatives When:
| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
State Schema Decisions
TypedDict vs Pydantic
| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
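The validation trade-off can be illustrated without any third-party dependency. The sketch below uses a standard-library dataclass with a hand-written check as a stand-in for a Pydantic model. That substitution is an assumption made to keep the example dependency-free; Pydantic itself does much more, including type coercion. PlainState and CheckedState are hypothetical names.

```python
from dataclasses import dataclass
from typing import TypedDict


class PlainState(TypedDict):
    """TypedDict: a plain dict at runtime, so type hints are not enforced."""
    step: int


@dataclass
class CheckedState:
    """Stand-in for a validating model: rejects bad values on construction."""
    step: int

    def __post_init__(self) -> None:
        if not isinstance(self.step, int):
            raise TypeError(f"step must be int, got {type(self.step).__name__}")


# A TypedDict accepts the wrong type silently; only a static checker complains.
plain: PlainState = {"step": "three"}  # type: ignore[typeddict-item]

# The validating class fails fast at construction time.
try:
    CheckedState(step="three")  # type: ignore[arg-type]
    validation_error = None
except TypeError as exc:
    validation_error = str(exc)
```

The TypedDict version costs nothing at runtime, which is why it suits most graphs; the validating version pays a check on every construction in exchange for catching bad updates early.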
Reducer Selection
| Use Case | Reducer | Example |
|---|---|---|
| Chat messages | add_messages | Handles IDs, RemoveMessage |
| Simple append | operator.add | Annotated[list, operator.add] |
| Keep latest | None (LastValue) | field: str |
| Custom merge | Lambda | Annotated[list, lambda a, b: ...] |
| Overwrite list | Overwrite | Bypass reducer |
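To show what a reducer does when a node returns an update, here is a dependency-free sketch. It is not LangGraph's implementation: apply_update is a hypothetical helper that reads the reducer from the Annotated metadata and falls back to keep-latest when none is declared.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class State(TypedDict):
    log: Annotated[list, operator.add]  # simple append
    tags: Annotated[list, lambda a, b: sorted(set(a) | set(b))]  # custom merge
    status: str  # no reducer: the latest value wins


def apply_update(schema: type, state: dict, update: dict) -> dict:
    """Merge a node's partial update into state, one key at a time."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] exposes its extra arguments as __metadata__.
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = next((m for m in metadata if callable(m)), None)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"log": ["start"], "tags": ["a"], "status": "running"}
state = apply_update(
    State, state, {"log": ["step 1"], "tags": ["b", "a"], "status": "done"}
)
```

After the update, log has grown by one entry, tags holds the de-duplicated union, and status has simply been replaced.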
State Size Considerations
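    from typing import Annotated
    from typing_extensions import TypedDict

    from langgraph.graph.message import add_messages
    from langgraph.store.base import BaseStore

    # SMALL STATE (< 1 MB): put it in state
    class State(TypedDict):
        messages: Annotated[list, add_messages]
        context: str

    # LARGE DATA: use a Store and keep only a reference in state
    class State(TypedDict):
        messages: Annotated[list, add_messages]
        document_ref: str  # Reference to store

    def node(state, *, store: BaseStore):
        # namespace is a tuple of strings that you choose for your data
        doc = store.get(namespace, state["document_ref"])
        # Process without bloating checkpoints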
Graph Structure Decisions
Single Graph vs Subgraphs
Single Graph when:
- All nodes share the same state schema
- Simple linear or branching flow
- Fewer than 10 nodes
Subgraphs when:
- Different state schemas needed
- Reusable components across graphs
- Team separation of concerns
- Complex hierarchical workflows
Conditional Edges vs Command
| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
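    from typing import Literal
    from langgraph.types import Command

    # Conditional edge: use when routing is the focus
    def router(state) -> Literal["a", "b"]:
        return "a" if condition else "b"

    builder.add_conditional_edges("node", router)

    # Command: use when combining routing with a state update.
    # The Literal annotation tells LangGraph which nodes this node can reach.
    def node(state) -> Command[Literal["next"]]:
        return Command(goto="next", update={"step": state["step"] + 1})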
Static vs Dynamic Routing
Static Edges (add_edge):
- Fixed flow known at build time
- Clearer graph visualization
- Easier to reason about
Dynamic Routing (add_conditional_edges, Command, Send):
- Runtime decisions based on state
- Agent-driven navigation
- Fan-out patterns
Persistence Strategy
Checkpointer Selection
| Checkpointer | Use Case | Characteristics |
|---|---|---|
| InMemorySaver | Testing only | Lost on restart |
| SqliteSaver | Development | Single file, local |
| PostgresSaver | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
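To make the difference between "lost on restart" and "single file, local" concrete, here is a toy checkpoint store built on the standard-library sqlite3 module. It is not the SqliteSaver API and does not implement BaseCheckpointSaver; TinyCheckpointStore, its methods, and the table layout are hypothetical and only illustrate saving and reloading state per thread ID.

```python
import json
import sqlite3
from typing import Optional


class TinyCheckpointStore:
    """Toy checkpoint store: one JSON snapshot per (thread_id, step)."""

    def __init__(self, path: str = ":memory:") -> None:
        # ":memory:" is gone when the process exits (like an in-memory saver);
        # a file path survives restarts.
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "thread_id TEXT, step INTEGER, state TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id: str, step: int, state: dict) -> None:
        """Save (or overwrite) the snapshot for one step of one thread."""
        self.conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )
        self.conn.commit()

    def latest(self, thread_id: str) -> Optional[dict]:
        """Return the most recent snapshot for a thread, or None."""
        row = self.conn.execute(
            "SELECT state FROM checkpoints WHERE thread_id = ? "
            "ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        return json.loads(row[0]) if row else None


store = TinyCheckpointStore()
store.put("thread-1", 0, {"step": 0})
store.put("thread-1", 1, {"step": 1})
resumed = store.latest("thread-1")
```

A single local file works for one developer machine, but concurrent workers need a shared database, which is the reason the table points production deployments at PostgresSaver.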
Checkpointing Scope
    # Full persistence (default)
    graph = builder.compile(checkpointer=checkpointer)

    # Subgraph options (pass exactly one value for checkpointer)
    subgraph = sub_builder.compile(checkpointer=None)   # Inherit from parent
    subgraph = sub_builder.compile(checkpointer=True)   # Independent checkpointing
    subgraph = sub_builder.compile(checkpointer=False)  # No checkpointing (runs atomically)
When to Disable Checkpointing
- Short-lived subgraphs that should be atomic
- Subgraphs with incompatible state schemas
- Performance-critical paths that do not need to resume
Multi-Agent Architecture
Supervisor Pattern
Best for:
- Clear hierarchy
- Centralized decision making
- Different agent specializations
             ┌─────────────┐
             │ Supervisor  │
             └──────┬──────┘
       ┌────────┬───┴────┬────────┐
       ▼        ▼        ▼        ▼
    ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
    │Agent1│ │Agent2│ │Agent3│ │Agent4│
    └──────┘ └──────┘ └──────┘ └──────┘
Peer-to-Peer Pattern
Best for:
- Collaborative agents
- No clear hierarchy
- Flexible communication
    ┌──────┐     ┌──────┐
    │Agent1│◄───►│Agent2│
    └──┬───┘     └───┬──┘
       │             │
       ▼             ▼
    ┌──────┐     ┌──────┐
    │Agent3│◄───►│Agent4│
    └──────┘     └──────┘
Handoff Pattern
Best for:
- Sequential specialization
- Clear stage transitions
- Different capabilities per stage
    ┌────────┐    ┌────────┐    ┌────────┐
    │Research│───►│Planning│───►│Execute │
    └────────┘    └────────┘    └────────┘
Streaming Strategy
Stream Mode Selection
| Mode | Use Case | Data |
|---|---|---|
| updates | UI updates | Node outputs only |
| values | State inspection | Full state each step |
| messages | Chat UX | LLM tokens |
| custom | Progress/logs | Your data via StreamWriter |
| debug | Debugging | Tasks + checkpoints |
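The difference between the updates and values modes is easiest to see side by side. The toy runner below is a sketch only: run is a hypothetical function that executes nodes in order with a keep-latest merge, and it does not reproduce every detail of LangGraph's stream output.

```python
from typing import Callable, Iterator

Node = Callable[[dict], dict]


def run(nodes: dict[str, Node], state: dict, stream_mode: str) -> Iterator[dict]:
    """Run nodes in order, yielding one chunk per step in the chosen mode."""
    state = dict(state)
    for name, node in nodes.items():
        update = node(state)
        state.update(update)  # keep-latest merge; no reducers in this toy
        if stream_mode == "updates":
            yield {name: update}  # only what this node returned
        elif stream_mode == "values":
            yield dict(state)  # full state snapshot after the step
        else:
            raise ValueError(f"unsupported mode: {stream_mode}")


nodes = {
    "plan": lambda s: {"plan": "search"},
    "act": lambda s: {"result": s["plan"] + " done"},
}
updates = list(run(nodes, {"query": "hi"}, "updates"))
values = list(run(nodes, {"query": "hi"}, "values"))
```

Here updates carries two small per-node dictionaries, while values repeats the whole state at every step. That is why updates tends to suit UI refreshes and values suits state inspection.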
Subgraph Streaming
    # Stream from subgraphs
    async for chunk in graph.astream(
        input,
        stream_mode="updates",
        subgraphs=True,  # Include subgraph events
    ):
        namespace, data = chunk  # namespace indicates depth
Human-in-the-Loop Design
Interrupt Placement
| Strategy | Use Case |
|---|---|
| interrupt_before | Approval before action |
| interrupt_after | Review after completion |
| interrupt() in node | Dynamic, contextual pauses |
Resume Patterns
    # Simple resume (same thread)
    graph.invoke(None, config)

    # Resume with value
    graph.invoke(Command(resume="approved"), config)

    # Resume specific interrupt
    graph.invoke(Command(resume={interrupt_id: value}), config)

    # Modify state and resume
    graph.update_state(config, {"field": "new_value"})
    graph.invoke(None, config)
Error Handling Strategy
Retry Configuration
    from langgraph.types import RetryPolicy

    # Per-node retry (APIError and RateLimitError come from your LLM client library)
    RetryPolicy(
        initial_interval=0.5,
        backoff_factor=2.0,
        max_interval=60.0,
        max_attempts=3,
        retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
    )

    # Multiple policies (first match wins)
    builder.add_node("node", fn, retry_policy=[
        RetryPolicy(retry_on=RateLimitError, max_attempts=5),
        RetryPolicy(retry_on=Exception, max_attempts=2),
    ])
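To see what the interval parameters imply, the following sketch computes an exponential backoff schedule and runs a retry loop with it. It reuses the parameter names from the policy above but is not LangGraph's implementation, and it leaves out jitter. backoff_intervals and call_with_retry are hypothetical helpers.

```python
import time
from typing import Callable, Tuple, Type


def backoff_intervals(initial_interval: float, backoff_factor: float,
                      max_interval: float, max_attempts: int) -> list[float]:
    """Waits between attempts: one fewer than the number of attempts."""
    return [
        min(max_interval, initial_interval * backoff_factor ** i)
        for i in range(max_attempts - 1)
    ]


def call_with_retry(fn: Callable[[], object],
                    retry_on: Tuple[Type[BaseException], ...],
                    intervals: list[float],
                    sleep: Callable[[float], None] = time.sleep) -> object:
    """Call fn, retrying on matching exceptions after each interval."""
    for wait in intervals:
        try:
            return fn()
        except retry_on:
            sleep(wait)
    return fn()  # final attempt: any exception propagates to the caller


calls = {"n": 0}


def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")
    return "ok"


waits: list[float] = []  # record the waits instead of actually sleeping
schedule = backoff_intervals(0.5, 2.0, 60.0, 3)
result = call_with_retry(flaky, (TimeoutError,), schedule, sleep=waits.append)
```

With max_attempts=3 there are only two waits, so the cap set by max_interval starts to matter only for larger attempt counts or larger backoff factors.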
Fallback Patterns
    def node_with_fallback(state):
        try:
            return primary_operation(state)
        except PrimaryError:
            return fallback_operation(state)

    # Or use conditional edges for complex fallback routing
    def route_on_error(state) -> Literal["retry", "fallback", "end"]:
        if state.get("error") and state["attempts"] < 3:
            return "retry"
        elif state.get("error"):
            return "fallback"
        return "end"  # Map "end" to END in add_conditional_edges
Scaling Considerations
Horizontal Scaling
- Use PostgresSaver for shared state
- Consider LangGraph Platform for managed infrastructure
- Use stores for large data outside checkpoints
Performance Optimization
- Minimize state size: use references for large data
- Parallel nodes: fan out when possible
- Cache expensive operations: use CachePolicy
- Async everywhere: use ainvoke, astream
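The benefit of fanning out independent work can be sketched with plain asyncio. This illustrates the concurrency idea only, not LangGraph's scheduler; fetch and fan_out are hypothetical functions, and the sleep stands in for a network call.

```python
import asyncio


async def fetch(source: str, delay: float) -> str:
    """Hypothetical I/O-bound node body; sleep stands in for a network call."""
    await asyncio.sleep(delay)
    return f"{source}: done"


async def fan_out() -> list[str]:
    # Independent branches run concurrently, so the total time is roughly
    # that of the slowest branch rather than the sum of all branches.
    return await asyncio.gather(
        fetch("search", 0.05),
        fetch("database", 0.05),
        fetch("cache", 0.05),
    )


results = asyncio.run(fan_out())
```

Results come back in the order the branches were listed, regardless of which one finishes first.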
Resource Limits
    from typing_extensions import TypedDict
    from langgraph.managed import RemainingSteps

    # Set recursion limit
    config = {"recursion_limit": 50}
    graph.invoke(input, config)

    # Track remaining steps in state
    class State(TypedDict):
        remaining_steps: RemainingSteps

    def check_budget(state):
        if state["remaining_steps"] < 5:
            return "wrap_up"
        return "continue"
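The interaction between a step limit and a budget router can be traced with a toy loop. This sketch assumes one step per loop iteration and computes the remaining steps by simple subtraction, which may not match how LangGraph counts them. run_loop is a hypothetical helper, and check_budget here takes the number directly instead of reading it from state.

```python
def check_budget(remaining_steps: int) -> str:
    """Route to wrap_up once fewer than 5 steps remain."""
    return "wrap_up" if remaining_steps < 5 else "continue"


def run_loop(recursion_limit: int) -> tuple[int, str]:
    """Toy agent loop: take one step per iteration until the router fires."""
    steps_taken = 0
    while True:
        remaining = recursion_limit - steps_taken
        if check_budget(remaining) == "wrap_up":
            return steps_taken, "wrap_up"
        steps_taken += 1


steps, decision = run_loop(50)
```

Under these assumptions the loop wraps up with a few steps to spare instead of running into the limit, which leaves room for a final summarizing node.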
Decision Checklist
Before implementing:
- Is LangGraph the right tool? (vs. simpler alternatives)
- State schema defined with appropriate reducers?
- Persistence strategy chosen? (dev vs. prod checkpointer)
- Streaming needs identified?
- Human-in-the-loop points defined?
- Error handling and retry strategy defined?
- Multi-agent coordination pattern chosen? (if applicable)
- Resource limits configured?