---
name: RabbitMQ
description: Message broker for async communication and task queues
---
# RabbitMQ
## Core Concepts
```
Producer → Exchange → Queue → Consumer
                    ↓
          Binding (routing rules)
```
- **Producer**: Sends messages
- **Exchange**: Routes messages to queues
- **Queue**: Stores messages
- **Consumer**: Receives and processes messages
- **Binding**: Rules connecting exchanges to queues
## Exchange Types
| Type | Routing |
|------|---------|
| **direct** | Exact routing key match |
| **fanout** | Broadcast to all bound queues |
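| **topic** | Pattern matching on dot-separated keys: `*` matches exactly one word, `#` matches zero or more (`*.error`, `log.#`) |
| **headers** | Match on message headers instead of the routing key (`x-match`: `all` or `any`) |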
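To make the topic wildcards concrete, here is a minimal pure-Python sketch of the matching rule. It is an illustration only, not the broker's implementation, and `topic_matches` is a hypothetical helper name.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic pattern.

    '*' stands for exactly one dot-separated word, '#' for zero or more.
    """
    def match(p: list, k: list) -> bool:
        if not p:
            # Pattern exhausted: match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or absorb one word and stay active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


# Show which of the example patterns each routing key would match.
for key in ("order.created", "order.created.eu", "log", "app.error"):
    matched = [p for p in ("order.*", "log.#", "*.error") if topic_matches(p, key)]
    print(key, matched)
```

Note that `log.#` also matches the bare key `log`, while `*.error` does not match `error`, because `*` always needs one word.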
## Python (pika)
### Publisher
```python
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange and queue (safe to repeat if the settings match)
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.queue_declare(queue='orders', durable=True)
channel.queue_bind(exchange='events', queue='orders', routing_key='order.*')

# Publish message
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps({"order_id": 123}),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json',
    ),
)
connection.close()
```
### Consumer
```python
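import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def callback(ch, method, properties, body):
    data = json.loads(body)
    process_order(data)  # Application-specific handler, defined elsewhere
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # One unacknowledged message at a time
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()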
```
## Message Acknowledgment
```python
# Manual ACK (recommended)
channel.basic_consume(queue='tasks', on_message_callback=callback, auto_ack=False)
# In callback
ch.basic_ack(delivery_tag=method.delivery_tag) # Success
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # Retry
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) # Discard
```
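The three responses above can be combined into one callback wrapper. The following is a sketch under assumptions: `PermanentError` and `make_callback` are hypothetical names, and the split between "retry" and "discard" failures is something each application has to decide for itself.

```python
import json


class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


def make_callback(handler):
    """Wrap handler(data) with ack / nack / reject decisions."""
    def callback(ch, method, properties, body):
        try:
            data = json.loads(body)
        except ValueError:
            # Malformed payload: discard (dead-lettered if a DLX is configured).
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            return
        try:
            handler(data)
        except PermanentError:
            # Retrying will not help: discard.
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        except Exception:
            # Possibly transient: put the message back on the queue.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

It would be registered with something like `channel.basic_consume(queue='tasks', on_message_callback=make_callback(process_order), auto_ack=False)`. Requeueing with `requeue=True` has no built-in limit, so a message that always fails will loop until something else stops it.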
## Reliability Patterns
### Publisher Confirms
```python
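channel.confirm_delivery()
try:
    # mandatory=True makes the broker return messages it cannot route
    channel.basic_publish(exchange='', routing_key='queue', body=message, mandatory=True)
except pika.exceptions.UnroutableError:
    print("Message was returned as unroutable")
except pika.exceptions.NackError:
    print("Message was nacked by the broker")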
```
### Dead Letter Queue
```python
# Dead letter exchange (must exist before a queue can be bound to it)
channel.exchange_declare(exchange='dlx', exchange_type='direct')
# Main queue with DLX
channel.queue_declare(
    queue='tasks',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 86400000,  # 24 hours, in milliseconds
    },
)
# DLQ
channel.queue_declare(queue='failed_tasks')
channel.queue_bind(exchange='dlx', queue='failed_tasks', routing_key='failed')
```
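Dead-lettered messages carry an `x-death` header that records why and how often they were dead-lettered, which a consumer of the DLQ can use to decide whether to retry. A minimal sketch, assuming the header arrives as a list of dicts with `queue`, `reason`, and `count` keys (`death_count` is a hypothetical helper):

```python
def death_count(headers, queue, reason="rejected"):
    """Sum the x-death counts recorded for one queue and one reason.

    headers is the message's header dict (may be None when no headers are set).
    """
    deaths = (headers or {}).get("x-death") or []
    return sum(
        entry.get("count", 0)
        for entry in deaths
        if entry.get("queue") == queue and entry.get("reason") == reason
    )


# Illustrative header shape for a message rejected twice from 'tasks'.
sample_headers = {
    "x-death": [
        {"queue": "tasks", "reason": "rejected", "count": 2},
        {"queue": "tasks", "reason": "expired", "count": 1},
    ]
}
print(death_count(sample_headers, "tasks"))
print(death_count(sample_headers, "tasks", reason="expired"))
```

In a pika callback the header dict would come from `properties.headers`. Other reasons include `expired` (TTL) and `maxlen` (queue length limit).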
## Queue Options
```python
channel.queue_declare(
    queue='tasks',
    durable=True,        # Survive broker restart
    exclusive=False,     # Not restricted to the declaring connection
    auto_delete=False,   # Keep queue after the last consumer disconnects
    arguments={
        'x-max-length': 10000,      # Max messages
        'x-max-priority': 10,       # Priority queue (a classic-queue feature; check support before combining with quorum)
        'x-queue-type': 'quorum',   # Replicated queue for high availability
    },
)
```
## With Celery
Celery can use RabbitMQ as its message broker:
```python
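from celery import Celery
from kombu import Queue

celery_app = Celery('app', broker='amqp://user:pass@localhost:5672/vhost')
# Note: '#' acts as a wildcard only on a topic exchange; Celery's default exchange is direct
celery_app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('priority', routing_key='priority.#'),
)
# Route tasks to specific queues
celery_app.conf.task_routes = {
    'app.tasks.urgent_task': {'queue': 'priority'},
}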
```
## Management
```bash
# CLI
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_exchanges
rabbitmqctl purge_queue queue_name
# Management UI (port 15672)
# Enable: rabbitmq-plugins enable rabbitmq_management
```
## Best Practices
1. **Durability**: Set `durable=True` for queues and exchanges, and `delivery_mode=2` for messages, so they survive a broker restart
2. **Acknowledgment**: Use manual ACK with `auto_ack=False`
3. **Prefetch**: Set `prefetch_count` (via `basic_qos`) to cap the number of unacknowledged messages each consumer holds
4. **Dead Letters**: Configure DLX for failed message handling
5. **Monitoring**: Use management plugin for visibility
## This Project
- Broker URL: `amqp://guest:guest@rabbitmq:5672//`
- Used by Celery for task distribution
- Management UI: http://localhost:15672
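
The broker URL above ends in `//`: the first slash separates the host from the path, and the second is the name of the default virtual host, `/`. A small standard-library sketch of that reading follows; `describe_broker_url` is a hypothetical helper, and client libraries may differ in details such as the default used when the path is empty.

```python
from urllib.parse import unquote, urlsplit


def describe_broker_url(url):
    """Split an AMQP URL into its parts, treating the path as the vhost."""
    parts = urlsplit(url)
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        # Drop the one slash that separates host from path; the rest is the vhost.
        "vhost": unquote(parts.path[1:]) if parts.path else None,
    }


print(describe_broker_url("amqp://guest:guest@rabbitmq:5672//"))
```

A vhost written as `%2F` decodes to the same `/`, which is the form some clients expect.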