IPC Communication Skill
Expert guidance for implementing Inter-Process Communication between Rust Orchestrator and Platform Agents using JSON protocol via stdin/stdout.
Overview
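WeReply uses a JSON-based IPC protocol for communication:
- **Protocol**: JSON messages via stdin/stdout
- **Direction**: Bidirectional (Orchestrator ↔ Agent)
- **Message Format**: Line-delimited JSON (one message per line). Each message must be serialized on a single line (JSON encoders escape newlines inside strings). Agents must not write anything else (logs, debug prints) to stdout; use stderr for diagnostics.
- **Character Encoding**: UTF-8 in both directions, regardless of the platform's locale encoding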
Message Protocol Design
Message Types
rust
use serde::{Deserialize, Serialize};
use specta::Type;
/// Messages sent from a platform Agent to the Orchestrator (Agent → Orchestrator).
///
/// `#[serde(tag = "type")]` makes the enum internally tagged: the variant name is
/// written to a `"type"` field alongside the variant's own fields, e.g.
/// `{"type":"Heartbeat","timestamp":1.0}`.
#[derive(Serialize, Deserialize, Type, Debug)]
#[serde(tag = "type")]
pub enum AgentMessage {
    /// A new chat message observed by the agent.
    MessageNew {
        /// Message text.
        content: String,
        /// Display name of the sender.
        sender: String,
        /// Timestamp as a string. The examples in this document use
        /// `"2024-01-23T10:30:00"`; NOTE(review): the format is not enforced here.
        timestamp: String,
    },
    /// Outcome of a command the agent processed.
    CommandResponse {
        success: bool,
        /// Failure description; omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        error: Option<String>,
    },
    /// Reply to a health check.
    HealthStatus {
        status: String, // "ok", "degraded", "error"
        agent_type: String, // "windows_wxauto", "macos_accessibility"
    },
    /// Liveness signal. Unlike `MessageNew`, the timestamp is a number;
    /// presumably seconds since the epoch (TODO confirm with the agents).
    Heartbeat {
        timestamp: f64,
    },
    /// An error reported by the agent.
    Error {
        message: String,
        /// Machine-readable error code, e.g. "ACCESS_DENIED"; omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        code: Option<String>,
    },
}
/// Commands sent from the Orchestrator to a platform Agent (Orchestrator → Agent).
///
/// Internally tagged like [`AgentMessage`]: unit variants serialize to just the
/// tag, e.g. `{"type":"ClearInput"}`.
#[derive(Serialize, Deserialize, Type, Debug)]
#[serde(tag = "type")]
pub enum OrchestratorCommand {
    /// Write `content` into the chat input box.
    WriteInput {
        content: String,
    },
    /// Clear the chat input box.
    ClearInput,
    /// Ask the agent to report its health.
    HealthCheck,
    /// Ask the agent to stop.
    Stop,
}
Message Format Examples
json
// Agent → Orchestrator: 新消息
{
"type": "MessageNew",
"content": "你好,最近怎么样?",
"sender": "张三",
"timestamp": "2024-01-23T10:30:00"
}
// Orchestrator → Agent: 写入输入框
{
"type": "WriteInput",
"content": "很好,谢谢!"
}
// Agent → Orchestrator: 命令响应
{
"type": "CommandResponse",
"success": true
}
// Agent → Orchestrator: 错误
{
"type": "Error",
"message": "无法访问微信窗口",
"code": "ACCESS_DENIED"
}
Rust Orchestrator Implementation
Agent Process Manager
rust
use tokio::process::{Child, Command};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use std::process::Stdio;
pub struct AgentProcess {
child: Child,
stdin: tokio::process::ChildStdin,
stdout_reader: BufReader<tokio::process::ChildStdout>,
}
impl AgentProcess {
pub async fn spawn(agent_path: &str) -> anyhow::Result<Self> {
let mut child = Command::new(agent_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdin = child.stdin.take()
.ok_or_else(|| anyhow!("无法获取 Agent stdin"))?;
let stdout = child.stdout.take()
.ok_or_else(|| anyhow!("无法获取 Agent stdout"))?;
let stdout_reader = BufReader::new(stdout);
Ok(Self {
child,
stdin,
stdout_reader,
})
}
pub async fn send_command(&mut self, command: OrchestratorCommand) -> anyhow::Result<()> {
let json = serde_json::to_string(&command)?;
self.stdin.write_all(json.as_bytes()).await?;
self.stdin.write_all(b"\n").await?;
self.stdin.flush().await?;
Ok(())
}
pub async fn read_message(&mut self) -> anyhow::Result<AgentMessage> {
let mut line = String::new();
self.stdout_reader.read_line(&mut line).await?;
if line.is_empty() {
return Err(anyhow!("Agent 已关闭输出"));
}
let message = serde_json::from_str(&line)?;
Ok(message)
}
pub async fn kill(&mut self) -> anyhow::Result<()> {
self.child.kill().await?;
Ok(())
}
}
Message Handler Pattern
rust
use tokio::sync::mpsc;
pub struct AgentHandler {
agent: AgentProcess,
message_tx: mpsc::UnboundedSender<AgentMessage>,
}
impl AgentHandler {
pub async fn start(agent_path: &str) -> anyhow::Result<(Self, mpsc::UnboundedReceiver<AgentMessage>)> {
let agent = AgentProcess::spawn(agent_path).await?;
let (message_tx, message_rx) = mpsc::unbounded_channel();
let handler = Self {
agent,
message_tx,
};
Ok((handler, message_rx))
}
pub async fn run(mut self) {
loop {
match self.agent.read_message().await {
Ok(message) => {
if self.message_tx.send(message).is_err() {
tracing::error!("消息接收器已关闭");
break;
}
}
Err(e) => {
tracing::error!(error = %e, "读取 Agent 消息失败");
break;
}
}
}
}
pub async fn send_command(&mut self, command: OrchestratorCommand) -> anyhow::Result<()> {
self.agent.send_command(command).await
}
}
Error Handling with Timeout
rust
use tokio::time::{timeout, Duration};
/// Sends `command` to `agent`, giving up after `timeout_secs` seconds.
///
/// # Errors
/// Returns `"发送命令超时"` on timeout; otherwise returns the error from
/// [`AgentProcess::send_command`].
///
/// # Cancellation
/// On timeout the in-flight write is dropped. `write_all` is not cancellation
/// safe, so part of the command line may already have reached the agent. Treat
/// the stream as corrupted after a timeout (e.g. restart the agent) rather than
/// sending further commands on it.
pub async fn send_command_with_timeout(
    agent: &mut AgentProcess,
    command: OrchestratorCommand,
    timeout_secs: u64,
) -> anyhow::Result<()> {
    timeout(Duration::from_secs(timeout_secs), agent.send_command(command))
        .await
        .map_err(|_| anyhow::anyhow!("发送命令超时"))?
}
/// Reads the next [`AgentMessage`] from `agent`, giving up after `timeout_secs` seconds.
///
/// # Errors
/// Returns `"读取消息超时"` on timeout; otherwise returns the error from
/// [`AgentProcess::read_message`].
///
/// # Cancellation
/// On timeout the in-flight `read_message` future is dropped. That is only
/// lossless if `read_message` is cancellation safe:
/// - tokio's `read_line` is not; bytes of a partially read line are discarded.
/// - `read_until` into a buffer that outlives the call is.
///
/// With a non-cancel-safe reader, a timeout in the middle of a line breaks the
/// line framing, and the next read sees a truncated line.
pub async fn read_message_with_timeout(
    agent: &mut AgentProcess,
    timeout_secs: u64,
) -> anyhow::Result<AgentMessage> {
    timeout(Duration::from_secs(timeout_secs), agent.read_message())
        .await
        .map_err(|_| anyhow::anyhow!("读取消息超时"))?
}
Python Agent Implementation
Message Sender Pattern
python
import json
import sys
import threading
from typing import Any, Dict, Optional
class MessageSender:
    """Send line-delimited JSON messages to the Rust Orchestrator over stdout.

    Each message is written as exactly one UTF-8 encoded line. Writes go through
    a class-level lock because an agent may emit messages from more than one
    thread (e.g. the command-listener thread started by
    ``CommandReceiver.start_listening_async`` alongside the main thread).
    ``print`` writes the text and the newline as separate calls, so concurrent
    prints can interleave and break the one-message-per-line framing.
    """

    # Serializes stdout writes so each JSON line is emitted atomically.
    _write_lock = threading.Lock()

    @staticmethod
    def send_message(message: Dict[str, Any]) -> None:
        """Serialize ``message`` and write it to stdout as a single JSON line.

        Args:
            message: message dict; must contain a ``'type'`` field.
        """
        line = json.dumps(message, ensure_ascii=False) + "\n"
        with MessageSender._write_lock:
            binary = getattr(sys.stdout, "buffer", None)
            if binary is None:
                # stdout was replaced by a text-only stream (e.g. io.StringIO).
                sys.stdout.write(line)
                sys.stdout.flush()
                return
            # Flush any text already queued in the text layer so ordering is kept.
            # Then write UTF-8 bytes directly. The text layer uses the locale
            # encoding (e.g. cp936 on Chinese Windows when stdout is a pipe),
            # while the protocol requires UTF-8.
            sys.stdout.flush()
            binary.write(line.encode("utf-8"))
            binary.flush()

    @staticmethod
    def send_message_new(content: str, sender: str, timestamp: str) -> None:
        """Send a ``MessageNew`` notification."""
        MessageSender.send_message({
            "type": "MessageNew",
            "content": content,
            "sender": sender,
            "timestamp": timestamp,
        })

    @staticmethod
    def send_command_response(success: bool, error: Optional[str] = None) -> None:
        """Send a ``CommandResponse``; ``error`` is included only when non-empty."""
        message: Dict[str, Any] = {
            "type": "CommandResponse",
            "success": success,
        }
        if error:
            message["error"] = error
        MessageSender.send_message(message)

    @staticmethod
    def send_error(error_message: str, code: Optional[str] = None) -> None:
        """Send an ``Error``; ``code`` is included only when non-empty."""
        message: Dict[str, Any] = {
            "type": "Error",
            "message": error_message,
        }
        if code:
            message["code"] = code
        MessageSender.send_message(message)

    @staticmethod
    def send_heartbeat(timestamp: float) -> None:
        """Send a ``Heartbeat`` carrying a numeric timestamp."""
        MessageSender.send_message({
            "type": "Heartbeat",
            "timestamp": timestamp,
        })

    @staticmethod
    def send_health_status(status: str, agent_type: str) -> None:
        """Send a ``HealthStatus`` (``status`` such as "ok"/"degraded"/"error")."""
        MessageSender.send_message({
            "type": "HealthStatus",
            "status": status,
            "agent_type": agent_type,
        })
Command Receiver Pattern
python
import threading
from typing import Callable, Dict, Any
class CommandReceiver:
    """Receive line-delimited JSON commands from the Rust Orchestrator (stdin) and dispatch them."""

    def __init__(self):
        # Maps a command "type" string to the callable that handles it.
        self.handlers: Dict[str, Callable] = {}
        # Checked before each received line is processed; see stop().
        self.running = True

    def register_handler(self, command_type: str, handler: Callable):
        """Register ``handler`` for commands whose ``type`` equals ``command_type``."""
        self.handlers[command_type] = handler

    def start_listening(self):
        """Read and dispatch commands until stdin closes or stop() takes effect (blocking).

        Blank lines are ignored. Lines that are not valid UTF-8 JSON produce an
        ``Error`` with code ``JSON_PARSE_ERROR``. Unexpected failures while
        handling a line produce ``COMMAND_HANDLER_ERROR``, and listening continues.
        """
        # Read raw bytes and decode them as UTF-8 here. The text-mode sys.stdin
        # uses the locale encoding (e.g. cp936 on Chinese Windows when stdin is a
        # pipe), which would mis-decode the UTF-8 the orchestrator sends.
        source = getattr(sys.stdin, "buffer", sys.stdin)
        try:
            for raw_line in source:
                if not self.running:
                    break
                try:
                    text = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
                    text = text.strip()
                    if not text:
                        continue
                    command = json.loads(text)
                    self.handle_command(command)
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    MessageSender.send_error(f"JSON 解析失败: {str(e)}", "JSON_PARSE_ERROR")
                except Exception as e:
                    MessageSender.send_error(f"处理命令失败: {str(e)}", "COMMAND_HANDLER_ERROR")
        except KeyboardInterrupt:
            pass
        except Exception as e:
            MessageSender.send_error(f"命令监听错误: {str(e)}", "LISTENER_ERROR")

    def start_listening_async(self):
        """Run start_listening() on a daemon thread and return that thread."""
        thread = threading.Thread(target=self.start_listening, daemon=True)
        thread.start()
        return thread

    def handle_command(self, command: Any):
        """Dispatch one decoded command and report the result.

        - A command that is not a JSON object with a string ``type`` gets an
          ``INVALID_COMMAND`` error.
        - An unregistered type gets ``UNKNOWN_COMMAND``.
        - Otherwise the handler runs. Returning None (no explicit return) or any
          truthy value sends ``CommandResponse(success=True)``; a falsy value
          sends a failure response. An exception becomes a failure response
          carrying its message.
        """
        command_type = command.get('type') if isinstance(command, dict) else None
        if not isinstance(command_type, str):
            MessageSender.send_error("命令缺少 type 字段", "INVALID_COMMAND")
            return
        handler = self.handlers.get(command_type)
        if handler is None:
            MessageSender.send_error(f"未知命令类型: {command_type}", "UNKNOWN_COMMAND")
            return
        try:
            result = handler(command)
        except Exception as e:
            MessageSender.send_command_response(success=False, error=str(e))
            return
        if result is None or result:
            MessageSender.send_command_response(success=True)
        else:
            MessageSender.send_command_response(success=False, error="处理失败")

    def stop(self):
        """Ask the listener loop to exit.

        The flag is only checked when the next line arrives. A listener blocked
        waiting for input keeps waiting until a line arrives or stdin closes.
        """
        self.running = False
Complete Agent Example
python
from wechat_monitor import WeChatMonitor
from input_writer import WeChatInputWriter
class Agent:
    """Windows agent that wires a WeChat monitor and input writer to the IPC command channel."""

    def __init__(self):
        self.monitor = WeChatMonitor()
        self.input_writer = WeChatInputWriter()
        self.command_receiver = CommandReceiver()
        # One handler per OrchestratorCommand variant.
        self.command_receiver.register_handler("WriteInput", self.handle_write_input)
        self.command_receiver.register_handler("ClearInput", self.handle_clear_input)
        self.command_receiver.register_handler("HealthCheck", self.handle_health_check)
        self.command_receiver.register_handler("Stop", self.handle_stop)

    def handle_write_input(self, command: Dict[str, Any]) -> bool:
        """Write ``command['content']`` (``''`` if absent) into the WeChat input box.

        Returns whatever ``write_to_input`` returns; presumably a success flag
        (TODO confirm).
        """
        content = command.get('content', '')
        return self.input_writer.write_to_input(content)

    def handle_clear_input(self, command: Dict[str, Any]) -> bool:
        """Clear the WeChat input box; returns ``clear_input``'s result."""
        return self.input_writer.clear_input()

    def handle_health_check(self, command: Dict[str, Any]) -> bool:
        """Answer a HealthCheck with a HealthStatus message.

        CommandReceiver sends a CommandResponse(success=True) after any handler
        that returns True or None. The orchestrator therefore receives
        HealthStatus followed by CommandResponse; returning True makes that
        explicit.
        """
        message = {
            "type": "HealthStatus",
            "status": "ok",
            "agent_type": "windows_wxauto"
        }
        MessageSender.send_message(message)
        return True

    def handle_stop(self, command: Dict[str, Any]) -> bool:
        """Stop the command listener.

        NOTE(review): this does not stop ``self.monitor``. ``start_monitoring()``
        keeps blocking the main thread, so the process keeps running after Stop.
        Confirm whether WeChatMonitor offers a stop method to call here.
        """
        self.command_receiver.stop()
        return True

    def run(self):
        """Start the agent: listen for commands in the background, monitor WeChat in the foreground."""
        # Command listener on a daemon thread.
        self.command_receiver.start_listening_async()
        # WeChat monitoring on the main thread (blocking).
        self.monitor.start_monitoring()


if __name__ == '__main__':
    agent = Agent()
    agent.run()
Swift Agent Implementation
Message Sender Pattern
swift
import Foundation
/// Writes line-delimited JSON messages to stdout for the Rust orchestrator.
class MessageSender {
    /// Serializes `message` and writes it to stdout as a single line, then flushes.
    ///
    /// `JSONSerialization.data(withJSONObject:)` raises an Objective-C exception
    /// for objects that are not valid JSON, and `try?` cannot catch it, so
    /// validity is checked first. Messages that cannot be encoded are reported
    /// on stderr instead of being dropped silently; stdout carries protocol lines only.
    static func sendMessage(_ message: [String: Any]) {
        guard JSONSerialization.isValidJSONObject(message),
              let jsonData = try? JSONSerialization.data(withJSONObject: message),
              let jsonString = String(data: jsonData, encoding: .utf8) else {
            FileHandle.standardError.write(Data("MessageSender: message is not valid JSON: \(message)\n".utf8))
            return
        }
        print(jsonString)
        fflush(stdout)
    }

    /// Sends a `MessageNew` notification.
    static func sendMessageNew(content: String, sender: String, timestamp: String) {
        sendMessage([
            "type": "MessageNew",
            "content": content,
            "sender": sender,
            "timestamp": timestamp
        ])
    }

    /// Sends a `CommandResponse`; `error` is included only when non-nil.
    static func sendCommandResponse(success: Bool, error: String? = nil) {
        var message: [String: Any] = [
            "type": "CommandResponse",
            "success": success
        ]
        if let error = error {
            message["error"] = error
        }
        sendMessage(message)
    }

    /// Sends an `Error`; `code` is included only when non-nil.
    static func sendError(message errorMessage: String, code: String? = nil) {
        var message: [String: Any] = [
            "type": "Error",
            "message": errorMessage
        ]
        if let code = code {
            message["code"] = code
        }
        sendMessage(message)
    }

    /// Sends a `Heartbeat` carrying a numeric timestamp.
    static func sendHeartbeat(timestamp: Double) {
        sendMessage([
            "type": "Heartbeat",
            "timestamp": timestamp
        ])
    }

    /// Sends a `HealthStatus` (e.g. status "ok", agentType "macos_accessibility").
    static func sendHealthStatus(status: String, agentType: String) {
        sendMessage([
            "type": "HealthStatus",
            "status": status,
            "agent_type": agentType
        ])
    }
}
Command Receiver Pattern
swift
/// Reads line-delimited JSON commands from stdin and dispatches them to registered handlers.
class CommandReceiver {
    private var handlers: [String: (([String: Any]) -> Bool)] = [:]
    // stop() writes `running` while the listener, usually on a background queue
    // (startListeningAsync), reads it. Access goes through `runningLock`.
    private var running = true
    private let runningLock = NSLock()

    private var isRunning: Bool {
        runningLock.lock()
        defer { runningLock.unlock() }
        return running
    }

    /// Registers `handler` for commands whose "type" equals `commandType`.
    /// Register all handlers before listening starts; the handler table is not synchronized.
    func registerHandler(commandType: String, handler: @escaping ([String: Any]) -> Bool) {
        handlers[commandType] = handler
    }

    /// Reads commands until stdin closes or `stop()` takes effect (blocking).
    /// Blank lines are skipped. Undecodable lines produce a JSON_PARSE_ERROR.
    func startListening() {
        while isRunning {
            guard let line = readLine() else {
                break
            }
            let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines)
            if trimmed.isEmpty {
                continue
            }
            guard let data = trimmed.data(using: .utf8),
                  let command = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else {
                MessageSender.sendError(message: "JSON 解析失败", code: "JSON_PARSE_ERROR")
                continue
            }
            handleCommand(command)
        }
    }

    /// Runs `startListening()` on a global background queue.
    func startListeningAsync() {
        DispatchQueue.global(qos: .userInitiated).async {
            self.startListening()
        }
    }

    /// Validates the "type" field, runs the matching handler, and reports its result.
    private func handleCommand(_ command: [String: Any]) {
        guard let commandType = command["type"] as? String else {
            MessageSender.sendError(message: "命令缺少 type 字段", code: "INVALID_COMMAND")
            return
        }
        guard let handler = handlers[commandType] else {
            MessageSender.sendError(message: "未知命令类型: \(commandType)", code: "UNKNOWN_COMMAND")
            return
        }
        let success = handler(command)
        MessageSender.sendCommandResponse(success: success)
    }

    /// Asks the listener to exit. The flag is checked before each readLine(),
    /// so a listener blocked on input exits after the next line or at EOF.
    func stop() {
        runningLock.lock()
        running = false
        runningLock.unlock()
    }
}
Message Validation
Rust Validation
rust
/// Maximum size, in UTF-8 bytes (`str::len`), of a `MessageNew` content string.
pub const MAX_MESSAGE_CONTENT_BYTES: usize = 100_000;

/// Checks an incoming [`AgentMessage`] for semantically invalid payloads.
///
/// Rules:
/// - `MessageNew`: content and sender must be non-empty, and content must be at
///   most [`MAX_MESSAGE_CONTENT_BYTES`] bytes.
/// - `Error`: the message must be non-empty.
/// - Other variants are accepted as-is.
///
/// Every variant is matched explicitly, so adding a variant forces a decision here.
///
/// # Errors
/// Returns a human-readable description of the first rule violated.
pub fn validate_agent_message(message: &AgentMessage) -> Result<(), String> {
    match message {
        AgentMessage::MessageNew { content, sender, .. } => {
            if content.is_empty() {
                return Err("消息内容为空".to_string());
            }
            if sender.is_empty() {
                return Err("发送者为空".to_string());
            }
            if content.len() > MAX_MESSAGE_CONTENT_BYTES {
                return Err("消息内容过长".to_string());
            }
        }
        AgentMessage::Error { message, .. } => {
            if message.is_empty() {
                return Err("错误消息为空".to_string());
            }
        }
        AgentMessage::CommandResponse { .. }
        | AgentMessage::HealthStatus { .. }
        | AgentMessage::Heartbeat { .. } => {}
    }
    Ok(())
}
Python Validation
python
def validate_command(command: Dict[str, Any], max_content_length: int = 10000) -> bool:
    """Check that a decoded Orchestrator command is well-formed.

    Args:
        command: decoded JSON command. Anything other than a dict is rejected.
        max_content_length: maximum number of characters (not bytes) allowed in a
            ``WriteInput`` command's ``content``. Defaults to 10000.

    Returns:
        True if the command passes validation, False otherwise. Never raises for
        malformed input.
    """
    # A command must be a JSON object carrying a string "type".
    if not isinstance(command, dict):
        return False
    cmd_type = command.get('type')
    if not isinstance(cmd_type, str):
        return False

    # Per-command field checks.
    if cmd_type == 'WriteInput':
        content = command.get('content')
        # len() on str counts characters, so the limit is in characters. Missing or
        # non-string content is rejected instead of raising TypeError.
        if not isinstance(content, str) or len(content) > max_content_length:
            return False

    return True
Performance Optimization
Batch Message Processing
rust
pub struct MessageBatcher {
buffer: Vec<AgentMessage>,
max_batch_size: usize,
flush_interval: Duration,
last_flush: Instant,
}
impl MessageBatcher {
pub fn new(max_batch_size: usize, flush_interval: Duration) -> Self {
Self {
buffer: Vec::new(),
max_batch_size,
flush_interval,
last_flush: Instant::now(),
}
}
pub fn add_message(&mut self, message: AgentMessage) -> Option<Vec<AgentMessage>> {
self.buffer.push(message);
// 达到批量大小或超时,返回批次
if self.buffer.len() >= self.max_batch_size
|| self.last_flush.elapsed() >= self.flush_interval
{
return Some(self.flush());
}
None
}
pub fn flush(&mut self) -> Vec<AgentMessage> {
let messages = std::mem::take(&mut self.buffer);
self.last_flush = Instant::now();
messages
}
}
Async Message Queue
rust
use tokio::sync::mpsc;
/// An unbounded in-process queue of [`AgentMessage`]s with cloneable producers.
///
/// The queue keeps its own sender so [`sender`](Self::sender) can always hand
/// out new producers. As a consequence, the channel never closes just because
/// every external producer was dropped. Call [`close`](Self::close) to let
/// [`receive`](Self::receive) return `None` once the buffer is drained.
///
/// NOTE(review): the channel is unbounded, so a fast producer with a slow
/// consumer grows memory without limit; consider a bounded channel if that matters.
pub struct AsyncMessageQueue {
    tx: mpsc::UnboundedSender<AgentMessage>,
    rx: mpsc::UnboundedReceiver<AgentMessage>,
}

impl AsyncMessageQueue {
    /// Creates an empty, open queue.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        Self { tx, rx }
    }

    /// Returns a new producer handle for this queue.
    pub fn sender(&self) -> mpsc::UnboundedSender<AgentMessage> {
        self.tx.clone()
    }

    /// Waits for the next message.
    ///
    /// Returns `None` only after [`close`](Self::close) has been called and all
    /// buffered messages have been received.
    pub async fn receive(&mut self) -> Option<AgentMessage> {
        self.rx.recv().await
    }

    /// Closes the queue.
    ///
    /// Further sends from any producer fail. Messages already buffered can still
    /// be drained with [`receive`](Self::receive), which then returns `None`.
    pub fn close(&mut self) {
        self.rx.close();
    }
}

impl Default for AsyncMessageQueue {
    fn default() -> Self {
        Self::new()
    }
}
Testing
Mock Agent Process
rust
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{duplex, AsyncBufReadExt, AsyncWriteExt, BufReader};

    /// A `WriteInput` command survives a JSON-line round trip over an in-memory pipe.
    #[tokio::test]
    async fn test_send_command() {
        let (mut client, server) = duplex(1024);

        // Simulate the orchestrator writing one command line.
        let command = OrchestratorCommand::WriteInput {
            content: "测试内容".to_string(),
        };
        let json = serde_json::to_string(&command).unwrap();
        client.write_all(json.as_bytes()).await.unwrap();
        client.write_all(b"\n").await.unwrap();

        // Simulate the agent reading and decoding that line.
        let mut reader = BufReader::new(server);
        let mut buf = String::new();
        reader.read_line(&mut buf).await.unwrap();
        let received: OrchestratorCommand = serde_json::from_str(&buf).unwrap();

        match received {
            OrchestratorCommand::WriteInput { content } => {
                assert_eq!(content, "测试内容");
            }
            _ => panic!("错误的命令类型"),
        }
    }

    /// Unit variants are internally tagged and carry only the "type" field.
    #[test]
    fn unit_commands_serialize_as_bare_type_tag() {
        let json = serde_json::to_string(&OrchestratorCommand::ClearInput).unwrap();
        assert_eq!(json, r#"{"type":"ClearInput"}"#);
    }
}
When to Use This Skill
Activate this skill when:
- Implementing IPC between Orchestrator and Agents
- Designing message protocols
- Working with stdin/stdout communication
- Handling JSON serialization/deserialization
- Implementing message validation
- Optimizing IPC performance
- Testing Agent communication
- Debugging IPC issues