AgentSkillsCN

State Management

LangGraph 状态架构、归约器、检查点机制与持久化存储

SKILL.md
--- frontmatter
description: LangGraph state schemas, reducers, checkpointing, and persistence

State Management

LangGraph state schemas, reducers, checkpointing, and persistence

State Management Skill

Manage agent state in LangGraph workflows - schemas, reducers, and persistence.

State Management Skill

Manage agent state in LangGraph workflows - schemas, reducers, and persistence.

Process

Step 1: Define State Schema

python
from typing import Annotated, TypedDict, Literal
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    """State schema for the agent.

    Fields wrapped in ``Annotated`` carry a reducer function as metadata;
    the remaining fields are plain type annotations.
    """
    # Messages with reducer (accumulates): `add_messages` is attached as
    # the reducer for this field.
    messages: Annotated[list, add_messages]
    
    # Simple fields (overwrite on update)
    current_step: str
    iteration: int
    
    # Complex nested state (no reducer attached to either field)
    context: dict
    results: list

Step 2: Custom Reducers

python
from operator import add
from typing import Annotated

def merge_dicts(left: dict, right: dict) -> dict:
    """Return a new dict with ``left``'s entries updated by ``right``'s.

    Neither argument is modified. When both dicts share a key, the value
    from ``right`` is the one kept.
    """
    combined = dict(left)
    combined.update(right)
    return combined

def append_unique(left: list, right: list) -> list:
    """Concatenate ``left`` and ``right``, dropping repeated items.

    The first occurrence of each item is kept and the original order is
    preserved, so the result is deterministic. (Round-tripping through a
    ``set`` would return the items in arbitrary order.)

    Args:
        left: Items already collected.
        right: Items to append; those already present are skipped.

    Returns:
        A new list; neither input list is modified.

    Raises:
        TypeError: If any item is unhashable.
    """
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in a single pass.
    return list(dict.fromkeys(left + right))

class AdvancedState(TypedDict):
    """State schema demonstrating one reducer per accumulation style.

    Each ``Annotated`` field pairs its type with the function attached as
    that field's reducer; ``status`` has no reducer.
    """
    # Accumulate messages
    messages: Annotated[list, add_messages]
    
    # Accumulate numeric values (`add` is `operator.add`)
    token_count: Annotated[int, add]
    
    # Merge dictionaries (`merge_dicts`: right-hand values win on shared keys)
    metadata: Annotated[dict, merge_dicts]
    
    # Unique list items (`append_unique`)
    visited_nodes: Annotated[list[str], append_unique]
    
    # Simple overwrite
    status: str

Step 3: State Updates in Nodes

python
from langgraph.graph import StateGraph

async def process_node(state: AgentState) -> dict:
    """Build the partial state update for the processing step.

    Only the changed fields are returned: the step label, the iteration
    counter incremented by one, and a copy of ``context`` with
    ``"processed"`` set to ``True``. The incoming ``state`` is not mutated.
    """
    next_iteration = state["iteration"] + 1

    # Copy first so the caller's context dict stays untouched.
    next_context = dict(state["context"])
    next_context["processed"] = True

    update = {"current_step": "processing"}
    update["iteration"] = next_iteration
    update["context"] = next_context
    return update

async def accumulate_node(state: AdvancedState) -> dict:
    """Return a fixed update for the reducer-backed fields.

    ``state`` is not read; the same three values are returned every time.
    """
    update: dict = {}
    update["token_count"] = 150  # Will be added to existing
    update["metadata"] = {"source": "api"}  # Will be merged
    update["visited_nodes"] = ["accumulate"]  # Will be appended uniquely
    return update

Step 4: Checkpointing with MemorySaver

python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

# Create checkpointer
checkpointer = MemorySaver()

# Build graph: "process" always flows into "decide"; "decide" is followed by
# a conditional edge whose mapping sends "continue" back to "process" and
# "end" to END.
# NOTE: `decide_node` and `route_function` are not defined in this example
# and must be supplied by the reader.
graph = StateGraph(AgentState)
graph.add_node("process", process_node)
graph.add_node("decide", decide_node)
graph.set_entry_point("process")
graph.add_edge("process", "decide")
graph.add_conditional_edges("decide", route_function, {"continue": "process", "end": END})

# Compile with checkpointing
app = graph.compile(checkpointer=checkpointer)

# Run with thread ID for persistence
# NOTE: `initial_state` is not defined in this example. The top-level
# `await` below is only valid inside an async function or an async-aware
# REPL/notebook.
config = {"configurable": {"thread_id": "task_001"}}
result = await app.ainvoke(initial_state, config)

# Resume later with same thread ID (no new input is passed: `None`)
resumed_result = await app.ainvoke(None, config)

Step 5: Redis Checkpointer

python
from langgraph.checkpoint.base import BaseCheckpointSaver
import redis
import json
from datetime import datetime

class RedisCheckpointer(BaseCheckpointSaver):
    """Redis-backed checkpointer for distributed systems.

    Every checkpoint is stored as a JSON string under the key
    ``checkpoint:<thread_id>:<checkpoint_id>`` and expires after
    ``self.ttl`` seconds. Checkpoint ids are ISO-8601 timestamps, so the
    lexicographically greatest key of a thread is its most recent one.

    NOTE(review): ``get``/``put`` use a simplified signature here; confirm
    it against the ``BaseCheckpointSaver`` interface of the installed
    langgraph version before relying on this class with a compiled graph.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Connect to Redis.

        Args:
            redis_url: Connection URL handed to ``redis.from_url``.
        """
        # Let the base class initialise its own attributes first.
        super().__init__()
        self.client = redis.from_url(redis_url)
        self.ttl = 86400 * 7  # 7 days

    def _key(self, thread_id: str, checkpoint_id: str) -> str:
        """Return the Redis key for one checkpoint of one thread."""
        return f"checkpoint:{thread_id}:{checkpoint_id}"

    def get(self, config: dict) -> dict | None:
        """Load a checkpoint.

        Args:
            config: Must contain ``config["configurable"]["thread_id"]``;
                an optional ``"checkpoint_id"`` selects a specific
                checkpoint, otherwise the most recent one is returned.

        Returns:
            The decoded checkpoint, or ``None`` when nothing is stored.

        Raises:
            KeyError: If ``configurable`` or ``thread_id`` is missing.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = config["configurable"].get("checkpoint_id", "latest")

        if checkpoint_id == "latest":
            # SCAN walks the keyspace incrementally, whereas KEYS blocks the
            # server for the whole scan. max() replaces a full sort and is
            # unaffected by the duplicates SCAN may yield.
            matching = self.client.scan_iter(match=self._key(thread_id, "*"))
            latest_key = max(matching, default=None)
            if latest_key is None:
                return None
            data = self.client.get(latest_key)
        else:
            data = self.client.get(self._key(thread_id, checkpoint_id))

        return json.loads(data) if data else None

    def put(self, config: dict, checkpoint: dict) -> dict:
        """Store ``checkpoint`` under a new timestamp-based id.

        Args:
            config: Must contain ``config["configurable"]["thread_id"]``.
            checkpoint: JSON-serialisable checkpoint payload.

        Returns:
            A config dict identifying the checkpoint that was written.

        Raises:
            KeyError: If ``configurable`` or ``thread_id`` is missing.
            TypeError: If ``checkpoint`` is not JSON-serialisable.
        """
        thread_id = config["configurable"]["thread_id"]
        # ISO timestamps sort chronologically as strings, which is what
        # the "latest" lookup in get() relies on.
        checkpoint_id = datetime.now().isoformat()

        key = self._key(thread_id, checkpoint_id)
        self.client.setex(key, self.ttl, json.dumps(checkpoint))

        return {"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}}

# Use Redis checkpointer
# Built with the default URL (redis://localhost:6379); compiling here
# rebinds `app` to a graph that uses this checkpointer.
redis_checkpointer = RedisCheckpointer()
app = graph.compile(checkpointer=redis_checkpointer)

Step 6: PostgreSQL Checkpointer

python
from langgraph.checkpoint.postgres import PostgresSaver
import asyncpg

# Async PostgreSQL checkpointer
async def create_postgres_checkpointer(dsn: str = "postgresql://user:pass@localhost/db"):
    """Connect to PostgreSQL, ensure the checkpoint table exists, and wrap
    the connection in a ``PostgresSaver``.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``. The default
            is a placeholder; pass real credentials from configuration
            rather than hard-coding them.

    Returns:
        A ``PostgresSaver`` built on the open connection. The connection is
        left open on success and is owned by the returned saver's user.

    Raises:
        Any exception raised while connecting, creating the table, or
        constructing the saver. In the latter two cases the connection is
        closed before the exception propagates.

    NOTE(review): confirm that the installed ``PostgresSaver`` accepts an
    asyncpg connection and uses this table layout; the published
    langgraph-checkpoint-postgres package documents psycopg connections,
    an ``AsyncPostgresSaver`` for async use, and a ``setup()`` method that
    creates its own tables.
    """
    conn = await asyncpg.connect(dsn)

    try:
        # Create table if not exists
        await conn.execute("""
        CREATE TABLE IF NOT EXISTS checkpoints (
            thread_id TEXT,
            checkpoint_id TEXT,
            state JSONB,
            created_at TIMESTAMP DEFAULT NOW(),
            PRIMARY KEY (thread_id, checkpoint_id)
        )
    """)

        return PostgresSaver(conn)
    except BaseException:
        # Do not leak the connection when setup fails or is cancelled.
        await conn.close()
        raise

# Use
# NOTE: top-level `await` is only valid inside an async function or an
# async-aware REPL/notebook; wrap these lines accordingly in a script.
checkpointer = await create_postgres_checkpointer()
app = graph.compile(checkpointer=checkpointer)

Step 7: State Visualization

python
def visualize_state(state: AgentState) -> str:
    """Create visual representation of state."""
    lines = [
        "┌─ Agent State ─────────────────┐",
        f"│ Step: {state['current_step']:<22} │",
        f"│ Iteration: {state['iteration']:<17} │",
        f"│ Messages: {len(state['messages']):<18} │",
        "├───────────────────────────────┤",
    ]
    
    for key, value in state.get('context', {}).items():
        lines.append(f"│ {key}: {str(value)[:20]:<20} │")
    
    lines.append("└───────────────────────────────┘")
    return "\n".join(lines)

# In node
async def debug_node(state: AgentState) -> dict:
    """Print the rendered state box and return an empty update."""
    rendering = visualize_state(state)
    print(rendering)
    return {}
python
from typing import Annotated, TypedDict, Literal
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    """State schema for the agent.

    Fields wrapped in ``Annotated`` carry a reducer function as metadata;
    the remaining fields are plain type annotations.
    """
    # Messages with reducer (accumulates): `add_messages` is attached as
    # the reducer for this field.
    messages: Annotated[list, add_messages]
    
    # Simple fields (overwrite on update)
    current_step: str
    iteration: int
    
    # Complex nested state (no reducer attached to either field)
    context: dict
    results: list
python
from operator import add
from typing import Annotated

def merge_dicts(left: dict, right: dict) -> dict:
    """Return a new dict with ``left``'s entries updated by ``right``'s.

    Neither argument is modified. When both dicts share a key, the value
    from ``right`` is the one kept.
    """
    combined = dict(left)
    combined.update(right)
    return combined

def append_unique(left: list, right: list) -> list:
    """Concatenate ``left`` and ``right``, dropping repeated items.

    The first occurrence of each item is kept and the original order is
    preserved, so the result is deterministic. (Round-tripping through a
    ``set`` would return the items in arbitrary order.)

    Args:
        left: Items already collected.
        right: Items to append; those already present are skipped.

    Returns:
        A new list; neither input list is modified.

    Raises:
        TypeError: If any item is unhashable.
    """
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in a single pass.
    return list(dict.fromkeys(left + right))

class AdvancedState(TypedDict):
    """State schema demonstrating one reducer per accumulation style.

    Each ``Annotated`` field pairs its type with the function attached as
    that field's reducer; ``status`` has no reducer.
    """
    # Accumulate messages
    messages: Annotated[list, add_messages]
    
    # Accumulate numeric values (`add` is `operator.add`)
    token_count: Annotated[int, add]
    
    # Merge dictionaries (`merge_dicts`: right-hand values win on shared keys)
    metadata: Annotated[dict, merge_dicts]
    
    # Unique list items (`append_unique`)
    visited_nodes: Annotated[list[str], append_unique]
    
    # Simple overwrite
    status: str
python
from langgraph.graph import StateGraph

async def process_node(state: AgentState) -> dict:
    """Build the partial state update for the processing step.

    Only the changed fields are returned: the step label, the iteration
    counter incremented by one, and a copy of ``context`` with
    ``"processed"`` set to ``True``. The incoming ``state`` is not mutated.
    """
    next_iteration = state["iteration"] + 1

    # Copy first so the caller's context dict stays untouched.
    next_context = dict(state["context"])
    next_context["processed"] = True

    update = {"current_step": "processing"}
    update["iteration"] = next_iteration
    update["context"] = next_context
    return update

async def accumulate_node(state: AdvancedState) -> dict:
    """Return a fixed update for the reducer-backed fields.

    ``state`` is not read; the same three values are returned every time.
    """
    update: dict = {}
    update["token_count"] = 150  # Will be added to existing
    update["metadata"] = {"source": "api"}  # Will be merged
    update["visited_nodes"] = ["accumulate"]  # Will be appended uniquely
    return update
python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

# Create checkpointer
checkpointer = MemorySaver()

# Build graph: "process" always flows into "decide"; "decide" is followed by
# a conditional edge whose mapping sends "continue" back to "process" and
# "end" to END.
# NOTE: `decide_node` and `route_function` are not defined in this example
# and must be supplied by the reader.
graph = StateGraph(AgentState)
graph.add_node("process", process_node)
graph.add_node("decide", decide_node)
graph.set_entry_point("process")
graph.add_edge("process", "decide")
graph.add_conditional_edges("decide", route_function, {"continue": "process", "end": END})

# Compile with checkpointing
app = graph.compile(checkpointer=checkpointer)

# Run with thread ID for persistence
# NOTE: `initial_state` is not defined in this example. The top-level
# `await` below is only valid inside an async function or an async-aware
# REPL/notebook.
config = {"configurable": {"thread_id": "task_001"}}
result = await app.ainvoke(initial_state, config)

# Resume later with same thread ID (no new input is passed: `None`)
resumed_result = await app.ainvoke(None, config)
python
from langgraph.checkpoint.base import BaseCheckpointSaver
import redis
import json
from datetime import datetime

class RedisCheckpointer(BaseCheckpointSaver):
    """Redis-backed checkpointer for distributed systems.

    Every checkpoint is stored as a JSON string under the key
    ``checkpoint:<thread_id>:<checkpoint_id>`` and expires after
    ``self.ttl`` seconds. Checkpoint ids are ISO-8601 timestamps, so the
    lexicographically greatest key of a thread is its most recent one.

    NOTE(review): ``get``/``put`` use a simplified signature here; confirm
    it against the ``BaseCheckpointSaver`` interface of the installed
    langgraph version before relying on this class with a compiled graph.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Connect to Redis.

        Args:
            redis_url: Connection URL handed to ``redis.from_url``.
        """
        # Let the base class initialise its own attributes first.
        super().__init__()
        self.client = redis.from_url(redis_url)
        self.ttl = 86400 * 7  # 7 days

    def _key(self, thread_id: str, checkpoint_id: str) -> str:
        """Return the Redis key for one checkpoint of one thread."""
        return f"checkpoint:{thread_id}:{checkpoint_id}"

    def get(self, config: dict) -> dict | None:
        """Load a checkpoint.

        Args:
            config: Must contain ``config["configurable"]["thread_id"]``;
                an optional ``"checkpoint_id"`` selects a specific
                checkpoint, otherwise the most recent one is returned.

        Returns:
            The decoded checkpoint, or ``None`` when nothing is stored.

        Raises:
            KeyError: If ``configurable`` or ``thread_id`` is missing.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = config["configurable"].get("checkpoint_id", "latest")

        if checkpoint_id == "latest":
            # SCAN walks the keyspace incrementally, whereas KEYS blocks the
            # server for the whole scan. max() replaces a full sort and is
            # unaffected by the duplicates SCAN may yield.
            matching = self.client.scan_iter(match=self._key(thread_id, "*"))
            latest_key = max(matching, default=None)
            if latest_key is None:
                return None
            data = self.client.get(latest_key)
        else:
            data = self.client.get(self._key(thread_id, checkpoint_id))

        return json.loads(data) if data else None

    def put(self, config: dict, checkpoint: dict) -> dict:
        """Store ``checkpoint`` under a new timestamp-based id.

        Args:
            config: Must contain ``config["configurable"]["thread_id"]``.
            checkpoint: JSON-serialisable checkpoint payload.

        Returns:
            A config dict identifying the checkpoint that was written.

        Raises:
            KeyError: If ``configurable`` or ``thread_id`` is missing.
            TypeError: If ``checkpoint`` is not JSON-serialisable.
        """
        thread_id = config["configurable"]["thread_id"]
        # ISO timestamps sort chronologically as strings, which is what
        # the "latest" lookup in get() relies on.
        checkpoint_id = datetime.now().isoformat()

        key = self._key(thread_id, checkpoint_id)
        self.client.setex(key, self.ttl, json.dumps(checkpoint))

        return {"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}}

# Use Redis checkpointer
# Built with the default URL (redis://localhost:6379); compiling here
# rebinds `app` to a graph that uses this checkpointer.
redis_checkpointer = RedisCheckpointer()
app = graph.compile(checkpointer=redis_checkpointer)
python
from langgraph.checkpoint.postgres import PostgresSaver
import asyncpg

# Async PostgreSQL checkpointer
async def create_postgres_checkpointer(dsn: str = "postgresql://user:pass@localhost/db"):
    """Connect to PostgreSQL, ensure the checkpoint table exists, and wrap
    the connection in a ``PostgresSaver``.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``. The default
            is a placeholder; pass real credentials from configuration
            rather than hard-coding them.

    Returns:
        A ``PostgresSaver`` built on the open connection. The connection is
        left open on success and is owned by the returned saver's user.

    Raises:
        Any exception raised while connecting, creating the table, or
        constructing the saver. In the latter two cases the connection is
        closed before the exception propagates.

    NOTE(review): confirm that the installed ``PostgresSaver`` accepts an
    asyncpg connection and uses this table layout; the published
    langgraph-checkpoint-postgres package documents psycopg connections,
    an ``AsyncPostgresSaver`` for async use, and a ``setup()`` method that
    creates its own tables.
    """
    conn = await asyncpg.connect(dsn)

    try:
        # Create table if not exists
        await conn.execute("""
        CREATE TABLE IF NOT EXISTS checkpoints (
            thread_id TEXT,
            checkpoint_id TEXT,
            state JSONB,
            created_at TIMESTAMP DEFAULT NOW(),
            PRIMARY KEY (thread_id, checkpoint_id)
        )
    """)

        return PostgresSaver(conn)
    except BaseException:
        # Do not leak the connection when setup fails or is cancelled.
        await conn.close()
        raise

# Use
# NOTE: top-level `await` is only valid inside an async function or an
# async-aware REPL/notebook; wrap these lines accordingly in a script.
checkpointer = await create_postgres_checkpointer()
app = graph.compile(checkpointer=checkpointer)
python
def visualize_state(state: AgentState) -> str:
    """Create visual representation of state."""
    lines = [
        "┌─ Agent State ─────────────────┐",
        f"│ Step: {state['current_step']:<22} │",
        f"│ Iteration: {state['iteration']:<17} │",
        f"│ Messages: {len(state['messages']):<18} │",
        "├───────────────────────────────┤",
    ]
    
    for key, value in state.get('context', {}).items():
        lines.append(f"│ {key}: {str(value)[:20]:<20} │")
    
    lines.append("└───────────────────────────────┘")
    return "\n".join(lines)

# In node
async def debug_node(state: AgentState) -> dict:
    """Print the rendered state box and return an empty update."""
    rendering = visualize_state(state)
    print(rendering)
    return {}

State Patterns

| Pattern | Use Case |
|---|---|
| Message Accumulation | Chat history |
| Counter Reducer | Token counting, iterations |
| Dict Merge | Metadata aggregation |
| Set Reducer | Unique items tracking |
| Overwrite | Current status, step |

Checkpointer Comparison

| Backend | Use Case | Pros | Cons |
|---|---|---|---|
| MemorySaver | Development | Fast, simple | Not persistent |
| Redis | Distributed | Fast, TTL | Memory-bound |
| PostgreSQL | Production | Durable, queryable | Slower |
| SQLite | Local prod | Durable, simple | Single node |

Best Practices

  • Define clear state schemas with TypedDict
  • Use appropriate reducers for each field
  • Implement checkpointing for production
  • Use thread IDs for multi-user scenarios
  • Clean up old checkpoints periodically
  • Validate state transitions

Anti-Patterns

| Anti-Pattern | Fix |
|---|---|
| Mutable state | Return a new state dict |
| No checkpointing | Add at least a MemorySaver |
| Unbounded state | Compress/prune old data |
| No schema | Use TypedDict |

Related

  • Knowledge: knowledge/state-patterns.json
  • Skill: langgraph-agent-building
  • Skill: memory-management

Prerequisites

[!IMPORTANT] Requirements:

  • Packages: langgraph
  • Knowledge: state-patterns.json