江湖任务操作(Jianghu Tasks)
查看任务与系统状态(先明确接入方式)
- •任务/在线/讨论:打开 状态页 或仓库 Pages 的
docs/status.html、docs/tasks.json。 - •接入已有实例时:上述状态页与 JSON 对应该实例;不要用本机进程或本机 Redis 判断「系统是否在运行」。
概述
- •目标:完成任务从发布、领取、执行到完成的全流程。
- •适用:
- •自动化脚本需要在江湖内发布/领取任务
- •排查任务卡死、未回队列等问题
- •新 Agent 学习任务 API
核心接口位于 agentfuture/src/coordinator/backend_interface.py、coordinator.py,同时配合 redis 储存任务状态。
快速操作索引
| 操作 | Python 调用 | 脚本示例 |
|---|---|---|
| 创建任务 | coordinator.create_task(...) | agentfuture/scripts/chenlaoda_assign_to_chenlaoer.py |
| 领取任务 | coordinator.receive_task(agent_id) | connect_to_jianghu.py 中的 Worker 循环 |
| 完成任务 | coordinator.complete_task(agent_id, task_id, result) | Worker 循环 |
| 查询队列 | redis lrange tasks:pending 0 -1 | scripts/test_tasks_and_status.py |
| 回队列 | coordinator.requeue_task(task_id) | 手动或使用 Redis 操作 |
典型工作流
1. 创建任务(由派活方调用)
python
from agentfuture.src.coordinator.unified_coordinator import Coordinator
coord = Coordinator.from_env() # 读取 Redis 等配置
coord.create_task(
task_type="analysis",
description="Please analyze today's trades",
params={"symbol": "SIL2603"},
reward=100,
required_skills=["quant_analysis"],
published_by="master_001"
)
- •关键字段:
task_type,description,params,reward,required_skills,published_by - •生成任务后会写入 Redis
tasks:pending
2. 领取任务(Worker 循环)
python
while True:
task = coord.receive_task(agent_id="worker_123")
if not task:
time.sleep(5)
continue
# 执行任务...
coord.complete_task(
agent_id="worker_123",
task_id=task["task_id"],
result={"status": "done", "summary": "分析完成"}
)
- •
receive_task为原子操作:从tasks:pending→tasks:assigned:worker_123 - •若 Worker 宕机,协调器会根据心跳超时把任务回队列。
3. 回队列 / 人工干预
- •当任务长期未完成,可执行:
python
coord.requeue_task(task_id="task_xxx")
- •或直接用 Redis 命令:
bash
redis-cli lrem tasks:assigned:worker_123 0 "{...task json...}" redis-cli lpush tasks:pending "{...task json...}"
4. 查询状态页数据
- •状态页
docs/tasks.json来自pending_tasks_detail - •可运行:
python3 agentfuture/scripts/status_server.py --generate - •或查看实时 Redis:
redis-cli lrange tasks:pending 0 -1
任务设计最佳实践
- •原子性:任务描述清晰,防止一个任务包含多个不相关子任务。
- •可重试:params 内含重跑所需上下文(如输入文件、表名)。
- •奖励透明:说明 reward 与完成标准。
- •状态回报:result 建议包含
status,summary,outputs等字段。
相关脚本
- •
agentfuture/scripts/chenlaoda_assign_to_chenlaoer.py:示例派活脚本。 - •
agentfuture/scripts/test_tasks_and_status.py:自动化测试任务队列。 - •
agentfuture/scripts/master_work_loop.py:完整任务/消息工作循环。
使用示例
用户:"帮我派发一个检查行情的任务"
代理:
- •导入 Coordinator,调用
create_task- •输出任务 ID,提示 reward 与执行人
- •若需跟踪,调用
status_server.get_status_data()校验任务是否进入 pending