AgentSkillsCN

Long Term Task

Long Term Task

中文原作
SKILL.md

Long Term Task Skill

概述

这是一个通用长期任务管理 Skill,用于协调多个智能体完成需要长时间运行的任务。

设计原则:

  1. Skill 只管基础设施 - 任务调度、状态管理、进度追踪
  2. Agent 决定业务逻辑 - 具体执行什么、怎么通知用户
  3. 零侵入 - Agent 不需要修改代码,通过 CLI 或 SDK 使用

使用场景

  • 每天定时学习 HF 论文
  • 持续监控某个数据源
  • 多步骤数据处理流程
  • 需要定期汇报进度的长期项目

Agent 使用方式

方式1: CLI(推荐 OpenClaw 使用)

python
import subprocess
import json

# 创建任务
result = subprocess.run(
    ["ltt", "--work-dir", "./.ltt", "create",
     "--name", "hf-paper-learning",
     "--goals", "下载今日论文,选择Top3,提取摘要,生成笔记",
     "--interval", "30"],  # 每30分钟汇报一次
    capture_output=True, text=True
)

# 解析任务 ID
task_id = None
for line in result.stdout.split("\n"):
    if line.startswith("ID: "):
        task_id = line.replace("ID: ", "").strip()

# 手动触发执行(或配置 cron)
# 分步执行模式:每次只执行一个步骤
subprocess.run(["ltt", "--work-dir", "./.ltt", "exec", task_id, "--step"])

# 完整执行模式:自动执行所有步骤(向后兼容)
subprocess.run(["ltt", "--work-dir", "./.ltt", "exec", task_id])

# 检查状态并读取事件
result = subprocess.run(
    ["ltt", "--work-dir", "./.ltt", "check", task_id, "--json"],
    capture_output=True, text=True
)
data = json.loads(result.stdout)

# 根据事件决定如何通知用户
for event in data.get("recent_events", []):
    if event["event"] == "progress_milestone":
        message.send(user, f"🎯 任务进度: {event['data']['milestone_percent']}%")

方式2: Python SDK(推荐 Claude Code 使用)

python
from long_term_task import TaskManager, FileReporter, ProgressTracker

# 创建管理器
manager = TaskManager(work_dir="./.ltt")

# 创建任务
task = manager.create_task(
    name="数据处理",
    goals=["下载数据", "清洗数据", "分析数据"],
    report_interval_minutes=30,
)

# 获取任务状态
task = manager.get_task(task.id)
print(f"当前步骤: {task.current_goal}")
print(f"进度: {task.progress_percent}%")

# 读取事件
for event in task.get_recent_events(5):
    print(f"[{event['timestamp']}] {event['event']}")

与 OpenClaw 集成

1. 创建长期任务

当用户说"创建长期任务"时,Agent 应该:

  1. 多轮对话收集需求(10个问题)
  2. 计算汇报间隔(根据任务预估时长)
  3. 创建任务并配置 cron
python
# 示例: Agent 创建 HF 论文学习任务
def create_hf_paper_task():
    # 创建任务
    result = subprocess.run([
        "ltt", "--work-dir", "./.ltt", "create",
        "--name", "daily-hf-papers",
        "--goals", "下载论文列表,筛选Top3,下载PDF,提取摘要,生成笔记",
        "--schedule", "daily",
        "--interval", "60"  # 每小时汇报
    ], capture_output=True, text=True)
    
    task_id = parse_task_id(result.stdout)
    
    # 配置执行心跳(每早9点执行)- 使用分步模式
    cron.add(
        name=f"ltt-exec-{task_id}",
        schedule={"kind": "cron", "expr": "0 9 * * *"},
        payload={
            "kind": "systemEvent",
            "text": f"执行长期任务 #{task_id} --step"
        }
    )
    
    # 配置检查心跳(每天10点检查)
    cron.add(
        name=f"ltt-check-{task_id}",
        schedule={"kind": "cron", "expr": "0 10 * * *"},
        payload={
            "kind": "systemEvent",
            "text": f"ltt --work-dir ./.ltt check {task_id} --json"
        }
    )
    
    return task_id

2. 处理执行事件

当执行心跳触发时,Agent 应该:

python
def on_exec_trigger(task_id):
    # 1. 先检查当前状态
    result = subprocess.run(
        ["ltt", "--work-dir", "./.ltt", "check", task_id, "--json"],
        capture_output=True, text=True
    )
    status = json.loads(result.stdout)
    
    # 2. 如果正在运行,跳过
    if status["status"] == "running":
        return
    
    # 3. 执行当前步骤
    task = manager.get_task(task_id)
    current_goal = task.current_goal
    
    if current_goal == "下载论文列表":
        download_papers()
        # 标记步骤完成(分步执行模式)
        subprocess.run(["ltt", "--work-dir", "./.ltt", "exec", task_id, "--step"])  # 执行下一步
    elif current_goal == "筛选Top3":
        select_top3()
        subprocess.run(["ltt", "--work-dir", "./.ltt", "exec", task_id, "--step"])
    # ...

3. 进度通知

Agent 监听任务事件并决定如何通知用户:

python
def check_and_notify(task_id):
    result = subprocess.run(
        ["ltt", "status", task_id, "--json"],
        capture_output=True, text=True
    )
    task = json.loads(result.stdout)
    
    # 检查最近事件
    for event in task["state"]["events"][-5:]:
        event_type = event["event"]
        data = event["data"]
        
        if event_type == "task_started":
            message.send(user, f"🚀 任务开始: {task['name']}")
            
        elif event_type == "progress_periodic":
            msg = f"📊 {task['name']} 进度: {data['progress_percent']:.0f}%"
            if data.get("estimated_remaining_seconds"):
                mins = data["estimated_remaining_seconds"] // 60
                msg += f",预计还剩 {mins} 分钟"
            message.send(user, msg)
            
        elif event_type == "progress_milestone":
            message.send(user, 
                f"🎯 {task['name']} 达到里程碑: {data['milestone_percent']}%")
            
        elif event_type == "task_completed":
            elapsed = data["elapsed_seconds"] // 60
            message.send(user, 
                f"✅ {task['name']} 完成!耗时 {elapsed} 分钟")
            
        elif event_type == "task_failed_final":
            message.send(user, 
                f"❌ {task['name']} 失败: {data.get('error', '未知错误')}")
            
        elif event_type == "executor_orphaned":
            message.send(user, 
                f"⚠️ {task['name']} 执行器失联,可能需要重启")

汇报间隔计算

如果用户没有指定汇报间隔,Agent 应该根据预估时长自动计算:

python
def calculate_interval(estimated_minutes):
    if estimated_minutes < 10:
        return 2  # 每2分钟
    elif estimated_minutes < 30:
        return 5  # 每5分钟
    elif estimated_minutes < 60:
        return 10  # 每10分钟
    elif estimated_minutes < 240:
        return 30  # 每30分钟
    elif estimated_minutes < 480:
        return 60  # 每1小时
    else:
        return 120  # 每2小时

多智能体协作

创建子任务

python
# 父任务创建多个子任务并行执行
subtask_a = manager.create_subtask(
    parent_id=parent_id,
    goal="下载数据A",
)

subtask_b = manager.create_subtask(
    parent_id=parent_id,
    goal="下载数据B",
)

# 启动子任务
subprocess.run(["ltt", "exec", subtask_a.id])
subprocess.run(["ltt", "exec", subtask_b.id])

# 等待子任务完成
while True:
    task_a = manager.get_task(subtask_a.id)
    task_b = manager.get_task(subtask_b.id)
    
    if task_a.is_completed and task_b.is_completed:
        # 两个子任务都完成了,执行合并
        subprocess.run(["ltt", "exec", parent_id])  # 执行父任务下一步
        break
    
    time.sleep(60)

全局白板

智能体通过白板共享进度:

python
# 智能体A
from long_term_task import ProgressTracker, FileReporter

reporter = FileReporter(task_dir)
tracker = ProgressTracker(task_id, reporter, ...)
tracker.report_to_whiteboard("agent-a", {
    "progress": 50,
    "status": "处理中",
    "current_item": "paper-001"
})

# 智能体B 读取
import json
whiteboard_path = task_dir / "whiteboard.json"
with open(whiteboard_path) as f:
    board = json.load(f)
    agent_a_progress = board["agents"]["agent-a"]["progress"]

最佳实践

1. 错误处理

python
try:
    result = subprocess.run(
        ["ltt", "exec", task_id],
        capture_output=True, text=True, timeout=3600
    )
    if result.returncode != 0:
        # 检查失败原因
        check_result = subprocess.run(
            ["ltt", "check", task_id, "--format", "json"],
            capture_output=True, text=True
        )
        status = json.loads(check_result.stdout)
        if status.get("retry_count", 0) >= 3:
            message.send(user, "任务连续失败3次,需要人工介入")
except subprocess.TimeoutExpired:
    # 执行超时,可能是 orphan
    message.send(user, "任务执行超时,请检查状态")

2. 定期清理

python
# 删除已完成的任务
for task in manager.list_tasks():
    if task.is_completed:
        # 保留7天后删除
        completed_at = task.state.get("last_end")
        if is_older_than(completed_at, days=7):
            manager.delete_task(task.id, soft=True)

故障排除

问题原因解决
任务状态卡在 running执行器崩溃checker 会检测 orphan 并标记
收不到进度通知reporter 配置错误检查 state.json 中的事件
多个执行器同时运行锁竞争Skill 内部有文件锁保护
状态文件损坏异常退出删除 state.json 重置

参考

  • CLI: ltt --help
  • 源码: src/long_term_task/
  • 示例: examples/