Agent Workflow Designer
Tier: POWERFUL Category: Engineering / AI Systems Maintainer: Claude Skills Team
Overview
Design production-grade multi-agent orchestration systems from requirements. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, consensus validation), agent routing strategies, structured handoff protocols, persistent state management, error recovery with circuit breakers, context window budgeting, and cost optimization. Includes framework-specific implementations for LangGraph, CrewAI, AutoGen, and native Claude Code agent teams.
Keywords
agent workflow, multi-agent orchestration, workflow DAG, agent routing, fan-out fan-in, hierarchical delegation, handoff protocol, state management, agent pipeline, LangGraph, CrewAI, AutoGen, context budgeting
Core Capabilities
- Pattern Selection and Design
-
Sequential pipelines with typed handoffs
-
Parallel fan-out/fan-in with merge strategies
-
Hierarchical delegation with dynamic subtask discovery
-
Event-driven reactors with pub/sub agent triggers
-
Consensus validation with voting and arbitration
- Agent Routing
-
Intent-based routing with classifier agents
-
Skill-based routing using capability matching
-
Cost-aware routing (cheap models for simple tasks)
-
Load-balanced routing across agent pools
-
Fallback chains with graceful degradation
- State and Context Management
-
Persistent workflow state across agent hops
-
Context window budgeting and summarization
-
Checkpoint/resume for long-running workflows
-
Conflict resolution for parallel state updates
- Reliability Engineering
-
Circuit breakers for failing agents
-
Retry with exponential backoff and model fallback
-
Dead letter queues for unprocessable tasks
-
Timeout enforcement at every agent boundary
-
Idempotent operations for safe retries
When to Use
-
Building multi-step AI pipelines that exceed one agent's capability
-
Parallelizing research, analysis, or generation tasks
-
Creating specialist agent teams with defined roles and contracts
-
Designing fault-tolerant AI workflows for production deployment
-
Optimizing cost across workflows with mixed model tiers
Pattern Selection Decision Tree
What does the workflow look like? │ ├─ Linear: step A feeds step B feeds step C │ └─ SEQUENTIAL PIPELINE │ Best for: content pipelines, code review chains, data transformation │ ├─ Parallel: N independent tasks, then combine │ └─ FAN-OUT / FAN-IN │ Best for: competitive research, multi-source analysis, parallel code gen │ ├─ Tree: orchestrator breaks work into subtasks dynamically │ └─ HIERARCHICAL DELEGATION │ Best for: complex projects, open-ended research, code generation with planning │ ├─ Reactive: agents respond to events/triggers │ └─ EVENT-DRIVEN REACTOR │ Best for: monitoring, alerting, continuous integration, chat workflows │ └─ Verification: multiple agents must agree on output └─ CONSENSUS VALIDATION Best for: high-stakes decisions, code review, fact checking, safety-critical output
Pattern 1: Sequential Pipeline
Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.
LangGraph Implementation
from langgraph.graph import StateGraph, END from typing import TypedDict, Annotated from langchain_anthropic import ChatAnthropic
class PipelineState(TypedDict): topic: str research: str draft: str final: str stage_costs: Annotated[list[dict], "append"] # accumulates cost per stage
def research_stage(state: PipelineState) -> dict: model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048) result = model.invoke( f"Research the following topic thoroughly. Provide key facts, statistics, " f"and expert perspectives:\n\n{state['topic']}" ) return { "research": result.content, "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}], }
def writing_stage(state: PipelineState) -> dict: model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096) result = model.invoke( f"Using this research, write a compelling 800-word blog post with a hook, " f"3 main sections, and a CTA:\n\n{state['research']}" ) return { "draft": result.content, "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}], }
def editing_stage(state: PipelineState) -> dict: model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096) result = model.invoke( f"Edit this draft for clarity, flow, and grammar. Return only the improved " f"version:\n\n{state['draft']}" ) return { "final": result.content, "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}], }
Build the graph
graph = StateGraph(PipelineState) graph.add_node("research", research_stage) graph.add_node("write", writing_stage) graph.add_node("edit", editing_stage) graph.add_edge("research", "write") graph.add_edge("write", "edit") graph.add_edge("edit", END) graph.set_entry_point("research")
pipeline = graph.compile()
Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"}) print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")
Pattern 2: Parallel Fan-Out / Fan-In
Independent tasks run concurrently. A merge function combines results.
import asyncio from dataclasses import dataclass
@dataclass class FanOutTask: name: str system_prompt: str user_message: str model: str = "claude-sonnet-4-20250514"
@dataclass class FanOutResult: task_name: str output: str tokens_used: int success: bool error: str | None = None
async def fan_out_fan_in( tasks: list[FanOutTask], merge_prompt: str, max_concurrent: int = 5, timeout_seconds: float = 60.0, ) -> dict: """Execute tasks in parallel with concurrency limit and timeout.""" import anthropic
client = anthropic.AsyncAnthropic()
semaphore = asyncio.Semaphore(max_concurrent)
async def run_one(task: FanOutTask) -> FanOutResult:
async with semaphore:
try:
response = await asyncio.wait_for(
client.messages.create(
model=task.model,
max_tokens=2048,
system=task.system_prompt,
messages=[{"role": "user", "content": task.user_message}],
),
timeout=timeout_seconds,
)
return FanOutResult(
task_name=task.name,
output=response.content[0].text,
tokens_used=response.usage.input_tokens + response.usage.output_tokens,
success=True,
)
except Exception as e:
return FanOutResult(
task_name=task.name, output="", tokens_used=0,
success=False, error=str(e),
)
# FAN-OUT: run all tasks concurrently
results = await asyncio.gather(*[run_one(t) for t in tasks])
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
if not successful:
raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")
# FAN-IN: merge results
combined = "\n\n---\n\n".join(
f"## {r.task_name}\n{r.output}" for r in successful
)
merge_response = await client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
system="Synthesize the following parallel analyses into a unified report.",
messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
)
return {
"synthesis": merge_response.content[0].text,
"individual_results": successful,
"failures": failed,
"total_tokens": sum(r.tokens_used for r in results) + merge_response.usage.input_tokens + merge_response.usage.output_tokens,
}
Pattern 3: Hierarchical Delegation
An orchestrator agent dynamically decomposes work and delegates to specialists.
from typing import Literal
SPECIALISTS = { "researcher": "Find accurate information with sources. Be thorough and cite evidence.", "coder": "Write clean, tested code. Include error handling and type hints.", "writer": "Create clear, engaging content. Match the requested tone and format.", "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.", "reviewer": "Review work product for quality, accuracy, and completeness.", }
@dataclass class SubTask: id: str agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"] task: str depends_on: list[str] priority: int = 0 # higher = run first when deps are equal
class HierarchicalOrchestrator: def init(self, client): self.client = client
async def plan(self, request: str) -> list[SubTask]:
"""Orchestrator creates an execution plan with dependencies."""
response = await self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2048,
system=f"""You are a task orchestrator. Break down the request into subtasks.
Available specialists: {', '.join(SPECIALISTS.keys())} Respond with JSON: {{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}} Rules:
-
Minimize the number of subtasks (prefer fewer, more substantial tasks)
-
Only add dependencies when output is genuinely needed
-
Independent tasks should have empty depends_on for parallel execution""", messages=[{"role": "user", "content": request}], ) import json plan = json.loads(response.content[0].text) return [SubTask(**st) for st in plan["subtasks"]]
async def execute(self, request: str) -> str: """Plan, execute with dependency resolution, and synthesize.""" subtasks = await self.plan(request) results = {}
# Execute in dependency order, parallelize where possible for batch in self._batch_by_dependencies(subtasks): batch_results = await asyncio.gather(*[ self._run_specialist(st, results) for st in batch ]) for st, result in zip(batch, batch_results): results[st.id] = result # Final synthesis all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items()) synthesis = await self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system="Synthesize specialist outputs into a coherent final response.", messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}], ) return synthesis.content[0].textdef _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]: """Group subtasks into batches that can run in parallel.""" completed = set() remaining = list(subtasks) batches = [] while remaining: batch = [t for t in remaining if all(d in completed for d in t.depends_on)] if not batch: raise ValueError("Circular dependency detected in subtask plan") batches.append(sorted(batch, key=lambda t: -t.priority)) completed.update(t.id for t in batch) remaining = [t for t in remaining if t.id not in completed] return batches
Pattern 4: Event-Driven Reactor
Agents react to events from a message bus. Decoupled and scalable.
from collections import defaultdict from typing import Callable, Any
class AgentEventBus: """Simple event bus for agent-to-agent communication."""
def __init__(self):
self._handlers: dict[str, list[Callable]] = defaultdict(list)
self._history: list[dict] = []
def subscribe(self, event_type: str, handler: Callable):
self._handlers[event_type].append(handler)
async def publish(self, event_type: str, payload: Any, source: str):
event = {"type": event_type, "payload": payload, "source": source}
self._history.append(event)
handlers = self._handlers.get(event_type, [])
results = await asyncio.gather(
*[h(event) for h in handlers],
return_exceptions=True,
)
errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
if errors:
for handler, error in errors:
print(f"Handler {handler.__name__} failed: {error}")
return results
Usage: code review pipeline triggered by PR events
bus = AgentEventBus()
async def on_pr_opened(event): """Security agent scans PR for vulnerabilities.""" diff = event["payload"]["diff"] # ... scan and publish results await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")
async def on_security_complete(event): """Review agent incorporates security findings into review.""" # ... generate review with security context
bus.subscribe("pr_opened", on_pr_opened) bus.subscribe("security_scan_complete", on_security_complete)
Pattern 5: Consensus Validation
Multiple agents independently evaluate the same input. A quorum determines the final output.
@dataclass class Vote: agent: str verdict: str # "approve" | "reject" | "revise" confidence: float # 0.0 - 1.0 reasoning: str
async def consensus_validate( content: str, validators: list[dict], # [{"name": "...", "system": "..."}] quorum: float = 0.66, confidence_threshold: float = 0.7, ) -> dict: """Run content through multiple validators and determine consensus.""" votes: list[Vote] = []
# Collect independent votes (no agent sees another's vote)
vote_tasks = []
for v in validators:
vote_tasks.append(get_agent_vote(v["name"], v["system"], content))
raw_votes = await asyncio.gather(*vote_tasks)
votes = [v for v in raw_votes if v is not None]
# Calculate consensus
approvals = [v for v in votes if v.verdict == "approve"]
approval_rate = len(approvals) / len(votes) if votes else 0
avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0
if approval_rate >= quorum and avg_confidence >= confidence_threshold:
return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
elif any(v.verdict == "reject" for v in votes):
rejections = [v for v in votes if v.verdict == "reject"]
return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
else:
return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}
Agent Routing Strategies
Intent-Based Router
class IntentRouter: """Route requests to specialized agents based on intent classification."""
ROUTING_TABLE = {
"code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
"code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
"research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
"simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
"creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
"complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
}
async def route(self, message: str) -> dict:
# Use a fast, cheap model for classification
classification = await self.client.messages.create(
model="claude-haiku-4-20250514",
max_tokens=50,
system="Classify the user intent. Respond with ONLY one of: code_generation, code_review, research, simple_question, creative_writing, complex_analysis",
messages=[{"role": "user", "content": message}],
)
intent = classification.content[0].text.strip().lower()
return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])
Context Window Budgeting
MODEL_LIMITS = { "claude-sonnet-4-20250514": 200_000, "claude-haiku-4-20250514": 200_000, "claude-opus-4-20250514": 200_000, "gpt-4o": 128_000, }
class ContextBudget: def init(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15): self.total = MODEL_LIMITS.get(model, 128_000) self.reserve = int(self.total * reserve_pct) self.per_stage = (self.total - self.reserve) // pipeline_stages self.used = 0
def allocate(self, stage: str) -> int:
available = self.total - self.reserve - self.used
allocation = min(self.per_stage, int(available * 0.6))
return max(allocation, 1000) # minimum 1000 tokens per stage
def consume(self, tokens: int):
self.used += tokens
def summarize_if_needed(self, text: str, budget: int) -> str:
estimated_tokens = len(text) // 4
if estimated_tokens <= budget:
return text
# Truncate to budget with marker
char_limit = budget * 4
return text[:char_limit] + "\n\n[Content truncated to fit context budget]"
Cost Optimization Matrix
Strategy Cost Reduction Quality Impact When to Use
Haiku for routing/classification 85-90% Minimal Always for intent routing
Haiku for editing/formatting 60-70% Low Mechanical tasks
Sonnet for most stages Baseline Baseline Default choice
Opus only for final synthesis +50% on that stage Higher quality High-stakes output
Prompt caching (system prompts) 50-90% per call None Repeated system prompts
Truncate intermediate outputs 20-40% May lose detail Long pipelines
Parallel + early termination 30-50% None if threshold met Search/validation tasks
Batch similar requests Up to 50% Increased latency Non-real-time workloads
Reliability Patterns
Circuit Breaker
import time
class CircuitBreaker: """Prevent cascading failures when an agent/model is down."""
def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
self.failure_threshold = failure_threshold
self.recovery_time = recovery_time
self.failures = 0
self.state = "closed" # closed = healthy, open = failing, half-open = testing
self.last_failure_time = 0.0
def can_execute(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_time:
self.state = "half-open"
return True
return False
return True # half-open: allow one test request
def record_success(self):
self.failures = 0
self.state = "closed"
def record_failure(self):
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.failure_threshold:
self.state = "open"
Common Pitfalls
-
Over-orchestration — if a single prompt can handle it, adding agents adds cost and latency, not value
-
Circular dependencies in subtask graphs causing infinite loops; always validate DAG structure before execution
-
Context bleed — passing entire previous outputs to every stage; summarize or extract only what is needed
-
No timeout enforcement — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary
-
Silent failures — agent returns plausible but incorrect output; add validation stages for critical paths
-
Ignoring cost — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one
-
Stateless retries on stateful operations — ensure idempotency before enabling automatic retries
-
Single point of failure in orchestrator — if the orchestrator agent fails, the entire workflow fails
Best Practices
-
Start with a single prompt — only add agents when you prove one cannot handle the task
-
Type your handoffs — use dataclasses or TypedDicts for inter-agent data, not raw strings
-
Budget context upfront — calculate token allocations before running the pipeline
-
Use cheap models for routing — Haiku for classification costs 10x less than Sonnet
-
Validate DAG structure at build time, not runtime
-
Log every agent call with input hash, output hash, tokens, latency, and cost
-
Set SLAs per stage — if research takes >30s, timeout and use cached results
-
Test with production-scale inputs — a pipeline that works on 100 words may fail on 10,000