agent-workflow-designer

Agent Workflow Designer

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant to install this skill

Install skill "agent-workflow-designer" with this command: npx skills add borghei/claude-skills/borghei-claude-skills-agent-workflow-designer

Agent Workflow Designer

Tier: POWERFUL
Category: Engineering / AI Systems
Maintainer: Claude Skills Team

Overview

Design production-grade multi-agent orchestration systems from requirements. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, consensus validation), agent routing strategies, structured handoff protocols, persistent state management, error recovery with circuit breakers, context window budgeting, and cost optimization. Includes framework-specific implementations for LangGraph, CrewAI, AutoGen, and native Claude Code agent teams.

Keywords

agent workflow, multi-agent orchestration, workflow DAG, agent routing, fan-out fan-in, hierarchical delegation, handoff protocol, state management, agent pipeline, LangGraph, CrewAI, AutoGen, context budgeting

Core Capabilities

  1. Pattern Selection and Design
  • Sequential pipelines with typed handoffs

  • Parallel fan-out/fan-in with merge strategies

  • Hierarchical delegation with dynamic subtask discovery

  • Event-driven reactors with pub/sub agent triggers

  • Consensus validation with voting and arbitration

  2. Agent Routing
  • Intent-based routing with classifier agents
  • Intent-based routing with classifier agents

  • Skill-based routing using capability matching

  • Cost-aware routing (cheap models for simple tasks)

  • Load-balanced routing across agent pools

  • Fallback chains with graceful degradation

  3. State and Context Management
  • Persistent workflow state across agent hops
  • Persistent workflow state across agent hops

  • Context window budgeting and summarization

  • Checkpoint/resume for long-running workflows

  • Conflict resolution for parallel state updates

  4. Reliability Engineering
  • Circuit breakers for failing agents
  • Circuit breakers for failing agents

  • Retry with exponential backoff and model fallback

  • Dead letter queues for unprocessable tasks

  • Timeout enforcement at every agent boundary

  • Idempotent operations for safe retries
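The retry-with-backoff-and-fallback bullet above can be sketched as a generic wrapper. This is a minimal illustration, not part of the skill's shipped code; the function and parameter names are made up for the example.

```python
import time

def retry_with_fallback(call, models, max_attempts=3, base_delay=0.1):
    """Try each model in order; retry transient failures with exponential backoff."""
    last_error = None
    for model in models:
        for attempt in range(max_attempts):
            try:
                return call(model)
            except Exception as e:
                last_error = e
                # back off 0.1s, 0.2s, 0.4s, ... before the next attempt
                time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"All models exhausted: {last_error}")
```

Pairing this with per-model circuit breakers (shown later in this document) prevents the fallback chain from hammering a provider that is already down.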

When to Use

  • Building multi-step AI pipelines that exceed one agent's capability

  • Parallelizing research, analysis, or generation tasks

  • Creating specialist agent teams with defined roles and contracts

  • Designing fault-tolerant AI workflows for production deployment

  • Optimizing cost across workflows with mixed model tiers

Pattern Selection Decision Tree

What does the workflow look like?
│
├─ Linear: step A feeds step B feeds step C
│  └─ SEQUENTIAL PIPELINE
│     Best for: content pipelines, code review chains, data transformation
│
├─ Parallel: N independent tasks, then combine
│  └─ FAN-OUT / FAN-IN
│     Best for: competitive research, multi-source analysis, parallel code gen
│
├─ Tree: orchestrator breaks work into subtasks dynamically
│  └─ HIERARCHICAL DELEGATION
│     Best for: complex projects, open-ended research, code generation with planning
│
├─ Reactive: agents respond to events/triggers
│  └─ EVENT-DRIVEN REACTOR
│     Best for: monitoring, alerting, continuous integration, chat workflows
│
└─ Verification: multiple agents must agree on output
   └─ CONSENSUS VALIDATION
      Best for: high-stakes decisions, code review, fact checking, safety-critical output

Pattern 1: Sequential Pipeline

Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.

LangGraph Implementation

from operator import add

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langchain_anthropic import ChatAnthropic

class PipelineState(TypedDict):
    topic: str
    research: str
    draft: str
    final: str
    stage_costs: Annotated[list[dict], add]  # reducer appends each stage's cost entry

def research_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048)
    result = model.invoke(
        f"Research the following topic thoroughly. Provide key facts, statistics, "
        f"and expert perspectives:\n\n{state['topic']}"
    )
    return {
        "research": result.content,
        "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}],
    }

def writing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Using this research, write a compelling 800-word blog post with a hook, "
        f"3 main sections, and a CTA:\n\n{state['research']}"
    )
    return {
        "draft": result.content,
        "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}],
    }

def editing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Edit this draft for clarity, flow, and grammar. Return only the improved "
        f"version:\n\n{state['draft']}"
    )
    return {
        "final": result.content,
        "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}],
    }

# Build the graph
graph = StateGraph(PipelineState)
graph.add_node("research", research_stage)
graph.add_node("write", writing_stage)
graph.add_node("edit", editing_stage)
graph.add_edge("research", "write")
graph.add_edge("write", "edit")
graph.add_edge("edit", END)
graph.set_entry_point("research")

pipeline = graph.compile()

# Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"})
print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")

Pattern 2: Parallel Fan-Out / Fan-In

Independent tasks run concurrently. A merge function combines results.

import asyncio
from dataclasses import dataclass

@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"

@dataclass
class FanOutResult:
    task_name: str
    output: str
    tokens_used: int
    success: bool
    error: str | None = None

async def fan_out_fan_in(
    tasks: list[FanOutTask],
    merge_prompt: str,
    max_concurrent: int = 5,
    timeout_seconds: float = 60.0,
) -> dict:
    """Execute tasks in parallel with concurrency limit and timeout."""
    import anthropic

client = anthropic.AsyncAnthropic()
semaphore = asyncio.Semaphore(max_concurrent)

async def run_one(task: FanOutTask) -> FanOutResult:
    async with semaphore:
        try:
            response = await asyncio.wait_for(
                client.messages.create(
                    model=task.model,
                    max_tokens=2048,
                    system=task.system_prompt,
                    messages=[{"role": "user", "content": task.user_message}],
                ),
                timeout=timeout_seconds,
            )
            return FanOutResult(
                task_name=task.name,
                output=response.content[0].text,
                tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                success=True,
            )
        except Exception as e:
            return FanOutResult(
                task_name=task.name, output="", tokens_used=0,
                success=False, error=str(e),
            )

# FAN-OUT: run all tasks concurrently
results = await asyncio.gather(*[run_one(t) for t in tasks])
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]

if not successful:
    raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")

# FAN-IN: merge results
combined = "\n\n---\n\n".join(
    f"## {r.task_name}\n{r.output}" for r in successful
)

merge_response = await client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=4096,
    system="Synthesize the following parallel analyses into a unified report.",
    messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
)

return {
    "synthesis": merge_response.content[0].text,
    "individual_results": successful,
    "failures": failed,
    "total_tokens": sum(r.tokens_used for r in results) + merge_response.usage.input_tokens + merge_response.usage.output_tokens,
}
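A hypothetical call site for the function above. The task names and prompts are illustrative, `FanOutTask` is redeclared so the snippet stands alone, and actually invoking the pipeline would require an ANTHROPIC_API_KEY.

```python
from dataclasses import dataclass

@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"

# fan out the same question to three specialist perspectives
tasks = [
    FanOutTask(
        name=angle,
        system_prompt=f"You are a {angle} analyst. Be concise and evidence-based.",
        user_message="Assess the market for AI coding agents.",
    )
    for angle in ("technical", "financial", "competitive")
]
# result = asyncio.run(fan_out_fan_in(tasks, "Produce a unified market assessment."))
```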

Pattern 3: Hierarchical Delegation

An orchestrator agent dynamically decomposes work and delegates to specialists.

from typing import Literal

SPECIALISTS = {
    "researcher": "Find accurate information with sources. Be thorough and cite evidence.",
    "coder": "Write clean, tested code. Include error handling and type hints.",
    "writer": "Create clear, engaging content. Match the requested tone and format.",
    "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.",
    "reviewer": "Review work product for quality, accuracy, and completeness.",
}

@dataclass
class SubTask:
    id: str
    agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"]
    task: str
    depends_on: list[str]
    priority: int = 0  # higher = run first when deps are equal

class HierarchicalOrchestrator:
    def __init__(self, client):
        self.client = client

async def plan(self, request: str) -> list[SubTask]:
    """Orchestrator creates an execution plan with dependencies."""
    response = await self.client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=f"""You are a task orchestrator. Break down the request into subtasks.

Available specialists: {', '.join(SPECIALISTS.keys())}

Respond with JSON:
{{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}}

Rules:
- Minimize the number of subtasks (prefer fewer, more substantial tasks)
- Only add dependencies when output is genuinely needed
- Independent tasks should have empty depends_on for parallel execution""",
            messages=[{"role": "user", "content": request}],
        )
        import json
        plan = json.loads(response.content[0].text)
        return [SubTask(**st) for st in plan["subtasks"]]

    async def execute(self, request: str) -> str:
        """Plan, execute with dependency resolution, and synthesize."""
        subtasks = await self.plan(request)
        results = {}

      # Execute in dependency order, parallelize where possible
      for batch in self._batch_by_dependencies(subtasks):
          batch_results = await asyncio.gather(*[
              self._run_specialist(st, results) for st in batch
          ])
          for st, result in zip(batch, batch_results):
              results[st.id] = result
    
      # Final synthesis
      all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items())
      synthesis = await self.client.messages.create(
          model="claude-sonnet-4-20250514",
          max_tokens=4096,
          system="Synthesize specialist outputs into a coherent final response.",
          messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}],
      )
      return synthesis.content[0].text
    

    def _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]:
        """Group subtasks into batches that can run in parallel."""
        completed = set()
        remaining = list(subtasks)
        batches = []
        while remaining:
            batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
            if not batch:
                raise ValueError("Circular dependency detected in subtask plan")
            batches.append(sorted(batch, key=lambda t: -t.priority))
            completed.update(t.id for t in batch)
            remaining = [t for t in remaining if t.id not in completed]
        return batches
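To see the batching behavior concretely, here is a standalone rerun of the same algorithm on a tiny made-up plan (the subtasks and the module-level `batch_by_dependencies` are illustrative, mirroring `_batch_by_dependencies` above so the snippet runs on its own):

```python
from dataclasses import dataclass, field

@dataclass
class SubTask:
    id: str
    agent: str
    task: str
    depends_on: list = field(default_factory=list)
    priority: int = 0

def batch_by_dependencies(subtasks):
    """Group subtasks into batches that can run in parallel (same logic as above)."""
    completed, remaining, batches = set(), list(subtasks), []
    while remaining:
        batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
        if not batch:
            raise ValueError("Circular dependency detected in subtask plan")
        batches.append(sorted(batch, key=lambda t: -t.priority))
        completed.update(t.id for t in batch)
        remaining = [t for t in remaining if t.id not in completed]
    return batches

plan = [
    SubTask("1", "researcher", "gather sources"),
    SubTask("2", "researcher", "gather benchmarks"),
    SubTask("3", "writer", "draft report", depends_on=["1", "2"]),
]
batches = batch_by_dependencies(plan)
# batch 1 runs tasks "1" and "2" in parallel; batch 2 runs task "3"
```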

Pattern 4: Event-Driven Reactor

Agents react to events from a message bus. Decoupled and scalable.

import asyncio
from collections import defaultdict
from typing import Callable, Any

class AgentEventBus:
    """Simple event bus for agent-to-agent communication."""

def __init__(self):
    self._handlers: dict[str, list[Callable]] = defaultdict(list)
    self._history: list[dict] = []

def subscribe(self, event_type: str, handler: Callable):
    self._handlers[event_type].append(handler)

async def publish(self, event_type: str, payload: Any, source: str):
    event = {"type": event_type, "payload": payload, "source": source}
    self._history.append(event)
    handlers = self._handlers.get(event_type, [])
    results = await asyncio.gather(
        *[h(event) for h in handlers],
        return_exceptions=True,
    )
    errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
    if errors:
        for handler, error in errors:
            print(f"Handler {handler.__name__} failed: {error}")
    return results

Usage: code review pipeline triggered by PR events

bus = AgentEventBus()

async def on_pr_opened(event):
    """Security agent scans PR for vulnerabilities."""
    diff = event["payload"]["diff"]
    findings = ...  # scan `diff` for vulnerabilities (scan logic elided)
    await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")

async def on_security_complete(event):
    """Review agent incorporates security findings into review."""
    # ... generate review with security context

bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)
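The wiring above can be smoke-tested end to end with stub handlers. In this sketch the bus class is repeated so the snippet runs standalone, the "scan" is faked, and the handler bodies are illustrative:

```python
import asyncio
from collections import defaultdict
from typing import Any, Callable

class AgentEventBus:
    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    def subscribe(self, event_type: str, handler: Callable):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Any, source: str):
        event = {"type": event_type, "payload": payload, "source": source}
        self._history.append(event)
        return await asyncio.gather(
            *[h(event) for h in self._handlers.get(event_type, [])],
            return_exceptions=True,
        )

bus = AgentEventBus()
reviews = []

async def on_pr_opened(event):
    # stand-in for a real security scan of event["payload"]["diff"]
    await bus.publish("security_scan_complete", {"findings": ["none"]}, "security-agent")

async def on_security_complete(event):
    reviews.append(f"review with findings: {event['payload']['findings']}")

bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)
asyncio.run(bus.publish("pr_opened", {"diff": "+ print('hi')"}, "github-webhook"))
```

Note that one publish can trigger a cascade: the PR event produces a scan event, which in turn triggers the reviewer, all recorded in `_history` for auditing.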

Pattern 5: Consensus Validation

Multiple agents independently evaluate the same input. A quorum determines the final output.

@dataclass
class Vote:
    agent: str
    verdict: str  # "approve" | "reject" | "revise"
    confidence: float  # 0.0 - 1.0
    reasoning: str

async def consensus_validate(
    content: str,
    validators: list[dict],  # [{"name": "...", "system": "..."}]
    quorum: float = 0.66,
    confidence_threshold: float = 0.7,
) -> dict:
    """Run content through multiple validators and determine consensus."""
    votes: list[Vote] = []

# Collect independent votes (no agent sees another's vote)
vote_tasks = []
for v in validators:
    # get_agent_vote (not shown here) queries one validator model and parses a Vote
    vote_tasks.append(get_agent_vote(v["name"], v["system"], content))
raw_votes = await asyncio.gather(*vote_tasks)
votes = [v for v in raw_votes if v is not None]

# Calculate consensus
approvals = [v for v in votes if v.verdict == "approve"]
approval_rate = len(approvals) / len(votes) if votes else 0
avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0

if approval_rate >= quorum and avg_confidence >= confidence_threshold:
    return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
elif any(v.verdict == "reject" for v in votes):
    rejections = [v for v in votes if v.verdict == "reject"]
    return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
else:
    return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}
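The quorum arithmetic above can be exercised without any API calls by stubbing votes. `Vote` is redeclared and the decision logic is condensed into a hypothetical `tally` helper so the snippet runs standalone:

```python
from dataclasses import dataclass

@dataclass
class Vote:
    agent: str
    verdict: str      # "approve" | "reject" | "revise"
    confidence: float
    reasoning: str

def tally(votes, quorum=0.66, confidence_threshold=0.7):
    """Condensed version of the consensus decision logic above."""
    approvals = [v for v in votes if v.verdict == "approve"]
    approval_rate = len(approvals) / len(votes) if votes else 0
    avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0
    if approval_rate >= quorum and avg_confidence >= confidence_threshold:
        return "approved"
    if any(v.verdict == "reject" for v in votes):
        return "rejected"
    return "needs_revision"

votes = [
    Vote("security", "approve", 0.9, "no injection risks"),
    Vote("style", "approve", 0.8, "idiomatic"),
    Vote("correctness", "revise", 0.6, "edge case untested"),
]
# 2/3 approve (>= 0.66) and average confidence ~0.77 (>= 0.7), so: approved
```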

Agent Routing Strategies

Intent-Based Router

class IntentRouter:
    """Route requests to specialized agents based on intent classification."""

ROUTING_TABLE = {
    "code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
    "code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
    "research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
    "simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
    "creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
    "complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
}

async def route(self, message: str) -> dict:
    # Use a fast, cheap model for classification
    classification = await self.client.messages.create(
        model="claude-haiku-4-20250514",
        max_tokens=50,
        system="Classify the user intent. Respond with ONLY one of: code_generation, code_review, research, simple_question, creative_writing, complex_analysis",
        messages=[{"role": "user", "content": message}],
    )
    intent = classification.content[0].text.strip().lower()
    return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])

Context Window Budgeting

MODEL_LIMITS = {
    "claude-sonnet-4-20250514": 200_000,
    "claude-haiku-4-20250514": 200_000,
    "claude-opus-4-20250514": 200_000,
    "gpt-4o": 128_000,
}

class ContextBudget:
    def __init__(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15):
        self.total = MODEL_LIMITS.get(model, 128_000)
        self.reserve = int(self.total * reserve_pct)
        self.per_stage = (self.total - self.reserve) // pipeline_stages
        self.used = 0

def allocate(self, stage: str) -> int:
    available = self.total - self.reserve - self.used
    allocation = min(self.per_stage, int(available * 0.6))
    return max(allocation, 1000)  # minimum 1000 tokens per stage

def consume(self, tokens: int):
    self.used += tokens

def summarize_if_needed(self, text: str, budget: int) -> str:
    estimated_tokens = len(text) // 4  # rough heuristic: ~4 characters per token
    if estimated_tokens <= budget:
        return text
    # Truncate to budget with marker (a summarization call could replace this)
    char_limit = budget * 4
    return text[:char_limit] + "\n\n[Content truncated to fit context budget]"
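Worked numbers for the scheme above, assuming a 200k-token model, a three-stage pipeline, and the default 15% reserve. The arithmetic is inlined so the snippet runs standalone:

```python
total = 200_000
reserve = int(total * 0.15)          # 30,000 tokens held back for final output
per_stage = (total - reserve) // 3   # 56,666 tokens nominally per stage

# first stage: nothing consumed yet, so allocation is capped by per_stage
available = total - reserve                        # 170,000 tokens free
allocation = min(per_stage, int(available * 0.6))  # min(56666, 102000) = 56666
```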

Cost Optimization Matrix

| Strategy | Cost Reduction | Quality Impact | When to Use |
| --- | --- | --- | --- |
| Haiku for routing/classification | 85-90% | Minimal | Always for intent routing |
| Haiku for editing/formatting | 60-70% | Low | Mechanical tasks |
| Sonnet for most stages | Baseline | Baseline | Default choice |
| Opus only for final synthesis | +50% on that stage | Higher quality | High-stakes output |
| Prompt caching (system prompts) | 50-90% per call | None | Repeated system prompts |
| Truncate intermediate outputs | 20-40% | May lose detail | Long pipelines |
| Parallel + early termination | 30-50% | None if threshold met | Search/validation tasks |
| Batch similar requests | Up to 50% | Increased latency | Non-real-time workloads |
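Prompt caching from the table is enabled in the Anthropic Messages API by passing the system prompt as a list of blocks and marking a block with `cache_control`. This sketch only builds the request payload, no API call is made, and the prompt text is illustrative:

```python
LONG_SYSTEM_PROMPT = "You are a code review agent. " + "Detailed rubric item. " * 100

request_kwargs = {
    "model": "claude-sonnet-4-20250514",
    "max_tokens": 2048,
    # a list-of-blocks system prompt lets individual blocks be cached
    "system": [
        {
            "type": "text",
            "text": LONG_SYSTEM_PROMPT,
            "cache_control": {"type": "ephemeral"},  # reuse this block across calls
        }
    ],
    "messages": [{"role": "user", "content": "Review this diff: ..."}],
}
# client.messages.create(**request_kwargs) would then hit the cache on repeat calls
```

Caching pays off only when the same prefix is reused within the cache lifetime, which is why the table scopes it to repeated system prompts.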

Reliability Patterns

Circuit Breaker

import time

class CircuitBreaker:
    """Prevent cascading failures when an agent/model is down."""

def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
    self.failure_threshold = failure_threshold
    self.recovery_time = recovery_time
    self.failures = 0
    self.state = "closed"  # closed = healthy, open = failing, half-open = testing
    self.last_failure_time = 0.0

def can_execute(self) -> bool:
    if self.state == "closed":
        return True
    if self.state == "open":
        if time.time() - self.last_failure_time > self.recovery_time:
            self.state = "half-open"
            return True
        return False
    return True  # half-open: allow one test request

def record_success(self):
    self.failures = 0
    self.state = "closed"

def record_failure(self):
    self.failures += 1
    self.last_failure_time = time.time()
    if self.failures >= self.failure_threshold:
        self.state = "open"
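A quick state-transition walkthrough of the breaker above, with the failing agent simulated and the class repeated so the snippet runs standalone (the small thresholds are chosen only to keep the demo fast):

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_time=60.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"
        self.last_failure_time = 0.0

    def can_execute(self):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"  # allow one probe request
                return True
            return False
        return True

    def record_success(self):
        self.failures, self.state = 0, "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

breaker = CircuitBreaker(failure_threshold=2, recovery_time=0.05)
for _ in range(2):
    breaker.record_failure()       # two consecutive failures trip the breaker
tripped = breaker.can_execute()    # False: circuit is open, skip this agent
time.sleep(0.06)                   # wait past the recovery window
probing = breaker.can_execute()    # True: half-open, one test request allowed
breaker.record_success()           # probe succeeded, circuit closes again
```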

Common Pitfalls

  • Over-orchestration — if a single prompt can handle it, adding agents adds cost and latency, not value

  • Circular dependencies in subtask graphs causing infinite loops; always validate DAG structure before execution

  • Context bleed — passing entire previous outputs to every stage; summarize or extract only what is needed

  • No timeout enforcement — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary

  • Silent failures — agent returns plausible but incorrect output; add validation stages for critical paths

  • Ignoring cost — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one

  • Stateless retries on stateful operations — ensure idempotency before enabling automatic retries

  • Single point of failure in orchestrator — if the orchestrator agent fails, the entire workflow fails

Best Practices

  • Start with a single prompt — only add agents when you prove one cannot handle the task

  • Type your handoffs — use dataclasses or TypedDicts for inter-agent data, not raw strings

  • Budget context upfront — calculate token allocations before running the pipeline

  • Use cheap models for routing — Haiku for classification costs roughly a tenth as much as Sonnet

  • Validate DAG structure at build time, not runtime

  • Log every agent call with input hash, output hash, tokens, latency, and cost

  • Set SLAs per stage — if research takes >30s, timeout and use cached results

  • Test with production-scale inputs — a pipeline that works on 100 words may fail on 10,000
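The "log every agent call" practice above can be implemented with a small record helper. The field names and helper are illustrative; hashing keeps prompt contents out of logs while still supporting deduplication and tracing:

```python
import hashlib
import json
import time

def agent_call_record(agent, prompt, output, tokens, started_at):
    """Build a structured log entry for one agent call."""
    def digest(s):
        # short stable hash: identifies identical inputs/outputs without storing them
        return hashlib.sha256(s.encode()).hexdigest()[:16]
    return {
        "agent": agent,
        "input_hash": digest(prompt),
        "output_hash": digest(output),
        "tokens": tokens,
        "latency_ms": round((time.time() - started_at) * 1000, 1),
    }

started = time.time()
record = agent_call_record("researcher", "Research topic X", "Findings...", 1234, started)
print(json.dumps(record))  # ship to your logging backend as one JSON line
```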

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

  • ml-ops-engineer (Automation): no summary provided by upstream source

  • senior-secops (Automation): no summary provided by upstream source

  • agent-designer (Automation): no summary provided by upstream source

  • self-improving-agent (Automation): no summary provided by upstream source