AgentSkillsCN

Redis

Redis

SKILL.md
skill
---
name: Redis
description: In-memory data store for caching, rate limiting, and session management
---

# Redis

## Core Data Types

### Strings
```bash
SET key "value"
GET key
SETEX key 3600 "value"     # With TTL (1 hour)
INCR counter               # Atomic increment
INCRBY counter 10
```

### Hashes
```bash
HSET user:1 name "John" age 30
HGET user:1 name
HGETALL user:1
HINCRBY user:1 age 1
```

### Lists
```bash
LPUSH queue "item"         # Add to left
RPOP queue                 # Remove from right
LRANGE queue 0 -1          # Get all
LLEN queue
```

### Sets
```bash
SADD tags "python" "redis"
SMEMBERS tags
SISMEMBER tags "python"
```

### Sorted Sets
```bash
ZADD leaderboard 100 "user:1" 200 "user:2"
ZRANGE leaderboard 0 -1 WITHSCORES
ZINCRBY leaderboard 10 "user:1"
```

## Python (redis-py)

```python
import redis.asyncio as redis

# Connection
client = redis.from_url("redis://localhost:6379/0")

# Basic operations
await client.set("key", "value", ex=3600)  # With 1h TTL
value = await client.get("key")

# Atomic operations
count = await client.incr("counter")
await client.incrby("counter", 10)

# Hash operations
await client.hset("user:1", mapping={"name": "John", "age": 30})
user = await client.hgetall("user:1")

# TTL management
await client.expire("key", 3600)
ttl = await client.ttl("key")
await client.delete("key")
```

## Rate Limiting Pattern

```python
async def rate_limit(client: redis.Redis, key: str, limit: int, window: int) -> tuple[bool, int]:
    """
    Sliding window rate limiter.
    Returns (allowed, remaining).
    """
    current = await client.incr(key)
    if current == 1:
        await client.expire(key, window)

    if current > limit:
        return False, 0
    return True, limit - current
```

## Caching Pattern

```python
async def get_cached(key: str, fetch_func, ttl: int = 300):
    """Cache-aside pattern."""
    cached = await client.get(key)
    if cached:
        return json.loads(cached)

    data = await fetch_func()
    await client.setex(key, ttl, json.dumps(data))
    return data
```

## Distributed Locking

```python
async def acquire_lock(client: redis.Redis, name: str, timeout: int = 10) -> bool:
    """Simple distributed lock using SET NX."""
    lock_key = f"lock:{name}"
    acquired = await client.set(lock_key, "1", nx=True, ex=timeout)
    return bool(acquired)

async def release_lock(client: redis.Redis, name: str):
    await client.delete(f"lock:{name}")
```

## Pub/Sub

```python
# Publisher
await client.publish("channel", json.dumps({"event": "update"}))

# Subscriber
pubsub = client.pubsub()
await pubsub.subscribe("channel")
async for message in pubsub.listen():
    if message["type"] == "message":
        data = json.loads(message["data"])
```

## Best Practices

1. **Key Naming**: Use colons for hierarchy: `user:1:sessions`
2. **TTL**: Always set expiration for cache keys
3. **Serialization**: JSON for complex objects
4. **Connection Pooling**: Use connection pools in production
5. **Pipelining**: Batch operations for performance

```python
# Pipeline example
async with client.pipeline(transaction=True) as pipe:
    pipe.incr("counter1")
    pipe.incr("counter2")
    results = await pipe.execute()
```

## Monitoring
```bash
redis-cli INFO                 # Server stats
redis-cli MONITOR              # Real-time commands
redis-cli DBSIZE               # Key count
redis-cli MEMORY USAGE key     # Memory for specific key
```

## This Project
- Rate limiting: `app/core/rate_limit.py`
- Connection: `app/core/redis.py`
- Celery result backend