---
name: Celery Background Tasks
description: Celery task patterns, worker management, and async job processing
---

# Celery Background Tasks

## Architecture
```
Producer (API) → Broker (RabbitMQ) → Consumer (Worker) → Backend (Redis)
```

## Task Definition

### Basic Task
```python
from celery import Celery

celery_app = Celery("app", broker="amqp://...", backend="redis://...")

@celery_app.task
def my_task(arg1: str) -> dict:
    return {"result": arg1}
```

### Task with Retry
```python
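# external_api_call and TemporaryError are placeholders for your own client and exception type.
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def risky_task(self, data: dict) -> dict:
    try:
        return external_api_call(data)
    except TemporaryError as exc:
        # Exponential backoff: 60s, 120s, 240s. The explicit countdown overrides default_retry_delay.
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))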
```
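
The countdown expression in the retry example doubles the wait on each attempt. A minimal sketch of the resulting schedule, in plain Python with no Celery dependency, may make the timing easier to reason about. The helper name `backoff_schedule` is hypothetical and only mirrors the arithmetic `60 * (2 ** self.request.retries)`:

```python
from typing import List


def backoff_schedule(base_delay: int, max_retries: int) -> List[int]:
    """Seconds waited before each retry: base_delay * 2 ** retries."""
    # request.retries is 0 on the first failure, 1 on the second, and so on.
    return [base_delay * (2 ** retries) for retries in range(max_retries)]


delays = backoff_schedule(base_delay=60, max_retries=3)  # [60, 120, 240]
total_wait = sum(delays)                                  # 420 seconds
```

With `max_retries=3` the task runs at most four times (one initial attempt plus three retries), so the worst case adds about seven minutes of waiting before the final failure is raised.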

## Task Invocation

```python
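from celery.result import AsyncResult

# Enqueue the task; delay() returns immediately with an AsyncResult
task = my_task.delay(arg1="value")
task_id = task.id

# Later: look up the result by ID (requires a result backend)
result = AsyncResult(task_id, app=celery_app)
if result.ready():
    # On failure, result.info holds the raised exception instead of a return value
    data = result.get() if result.successful() else result.info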
```

## Configuration Best Practices
```python
celery_app.conf.update(
    task_serializer="json",
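    task_acks_late=True,           # ACK after completion; a task may rerun if a worker dies, so keep tasks idempotent
    worker_prefetch_multiplier=1,  # Each worker process reserves one task at a time
    result_expires=86400,          # Keep results for 24 hours (value in seconds)
    task_time_limit=3600,          # Hard limit: the process running the task is killed after 1 hour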
)
```

## Worker Management
```bash
# Start worker
celery -A app.core.celery_app worker --loglevel=info --concurrency=4

# Common options (append to the command above)
#   --concurrency=N     Number of worker processes
#   --queues=q1,q2      Consume only from the listed queues
#   --autoscale=10,3    Scale the pool between 3 and 10 processes (max,min)
```

## Task Patterns

### Chaining
```python
from celery import chain
# Each task's return value is passed as the first argument to the next signature
workflow = chain(task1.s(arg), task2.s(), task3.s())
workflow.apply_async()
```

### Parallel Execution
```python
from celery import group
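# All signatures are dispatched at once and may run concurrently on available workers
job = group(task.s(id=1), task.s(id=2), task.s(id=3))
result = job.apply_async()  # GroupResult; result.get() returns the values in submission order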
```

## Periodic Tasks (Celery Beat)
```python
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "daily-cleanup": {
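        "task": "cleanup_task",  # Must match the task's registered name (by default its full module path)
        "schedule": crontab(hour=2, minute=0),  # Daily at 02:00 (UTC unless `timezone` is set)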
    },
}
```
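
For reasoning about when a `crontab(hour=2, minute=0)` entry will next fire, a plain-Python sketch of the timing can help. This is not how Celery Beat computes schedules internally. The helper `next_daily_run` is hypothetical and only models a once-a-day trigger at a fixed wall-clock time.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Return the first hour:minute that falls strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


before_slot = next_daily_run(datetime(2024, 1, 1, 1, 30))  # 2024-01-01 02:00
after_slot = next_daily_run(datetime(2024, 1, 1, 2, 0))    # 2024-01-02 02:00
```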

## Testing Mode
```python
# Run tasks synchronously in the calling process (no broker or worker needed)
celery_app.conf.task_always_eager = True
```

## Monitoring
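- Flower UI: Real-time monitoring of tasks and workers
- `celery -A app.core.celery_app inspect active`: View tasks currently executing
- `celery -A app.core.celery_app inspect registered`: List tasks registered on each worker
- `celery -A app.core.celery_app purge`: Delete all pending messages from the task queues (irreversible)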

## This Project
- Tasks: `app/core/tasks.py`, `app/webhooks/tasks.py`
- Config: `app/core/celery_app.py`
- Flower: http://localhost:5555