skill
---
name: Celery Background Tasks
description: Celery task patterns, worker management, and async job processing
---
# Celery Background Tasks
## Architecture
```
Producer (API) → Broker (RabbitMQ) → Consumer (Worker) → Backend (Redis)
```
## Task Definition
### Basic Task
```python
from celery import Celery
celery_app = Celery("app", broker="amqp://...", backend="redis://...")
@celery_app.task
def my_task(arg1: str) -> dict:
return {"result": arg1}
```
### Task with Retry
```python
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def risky_task(self, data: dict) -> dict:
try:
result = external_api_call(data)
return result
except TemporaryError as exc:
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```
## Task Invocation
```python
# Async (fire and forget)
task = my_task.delay(arg1="value")
task_id = task.id
# Check result
from celery.result import AsyncResult
result = AsyncResult(task_id, app=celery_app)
if result.ready():
data = result.get() if result.successful() else result.info
```
## Configuration Best Practices
```python
celery_app.conf.update(
task_serializer="json",
task_acks_late=True, # ACK after completion (reliability)
worker_prefetch_multiplier=1, # One task at a time (prevents loss)
result_expires=86400, # 24 hours
task_time_limit=3600, # 1 hour max
)
```
## Worker Management
```bash
# Start worker
celery -A app.core.celery_app worker --loglevel=info --concurrency=4
# Options
--concurrency=N # Worker processes
--queues=q1,q2 # Listen to specific queues
--autoscale=10,3 # Dynamic scaling (max,min)
```
## Task Patterns
### Chaining
```python
from celery import chain
workflow = chain(task1.s(arg), task2.s(), task3.s())
workflow.apply_async()
```
### Parallel Execution
```python
from celery import group
job = group(task.s(id=1), task.s(id=2), task.s(id=3))
result = job.apply_async()
```
## Periodic Tasks (Celery Beat)
```python
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
"daily-cleanup": {
"task": "cleanup_task",
"schedule": crontab(hour=2, minute=0),
},
}
```
## Testing Mode
```python
# Sync execution for tests
celery_app.conf.task_always_eager = True
```
## Monitoring
- Flower UI: Real-time task monitoring
- `celery inspect active`: View active tasks
- `celery inspect registered`: List registered tasks
- `celery purge`: Clear all pending tasks
## This Project
- Tasks: `app/core/tasks.py`, `app/webhooks/tasks.py`
- Config: `app/core/celery_app.py`
- Flower: http://localhost:5555