AI Workflow Orchestrator
Enterprise AI workflow automation with n8n, Zapier, and custom orchestration.
n8n Workflow Patterns
AI Agent Workflow
{ "nodes": [ { "name": "Webhook", "type": "n8n-nodes-base.webhook", "typeVersion": 1, "position": [250, 300], "webhookId": "user-query" }, { "name": "OpenAI Chat", "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi", "typeVersion": 1, "position": [450, 300], "parameters": { "model": "gpt-4-turbo", "temperature": 0.7, "systemMessage": "You are a helpful assistant." } }, { "name": "Vector Store Query", "type": "@n8n/n8n-nodes-langchain.vectorStorePinecone", "typeVersion": 1, "position": [450, 150], "parameters": { "operation": "retrieve", "topK": 5 } }, { "name": "Response Formatter", "type": "n8n-nodes-base.function", "typeVersion": 1, "position": [650, 300], "parameters": { "functionCode": "return items.map(item => ({\n json: {\n response: item.json.response,\n sources: item.json.sources,\n confidence: item.json.confidence\n }\n}));" } } ], "connections": { "Webhook": { "main": [[{ "node": "Vector Store Query", "type": "main", "index": 0 }]] }, "Vector Store Query": { "main": [[{ "node": "OpenAI Chat", "type": "main", "index": 0 }]] }, "OpenAI Chat": { "main": [[{ "node": "Response Formatter", "type": "main", "index": 0 }]] } } }
Multi-Agent Orchestration
// n8n Custom Node for Agent Orchestration import { IExecuteFunctions } from 'n8n-core'; import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow';
export class MultiAgentOrchestrator implements INodeType { description: INodeTypeDescription = { displayName: 'Multi-Agent Orchestrator', name: 'multiAgentOrchestrator', group: ['transform'], version: 1, description: 'Orchestrate multiple AI agents', defaults: { name: 'Multi-Agent Orchestrator', }, inputs: ['main'], outputs: ['main'], properties: [ { displayName: 'Agents', name: 'agents', type: 'fixedCollection', typeOptions: { multipleValues: true, }, default: {}, options: [ { name: 'agentValues', displayName: 'Agent', values: [ { displayName: 'Agent Name', name: 'name', type: 'string', default: '', }, { displayName: 'Agent Type', name: 'type', type: 'options', options: [ { name: 'Researcher', value: 'researcher' }, { name: 'Writer', value: 'writer' }, { name: 'Reviewer', value: 'reviewer' }, ], default: 'researcher', }, ], }, ], }, ], };
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { const items = this.getInputData(); const returnData: INodeExecutionData[] = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
const input = this.getNodeParameter('input', itemIndex, '') as string;
const agents = this.getNodeParameter('agents', itemIndex, {}) as any;
const results: any[] = [];
for (const agent of agents.agentValues || []) {
const result = await this.executeAgent(agent.type, input);
results.push({ agent: agent.name, result });
}
returnData.push({
json: {
input,
results,
summary: this.summarizeResults(results),
},
});
}
return [returnData];
}
private async executeAgent(type: string, input: string): Promise<string> {
// Agent execution logic
return Result from ${type} agent;
}
private summarizeResults(results: any[]): string { return results.map(r => r.result).join(' '); } }
Zapier Integration Patterns
AI-Powered Email Automation
// Zapier Custom Code Action const inputData = inputData || {}; const { email_content, sender } = inputData;
// Call OpenAI API
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Authorization': Bearer ${process.env.OPENAI_API_KEY},
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'gpt-4-turbo',
messages: [
{
role: 'system',
content: 'You are an email assistant. Categorize and draft responses.',
},
{
role: 'user',
content: Categorize and draft a response to this email:\n\n${email_content},
},
],
}),
});
const data = await response.json(); const ai_response = data.choices[0].message.content;
// Parse AI response const category = ai_response.match(/Category: (.)/)?.[1] || 'General'; const draft_response = ai_response.match(/Draft:([\s\S])/)?.[1]?.trim() || '';
output = { category, draft_response, priority: category === 'Urgent' ? 'high' : 'normal', };
Custom Orchestration System
Workflow Engine
from typing import Dict, List, Callable, Any from pydantic import BaseModel import asyncio
class WorkflowStep(BaseModel): name: str function: str inputs: Dict[str, str] = {} condition: str | None = None
class Workflow(BaseModel): name: str steps: List[WorkflowStep]
class WorkflowEngine: def init(self): self.functions: Dict[str, Callable] = {} self.context: Dict[str, Any] = {}
def register_function(self, name: str, func: Callable):
"""Register a function that can be used in workflows."""
self.functions[name] = func
async def execute_workflow(self, workflow: Workflow, initial_context: Dict[str, Any]):
"""Execute a workflow with the given context."""
self.context = initial_context.copy()
for step in workflow.steps:
# Check condition
if step.condition and not self._evaluate_condition(step.condition):
continue
# Prepare inputs
inputs = {
key: self._resolve_value(value)
for key, value in step.inputs.items()
}
# Execute function
func = self.functions.get(step.function)
if not func:
raise ValueError(f"Function not found: {step.function}")
result = await func(**inputs) if asyncio.iscoroutinefunction(func) else func(**inputs)
# Store result
self.context[step.name] = result
return self.context
def _resolve_value(self, value: str) -> Any:
"""Resolve context variables in string values."""
if value.startswith('$'):
return self.context.get(value[1:])
return value
def _evaluate_condition(self, condition: str) -> bool:
"""Evaluate a condition expression."""
return eval(condition, {}, self.context)
Usage
engine = WorkflowEngine()
Register AI functions
async def analyze_sentiment(text: str) -> str: # Call AI API return "positive"
async def generate_response(sentiment: str, text: str) -> str: # Call AI API return f"Response based on {sentiment} sentiment"
engine.register_function('analyze_sentiment', analyze_sentiment) engine.register_function('generate_response', generate_response)
Define workflow
workflow = Workflow( name="Email Response", steps=[ WorkflowStep( name="sentiment", function="analyze_sentiment", inputs={"text": "$email_content"} ), WorkflowStep( name="response", function="generate_response", inputs={"sentiment": "$sentiment", "text": "$email_content"}, condition="sentiment in ['positive', 'neutral']" ), ] )
Execute
result = await engine.execute_workflow( workflow, {"email_content": "Thank you for your help!"} )
Agent Chain Pattern
interface Agent { name: string; execute: (input: any) => Promise<any>; }
class AgentChain { private agents: Agent[] = [];
add(agent: Agent): this { this.agents.push(agent); return this; }
async execute(initialInput: any): Promise<any> { let result = initialInput;
for (const agent of this.agents) {
console.log(`Executing ${agent.name}...`);
result = await agent.execute(result);
}
return result;
} }
// Usage const chain = new AgentChain() .add({ name: 'Researcher', execute: async (query: string) => { // Research logic return { query, sources: ['source1', 'source2'] }; }, }) .add({ name: 'Summarizer', execute: async (data: any) => { // Summarization logic return { ...data, summary: 'Summary of research' }; }, }) .add({ name: 'Formatter', execute: async (data: any) => { // Formatting logic return { title: data.query, content: data.summary, references: data.sources, }; }, });
const result = await chain.execute('What is quantum computing?');
Integration Patterns
Webhook Handlers
from fastapi import FastAPI, Request from pydantic import BaseModel
app = FastAPI()
class ZapierWebhook(BaseModel): event: str data: dict
@app.post("/zapier/webhook") async def handle_zapier_webhook(webhook: ZapierWebhook): """Handle incoming Zapier webhooks.""" if webhook.event == "new_lead": await process_lead(webhook.data) elif webhook.event == "support_ticket": await process_ticket(webhook.data)
return {"status": "processed"}
@app.post("/n8n/webhook/{workflow_id}") async def handle_n8n_webhook(workflow_id: str, request: Request): """Handle incoming n8n webhooks.""" data = await request.json() result = await execute_workflow(workflow_id, data) return result
Task Queues
from celery import Celery from typing import Dict
celery = Celery('tasks', broker='redis://localhost:6379')
@celery.task def process_with_ai(data: Dict) -> Dict: """Process data with AI agent in background.""" result = ai_agent.process(data) notify_completion(result) return result
Trigger from workflow
process_with_ai.delay({"text": "Process this"})
Best Practices
✅ Use idempotent operations ✅ Implement error handling and retries ✅ Add logging and monitoring ✅ Use task queues for long-running operations ✅ Implement rate limiting ✅ Version your workflows ✅ Test workflows thoroughly ✅ Use environment variables for secrets ✅ Implement rollback mechanisms ✅ Monitor workflow performance
When to Use: AI workflow automation, n8n/Zapier integrations, multi-agent orchestration, business process automation.