---
name: RabbitMQ
description: Message broker for async communication and task queues
---

# RabbitMQ

## Core Concepts

```
Producer → Exchange → Queue → Consumer
                ↓
           Binding (routing rules)
```

- **Producer**: Sends messages
- **Exchange**: Routes messages to queues
- **Queue**: Stores messages
- **Consumer**: Receives and processes messages
- **Binding**: Rules connecting exchanges to queues

## Exchange Types

| Type | Routing |
|------|---------|
| **direct** | Exact routing key match |
| **fanout** | Broadcast to all bound queues |
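| **topic** | Pattern match on dot-separated words: `*` matches exactly one word, `#` matches zero or more (`*.error`, `log.#`) |
| **headers** | Match on message header values instead of the routing key |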

## Python (pika)

### Publisher
```python
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.queue_declare(queue='orders', durable=True)
channel.queue_bind(exchange='events', queue='orders', routing_key='order.*')

# Publish message
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps({"order_id": 123}),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json',
    )
)
```

### Consumer
```python
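import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def callback(ch, method, properties, body):
    data = json.loads(body)
    process_order(data)  # Application-specific handler, defined elsewhere
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Ack only after processing succeeds

channel.basic_qos(prefetch_count=1)  # At most one unacknowledged message per consumer
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()  # Blocks until consuming is stopped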
```

## Message Acknowledgment

```python
# Manual ACK (recommended)
channel.basic_consume(queue='tasks', on_message_callback=callback, auto_ack=False)

# In callback
ch.basic_ack(delivery_tag=method.delivery_tag)      # Success
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # Retry
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)  # Discard
```
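
One way to combine the three calls above is to decide between them based on how processing failed. The sketch below is an assumption about how an application might structure this, not a pika feature: `TransientError` and `make_callback` are hypothetical names, and `handler` stands in for your own processing function.

```python
import json


class TransientError(Exception):
    """Hypothetical: a failure worth retrying, such as a downstream timeout."""


def make_callback(handler):
    """Wrap `handler` so that every delivery is settled exactly once."""
    def callback(ch, method, properties, body):
        try:
            handler(json.loads(body))
        except TransientError:
            # Temporary problem: put the message back on the queue.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        except Exception:
            # Malformed JSON or a permanent failure: do not requeue.
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

The result can be passed as `on_message_callback` to `channel.basic_consume(..., auto_ack=False)`. Requeuing with no limit can cause a tight redelivery loop, so it usually pairs with a dead letter queue.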

## Reliability Patterns

### Publisher Confirms
```python
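channel.confirm_delivery()  # Enable publisher confirms on this channel
try:
    # mandatory=True makes the broker return messages that match no queue
    channel.basic_publish(exchange='', routing_key='queue', body=message, mandatory=True)
except pika.exceptions.UnroutableError:
    print("Message was returned as unroutable")
except pika.exceptions.NackError:
    print("Message was nacked by the broker")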
```
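
When a confirmed publish fails, a common follow-up is to retry a few times with a growing delay. The helper below is a library-agnostic sketch under that assumption: `publish_with_retry` is a hypothetical name, `publish` is any zero-argument callable that performs the publish, and `retry_on` is the exception type (or tuple of types) you consider retryable, for example `pika.exceptions.NackError`.

```python
import time


def publish_with_retry(publish, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call `publish()` until it succeeds or `attempts` is exhausted.

    Returns the attempt number that succeeded. Re-raises the last
    exception if every attempt fails. Delays double after each failure.
    """
    for attempt in range(1, attempts + 1):
        try:
            publish()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

A possible use is `publish_with_retry(lambda: channel.basic_publish(...), retry_on=pika.exceptions.NackError)`. Retrying an unroutable message rarely helps, since that usually points to a missing queue or binding rather than a temporary fault.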

### Dead Letter Queue
```python
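# Dead letter exchange (must exist before the DLQ can be bound to it)
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)

# Main queue with DLX
channel.queue_declare(
    queue='tasks',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 86400000,  # Messages expire after 24 hours (in ms) and are dead-lettered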
    }
)

# DLQ
channel.queue_declare(queue='failed_tasks')
channel.queue_bind(exchange='dlx', queue='failed_tasks', routing_key='failed')
```
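
RabbitMQ records dead-lettering history in the `x-death` message header, which a consumer can read to cap retries. The sketch below assumes the header is a list of entries with `queue`, `reason`, and `count` fields; `death_count` and `should_give_up` are hypothetical helper names, and the limit of 3 is an arbitrary example value.

```python
def death_count(headers, queue):
    """Total number of times a message was dead-lettered from `queue`.

    `headers` is the message's header table (may be None). Entries are
    summed because the broker keeps one entry per queue and reason.
    """
    entries = (headers or {}).get("x-death", [])
    return sum(int(e.get("count", 0)) for e in entries if e.get("queue") == queue)


def should_give_up(headers, queue, max_attempts=3):
    """Return True once the message has been dead-lettered too often."""
    return death_count(headers, queue) >= max_attempts
```

In a pika callback the header table is available as `properties.headers`, so a consumer could call `should_give_up(properties.headers, 'tasks')` before deciding whether to process or park the message.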

## Queue Options

```python
channel.queue_declare(
    queue='tasks',
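    durable=True,        # Survive broker restart
    exclusive=False,     # Not restricted to the declaring connection
    auto_delete=False,   # Keep queue after the last consumer disconnects
    arguments={
        'x-max-length': 10000,       # Max messages (oldest are dropped or dead-lettered by default)
        'x-queue-type': 'quorum',    # Replicated queue for high availability
        # 'x-max-priority': 10,      # Priority levels: a classic-queue argument, so left out of this quorum queue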
    }
)
```

## With Celery

Celery can use RabbitMQ as its message broker:
```python
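from celery import Celery
from kombu import Queue

celery_app = Celery(broker='amqp://user:pass@localhost:5672/vhost')

# Queues that workers consume from
celery_app.conf.task_queues = [
    Queue('default', routing_key='task.#'),
    Queue('priority', routing_key='priority.#'),
]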

# Route tasks to specific queues
celery_app.conf.task_routes = {
    'app.tasks.urgent_task': {'queue': 'priority'},
}
```

## Management

```bash
# CLI
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_exchanges
rabbitmqctl purge_queue queue_name

# Management UI (port 15672)
# Enable: rabbitmq-plugins enable rabbitmq_management
```

## Best Practices

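1. **Durability**: Set `durable=True` on exchanges and queues, and `delivery_mode=2` on messages
2. **Acknowledgment**: Use manual ACK (`auto_ack=False`) and acknowledge only after processing succeeds
3. **Prefetch**: Set `prefetch_count` to cap the number of unacknowledged messages each consumer holds
4. **Dead Letters**: Configure a DLX so rejected or expired messages are kept for inspection instead of dropped
5. **Monitoring**: Enable the management plugin for visibility into queue depth and consumer counts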

## This Project
- Broker URL: `amqp://guest:guest@rabbitmq:5672//`
- Used by Celery for task distribution
- Management UI: http://localhost:15672
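
The trailing `//` in the broker URL is easy to misread: the first slash separates the host from the path, and the second is the name of the default virtual host, `/`. The sketch below uses only the standard library to illustrate that reading; `parse_broker_url` is a hypothetical helper, and real clients such as Celery parse the URL themselves.

```python
from urllib.parse import unquote, urlsplit


def parse_broker_url(url):
    """Split an AMQP URL into its parts, treating the path as the vhost."""
    parts = urlsplit(url)
    # Everything after the first '/' is the vhost; empty means the default '/'.
    vhost = unquote(parts.path[1:]) or "/"
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port or 5672,  # 5672 is the standard AMQP port
        "vhost": vhost,
    }


print(parse_broker_url("amqp://guest:guest@rabbitmq:5672//"))
```

With this reading, `amqp://guest:guest@rabbitmq:5672//` targets the vhost `/`, while a URL ending in `/vhost` targets a vhost named `vhost`.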