AgentSkillsCN

python-async-workers

生成 Python 后台任务、Cron 作业、消息队列消费者。适用于创建定时任务(APScheduler)、Celery 工作者、RabbitMQ/Redis 队列消费者,或执行异步任务处理时使用。遵循项目异步模式,并与现有服务无缝集成。

SKILL.md
--- frontmatter
name: python-async-workers
description: Generate Python background workers, cron jobs, message queue consumers. Use when creating scheduled tasks (APScheduler), Celery workers, RabbitMQ/Redis queue consumers, or async job processing. Follows project async patterns and integrates with existing services.
allowed-tools:
  - Read
  - Write
  - Edit
  - Glob
  - Grep
  - Bash

Python Async Workers

Background processing patterns for FastAPI applications.

Architecture Overview

code
┌─────────────────────────────────────────────────────────────┐
│                      FastAPI Application                     │
├─────────────────────────────────────────────────────────────┤
│  API Request  │  Cron Scheduler  │  Queue Consumer          │
│      ↓        │        ↓         │        ↓                 │
│  Enqueue Job  │  Trigger Task    │  Process Message         │
│      ↓        │        ↓         │        ↓                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   Service Layer                      │   │
│  │              (Shared Business Logic)                 │   │
│  └─────────────────────────────────────────────────────┘   │
│      ↓                   ↓                  ↓               │
│  Message Queue      Database            External APIs       │
│  (RabbitMQ/Redis)   (PostgreSQL)                            │
└─────────────────────────────────────────────────────────────┘

Components

ComponentPurposeLocation
Cron JobsScheduled recurring tasksapp/jobs/
Celery TasksDistributed task queueapp/tasks/
Queue ConsumersMessage processingapp/consumers/
WorkersBackground processorsapp/workers/

When to Use What

Use CaseSolution
Run every X minutes/hoursCron (APScheduler)
Async processing after API callCelery Task
Process messages from external systemQueue Consumer
Long-running background processWorker with asyncio
Distributed across multiple serversCelery + RabbitMQ
Simple in-process backgroundFastAPI BackgroundTasks

Key Principles

1. Reuse Service Layer

Workers should call existing services, not duplicate logic:

python
# GOOD: Reuse service
async def process_order(order_id: int):
    async with get_session() as session:
        service = OrderService(session)
        await service.process(order_id)

# BAD: Duplicate logic in worker
async def process_order(order_id: int):
    # Don't copy-paste service code here!
    pass

2. Idempotency

All background tasks must be idempotent (safe to retry):

python
# GOOD: Check before processing
async def send_email(user_id: int, email_type: str):
    if await was_email_sent(user_id, email_type):
        return  # Already sent, skip
    await do_send_email(user_id, email_type)
    await mark_email_sent(user_id, email_type)

3. Error Handling

Always handle errors gracefully with retries:

python
@celery.task(bind=True, max_retries=3)
def process_payment(self, payment_id: int):
    try:
        # Process
        pass
    except TransientError as e:
        raise self.retry(exc=e, countdown=60)
    except PermanentError as e:
        # Log and don't retry
        logger.error(f"Payment {payment_id} failed permanently: {e}")

Reference Navigation

Scheduling:

Task Queues:

Message Queues:

Integration with python-backend-development

This skill extends python-backend-development:

  • Workers call Services from app/services/
  • Use same Repository pattern for database access
  • Follow same async patterns (async def, await)
  • Use same error handling (custom exceptions)
  • Share configuration from app/core/config.py