auto-parallel-dev (Prototype)
自动化并行开发管理 Skill - 原型版本。
原型目标: 验证核心功能可行性
- •监控循环(30秒定时 + 事件驱动)
- •状态同步机制(Agent 完成后立即同步)
- •并发安全保护(FileLock)
使用方式
启动原型
code
/auto-parallel-dev-prototype start
停止原型
code
/auto-parallel-dev-prototype stop
核心逻辑
1. 监控循环
python
def monitoring_loop():
"""主监控循环 - 混合模式:事件驱动 + 定时检查"""
last_check = time.time()
check_interval = 30 # 秒
while project_status != "COMPLETED":
# 【优先级1】事件驱动检查(Agent 完成通知)
if has_task_notification():
handle_agent_completion()
# 【优先级2】定时检查(每30秒)
if time.time() - last_check >= check_interval:
check_all_agents_status()
sync_taskmaster_states()
assign_pending_tasks()
last_check = time.time()
# 【优先级3】项目完成检测
if no_pending_tasks() and all_agents_idle():
trigger_completion()
break
time.sleep(5) # 降低 CPU 占用
2. Agent 完成处理(关键修复)
python
def handle_agent_completion():
"""处理 Agent 完成事件 - 修复状态同步断链"""
# 步骤1:读取 Agent 输出
for agent_id, output in get_completed_agents():
task_id = output.task_id
member_id = output.member_id
status = output.status # "done", "failed", "rejected"
# 步骤2:状态同步(原子操作)
sync_status_atomically(task_id, member_id, status)
# 步骤3:立即检查并分配新任务
if has_pending_tasks() and has_idle_members():
assign_tasks_to_idle_members()
3. 状态同步(并发安全)
python
def sync_status_atomically(task_id, member_id, status):
"""原子状态同步 - 使用 FileLock 防止并发冲突"""
lock_file = ".claude/team/registry.lock"
try:
# 获取文件锁(超时10秒)
with FileLock(lock_file, timeout=10):
# 1. 更新 Task-Master
success = update_taskmaster(task_id, status)
# 2. 更新 registry.json
if success:
update_registry(member_id, task_id, status)
else:
# Task-Master 失败,记录错误
log_error("taskmaster_sync_failed", {
"task_id": task_id,
"status": status
})
except TimeoutError:
# 获取锁超时,记录冲突
log_error("registry_lock_timeout", {
"task_id": task_id,
"member_id": member_id
})
4. Task-Master 同步(带重试)
python
def update_taskmaster(task_id, status):
"""更新 Task-Master 状态 - 带重试机制"""
for attempt in range(3):
try:
result = mcp__task-master-ai__set_task_status({
"id": task_id,
"status": status,
"projectRoot": get_project_root(),
"tag": get_task_tag(task_id)
})
return True
except Exception as e:
if attempt < 2:
# 指数退避:1s, 2s, 4s
sleep(2 ** attempt)
else:
# 最后一次尝试失败
raise e
return False
5. Agent 启动验证
python
def launch_agent_with_verification(member_id, task_id):
"""启动 Agent 并验证真实运行"""
# 启动后台 Agent
agent_id = Task(
subagent_type=get_member_type(member_id),
prompt=build_prompt(member_id, task_id),
run_in_background=True
)
# 关键:等待 5 秒验证 Agent 真正在运行
sleep(5)
if not is_agent_alive(agent_id):
# Agent 启动失败
log_error("agent_launch_failed", {
"agent_id": agent_id,
"member_id": member_id,
"task_id": task_id
})
return None
return agent_id
6. 任务分配(智能匹配)
python
def assign_tasks_to_idle_members():
"""为空闲成员分配任务"""
pending = get_pending_tasks()
idle_members = get_idle_members()
for member in idle_members:
# 找到匹配技能的任务
matched = find_matching_tasks(member, pending)
# 预检依赖
ready_tasks = [t for t in matched if all_deps_satisfied(t)]
if ready_tasks:
# 分配 3-4 个任务
tasks_to_assign = ready_tasks[:4]
for task in tasks_to_assign:
agent_id = launch_agent_with_verification(member.id, task.id)
if agent_id:
# 更新 registry
update_registry_with_lock({
"members": {
member.id: {
"current_tasks": [{
"task_id": task.id,
"agent_id": agent_id,
"started_at": now()
}]
}
}
})
错误处理
错误记录到 registry.json
json
{
"member_id": "alice-backend",
"blocked_tasks": [
{
"task_id": "LWP-2-T050",
"block_reason": {
"type": "gatekeeper_rejection",
"message": "缺少规范文档",
"timestamp": "2026-01-20T01:35:00Z",
"retry_count": 0,
"agent_id": "abc123"
}
}
]
}
错误类型和处理
| 错误类型 | 处理策略 |
|---|---|
| Task-Master 超时 | 重试 3 次(指数退避) |
| Gatekeeper 拒绝 | 标记 blocked,跳过 |
| Agent 启动失败 | 重新分配给其他成员 |
| 并发写入冲突 | FileLock 等待 |
| 依赖不满足 | 跳过任务,等待依赖 |
测试
运行原型测试
bash
cd .claude/skills/auto-parallel-dev/prototype python test_prototype.py
测试覆盖
- •✅ 监控循环启动和停止
- •✅ Agent 完成后状态同步
- •✅ 并发写入保护
- •✅ Task-Master 重试机制
- •✅ Agent 启动验证
- •✅ 项目完成检测
实现细节
文件锁实现(Python)
python
from filelock import FileLock, Timeout
def update_registry_safe(updates):
"""并发安全的 registry 更新"""
lock_file = ".claude/team/registry.lock"
try:
with FileLock(lock_file, timeout=10):
registry = load_json("registry.json")
apply_updates(registry, updates)
save_json("registry.json", registry)
return True
except Timeout:
log_error("concurrent_write_timeout")
return False
Agent 存活检查
python
def is_agent_alive(agent_id):
"""检查 Agent 是否真正在运行"""
output_file = get_agent_output_file(agent_id)
if not os.path.exists(output_file):
return False
# 读取输出文件
output = read_agent_output(output_file)
# 检查状态
return output.status in ["running", "in-progress"]
限制(原型版本)
- •仅支持监控循环和状态同步
- •不包含交互式配置生成
- •不包含 Worktree 自动创建
- •不包含自动清理功能
这些功能将在完整版本中实现。
下一步
原型验证成功后,实现:
- •交互式配置生成
- •Git Worktree 自动创建
- •项目完成自动清理
- •完整错误处理和恢复