erpnext-syntax-scheduler

SKILL.md
--- frontmatter
name: erpnext-syntax-scheduler
description: Scheduler and background jobs syntax for Frappe/ERPNext v14/v15/v16. Use for scheduler_events in hooks.py, frappe.enqueue() for async jobs, queue configuration, job deduplication, error handling, and monitoring. Triggers on questions about scheduled tasks, background processing, cron jobs, RQ workers, job queues, async tasks.

ERPNext Syntax: Scheduler & Background Jobs

Deterministic syntax reference for Frappe scheduler events and background job processing.

Quick Reference

Scheduler Events (hooks.py)

python
# hooks.py
scheduler_events = {
    "all": ["myapp.tasks.every_tick"],
    "hourly": ["myapp.tasks.hourly_task"],
    "daily": ["myapp.tasks.daily_task"],
    "weekly": ["myapp.tasks.weekly_task"],
    "monthly": ["myapp.tasks.monthly_task"],
    "daily_long": ["myapp.tasks.heavy_daily"],  # Long queue
    "cron": {
        "0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
        "*/15 * * * *": ["myapp.tasks.every_15_min"]
    }
}

CRITICAL: After EVERY change to scheduler_events, run bench migrate so the change takes effect.
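A typo in a dotted path inside scheduler_events tends to surface only when the job fires. The sketch below flattens the dict and flags entries that cannot be valid module.function references. The helper names `iter_scheduled_methods` and `find_suspicious_paths` are hypothetical and not part of Frappe; the check is purely syntactic and does not import the modules.

```python
from typing import Iterator, List, Optional, Tuple


def iter_scheduled_methods(
    scheduler_events: dict,
) -> Iterator[Tuple[str, Optional[str], str]]:
    """Yield (event, cron_expression, dotted_path) for every registered method."""
    for event, value in scheduler_events.items():
        if event == "cron":
            # "cron" maps cron expressions to lists of dotted paths.
            for expression, methods in value.items():
                for path in methods:
                    yield event, expression, path
        else:
            # Every other event maps directly to a list of dotted paths.
            for path in value:
                yield event, None, path


def find_suspicious_paths(scheduler_events: dict) -> List[str]:
    """Return dotted paths that are not of the form 'module.function'."""
    bad = []
    for _event, _expression, path in iter_scheduled_methods(scheduler_events):
        parts = path.split(".")
        if len(parts) < 2 or not all(part.isidentifier() for part in parts):
            bad.append(path)
    return bad


# Example input with one deliberately broken entry.
example_events = {
    "daily": ["myapp.tasks.daily_task"],
    "cron": {"*/15 * * * *": ["myapp.tasks.every_15_min", "myapp.tasks.bad-name"]},
}
print(find_suspicious_paths(example_events))  # ['myapp.tasks.bad-name']
```

A check like this could run in a unit test before deployment; it does not replace bench migrate.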

frappe.enqueue Basics

python
# Simple
frappe.enqueue("myapp.tasks.process", customer="CUST-001")

# With queue and timeout
frappe.enqueue(
    "myapp.tasks.heavy_task",
    queue="long",
    timeout=3600,
    param="value"
)

# With deduplication (v15+)
from frappe.utils.background_jobs import is_job_enqueued

job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)

Scheduler Event Types

| Event | Frequency | Queue |
|---|---|---|
| all | Every tick (v14: ~4 min, v15: ~60 s) | default |
| hourly | Per hour | default |
| daily | Per day | default |
| weekly | Per week | default |
| monthly | Per month | default |
| hourly_long | Per hour | long |
| daily_long | Per day | long |
| weekly_long | Per week | long |
| monthly_long | Per month | long |
| cron | Custom schedule | configurable |

Scheduler tick interval by version:

  • v14: ~240 seconds (4 min)
  • v15: ~60 seconds
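Code that has to reason about tick length across versions can encode the difference in one place. This is a minimal sketch that only mirrors the values and config key names listed in this reference (scheduler_interval for v14, scheduler_tick_interval for v15); the function name `scheduler_tick_seconds` and the plain-dict `site_config` argument are assumptions for illustration, not a Frappe API.

```python
def scheduler_tick_seconds(site_config: dict, frappe_major: int) -> int:
    """Return the scheduler tick in seconds for a given Frappe major version.

    Falls back to the defaults quoted in this reference when the
    site config does not override the interval.
    """
    if frappe_major >= 15:
        return int(site_config.get("scheduler_tick_interval", 60))
    return int(site_config.get("scheduler_interval", 240))


print(scheduler_tick_seconds({}, 14))                                # 240
print(scheduler_tick_seconds({}, 15))                                # 60
print(scheduler_tick_seconds({"scheduler_tick_interval": 120}, 15))  # 120
```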

Queue Types

| Queue | Timeout | Usage |
|---|---|---|
| short | 300s (5 min) | Quick tasks, UI responses |
| default | 300s (5 min) | Standard tasks |
| long | 1500s (25 min) | Heavy processing, imports |
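The table above can be turned into a small decision helper. The sketch below assumes you have a rough estimate of how long a job runs; `pick_queue` is a hypothetical name, and the 1.5x headroom applied when a job exceeds the long queue's default is an arbitrary illustrative choice, not a Frappe recommendation.

```python
from typing import Optional, Tuple

# Default timeouts in seconds, copied from the queue table.
QUEUE_TIMEOUTS = {"short": 300, "default": 300, "long": 1500}


def pick_queue(expected_seconds: float, interactive: bool = False) -> Tuple[str, Optional[int]]:
    """Return (queue, timeout_override) for a job of the given expected duration.

    timeout_override is None when the queue's default timeout is enough.
    """
    if expected_seconds > QUEUE_TIMEOUTS["long"]:
        # Longer than any default: stay on "long" and pass an explicit timeout.
        return "long", int(expected_seconds * 1.5)
    if expected_seconds > QUEUE_TIMEOUTS["default"]:
        return "long", None
    # Short jobs: "short" for work a user is waiting on, "default" otherwise.
    return ("short" if interactive else "default"), None


print(pick_queue(10))                    # ('default', None)
print(pick_queue(10, interactive=True))  # ('short', None)
print(pick_queue(600))                   # ('long', None)
print(pick_queue(3600))                  # ('long', 5400)
```

The returned pair maps directly onto the queue and timeout arguments of frappe.enqueue.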

frappe.enqueue Parameters

python
frappe.enqueue(
    method,                      # REQUIRED: function or module path
    queue="default",             # Queue name
    timeout=None,                # Override timeout (seconds)
    is_async=True,               # False = execute directly
    now=False,                   # True = via frappe.call()
    job_id=None,                 # v15: unique ID for deduplication
    enqueue_after_commit=False,  # Wait for DB commit
    at_front=False,              # Place at front of queue
    on_success=None,             # Success callback
    on_failure=None,             # Failure callback
    **kwargs                     # Arguments for method
)

Job Deduplication

v15+ (Recommended)

python
from frappe.utils.background_jobs import is_job_enqueued

job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue(
        "myapp.tasks.process",
        job_id=job_id,
        doc_name=doc.name
    )
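Deduplication only works if the same logical unit of work always produces the same job_id. One way to keep IDs consistent is to build them in a single helper. The sketch below follows the `task::name` convention used in the examples here; `make_job_id` is a hypothetical helper, not part of Frappe.

```python
def make_job_id(task: str, *parts: str) -> str:
    """Build a deterministic job ID such as 'import::DOC-001'.

    The same task and parts always yield the same ID, so a second
    enqueue attempt for the same work can be detected.
    """
    if not task or not all(parts):
        raise ValueError("task and every part must be non-empty")
    return "::".join((task, *map(str, parts)))


print(make_job_id("import", "DOC-001"))                      # import::DOC-001
print(make_job_id("process", "Sales Invoice", "SINV-0001"))  # process::Sales Invoice::SINV-0001
```

The result can be passed both to is_job_enqueued() and as the job_id argument of frappe.enqueue.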

v14 (Deprecated)

python
# DO NOT USE - only for legacy code
from frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
    frappe.enqueue(..., job_name=name)

Error Handling Pattern

python
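import frappe

def process_records(records):
    """Process each record in its own transaction so one failure cannot undo the rest."""
    for record in records:
        try:
            process_single(record)  # per-record worker, defined elsewhere in the app
            frappe.db.commit()  # Commit per success
        except Exception:
            frappe.db.rollback()  # Rollback on error
            frappe.log_error(
                title=f"Process Error: {record}",
                message=frappe.get_traceback(),
            )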
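The same commit-or-rollback loop can be written with the database calls passed in as arguments, which makes the pattern testable without a running site and returns a summary of what succeeded. This is a sketch under assumptions: `process_batch` and its parameter names are hypothetical, and inside Frappe the callables would be frappe.db.commit, frappe.db.rollback, and a wrapper around frappe.log_error.

```python
from typing import Callable, Iterable, List, Tuple


def process_batch(
    records: Iterable,
    handler: Callable,
    commit: Callable[[], None],
    rollback: Callable[[], None],
    log_error: Callable[[object, Exception], None],
) -> Tuple[List, List]:
    """Run handler on each record; commit on success, roll back and log on failure.

    Returns (succeeded, failed) so the caller can report or retry.
    """
    succeeded, failed = [], []
    for record in records:
        try:
            handler(record)
            commit()  # persist this record before moving on
            succeeded.append(record)
        except Exception as exc:
            rollback()  # discard partial writes for this record only
            log_error(record, exc)
            failed.append(record)
    return succeeded, failed


# Stand-alone demonstration with fake database calls.
events = []

def demo_handler(record):
    if record == "bad":
        raise ValueError("boom")

ok, bad = process_batch(
    ["a", "bad", "b"],
    demo_handler,
    commit=lambda: events.append("commit"),
    rollback=lambda: events.append("rollback"),
    log_error=lambda record, exc: events.append(f"log:{record}"),
)
print(ok, bad)   # ['a', 'b'] ['bad']
print(events)    # ['commit', 'rollback', 'log:bad', 'commit']
```

Returning the failed records makes it straightforward to enqueue a retry job for only those items.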

Callbacks

python
def on_success_handler(job, connection, result, *args, **kwargs):
    frappe.publish_realtime("show_alert", {"message": "Done!"})

def on_failure_handler(job, connection, type, value, traceback):
    frappe.log_error(f"Job {job.id} failed: {value}")

frappe.enqueue(
    "myapp.tasks.risky_task",
    on_success=on_success_handler,
    on_failure=on_failure_handler
)

User Context

IMPORTANT: Scheduler jobs run as Administrator!

python
def scheduled_task():
    # At this point frappe.session.user == "Administrator"
    
    # Set explicit owner:
    doc = frappe.new_doc("ToDo")
    doc.owner = "user@example.com"
    doc.insert(ignore_permissions=True)

Monitoring

| Tool | Description |
|---|---|
| RQ Worker (DocType) | Worker status, busy/idle |
| RQ Job (DocType) | Job status, queue filter |
| bench doctor | Scheduler status overview |
| Scheduled Job Log | Execution history |

Version Differences v14 vs v15

| Feature | v14 | v15 |
|---|---|---|
| Tick interval | 4 min | 60 sec |
| Config key | scheduler_interval | scheduler_tick_interval |
| Deduplication | job_name | job_id + is_job_enqueued() |

Critical Rules

  1. ALWAYS run bench migrate after changing scheduler_events in hooks.py
  2. USE job_id + is_job_enqueued() for deduplication (v15+)
  3. CHOOSE correct queue: short/default/long based on duration
  4. COMMIT per successful record, rollback on error
  5. REMEMBER that jobs run as Administrator
  6. ENQUEUE heavy tasks from scheduler events, don't execute directly
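Rule 6 can be sketched as a thin scheduler entry point that only hands work to the long queue and returns immediately. The task path `myapp.tasks.rebuild_reports` is hypothetical, and the optional `enqueue` argument exists only so the sketch can be exercised without a bench; the scheduler itself would call `daily_task()` with no arguments.

```python
def daily_task(enqueue=None):
    """Scheduler entry point: delegate the heavy work instead of running it here."""
    if enqueue is None:
        # Imported lazily so the function can be tested outside a Frappe site.
        import frappe
        enqueue = frappe.enqueue
    enqueue(
        "myapp.tasks.rebuild_reports",  # hypothetical heavy worker
        queue="long",
        timeout=1500,
    )


# Demonstration with a fake enqueue that records what would be queued.
calls = []
daily_task(enqueue=lambda method, **kwargs: calls.append((method, kwargs)))
print(calls)  # [('myapp.tasks.rebuild_reports', {'queue': 'long', 'timeout': 1500})]
```

Registering the worker directly under a *_long event such as daily_long is an alternative when no dispatch logic is needed.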