AgentSkillsCN

flow-orchestrator

基于PocketFlow构建的高级多代理工作流编排引擎。在构建复杂的多代理流水线时使用,这些流水线需要支持暂停/恢复、状态快照、执行轨迹追踪,以及协同运作的代理池。将orchestra-forge、kernel-forge与pocketflow整合为统一的运行时。

SKILL.md
--- frontmatter
name: flow-orchestrator
description: Advanced multi-agent workflow orchestration engine built on PocketFlow. Use when building complex multi-agent pipelines that need pause/resume, state snapshots, execution tracing, and coordinated agent pools. Combines orchestra-forge, kernel-forge, and pocketflow into a unified runtime.

Flow Orchestrator

A production-grade multi-agent orchestration skill that extends PocketFlow with runtime control, observability, and resilience.

When to use this skill

  • Building complex multi-agent pipelines (e.g., Planner → Workers → Reviewer)
  • Need pause/resume for long-running LLM workflows
  • Need state snapshots for rollback and debugging
  • Need execution tracing with timestamps, diffs, and transition logs
  • Coordinating parallel agents with message queues

Core Components

1. OrchestratorRuntime (scripts/orchestrator.py)

Wraps PocketFlow Flows with runtime control:

  • runtime.run(shared) — Start the pipeline
  • runtime.pause() — Freeze execution between nodes
  • runtime.resume() — Continue from last pause point
  • runtime.status() — Get current node, elapsed time, state

2. Tracer (scripts/tracer.py)

Logs every node transition:

  • Entry/exit timestamps per node
  • Shared data diffs (what changed)
  • Action edges taken
  • Retry/fallback events
  • Export to JSON or Mermaid diagram

3. SnapshotManager (scripts/snapshot.py)

State persistence:

  • save(shared, label) — Serialize shared data to disk
  • load(label) — Restore from a checkpoint
  • list() — Show all available snapshots
  • diff(a, b) — Compare two snapshots

4. AgentPool (scripts/orchestrator.py)

Multi-agent coordination via asyncio queues:

  • Spawn N worker agents
  • Round-robin or priority-based task dispatch
  • Collect and merge results

How to use it

Step 1: Define your agents as PocketFlow Nodes

python
from pocketflow import Node, Flow, AsyncNode, AsyncFlow

class PlannerAgent(Node):
    def prep(self, shared):
        return shared["task_description"]
    
    def exec(self, task):
        return call_llm(f"Break this into subtasks:\n{task}")
    
    def post(self, shared, prep_res, exec_res):
        shared["subtasks"] = parse_subtasks(exec_res)
        return "dispatch"

Step 2: Wire the flow

python
planner = PlannerAgent()
workers = WorkerPool()  # BatchNode for parallel execution
reviewer = ReviewerAgent()
finalizer = FinalizeNode()

planner - "dispatch" >> workers
workers - "review" >> reviewer
reviewer - "approved" >> finalizer
reviewer - "retry" >> planner

pipeline = Flow(start=planner)

Step 3: Run with the Orchestrator

python
from scripts.orchestrator import OrchestratorRuntime

runtime = OrchestratorRuntime(pipeline, enable_trace=True, enable_snapshots=True)
shared = {"task_description": "Build a REST API for user management"}

# Run (can be paused mid-execution)
runtime.run(shared)

# After completion:
runtime.tracer.export("trace.json")
runtime.tracer.to_mermaid("trace.md")

Step 4: Pause and Resume

python
# In another thread or signal handler:
runtime.pause()  # Freezes after current node completes

# Later:
runtime.resume()  # Picks up where it left off

Step 5: Snapshots

python
from scripts.snapshot import SnapshotManager

sm = SnapshotManager(directory=".snapshots")
sm.save(shared, label="after-planning")

# ... something goes wrong ...
shared = sm.load("after-planning")  # Rollback!

Decision Tree

code
Is your task a single LLM call?
├── YES → Use a simple Node, not this skill
└── NO → Is it a linear pipeline?
    ├── YES → Use pocketflow Workflow pattern
    └── NO → Does it need multiple agents?
        ├── YES → Use this skill (flow-orchestrator)
        └── NO → Does it need pause/resume or tracing?
            ├── YES → Use this skill
            └── NO → Use orchestra-forge

Integration with other skills

  • pocketflow: This skill extends PocketFlow. All Node/Flow/Batch/Async primitives work.
  • orchestra-forge: Use orchestra-forge templates as sub-flows inside the orchestrator.
  • kernel-forge: The orchestrator can delegate to kernel-forge for policy enforcement and idempotency.
  • skill-architect: Run skill-architect to audit your orchestrated pipelines.