AgentSkillsCN

nightly-jobs

在整合后的夜间作业调度器中添加脚本

SKILL.md
--- frontmatter
name: nightly-jobs
description: Add scripts to the consolidated nightly job runner

Nightly Jobs Skill

Overview

This skill guides you through integrating scripts into the brickston-ai consolidated nightly job system.

File Locations

  • Scripts: scripts/
  • Job Runner: apps/api/app/tasks/nightly_runner.py (or equivalent)
  • Scheduler Config: Check Cloud Scheduler jobs in GCP Console

Adding a New Nightly Job

Step 1: Create the Script

Location: scripts/<script_name>.py

python
#!/usr/bin/env python3
"""
Description: Brief description of what this job does.
Schedule: Nightly at 2:00 AM PST
"""

import asyncio
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def main():
    """Main job execution."""
    start_time = datetime.now()
    logger.info(f"Starting job at {start_time}")
    
    try:
        # Job logic here
        await process_data()
        
        elapsed = datetime.now() - start_time
        logger.info(f"Job completed successfully in {elapsed}")
        return True
        
    except Exception as e:
        logger.error(f"Job failed: {e}")
        raise

async def process_data():
    """Core processing logic."""
    # Implementation
    pass

if __name__ == "__main__":
    asyncio.run(main())

Step 2: Add Database Connection (if needed)

python
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = os.environ.get("DATABASE_URL")

async def get_db_session():
    engine = create_async_engine(DATABASE_URL)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        yield session

Step 3: Register in Job Runner

Add to the consolidated job runner configuration:

python
NIGHTLY_JOBS = [
    {
        "name": "new_job_name",
        "script": "scripts/new_job.py",
        "schedule": "0 2 * * *",  # 2:00 AM daily
        "enabled": True,
        "timeout_minutes": 30,
    },
    # ... other jobs
]

Step 4: Add to Cloud Scheduler (for GCP deployments)

bash
# Create a Cloud Scheduler job that triggers the Cloud Run cron service
gcloud scheduler jobs create http new-nightly-job \
  --location us-west1 \
  --schedule "0 10 * * *" \
  --uri "https://brickston-cron-job-HASH-uw.a.run.app/run/new-job" \
  --http-method POST \
  --oidc-service-account-email brickston-scheduler@graphic-iridium-485814-b2.iam.gserviceaccount.com \
  --time-zone "UTC"

Job Patterns

Data Sync Job

python
async def sync_external_data():
    """Sync data from external source."""
    async with get_db_session() as db:
        # Fetch from external API
        external_data = await fetch_external_api()
        
        # Upsert into database
        for record in external_data:
            await upsert_record(db, record)
        
        await db.commit()

Cleanup Job

python
async def cleanup_old_records():
    """Remove records older than retention period."""
    cutoff_date = datetime.now() - timedelta(days=90)
    
    async with get_db_session() as db:
        result = await db.execute(
            text("DELETE FROM temp_data WHERE created_at < :cutoff"),
            {"cutoff": cutoff_date}
        )
        logger.info(f"Deleted {result.rowcount} old records")
        await db.commit()

Report Generation Job

python
async def generate_daily_report():
    """Generate and store daily summary report."""
    async with get_db_session() as db:
        # Aggregate data
        metrics = await calculate_daily_metrics(db)
        
        # Store report
        await db.execute(
            text("""
                INSERT INTO daily_reports (report_date, metrics)
                VALUES (:date, :metrics)
            """),
            {"date": datetime.now().date(), "metrics": json.dumps(metrics)}
        )
        await db.commit()

Error Handling & Alerting

python
import traceback

async def main():
    try:
        await run_job()
    except Exception as e:
        error_msg = f"Job failed: {e}\n{traceback.format_exc()}"
        logger.error(error_msg)
        # Optionally send alert
        await send_slack_alert(error_msg)
        raise

Testing

bash
# Run job manually
cd /path/to/repo
python scripts/new_job.py

# With environment
DATABASE_URL=postgresql://... python scripts/new_job.py

Checklist

  • Script created in scripts/
  • Proper logging configured
  • Database connection (if needed)
  • Error handling with clear messages
  • Registered in job runner
  • Added to Cloud Scheduler for deployment
  • Tested locally
  • Documented execution time expectations