LangGraph Implementation
Core Concepts
LangGraph builds stateful, multi-actor agent applications using a graph-based architecture:
-
StateGraph: Builder class for defining graphs with shared state
-
Nodes: Functions that read state and return partial updates
-
Edges: Define execution flow (static or conditional)
-
Channels: Internal state management (LastValue, BinaryOperatorAggregate)
-
Checkpointer: Persistence for pause/resume capabilities
Essential Imports
from langgraph.graph import StateGraph, START, END from langgraph.graph.message import MessagesState, add_messages from langgraph.checkpoint.memory import InMemorySaver from langgraph.types import Command, Send, interrupt, RetryPolicy from typing import Annotated from typing_extensions import TypedDict
State Schema Patterns
Basic State with TypedDict
class State(TypedDict): counter: int # LastValue - stores last value messages: Annotated[list, operator.add] # Reducer - appends lists items: Annotated[list, lambda a, b: a + [b] if b else a] # Custom reducer
MessagesState for Chat Applications
from langgraph.graph.message import MessagesState
class State(MessagesState): # Inherits: messages: Annotated[list[AnyMessage], add_messages] user_id: str context: dict
Pydantic State (for validation)
from pydantic import BaseModel
class State(BaseModel): messages: Annotated[list, add_messages] validated_field: str # Pydantic validates on assignment
Building Graphs
Basic Pattern
builder = StateGraph(State)
Add nodes - functions that take state, return partial updates
builder.add_node("process", process_fn) builder.add_node("decide", decide_fn)
Add edges
builder.add_edge(START, "process") builder.add_edge("process", "decide") builder.add_edge("decide", END)
Compile
graph = builder.compile()
Node Function Signature
def my_node(state: State) -> dict: """Node receives full state, returns partial update.""" return {"counter": state["counter"] + 1}
With config access
def my_node(state: State, config: RunnableConfig) -> dict: thread_id = config["configurable"]["thread_id"] return {"result": process(state, thread_id)}
With Runtime context (v0.6+)
def my_node(state: State, runtime: Runtime[Context]) -> dict: user_id = runtime.context.get("user_id") return {"result": user_id}
Conditional Edges
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "end"]: last_msg = state["messages"][-1] if hasattr(last_msg, "tool_calls") and last_msg.tool_calls: return "tools" return END # or "end"
builder.add_conditional_edges("agent", router)
With path_map for visualization
builder.add_conditional_edges( "agent", router, path_map={"agent": "agent", "tools": "tools", "end": END} )
Command Pattern (Dynamic Routing + State Update)
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "end"]]: if state["should_continue"]: return Command(goto="next", update={"step": state["step"] + 1}) return Command(goto=END)
Must declare destinations for visualization
builder.add_node("dynamic", dynamic_node, destinations=["next", END])
Send Pattern (Fan-out/Map-Reduce)
from langgraph.types import Send
def fan_out(state: State) -> list[Send]: """Route to multiple node instances with different inputs.""" return [Send("worker", {"item": item}) for item in state["items"]]
builder.add_conditional_edges(START, fan_out) builder.add_edge("worker", "aggregate") # Workers converge
Checkpointing
Enable Persistence
from langgraph.checkpoint.memory import InMemorySaver from langgraph.checkpoint.sqlite import SqliteSaver # Development from langgraph.checkpoint.postgres import PostgresSaver # Production
In-memory (testing only)
graph = builder.compile(checkpointer=InMemorySaver())
SQLite (development)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer: graph = builder.compile(checkpointer=checkpointer)
Thread-based invocation
config = {"configurable": {"thread_id": "user-123"}} result = graph.invoke({"messages": [...]}, config)
State Management
Get current state
state = graph.get_state(config)
Get state history
for state in graph.get_state_history(config): print(state.values, state.next)
Update state manually
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
Human-in-the-Loop
Using interrupt()
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict: # Pause and surface value to client human_input = interrupt({"question": "Please review", "data": state["draft"]}) return {"approved": human_input["approved"]}
Resume with Command
graph.invoke(Command(resume={"approved": True}), config)
Interrupt Before/After Nodes
graph = builder.compile( checkpointer=checkpointer, interrupt_before=["human_review"], # Pause before node interrupt_after=["agent"], # Pause after node )
Check pending interrupts
state = graph.get_state(config) if state.next: # Has pending nodes # Resume graph.invoke(None, config)
Streaming
Stream modes: "values", "updates", "custom", "messages", "debug"
Updates only (node outputs)
for chunk in graph.stream(input, stream_mode="updates"): print(chunk) # {"node_name": {"key": "value"}}
Full state after each step
for chunk in graph.stream(input, stream_mode="values"): print(chunk)
Multiple modes
for mode, chunk in graph.stream(input, stream_mode=["updates", "messages"]): if mode == "messages": print("Token:", chunk)
Custom streaming from within nodes
from langgraph.config import get_stream_writer
def my_node(state): writer = get_stream_writer() writer({"progress": 0.5}) # Custom event return {"result": "done"}
Subgraphs
Define subgraph
sub_builder = StateGraph(SubState) sub_builder.add_node("step", step_fn) sub_builder.add_edge(START, "step") subgraph = sub_builder.compile()
Use as node in parent
parent_builder = StateGraph(ParentState) parent_builder.add_node("subprocess", subgraph) parent_builder.add_edge(START, "subprocess")
Subgraph checkpointing
subgraph = sub_builder.compile( checkpointer=None, # Inherit from parent (default) # checkpointer=True, # Use persistent checkpointing # checkpointer=False, # Disable checkpointing )
Retry and Caching
from langgraph.types import RetryPolicy, CachePolicy
retry = RetryPolicy( initial_interval=0.5, backoff_factor=2.0, max_attempts=3, retry_on=ValueError, # Or callable: lambda e: isinstance(e, ValueError) )
cache = CachePolicy(ttl=3600) # Cache for 1 hour
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)
Prebuilt Components
create_react_agent (moved to langchain.agents in v1.0)
from langgraph.prebuilt import create_react_agent, ToolNode
Simple agent
graph = create_react_agent( model="anthropic:claude-3-5-sonnet", tools=[my_tool], prompt="You are a helpful assistant", checkpointer=InMemorySaver(), )
Custom tool node
tool_node = ToolNode([tool1, tool2]) builder.add_node("tools", tool_node)
Common Patterns
Agent Loop
def should_continue(state) -> Literal["tools", "end"]: if state["messages"][-1].tool_calls: return "tools" return END
builder.add_node("agent", call_model) builder.add_node("tools", ToolNode(tools)) builder.add_edge(START, "agent") builder.add_conditional_edges("agent", should_continue) builder.add_edge("tools", "agent")
Parallel Execution
Multiple nodes execute in parallel when they share the same trigger
builder.add_edge(START, "node_a") builder.add_edge(START, "node_b") # Runs parallel with node_a builder.add_edge(["node_a", "node_b"], "join") # Wait for both
See PATTERNS.md for advanced patterns including multi-agent systems, hierarchical graphs, and complex workflows.