skill
---
name: Redis
description: In-memory data store for caching, rate limiting, and session management
---
# Redis
## Core Data Types
### Strings
```bash
SET key "value"
GET key
SETEX key 3600 "value" # With TTL (1 hour)
INCR counter # Atomic increment
INCRBY counter 10
```
### Hashes
```bash
HSET user:1 name "John" age 30
HGET user:1 name
HGETALL user:1
HINCRBY user:1 age 1
```
### Lists
```bash
LPUSH queue "item" # Add to left
RPOP queue # Remove from right
LRANGE queue 0 -1 # Get all
LLEN queue
```
### Sets
```bash
SADD tags "python" "redis"
SMEMBERS tags
SISMEMBER tags "python"
```
### Sorted Sets
```bash
ZADD leaderboard 100 "user:1" 200 "user:2"
ZRANGE leaderboard 0 -1 WITHSCORES
ZINCRBY leaderboard 10 "user:1"
```
## Python (redis-py)
```python
import redis.asyncio as redis
# Connection
client = redis.from_url("redis://localhost:6379/0")
# Basic operations
await client.set("key", "value", ex=3600) # With 1h TTL
value = await client.get("key")
# Atomic operations
count = await client.incr("counter")
await client.incrby("counter", 10)
# Hash operations
await client.hset("user:1", mapping={"name": "John", "age": 30})
user = await client.hgetall("user:1")
# TTL management
await client.expire("key", 3600)
ttl = await client.ttl("key")
await client.delete("key")
```
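The list commands from Core Data Types map directly onto redis-py methods. A minimal sketch of a FIFO job queue built on `lpush` and `rpop` follows; the helper names `enqueue` and `dequeue` are hypothetical, and `client` is assumed to be a `redis.asyncio` client like the one created above.

```python
import json
from typing import Optional


async def enqueue(client, queue: str, job: dict) -> int:
    """Push a JSON-encoded job onto the left end; returns the new queue length."""
    return await client.lpush(queue, json.dumps(job))


async def dequeue(client, queue: str) -> Optional[dict]:
    """Pop the oldest job from the right end, or None if the queue is empty."""
    raw = await client.rpop(queue)
    return json.loads(raw) if raw is not None else None
```

Pushing on the left and popping on the right gives first-in, first-out ordering. `json.loads` accepts both `bytes` and `str`, so this works with or without `decode_responses=True`.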
## Rate Limiting Pattern
```python
async def rate_limit(client: redis.Redis, key: str, limit: int, window: int) -> tuple[bool, int]:
    """
    Fixed-window rate limiter: the counter resets when the key expires.
    Returns (allowed, remaining).
    """
    current = await client.incr(key)
    if current == 1:
        # First hit in this window: start the expiry clock
        await client.expire(key, window)
    if current > limit:
        return False, 0
    return True, limit - current
```
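The `INCR`/`EXPIRE` counter above resets whenever its key expires, so a burst that straddles two windows can briefly exceed the limit. Where a rolling window is needed, one option is a sorted set of request timestamps. This is a sketch under assumptions, not this project's implementation: the function name and the `now` parameter (added to make the logic testable) are hypothetical, and the calls are not atomic, so concurrent callers may slightly overshoot `limit`.

```python
import time
import uuid


async def sliding_window_limit(client, key, limit, window, now=None):
    """Sliding-window log limiter. Returns (allowed, remaining)."""
    now = time.time() if now is None else now
    # Drop timestamps that have fallen out of the window
    await client.zremrangebyscore(key, 0, now - window)
    count = await client.zcard(key)
    if count >= limit:
        return False, 0
    # A unique member per request, scored by its timestamp
    await client.zadd(key, {uuid.uuid4().hex: now})
    # Let idle keys expire so they do not accumulate
    await client.expire(key, window)
    return True, limit - count - 1
```

Memory use grows with the number of requests kept per key, so this trades space for accuracy compared with a single counter.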
## Caching Pattern
```python
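import json

async def get_cached(key: str, fetch_func, ttl: int = 300):
    """Cache-aside pattern. Uses the module-level `client`."""
    cached = await client.get(key)
    # Compare against None so falsy cached payloads still count as hits
    if cached is not None:
        return json.loads(cached)
    # Cache miss: fetch from the source, then store with a TTL
    data = await fetch_func()
    await client.setex(key, ttl, json.dumps(data))
    return data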
```
## Distributed Locking
```python
async def acquire_lock(client: redis.Redis, name: str, timeout: int = 10) -> bool:
    """Simple distributed lock using SET NX with an expiry."""
    lock_key = f"lock:{name}"
    acquired = await client.set(lock_key, "1", nx=True, ex=timeout)
    return bool(acquired)

async def release_lock(client: redis.Redis, name: str):
    # Note: deletes the key unconditionally, without checking who holds the lock
    await client.delete(f"lock:{name}")
```
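Because the release above deletes the key unconditionally, a caller whose lock has already expired can remove a lock that someone else now holds. A common remedy is to store a unique token as the value and delete only on a match, done atomically in a Lua script. The sketch below assumes that approach; the function names and `RELEASE_SCRIPT` are hypothetical, and `client.eval` is redis-py's standard `EVAL` wrapper.

```python
import uuid
from typing import Optional

# Delete the key only if it still holds this caller's token.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


async def acquire_owned_lock(client, name: str, timeout: int = 10) -> Optional[str]:
    """Return a token if the lock was acquired, otherwise None."""
    token = uuid.uuid4().hex
    acquired = await client.set(f"lock:{name}", token, nx=True, ex=timeout)
    return token if acquired else None


async def release_owned_lock(client, name: str, token: str) -> bool:
    """Release the lock only if `token` still owns it."""
    released = await client.eval(RELEASE_SCRIPT, 1, f"lock:{name}", token)
    return released == 1
```

redis-py also ships a built-in `client.lock(...)` helper that follows the same token idea, which may be preferable to hand-rolled code.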
## Pub/Sub
```python
# Publisher
await client.publish("channel", json.dumps({"event": "update"}))
# Subscriber
pubsub = client.pubsub()
await pubsub.subscribe("channel")
async for message in pubsub.listen():
    # Skip subscribe/unsubscribe confirmations; handle published messages only
    if message["type"] == "message":
        data = json.loads(message["data"])
```
## Best Practices
1. **Key Naming**: Use colons for hierarchy: `user:1:sessions`
2. **TTL**: Always set expiration for cache keys
3. **Serialization**: JSON for complex objects
4. **Connection Pooling**: Use connection pools in production
5. **Pipelining**: Batch operations for performance
```python
# Pipeline example
async with client.pipeline(transaction=True) as pipe:
    # Commands are buffered locally and sent together on execute()
    pipe.incr("counter1")
    pipe.incr("counter2")
    results = await pipe.execute()
```
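The key-naming and serialization practices above can be kept consistent with two small helpers. This is only an illustrative sketch: `make_key` and `dumps_compact` are hypothetical names, not part of redis-py or this project.

```python
import json
from typing import Any


def make_key(*parts: Any) -> str:
    """Join key segments with colons to form a hierarchical key."""
    return ":".join(str(p) for p in parts)


def dumps_compact(obj: Any) -> str:
    """Serialize to compact JSON with a stable key order."""
    return json.dumps(obj, separators=(",", ":"), sort_keys=True)


key = make_key("user", 1, "sessions")  # "user:1:sessions"
```

Building keys in one place makes it harder for inconsistent separators or prefixes to creep in across modules.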
## Monitoring
```bash
redis-cli INFO # Server stats
redis-cli MONITOR # Stream every command in real time (high overhead; avoid on busy servers)
redis-cli DBSIZE # Key count
redis-cli MEMORY USAGE key # Memory for specific key
```
## This Project
- Rate limiting: `app/core/rate_limit.py`
- Connection: `app/core/redis.py`
- Celery result backend