ai workflow orchestrator

AI Workflow Orchestrator

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "ai workflow orchestrator" with this command: npx skills add krosebrook/source-of-truth-monorepo/krosebrook-source-of-truth-monorepo-ai-workflow-orchestrator

AI Workflow Orchestrator

Enterprise AI workflow automation with n8n, Zapier, and custom orchestration.

n8n Workflow Patterns

AI Agent Workflow

{ "nodes": [ { "name": "Webhook", "type": "n8n-nodes-base.webhook", "typeVersion": 1, "position": [250, 300], "webhookId": "user-query" }, { "name": "OpenAI Chat", "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi", "typeVersion": 1, "position": [450, 300], "parameters": { "model": "gpt-4-turbo", "temperature": 0.7, "systemMessage": "You are a helpful assistant." } }, { "name": "Vector Store Query", "type": "@n8n/n8n-nodes-langchain.vectorStorePinecone", "typeVersion": 1, "position": [450, 150], "parameters": { "operation": "retrieve", "topK": 5 } }, { "name": "Response Formatter", "type": "n8n-nodes-base.function", "typeVersion": 1, "position": [650, 300], "parameters": { "functionCode": "return items.map(item => ({\n json: {\n response: item.json.response,\n sources: item.json.sources,\n confidence: item.json.confidence\n }\n}));" } } ], "connections": { "Webhook": { "main": [[{ "node": "Vector Store Query", "type": "main", "index": 0 }]] }, "Vector Store Query": { "main": [[{ "node": "OpenAI Chat", "type": "main", "index": 0 }]] }, "OpenAI Chat": { "main": [[{ "node": "Response Formatter", "type": "main", "index": 0 }]] } } }

Multi-Agent Orchestration

// n8n Custom Node for Agent Orchestration import { IExecuteFunctions } from 'n8n-core'; import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow';

export class MultiAgentOrchestrator implements INodeType { description: INodeTypeDescription = { displayName: 'Multi-Agent Orchestrator', name: 'multiAgentOrchestrator', group: ['transform'], version: 1, description: 'Orchestrate multiple AI agents', defaults: { name: 'Multi-Agent Orchestrator', }, inputs: ['main'], outputs: ['main'], properties: [ { displayName: 'Agents', name: 'agents', type: 'fixedCollection', typeOptions: { multipleValues: true, }, default: {}, options: [ { name: 'agentValues', displayName: 'Agent', values: [ { displayName: 'Agent Name', name: 'name', type: 'string', default: '', }, { displayName: 'Agent Type', name: 'type', type: 'options', options: [ { name: 'Researcher', value: 'researcher' }, { name: 'Writer', value: 'writer' }, { name: 'Reviewer', value: 'reviewer' }, ], default: 'researcher', }, ], }, ], }, ], };

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { const items = this.getInputData(); const returnData: INodeExecutionData[] = [];

for (let itemIndex = 0; itemIndex &#x3C; items.length; itemIndex++) {
  const input = this.getNodeParameter('input', itemIndex, '') as string;
  const agents = this.getNodeParameter('agents', itemIndex, {}) as any;

  const results: any[] = [];

  for (const agent of agents.agentValues || []) {
    const result = await this.executeAgent(agent.type, input);
    results.push({ agent: agent.name, result });
  }

  returnData.push({
    json: {
      input,
      results,
      summary: this.summarizeResults(results),
    },
  });
}

return [returnData];

}

private async executeAgent(type: string, input: string): Promise<string> { // Agent execution logic return Result from ${type} agent; }

private summarizeResults(results: any[]): string { return results.map(r => r.result).join(' '); } }

Zapier Integration Patterns

AI-Powered Email Automation

// Zapier Custom Code Action const inputData = inputData || {}; const { email_content, sender } = inputData;

// Call OpenAI API const response = await fetch('https://api.openai.com/v1/chat/completions', { method: 'POST', headers: { 'Authorization': Bearer ${process.env.OPENAI_API_KEY}, 'Content-Type': 'application/json', }, body: JSON.stringify({ model: 'gpt-4-turbo', messages: [ { role: 'system', content: 'You are an email assistant. Categorize and draft responses.', }, { role: 'user', content: Categorize and draft a response to this email:\n\n${email_content}, }, ], }), });

const data = await response.json(); const ai_response = data.choices[0].message.content;

// Parse AI response const category = ai_response.match(/Category: (.)/)?.[1] || 'General'; const draft_response = ai_response.match(/Draft:([\s\S])/)?.[1]?.trim() || '';

output = { category, draft_response, priority: category === 'Urgent' ? 'high' : 'normal', };

Custom Orchestration System

Workflow Engine

from typing import Dict, List, Callable, Any from pydantic import BaseModel import asyncio

class WorkflowStep(BaseModel): name: str function: str inputs: Dict[str, str] = {} condition: str | None = None

class Workflow(BaseModel): name: str steps: List[WorkflowStep]

class WorkflowEngine: def init(self): self.functions: Dict[str, Callable] = {} self.context: Dict[str, Any] = {}

def register_function(self, name: str, func: Callable):
    """Register a function that can be used in workflows."""
    self.functions[name] = func

async def execute_workflow(self, workflow: Workflow, initial_context: Dict[str, Any]):
    """Execute a workflow with the given context."""
    self.context = initial_context.copy()

    for step in workflow.steps:
        # Check condition
        if step.condition and not self._evaluate_condition(step.condition):
            continue

        # Prepare inputs
        inputs = {
            key: self._resolve_value(value)
            for key, value in step.inputs.items()
        }

        # Execute function
        func = self.functions.get(step.function)
        if not func:
            raise ValueError(f"Function not found: {step.function}")

        result = await func(**inputs) if asyncio.iscoroutinefunction(func) else func(**inputs)

        # Store result
        self.context[step.name] = result

    return self.context

def _resolve_value(self, value: str) -> Any:
    """Resolve context variables in string values."""
    if value.startswith('$'):
        return self.context.get(value[1:])
    return value

def _evaluate_condition(self, condition: str) -> bool:
    """Evaluate a condition expression."""
    return eval(condition, {}, self.context)

Usage

engine = WorkflowEngine()

Register AI functions

async def analyze_sentiment(text: str) -> str: # Call AI API return "positive"

async def generate_response(sentiment: str, text: str) -> str: # Call AI API return f"Response based on {sentiment} sentiment"

engine.register_function('analyze_sentiment', analyze_sentiment) engine.register_function('generate_response', generate_response)

Define workflow

workflow = Workflow( name="Email Response", steps=[ WorkflowStep( name="sentiment", function="analyze_sentiment", inputs={"text": "$email_content"} ), WorkflowStep( name="response", function="generate_response", inputs={"sentiment": "$sentiment", "text": "$email_content"}, condition="sentiment in ['positive', 'neutral']" ), ] )

Execute

result = await engine.execute_workflow( workflow, {"email_content": "Thank you for your help!"} )

Agent Chain Pattern

interface Agent { name: string; execute: (input: any) => Promise<any>; }

class AgentChain { private agents: Agent[] = [];

add(agent: Agent): this { this.agents.push(agent); return this; }

async execute(initialInput: any): Promise<any> { let result = initialInput;

for (const agent of this.agents) {
  console.log(`Executing ${agent.name}...`);
  result = await agent.execute(result);
}

return result;

} }

// Usage const chain = new AgentChain() .add({ name: 'Researcher', execute: async (query: string) => { // Research logic return { query, sources: ['source1', 'source2'] }; }, }) .add({ name: 'Summarizer', execute: async (data: any) => { // Summarization logic return { ...data, summary: 'Summary of research' }; }, }) .add({ name: 'Formatter', execute: async (data: any) => { // Formatting logic return { title: data.query, content: data.summary, references: data.sources, }; }, });

const result = await chain.execute('What is quantum computing?');

Integration Patterns

Webhook Handlers

from fastapi import FastAPI, Request from pydantic import BaseModel

app = FastAPI()

class ZapierWebhook(BaseModel): event: str data: dict

@app.post("/zapier/webhook") async def handle_zapier_webhook(webhook: ZapierWebhook): """Handle incoming Zapier webhooks.""" if webhook.event == "new_lead": await process_lead(webhook.data) elif webhook.event == "support_ticket": await process_ticket(webhook.data)

return {"status": "processed"}

@app.post("/n8n/webhook/{workflow_id}") async def handle_n8n_webhook(workflow_id: str, request: Request): """Handle incoming n8n webhooks.""" data = await request.json() result = await execute_workflow(workflow_id, data) return result

Task Queues

from celery import Celery from typing import Dict

celery = Celery('tasks', broker='redis://localhost:6379')

@celery.task def process_with_ai(data: Dict) -> Dict: """Process data with AI agent in background.""" result = ai_agent.process(data) notify_completion(result) return result

Trigger from workflow

process_with_ai.delay({"text": "Process this"})

Best Practices

✅ Use idempotent operations ✅ Implement error handling and retries ✅ Add logging and monitoring ✅ Use task queues for long-running operations ✅ Implement rate limiting ✅ Version your workflows ✅ Test workflows thoroughly ✅ Use environment variables for secrets ✅ Implement rollback mechanisms ✅ Monitor workflow performance

When to Use: AI workflow automation, n8n/Zapier integrations, multi-agent orchestration, business process automation.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

pydantic ai agent builder

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

git advanced workflow expert

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

testing-skills-with-subagents

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

dispatching-parallel-agents

No summary provided by upstream source.

Repository SourceNeeds Review