Flow Coordinator
Lightweight workflow coordinator supporting two workflow formats:
- •Legacy Templates: Command chains with slash-command execution
- •Unified Workflows: DAG-based PromptTemplate nodes (spec:
spec/unified-workflow-spec.md)
Specification Reference
- •Unified Workflow Spec: @spec/unified-workflow-spec.md
- •Demo Workflow:
ccw/data/flows/demo-unified-workflow.json
Architecture
User Task → Detect Format → Select Workflow → Init Status → Execute → Complete
│ │
├─ Legacy Template │
│ └─ Sequential cmd execution │
│ │
└─ Unified Workflow │
└─ DAG traversal with contextRefs │
│
└──────────────── Resume (from status.json) ──────────────┘
Execution Modes:
├─ analysis → Read-only, CLI --mode analysis
├─ write → File changes, CLI --mode write
├─ mainprocess → Blocking, synchronous
└─ async → Background, ccw cli
Core Concepts
Dual Format Support:
- •Legacy:
templates/*.jsonwithcmd,args,execution - •Unified:
ccw/data/flows/*.jsonwithnodes,edges,contextRefs
Unified PromptTemplate Model: All workflow steps are natural language instructions with:
- •
instruction: What to execute (natural language) - •
outputName: Name for output reference - •
contextRefs: References to previous step outputs - •
tool: Optional CLI tool (gemini/qwen/codex/claude) - •
mode: Execution mode (analysis/write/mainprocess/async)
DAG Execution: Unified workflows execute as directed acyclic graphs with parallel branches and conditional edges.
Dynamic Discovery: Both formats discovered at runtime via Glob.
Execution Flow
async function execute(task) {
// 1. Discover and select template
const templates = await discoverTemplates();
const template = await selectTemplate(templates);
// 2. Init status
const sessionId = `fc-${timestamp()}`;
const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
const status = initStatus(template, task);
write(statusPath, JSON.stringify(status, null, 2));
// 3. Execute steps based on execution config
await executeSteps(status, statusPath);
}
async function executeSteps(status, statusPath) {
for (let i = status.current; i < status.steps.length; i++) {
const step = status.steps[i];
status.current = i;
// Execute based on step mode (all steps use slash-command type)
const execConfig = step.execution || { type: 'slash-command', mode: 'mainprocess' };
if (execConfig.mode === 'async') {
// Async execution - stop and wait for hook callback
await executeSlashCommandAsync(step, status, statusPath);
break;
} else {
// Mainprocess execution - continue immediately
await executeSlashCommandSync(step, status);
step.status = 'done';
write(statusPath, JSON.stringify(status, null, 2));
}
}
// All steps complete
if (status.current >= status.steps.length) {
status.complete = true;
write(statusPath, JSON.stringify(status, null, 2));
}
}
Unified Workflow Execution
For workflows using the unified PromptTemplate format (ccw/data/flows/*.json):
async function executeUnifiedWorkflow(workflow, task) {
// 1. Initialize execution state
const sessionId = `ufc-${timestamp()}`;
const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
const state = {
id: sessionId,
workflow: workflow.id,
goal: task,
nodeStates: {}, // nodeId -> { status, result, error }
outputs: {}, // outputName -> result
complete: false
};
// 2. Topological sort for execution order
const executionOrder = topologicalSort(workflow.nodes, workflow.edges);
// 3. Execute nodes respecting DAG dependencies
await executeDAG(workflow, executionOrder, state, statusPath);
}
async function executeDAG(workflow, order, state, statusPath) {
for (const nodeId of order) {
const node = workflow.nodes.find(n => n.id === nodeId);
const data = node.data;
// Check if all dependencies are satisfied
if (!areDependenciesSatisfied(nodeId, workflow.edges, state)) {
continue; // Will be executed when dependencies complete
}
// Resolve context references
const resolvedInstruction = resolveContextRefs(
data.instruction,
data.contextRefs || [],
state.outputs
);
// Execute based on mode
state.nodeStates[nodeId] = { status: 'running' };
write(statusPath, JSON.stringify(state, null, 2));
const result = await executeNode(resolvedInstruction, data.tool, data.mode);
// Store output for downstream nodes
state.nodeStates[nodeId] = { status: 'completed', result };
if (data.outputName) {
state.outputs[data.outputName] = result;
}
write(statusPath, JSON.stringify(state, null, 2));
}
state.complete = true;
write(statusPath, JSON.stringify(state, null, 2));
}
function resolveContextRefs(instruction, refs, outputs) {
let resolved = instruction;
for (const ref of refs) {
const value = outputs[ref];
const placeholder = `{{${ref}}}`;
resolved = resolved.replace(new RegExp(placeholder, 'g'),
typeof value === 'object' ? JSON.stringify(value) : String(value));
}
return resolved;
}
async function executeNode(instruction, tool, mode) {
// Build CLI command based on tool and mode
const cliTool = tool || 'gemini';
const cliMode = mode === 'write' ? 'write' : 'analysis';
if (mode === 'async') {
// Background execution
return Bash(
`ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`,
{ run_in_background: true }
);
} else {
// Synchronous execution
return Bash(
`ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`
);
}
}
Unified Workflow Discovery
async function discoverUnifiedWorkflows() {
const files = Glob('*.json', { path: 'ccw/data/flows/' });
const workflows = [];
for (const file of files) {
const content = JSON.parse(Read(file));
// Detect unified format by checking for 'nodes' array
if (content.nodes && Array.isArray(content.nodes)) {
workflows.push({
id: content.id,
name: content.name,
description: content.description,
nodeCount: content.nodes.length,
format: 'unified',
file: file
});
}
}
return workflows;
}
Format Detection
function detectWorkflowFormat(content) {
if (content.nodes && content.edges) {
return 'unified'; // PromptTemplate DAG format
}
if (content.steps && content.steps[0]?.cmd) {
return 'legacy'; // Command chain format
}
throw new Error('Unknown workflow format');
}
Legacy Template Discovery
Dynamic query - never hardcode template list:
async function discoverTemplates() {
// Discover all JSON templates
const files = Glob('*.json', { path: 'templates/' });
// Parse each template
const templates = [];
for (const file of files) {
const content = JSON.parse(Read(file));
templates.push({
name: content.name,
description: content.description,
steps: content.steps.map(s => s.cmd).join(' → '),
file: file
});
}
return templates;
}
Template Selection
User chooses from discovered templates:
async function selectTemplate(templates) {
// Build options from discovered templates
const options = templates.slice(0, 4).map(t => ({
label: t.name,
description: t.steps
}));
const response = await AskUserQuestion({
questions: [{
question: 'Select workflow template:',
header: 'Template',
options: options,
multiSelect: false
}]
});
// Handle "Other" - show remaining templates or custom input
if (response.template === 'Other') {
return await selectFromRemainingTemplates(templates.slice(4));
}
return templates.find(t => t.name === response.template);
}
Status Schema
Creation: Copy template JSON → Update id, template, goal, set all steps status: "pending"
Location: .workflow/.flow-coordinator/{session-id}/status.json
Core Fields:
- •
id: Session ID (fc-YYYYMMDD-HHMMSS) - •
template: Template name - •
goal: User task description - •
current: Current step index - •
steps[]: Step array from template (with runtimestatus,session,taskId) - •
complete: All steps done?
Step Status: pending → running → done | failed | skipped
Extended Template Schema
Templates stored in: templates/*.json (discovered at runtime via Glob)
TemplateStep Fields:
- •
cmd: Full command path (e.g.,/workflow:lite-plan,/workflow:execute) - •
args?: Arguments with{{goal}}and{{prev}}placeholders - •
unit?: Minimum execution unit name (groups related commands) - •
optional?: Can be skipped by user - •
execution: Type and mode configuration- •
type: Always'slash-command'(for all workflow commands) - •
mode:'mainprocess'(blocking) or'async'(background)
- •
- •
contextHint?: Natural language guidance for context assembly
Template Example:
{
"name": "rapid",
"steps": [
{
"cmd": "/workflow:lite-plan",
"args": "\"{{goal}}\"",
"unit": "quick-implementation",
"execution": { "type": "slash-command", "mode": "mainprocess" },
"contextHint": "Create lightweight implementation plan"
},
{
"cmd": "/workflow:lite-execute",
"args": "--in-memory",
"unit": "quick-implementation",
"execution": { "type": "slash-command", "mode": "async" },
"contextHint": "Execute plan from previous step"
}
]
}
Execution Implementation
Mainprocess Mode (Blocking)
async function executeSlashCommandSync(step, status) {
// Build command: /workflow:cmd -y args
const cmd = buildCommand(step, status);
const result = await SlashCommand({ command: cmd });
step.session = result.session_id;
step.status = 'done';
return result;
}
Async Mode (Background)
async function executeSlashCommandAsync(step, status, statusPath) {
// Build prompt: /workflow:cmd -y args + context
const prompt = buildCommandPrompt(step, status);
step.status = 'running';
write(statusPath, JSON.stringify(status, null, 2));
// Execute via ccw cli in background
const taskId = Bash(
`ccw cli -p "${escapePrompt(prompt)}" --tool claude --mode write`,
{ run_in_background: true }
).task_id;
step.taskId = taskId;
write(statusPath, JSON.stringify(status, null, 2));
console.log(`Executing: ${step.cmd} (async)`);
console.log(`Resume: /flow-coordinator --resume ${status.id}`);
}
Prompt Building
Prompts are built in format: /workflow:cmd -y args + context
function buildCommandPrompt(step, status) {
// step.cmd already contains full path: /workflow:lite-plan, /workflow:execute, etc.
let prompt = `${step.cmd} -y`;
// Add arguments (with placeholder replacement)
if (step.args) {
const args = step.args
.replace('{{goal}}', status.goal)
.replace('{{prev}}', getPreviousSessionId(status));
prompt += ` ${args}`;
}
// Add context based on contextHint
if (step.contextHint) {
const context = buildContextFromHint(step.contextHint, status);
prompt += `\n\nContext:\n${context}`;
} else {
// Default context: previous session IDs
const previousContext = collectPreviousResults(status);
if (previousContext) {
prompt += `\n\nPrevious results:\n${previousContext}`;
}
}
return prompt;
}
function buildContextFromHint(hint, status) {
// Parse contextHint instruction and build context accordingly
// Examples:
// "Summarize IMPL_PLAN.md" → read and summarize plan
// "List test coverage gaps" → analyze previous test results
// "Pass session ID" → just return session reference
return parseAndBuildContext(hint, status);
}
Example Prompt Output
/workflow:lite-plan -y "Implement user registration" Context: Task: Implement user registration Previous results: - None (first step)
/workflow:execute -y --in-memory Context: Task: Implement user registration Previous results: - lite-plan: WFS-plan-20250130 (planning-context.md)
User Interaction
Step 1: Select Template
Select workflow template: ○ rapid lite-plan → lite-execute → test-cycle-execute ○ coupled plan → plan-verify → execute → review → test ○ bugfix lite-fix → lite-execute → test-cycle-execute ○ tdd tdd-plan → execute → tdd-verify ○ Other (more templates or custom)
Step 2: Review Execution Plan
Template: coupled Steps: 1. /workflow:plan (slash-command mainprocess) 2. /workflow:plan-verify (slash-command mainprocess) 3. /workflow:execute (slash-command async) 4. /workflow:review-session-cycle (slash-command mainprocess) 5. /workflow:review-cycle-fix (slash-command mainprocess) 6. /workflow:test-fix-gen (slash-command mainprocess) 7. /workflow:test-cycle-execute (slash-command async) Proceed? [Confirm / Cancel]
Resume Capability
async function resume(sessionId) {
const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
const status = JSON.parse(Read(statusPath));
// Find first incomplete step
status.current = status.steps.findIndex(s => s.status !== 'done');
if (status.current === -1) {
console.log('All steps complete');
return;
}
// Continue executing steps
await executeSteps(status, statusPath);
}
Available Templates
Templates discovered from templates/*.json:
| Template | Use Case | Steps |
|---|---|---|
| rapid | Simple feature | /workflow:lite-plan → /workflow:lite-execute → /workflow:test-cycle-execute |
| coupled | Complex feature | /workflow:plan → /workflow:plan-verify → /workflow:execute → /workflow:review-session-cycle → /workflow:test-fix-gen |
| bugfix | Bug fix | /workflow:lite-fix → /workflow:lite-execute → /workflow:test-cycle-execute |
| tdd | Test-driven | /workflow:tdd-plan → /workflow:execute → /workflow:tdd-verify |
| test-fix | Fix failing tests | /workflow:test-fix-gen → /workflow:test-cycle-execute |
| brainstorm | Exploration | /workflow:brainstorm-with-file |
| debug | Debug with docs | /workflow:debug-with-file |
| analyze | Collaborative analysis | /workflow:analyze-with-file |
| issue | Issue workflow | /workflow:issue:plan → /workflow:issue:queue → /workflow:issue:execute |
Design Principles
- •Minimal fields: Only essential tracking data
- •Flat structure: No nested objects beyond steps array
- •Step-level execution: Each step defines how it's executed
- •Resumable: Any step can be resumed from status
- •Human readable: Clear JSON format
Reference Documents
| Document | Purpose |
|---|---|
| spec/unified-workflow-spec.md | Unified PromptTemplate workflow specification |
| ccw/data/flows/*.json | Unified workflows (DAG format, dynamic discovery) |
| templates/*.json | Legacy workflow templates (command chain format) |
Demo Workflows (Unified Format)
| File | Description | Nodes |
|---|---|---|
demo-unified-workflow.json | Auth implementation | 7 nodes: Analyze → Plan → Implement → Review → Tests → Report |
parallel-ci-workflow.json | CI/CD pipeline | 8 nodes: Parallel checks → Merge → Conditional notify |
simple-analysis-workflow.json | Analysis pipeline | 3 nodes: Explore → Analyze → Report |