Flow Coordinator
Lightweight workflow coordinator supporting two workflow formats:
-
Legacy Templates: Command chains with slash-command execution
-
Unified Workflows: DAG-based PromptTemplate nodes (spec: spec/unified-workflow-spec.md )
Specification Reference
-
Unified Workflow Spec: @spec/unified-workflow-spec.md
-
Demo Workflow: ccw/data/flows/demo-unified-workflow.json
Architecture
User Task → Detect Format → Select Workflow → Init Status → Execute → Complete │ │ ├─ Legacy Template │ │ └─ Sequential cmd execution │ │ │ └─ Unified Workflow │ └─ DAG traversal with contextRefs │ │ └──────────────── Resume (from status.json) ──────────────┘
Execution Modes: ├─ analysis → Read-only, CLI --mode analysis ├─ write → File changes, CLI --mode write ├─ mainprocess → Blocking, synchronous └─ async → Background, ccw cli
Core Concepts
Dual Format Support:
-
Legacy: templates/*.json with cmd , args , execution
-
Unified: ccw/data/flows/*.json with nodes , edges , contextRefs
Unified PromptTemplate Model: All workflow steps are natural language instructions with:
-
instruction : What to execute (natural language)
-
slashCommand : Optional slash command name (e.g., "workflow:plan")
-
slashArgs : Optional arguments for slash command (supports {{variable}})
-
outputName : Name for output reference
-
contextRefs : References to previous step outputs
-
tool : Optional CLI tool (gemini/qwen/codex/claude)
-
mode : Execution mode (analysis/write/mainprocess/async)
DAG Execution: Unified workflows execute as directed acyclic graphs with parallel branches and conditional edges.
Dynamic Discovery: Both formats discovered at runtime via Glob.
Execution Flow
async function execute(task) { // 1. Discover and select template const templates = await discoverTemplates(); const template = await selectTemplate(templates);
// 2. Init status
const sessionId = fc-${timestamp()};
const statusPath = .workflow/.flow-coordinator/${sessionId}/status.json;
const status = initStatus(template, task);
write(statusPath, JSON.stringify(status, null, 2));
// 3. Execute steps based on execution config await executeSteps(status, statusPath); }
async function executeSteps(status, statusPath) { for (let i = status.current; i < status.steps.length; i++) { const step = status.steps[i]; status.current = i;
// Execute based on step mode (all steps use slash-command type)
const execConfig = step.execution || { type: 'slash-command', mode: 'mainprocess' };
if (execConfig.mode === 'async') {
// Async execution - stop and wait for hook callback
await executeSlashCommandAsync(step, status, statusPath);
break;
} else {
// Mainprocess execution - continue immediately
await executeSlashCommandSync(step, status);
step.status = 'done';
write(statusPath, JSON.stringify(status, null, 2));
}
}
// All steps complete if (status.current >= status.steps.length) { status.complete = true; write(statusPath, JSON.stringify(status, null, 2)); } }
Unified Workflow Execution
For workflows using the unified PromptTemplate format (ccw/data/flows/*.json ):
async function executeUnifiedWorkflow(workflow, task) {
// 1. Initialize execution state
const sessionId = ufc-${timestamp()};
const statusPath = .workflow/.flow-coordinator/${sessionId}/status.json;
const state = {
id: sessionId,
workflow: workflow.id,
goal: task,
nodeStates: {}, // nodeId -> { status, result, error }
outputs: {}, // outputName -> result
complete: false
};
// 2. Topological sort for execution order const executionOrder = topologicalSort(workflow.nodes, workflow.edges);
// 3. Execute nodes respecting DAG dependencies await executeDAG(workflow, executionOrder, state, statusPath); }
async function executeDAG(workflow, order, state, statusPath) { for (const nodeId of order) { const node = workflow.nodes.find(n => n.id === nodeId); const data = node.data;
// Check if all dependencies are satisfied
if (!areDependenciesSatisfied(nodeId, workflow.edges, state)) {
continue; // Will be executed when dependencies complete
}
// Build instruction from slashCommand or raw instruction
let instruction = buildNodeInstruction(data, state.outputs);
// Execute based on mode
state.nodeStates[nodeId] = { status: 'running' };
write(statusPath, JSON.stringify(state, null, 2));
const result = await executeNode(instruction, data.tool, data.mode);
// Store output for downstream nodes
state.nodeStates[nodeId] = { status: 'completed', result };
if (data.outputName) {
state.outputs[data.outputName] = result;
}
write(statusPath, JSON.stringify(state, null, 2));
}
state.complete = true; write(statusPath, JSON.stringify(state, null, 2)); }
/**
- Build node instruction from slashCommand or raw instruction
- Handles slashCommand/slashArgs fields from frontend orchestrator */ function buildNodeInstruction(data, outputs) { const refs = data.contextRefs || [];
// If slashCommand is set, construct instruction from it if (data.slashCommand) { // Resolve variables in slashArgs const args = data.slashArgs ? resolveContextRefs(data.slashArgs, refs, outputs) : '';
// Build slash command instruction
let instruction = `/${data.slashCommand}${args ? ' ' + args : ''}`;
// Append additional instruction if provided
if (data.instruction) {
const additionalInstruction = resolveContextRefs(data.instruction, refs, outputs);
instruction = `${instruction}\n\n${additionalInstruction}`;
}
return instruction;
}
// Fallback: use raw instruction with context refs resolved return resolveContextRefs(data.instruction || '', refs, outputs); }
function resolveContextRefs(instruction, refs, outputs) {
let resolved = instruction;
for (const ref of refs) {
const value = outputs[ref];
const placeholder = {{${ref}}};
resolved = resolved.replace(new RegExp(placeholder, 'g'),
typeof value === 'object' ? JSON.stringify(value) : String(value));
}
return resolved;
}
async function executeNode(instruction, tool, mode) { // Build CLI command based on tool and mode const cliTool = tool || 'gemini'; const cliMode = mode === 'write' ? 'write' : 'analysis';
if (mode === 'async') {
// Background execution
return Bash(
ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode},
{ run_in_background: true }
);
} else {
// Synchronous execution
return Bash(
ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}
);
}
}
Unified Workflow Discovery
async function discoverUnifiedWorkflows() { const files = Glob('*.json', { path: 'ccw/data/flows/' });
const workflows = []; for (const file of files) { const content = JSON.parse(Read(file)); // Detect unified format by checking for 'nodes' array if (content.nodes && Array.isArray(content.nodes)) { workflows.push({ id: content.id, name: content.name, description: content.description, nodeCount: content.nodes.length, format: 'unified', file: file }); } } return workflows; }
Format Detection
function detectWorkflowFormat(content) { if (content.nodes && content.edges) { return 'unified'; // PromptTemplate DAG format } if (content.steps && content.steps[0]?.cmd) { return 'legacy'; // Command chain format } throw new Error('Unknown workflow format'); }
Legacy Template Discovery
Dynamic query - never hardcode template list:
async function discoverTemplates() { // Discover all JSON templates const files = Glob('*.json', { path: 'templates/' });
// Parse each template const templates = []; for (const file of files) { const content = JSON.parse(Read(file)); templates.push({ name: content.name, description: content.description, steps: content.steps.map(s => s.cmd).join(' → '), file: file }); }
return templates; }
Template Selection
User chooses from discovered templates:
async function selectTemplate(templates) { // Build options from discovered templates const options = templates.slice(0, 4).map(t => ({ label: t.name, description: t.steps }));
const response = await AskUserQuestion({ questions: [{ question: 'Select workflow template:', header: 'Template', options: options, multiSelect: false }] });
// Handle "Other" - show remaining templates or custom input if (response.template === 'Other') { return await selectFromRemainingTemplates(templates.slice(4)); }
return templates.find(t => t.name === response.template); }
Status Schema
Creation: Copy template JSON → Update id , template , goal , set all steps status: "pending"
Location: .workflow/.flow-coordinator/{session-id}/status.json
Core Fields:
-
id : Session ID (fc-YYYYMMDD-HHMMSS)
-
template : Template name
-
goal : User task description
-
current : Current step index
-
steps[] : Step array from template (with runtime status , session , taskId )
-
complete : All steps done?
Step Status: pending → running → done | failed | skipped
Extended Template Schema
Templates stored in: templates/*.json (discovered at runtime via Glob)
TemplateStep Fields:
-
cmd : Full command path (e.g., /workflow:lite-plan , /workflow:execute )
-
args? : Arguments with {{goal}} and {{prev}} placeholders
-
unit? : Minimum execution unit name (groups related commands)
-
optional? : Can be skipped by user
-
execution : Type and mode configuration
-
type : Always 'slash-command' (for all workflow commands)
-
mode : 'mainprocess' (blocking) or 'async' (background)
-
contextHint? : Natural language guidance for context assembly
Template Example:
{ "name": "rapid", "steps": [ { "cmd": "/workflow:lite-plan", "args": ""{{goal}}"", "unit": "quick-implementation", "execution": { "type": "slash-command", "mode": "mainprocess" }, "contextHint": "Create lightweight implementation plan" }, { "cmd": "/workflow:lite-execute", "args": "--in-memory", "unit": "quick-implementation", "execution": { "type": "slash-command", "mode": "async" }, "contextHint": "Execute plan from previous step" } ] }
Execution Implementation
Mainprocess Mode (Blocking)
async function executeSlashCommandSync(step, status) { // Build command: /workflow:cmd -y args const cmd = buildCommand(step, status); const result = await SlashCommand({ command: cmd });
step.session = result.session_id; step.status = 'done'; return result; }
Async Mode (Background)
async function executeSlashCommandAsync(step, status, statusPath) { // Build prompt: /workflow:cmd -y args + context const prompt = buildCommandPrompt(step, status);
step.status = 'running'; write(statusPath, JSON.stringify(status, null, 2));
// Execute via ccw cli in background
const taskId = Bash(
ccw cli -p "${escapePrompt(prompt)}" --tool claude --mode write,
{ run_in_background: true }
).task_id;
step.taskId = taskId; write(statusPath, JSON.stringify(status, null, 2));
console.log(Executing: ${step.cmd} (async));
console.log(Resume: /flow-coordinator --resume ${status.id});
}
Prompt Building
Prompts are built in format: /workflow:cmd -y args
- context
function buildCommandPrompt(step, status) {
// step.cmd already contains full path: /workflow:lite-plan, /workflow:execute, etc.
let prompt = ${step.cmd} -y;
// Add arguments (with placeholder replacement)
if (step.args) {
const args = step.args
.replace('{{goal}}', status.goal)
.replace('{{prev}}', getPreviousSessionId(status));
prompt += ${args};
}
// Add context based on contextHint
if (step.contextHint) {
const context = buildContextFromHint(step.contextHint, status);
prompt += \n\nContext:\n${context};
} else {
// Default context: previous session IDs
const previousContext = collectPreviousResults(status);
if (previousContext) {
prompt += \n\nPrevious results:\n${previousContext};
}
}
return prompt; }
function buildContextFromHint(hint, status) { // Parse contextHint instruction and build context accordingly // Examples: // "Summarize IMPL_PLAN.md" → read and summarize plan // "List test coverage gaps" → analyze previous test results // "Pass session ID" → just return session reference
return parseAndBuildContext(hint, status); }
Example Prompt Output
/workflow:lite-plan -y "Implement user registration"
Context: Task: Implement user registration Previous results:
- None (first step)
/workflow:execute -y --in-memory
Context: Task: Implement user registration Previous results:
- lite-plan: WFS-plan-20250130 (planning-context.md)
User Interaction
Step 1: Select Template
Select workflow template:
○ rapid lite-plan → lite-execute → test-cycle-execute ○ coupled plan → plan-verify → execute → review → test ○ bugfix lite-fix → lite-execute → test-cycle-execute ○ tdd tdd-plan → execute → tdd-verify ○ Other (more templates or custom)
Step 2: Review Execution Plan
Template: coupled Steps:
- /workflow:plan (slash-command mainprocess)
- /workflow:plan-verify (slash-command mainprocess)
- /workflow:execute (slash-command async)
- /workflow:review-session-cycle (slash-command mainprocess)
- /workflow:review-cycle-fix (slash-command mainprocess)
- /workflow:test-fix-gen (slash-command mainprocess)
- /workflow:test-cycle-execute (slash-command async)
Proceed? [Confirm / Cancel]
Resume Capability
async function resume(sessionId) {
const statusPath = .workflow/.flow-coordinator/${sessionId}/status.json;
const status = JSON.parse(Read(statusPath));
// Find first incomplete step status.current = status.steps.findIndex(s => s.status !== 'done'); if (status.current === -1) { console.log('All steps complete'); return; }
// Continue executing steps await executeSteps(status, statusPath); }
Available Templates
Templates discovered from templates/*.json :
Template Use Case Steps
rapid Simple feature /workflow:lite-plan → /workflow:lite-execute → /workflow:test-cycle-execute
coupled Complex feature /workflow:plan → /workflow:plan-verify → /workflow:execute → /workflow:review-session-cycle → /workflow:test-fix-gen
bugfix Bug fix /workflow:lite-fix → /workflow:lite-execute → /workflow:test-cycle-execute
tdd Test-driven /workflow:tdd-plan → /workflow:execute → /workflow:tdd-verify
test-fix Fix failing tests /workflow:test-fix-gen → /workflow:test-cycle-execute
brainstorm Exploration /workflow:brainstorm-with-file
debug Debug with docs /workflow:debug-with-file
analyze Collaborative analysis /workflow:analyze-with-file
issue Issue workflow /workflow:issue:plan → /workflow:issue:queue → /workflow:issue:execute
Design Principles
-
Minimal fields: Only essential tracking data
-
Flat structure: No nested objects beyond steps array
-
Step-level execution: Each step defines how it's executed
-
Resumable: Any step can be resumed from status
-
Human readable: Clear JSON format
Reference Documents
Document Purpose
spec/unified-workflow-spec.md Unified PromptTemplate workflow specification
ccw/data/flows/*.json Unified workflows (DAG format, dynamic discovery)
templates/*.json Legacy workflow templates (command chain format)
Demo Workflows (Unified Format)
File Description Nodes
demo-unified-workflow.json
Auth implementation 7 nodes: Analyze → Plan → Implement → Review → Tests → Report
parallel-ci-workflow.json
CI/CD pipeline 8 nodes: Parallel checks → Merge → Conditional notify
simple-analysis-workflow.json
Analysis pipeline 3 nodes: Explore → Analyze → Report