AgentSkillsCN

execution-engine-analysis

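Analyze control flow, concurrency models, and event architectures in agent frameworks. Use when (1) understanding async vs sync execution patterns, (2) classifying execution topology (DAG/FSM/Linear), (3) mapping event emission and observability hooks, (4) evaluating scalability characteristics, or (5) comparing execution models across frameworks.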

SKILL.md
--- frontmatter
name: execution-engine-analysis
description: Analyze control flow, concurrency models, and event architectures in agent frameworks. Use when (1) understanding async vs sync execution patterns, (2) classifying execution topology (DAG/FSM/Linear), (3) mapping event emission and observability hooks, (4) evaluating scalability characteristics, or (5) comparing execution models across frameworks.

Execution Engine Analysis

Analyzes the control flow substrate and concurrency model of an agent framework.

Process

  1. Identify async model — Native async, sync-with-wrappers, or hybrid
  2. Classify topology — DAG, FSM, or linear chain
  3. Catalog events — Callbacks, listeners, generators
  4. Map observability — Pre/post hooks, interception points

Concurrency Model Classification

Native Async

python
# Signature: async/await throughout
import asyncio

async def run(self):
    result = await self.llm.agenerate(messages)
    return await self.process(result)

# Entry point uses asyncio
asyncio.run(agent.run())

Indicators: async def, await, asyncio.gather, aiohttp

Sync with Wrappers

python
# Signature: sync API wrapping async internals
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run(self):
    return asyncio.run(self._async_run())

# Or using thread pools
def run(self):
    with ThreadPoolExecutor() as pool:
        future = pool.submit(self._blocking_call)
        return future.result()

Indicators: asyncio.run() inside sync methods, ThreadPoolExecutor, run_in_executor
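One consequence of the sync-with-wrappers pattern is worth checking during analysis: `asyncio.run()` raises `RuntimeError` when an event loop is already running in the current thread, so a sync facade built on it cannot be called from async code. The following is a minimal sketch of that failure mode; `_async_run`, `run`, and `caller_inside_loop` are hypothetical stand-ins, not any framework's API.

```python
import asyncio

async def _async_run() -> str:
    # Stand-in for a framework's async internals (hypothetical).
    await asyncio.sleep(0)
    return "ok"

def run() -> str:
    # Sync facade: starts a fresh event loop on every call.
    coro = _async_run()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise

async def caller_inside_loop() -> str:
    # Calling the sync facade from a coroutine fails, because
    # asyncio.run() refuses to start while another loop is running.
    try:
        return run()
    except RuntimeError as exc:
        return f"failed: {type(exc).__name__}"

outside = run()                                 # called from plain sync code
inside = asyncio.run(caller_inside_loop())      # called from within a running loop
```

If a framework's sync entry points look like `run()` here, note in the analysis that they are likely unusable from notebooks or async web handlers without a workaround.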

Hybrid

python
# Both sync and async APIs exposed
def invoke(self, input):
    return self._sync_invoke(input)

async def ainvoke(self, input):
    return await self._async_invoke(input)

Indicators: Paired methods (invoke/ainvoke), sync_to_async decorators
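The indicators for the three concurrency models can be checked mechanically as a first pass. The sketch below uses the standard `ast` module to count sync and async function definitions and to spot `asyncio.run(...)` calls inside sync functions. The function name `classify_concurrency` and the decision rules are assumptions made for illustration; treat the result as a hint to confirm by reading the code, not as a verdict.

```python
import ast

def classify_concurrency(source: str) -> str:
    """Crude heuristic: classify a module by the functions it defines."""
    tree = ast.parse(source)
    sync_defs = 0
    async_defs = 0
    wraps_async = False
    for node in ast.walk(tree):
        if isinstance(node, ast.AsyncFunctionDef):
            async_defs += 1
        elif isinstance(node, ast.FunctionDef):
            sync_defs += 1
            # Look for asyncio.run(...) called inside a sync function body.
            for inner in ast.walk(node):
                if (isinstance(inner, ast.Call)
                        and isinstance(inner.func, ast.Attribute)
                        and inner.func.attr == "run"
                        and isinstance(inner.func.value, ast.Name)
                        and inner.func.value.id == "asyncio"):
                    wraps_async = True
    if wraps_async:
        return "sync-with-wrappers"
    if async_defs and sync_defs:
        return "hybrid"
    if async_defs:
        return "native-async"
    return "sync"

sample = "def invoke(x):\n    return x\n\nasync def ainvoke(x):\n    return x\n"
label = classify_concurrency(sample)
```

The heuristic will misjudge some modules, for example a native-async library that also ships a small sync `main()` helper, so pair it with a manual look at the public entry points.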

Execution Topology

DAG (Directed Acyclic Graph)

python
# Signature: Nodes with dependencies
class Node:
    # Quote the forward reference: Node is not yet bound when this annotation is evaluated
    def __init__(self, deps: list["Node"]): ...

graph.add_edge(node_a, node_b)
result = graph.execute()  # Topological order

Indicators: Graph, Node, Edge classes, networkx, topological sort
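To make "topological order" concrete, here is a small sketch using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The node names and the `execute` helper are hypothetical; a real engine would invoke each node rather than just record its name.

```python
from graphlib import TopologicalSorter

# Hypothetical three-node graph: each key maps to the nodes it depends on.
dependencies = {
    "fetch": set(),
    "summarize": {"fetch"},
    "report": {"fetch", "summarize"},
}

def execute(graph: dict[str, set[str]]) -> list[str]:
    """Visit nodes in dependency order and return the order used."""
    order = []
    for name in TopologicalSorter(graph).static_order():
        order.append(name)  # a real engine would call the node here
    return order

order = execute(dependencies)
```

`TopologicalSorter` raises `graphlib.CycleError` when the graph contains a cycle, which is a quick way to confirm that a framework's graph really is acyclic.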

FSM (Finite State Machine)

python
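# Signature: Explicit states and transitions
from enum import Enum

class State(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    DONE = "done"

def transition(self, current: State, event: str) -> State:
    if current == State.THINKING and event == "action_chosen":
        return State.ACTING
    # Fail loudly on unhandled pairs instead of implicitly returning None
    raise ValueError(f"no transition from {current} on {event!r}")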

Indicators: State enums, transition tables, current_state, state machine libraries
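A transition table, one of the indicators listed above, might look like the following sketch. Apart from `action_chosen`, the event names are assumptions added for illustration, as is the `run_events` driver.

```python
from enum import Enum

class State(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    DONE = "done"

# Transition table: (current state, event) -> next state.
# Event names other than "action_chosen" are illustrative assumptions.
TRANSITIONS = {
    (State.THINKING, "action_chosen"): State.ACTING,
    (State.THINKING, "answer_ready"): State.DONE,
    (State.ACTING, "observation_received"): State.THINKING,
}

def transition(current: State, event: str) -> State:
    try:
        return TRANSITIONS[(current, event)]
    except KeyError:
        raise ValueError(f"no transition from {current.name} on {event!r}") from None

def run_events(events: list[str]) -> State:
    """Drive the machine from THINKING through a sequence of events."""
    state = State.THINKING
    for event in events:
        state = transition(state, event)
    return state

final = run_events(["action_chosen", "observation_received", "answer_ready"])
```

Keeping transitions in a single table makes the full state space easy to enumerate, which helps when comparing frameworks.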

Linear Chain

python
# Signature: Sequential step execution
def run(self):
    result = self.step1()
    result = self.step2(result)
    result = self.step3(result)
    return result

# Or pipeline pattern
chain = step1 | step2 | step3
result = chain.invoke(input)

Indicators: Sequential calls, pipe operators (|), Chain, Pipeline classes
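When a framework supports the pipe syntax, it is usually implemented by overloading `__or__`. The sketch below shows one way this could work; the `Step` class is hypothetical and is not taken from any particular library.

```python
from typing import Any, Callable

class Step:
    """Hypothetical wrapper that makes a callable composable with `|`."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new step that runs a, then feeds its result to b.
        return Step(lambda value: other.fn(self.fn(value)))

    def invoke(self, value: Any) -> Any:
        return self.fn(value)

strip = Step(str.strip)
upper = Step(str.upper)
exclaim = Step(lambda s: s + "!")

chain = strip | upper | exclaim
result = chain.invoke("  hello ")
```

Searching a codebase for `__or__` (or `__ror__`) definitions is a quick way to locate the class that implements chaining.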

Event Architecture

Callbacks

python
class Callbacks:
    def on_llm_start(self, prompt): ...
    def on_llm_end(self, response): ...
    def on_tool_start(self, tool, input): ...
    def on_tool_end(self, output): ...
    def on_error(self, error): ...

Flexibility: Low (fixed hook points)
Traceability: Medium (easy to follow)

Event Listeners/Emitters

python
emitter = EventEmitter()
emitter.on('llm:start', handler)
emitter.on('tool:*', wildcard_handler)
emitter.emit('llm:start', {'prompt': prompt})

Flexibility: High (dynamic registration)
Traceability: Low (harder to trace)
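For reference, a minimal emitter with wildcard support could be sketched as follows. It assumes shell-style patterns matched with the standard `fnmatch.fnmatchcase`, and handlers that receive both the event name and the payload; both choices are assumptions, and real frameworks may differ.

```python
from collections import defaultdict
from fnmatch import fnmatchcase
from typing import Callable

Handler = Callable[[str, dict], None]

class EventEmitter:
    """Hypothetical emitter: patterns such as 'tool:*' match by shell-style glob."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, pattern: str, handler: Handler) -> None:
        self._handlers[pattern].append(handler)

    def emit(self, event: str, payload: dict) -> int:
        """Call every handler whose pattern matches; return how many ran."""
        called = 0
        for pattern, handlers in self._handlers.items():
            if fnmatchcase(event, pattern):
                for handler in handlers:
                    handler(event, payload)
                    called += 1
        return called

seen = []
emitter = EventEmitter()
emitter.on("llm:start", lambda event, payload: seen.append(("exact", event)))
emitter.on("tool:*", lambda event, payload: seen.append(("wildcard", event)))
emitter.emit("llm:start", {"prompt": "hi"})
emitter.emit("tool:start", {"tool": "search"})
emitter.emit("tool:end", {"output": "done"})
```

The lower traceability shows up here directly: nothing at the `emit` call site tells you which handlers will run, so the analysis has to find every `on(...)` registration.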

Async Generators (Streaming)

python
async def run(self):
    async for chunk in self.llm.astream(prompt):
        yield {"type": "token", "content": chunk}
    yield {"type": "done"}

Flexibility: Medium (streaming-native)
Traceability: High (follows data flow)
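The consumer side of this pattern is an `async for` loop over the generator. The sketch below is self-contained; `fake_stream` is a hypothetical stand-in for an LLM client's streaming call, and `collect` is an illustrative helper.

```python
import asyncio
from typing import AsyncIterator

async def fake_stream(text: str) -> AsyncIterator[str]:
    # Stand-in for an LLM token stream (hypothetical).
    for token in text.split():
        await asyncio.sleep(0)
        yield token

async def run(prompt: str) -> AsyncIterator[dict]:
    # Same event shape as the pattern above: token events, then a done marker.
    async for chunk in fake_stream(prompt):
        yield {"type": "token", "content": chunk}
    yield {"type": "done"}

async def collect(prompt: str) -> list[dict]:
    # The consumer sees events in exactly the order they were produced.
    return [event async for event in run(prompt)]

events = asyncio.run(collect("hello streaming world"))
```

Because events arrive in production order on a single iterator, tracing is a matter of following the `yield` statements.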

Observability Hooks Inventory

| Hook Point | Purpose | Interception Level |
|------------|---------|--------------------|
| Pre-LLM | Modify prompt | Input |
| Post-LLM | Access raw response | Output |
| Pre-Tool | Validate tool input | Input |
| Post-Tool | Transform tool output | Output |
| Pre-Step | Observe state | Read-only |
| Post-Step | Modify next step | Control flow |
| On-Error | Handle/transform | Error |
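As an illustration of the Pre-Tool and Post-Tool rows, the following sketch wraps a tool call with an input-validating hook and an output-transforming hook. `HookedTool`, `require_query`, and `truncate` are hypothetical names; a real framework will expose these points through its own callback or middleware API.

```python
from typing import Any, Callable, Optional

Hook = Callable[[Any], Any]

class HookedTool:
    """Hypothetical wrapper exposing Pre-Tool and Post-Tool hook points."""

    def __init__(self, fn: Hook, pre: Optional[Hook] = None, post: Optional[Hook] = None):
        self.fn = fn
        self.pre = pre
        self.post = post

    def __call__(self, tool_input: Any) -> Any:
        if self.pre is not None:
            tool_input = self.pre(tool_input)  # Pre-Tool: validate or rewrite input
        output = self.fn(tool_input)
        if self.post is not None:
            output = self.post(output)         # Post-Tool: transform output
        return output

def require_query(tool_input: dict) -> dict:
    # Input-level interception: reject calls with no query.
    if not tool_input.get("query"):
        raise ValueError("query must be non-empty")
    return tool_input

def truncate(output: str) -> str:
    # Output-level interception: cap the result length.
    return output[:20]

search = HookedTool(
    fn=lambda tool_input: "results for " + tool_input["query"] + " " + "x" * 50,
    pre=require_query,
    post=truncate,
)
out = search({"query": "agents"})
```

When auditing a framework, check whether its hooks can replace the value as `pre` and `post` do here, or only observe it.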

Questions to Answer

  • Can you intercept tool input before execution?
  • Is the raw LLM response accessible (with token counts)?
  • Can you modify control flow from hooks?
  • Are hooks sync or async?
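The last question can often be answered by introspection rather than by reading every handler. The sketch below uses `inspect.iscoroutinefunction` to label each `on_*` method of a handler object; the `Callbacks` class and the `hook_kinds` helper are hypothetical examples.

```python
import inspect

class Callbacks:
    # Hypothetical handler class mixing sync and async hooks.
    def on_llm_start(self, prompt): ...
    async def on_tool_end(self, output): ...

def hook_kinds(handler: object) -> dict[str, str]:
    """Map each on_* method of a handler to 'sync' or 'async'."""
    kinds = {}
    for name, member in inspect.getmembers(handler, inspect.isroutine):
        if name.startswith("on_"):
            kinds[name] = "async" if inspect.iscoroutinefunction(member) else "sync"
    return kinds

kinds = hook_kinds(Callbacks())
```

A mix of sync and async hooks is worth recording in the Observability Inventory, since sync hooks called from an async engine can block the event loop.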

Output Template

markdown
## Execution Engine Analysis: [Framework Name]

### Concurrency Model
- **Type**: [Native Async / Sync-with-Wrappers / Hybrid]
- **Entry Point**: `path/to/main.py:run()`
- **Thread Safety**: [Yes/No/Partial]

### Execution Topology
- **Model**: [DAG / FSM / Linear Chain]
- **Implementation**: [Description with code refs]
- **Parallelization**: [Supported/Not Supported]

### Event Architecture
- **Pattern**: [Callbacks / Listeners / Generators]
- **Registration**: [Static / Dynamic]
- **Streaming**: [Supported / Not Supported]

### Observability Inventory

| Hook | Location | Async | Modifiable |
|------|----------|-------|------------|
| on_llm_start | callbacks.py:L23 | Yes | Input only |
| on_tool_end | callbacks.py:L45 | Yes | Output |
| ... | ... | ... | ... |

### Scalability Assessment
- **Blocking Operations**: [List any]
- **Resource Limits**: [Token counters, rate limits]
- **Recommended Concurrency**: [Threads/Processes/AsyncIO]

Integration

  • Prerequisite: codebase-mapping to identify execution files
  • Feeds into: comparative-matrix for async decisions
  • Related: control-loop-extraction for agent-specific flow