# LangGraph Functional API

Build workflows using decorators instead of explicit graph construction.
## Overview

- Sequential workflows with conditional branching
- Orchestrator-worker patterns with parallel execution
- Workflows needing persistence and checkpointing
- Human-in-the-loop approval flows
- A simpler alternative to explicit `StateGraph` construction
## Core Concepts

### Graph API vs Functional API

| Graph API (explicit) | Functional API (implicit) |
|---|---|
| `StateGraph` → `add_node` → `add_edge` → `compile` | `@task` functions + `@entrypoint` orchestration |
**When to Use the Functional API:**

- Sequential workflows with conditional logic
- Orchestrator-worker patterns
- Simpler debugging (regular Python functions)
- Parallel task execution
## Quick Start

### Basic Pattern

```python
from langgraph.func import entrypoint, task

@task
def step_one(data: str) -> str:
    """Tasks return a future - call .result() to block."""
    return process(data)

@task
def step_two(result: str) -> str:
    return transform(result)

@entrypoint()
def my_workflow(input_data: str) -> str:
    # Tasks return futures - enables parallel execution
    result1 = step_one(input_data).result()
    result2 = step_two(result1).result()
    return result2

# Invoke
output = my_workflow.invoke("hello")
```
### Key Rules

- `@task` functions return futures - call `.result()` to get the value
- `@entrypoint` marks the workflow entry point and orchestrates tasks
- Tasks called inside an entrypoint are tracked for persistence/streaming
- Regular functions (no decorator) execute normally
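The future-then-`.result()` contract above behaves much like Python's standard `concurrent.futures`. A rough stdlib analogy (not LangGraph itself; `slow_double` is a made-up stand-in for a `@task` body):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_double(x: int) -> int:
    """Stand-in for a @task body."""
    time.sleep(0.1)
    return x * 2

with ThreadPoolExecutor() as pool:
    # Submitting returns a future immediately, like calling a @task
    future = pool.submit(slow_double, 21)
    # .result() blocks until the value is ready
    value = future.result()

print(value)  # 42
```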
## Parallel Execution

### Fan-Out Pattern

```python
@task
def fetch_source_a(query: str) -> dict:
    return api_a.search(query)

@task
def fetch_source_b(query: str) -> dict:
    return api_b.search(query)

@task
def merge_results(results: list[dict]) -> dict:
    return {"combined": results}

@entrypoint()
def parallel_search(query: str) -> dict:
    # Launch in parallel - futures start immediately
    future_a = fetch_source_a(query)
    future_b = fetch_source_b(query)

    # Block on both results
    results = [future_a.result(), future_b.result()]
    return merge_results(results).result()
```

### Map Over a Collection

```python
@task
def process_item(item: dict) -> dict:
    return transform(item)

@entrypoint()
def batch_workflow(items: list[dict]) -> list[dict]:
    # Launch all in parallel
    futures = [process_item(item) for item in items]

    # Collect results
    return [f.result() for f in futures]
```
## Persistence & Checkpointing

### Enable Checkpointing

```python
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def resumable_workflow(data: str) -> str:
    # Workflow state is automatically saved after each task
    result = expensive_task(data).result()
    return result

# Use a thread_id for persistence
config = {"configurable": {"thread_id": "session-123"}}
result = resumable_workflow.invoke("input", config)
```

### Access Previous State

```python
from typing import Optional

@entrypoint(checkpointer=checkpointer)
def stateful_workflow(data: str, previous: Optional[dict] = None) -> dict:
    """previous contains the last return value for this thread_id."""
    if previous and previous.get("step") == "complete":
        return previous  # Already done

    result = process(data).result()
    return {"step": "complete", "result": result}
```
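To make the `previous` semantics concrete, here is a plain-Python simulation of how a checkpointer feeds the last return value back in per `thread_id` (an illustrative sketch, not LangGraph's actual implementation; `data.upper()` stands in for real processing):

```python
from typing import Optional

checkpoints: dict[str, dict] = {}  # thread_id -> last return value

def stateful_workflow(data: str, previous: Optional[dict] = None) -> dict:
    # Same skip-if-done logic as the entrypoint above
    if previous and previous.get("step") == "complete":
        return previous
    return {"step": "complete", "result": data.upper()}

def invoke(thread_id: str, data: str) -> dict:
    # The checkpointer injects the prior result for this thread
    result = stateful_workflow(data, previous=checkpoints.get(thread_id))
    checkpoints[thread_id] = result
    return result

first = invoke("session-123", "hello")
second = invoke("session-123", "ignored")  # short-circuits: already complete
```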
## Human-in-the-Loop

### Interrupt for Approval

```python
from langgraph.types import interrupt, Command

@entrypoint(checkpointer=checkpointer)
def approval_workflow(request: dict) -> dict:
    # Process request
    result = analyze_request(request).result()

    # Pause for human approval
    approved = interrupt({
        "question": "Approve this action?",
        "details": result
    })

    if approved:
        return execute_action(result).result()
    else:
        return {"status": "rejected"}

# Initial run - pauses at the interrupt
config = {"configurable": {"thread_id": "approval-1"}}
for chunk in approval_workflow.stream(request, config):
    print(chunk)

# Resume after human review
for chunk in approval_workflow.stream(Command(resume=True), config):
    print(chunk)
```
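Conceptually, `interrupt()` suspends the workflow at that point and `Command(resume=...)` re-runs it with the human's answer supplied. A plain-Python sketch of that pause/resume contract (illustrative only; the real mechanism replays from the checkpoint rather than raising an exception):

```python
class Interrupt(Exception):
    """Stand-in for a paused workflow carrying a payload for the human."""
    def __init__(self, payload: dict):
        self.payload = payload

def approval_flow(result: dict, resume_value=None) -> dict:
    if resume_value is None:
        # First run: surface the question to the human and stop
        raise Interrupt({"question": "Approve this action?", "details": result})
    # Resumed run: the human's answer flows in where interrupt() was called
    return {"status": "approved"} if resume_value else {"status": "rejected"}

try:
    approval_flow({"risk": "low"})
except Interrupt as pause:
    question = pause.payload["question"]

resumed = approval_flow({"risk": "low"}, resume_value=True)
```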
## Conditional Logic

### Branching

```python
@task
def classify(text: str) -> str:
    return llm.invoke(f"Classify: {text}")  # "positive" or "negative"

@task
def handle_positive(text: str) -> str:
    return "Thank you for the positive feedback!"

@task
def handle_negative(text: str) -> str:
    return "We're sorry to hear that. Creating support ticket..."

@entrypoint()
def feedback_workflow(text: str) -> str:
    sentiment = classify(text).result()

    if sentiment == "positive":
        return handle_positive(text).result()
    else:
        return handle_negative(text).result()
```

### Loop Until Done

```python
@task
def call_llm(messages: list) -> dict:
    return llm_with_tools.invoke(messages)

@task
def call_tool(tool_call: dict) -> str:
    tool = tools[tool_call["name"]]
    return tool.invoke(tool_call["args"])

@entrypoint()
def agent_loop(query: str) -> str:
    messages = [{"role": "user", "content": query}]

    while True:
        response = call_llm(messages).result()
        if not response.get("tool_calls"):
            return response["content"]

        # Execute tools in parallel
        tool_futures = [call_tool(tc) for tc in response["tool_calls"]]
        tool_results = [f.result() for f in tool_futures]
        messages.extend([response, *tool_results])
```
## Streaming

### Stream Updates

```python
@entrypoint()
def streaming_workflow(data: str) -> str:
    step1 = task_one(data).result()
    step2 = task_two(step1).result()
    return step2

# Stream task completion updates
for update in streaming_workflow.stream("input", stream_mode="updates"):
    print(f"Task completed: {update}")
```

### Stream Modes

```python
# "updates" - task completion events
for chunk in workflow.stream(input, stream_mode="updates"):
    print(chunk)

# "values" - full state after each task
for chunk in workflow.stream(input, stream_mode="values"):
    print(chunk)

# "custom" - custom events from your code
for chunk in workflow.stream(input, stream_mode="custom"):
    print(chunk)
```
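The difference between `updates` and `values` can be pictured with a plain generator (a conceptual sketch, not the real stream implementation): `updates` yields one event per completed task, while `values` yields the accumulated state so far.

```python
def run_tasks(data: str):
    """Toy two-step pipeline that yields both views after each step."""
    state = {}
    for name, fn in [("step_one", str.upper), ("step_two", str.swapcase)]:
        data = fn(data)
        state[name] = data
        yield {"update": {name: data}, "values": dict(state)}

updates = [chunk["update"] for chunk in run_tasks("hi")]
values = [chunk["values"] for chunk in run_tasks("hi")]
```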
## TypeScript

### Basic Pattern

```typescript
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const processData = task("processData", async (data: string) => {
  return await transform(data);
});

const workflow = entrypoint(
  { name: "myWorkflow", checkpointer: new MemorySaver() },
  async (input: string) => {
    const result = await processData(input);
    return result;
  }
);

// Invoke
const config = { configurable: { thread_id: "session-1" } };
const result = await workflow.invoke("hello", config);
```

### Parallel Execution

```typescript
const fetchA = task("fetchA", async (q: string) => api.fetchA(q));
const fetchB = task("fetchB", async (q: string) => api.fetchB(q));

const parallelWorkflow = entrypoint("parallel", async (query: string) => {
  // Launch in parallel using Promise.all
  const [resultA, resultB] = await Promise.all([
    fetchA(query),
    fetchB(query),
  ]);
  return { a: resultA, b: resultB };
});
```
## Common Patterns

### Orchestrator-Worker

```python
@task
def plan(topic: str) -> list[str]:
    """Orchestrator creates work items."""
    sections = planner.invoke(f"Create outline for: {topic}")
    return sections

@task
def write_section(section: str) -> str:
    """Worker processes one item."""
    return llm.invoke(f"Write section: {section}")

@task
def synthesize(sections: list[str]) -> str:
    """Combine results."""
    return "\n\n".join(sections)

@entrypoint()
def report_workflow(topic: str) -> str:
    sections = plan(topic).result()

    # Fan-out to workers
    section_futures = [write_section(s) for s in sections]
    completed = [f.result() for f in section_futures]

    # Fan-in
    return synthesize(completed).result()
```

### Retry Pattern

```python
@task
def unreliable_api(data: str) -> dict:
    return external_api.call(data)

@entrypoint()
def retry_workflow(data: str, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        try:
            return unreliable_api(data).result()
        except Exception:
            if attempt == max_retries - 1:
                raise
```
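A variant of the retry loop above adds exponential backoff between attempts. This is plain Python that could live inside a task or entrypoint body; `FlakyAPI` is a hypothetical stand-in for `external_api`:

```python
import time

class FlakyAPI:
    """Hypothetical stand-in that fails twice, then succeeds."""
    def __init__(self):
        self.calls = 0

    def call(self, data: str) -> dict:
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("transient failure")
        return {"ok": True, "data": data}

def call_with_backoff(api, data: str, max_retries: int = 3,
                      base_delay: float = 0.01) -> dict:
    for attempt in range(max_retries):
        try:
            return api.call(data)
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            # Wait base_delay * 2**attempt between attempts: 0.01s, 0.02s, ...
            time.sleep(base_delay * 2 ** attempt)

api = FlakyAPI()
result = call_with_backoff(api, "payload")
```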
## Anti-Patterns

- Forgetting `.result()`: tasks return futures; you must call `.result()` to get the value
- Blocking in tasks: keep tasks focused; don't nest entrypoints
- Missing checkpointer: without one, interrupted workflows can't be resumed
- Sequential when parallel: launch all tasks before blocking on any result
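The last anti-pattern is easy to see with a stdlib thread-pool timing demo (an analogy for task futures, not LangGraph code): blocking on each future before submitting the next serializes the work, while submitting everything first lets the calls overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(source: str) -> str:
    time.sleep(0.2)  # simulated I/O
    return f"data from {source}"

with ThreadPoolExecutor() as pool:
    # Anti-pattern: block on each future before launching the next (~0.4s)
    start = time.perf_counter()
    slow = [pool.submit(fetch, s).result() for s in ("a", "b")]
    sequential = time.perf_counter() - start

    # Correct: launch everything, then block (~0.2s)
    start = time.perf_counter()
    futures = [pool.submit(fetch, s) for s in ("a", "b")]
    fast = [f.result() for f in futures]
    parallel = time.perf_counter() - start

assert slow == fast  # same results, different wall-clock time
```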
## Migration from Graph API

### Graph API (before)

```python
from langgraph.graph import StateGraph, START

def node_a(state):
    return {"data": process(state["input"])}

def node_b(state):
    return {"result": transform(state["data"])}

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge(START, "a")  # entry edge required before compiling
graph.add_edge("a", "b")
app = graph.compile()
```

### Functional API (after)

```python
@task
def process_data(input: str) -> str:
    return process(input)

@task
def transform_data(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(input: str) -> str:
    data = process_data(input).result()
    return transform_data(data).result()
```
## Evaluations

See references/evaluations.md for test cases.
## Related Skills

- `langgraph-state` - state management patterns for complex workflow data
- `langgraph-routing` - conditional routing and branching decisions
- `langgraph-parallel` - advanced parallel execution and fan-out patterns
- `langgraph-checkpoints` - persistence and recovery for long-running workflows
- `langgraph-human-in-loop` - human approval with the Functional API
- `langgraph-subgraphs` - compose functional workflows as subgraphs
## Key Decisions

| Decision | Choice | Rationale |
|---|---|---|
| API style | Functional over Graph | Simpler debugging, familiar Python patterns, implicit graph construction |
| Task returns | Futures with `.result()` | Enables parallel execution without explicit async/await |
| Checkpointing | Optional per-entrypoint | Flexibility for stateless vs. resumable workflows |
| Human-in-loop | `interrupt()` function | Clean pause/resume semantics with the `Command` pattern |
## Resources

- LangGraph Functional API: https://langchain-ai.github.io/langgraph/concepts/functional_api/
- Workflows Tutorial: https://langchain-ai.github.io/langgraph/tutorials/workflows/