# LangGraph Workflows

## Summary

LangGraph is a framework for building stateful, multi-agent applications with LLMs. It implements state machines and directed graphs for orchestration, enabling complex workflows with persistent state management, human-in-the-loop support, and time-travel debugging.

**Key Innovation:** Transforms agent coordination from sequential chains into cyclic graphs with persistent state, conditional branching, and production-grade debugging capabilities.
## When to Use

### ✅ Use LangGraph When:

- Multi-agent coordination required
- Complex state management needs
- Human-in-the-loop workflows (approval gates, reviews)
- Need debugging/observability (time-travel, replay)
- Conditional branching based on outputs
- Building production agent systems
- State persistence across sessions

### ❌ Don't Use LangGraph When:

- Simple single-agent tasks
- No state persistence needed
- Prototyping/experimentation phase (use simple chains)
- Team lacks graph/state machine expertise
- Stateless request-response patterns
## Quick Start

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# 1. Define state schema
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: str

# 2. Create graph
workflow = StateGraph(AgentState)

# 3. Add nodes (agents)
def researcher(state):
    return {"messages": ["Research complete"], "current_step": "research"}

def writer(state):
    return {"messages": ["Article written"], "current_step": "writing"}

workflow.add_node("researcher", researcher)
workflow.add_node("writer", writer)

# 4. Add edges (transitions)
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 5. Set entry point and compile
workflow.set_entry_point("researcher")
app = workflow.compile()

# 6. Execute
result = app.invoke({"messages": [], "current_step": "start"})
print(result)
```
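Because `messages` is annotated with the `operator.add` reducer, each node's return value is appended rather than replacing the list, so the run prints:

```
{'messages': ['Research complete', 'Article written'], 'current_step': 'writing'}
```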
## Core Concepts

### StateGraph

The fundamental building block representing a directed graph of agents with shared state.

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """State schema shared across all nodes."""
    messages: list
    user_input: str
    final_output: str
    metadata: dict

# Create graph with state schema
workflow = StateGraph(AgentState)
```
**Key Properties:**

- **Nodes**: Agent functions that transform state
- **Edges**: Transitions between nodes (static or conditional)
- **State**: Shared data structure passed between nodes
- **Entry Point**: Starting node of execution
- **END**: Terminal node signaling completion
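To sanity-check a graph's topology, a compiled graph can be rendered as a Mermaid diagram; a minimal sketch, assuming `get_graph().draw_mermaid()` is available in your installed LangGraph version:

```python
# Inspect the compiled topology (e.g., `app` from the Quick Start above).
# get_graph()/draw_mermaid() ship with recent LangGraph releases;
# treat their availability as an assumption for your version.
print(app.get_graph().draw_mermaid())
```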
### Nodes

Nodes are functions that receive the current state and return state updates.

```python
def research_agent(state: AgentState) -> dict:
    """Node function: receives state, returns updates."""
    query = state["user_input"]

    # Perform research (simplified)
    results = search_web(query)

    # Return state updates (partial state)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }

# Add node to graph
workflow.add_node("researcher", research_agent)
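```

The examples here and below call `search_web`, which this document never defines. A minimal hypothetical stub so the snippets run as written:

```python
def search_web(query: str) -> str:
    """Hypothetical stand-in for a real search tool (e.g., a LangChain
    search wrapper). Returns canned text so the examples are runnable."""
    return f"Top results for '{query}' (stubbed)"
```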
**Node Behavior:**

- Receives full state as input
- Returns partial state (only fields to update)
- Can be sync or async functions (see the async sketch below)
- Can invoke LLMs, call APIs, run computations
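A minimal async node sketch; the sleep is a stand-in for any awaitable I/O, and the node/graph names are illustrative:

```python
import asyncio

async def fetch_agent(state: AgentState) -> dict:
    """Async node: LangGraph awaits it when the graph runs via ainvoke()."""
    await asyncio.sleep(0.1)  # stand-in for real async I/O (API call, DB query)
    return {"messages": state["messages"] + ["Fetched data"]}

workflow.add_node("fetcher", fetch_agent)
# Async graphs are executed with the async entry point:
# result = await app.ainvoke(initial_state)
```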
### Edges

Edges define transitions between nodes.

#### Static Edges

```python
# Direct transition: researcher → writer
workflow.add_edge("researcher", "writer")

# Transition to END
workflow.add_edge("writer", END)
```
Conditional Edges
def should_continue(state: AgentState) -> str: """Routing function: decides next node based on state.""" last_message = state["messages"][-1]
if "APPROVED" in last_message:
return END
elif "NEEDS_REVISION" in last_message:
return "writer"
else:
return "reviewer"
workflow.add_conditional_edges( "reviewer", # Source node should_continue, # Routing function { END: END, "writer": "writer", "reviewer": "reviewer" } )
## Graph Construction

### Complete Workflow Example

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic

# State schema with reducer
class ResearchState(TypedDict):
    topic: str
    research_notes: Annotated[list, operator.add]  # Reducer: appends to list
    draft: str
    revision_count: int
    approved: bool

# Initialize LLM
llm = ChatAnthropic(model="claude-sonnet-4")

# Node 1: Research
def research_node(state: ResearchState) -> dict:
    """Research the topic and gather information."""
    topic = state["topic"]
    prompt = f"Research key points about: {topic}"
    response = llm.invoke(prompt)
    return {"research_notes": [response.content]}

# Node 2: Write
def write_node(state: ResearchState) -> dict:
    """Write draft based on research."""
    notes = "\n".join(state["research_notes"])
    prompt = f"Write article based on:\n{notes}"
    response = llm.invoke(prompt)
    return {
        "draft": response.content,
        "revision_count": state.get("revision_count", 0)
    }

# Node 3: Review
def review_node(state: ResearchState) -> dict:
    """Review the draft and decide if approved."""
    draft = state["draft"]
    prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
    response = llm.invoke(prompt)
    approved = "APPROVED" in response.content
    return {
        "approved": approved,
        "revision_count": state["revision_count"] + 1,
        "research_notes": [f"Review feedback: {response.content}"]
    }

# Routing logic
def should_continue_writing(state: ResearchState) -> str:
    """Decide next step after review."""
    if state["approved"]:
        return END
    elif state["revision_count"] >= 3:
        return END  # Max revisions reached
    else:
        return "writer"

# Build graph
workflow = StateGraph(ResearchState)

workflow.add_node("researcher", research_node)
workflow.add_node("writer", write_node)
workflow.add_node("reviewer", review_node)

# Static edges
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")

# Conditional edge from reviewer
workflow.add_conditional_edges(
    "reviewer",
    should_continue_writing,
    {END: END, "writer": "writer"}
)

workflow.set_entry_point("researcher")

# Compile
app = workflow.compile()

# Execute
result = app.invoke({
    "topic": "AI Safety",
    "research_notes": [],
    "draft": "",
    "revision_count": 0,
    "approved": False
})

print(f"Final draft: {result['draft']}")
print(f"Revisions: {result['revision_count']}")
```
## State Management

### State Schema with TypedDict

```python
from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # Simple fields (replaced on update)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]

    # List with reducer (appends instead of replacing)
    messages: Annotated[list, operator.add]

    # Dict with custom reducer
    metadata: Annotated[dict, lambda x, y: {**x, **y}]

    # Optional fields
    error: str | None
    result: dict | None
```
### State Reducers

Reducers control how state updates are merged.

```python
import operator
from typing import Annotated, TypedDict

# Built-in reducers
Annotated[list, operator.add]  # Append to list
Annotated[set, operator.or_]   # Union of sets
Annotated[int, operator.add]   # Sum integers

# Custom reducer
def merge_dicts(existing: dict, update: dict) -> dict:
    """Deep merge dictionaries."""
    result = existing.copy()
    for key, value in update.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

class State(TypedDict):
    config: Annotated[dict, merge_dicts]
```
### State Updates

Nodes return partial state: only the fields to update.

```python
def node_function(state: State) -> dict:
    """Return only fields that should be updated."""
    # Other fields remain unchanged.
    return {
        "messages": ["New message"],  # Will be appended (reducer)
        "status": "processing"        # Will be replaced
    }
```
## Conditional Routing

### Basic Routing Function

```python
def route_based_on_score(state: State) -> str:
    """Route to different nodes based on state."""
    score = state["quality_score"]

    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)
```
### Dynamic Routing with LLM

```python
from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """Use an LLM to decide the next step."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}

    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH
    Return only one word.
    """

    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)
```
### Multi-Condition Routing

```python
def complex_router(state: State) -> str:
    """Route based on multiple conditions."""
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)

    # Priority-based routing
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)
```
## Multi-Agent Patterns

### Supervisor Pattern

One supervisor coordinates multiple specialized agents.

```python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Annotated
import operator

class SupervisorState(TypedDict):
    task: str
    agent_history: Annotated[list, operator.add]  # Accumulates across agents
    result: dict
    next_agent: str

# Specialized agents
class ResearchAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Research: {state['task']}")
        return {
            "agent_history": [f"Research: {response.content}"],
            "result": {"research": response.content}
        }

class CodingAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Code: {state['task']}")
        return {
            "agent_history": [f"Code: {response.content}"],
            "result": {"code": response.content}
        }

class SupervisorAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def route(self, state: SupervisorState) -> dict:
        """Decide which agent to use next."""
        history = "\n".join(state.get("agent_history", []))

        prompt = f"""
        Task: {state['task']}
        Progress: {history}

        Which agent should handle the next step?
        Options: researcher, coder, FINISH
        Return only one word.
        """

        response = self.llm.invoke(prompt)
        next_agent = response.content.strip().lower()
        return {"next_agent": next_agent}

# Build supervisor workflow
def create_supervisor_workflow():
    workflow = StateGraph(SupervisorState)

    # Initialize agents
    research_agent = ResearchAgent()
    coding_agent = CodingAgent()
    supervisor = SupervisorAgent()

    # Add nodes
    workflow.add_node("supervisor", supervisor.route)
    workflow.add_node("researcher", research_agent.run)
    workflow.add_node("coder", coding_agent.run)

    # Conditional routing from supervisor
    def route_from_supervisor(state: SupervisorState) -> str:
        # The supervisor lowercases its answer, so compare lowercase.
        next_agent = state.get("next_agent", "finish").lower()
        if next_agent == "finish":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            END: END
        }
    )

    # Loop back to supervisor after each agent
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")

    workflow.set_entry_point("supervisor")
    return workflow.compile()

# Execute
app = create_supervisor_workflow()
result = app.invoke({
    "task": "Build a REST API for user management",
    "agent_history": [],
    "result": {},
    "next_agent": ""
})
```
### Hierarchical Multi-Agent

Nested supervisor pattern with sub-teams. (Stub implementations of the helper node functions follow the listing.)

```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated

class TeamState(TypedDict):
    task: str
    # Merge reducer so parallel teams can both write results
    team_results: Annotated[dict, lambda a, b: {**a, **b}]

def create_backend_team():
    """Sub-graph for backend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)

    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)

    workflow.set_entry_point("api_designer")
    return workflow.compile()

def create_frontend_team():
    """Sub-graph for frontend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)

    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("ui_designer")
    return workflow.compile()

# Top-level coordinator
def create_project_workflow():
    workflow = StateGraph(TeamState)

    # Add team sub-graphs as nodes
    backend_team = create_backend_team()
    frontend_team = create_frontend_team()

    workflow.add_node("backend_team", backend_team)
    workflow.add_node("frontend_team", frontend_team)
    workflow.add_node("integrator", integrate_teams)

    # Parallel execution of teams: fan out from START
    # (set_entry_point only supports a single entry node)
    workflow.add_edge(START, "backend_team")
    workflow.add_edge(START, "frontend_team")
    workflow.add_edge("backend_team", "integrator")
    workflow.add_edge("frontend_team", "integrator")
    workflow.add_edge("integrator", END)

    return workflow.compile()
```
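The node functions above (`design_api`, `design_db`, `implement_backend`, `design_ui`, `build_components`, `integrate_frontend`, `integrate_teams`) are placeholders the example assumes. Minimal hypothetical stubs, so the sketch is runnable; the merge reducer on `team_results` combines their writes:

```python
def design_api(state: TeamState) -> dict:
    return {"team_results": {"api": "OpenAPI spec (stub)"}}

def design_db(state: TeamState) -> dict:
    return {"team_results": {"db": "schema (stub)"}}

def implement_backend(state: TeamState) -> dict:
    return {"team_results": {"backend": "service code (stub)"}}

def design_ui(state: TeamState) -> dict:
    return {"team_results": {"ui": "wireframes (stub)"}}

def build_components(state: TeamState) -> dict:
    return {"team_results": {"components": "UI components (stub)"}}

def integrate_frontend(state: TeamState) -> dict:
    return {"team_results": {"frontend": "integrated app (stub)"}}

def integrate_teams(state: TeamState) -> dict:
    return {"team_results": {"integrated": True}}
```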
### Swarm Pattern (2025)

Dynamic agent hand-offs with collaborative decision-making.

```python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict

class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agents can dynamically hand off to each other."""

    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        # Perform research
        response = llm.invoke(f"Research: {state['task']}")

        # Decide if handoff needed
        needs_code = "implementation" in response.content.lower()

        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Implement: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def writer_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Document: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "DONE",
            "handoff_reason": "Documentation complete"
        }

    def tester_agent(state: SwarmState) -> dict:
        # Test the code
        tests_pass = True  # Simplified

        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    # Build graph
    workflow = StateGraph(SwarmState)
    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("writer", writer_agent)
    workflow.add_node("tester", tester_agent)

    # Dynamic routing based on current_agent
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher", route_swarm, {"coder": "coder", "writer": "writer"}
    )
    workflow.add_conditional_edges(
        "coder", route_swarm, {"tester": "tester"}
    )
    workflow.add_conditional_edges(
        "writer", route_swarm, {END: END}
    )
    workflow.add_conditional_edges(
        "tester", route_swarm, {"coder": "coder", END: END}
    )

    workflow.set_entry_point("researcher")
    return workflow.compile()
```
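A usage sketch mirroring the supervisor example; the task string is illustrative, and the initial values match the `SwarmState` keys:

```python
swarm_app = create_swarm()
result = swarm_app.invoke({
    "task": "Add rate limiting to the public API",
    "messages": [],
    "current_agent": "",
    "handoff_reason": ""
})
print(result["handoff_reason"])  # e.g. "All tests passing"
```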
## Human-in-the-Loop

### Interrupt for Approval

```python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

# Enable checkpointing for interrupts
memory = SqliteSaver.from_conn_string(":memory:")

def create_approval_workflow():
    workflow = StateGraph(dict)

    workflow.add_node("draft", create_draft)
    workflow.add_node("approve", human_approval)  # Interrupt point
    workflow.add_node("publish", publish_content)

    workflow.add_edge("draft", "approve")
    workflow.add_edge("approve", "publish")
    workflow.add_edge("publish", END)

    workflow.set_entry_point("draft")

    # Compile with interrupt before "approve"
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approve"]  # Pause here
    )

# Usage
app = create_approval_workflow()
config = {"configurable": {"thread_id": "1"}}

# Step 1: Run until interrupt
for event in app.stream({"content": "Draft article..."}, config):
    print(event)
# Workflow pauses at "approve" node

# Human reviews draft
print("Draft ready for review. Approve? (y/n)")
approval = input()

# Step 2: Resume with approval decision
if approval == "y":
    # Continue execution
    result = app.invoke(None, config)  # Resume from checkpoint
    print("Published:", result)
else:
    # Optionally update state and retry
    app.update_state(config, {"needs_revision": True})
    result = app.invoke(None, config)
```
### Interactive Approval Gates

```python
class ApprovalState(TypedDict):
    draft: str
    approved: bool
    feedback: str

def approval_node(state: ApprovalState) -> dict:
    """This node requires human interaction."""
    # The workflow pauses before this node; a human
    # provides input through update_state().
    return state  # No automatic changes

def create_interactive_workflow():
    workflow = StateGraph(ApprovalState)

    workflow.add_node("writer", write_draft)
    workflow.add_node("approval_gate", approval_node)
    workflow.add_node("reviser", revise_draft)
    workflow.add_node("publisher", publish_final)

    workflow.add_edge("writer", "approval_gate")

    # Route based on approval
    def route_after_approval(state: ApprovalState) -> str:
        if state.get("approved"):
            return "publisher"
        else:
            return "reviser"

    workflow.add_conditional_edges(
        "approval_gate",
        route_after_approval,
        {"publisher": "publisher", "reviser": "reviser"}
    )

    workflow.add_edge("reviser", "approval_gate")  # Loop back
    workflow.add_edge("publisher", END)

    workflow.set_entry_point("writer")

    memory = SqliteSaver.from_conn_string("approvals.db")
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approval_gate"]
    )

# Execute with human intervention
app = create_interactive_workflow()
config = {"configurable": {"thread_id": "article_123"}}

# Step 1: Generate draft (pauses at approval_gate)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config):
    print(event)

# Step 2: Human reviews and provides feedback
current_state = app.get_state(config)
print(f"Review this draft: {current_state.values['draft']}")

# Human decides
app.update_state(config, {
    "approved": False,
    "feedback": "Add more examples in section 2"
})

# Step 3: Resume (goes to reviser due to approved=False)
result = app.invoke(None, config)
```
## Checkpointing and Persistence

### MemorySaver (In-Memory)

```python
from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointing (lost on restart)
memory = MemorySaver()

app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "conversation_1"}}

# State automatically saved at each step
result = app.invoke(initial_state, config)
```
### SQLite Persistence

```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent checkpointing with SQLite
checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")

app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user_session_456"}}

# State persists across restarts
result = app.invoke(initial_state, config)

# Later: resume from the same thread
result2 = app.invoke(None, config)  # Continues from last state
```
### PostgreSQL for Production

```python
from langgraph.checkpoint.postgres import PostgresSaver

# Production-grade persistence
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph_db"
)

app = workflow.compile(checkpointer=checkpointer)

# Supports concurrent workflows with isolation
config1 = {"configurable": {"thread_id": "user_1"}}
config2 = {"configurable": {"thread_id": "user_2"}}

# Each thread maintains independent state
result1 = app.invoke(state1, config1)
result2 = app.invoke(state2, config2)
```
### Redis for Distributed Systems

```python
from langgraph.checkpoint.redis import RedisSaver

# Distributed checkpointing
checkpointer = RedisSaver.from_conn_string(
    "redis://localhost:6379",
    ttl=3600  # 1 hour TTL
)

app = workflow.compile(checkpointer=checkpointer)

# Supports horizontal scaling:
# multiple workers can process different threads.
```
## Time-Travel Debugging

### View State History

```python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug_session"}}

# Run workflow
result = app.invoke(initial_state, config)

# Retrieve full history
history = app.get_state_history(config)

for i, state_snapshot in enumerate(history):
    print(f"Step {i}:")
    print(f"  Node: {state_snapshot.next}")
    print(f"  State: {state_snapshot.values}")
    print(f"  Metadata: {state_snapshot.metadata}")
    print()
```
### Replay from Checkpoint

```python
# Get state at a specific step
history = list(app.get_state_history(config))
checkpoint_3 = history[3]  # Get state at step 3

# Replay from that checkpoint
config_replay = {
    "configurable": {
        "thread_id": "debug_session",
        "checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"]
    }
}

# Resume from step 3
result = app.invoke(None, config_replay)
```
### Rewind and Modify

```python
# Rewind to step 5
history = list(app.get_state_history(config))
step_5 = history[5]

# Update state as of that checkpoint (forks history from there)
app.update_state(
    step_5.config,
    {"messages": ["Modified message"]},
    as_node="researcher"  # Apply update as if from this node
)

# Continue execution with modified state
result = app.invoke(None, config)
```
### Debug Workflow

```python
def debug_workflow():
    """Debug workflow step-by-step."""
    checkpointer = SqliteSaver.from_conn_string("debug.db")
    app = workflow.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "debug"}}

    # Execute step-by-step
    inputs = initial_state
    for step in range(10):  # Max 10 steps
        print(f"\n=== Step {step} ===")

        # Get current state
        current = app.get_state(config)
        print(f"Current state: {current.values}")
        print(f"Next node: {current.next}")

        if step > 0 and not current.next:
            print("Workflow complete")
            break

        # Execute one step
        for event in app.stream(inputs, config):
            print(f"Event: {event}")
        inputs = None  # Subsequent steps resume from the checkpoint

        # Pause for inspection
        input("Press Enter to continue...")

    # View full history
    print("\n=== Full History ===")
    for i, snapshot in enumerate(app.get_state_history(config)):
        print(f"Step {i}: {snapshot.next} -> {snapshot.values}")

debug_workflow()
```
## Streaming

### Token Streaming

```python
from langchain_anthropic import ChatAnthropic

def streaming_node(state: dict) -> dict:
    """Node that streams LLM output."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    # Use streaming
    response = ""
    for chunk in llm.stream(state["prompt"]):
        content = chunk.content
        print(content, end="", flush=True)
        response += content

    return {"response": response}

# Stream events from the workflow
for event in app.stream(initial_state):
    print(event)
```
### Event Streaming

```python
# Stream all events (node execution, state updates)
for event in app.stream(initial_state, stream_mode="updates"):
    node_name = next(iter(event))  # Each event maps node name -> update
    state_update = event[node_name]
    print(f"Node '{node_name}' updated state: {state_update}")
```
### Custom Streaming

```python
import json
from typing import AsyncGenerator

async def streaming_workflow(input_data: dict) -> AsyncGenerator[str, None]:
    """Stream workflow progress in real-time as server-sent events."""
    config = {"configurable": {"thread_id": "stream"}}

    async for event in app.astream(input_data, config):
        # Stream each state update (str() guards non-JSON-serializable values)
        yield f"data: {json.dumps({'type': 'state_update', 'data': str(event)})}\n\n"

    # Stream the final result
    final_state = app.get_state(config)
    yield f"data: {json.dumps({'type': 'complete', 'data': str(final_state.values)})}\n\n"

# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app_api = FastAPI()

@app_api.post("/workflow/stream")
async def stream_workflow(input: dict):
    return StreamingResponse(
        streaming_workflow(input),
        media_type="text/event-stream"
    )
```
## Tool Integration

### LangChain Tools

```python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent

# Define tools
search = DuckDuckGoSearchRun()

tools = [
    Tool(
        name="Search",
        func=search.run,
        description="Search the web for information"
    )
]

def agent_with_tools(state: dict) -> dict:
    """Node that uses LangChain tools."""
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4")

    # Create agent with tools (prompt_template is assumed to be defined)
    agent = create_react_agent(llm, tools, prompt_template)
    agent_executor = AgentExecutor(agent=agent, tools=tools)

    # Execute with tools
    result = agent_executor.invoke({"input": state["query"]})

    return {"result": result["output"]}

# Add to graph
workflow.add_node("agent_with_tools", agent_with_tools)
```
### Custom Tools

```python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)  # Simplified; never eval untrusted input
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="Calculator",
    description="Evaluate mathematical expressions",
    args_schema=CalculatorInput
)

tools = [calculator_tool]

def tool_using_node(state: dict) -> dict:
    """Use custom tools."""
    result = calculator_tool.invoke({"expression": state["math_problem"]})
    return {"solution": result}
```
### Anthropic Tool Use

```python
from anthropic import Anthropic

def node_with_anthropic_tools(state: dict) -> dict:
    """Use Anthropic's tool use feature."""
    client = Anthropic()

    # Define tools
    tools = [
        {
            "name": "get_weather",
            "description": "Get weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    ]

    # First call: request tool use
    response = client.messages.create(
        model="claude-sonnet-4",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": state["query"]}]
    )

    # Execute tool if requested
    if response.stop_reason == "tool_use":
        tool_use = next(
            block for block in response.content
            if block.type == "tool_use"
        )

        # Execute tool (execute_tool is an application-provided dispatcher)
        tool_result = execute_tool(tool_use.name, tool_use.input)

        # Second call: provide tool result
        final_response = client.messages.create(
            model="claude-sonnet-4",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": state["query"]},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result)
                        }
                    ]
                }
            ]
        )

        return {"response": final_response.content[0].text}

    return {"response": response.content[0].text}
```
## Error Handling

### Try-Catch in Nodes

```python
def robust_node(state: dict) -> dict:
    """Node with error handling."""
    try:
        # Main logic
        result = risky_operation(state["input"])
        return {
            "result": result,
            "error": None,
            "status": "success"
        }
    except ValueError as e:
        # Handle specific error
        return {
            "error": f"Validation error: {e}",
            "status": "validation_failed"
        }
    except Exception as e:
        # Handle unexpected errors
        return {
            "error": f"Unexpected error: {e}",
            "status": "failed"
        }

# Route based on error status
def error_router(state: dict) -> str:
    status = state.get("status")

    if status == "success":
        return "next_step"
    elif status == "validation_failed":
        return "validator"
    else:
        return "error_handler"

workflow.add_conditional_edges(
    "robust_node",
    error_router,
    {
        "next_step": "next_step",
        "validator": "validator",
        "error_handler": "error_handler"
    }
)
```
### Retry Logic

```python
from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str
    error: str | None
    retry_count: int
    max_retries: int

def node_with_retry(state: RetryState) -> dict:
    """Node that can be retried on failure."""
    try:
        result = unstable_operation(state["input"])
        return {
            "result": result,
            "error": None,
            "retry_count": 0
        }
    except Exception as e:
        return {
            "error": str(e),
            "retry_count": state.get("retry_count", 0) + 1
        }

def should_retry(state: RetryState) -> str:
    """Decide whether to retry or fail."""
    if state.get("error") is None:
        return "success"

    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    if retry_count < max_retries:
        return "retry"
    else:
        return "failed"

# Build retry loop
workflow.add_node("operation", node_with_retry)
workflow.add_node("success_handler", handle_success)
workflow.add_node("failure_handler", handle_failure)

workflow.add_conditional_edges(
    "operation",
    should_retry,
    {
        "success": "success_handler",
        "retry": "operation",  # Loop back
        "failed": "failure_handler"
    }
)
```
### Fallback Strategies

```python
def node_with_fallback(state: dict) -> dict:
    """Try primary method, fall back to secondary."""
    # Try primary LLM
    try:
        result = primary_llm(state["prompt"])
        return {
            "result": result,
            "method": "primary"
        }
    except Exception as e_primary:
        # Fall back to secondary LLM
        try:
            result = secondary_llm(state["prompt"])
            return {
                "result": result,
                "method": "fallback",
                "primary_error": str(e_primary)
            }
        except Exception as e_secondary:
            # Both failed
            return {
                "error": f"Both methods failed: {e_primary}, {e_secondary}",
                "method": "failed"
            }
```
## Subgraphs and Composition

### Subgraphs as Nodes

```python
def create_subgraph():
    """Create a reusable subgraph."""
    subgraph = StateGraph(dict)

    subgraph.add_node("step1", step1_func)
    subgraph.add_node("step2", step2_func)

    subgraph.add_edge("step1", "step2")
    subgraph.add_edge("step2", END)

    subgraph.set_entry_point("step1")
    return subgraph.compile()

# Use subgraph in main graph
def create_main_graph():
    workflow = StateGraph(dict)

    # Add compiled subgraph as a node
    subgraph_compiled = create_subgraph()
    workflow.add_node("subgraph", subgraph_compiled)

    workflow.add_node("before", before_func)
    workflow.add_node("after", after_func)

    workflow.add_edge("before", "subgraph")
    workflow.add_edge("subgraph", "after")
    workflow.add_edge("after", END)

    workflow.set_entry_point("before")
    return workflow.compile()
```
### Parallel Subgraphs

```python
from langgraph.graph import StateGraph, START, END

def create_parallel_workflow():
    """Execute multiple subgraphs in parallel."""
    # Create subgraphs
    backend_graph = create_backend_subgraph()
    frontend_graph = create_frontend_subgraph()
    database_graph = create_database_subgraph()

    # Main graph
    workflow = StateGraph(dict)

    # Add subgraphs as parallel nodes
    workflow.add_node("backend", backend_graph)
    workflow.add_node("frontend", frontend_graph)
    workflow.add_node("database", database_graph)

    # Merge results
    workflow.add_node("merge", merge_results)

    # Fan out from START so all three subgraphs run in parallel
    # (set_entry_point only supports a single entry node)
    workflow.add_edge(START, "backend")
    workflow.add_edge(START, "frontend")
    workflow.add_edge(START, "database")

    # All subgraphs lead to merge
    workflow.add_edge("backend", "merge")
    workflow.add_edge("frontend", "merge")
    workflow.add_edge("database", "merge")
    workflow.add_edge("merge", END)

    return workflow.compile()
```
## Map-Reduce Patterns

### Map-Reduce with LangGraph

```python
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic

class MapReduceState(TypedDict):
    documents: List[str]
    summaries: List[str]
    final_summary: str

def map_node(state: MapReduceState) -> dict:
    """Map: summarize each document."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    summaries = []
    for doc in state["documents"]:
        summary = llm.invoke(f"Summarize: {doc}")
        summaries.append(summary.content)

    return {"summaries": summaries}

def reduce_node(state: MapReduceState) -> dict:
    """Reduce: combine summaries into a final summary."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    combined = "\n".join(state["summaries"])
    final = llm.invoke(f"Combine these summaries:\n{combined}")

    return {"final_summary": final.content}

# Build map-reduce workflow
workflow = StateGraph(MapReduceState)

workflow.add_node("map", map_node)
workflow.add_node("reduce", reduce_node)

workflow.add_edge("map", "reduce")
workflow.add_edge("reduce", END)

workflow.set_entry_point("map")

app = workflow.compile()

# Execute
result = app.invoke({
    "documents": ["Doc 1...", "Doc 2...", "Doc 3..."],
    "summaries": [],
    "final_summary": ""
})
```
### Parallel Map with Batching

```python
import asyncio

async def parallel_map_node(state: dict) -> dict:
    """Map: process documents in parallel."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    async def summarize(doc: str) -> str:
        response = await llm.ainvoke(f"Summarize: {doc}")
        return response.content

    # Process all documents in parallel
    summaries = await asyncio.gather(
        *[summarize(doc) for doc in state["documents"]]
    )

    return {"summaries": list(summaries)}

# Use async workflow
app = workflow.compile()

# Execute with asyncio
result = asyncio.run(app.ainvoke({
    "documents": ["Doc 1", "Doc 2", "Doc 3"],
    "summaries": [],
    "final_summary": ""
}))
```
## Production Deployment

### LangGraph Cloud

Deploy to LangGraph Cloud (managed service):

1. Install the LangGraph CLI:

```bash
pip install langgraph-cli
```

2. Create a `langgraph.json` config:

```json
{
  "dependencies": ["langchain", "langchain-anthropic"],
  "graphs": {
    "agent": "./graph.py:app"
  },
  "env": {
    "ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY"
  }
}
```

3. Deploy:

```bash
langgraph deploy
```

4. Use the deployed graph via API:

```python
import requests

response = requests.post(
    "https://your-deployment.langgraph.cloud/invoke",
    json={
        "input": {"messages": ["Hello"]},
        "config": {"thread_id": "user_123"}
    },
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)

result = response.json()
```
### Self-Hosted with FastAPI

```python
import json

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn

# Create FastAPI app
app_api = FastAPI()

# Initialize workflow
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)
workflow_app = workflow.compile(checkpointer=checkpointer)

class WorkflowRequest(BaseModel):
    input: dict
    thread_id: str

class WorkflowResponse(BaseModel):
    output: dict
    thread_id: str

@app_api.post("/invoke", response_model=WorkflowResponse)
async def invoke_workflow(request: WorkflowRequest):
    """Invoke workflow synchronously."""
    try:
        config = {"configurable": {"thread_id": request.thread_id}}
        result = workflow_app.invoke(request.input, config)
        return WorkflowResponse(
            output=result,
            thread_id=request.thread_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app_api.post("/stream")
async def stream_workflow(request: WorkflowRequest):
    """Stream workflow execution."""
    config = {"configurable": {"thread_id": request.thread_id}}

    async def generate():
        async for event in workflow_app.astream(request.input, config):
            yield f"data: {json.dumps(event, default=str)}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

# Run server
if __name__ == "__main__":
    uvicorn.run(app_api, host="0.0.0.0", port=8000)
```
### Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - POSTGRES_URL=postgresql://user:pass@db:5432/langgraph
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=langgraph
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  postgres_data:
```
## LangSmith Integration

### Automatic Tracing

```python
import os

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"

# All LangGraph executions are automatically traced
app = workflow.compile()
result = app.invoke(initial_state)

# View traces in the LangSmith UI: https://smith.langchain.com
```
### Custom Metadata

```python
from langsmith import traceable

@traceable(
    run_type="chain",
    name="research_agent",
    metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
    """Node with custom LangSmith metadata."""
    # Function execution is automatically traced
    return {"research": "results"}

workflow.add_node("researcher", research_agent)
```
### Evaluation with LangSmith

```python
from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

# Create test dataset
examples = [
    {
        "inputs": {"topic": "AI Safety"},
        "outputs": {"quality_score": 0.9}
    }
]

dataset = client.create_dataset("langgraph_tests")
for example in examples:
    client.create_example(
        dataset_id=dataset.id,
        inputs=example["inputs"],
        outputs=example["outputs"]
    )

# Evaluate workflow
def workflow_wrapper(inputs: dict) -> dict:
    result = app.invoke(inputs)
    return result

def quality_evaluator(run, example):
    """Custom evaluator (calculate_quality is application-defined)."""
    score = calculate_quality(run.outputs)
    return {"key": "quality", "score": score}

results = evaluate(
    workflow_wrapper,
    data="langgraph_tests",
    evaluators=[quality_evaluator],
    experiment_prefix="langgraph_v1"
)

print(f"Average quality: {results['results']['quality']:.2f}")
```
## Real-World Examples

### Customer Support Agent

```python
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic

class SupportState(TypedDict):
    customer_query: str
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    resolution: str
    escalated: bool

def categorize(state: SupportState) -> dict:
    """Categorize customer query."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Categorize this support query:
    "{state['customer_query']}"

    Categories: billing, technical, general
    Priority: low, medium, high

    Return format: category|priority
    """

    response = llm.invoke(prompt)
    category, priority = response.content.strip().split("|")

    return {
        "category": category,
        "priority": priority
    }

def handle_billing(state: SupportState) -> dict:
    """Handle billing issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")
    return {"resolution": response.content}

def handle_technical(state: SupportState) -> dict:
    """Handle technical issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")
    return {"resolution": response.content}

def escalate(state: SupportState) -> dict:
    """Escalate to human agent."""
    return {
        "escalated": True,
        "resolution": "Escalated to senior support team"
    }

def route_by_category(state: SupportState) -> str:
    """Route based on category and priority."""
    if state["priority"] == "high":
        return "escalate"

    category = state["category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"

# Build support workflow
workflow = StateGraph(SupportState)

workflow.add_node("categorize", categorize)
workflow.add_node("billing", handle_billing)
workflow.add_node("technical", handle_technical)
workflow.add_node("general", handle_technical)  # Reuse the technical handler
workflow.add_node("escalate", escalate)

# Routing is handled entirely by the conditional edges below
workflow.add_conditional_edges(
    "categorize",
    route_by_category,
    {
        "billing": "billing",
        "technical": "technical",
        "general": "general",
        "escalate": "escalate"
    }
)

workflow.add_edge("billing", END)
workflow.add_edge("technical", END)
workflow.add_edge("general", END)
workflow.add_edge("escalate", END)

workflow.set_entry_point("categorize")

support_app = workflow.compile()

# Usage
result = support_app.invoke({
    "customer_query": "I was charged twice for my subscription",
    "category": "",
    "priority": "",
    "resolution": "",
    "escalated": False
})
```
### Research Assistant

```python
class ResearchState(TypedDict):
    topic: str
    research_notes: list
    outline: str
    draft: str
    final_article: str
    revision_count: int

def research(state: ResearchState) -> dict:
    """Gather information on topic."""
    # Use search tools
    results = search_web(state["topic"])
    return {"research_notes": [f"Research: {results}"]}

def create_outline(state: ResearchState) -> dict:
    """Create article outline."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    notes = "\n".join(state["research_notes"])
    response = llm.invoke(f"Create outline for: {notes}")
    return {"outline": response.content}

def write_draft(state: ResearchState) -> dict:
    """Write article draft."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Topic: {state['topic']}
    Outline: {state['outline']}
    Research: {state['research_notes']}

    Write a comprehensive article.
    """

    response = llm.invoke(prompt)
    return {"draft": response.content}

def review_and_revise(state: ResearchState) -> dict:
    """Review draft quality."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(f"Review and improve:\n{state['draft']}")
    return {
        "final_article": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }

# Build research workflow
workflow = StateGraph(ResearchState)

workflow.add_node("research", research)
workflow.add_node("outline", create_outline)
workflow.add_node("write", write_draft)
workflow.add_node("review", review_and_revise)

workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")

def check_quality(state: ResearchState) -> str:
    if state["revision_count"] >= 2:
        return END

    # Simple quality check
    if len(state.get("final_article", "")) > 1000:
        return END
    else:
        return "write"  # Revise

workflow.add_conditional_edges(
    "review",
    check_quality,
    {END: END, "write": "write"}
)

workflow.set_entry_point("research")

research_app = workflow.compile()
```
### Code Review Workflow

```python
from langgraph.graph import StateGraph, START, END

class CodeReviewState(TypedDict):
    code: str
    language: str
    lint_results: list
    test_results: dict
    security_scan: dict
    review_comments: list
    approved: bool

def lint_code(state: CodeReviewState) -> dict:
    """Run linters on code."""
    # Run the appropriate linter (run_linter is application-provided)
    issues = run_linter(state["code"], state["language"])
    return {"lint_results": issues}

def run_tests(state: CodeReviewState) -> dict:
    """Execute test suite."""
    results = execute_tests(state["code"])
    return {"test_results": results}

def scan_security(state: CodeReviewState) -> dict:
    """Scan for security vulnerabilities."""
    scan_results = scan_for_vulnerabilities(state["code"])
    return {"security_scan": scan_results}

def ai_review(state: CodeReviewState) -> dict:
    """AI-powered code review."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Review this {state['language']} code:
    {state['code']}

    Lint issues: {state['lint_results']}
    Test results: {state['test_results']}
    Security: {state['security_scan']}

    Provide review comments.
    """

    response = llm.invoke(prompt)
    return {"review_comments": [response.content]}

def approve_or_reject(state: CodeReviewState) -> dict:
    """Final approval decision."""
    # Auto-approve if all checks pass
    lint_ok = len(state["lint_results"]) == 0
    tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
    security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0

    approved = lint_ok and tests_ok and security_ok
    return {"approved": approved}

# Build code review workflow
workflow = StateGraph(CodeReviewState)

workflow.add_node("lint", lint_code)
workflow.add_node("test", run_tests)
workflow.add_node("security", scan_security)
workflow.add_node("ai_review", ai_review)
workflow.add_node("decision", approve_or_reject)

# Parallel execution of checks: fan out from START, join at ai_review
workflow.add_edge(START, "lint")
workflow.add_edge(START, "test")
workflow.add_edge(START, "security")

workflow.add_edge("lint", "ai_review")
workflow.add_edge("test", "ai_review")
workflow.add_edge("security", "ai_review")
workflow.add_edge("ai_review", "decision")
workflow.add_edge("decision", END)

code_review_app = workflow.compile()
```
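A hypothetical invocation; `run_linter`, `execute_tests`, and `scan_for_vulnerabilities` are assumed to be provided by your application:

```python
result = code_review_app.invoke({
    "code": "def add(a, b):\n    return a + b\n",
    "language": "python",
    "lint_results": [],
    "test_results": {},
    "security_scan": {},
    "review_comments": [],
    "approved": False
})
print("Approved:", result["approved"])
```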
## Performance Optimization

### Parallel Execution

```python
# Multiple nodes with the same parent execute in parallel
workflow.add_edge("start", "task_a")
workflow.add_edge("start", "task_b")
workflow.add_edge("start", "task_c")
# task_a, task_b, and task_c run concurrently
```
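A minimal runnable fan-out/fan-in sketch. Note that parallel branches writing to the same key need a reducer (`operator.add` here), otherwise LangGraph rejects the concurrent update; node and key names are illustrative:

```python
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END

class FanState(TypedDict):
    results: Annotated[list, operator.add]  # Reducer merges parallel writes

def task_a(state: FanState) -> dict:
    return {"results": ["a"]}

def task_b(state: FanState) -> dict:
    return {"results": ["b"]}

def merge(state: FanState) -> dict:
    return {"results": [f"merged: {sorted(state['results'])}"]}

g = StateGraph(FanState)
g.add_node("task_a", task_a)
g.add_node("task_b", task_b)
g.add_node("merge", merge)
g.add_edge(START, "task_a")  # Fan out: both tasks run in the same superstep
g.add_edge(START, "task_b")
g.add_edge("task_a", "merge")  # Fan in: merge waits for both branches
g.add_edge("task_b", "merge")
g.add_edge("merge", END)

print(g.compile().invoke({"results": []}))
```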
### Caching with Checkpointers

```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpoint results for reuse
checkpointer = SqliteSaver.from_conn_string("cache.db")
app = workflow.compile(checkpointer=checkpointer)

# Same thread_id reuses cached results
config = {"configurable": {"thread_id": "cached_session"}}

# First run: executes all nodes
result1 = app.invoke(input_data, config)

# Second run: uses the cached state
result2 = app.invoke(None, config)
```
### Batching

```python
def batch_processing_node(state: dict) -> dict:
    """Process items in batches."""
    items = state["items"]
    batch_size = 10

    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_result = process_batch(batch)
        results.extend(batch_result)

    return {"results": results}
```
### Async Execution

```python
import asyncio

async def async_node(state: dict) -> dict:
    """Async node for I/O-bound operations."""
    results = await asyncio.gather(
        fetch_api_1(state["input"]),
        fetch_api_2(state["input"]),
        fetch_api_3(state["input"])
    )
    return {"results": results}

# Use the async entry point
result = await app.ainvoke(input_data)
```
## Comparison with Simple Chains

### LangChain LCEL (Simple Chain)

```python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

# Simple linear chain
chain = (
    RunnablePassthrough()
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Hello")
```

**Limitations:**

- ❌ No state persistence
- ❌ No conditional branching
- ❌ No cycles/loops
- ❌ No human-in-the-loop
- ❌ Limited debugging
### LangGraph (Cyclic Workflow)

```python
from langgraph.graph import StateGraph, END

workflow = StateGraph(dict)

workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)

workflow.set_entry_point("step1")
workflow.add_edge("step1", "step2")

# Conditional loop
workflow.add_conditional_edges(
    "step2",
    should_continue,
    {"step1": "step1", END: END}
)

app = workflow.compile(checkpointer=memory)
```

**Advantages:**

- ✅ Persistent state across steps
- ✅ Conditional branching
- ✅ Cycles and loops
- ✅ Human-in-the-loop support
- ✅ Time-travel debugging
- ✅ Production-ready
## Migration from LangChain LCEL

### Before: LCEL Chain

```python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

chain = (
    {"input": RunnablePassthrough()}
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Question")
```

### After: LangGraph

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict
from langchain_anthropic import ChatAnthropic

class State(TypedDict):
    input: str
    output: str

def llm_node(state: State) -> dict:
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")

app = workflow.compile()

result = app.invoke({"input": "Question", "output": ""})
```
### Migration Checklist

- Identify stateful vs stateless operations
- Convert chain steps to graph nodes
- Define state schema (TypedDict)
- Add edges between nodes
- Implement conditional routing if needed
- Add checkpointing for state persistence
- Test with same inputs (see the sketch below)
- Verify output matches original chain
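A small pytest-style sketch for the last two items, assuming `chain` and `app` from the before/after snippets above; since LLM output is nondeterministic, compare structure rather than exact text:

```python
def test_migration_equivalence():
    """Both implementations should produce output of the same shape."""
    question = "Question"
    lcel_result = chain.invoke(question)
    graph_result = app.invoke({"input": question, "output": ""})

    assert "output" in lcel_result
    assert isinstance(graph_result["output"], str)
    assert graph_result["output"]  # Non-empty
```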
## Best Practices

### State Design

```python
# ✅ GOOD: Flat, explicit state
class GoodState(TypedDict):
    user_id: str
    messages: list
    current_step: str
    result: dict

# ❌ BAD: Nested, ambiguous state
class BadState(TypedDict):
    data: dict    # Too generic
    context: Any  # No type safety
```

### Node Granularity

```python
# ✅ GOOD: Single responsibility per node
def validate_input(state): ...
def process_data(state): ...
def format_output(state): ...

# ❌ BAD: Monolithic node
def do_everything(state): ...
```

### Error Handling

```python
# ✅ GOOD: Explicit error states
class State(TypedDict):
    result: dict | None
    error: str | None
    status: Literal["pending", "success", "failed"]

def robust_node(state):
    try:
        result = operation()
        return {"result": result, "status": "success"}
    except Exception as e:
        return {"error": str(e), "status": "failed"}

# ❌ BAD: Silent failures
def fragile_node(state):
    try:
        return operation()
    except:
        return {}  # Error lost
```
### Testing

```python
# Test individual nodes
def test_node():
    state = {"input": "test"}
    result = my_node(state)
    assert result["output"] == "expected"

# Test the full workflow
def test_workflow():
    app = workflow.compile()
    result = app.invoke(initial_state)
    assert result["final_output"] == "expected"

# Test error paths
def test_error_handling():
    state = {"input": "invalid"}
    result = my_node(state)
    assert result["error"] is not None
```
### Monitoring

```python
import logging

logger = logging.getLogger(__name__)

def monitored_node(state: dict) -> dict:
    """Node with logging."""
    logger.info(f"Executing node with state keys: {list(state.keys())}")

    try:
        result = operation()
        logger.info("Node succeeded")
        return {"result": result}
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise
```
## Summary

LangGraph transforms agent development from linear chains into cyclic, stateful workflows with production-grade capabilities.

**Core Strengths:**

- 🔄 Cyclic workflows with loops and branches
- 💾 Persistent state across sessions
- 🕐 Time-travel debugging and replay
- 👤 Human-in-the-loop approval gates
- 🔧 Tool integration (LangChain, Anthropic, custom)
- 📊 LangSmith integration for tracing
- 🏭 Production deployment (cloud or self-hosted)

**When to Choose LangGraph:**

- Multi-agent coordination required
- Complex branching logic
- State persistence needed
- Human approvals in workflow
- Production systems with debugging needs

**Learning Path:**

1. Start with StateGraph basics
2. Add conditional routing
3. Implement checkpointing
4. Build multi-agent patterns
5. Deploy to production

**Production Checklist:**

- State schema with TypedDict
- Error handling in all nodes
- Checkpointing configured
- LangSmith tracing enabled
- Monitoring and logging
- Testing (unit + integration)
- Deployment strategy (cloud/self-hosted)

For simple request-response patterns, use LangChain LCEL. For complex stateful workflows, LangGraph is the industry-standard solution.
## Related Skills

When using LangGraph, these skills enhance your workflow:

- **dspy**: DSPy for LLM prompt optimization (complementary to LangGraph orchestration)
- **test-driven-development**: Testing LangGraph workflows and state machines
- **systematic-debugging**: Debugging multi-agent workflows and state transitions

[Full documentation available in these skills if deployed in your bundle]