langgraph

LangGraph is a framework for building stateful, multi-agent applications with LLMs. It implements state machines and directed graphs for orchestration, enabling complex workflows with persistent state management, human-in-the-loop support, and time-travel debugging.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.


Install skill "langgraph" with this command: npx skills add bobmatnyc/claude-mpm-skills/bobmatnyc-claude-mpm-skills-langgraph

LangGraph Workflows

Summary


Key Innovation: Transforms agent coordination from sequential chains into cyclic graphs with persistent state, conditional branching, and production-grade debugging capabilities.

When to Use

✅ Use LangGraph When:

  • Multi-agent coordination required

  • Complex state management needs

  • Human-in-the-loop workflows (approval gates, reviews)

  • Need debugging/observability (time-travel, replay)

  • Conditional branching based on outputs

  • Building production agent systems

  • State persistence across sessions

❌ Don't Use LangGraph When:

  • Simple single-agent tasks

  • No state persistence needed

  • Prototyping/experimentation phase (use simple chains)

  • Team lacks graph/state machine expertise

  • Stateless request-response patterns

Quick Start

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# 1. Define state schema
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: str

# 2. Create graph
workflow = StateGraph(AgentState)

# 3. Add nodes (agents)
def researcher(state):
    return {"messages": ["Research complete"], "current_step": "research"}

def writer(state):
    return {"messages": ["Article written"], "current_step": "writing"}

workflow.add_node("researcher", researcher)
workflow.add_node("writer", writer)

# 4. Add edges (transitions)
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 5. Set entry point and compile
workflow.set_entry_point("researcher")
app = workflow.compile()

# 6. Execute
result = app.invoke({"messages": [], "current_step": "start"})
print(result)

Core Concepts

StateGraph

The fundamental building block representing a directed graph of agents with shared state.

from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """State schema shared across all nodes."""
    messages: list
    user_input: str
    final_output: str
    metadata: dict

# Create graph with state schema
workflow = StateGraph(AgentState)

Key Properties:

  • Nodes: Agent functions that transform state

  • Edges: Transitions between nodes (static or conditional)

  • State: Shared data structure passed between nodes

  • Entry Point: Starting node of execution

  • END: Terminal node signaling completion
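Once a graph is compiled, its structure can be inspected directly, which helps verify nodes and edges before running anything. A minimal sketch; get_graph().draw_mermaid() is available in recent langgraph releases and may differ in older versions:

app = workflow.compile()

# Print a Mermaid diagram of the graph's nodes and edges
print(app.get_graph().draw_mermaid())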

Nodes

Nodes are functions that receive current state and return state updates.

def research_agent(state: AgentState) -> dict:
    """Node function: receives state, returns updates."""
    query = state["user_input"]

    # Perform research (simplified)
    results = search_web(query)

    # Return state updates (partial state)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }

# Add node to graph
workflow.add_node("researcher", research_agent)

Node Behavior:

  • Receives full state as input

  • Returns partial state (only fields to update)

  • Can be sync or async functions (see the async sketch below)

  • Can invoke LLMs, call APIs, run computations
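For I/O-bound work, a node can be declared async, and the compiled graph awaits it when run via ainvoke or astream. A minimal sketch, assuming the AgentState schema above; fetch_data is a hypothetical awaitable helper, not a LangGraph API:

async def async_research_agent(state: AgentState) -> dict:
    """Async node: awaited automatically when run with app.ainvoke()."""
    # fetch_data is a placeholder for any awaitable I/O call
    results = await fetch_data(state["user_input"])
    return {"messages": state["messages"] + [f"Research: {results}"]}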

Edges

Edges define transitions between nodes.

Static Edges

# Direct transition: researcher → writer
workflow.add_edge("researcher", "writer")

# Transition to END
workflow.add_edge("writer", END)

Conditional Edges

def should_continue(state: AgentState) -> str:
    """Routing function: decides next node based on state."""
    last_message = state["messages"][-1]

    if "APPROVED" in last_message:
        return END
    elif "NEEDS_REVISION" in last_message:
        return "writer"
    else:
        return "reviewer"

workflow.add_conditional_edges(
    "reviewer",          # Source node
    should_continue,     # Routing function
    {
        END: END,
        "writer": "writer",
        "reviewer": "reviewer"
    }
)

Graph Construction

Complete Workflow Example

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic

# State schema with reducer
class ResearchState(TypedDict):
    topic: str
    research_notes: Annotated[list, operator.add]  # Reducer: appends to list
    draft: str
    revision_count: int
    approved: bool

# Initialize LLM
llm = ChatAnthropic(model="claude-sonnet-4")

# Node 1: Research
def research_node(state: ResearchState) -> dict:
    """Research the topic and gather information."""
    topic = state["topic"]

    prompt = f"Research key points about: {topic}"
    response = llm.invoke(prompt)

    return {
        "research_notes": [response.content]
    }

# Node 2: Write
def write_node(state: ResearchState) -> dict:
    """Write draft based on research."""
    notes = "\n".join(state["research_notes"])

    prompt = f"Write article based on:\n{notes}"
    response = llm.invoke(prompt)

    return {
        "draft": response.content,
        "revision_count": state.get("revision_count", 0)
    }

# Node 3: Review
def review_node(state: ResearchState) -> dict:
    """Review the draft and decide if approved."""
    draft = state["draft"]

    prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
    response = llm.invoke(prompt)

    approved = "APPROVED" in response.content

    return {
        "approved": approved,
        "revision_count": state["revision_count"] + 1,
        "research_notes": [f"Review feedback: {response.content}"]
    }

# Routing logic
def should_continue_writing(state: ResearchState) -> str:
    """Decide next step after review."""
    if state["approved"]:
        return END
    elif state["revision_count"] >= 3:
        return END  # Max revisions reached
    else:
        return "writer"

# Build graph
workflow = StateGraph(ResearchState)

workflow.add_node("researcher", research_node)
workflow.add_node("writer", write_node)
workflow.add_node("reviewer", review_node)

# Static edges
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")

# Conditional edge from reviewer
workflow.add_conditional_edges(
    "reviewer",
    should_continue_writing,
    {END: END, "writer": "writer"}
)

workflow.set_entry_point("researcher")

# Compile
app = workflow.compile()

# Execute
result = app.invoke({
    "topic": "AI Safety",
    "research_notes": [],
    "draft": "",
    "revision_count": 0,
    "approved": False
})

print(f"Final draft: {result['draft']}")
print(f"Revisions: {result['revision_count']}")

State Management

State Schema with TypedDict

from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # Simple fields (replaced on update)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]

    # List with reducer (appends instead of replacing)
    messages: Annotated[list, operator.add]

    # Dict with custom reducer
    metadata: Annotated[dict, lambda x, y: {**x, **y}]

    # Optional fields
    error: str | None
    result: dict | None

State Reducers

Reducers control how state updates are merged.

import operator
from typing import Annotated, TypedDict

# Built-in reducers
Annotated[list, operator.add]   # Append to list
Annotated[set, operator.or_]    # Union of sets
Annotated[int, operator.add]    # Sum integers

# Custom reducer
def merge_dicts(existing: dict, update: dict) -> dict:
    """Deep merge dictionaries."""
    result = existing.copy()
    for key, value in update.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

class State(TypedDict):
    config: Annotated[dict, merge_dicts]

State Updates

Nodes return partial state - only fields to update:

def node_function(state: State) -> dict:
    """Return only fields that should be updated."""
    return {
        "messages": ["New message"],  # Will be appended
        "status": "processing"        # Will be replaced
    }
    # Other fields remain unchanged

Conditional Routing

Basic Routing Function

def route_based_on_score(state: State) -> str:
    """Route to different nodes based on state."""
    score = state["quality_score"]

    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)

Dynamic Routing with LLM

from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """Use LLM to decide next step."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}

    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH

    Return only one word.
    """

    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)

Multi-Condition Routing

def complex_router(state: State) -> str:
    """Route based on multiple conditions."""
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)

    # Priority-based routing
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)

Multi-Agent Patterns

Supervisor Pattern

One supervisor coordinates multiple specialized agents.

from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Literal

class SupervisorState(TypedDict):
    task: str
    agent_history: list
    result: dict
    next_agent: str

# Specialized agents

class ResearchAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Research: {state['task']}")
        return {
            "agent_history": [f"Research: {response.content}"],
            "result": {"research": response.content}
        }

class CodingAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Code: {state['task']}")
        return {
            "agent_history": [f"Code: {response.content}"],
            "result": {"code": response.content}
        }

class SupervisorAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def route(self, state: SupervisorState) -> dict:
        """Decide which agent to use next."""
        history = "\n".join(state.get("agent_history", []))

        prompt = f"""
        Task: {state['task']}
        Progress: {history}

        Which agent should handle the next step?
        Options: researcher, coder, FINISH

        Return only one word.
        """

        response = self.llm.invoke(prompt)
        next_agent = response.content.strip().lower()

        return {"next_agent": next_agent}

# Build supervisor workflow
def create_supervisor_workflow():
    workflow = StateGraph(SupervisorState)

    # Initialize agents
    research_agent = ResearchAgent()
    coding_agent = CodingAgent()
    supervisor = SupervisorAgent()

    # Add nodes
    workflow.add_node("supervisor", supervisor.route)
    workflow.add_node("researcher", research_agent.run)
    workflow.add_node("coder", coding_agent.run)

    # Conditional routing from supervisor
    def route_from_supervisor(state: SupervisorState) -> str:
        # Default matches the lowercased values the supervisor writes
        next_agent = state.get("next_agent", "finish")
        if next_agent == "finish":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            END: END
        }
    )

    # Loop back to supervisor after each agent
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")

    workflow.set_entry_point("supervisor")

    return workflow.compile()

# Execute
app = create_supervisor_workflow()
result = app.invoke({
    "task": "Build a REST API for user management",
    "agent_history": [],
    "result": {},
    "next_agent": ""
})

Hierarchical Multi-Agent

Nested supervisor pattern with sub-teams.

class TeamState(TypedDict):
    task: str
    team_results: dict

def create_backend_team():
    """Sub-graph for backend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)

    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)

    workflow.set_entry_point("api_designer")

    return workflow.compile()

def create_frontend_team():
    """Sub-graph for frontend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)

    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("ui_designer")

    return workflow.compile()

# Top-level coordinator
from langgraph.graph import START

def create_project_workflow():
    workflow = StateGraph(TeamState)

    # Add team sub-graphs as nodes
    backend_team = create_backend_team()
    frontend_team = create_frontend_team()

    workflow.add_node("backend_team", backend_team)
    workflow.add_node("frontend_team", frontend_team)
    workflow.add_node("integrator", integrate_teams)

    # Fan out from START so both teams execute in parallel
    # (a single entry point would leave frontend_team unreachable)
    workflow.add_edge(START, "backend_team")
    workflow.add_edge(START, "frontend_team")

    # Parallel teams converge on the integrator
    workflow.add_edge("backend_team", "integrator")
    workflow.add_edge("frontend_team", "integrator")
    workflow.add_edge("integrator", END)

    return workflow.compile()

Swarm Pattern (2025)

Dynamic agent hand-offs with collaborative decision-making.

from typing import Literal

class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agents can dynamically hand off to each other."""

    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        # Perform research
        response = llm.invoke(f"Research: {state['task']}")

        # Decide if handoff needed
        needs_code = "implementation" in response.content.lower()

        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        response = llm.invoke(f"Implement: {state['task']}")

        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def tester_agent(state: SwarmState) -> dict:
        # Test the code
        tests_pass = True  # Simplified

        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    def writer_agent(state: SwarmState) -> dict:
        # Simplified documentation agent, so the "writer" hand-off target exists
        return {
            "messages": ["Documentation written"],
            "current_agent": "DONE",
            "handoff_reason": "Documentation complete"
        }

    # Build graph
    workflow = StateGraph(SwarmState)

    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("tester", tester_agent)
    workflow.add_node("writer", writer_agent)

    # Dynamic routing based on current_agent
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher",
        route_swarm,
        {"coder": "coder", "writer": "writer"}
    )

    workflow.add_conditional_edges(
        "coder",
        route_swarm,
        {"tester": "tester"}
    )

    workflow.add_conditional_edges(
        "tester",
        route_swarm,
        {"coder": "coder", END: END}
    )

    workflow.add_edge("writer", END)

    workflow.set_entry_point("researcher")

    return workflow.compile()

Human-in-the-Loop

Interrupt for Approval

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

# Enable checkpointing for interrupts

memory = SqliteSaver.from_conn_string(":memory:")

def create_approval_workflow():
    workflow = StateGraph(dict)

    workflow.add_node("draft", create_draft)
    workflow.add_node("approve", human_approval)  # Interrupt point
    workflow.add_node("publish", publish_content)

    workflow.add_edge("draft", "approve")
    workflow.add_edge("approve", "publish")
    workflow.add_edge("publish", END)

    workflow.set_entry_point("draft")

    # Compile with interrupt before "approve"
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approve"]  # Pause here
    )

# Usage
app = create_approval_workflow()
config = {"configurable": {"thread_id": "1"}}

# Step 1: Run until interrupt
for event in app.stream({"content": "Draft article..."}, config):
    print(event)
# Workflow pauses at "approve" node

# Human reviews draft
print("Draft ready for review. Approve? (y/n)")
approval = input()

# Step 2: Resume with approval decision
if approval == "y":
    # Continue execution
    result = app.invoke(None, config)  # Resume from checkpoint
    print("Published:", result)
else:
    # Optionally update state and retry
    app.update_state(config, {"needs_revision": True})
    result = app.invoke(None, config)

Interactive Approval Gates

class ApprovalState(TypedDict):
    draft: str
    approved: bool
    feedback: str

def approval_node(state: ApprovalState) -> dict:
    """This node requires human interaction."""
    # This is where the workflow pauses;
    # a human provides input through update_state.
    return state  # No automatic changes

def create_interactive_workflow():
    workflow = StateGraph(ApprovalState)

    workflow.add_node("writer", write_draft)
    workflow.add_node("approval_gate", approval_node)
    workflow.add_node("reviser", revise_draft)
    workflow.add_node("publisher", publish_final)

    workflow.add_edge("writer", "approval_gate")

    # Route based on approval
    def route_after_approval(state: ApprovalState) -> str:
        if state.get("approved"):
            return "publisher"
        else:
            return "reviser"

    workflow.add_conditional_edges(
        "approval_gate",
        route_after_approval,
        {"publisher": "publisher", "reviser": "reviser"}
    )

    workflow.add_edge("reviser", "approval_gate")  # Loop back
    workflow.add_edge("publisher", END)

    workflow.set_entry_point("writer")

    memory = SqliteSaver.from_conn_string("approvals.db")
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approval_gate"]
    )

# Execute with human intervention
app = create_interactive_workflow()
config = {"configurable": {"thread_id": "article_123"}}

# Step 1: Generate draft (pauses at approval_gate)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config):
    print(event)

# Step 2: Human reviews and provides feedback
current_state = app.get_state(config)
print(f"Review this draft: {current_state.values['draft']}")

# Human decides
app.update_state(config, {
    "approved": False,
    "feedback": "Add more examples in section 2"
})

# Step 3: Resume (goes to reviser due to approved=False)
result = app.invoke(None, config)

Checkpointing and Persistence

MemorySaver (In-Memory)

from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointing (lost on restart)

memory = MemorySaver()

app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "conversation_1"}}

# State automatically saved at each step

result = app.invoke(initial_state, config)

SQLite Persistence

from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent checkpointing with SQLite

checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")

app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user_session_456"}}

# State persists across restarts

result = app.invoke(initial_state, config)

# Later: resume from the same thread

result2 = app.invoke(None, config) # Continues from last state

PostgreSQL for Production

from langgraph.checkpoint.postgres import PostgresSaver

# Production-grade persistence
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph_db"
)

app = workflow.compile(checkpointer=checkpointer)

# Supports concurrent workflows with isolation
config1 = {"configurable": {"thread_id": "user_1"}}
config2 = {"configurable": {"thread_id": "user_2"}}

# Each thread maintains independent state
result1 = app.invoke(state1, config1)
result2 = app.invoke(state2, config2)

Redis for Distributed Systems

from langgraph.checkpoint.redis import RedisSaver

# Distributed checkpointing
checkpointer = RedisSaver.from_conn_string(
    "redis://localhost:6379",
    ttl=3600  # 1 hour TTL
)

app = workflow.compile(checkpointer=checkpointer)

# Supports horizontal scaling:
# multiple workers can process different threads

Time-Travel Debugging

View State History

from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug_session"}}

# Run workflow

result = app.invoke(initial_state, config)

# Retrieve full history

history = app.get_state_history(config)

for i, state_snapshot in enumerate(history):
    print(f"Step {i}:")
    print(f"  Node: {state_snapshot.next}")
    print(f"  State: {state_snapshot.values}")
    print(f"  Metadata: {state_snapshot.metadata}")
    print()

Replay from Checkpoint

# Get state at specific step
history = list(app.get_state_history(config))
checkpoint_3 = history[3]  # Get state at step 3

# Replay from that checkpoint
config_replay = {
    "configurable": {
        "thread_id": "debug_session",
        "checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"]
    }
}

# Resume from step 3

result = app.invoke(None, config_replay)

Rewind and Modify

# Rewind to step 5
history = list(app.get_state_history(config))
step_5 = history[5]

# Update state at that point
app.update_state(
    config,
    {"messages": ["Modified message"]},
    as_node="researcher"  # Apply update as if from this node
)

# Continue execution with modified state

result = app.invoke(None, config)

Debug Workflow

def debug_workflow():
    """Debug workflow step-by-step."""
    checkpointer = SqliteSaver.from_conn_string("debug.db")
    app = workflow.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "debug"}}

    # Execute step-by-step
    state = initial_state
    for step in range(10):  # Max 10 steps
        print(f"\n=== Step {step} ===")

        # Get current state
        current = app.get_state(config)
        print(f"Current state: {current.values}")
        print(f"Next node: {current.next}")

        if not current.next:
            print("Workflow complete")
            break

        # Execute one step
        for event in app.stream(None, config):
            print(f"Event: {event}")

        # Pause for inspection
        input("Press Enter to continue...")

    # View full history
    print("\n=== Full History ===")
    for i, snapshot in enumerate(app.get_state_history(config)):
        print(f"Step {i}: {snapshot.next} -> {snapshot.values}")

debug_workflow()

Streaming

Token Streaming

from langchain_anthropic import ChatAnthropic

def streaming_node(state: dict) -> dict:
    """Node that streams LLM output."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    # Use streaming
    response = ""
    for chunk in llm.stream(state["prompt"]):
        content = chunk.content
        print(content, end="", flush=True)
        response += content

    return {"response": response}

# Stream events from workflow
for event in app.stream(initial_state):
    print(event)

Event Streaming

# Stream all events (node execution, state updates)
for event in app.stream(initial_state, stream_mode="updates"):
    node_name = next(iter(event))  # dict keys are not subscriptable
    state_update = event[node_name]
    print(f"Node '{node_name}' updated state: {state_update}")

Custom Streaming

from typing import AsyncGenerator

async def streaming_workflow(input_data: dict) -> AsyncGenerator:
    """Stream workflow progress in real-time."""
    config = {"configurable": {"thread_id": "stream"}}

    async for event in app.astream(input_data, config):
        # Stream each state update
        yield {
            "type": "state_update",
            "data": event
        }

    # Stream final result
    final_state = app.get_state(config)
    yield {
        "type": "complete",
        "data": final_state.values
    }

# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app_api = FastAPI()

@app_api.post("/workflow/stream") async def stream_workflow(input: dict): return StreamingResponse( streaming_workflow(input), media_type="text/event-stream" )

Tool Integration

LangChain Tools

from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent

# Define tools

search = DuckDuckGoSearchRun()

tools = [
    Tool(
        name="Search",
        func=search.run,
        description="Search the web for information"
    )
]

def agent_with_tools(state: dict) -> dict:
    """Node that uses LangChain tools."""
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4")

    # Create agent with tools (prompt_template is defined elsewhere)
    agent = create_react_agent(llm, tools, prompt_template)
    agent_executor = AgentExecutor(agent=agent, tools=tools)

    # Execute with tools
    result = agent_executor.invoke({"input": state["query"]})

    return {"result": result["output"]}

# Add to graph

workflow.add_node("agent_with_tools", agent_with_tools)

Custom Tools

from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Evaluate mathematical expression."""
    try:
        result = eval(expression)  # Simplified; never eval untrusted input
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="Calculator",
    description="Evaluate mathematical expressions",
    args_schema=CalculatorInput
)

tools = [calculator_tool]

def tool_using_node(state: dict) -> dict:
    """Use custom tools."""
    result = calculator_tool.invoke({"expression": state["math_problem"]})
    return {"solution": result}

Anthropic Tool Use

from anthropic import Anthropic
import json

def node_with_anthropic_tools(state: dict) -> dict:
    """Use Anthropic's tool use feature."""
    client = Anthropic()

    # Define tools
    tools = [
        {
            "name": "get_weather",
            "description": "Get weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    ]

    # First call: Request tool use
    response = client.messages.create(
        model="claude-sonnet-4",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": state["query"]}]
    )

    # Execute tool if requested
    if response.stop_reason == "tool_use":
        tool_use = next(
            block for block in response.content
            if block.type == "tool_use"
        )

        # Execute tool
        tool_result = execute_tool(tool_use.name, tool_use.input)

        # Second call: Provide tool result
        final_response = client.messages.create(
            model="claude-sonnet-4",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": state["query"]},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result)
                        }
                    ]
                }
            ]
        )

        return {"response": final_response.content[0].text}

    return {"response": response.content[0].text}

Error Handling

Try-Catch in Nodes

def robust_node(state: dict) -> dict:
    """Node with error handling."""
    try:
        # Main logic
        result = risky_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "status": "success"
        }

    except ValueError as e:
        # Handle specific error
        return {
            "error": f"Validation error: {e}",
            "status": "validation_failed"
        }

    except Exception as e:
        # Handle unexpected errors
        return {
            "error": f"Unexpected error: {e}",
            "status": "failed"
        }

# Route based on error status
def error_router(state: dict) -> str:
    status = state.get("status")

    if status == "success":
        return "next_step"
    elif status == "validation_failed":
        return "validator"
    else:
        return "error_handler"

workflow.add_conditional_edges(
    "robust_node",
    error_router,
    {
        "next_step": "next_step",
        "validator": "validator",
        "error_handler": "error_handler"
    }
)

Retry Logic

from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str
    error: str | None
    retry_count: int
    max_retries: int

def node_with_retry(state: RetryState) -> dict:
    """Node that can be retried on failure."""
    try:
        result = unstable_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "retry_count": 0
        }

    except Exception as e:
        return {
            "error": str(e),
            "retry_count": state.get("retry_count", 0) + 1
        }

def should_retry(state: RetryState) -> str:
    """Decide whether to retry or fail."""
    if state.get("error") is None:
        return "success"

    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    if retry_count < max_retries:
        return "retry"
    else:
        return "failed"

# Build retry loop
workflow.add_node("operation", node_with_retry)
workflow.add_node("success_handler", handle_success)
workflow.add_node("failure_handler", handle_failure)

workflow.add_conditional_edges(
    "operation",
    should_retry,
    {
        "success": "success_handler",
        "retry": "operation",  # Loop back
        "failed": "failure_handler"
    }
)
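Recent langgraph releases also ship a built-in per-node retry policy, which can replace a hand-rolled retry loop for transient failures. A sketch under stated assumptions: both the import path (langgraph.pregel in older releases, langgraph.types in newer ones) and the keyword argument have shifted across versions, so check against your installed version:

from langgraph.types import RetryPolicy  # older releases: langgraph.pregel

# A node that simply raises on transient errors can rely on the built-in
# policy instead of tracking retry_count in state (flaky_node is hypothetical)
workflow.add_node(
    "flaky_operation",
    flaky_node,
    retry=RetryPolicy(max_attempts=3)
)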

Fallback Strategies

def node_with_fallback(state: dict) -> dict:
    """Try primary method, fall back to secondary."""
    # Try primary LLM
    try:
        result = primary_llm(state["prompt"])
        return {
            "result": result,
            "method": "primary"
        }
    except Exception as e_primary:
        # Fallback to secondary LLM
        try:
            result = secondary_llm(state["prompt"])
            return {
                "result": result,
                "method": "fallback",
                "primary_error": str(e_primary)
            }
        except Exception as e_secondary:
            # Both failed
            return {
                "error": f"Both methods failed: {e_primary}, {e_secondary}",
                "method": "failed"
            }

Subgraphs and Composition

Subgraphs as Nodes

def create_subgraph():
    """Create reusable subgraph."""
    subgraph = StateGraph(dict)

    subgraph.add_node("step1", step1_func)
    subgraph.add_node("step2", step2_func)

    subgraph.add_edge("step1", "step2")
    subgraph.add_edge("step2", END)

    subgraph.set_entry_point("step1")

    return subgraph.compile()

# Use subgraph in main graph
def create_main_graph():
    workflow = StateGraph(dict)

    # Add subgraph as a node
    subgraph_compiled = create_subgraph()
    workflow.add_node("subgraph", subgraph_compiled)

    workflow.add_node("before", before_func)
    workflow.add_node("after", after_func)

    workflow.add_edge("before", "subgraph")
    workflow.add_edge("subgraph", "after")
    workflow.add_edge("after", END)

    workflow.set_entry_point("before")

    return workflow.compile()

Parallel Subgraphs

from langgraph.graph import StateGraph, START, END

def create_parallel_workflow():
    """Execute multiple subgraphs in parallel."""

    # Create subgraphs
    backend_graph = create_backend_subgraph()
    frontend_graph = create_frontend_subgraph()
    database_graph = create_database_subgraph()

    # Main graph
    workflow = StateGraph(dict)

    # Add subgraphs as parallel nodes
    workflow.add_node("backend", backend_graph)
    workflow.add_node("frontend", frontend_graph)
    workflow.add_node("database", database_graph)

    # Merge results
    workflow.add_node("merge", merge_results)

    # All subgraphs lead to merge
    workflow.add_edge("backend", "merge")
    workflow.add_edge("frontend", "merge")
    workflow.add_edge("database", "merge")

    workflow.add_edge("merge", END)

    # Fan out from START so all three run in parallel
    workflow.add_edge(START, "backend")
    workflow.add_edge(START, "frontend")
    workflow.add_edge(START, "database")

    return workflow.compile()

Map-Reduce Patterns

Map-Reduce with LangGraph

from typing import TypedDict, List

class MapReduceState(TypedDict):
    documents: List[str]
    summaries: List[str]
    final_summary: str

def map_node(state: MapReduceState) -> dict:
    """Map: Summarize each document."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    summaries = []
    for doc in state["documents"]:
        summary = llm.invoke(f"Summarize: {doc}")
        summaries.append(summary.content)

    return {"summaries": summaries}

def reduce_node(state: MapReduceState) -> dict:
    """Reduce: Combine summaries into final summary."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    combined = "\n".join(state["summaries"])
    final = llm.invoke(f"Combine these summaries:\n{combined}")

    return {"final_summary": final.content}

# Build map-reduce workflow
workflow = StateGraph(MapReduceState)

workflow.add_node("map", map_node)
workflow.add_node("reduce", reduce_node)

workflow.add_edge("map", "reduce")
workflow.add_edge("reduce", END)

workflow.set_entry_point("map")

app = workflow.compile()

# Execute
result = app.invoke({
    "documents": ["Doc 1...", "Doc 2...", "Doc 3..."],
    "summaries": [],
    "final_summary": ""
})

Parallel Map with Batching

import asyncio
from typing import List

async def parallel_map_node(state: dict) -> dict:
    """Map: Process documents in parallel."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    async def summarize(doc: str) -> str:
        response = await llm.ainvoke(f"Summarize: {doc}")
        return response.content

    # Process all documents in parallel
    summaries = await asyncio.gather(
        *[summarize(doc) for doc in state["documents"]]
    )

    return {"summaries": list(summaries)}

# Use async workflow
app = workflow.compile()

# Execute with asyncio
result = asyncio.run(app.ainvoke({
    "documents": ["Doc 1", "Doc 2", "Doc 3"],
    "summaries": [],
    "final_summary": ""
}))
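For fan-out where the number of branches is only known at runtime, langgraph also provides the Send primitive: a routing function returns one Send per work item, and each spawns a parallel invocation of the target node with its own payload. A minimal sketch; the import path has varied across versions (langgraph.constants in older releases, langgraph.types in newer ones), and the reducer on summaries is what lets parallel writes merge instead of conflicting:

import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send  # older releases: langgraph.constants

class FanoutState(TypedDict):
    documents: list
    summaries: Annotated[list, operator.add]  # merges parallel writes

def summarize_one(item: dict) -> dict:
    # Each branch receives the payload passed to Send, not the full state
    return {"summaries": [f"Summary of: {item['doc'][:40]}"]}

def fan_out(state: FanoutState):
    # One parallel branch per document
    return [Send("summarize_one", {"doc": doc}) for doc in state["documents"]]

workflow = StateGraph(FanoutState)
workflow.add_node("summarize_one", summarize_one)
workflow.add_conditional_edges(START, fan_out, ["summarize_one"])
workflow.add_edge("summarize_one", END)
app = workflow.compile()

result = app.invoke({"documents": ["Doc 1", "Doc 2", "Doc 3"], "summaries": []})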

Production Deployment

LangGraph Cloud

Deploy to LangGraph Cloud (managed service)

1. Install LangGraph CLI

pip install langgraph-cli

2. Create langgraph.json config

{
  "dependencies": ["langchain", "langchain-anthropic"],
  "graphs": {
    "agent": "./graph.py:app"
  },
  "env": {
    "ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY"
  }
}

3. Deploy

langgraph deploy

4. Use deployed graph via API

import requests

response = requests.post(
    "https://your-deployment.langgraph.cloud/invoke",
    json={
        "input": {"messages": ["Hello"]},
        "config": {"thread_id": "user_123"}
    },
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)

result = response.json()

Self-Hosted with FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn
import json

# Create FastAPI app

app_api = FastAPI()

# Initialize workflow
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)
workflow_app = workflow.compile(checkpointer=checkpointer)

class WorkflowRequest(BaseModel):
    input: dict
    thread_id: str

class WorkflowResponse(BaseModel):
    output: dict
    thread_id: str

@app_api.post("/invoke", response_model=WorkflowResponse) async def invoke_workflow(request: WorkflowRequest): """Invoke workflow synchronously.""" try: config = {"configurable": {"thread_id": request.thread_id}} result = workflow_app.invoke(request.input, config)

    return WorkflowResponse(
        output=result,
        thread_id=request.thread_id
    )
except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))

@app_api.post("/stream") async def stream_workflow(request: WorkflowRequest): """Stream workflow execution.""" from fastapi.responses import StreamingResponse

config = {"configurable": {"thread_id": request.thread_id}}

async def generate():
    async for event in workflow_app.astream(request.input, config):
        yield f"data: {json.dumps(event)}\n\n"

return StreamingResponse(generate(), media_type="text/event-stream")

# Run server
if __name__ == "__main__":
    uvicorn.run(app_api, host="0.0.0.0", port=8000)

Docker Deployment

Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml

version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - POSTGRES_URL=postgresql://user:pass@db:5432/langgraph
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=langgraph
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  postgres_data:

LangSmith Integration

Automatic Tracing

import os
from langchain_anthropic import ChatAnthropic

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"

# All LangGraph executions are automatically traced

app = workflow.compile()

result = app.invoke(initial_state)

# View trace in LangSmith UI: https://smith.langchain.com

Custom Metadata

from langsmith import traceable

@traceable(
    run_type="chain",
    name="research_agent",
    metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
    """Node with custom LangSmith metadata."""
    # Function execution automatically traced
    return {"research": "results"}

workflow.add_node("researcher", research_agent)

Evaluation with LangSmith

from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

# Create test dataset
examples = [
    {
        "inputs": {"topic": "AI Safety"},
        "outputs": {"quality_score": 0.9}
    }
]

dataset = client.create_dataset("langgraph_tests")
for example in examples:
    client.create_example(
        dataset_id=dataset.id,
        inputs=example["inputs"],
        outputs=example["outputs"]
    )

# Evaluate workflow
def workflow_wrapper(inputs: dict) -> dict:
    result = app.invoke(inputs)
    return result

def quality_evaluator(run, example):
    """Custom evaluator."""
    score = calculate_quality(run.outputs)
    return {"key": "quality", "score": score}

results = evaluate(
    workflow_wrapper,
    data="langgraph_tests",
    evaluators=[quality_evaluator],
    experiment_prefix="langgraph_v1"
)

print(f"Average quality: {results['results']['quality']:.2f}")

Real-World Examples

Customer Support Agent

from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic

class SupportState(TypedDict):
    customer_query: str
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    resolution: str
    escalated: bool

def categorize(state: SupportState) -> dict:
    """Categorize customer query."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Categorize this support query:
    "{state['customer_query']}"

    Categories: billing, technical, general
    Priority: low, medium, high

    Return format: category|priority
    """

    response = llm.invoke(prompt)
    category, priority = response.content.strip().split("|")

    return {
        "category": category,
        "priority": priority
    }

def handle_billing(state: SupportState) -> dict:
    """Handle billing issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")

    return {"resolution": response.content}

def handle_technical(state: SupportState) -> dict:
    """Handle technical issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")

    return {"resolution": response.content}

def escalate(state: SupportState) -> dict:
    """Escalate to human agent."""
    return {
        "escalated": True,
        "resolution": "Escalated to senior support team"
    }

def route_by_category(state: SupportState) -> str:
    """Route based on category and priority."""
    if state["priority"] == "high":
        return "escalate"

    category = state["category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"

# Build support workflow
workflow = StateGraph(SupportState)

workflow.add_node("categorize", categorize)
workflow.add_node("billing", handle_billing)
workflow.add_node("technical", handle_technical)
workflow.add_node("general", handle_technical)  # Reuse
workflow.add_node("escalate", escalate)

# Routing from "categorize" is handled entirely by the conditional edges below

workflow.add_conditional_edges(
    "categorize",
    route_by_category,
    {
        "billing": "billing",
        "technical": "technical",
        "general": "general",
        "escalate": "escalate"
    }
)

workflow.add_edge("billing", END) workflow.add_edge("technical", END) workflow.add_edge("general", END) workflow.add_edge("escalate", END)

workflow.set_entry_point("categorize")

support_app = workflow.compile()

# Usage
result = support_app.invoke({
    "customer_query": "I was charged twice for my subscription",
    "category": "",
    "priority": "",
    "resolution": "",
    "escalated": False
})

Research Assistant

class ResearchState(TypedDict):
    topic: str
    research_notes: list
    outline: str
    draft: str
    final_article: str
    revision_count: int

def research(state: ResearchState) -> dict:
    """Gather information on topic."""
    # Use search tools
    results = search_web(state["topic"])

    return {
        "research_notes": [f"Research: {results}"]
    }

def create_outline(state: ResearchState) -> dict:
    """Create article outline."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    notes = "\n".join(state["research_notes"])
    response = llm.invoke(f"Create outline for: {notes}")

    return {"outline": response.content}

def write_draft(state: ResearchState) -> dict:
    """Write article draft."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Topic: {state['topic']}
    Outline: {state['outline']}
    Research: {state['research_notes']}

    Write comprehensive article.
    """

    response = llm.invoke(prompt)

    return {"draft": response.content}

def review_and_revise(state: ResearchState) -> dict:
    """Review draft quality."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Review and improve:\n{state['draft']}")

    return {
        "final_article": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }

# Build research workflow
workflow = StateGraph(ResearchState)

workflow.add_node("research", research)
workflow.add_node("outline", create_outline)
workflow.add_node("write", write_draft)
workflow.add_node("review", review_and_revise)

workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")

def check_quality(state: ResearchState) -> str:
    if state["revision_count"] >= 2:
        return END

    # Simple quality check
    if len(state.get("final_article", "")) > 1000:
        return END
    else:
        return "write"  # Revise

workflow.add_conditional_edges(
    "review",
    check_quality,
    {END: END, "write": "write"}
)

workflow.set_entry_point("research")

research_app = workflow.compile()

Code Review Workflow

class CodeReviewState(TypedDict):
    code: str
    language: str
    lint_results: list
    test_results: dict
    security_scan: dict
    review_comments: list
    approved: bool

def lint_code(state: CodeReviewState) -> dict:
    """Run linters on code."""
    issues = run_linter(state["code"], state["language"])

    return {"lint_results": issues}

def run_tests(state: CodeReviewState) -> dict:
    """Execute test suite."""
    results = execute_tests(state["code"])

    return {"test_results": results}

def security_scan(state: CodeReviewState) -> dict:
    """Scan for security vulnerabilities."""
    scan_results = scan_for_vulnerabilities(state["code"])

    return {"security_scan": scan_results}

def ai_review(state: CodeReviewState) -> dict:
    """AI-powered code review."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Review this {state['language']} code:

    {state['code']}

    Lint issues: {state['lint_results']}
    Test results: {state['test_results']}
    Security: {state['security_scan']}

    Provide review comments.
    """

    response = llm.invoke(prompt)

    return {"review_comments": [response.content]}

def approve_or_reject(state: CodeReviewState) -> dict:
    """Final approval decision."""
    # Auto-approve if all checks pass
    lint_ok = len(state["lint_results"]) == 0
    tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
    security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0

    approved = lint_ok and tests_ok and security_ok

    return {"approved": approved}

# Build code review workflow
workflow = StateGraph(CodeReviewState)

workflow.add_node("lint", lint_code)
workflow.add_node("test", run_tests)
workflow.add_node("security", security_scan)
workflow.add_node("ai_review", ai_review)
workflow.add_node("decision", approve_or_reject)

# Parallel execution of checks: all three feed ai_review
workflow.add_edge("lint", "ai_review")
workflow.add_edge("test", "ai_review")
workflow.add_edge("security", "ai_review")
workflow.add_edge("ai_review", "decision")
workflow.add_edge("decision", END)

# Fan out from START so lint, test, and security run in parallel
# (START comes from langgraph.graph)
workflow.add_edge(START, "lint")
workflow.add_edge(START, "test")
workflow.add_edge(START, "security")

code_review_app = workflow.compile()

Performance Optimization

Parallel Execution

# Multiple nodes with the same parent execute in parallel
workflow.add_edge("start", "task_a")
workflow.add_edge("start", "task_b")
workflow.add_edge("start", "task_c")
# task_a, task_b, task_c run concurrently
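When parallel branches converge again, point each branch at a join node. Note that if two parallel nodes write the same state key, that key needs a reducer (see State Reducers above) or LangGraph raises an update conflict. A sketch with an illustrative join node:

# Fan-in: all parallel branches feed a join node before continuing
workflow.add_edge("task_a", "join")
workflow.add_edge("task_b", "join")
workflow.add_edge("task_c", "join")
workflow.add_edge("join", END)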

Caching with Checkpointers

from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpoint results for reuse
checkpointer = SqliteSaver.from_conn_string("cache.db")

app = workflow.compile(checkpointer=checkpointer)

# Same thread_id reuses cached results
config = {"configurable": {"thread_id": "cached_session"}}

# First run: executes all nodes
result1 = app.invoke(input_data, config)

# Second run: uses cached state
result2 = app.invoke(None, config)  # Instant

Batching

def batch_processing_node(state: dict) -> dict:
    """Process items in batches."""
    items = state["items"]
    batch_size = 10

    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_result = process_batch(batch)
        results.extend(batch_result)

    return {"results": results}

Async Execution

from langgraph.graph import StateGraph
import asyncio

async def async_node(state: dict) -> dict:
    """Async node for I/O-bound operations."""
    results = await asyncio.gather(
        fetch_api_1(state["input"]),
        fetch_api_2(state["input"]),
        fetch_api_3(state["input"])
    )

    return {"results": results}

# Use async invoke (from inside an async function)

result = await app.ainvoke(input_data)

Comparison with Simple Chains

LangChain LCEL (Simple Chain)

from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

# Simple linear chain

chain = (
    RunnablePassthrough()
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Hello")

Limitations:

  • ❌ No state persistence

  • ❌ No conditional branching

  • ❌ No cycles/loops

  • ❌ No human-in-the-loop

  • ❌ Limited debugging

LangGraph (Cyclic Workflow)

from langgraph.graph import StateGraph, END

workflow = StateGraph(dict)

workflow.add_node("step1", step1_func) workflow.add_node("step2", step2_func)

# Conditional loop

workflow.add_conditional_edges(
    "step2",
    should_continue,
    {"step1": "step1", END: END}
)

app = workflow.compile(checkpointer=memory)

Advantages:

  • ✅ Persistent state across steps

  • ✅ Conditional branching

  • ✅ Cycles and loops

  • ✅ Human-in-the-loop support

  • ✅ Time-travel debugging

  • ✅ Production-ready

Migration from LangChain LCEL

Before: LCEL Chain

from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

chain = ( {"input": RunnablePassthrough()} | llm | {"output": lambda x: x.content} )

result = chain.invoke("Question")

After: LangGraph

from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    input: str
    output: str

def llm_node(state: State) -> dict:
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")

app = workflow.compile()

result = app.invoke({"input": "Question", "output": ""})

Migration Checklist

  • Identify stateful vs stateless operations

  • Convert chain steps to graph nodes

  • Define state schema (TypedDict)

  • Add edges between nodes

  • Implement conditional routing if needed

  • Add checkpointing for state persistence

  • Test with same inputs

  • Verify output matches original chain (see the parity-check sketch below)
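A minimal parity-check sketch, assuming the chain and app objects built above; live LLM calls are nondeterministic, so in practice run this against a fixed or fake model:

def test_migration_parity():
    question = "Question"
    chain_result = chain.invoke(question)
    graph_result = app.invoke({"input": question, "output": ""})
    # Both wrappers expose the generated text under "output"
    assert graph_result["output"] == chain_result["output"]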

Best Practices

State Design

# ✅ GOOD: Flat, explicit state
class GoodState(TypedDict):
    user_id: str
    messages: list
    current_step: str
    result: dict

# ❌ BAD: Nested, ambiguous state
class BadState(TypedDict):
    data: dict    # Too generic
    context: Any  # No type safety

Node Granularity

# ✅ GOOD: Single responsibility per node
def validate_input(state): ...
def process_data(state): ...
def format_output(state): ...

# ❌ BAD: Monolithic node
def do_everything(state): ...

Error Handling

# ✅ GOOD: Explicit error states
class State(TypedDict):
    result: dict | None
    error: str | None
    status: Literal["pending", "success", "failed"]

def robust_node(state):
    try:
        result = operation()
        return {"result": result, "status": "success"}
    except Exception as e:
        return {"error": str(e), "status": "failed"}

# ❌ BAD: Silent failures
def fragile_node(state):
    try:
        return operation()
    except:
        return {}  # Error lost

Testing

# Test individual nodes
def test_node():
    state = {"input": "test"}
    result = my_node(state)
    assert result["output"] == "expected"

# Test full workflow
def test_workflow():
    app = workflow.compile()
    result = app.invoke(initial_state)
    assert result["final_output"] == "expected"

# Test error paths
def test_error_handling():
    state = {"input": "invalid"}
    result = my_node(state)
    assert result["error"] is not None

Monitoring

import logging

logger = logging.getLogger(__name__)

def monitored_node(state: dict) -> dict:
    """Node with logging."""
    logger.info(f"Executing node with state: {state.keys()}")

    try:
        result = operation()
        logger.info("Node succeeded")
        return {"result": result}
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise

Summary

LangGraph transforms agent development from linear chains into cyclic, stateful workflows with production-grade capabilities:

Core Strengths:

  • 🔄 Cyclic workflows with loops and branches

  • 💾 Persistent state across sessions

  • 🕐 Time-travel debugging and replay

  • 👤 Human-in-the-loop approval gates

  • 🔧 Tool integration (LangChain, Anthropic, custom)

  • 📊 LangSmith integration for tracing

  • 🏭 Production deployment (cloud or self-hosted)

When to Choose LangGraph:

  • Multi-agent coordination required

  • Complex branching logic

  • State persistence needed

  • Human approvals in workflow

  • Production systems with debugging needs

Learning Path:

  • Start with StateGraph basics

  • Add conditional routing

  • Implement checkpointing

  • Build multi-agent patterns

  • Deploy to production

Production Checklist:

  • State schema with TypedDict

  • Error handling in all nodes

  • Checkpointing configured

  • LangSmith tracing enabled

  • Monitoring and logging

  • Testing (unit + integration)

  • Deployment strategy (cloud/self-hosted)

For simple request-response patterns, use LangChain LCEL. For complex stateful workflows, LangGraph is the industry-standard solution.

Related Skills

When using LangGraph, these skills enhance your workflow:

  • dspy: DSPy for LLM prompt optimization (complementary to LangGraph orchestration)

  • test-driven-development: Testing LangGraph workflows and state machines

  • systematic-debugging: Debugging multi-agent workflows and state transitions

[Full documentation available in these skills if deployed in your bundle]
