
LangGraph Multi-Branch Workflow

A pattern for parallel branch execution and aggregation built on LangGraph StateGraph

SKILL.md
--- frontmatter
name: LangGraph Multi-Branch Workflow
description: Pattern for parallel branch execution with aggregation using LangGraph StateGraph

LangGraph Multi-Branch Workflow Pattern

A LangGraph StateGraph pattern for parallel branch execution and aggregation. It is the core workflow structure of GCRI.

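2-Phase Architecture Overview

GCRI runs two parallel fan-out phases, hypothesis generation and verification, with an aggregation step between them and a decision step at the end. The diagram splits this flow into four stages: Hypothesis Generation → Aggregation → Verification → Decision.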

mermaid
graph LR
    subgraph "Phase 1: Hypothesis Generation"
        A[sample_strategies] --> B[map_hypothesis_branches]
        B --> C1[hypothesis + refine]
        B --> C2[hypothesis + refine]
    end
    
    subgraph "Phase 2: Aggregation"
        C1 --> D[aggregate_hypotheses]
        C2 --> D
    end
    
    subgraph "Phase 3: Verification"
        D --> E[map_verification_branches]
        E --> F1[verify_aggregated]
        E --> F2[verify_aggregated]
    end
    
    subgraph "Phase 4: Decision"
        F1 --> G[collect_verification]
        F2 --> G
        G --> H[decide]
        H --> I[update_memory]
    end

Core Structure

python
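from langgraph.graph import END, START, StateGraph
from langgraph.types import Send

# TaskState, BranchState, VerificationBranchState and the *_func node callables
# are defined elsewhere in GCRI (see the state definitions and reference files).

# 1. Keep the main graph separate from the branch subgraphs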
graph = StateGraph(TaskState)

# 2. Phase 1: hypothesis-generation subgraph
hypothesis_branch = StateGraph(BranchState)
hypothesis_branch.add_node('hypothesis', hypothesis_func)
hypothesis_branch.add_node('reasoning_and_refine', refine_func)
hypothesis_branch.add_edge(START, 'hypothesis')
hypothesis_branch.add_edge('hypothesis', 'reasoning_and_refine')
hypothesis_branch.add_edge('reasoning_and_refine', END)
hypothesis_workflow = hypothesis_branch.compile()

# 3. Phase 3: verification subgraph
verification_branch = StateGraph(VerificationBranchState)
verification_branch.add_node('verify', verify_func)
verification_branch.add_edge(START, 'verify')
verification_branch.add_edge('verify', END)
verification_workflow = verification_branch.compile()

# 4. Assemble the main graph (compiled subgraphs are added as nodes)
graph.add_node('sample_strategies', sample_strategies_func)
graph.add_node('hypothesis_executor', hypothesis_workflow)
graph.add_node('aggregate_hypotheses', aggregate_hypotheses_func)
graph.add_node('verification_executor', verification_workflow)
graph.add_node('collect_verification', collect_verification_func)
graph.add_node('decide', decide_func)
graph.add_node('update_memory', update_memory_func)

# 5. Wire the edges (conditional edges fan out via Send objects)
graph.add_edge(START, 'sample_strategies')
graph.add_conditional_edges('sample_strategies', map_hypothesis_branches, ['hypothesis_executor'])
graph.add_edge('hypothesis_executor', 'aggregate_hypotheses')
graph.add_conditional_edges('aggregate_hypotheses', map_verification_branches, ['verification_executor'])
graph.add_edge('verification_executor', 'collect_verification')
graph.add_edge('collect_verification', 'decide')
graph.add_edge('decide', 'update_memory')
graph.add_edge('update_memory', END)
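
The control flow wired above can be hard to picture from edge declarations alone. The following is a plain-Python analogue of the same fan-out, fan-in, fan-out, decide sequence, using a thread pool in place of LangGraph. It is a sketch for intuition only: the function names, the payload shapes, and the toy rule that a hypothesis containing "guess" fails verification are all assumptions made for illustration, not GCRI's logic.

```python
from concurrent.futures import ThreadPoolExecutor


def run_hypothesis_branch(payload):
    # Stand-in for the hypothesis subgraph: one raw hypothesis per strategy.
    return {'index': payload['index'], 'hypothesis': f"try {payload['strategy']}"}


def aggregate(raw):
    # Stand-in for aggregation: merge exact duplicates, remember their sources.
    merged = {}
    for item in raw:
        merged.setdefault(item['hypothesis'], []).append(item['index'])
    return [
        {'index': i, 'combined_hypothesis': text, 'source_indices': sources}
        for i, (text, sources) in enumerate(merged.items())
    ]


def run_verification_branch(branch):
    # Stand-in for verification (hypothetical rule: "guess" never passes).
    return {'index': branch['index'], 'passed': 'guess' not in branch['combined_hypothesis']}


def run_workflow(strategies):
    payloads = [{'index': i, 'strategy': s} for i, s in enumerate(strategies)]
    with ThreadPoolExecutor() as pool:
        raw = list(pool.map(run_hypothesis_branch, payloads))        # fan-out 1
        branches = aggregate(raw)                                     # fan-in
        results = list(pool.map(run_verification_branch, branches))  # fan-out 2
    return {
        'branches': branches,
        'results': results,
        'final_decision': any(r['passed'] for r in results),          # decide
    }
```

Three strategies with one duplicate produce three hypothesis branches but only two verification branches, which is the resource saving the aggregation step is meant to provide.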

State Definition Pattern

python
from pydantic import BaseModel, Field
from typing import List, Optional, Annotated, Dict
import operator

# Strategy and HypothesisResult are used below but defined elsewhere in GCRI (not shown here).

# Phase 1 result
class RawHypothesis(BaseModel):
    index: int
    strategy_name: str
    strategy_description: str
    hypothesis: str
    reasoning: str
    container_id: str

# Phase 2 result (after aggregation)
class AggregatedBranch(BaseModel):
    index: int
    combined_hypothesis: str
    source_indices: List[int]
    merge_reasoning: str

class AggregationResult(BaseModel):
    branches: List[AggregatedBranch]
    discarded_indices: List[int] = Field(default_factory=list)
    aggregation_summary: str

# Branch state
class BranchState(BaseModel):
    index: int
    strategy: Strategy
    container_id: str
    hypothesis: Optional[str] = None
    reasoning: Optional[str] = None

class VerificationBranchState(BaseModel):
    index: int
    aggregated_branch: AggregatedBranch
    container_id: str

# Main state
class TaskState(BaseModel):
    task: str
    strategies: List[Strategy] = Field(default_factory=list)
    # Annotated attaches a list-merge reducer so parallel branch outputs are concatenated
    raw_hypotheses: Annotated[List[RawHypothesis], operator.add] = Field(default_factory=list)
    aggregated_branches: List[AggregatedBranch] = Field(default_factory=list)
    verification_container_map: Dict[int, str] = Field(default_factory=dict)
    results: Annotated[List[HypothesisResult], operator.add] = Field(default_factory=list)
    final_decision: Optional[bool] = None
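
To make the role of `Annotated[..., operator.add]` concrete, here is a small stand-alone sketch of reducer-style merging: keys that carry a reducer in their annotation are combined with the existing value, while all other keys are overwritten. It is an illustration of the idea, not LangGraph's internal implementation. `DemoState` and `apply_update` are hypothetical names, and a dataclass is used instead of pydantic to keep the example dependency-free.

```python
import operator
from dataclasses import dataclass, field
from typing import Annotated, Any, Dict, List, Optional, get_type_hints


@dataclass
class DemoState:
    task: str = ''
    # The reducer travels with the type annotation as Annotated metadata.
    raw_hypotheses: Annotated[List[str], operator.add] = field(default_factory=list)
    final_decision: Optional[bool] = None


def apply_update(state: DemoState, update: Dict[str, Any]) -> DemoState:
    """Merge one partial update into the state, honouring per-field reducers."""
    hints = get_type_hints(type(state), include_extras=True)
    for key, value in update.items():
        metadata = getattr(hints[key], '__metadata__', ())
        reducer = metadata[0] if metadata and callable(metadata[0]) else None
        current = getattr(state, key)
        # With a reducer: combine old and new. Without one: last write wins.
        setattr(state, key, reducer(current, value) if reducer else value)
    return state


state = DemoState(task='demo')
apply_update(state, {'raw_hypotheses': ['from branch 0']})
apply_update(state, {'raw_hypotheses': ['from branch 1'], 'final_decision': True})
```

This is why `raw_hypotheses` and `results` carry a reducer in `TaskState`: several branches write to them in the same step, and without a merge rule their updates would conflict.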

Parallel Branch Mapping

Phase 1: Creating Hypothesis Branches

python
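def map_hypothesis_branches(state: TaskState):
    """Return a list of Send objects so the hypothesis branches run in parallel."""
    sends = []
    for index, strategy in enumerate(state.strategies):
        # `sandbox` and `state.count` come from the surrounding GCRI code;
        # `count` is not among the TaskState fields shown in this document.
        container_id = sandbox.setup_branch(state.count, index)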
        sends.append(Send('hypothesis_executor', {
            'index': index,
            'strategy': strategy.model_dump(),
            'container_id': container_id,
            'task': state.task
        }))
    return sends

Phase 3: Creating Verification Branches

python
def map_verification_branches(state: TaskState):
    """Create one verification branch per aggregated branch."""
    sends = []
    for branch in state.aggregated_branches:
        container_id = state.verification_container_map[branch.index]
        sends.append(Send('verification_executor', {
            'index': branch.index,
            'aggregated_branch': branch.model_dump(),
            'container_id': container_id,
            'task': state.task
        }))
    return sends

Aggregation Pattern

python
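def aggregate_hypotheses(state: TaskState):
    """Merge the RawHypothesis entries into AggregatedBranch entries."""
    # 1. Run the merge with HypothesisAggregator
    aggregation_result = aggregator.aggregate(state)

    # 2. Build the container mapping used for verification
    #    (`sandbox` and `state.count` come from the surrounding GCRI code)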
    source_containers = {hyp.index: hyp.container_id for hyp in state.raw_hypotheses}
    verification_containers = sandbox.setup_verification_branches(
        state.count, aggregation_result.branches, source_containers
    )
    
    return {
        'aggregated_branches': aggregation_result.branches,
        'verification_container_map': verification_containers
    }
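
The merge performed by `aggregator.aggregate` is not shown in this document. As a rough illustration of the output shape only, the sketch below groups hypotheses whose text matches after whitespace and case normalization and discards empty ones. This rule is an assumption chosen for simplicity; the actual HypothesisAggregator may work quite differently. The `*Demo` dataclasses are hypothetical, trimmed-down stand-ins for the pydantic schemas above.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RawHypothesisDemo:
    index: int
    hypothesis: str


@dataclass
class AggregatedBranchDemo:
    index: int
    combined_hypothesis: str
    source_indices: List[int]
    merge_reasoning: str


@dataclass
class AggregationResultDemo:
    branches: List[AggregatedBranchDemo]
    discarded_indices: List[int] = field(default_factory=list)
    aggregation_summary: str = ''


def aggregate_by_text(raw: List[RawHypothesisDemo]) -> AggregationResultDemo:
    groups: Dict[str, List[RawHypothesisDemo]] = {}
    discarded: List[int] = []
    for hyp in raw:
        key = ' '.join(hyp.hypothesis.lower().split())  # normalize case and spacing
        if not key:
            discarded.append(hyp.index)  # nothing to verify
            continue
        groups.setdefault(key, []).append(hyp)

    branches = []
    for new_index, members in enumerate(groups.values()):
        sources = [m.index for m in members]
        reason = 'kept as-is' if len(sources) == 1 else f'merged {len(sources)} equivalent hypotheses'
        branches.append(AggregatedBranchDemo(new_index, members[0].hypothesis, sources, reason))

    summary = f'{len(raw)} raw -> {len(branches)} branches, {len(discarded)} discarded'
    return AggregationResultDemo(branches, discarded, summary)
```

Whatever merge rule is used, keeping `source_indices` on each branch is what lets the next step look up the containers of the hypotheses that were merged.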

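Container Reuse Strategy

| Scenario | Handling |
| --- | --- |
| Single source branch | Reuse the existing container |
| Multi-source merge | Use the first source's container (or create a new one) |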
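
The two rules in the table can be sketched as a small mapping function. This is not the code behind `sandbox.setup_verification_branches`. `map_verification_containers` and its `create_container` callback are hypothetical names introduced here, and the choice to fall back to the first source's container when no callback is given is one possible reading of the table.

```python
from typing import Callable, Dict, List, Optional


def map_verification_containers(
    branch_sources: Dict[int, List[int]],
    source_containers: Dict[int, str],
    create_container: Optional[Callable[[int], str]] = None,
) -> Dict[int, str]:
    """Return {aggregated branch index: container id} for verification."""
    mapping: Dict[int, str] = {}
    for branch_index, sources in branch_sources.items():
        if not sources:
            raise ValueError(f'branch {branch_index} has no source hypotheses')
        if len(sources) == 1 or create_container is None:
            # Single source, or no factory supplied: reuse the first source's container.
            mapping[branch_index] = source_containers[sources[0]]
        else:
            # Multi-source merge with a factory: give the branch a fresh container.
            mapping[branch_index] = create_container(branch_index)
    return mapping


sources = {0: [0, 1], 1: [2]}
containers = {0: 'c0', 1: 'c1', 2: 'c2'}
reused = map_verification_containers(sources, containers)
fresh = map_verification_containers(sources, containers, lambda i: f'new-{i}')
```

Reusing a container avoids setup cost, while a fresh container for merged branches avoids carrying over side effects from only one of the merged sources; which trade-off suits a given workload is a design decision this sketch leaves open.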

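When to Use

  • When the same task should be attempted in parallel with several strategies
  • When hypotheses should be aggregated or merged to remove duplicates
  • When only the aggregated hypotheses should be verified, to save resources
  • When a main coordinator drives parallel workers in two phases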

Reference Files

  • gcri/graphs/gcri_unit.py - workflow construction in the GCRI class
  • gcri/graphs/states.py - definitions of TaskState, BranchState, VerificationBranchState
  • gcri/graphs/schemas.py - definitions of RawHypothesis, AggregatedBranch, AggregationResult
  • gcri/graphs/generators.py - BranchesGeneratorProtocol and its implementations
  • gcri/graphs/aggregator.py - the HypothesisAggregator class