IPC Communication Skill
Expert guidance for implementing inter-process communication (IPC) between the Rust Orchestrator and the platform Agents, using a line-delimited JSON protocol over stdin/stdout.
Overview
WeReply uses a JSON-based IPC protocol for communication:
- Protocol: JSON messages via stdin/stdout
- Direction: Bidirectional (Orchestrator ↔ Agent)
- Message Format: Line-delimited JSON (one message per line)
- Character Encoding: UTF-8
Message Protocol Design
Message Types
use serde::{Deserialize, Serialize}; use specta::Type;
// Agent → Orchestrator 消息 #[derive(Serialize, Deserialize, Type, Debug)] #[serde(tag = "type")] pub enum AgentMessage { MessageNew { content: String, sender: String, timestamp: String, }, CommandResponse { success: bool, #[serde(skip_serializing_if = "Option::is_none")] error: Option<String>, }, HealthStatus { status: String, // "ok", "degraded", "error" agent_type: String, // "windows_wxauto", "macos_accessibility" }, Heartbeat { timestamp: f64, }, Error { message: String, #[serde(skip_serializing_if = "Option::is_none")] code: Option<String>, }, }
// Orchestrator → Agent 消息 #[derive(Serialize, Deserialize, Type, Debug)] #[serde(tag = "type")] pub enum OrchestratorCommand { WriteInput { content: String, }, ClearInput, HealthCheck, Stop, }
Message Format Examples
// Agent → Orchestrator: 新消息 { "type": "MessageNew", "content": "你好,最近怎么样?", "sender": "张三", "timestamp": "2024-01-23T10:30:00" }
// Orchestrator → Agent: 写入输入框 { "type": "WriteInput", "content": "很好,谢谢!" }
// Agent → Orchestrator: 命令响应 { "type": "CommandResponse", "success": true }
// Agent → Orchestrator: 错误 { "type": "Error", "message": "无法访问微信窗口", "code": "ACCESS_DENIED" }
Rust Orchestrator Implementation
Agent Process Manager
use tokio::process::{Child, Command}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use std::process::Stdio;
/// Handle to a spawned Agent child process and its line-delimited JSON pipes.
pub struct AgentProcess {
    /// The child process, kept so it can be killed explicitly.
    child: Child,
    /// Write end of the Agent's stdin; commands are written here.
    stdin: tokio::process::ChildStdin,
    /// Line-oriented reader over the Agent's stdout.
    ///
    /// `Lines::next_line` is cancellation safe, so `read_message` can be
    /// wrapped in a timeout without dropping a partially received line.
    stdout_reader: tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
}

impl AgentProcess {
    /// Spawns the Agent executable at `agent_path` with piped stdin/stdout.
    ///
    /// # Errors
    /// Fails if the process cannot be spawned or if either pipe is missing.
    pub async fn spawn(agent_path: &str) -> anyhow::Result<Self> {
        let mut child = Command::new(agent_path)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            // Nothing in this type reads the Agent's stderr. A piped but
            // undrained stderr blocks the Agent once the pipe buffer fills,
            // so let it share the Orchestrator's stderr instead.
            .stderr(Stdio::inherit())
            // Do not leave an orphaned Agent behind if this handle is dropped
            // without an explicit `kill`.
            .kill_on_drop(true)
            .spawn()?;

        let stdin = child
            .stdin
            .take()
            .ok_or_else(|| anyhow::anyhow!("无法获取 Agent stdin"))?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow::anyhow!("无法获取 Agent stdout"))?;
        let stdout_reader = BufReader::new(stdout).lines();

        Ok(Self {
            child,
            stdin,
            stdout_reader,
        })
    }

    /// Serializes `command` as one JSON line and writes it to the Agent.
    ///
    /// # Errors
    /// Fails if serialization or the write/flush to the Agent's stdin fails.
    pub async fn send_command(&mut self, command: OrchestratorCommand) -> anyhow::Result<()> {
        // Build the whole line first so the message and its terminator go out
        // in a single write on the unbuffered pipe.
        let mut line = serde_json::to_vec(&command)?;
        line.push(b'\n');
        self.stdin.write_all(&line).await?;
        self.stdin.flush().await?;
        Ok(())
    }

    /// Reads the next JSON line from the Agent and parses it.
    ///
    /// Whitespace-only lines are skipped rather than reported as parse errors.
    ///
    /// # Errors
    /// Fails if the Agent closed its stdout, if reading fails (including
    /// invalid UTF-8), or if the line is not a valid `AgentMessage`.
    pub async fn read_message(&mut self) -> anyhow::Result<AgentMessage> {
        loop {
            let line = self
                .stdout_reader
                .next_line()
                .await?
                .ok_or_else(|| anyhow::anyhow!("Agent 已关闭输出"))?;
            if line.trim().is_empty() {
                continue;
            }
            return Ok(serde_json::from_str(&line)?);
        }
    }

    /// Kills the Agent process and waits for it to exit.
    ///
    /// # Errors
    /// Propagates the I/O error returned by the kill.
    pub async fn kill(&mut self) -> anyhow::Result<()> {
        self.child.kill().await?;
        Ok(())
    }
}
Message Handler Pattern
use tokio::sync::mpsc;
/// Owns an `AgentProcess` and forwards every message it emits into an
/// unbounded channel.
///
/// NOTE(review): `run` takes `self` by value, so once it has been started
/// (e.g. moved into a task) `send_command` can no longer be called on the
/// same handler.
pub struct AgentHandler {
    agent: AgentProcess,
    message_tx: mpsc::UnboundedSender<AgentMessage>,
}

impl AgentHandler {
    /// Spawns the Agent at `agent_path` and returns the handler together with
    /// the receiving end of its message channel.
    pub async fn start(
        agent_path: &str,
    ) -> anyhow::Result<(Self, mpsc::UnboundedReceiver<AgentMessage>)> {
        let agent = AgentProcess::spawn(agent_path).await?;
        let (message_tx, message_rx) = mpsc::unbounded_channel();
        Ok((Self { agent, message_tx }, message_rx))
    }

    /// Pumps messages from the Agent into the channel.
    ///
    /// Returns after logging when reading a message fails or when the
    /// receiver has been dropped.
    pub async fn run(mut self) {
        loop {
            let incoming = match self.agent.read_message().await {
                Ok(incoming) => incoming,
                Err(e) => {
                    tracing::error!(error = %e, "读取 Agent 消息失败");
                    return;
                }
            };

            if self.message_tx.send(incoming).is_err() {
                tracing::error!("消息接收器已关闭");
                return;
            }
        }
    }

    /// Forwards `command` to the underlying Agent process.
    pub async fn send_command(&mut self, command: OrchestratorCommand) -> anyhow::Result<()> {
        self.agent.send_command(command).await
    }
}
Error Handling with Timeout
use tokio::time::{timeout, Duration};
/// Sends `command` to `agent`, giving up after `timeout_secs` seconds.
///
/// Returns the result of `AgentProcess::send_command` if it finishes in time,
/// otherwise a timeout error.
pub async fn send_command_with_timeout(
    agent: &mut AgentProcess,
    command: OrchestratorCommand,
    timeout_secs: u64,
) -> anyhow::Result<()> {
    let limit = Duration::from_secs(timeout_secs);
    match timeout(limit, agent.send_command(command)).await {
        Ok(outcome) => outcome,
        Err(_elapsed) => Err(anyhow!("发送命令超时")),
    }
}
/// Reads the next message from `agent`, giving up after `timeout_secs` seconds.
///
/// Returns the result of `AgentProcess::read_message` if it finishes in time,
/// otherwise a timeout error.
///
/// NOTE(review): when the timeout fires the in-flight read is dropped; whether
/// a partially received line survives depends on `AgentProcess::read_message`
/// being cancellation safe — confirm.
pub async fn read_message_with_timeout(
    agent: &mut AgentProcess,
    timeout_secs: u64,
) -> anyhow::Result<AgentMessage> {
    let limit = Duration::from_secs(timeout_secs);
    match timeout(limit, agent.read_message()).await {
        Ok(outcome) => outcome,
        Err(_elapsed) => Err(anyhow!("读取消息超时")),
    }
}
Python Agent Implementation
Message Sender Pattern
import json import sys from typing import Dict, Any
import threading
from typing import Optional


class MessageSender:
    """Send JSON messages to the Rust Orchestrator over stdout.

    Every message is written as exactly one UTF-8 encoded line. Writes are
    serialized with a lock so that lines emitted from different threads
    cannot interleave.
    """

    # Guards stdout so a message and its newline are written as one unit.
    _write_lock = threading.Lock()

    @staticmethod
    def send_message(message: Dict[str, Any]) -> None:
        """Write one JSON message to stdout and flush it.

        Args:
            message: Message dictionary; must contain a 'type' field.
        """
        line = json.dumps(message, ensure_ascii=False) + "\n"
        with MessageSender._write_lock:
            stream = sys.stdout
            raw = getattr(stream, "buffer", None)
            if raw is None:
                # stdout was replaced by a text-only stream (e.g. StringIO).
                stream.write(line)
                stream.flush()
            else:
                # Bypass the text layer: its encoding follows the console code
                # page (not UTF-8 on many Windows setups), while the protocol
                # requires UTF-8 and the message contains non-ASCII text.
                stream.flush()
                raw.write(line.encode("utf-8"))
                raw.flush()

    @staticmethod
    def send_message_new(content: str, sender: str, timestamp: str) -> None:
        """Send a new-message notification."""
        MessageSender.send_message({
            "type": "MessageNew",
            "content": content,
            "sender": sender,
            "timestamp": timestamp,
        })

    @staticmethod
    def send_command_response(success: bool, error: Optional[str] = None) -> None:
        """Send a command response; 'error' is included only when not None."""
        message: Dict[str, Any] = {
            "type": "CommandResponse",
            "success": success,
        }
        if error is not None:
            message["error"] = error
        MessageSender.send_message(message)

    @staticmethod
    def send_error(error_message: str, code: Optional[str] = None) -> None:
        """Send an error message; 'code' is included only when not None."""
        message: Dict[str, Any] = {
            "type": "Error",
            "message": error_message,
        }
        if code is not None:
            message["code"] = code
        MessageSender.send_message(message)

    @staticmethod
    def send_heartbeat(timestamp: float) -> None:
        """Send a heartbeat carrying the given timestamp."""
        MessageSender.send_message({
            "type": "Heartbeat",
            "timestamp": timestamp,
        })
Command Receiver Pattern
import threading from typing import Callable, Dict, Any
class CommandReceiver:
    """Receive commands from the Rust Orchestrator over stdin.

    Commands arrive as one JSON object per line and are dispatched to the
    handler registered for their 'type'.
    """

    def __init__(self):
        # Maps a command type name to the callable that handles it.
        self.handlers: Dict[str, Callable] = {}
        # Cleared by stop(); checked before and after every command.
        self.running = True

    def register_handler(self, command_type: str, handler: Callable):
        """Register the handler called for commands of `command_type`."""
        self.handlers[command_type] = handler

    def start_listening(self):
        """Read and dispatch commands until stdin closes or stop() is called (blocking)."""
        try:
            # Read raw bytes when possible and decode as UTF-8 explicitly: the
            # text layer of stdin uses the locale encoding, which is not
            # UTF-8 on many Windows setups.
            source = getattr(sys.stdin, "buffer", sys.stdin)
            for raw_line in source:
                if not self.running:
                    break

                try:
                    if isinstance(raw_line, bytes):
                        text = raw_line.decode("utf-8")
                    else:
                        text = raw_line
                    text = text.strip()
                    if not text:
                        # Blank lines carry no command; ignore them.
                        continue
                    self.handle_command(json.loads(text))
                except (UnicodeDecodeError, json.JSONDecodeError) as e:
                    MessageSender.send_error(f"JSON 解析失败: {str(e)}", "JSON_PARSE_ERROR")
                except Exception as e:
                    MessageSender.send_error(f"处理命令失败: {str(e)}", "COMMAND_HANDLER_ERROR")

                # A handler may have called stop(); leave right away instead
                # of blocking on (and then discarding) one more input line.
                if not self.running:
                    break

        except KeyboardInterrupt:
            pass
        except Exception as e:
            MessageSender.send_error(f"命令监听错误: {str(e)}", "LISTENER_ERROR")

    def start_listening_async(self):
        """Run start_listening in a daemon thread and return that thread."""
        thread = threading.Thread(target=self.start_listening, daemon=True)
        thread.start()
        return thread

    def handle_command(self, command: Dict[str, Any]):
        """Dispatch one decoded command to its registered handler.

        The handler's return value selects the reply:
        True sends a success CommandResponse, False sends a failure
        CommandResponse, and None sends nothing (the handler is expected to
        have replied itself). An exception raised by the handler is reported
        as a failure CommandResponse carrying the exception text.
        """
        command_type = command.get('type') if isinstance(command, dict) else None
        if not isinstance(command_type, str):
            MessageSender.send_error("命令缺少 type 字段", "INVALID_COMMAND")
            return

        handler = self.handlers.get(command_type)
        if handler is None:
            MessageSender.send_error(f"未知命令类型: {command_type}", "UNKNOWN_COMMAND")
            return

        try:
            result = handler(command)
        except Exception as e:
            MessageSender.send_command_response(success=False, error=str(e))
            return

        if result is True:
            MessageSender.send_command_response(success=True)
        elif result is False:
            MessageSender.send_command_response(success=False, error="处理失败")

    def stop(self):
        """Ask the listening loop to stop."""
        self.running = False
Complete Agent Example
from wechat_monitor import WeChatMonitor from input_writer import WeChatInputWriter
class Agent:
    """Wires the WeChat monitor, the input writer and the command receiver together."""

    def __init__(self):
        self.monitor = WeChatMonitor()
        self.input_writer = WeChatInputWriter()
        self.command_receiver = CommandReceiver()

        # Register one handler per Orchestrator command type.
        self.command_receiver.register_handler("WriteInput", self.handle_write_input)
        self.command_receiver.register_handler("ClearInput", self.handle_clear_input)
        self.command_receiver.register_handler("HealthCheck", self.handle_health_check)
        self.command_receiver.register_handler("Stop", self.handle_stop)

    def handle_write_input(self, command: Dict[str, Any]) -> bool:
        """Write the command's 'content' (empty string if absent) into the input box."""
        content = command.get('content', '')
        return self.input_writer.write_to_input(content)

    def handle_clear_input(self, command: Dict[str, Any]) -> bool:
        """Clear the input box."""
        return self.input_writer.clear_input()

    def handle_health_check(self, command: Dict[str, Any]) -> None:
        """Reply to a health check with a HealthStatus message.

        Returns None because the HealthStatus message is the reply.
        NOTE(review): confirm CommandReceiver.handle_command treats a None
        result as "send no CommandResponse".
        """
        message = {
            "type": "HealthStatus",
            "status": "ok",
            "agent_type": "windows_wxauto"
        }
        MessageSender.send_message(message)
        return None

    def handle_stop(self, command: Dict[str, Any]) -> bool:
        """Stop the command receiver.

        NOTE(review): only the receiver is stopped here; nothing in this class
        stops self.monitor — confirm how the monitoring loop is meant to end.
        """
        self.command_receiver.stop()
        return True

    def run(self):
        """Start the Agent."""
        # Listen for commands on a background thread.
        self.command_receiver.start_listening_async()

        # Monitor WeChat on the calling thread (blocking).
        self.monitor.start_monitoring()
# Start the Agent only when this file is executed as a script.
if __name__ == '__main__':
    agent = Agent()
    agent.run()
Swift Agent Implementation
Message Sender Pattern
import Foundation
/// Sends JSON messages to the Rust Orchestrator over stdout, one per line.
class MessageSender {
    /// Serializes `message` to JSON, prints it as a single line and flushes stdout.
    ///
    /// Messages that cannot be serialized are reported on stderr instead of
    /// being dropped silently.
    static func sendMessage(_ message: [String: Any]) {
        // `data(withJSONObject:)` raises an Objective-C exception (which
        // `try` cannot catch) for objects that are not valid JSON, so check first.
        guard JSONSerialization.isValidJSONObject(message) else {
            reportFailure("message is not a valid JSON object")
            return
        }

        do {
            let jsonData = try JSONSerialization.data(withJSONObject: message)
            guard let jsonString = String(data: jsonData, encoding: .utf8) else {
                reportFailure("serialized message is not valid UTF-8")
                return
            }
            print(jsonString)
            fflush(stdout)
        } catch {
            reportFailure("JSON serialization failed: \(error)")
        }
    }

    /// Sends a new-message notification.
    static func sendMessageNew(content: String, sender: String, timestamp: String) {
        let message: [String: Any] = [
            "type": "MessageNew",
            "content": content,
            "sender": sender,
            "timestamp": timestamp
        ]
        sendMessage(message)
    }

    /// Sends a command response; `error` is included only when non-nil.
    static func sendCommandResponse(success: Bool, error: String? = nil) {
        var message: [String: Any] = [
            "type": "CommandResponse",
            "success": success
        ]
        if let error = error {
            message["error"] = error
        }
        sendMessage(message)
    }

    /// Sends an error message; `code` is included only when non-nil.
    static func sendError(message errorMessage: String, code: String? = nil) {
        var message: [String: Any] = [
            "type": "Error",
            "message": errorMessage
        ]
        if let code = code {
            message["code"] = code
        }
        sendMessage(message)
    }

    /// Writes a diagnostic line to stderr, keeping stdout reserved for protocol messages.
    private static func reportFailure(_ reason: String) {
        FileHandle.standardError.write(Data("MessageSender: \(reason)\n".utf8))
    }
}
Command Receiver Pattern
/// Receives commands from the Rust Orchestrator over stdin and dispatches
/// them to the handler registered for their `type`.
class CommandReceiver {
    /// Handlers keyed by command type; each returns whether the command succeeded.
    private var handlers: [String: (([String: Any]) -> Bool)] = [:]
    /// Cleared by `stop()`; checked before each line is read.
    private var running = true

    /// Registers the handler called for commands of `commandType`.
    func registerHandler(commandType: String, handler: @escaping ([String: Any]) -> Bool) {
        handlers[commandType] = handler
    }

    /// Reads and dispatches one command per line until stdin ends or `stop()` is called (blocking).
    func startListening() {
        while running, let line = readLine() {
            if let data = line.data(using: .utf8),
               let command = (try? JSONSerialization.jsonObject(with: data)) as? [String: Any] {
                handleCommand(command)
            } else {
                MessageSender.sendError(message: "JSON 解析失败", code: "JSON_PARSE_ERROR")
            }
        }
    }

    /// Runs `startListening()` on a global dispatch queue.
    func startListeningAsync() {
        DispatchQueue.global(qos: .userInitiated).async {
            self.startListening()
        }
    }

    /// Looks up the handler for the command's `type`, runs it and reports its
    /// result as a command response.
    private func handleCommand(_ command: [String: Any]) {
        if let commandType = command["type"] as? String {
            if let handler = handlers[commandType] {
                let succeeded = handler(command)
                MessageSender.sendCommandResponse(success: succeeded)
            } else {
                MessageSender.sendError(message: "未知命令类型: \(commandType)", code: "UNKNOWN_COMMAND")
            }
        } else {
            MessageSender.sendError(message: "命令缺少 type 字段", code: "INVALID_COMMAND")
        }
    }

    /// Asks the listening loop to stop before reading its next line.
    func stop() {
        running = false
    }
}
Message Validation
Rust Validation
pub fn validate_agent_message(message: &AgentMessage) -> Result<(), String> { match message { AgentMessage::MessageNew { content, sender, .. } => { if content.is_empty() { return Err("消息内容为空".to_string()); } if sender.is_empty() { return Err("发送者为空".to_string()); } if content.len() > 100000 { return Err("消息内容过长".to_string()); } } AgentMessage::Error { message, .. } => { if message.is_empty() { return Err("错误消息为空".to_string()); } } _ => {} } Ok(()) }
Python Validation
def validate_command(command: Dict[str, Any]) -> bool:
    """Validate the shape of a decoded command.

    Args:
        command: The decoded JSON command.

    Returns:
        False when `command` is not a dict or has no 'type' field, or when a
        'WriteInput' command has no string 'content' or its content is larger
        than 10000 bytes once encoded as UTF-8; True otherwise.
    """
    # Required field; anything that is not a JSON object cannot carry it.
    if not isinstance(command, dict) or 'type' not in command:
        return False

    cmd_type = command['type']

    # Per-command field checks.
    if cmd_type == 'WriteInput':
        content = command.get('content')
        if not isinstance(content, str):
            return False

        # The limit is 10 KB on the wire, so measure encoded bytes rather than
        # characters (non-ASCII text takes several bytes per character).
        # 'surrogatepass' keeps lone surrogates from raising here.
        if len(content.encode('utf-8', errors='surrogatepass')) > 10000:
            return False

    return True
Performance Optimization
Batch Message Processing
/// Accumulates `AgentMessage`s and hands them out in batches.
///
/// A batch is released from `add_message` when the buffer reaches
/// `max_batch_size` or when at least `flush_interval` has passed since the
/// last flush. The interval is only evaluated when a message is added — there
/// is no timer — so callers that need a bound on latency must also call
/// `flush` themselves.
pub struct MessageBatcher {
    /// Messages waiting for the next flush.
    buffer: Vec<AgentMessage>,
    /// Buffer length at which `add_message` releases a batch.
    max_batch_size: usize,
    /// Time since the last flush after which `add_message` releases a batch.
    flush_interval: Duration,
    /// When the buffer was last flushed (or when the batcher was created).
    last_flush: Instant,
}

impl MessageBatcher {
    /// Creates an empty batcher; the flush interval starts counting now.
    pub fn new(max_batch_size: usize, flush_interval: Duration) -> Self {
        Self {
            buffer: Vec::new(),
            max_batch_size,
            flush_interval,
            last_flush: Instant::now(),
        }
    }

    /// Buffers `message` and returns the pending batch if it is due.
    ///
    /// Returns `Some(batch)` (including `message`) when the size limit is
    /// reached or the flush interval has elapsed, `None` otherwise.
    #[must_use = "a returned batch has already left the buffer; dropping it loses those messages"]
    pub fn add_message(&mut self, message: AgentMessage) -> Option<Vec<AgentMessage>> {
        self.buffer.push(message);

        let batch_is_due = self.buffer.len() >= self.max_batch_size
            || self.last_flush.elapsed() >= self.flush_interval;

        if batch_is_due {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Takes every buffered message (possibly none) and restarts the interval.
    pub fn flush(&mut self) -> Vec<AgentMessage> {
        let messages = std::mem::take(&mut self.buffer);
        self.last_flush = Instant::now();
        messages
    }
}
Async Message Queue
use tokio::sync::mpsc;
/// An unbounded in-process queue of `AgentMessage`s.
///
/// The queue keeps its own sender, so the channel stays open for as long as
/// the queue exists; `receive` therefore waits for the next message rather
/// than returning `None` while the queue is alive.
pub struct AsyncMessageQueue {
    /// Sender kept so additional senders can be cloned from it.
    tx: mpsc::UnboundedSender<AgentMessage>,
    /// Receiving end drained by `receive`.
    rx: mpsc::UnboundedReceiver<AgentMessage>,
}

impl AsyncMessageQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        Self { tx, rx }
    }

    /// Returns a new sender that feeds this queue.
    pub fn sender(&self) -> mpsc::UnboundedSender<AgentMessage> {
        self.tx.clone()
    }

    /// Waits for and returns the next queued message.
    pub async fn receive(&mut self) -> Option<AgentMessage> {
        self.rx.recv().await
    }
}

impl Default for AsyncMessageQueue {
    /// Equivalent to [`AsyncMessageQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}
Testing
Mock Agent Process
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{duplex, AsyncBufReadExt, AsyncWriteExt, BufReader};

    /// Round-trips a `WriteInput` command through an in-memory duplex pipe
    /// using the same framing as the real IPC channel (one JSON line).
    #[tokio::test]
    async fn test_send_command() {
        let (mut client, server) = duplex(1024);

        // Simulate sending a command.
        let command = OrchestratorCommand::WriteInput {
            content: "测试内容".to_string(),
        };
        let json = serde_json::to_string(&command).unwrap();
        client.write_all(json.as_bytes()).await.unwrap();
        client.write_all(b"\n").await.unwrap();

        // Simulate receiving it on the other end.
        let mut buf = String::new();
        let mut reader = BufReader::new(server);
        reader.read_line(&mut buf).await.unwrap();

        // The line on the wire is exactly the serialized command plus '\n'.
        assert_eq!(buf.trim_end_matches('\n'), json);

        let received: OrchestratorCommand = serde_json::from_str(&buf).unwrap();
        match received {
            OrchestratorCommand::WriteInput { content } => {
                assert_eq!(content, "测试内容");
            }
            _ => panic!("错误的命令类型"),
        }
    }
}
When to Use This Skill
Activate this skill when:
- Implementing IPC between Orchestrator and Agents
- Designing message protocols
- Working with stdin/stdout communication
- Handling JSON serialization/deserialization
- Implementing message validation
- Optimizing IPC performance
- Testing Agent communication
- Debugging IPC issues