AgentSkillsCN

langgraph

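Agent orchestration: StateGraph, tool calling, interrupts, checkpoint management, human-in-the-loop workflows. Triggers: StateGraph, agent, graph structure, interrupt_before, tool node, get_state(), Command, conditional routing. Scope: agent logic only; not for FastAPI routes or async I/O patterns (see those skills instead).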

SKILL.md
--- frontmatter
name: langgraph
description: |
  Agent orchestration: StateGraph, tool calling, interrupt, checkpointer, human-in-the-loop workflows.
  Triggers: StateGraph, agent, graph, interrupt_before, tool node, get_state(), Command, conditional routing.
  Scope: Agent logic only - NOT for FastAPI routes or async I/O patterns (see those skills instead).
source: vibeship-spawner-skills (Apache 2.0)

LangGraph

Role: LangGraph Agent Architect

You are an expert in building production-grade AI agents with LangGraph. You understand that agents need explicit structure - graphs make the flow visible and debuggable. You design state carefully, use reducers appropriately, and always consider persistence for production. You know when cycles are needed and how to prevent infinite loops.

Capabilities

  • Graph construction (StateGraph)
  • State management and reducers
  • Node and edge definitions
  • Conditional routing
  • Checkpointers and persistence
  • Human-in-the-loop patterns
  • Tool integration
  • Streaming and async execution

Requirements

  • Python 3.9+
  • langgraph package
  • LLM API access (OpenAI, Anthropic, etc.)
  • Understanding of graph concepts

When to Use (AI Sales Agent Project)

| Week | Task | Pattern Needed |
|------|------|----------------|
| 3.1 | Define agent State | State with Reducers |
| 3.2 | Async tool check_stock | Basic Agent Graph + async tools |
| 3.4 | Build StateGraph | Basic Agent Graph |
| 3.5 | Router node (Search vs Chat) | Conditional Branching |
| 4.1 | interrupt_before=["order_node"] | Human-in-the-Loop (below) |
| 4.2 | graph.get_state(config) | State Inspection |

Patterns

Basic Agent Graph

Simple ReAct-style agent with tools

When to use: Single agent with tool calling

python
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# 1. Define State
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    # add_messages reducer appends, doesn't overwrite

# 2. Define Tools
@tool
def search(query: str) -> str:
    """Search the web for information."""
    # Implementation here
    return f"Results for: {query}"

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
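    # WARNING: eval() executes arbitrary code; never pass it model- or
    # user-generated input in production. Shown here only for brevity.
    return str(eval(expression))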

tools = [search, calculator]

# 3. Create LLM with tools
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)

# 4. Define Nodes
def agent(state: AgentState) -> dict:
    """The agent node - calls LLM."""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# Tool node handles tool execution
tool_node = ToolNode(tools)

# 5. Define Routing
def should_continue(state: AgentState) -> str:
    """Route based on whether tools were called."""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

# 6. Build Graph
graph = StateGraph(AgentState)

# Add nodes
graph.add_node("agent", agent)
graph.add_node("tools", tool_node)

# Add edges
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_continue, ["tools", END])
graph.add_edge("tools", "agent")  # Loop back

# Compile
app = graph.compile()

# 7. Run
result = app.invoke({
    "messages": [("user", "What is 25 * 4?")]
})
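
The calculator tool above passes its argument to eval(), and that argument is written by the model. One possible replacement, sketched here with only the standard library, walks the parsed expression and accepts nothing but numeric literals and basic arithmetic. The name safe_eval and the operator whitelist are illustrative assumptions, not part of LangGraph or the original skill.

```python
import ast
import operator

# Whitelisted operators; any other syntax is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str):
    """Evaluate +, -, *, / over numeric literals (hypothetical helper)."""

    def _eval(node: ast.AST):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Names, calls, attribute access, etc. all end up here.
        raise ValueError(f"Unsupported expression: {expression!r}")

    return _eval(ast.parse(expression, mode="eval"))
```

Inside the tool, `return str(safe_eval(expression))` would take the place of the eval() call; function calls such as `__import__('os')` raise ValueError instead of executing.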

State with Reducers

Complex state management with custom reducers

When to use: Multiple agents updating shared state

python
from typing import Annotated, TypedDict
from operator import add
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages  # used by ResearchState below

# Custom reducer for merging dictionaries
def merge_dicts(left: dict, right: dict) -> dict:
    return {**left, **right}

# State with multiple reducers
class ResearchState(TypedDict):
    # Messages append (don't overwrite)
    messages: Annotated[list, add_messages]

    # Research findings merge
    findings: Annotated[dict, merge_dicts]

    # Sources accumulate
    sources: Annotated[list[str], add]

    # Current step (overwrites - no reducer)
    current_step: str

    # Error count (custom reducer)
    errors: Annotated[int, lambda a, b: a + b]

# Nodes return partial state updates
def researcher(state: ResearchState) -> dict:
    # Only return fields being updated
    return {
        "findings": {"topic_a": "New finding"},
        "sources": ["source1.com"],
        "current_step": "researching"
    }

def writer(state: ResearchState) -> dict:
    # Access accumulated state
    all_findings = state["findings"]
    all_sources = state["sources"]

    return {
        "messages": [("assistant", f"Report based on {len(all_sources)} sources")],
        "current_step": "writing"
    }

# Build graph
graph = StateGraph(ResearchState)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
# ... add edges
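
To make the reducer behaviour concrete, the sketch below models how a partial update could be folded into existing state: annotated fields are combined through their reducer, plain fields are overwritten. This is a simplified stand-in written with the standard library only; apply_update is a hypothetical helper and is not how LangGraph implements state updates internally.

```python
from operator import add
from typing import Annotated, TypedDict, get_type_hints


def merge_dicts(left: dict, right: dict) -> dict:
    return {**left, **right}


class ResearchState(TypedDict):
    findings: Annotated[dict, merge_dicts]  # merged key by key
    sources: Annotated[list, add]           # concatenated
    current_step: str                       # no reducer: overwritten


def apply_update(state: dict, update: dict) -> dict:
    """Simplified model of a state update (assumption, not LangGraph code)."""
    hints = get_type_hints(ResearchState, include_extras=True)
    new_state = dict(state)
    for key, value in update.items():
        # Annotated[...] types carry the reducer as their first metadata item.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in new_state:
            new_state[key] = metadata[0](new_state[key], value)
        else:
            new_state[key] = value
    return new_state


state = {"findings": {"topic_a": "old"}, "sources": ["a.com"], "current_step": "start"}
state = apply_update(
    state,
    {"findings": {"topic_b": "new"}, "sources": ["b.com"], "current_step": "researching"},
)
```

After the update, findings holds both topics, sources holds both URLs, and current_step has simply been replaced.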

Conditional Branching

Route to different paths based on state

When to use: Multiple possible workflows

python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class RouterState(TypedDict):
    query: str
    query_type: str
    result: str

def classifier(state: RouterState) -> dict:
    """Classify the query type."""
    query = state["query"].lower()
    if "code" in query or "program" in query:
        return {"query_type": "coding"}
    elif "search" in query or "find" in query:
        return {"query_type": "search"}
    else:
        return {"query_type": "chat"}

def coding_agent(state: RouterState) -> dict:
    return {"result": "Here's your code..."}

def search_agent(state: RouterState) -> dict:
    return {"result": "Search results..."}

def chat_agent(state: RouterState) -> dict:
    return {"result": "Let me help..."}

# Routing function
def route_query(state: RouterState) -> str:
    """Route to appropriate agent."""
    query_type = state["query_type"]
    return query_type  # Returns node name

# Build graph
graph = StateGraph(RouterState)

graph.add_node("classifier", classifier)
graph.add_node("coding", coding_agent)
graph.add_node("search", search_agent)
graph.add_node("chat", chat_agent)

graph.add_edge(START, "classifier")

# Conditional edges from classifier
graph.add_conditional_edges(
    "classifier",
    route_query,
    {
        "coding": "coding",
        "search": "search",
        "chat": "chat"
    }
)

# All agents lead to END
graph.add_edge("coding", END)
graph.add_edge("search", END)
graph.add_edge("chat", END)

app = graph.compile()
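
The routing function above returns whatever the classifier wrote into query_type. With the keyword classifier that is always one of the three mapped names, but if the classifier is later swapped for an LLM call, an unexpected label would have no edge to follow. A defensive variant might look like the sketch below; KNOWN_ROUTES and DEFAULT_ROUTE are assumed names, and falling back to chat is an illustrative choice.

```python
from typing import TypedDict


class RouterState(TypedDict):
    query: str
    query_type: str
    result: str


# Node names registered in the conditional-edge mapping.
KNOWN_ROUTES = {"coding", "search", "chat"}
DEFAULT_ROUTE = "chat"  # assumption: chat is the safest fallback


def route_query(state: RouterState) -> str:
    """Return a mapped node name, falling back to DEFAULT_ROUTE."""
    query_type = state.get("query_type", "")
    return query_type if query_type in KNOWN_ROUTES else DEFAULT_ROUTE
```

Because the function only reads the state dict, it can be unit-tested without compiling a graph.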

Anti-Patterns

❌ Infinite Loop Without Exit

Why bad: The agent keeps looping and burning tokens until LangGraph's recursion limit (25 super-steps by default) is hit and a GraphRecursionError is raised.

Instead: Always have exit conditions:

  • Max iterations counter in state
  • Clear END conditions in routing
  • Timeout at application level

def should_continue(state):
    if state["iterations"] > 10:
        return END
    if state["task_complete"]:
        return END
    return "agent"

❌ Stateless Nodes

Why bad: Loses LangGraph's benefits. State not persisted. Can't resume conversations.

Instead: Always use state for data flow. Return state updates from nodes. Use reducers for accumulation. Let LangGraph manage state.

❌ Giant Monolithic State

Why bad: Hard to reason about. Unnecessary data in context. Serialization overhead.

Instead: Use input/output schemas for clean interfaces. Private state for internal data. Clear separation of concerns.

Human-in-the-Loop Pattern

Interrupt execution for human approval (Week 4)

When to use: Order confirmation, sensitive actions, admin review

python
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver

class OrderState(TypedDict):
    messages: Annotated[list, add_messages]
    order_details: dict
    approved: bool

def prepare_order(state: OrderState) -> dict:
    """Prepare order for review."""
    return {"order_details": {"items": [...], "total": 150000}}

def confirm_order(state: OrderState) -> dict:
    """Execute order after approval."""
    # Only runs after human approves
    return {"messages": [("assistant", "Đơn hàng đã được xác nhận!")]}  # "Your order has been confirmed!"

# Build graph with interrupt
graph = StateGraph(OrderState)
graph.add_node("prepare", prepare_order)
graph.add_node("confirm", confirm_order)
graph.add_edge(START, "prepare")
graph.add_edge("prepare", "confirm")
graph.add_edge("confirm", END)

# Compile with interrupt BEFORE confirm node
checkpointer = MemorySaver()
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["confirm"]  # Pauses here for approval
)

# Run until interrupt
config = {"configurable": {"thread_id": "order-123"}}
result = app.invoke({"messages": [("user", "Mua áo sơ mi")]}, config)  # "Buy a dress shirt"

# Check state (admin sees pending order)
state = app.get_state(config)
print(state.next)  # ('confirm',) - waiting at confirm node
print(state.values["order_details"])  # Order to review

# Resume after approval
app.invoke(None, config)  # Continues from interrupt

Limitations

  • Covers the Python API only (a TypeScript port, LangGraph.js, exists but is not covered here)
  • Learning curve for graph concepts
  • State management complexity
  • Debugging can be challenging

Related Skills

Works well with: async-python-patterns (for async tools), fastapi (for API integration)