Files
realizemultiagent/scripts/message-router.js
2026-04-02 00:59:42 +08:00

369 lines
10 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 消息路由系统 - 基础版本
// 文件位置: D:\openclaw-multi-agent\scripts\message-router.js
// 功能: 基于文件系统的简单消息路由
const fs = require('fs');
const path = require('path');
const { exec } = require('child_process');
const { promisify } = require('util');
const execAsync = promisify(exec);
/**
 * File-system backed message router.
 *
 * A message of the form "[PREFIX] text" is parsed, stored as a JSON file in
 * the `incoming` queue directory, handed to the matching agent through the
 * `openclaw` command-line tool, and finally moved to the `completed` or
 * `errors` queue directory.
 */
class MessageRouter {
  /**
   * @param {string} [basePath] Root directory. Queue directories are created
   *   under `<basePath>/shared/notifications`, and the agent command is run
   *   with this directory as its working directory.
   */
  constructor(basePath = 'D:\\openclaw-multi-agent') {
    this.basePath = basePath;
    this.notificationsPath = path.join(basePath, 'shared', 'notifications');
    this.queues = {
      incoming: path.join(this.notificationsPath, 'incoming'),
      processing: path.join(this.notificationsPath, 'processing'),
      completed: path.join(this.notificationsPath, 'completed'),
      errors: path.join(this.notificationsPath, 'errors')
    };
    // Message prefix (including brackets) -> agent name.
    this.routes = {
      '[PM]': 'pm',
      '[BE]': 'backend',
      '[FE]': 'frontend',
      '[QA]': 'qa',
      '[UI]': 'ui-design',
      '[OPS]': 'devops',
      '[SALES]': 'sales',
      '[CEO]': 'ceo',
      '[CTO]': 'cto',
      '[CPO]': 'cpo',
      '[COO]': 'coo',
      '[CS]': 'customer-success',
      '[MKT]': 'marketing',
      '[FINANCE]': 'finance',
      '[LEGAL]': 'legal',
      '[HR]': 'hr'
    };
    // Agent used when a prefix is not listed in `routes`.
    this.defaultRoute = 'pm';
    // Per-instance counter that keeps ids generated in the same millisecond distinct.
    this.messageSequence = 0;
    // Make sure the queue directories exist.
    this.ensureDirectories();
  }

  /** Creates every queue directory that does not exist yet (synchronously). */
  ensureDirectories() {
    Object.values(this.queues).forEach(dir => {
      if (!fs.existsSync(dir)) {
        fs.mkdirSync(dir, { recursive: true });
        console.log(`Created directory: ${dir}`);
      }
    });
  }

  /**
   * Generates a message id of the form `msg-<timestamp>-<sequence>-<random>`.
   *
   * The sequence number makes ids unique within this instance; the random
   * part makes a clash between separate processes unlikely.
   *
   * @returns {string}
   */
  generateMessageId() {
    const timestamp = Date.now();
    this.messageSequence = (this.messageSequence || 0) + 1;
    const random = Math.floor(Math.random() * 0x1000000)
      .toString(16)
      .padStart(6, '0');
    return `msg-${timestamp}-${this.messageSequence}-${random}`;
  }

  /**
   * Splits a "[PREFIX] content" message and resolves the target agent.
   *
   * @param {string} message Raw message text.
   * @returns {{prefix: string, content: string, targetAgent: string, originalMessage: string}}
   * @throws {Error} When the message is not a string or has no leading [PREFIX].
   */
  parseMessage(message) {
    if (typeof message !== 'string') {
      throw new Error(`Invalid message format. Expected: [PREFIX] message. Got: ${String(message)}`);
    }
    // [\s\S]* instead of .* so that multi-line content is not cut at the first newline.
    const prefixMatch = message.match(/^\[(\w+)\]\s*([\s\S]*)/);
    if (!prefixMatch) {
      throw new Error(`Invalid message format. Expected: [PREFIX] message. Got: ${message}`);
    }
    const prefix = prefixMatch[1];
    const content = prefixMatch[2];
    const routeKey = `[${prefix}]`;
    // Look up the target agent; unknown prefixes fall back to the default route.
    let targetAgent = this.routes[routeKey];
    if (!targetAgent) {
      console.warn(`Unknown prefix: ${prefix}, using default route: ${this.defaultRoute}`);
      targetAgent = this.defaultRoute;
    }
    return {
      prefix,
      content,
      targetAgent,
      originalMessage: message
    };
  }

  /**
   * Writes a new message record into one of the queue directories.
   *
   * @param {object} messageData Fields to store (typically the result of parseMessage).
   *   An `id` field, if present, is ignored: the generated id always wins so that
   *   the record's id matches its file name.
   * @param {string} queueType One of the keys of `this.queues`.
   * @returns {Promise<object>} The stored record.
   * @throws {Error} When `queueType` is unknown or the file cannot be written.
   */
  async saveMessage(messageData, queueType) {
    if (!Object.prototype.hasOwnProperty.call(this.queues, queueType)) {
      throw new Error(`Unknown queue type: ${queueType}`);
    }
    const messageId = this.generateMessageId();
    // eslint-disable-next-line no-unused-vars
    const { id: ignoredId, ...payload } = messageData || {};
    const messageFile = {
      id: messageId,
      timestamp: new Date().toISOString(),
      ...payload,
      status: 'pending',
      queue: queueType,
      retryCount: 0
    };
    const filePath = path.join(this.queues[queueType], `${messageId}.json`);
    // 'wx' fails instead of silently overwriting an existing message file.
    await fs.promises.writeFile(
      filePath,
      JSON.stringify(messageFile, null, 2),
      { encoding: 'utf8', flag: 'wx' }
    );
    console.log(`Message saved: ${filePath}`);
    return messageFile;
  }

  /**
   * Internal helper: quotes one argument for the shell used by `exec`.
   *
   * @param {string} value Argument text.
   * @param {string} [platform] Platform identifier; defaults to the current one.
   * @returns {string} The quoted argument.
   */
  _quoteShellArg(value, platform = process.platform) {
    const text = String(value);
    if (platform === 'win32') {
      // SECURITY NOTE(review): this keeps the original escaping. cmd.exe does not
      // treat \" as an escaped quote and still expands %VAR%, so untrusted text can
      // break out of the quotes on Windows. Replace with an argument-array based
      // spawn once the way `openclaw` is installed on Windows is confirmed.
      return `"${text.replace(/"/g, '\\"')}"`;
    }
    // POSIX shells perform no expansion inside single quotes; an embedded single
    // quote is written as '\'' (close, escaped quote, reopen).
    return `'${text.replace(/'/g, "'\\''")}'`;
  }

  /**
   * Delivers a message to an agent by running
   * `openclaw agent --agent <agent> --message <message>`.
   *
   * Never rejects: failures are reported through the returned object.
   *
   * @param {string} agent Agent name (letters, digits, `_`, `.`, `-`).
   * @param {string} message Message text passed to the agent.
   * @returns {Promise<object>} `{success: true, agent, messageId: null, output}` or
   *   `{success: false, agent, error, stderr}`.
   */
  async sendToAgent(agent, message) {
    try {
      // The agent name is placed on the command line unquoted, so only allow a
      // conservative character set and never a leading '-' (option injection).
      if (typeof agent !== 'string' || !/^[A-Za-z0-9][\w.-]*$/.test(agent)) {
        throw new Error(`Invalid agent name: ${String(agent)}`);
      }
      if (typeof message !== 'string') {
        throw new TypeError('Message must be a string');
      }
      const command = `openclaw agent --agent ${agent} --message ${this._quoteShellArg(message)}`;
      console.log(`Sending to agent ${agent}: ${message}`);
      const { stdout, stderr } = await execAsync(command, {
        cwd: this.basePath,
        timeout: 30000 // 30 second timeout
      });
      // stderr output mentioning 'feishu_doc' is deliberately not reported.
      if (stderr && !stderr.includes('feishu_doc')) {
        console.warn(`Agent ${agent} stderr: ${stderr}`);
      }
      return {
        success: true,
        agent,
        messageId: null,
        output: stdout
      };
    } catch (error) {
      console.error(`Failed to send to agent ${agent}:`, error.message);
      return {
        success: false,
        agent,
        error: error.message,
        stderr: error.stderr
      };
    }
  }

  /**
   * Processes one message that currently sits in the `incoming` queue:
   * moves it to `processing`, sends it to its agent, then moves it to
   * `completed` or `errors` with the outcome recorded in the file.
   *
   * Never rejects. If any step fails, an error record is written to the
   * `errors` queue and the stale copy is removed from wherever the file was
   * at that moment, so a message does not stay stuck in `processing`.
   *
   * @param {object} messageFile Message record; `id`, `targetAgent` and
   *   `originalMessage` are read, status fields are updated in place.
   * @returns {Promise<object>} The sendToAgent result, or `{success: false, error}`.
   */
  async processMessage(messageFile) {
    const { targetAgent, originalMessage } = messageFile;
    const fileName = `${messageFile.id}.json`;
    const incomingPath = path.join(this.queues.incoming, fileName);
    const processingPath = path.join(this.queues.processing, fileName);
    // Tracks where the file lives right now, for cleanup on failure.
    let currentPath = incomingPath;
    try {
      // Move the message to the processing queue.
      messageFile.status = 'processing';
      messageFile.processingStart = new Date().toISOString();
      await fs.promises.rename(incomingPath, processingPath);
      currentPath = processingPath;
      await fs.promises.writeFile(processingPath, JSON.stringify(messageFile, null, 2), 'utf8');
      // Send the message to the agent.
      const result = await this.sendToAgent(targetAgent, originalMessage);
      // Record the outcome.
      messageFile.status = result.success ? 'completed' : 'failed';
      messageFile.processingEnd = new Date().toISOString();
      messageFile.result = result;
      // Move to the completed or errors queue.
      const finalQueue = result.success ? 'completed' : 'errors';
      const finalPath = path.join(this.queues[finalQueue], fileName);
      await fs.promises.rename(processingPath, finalPath);
      currentPath = finalPath;
      await fs.promises.writeFile(finalPath, JSON.stringify(messageFile, null, 2), 'utf8');
      console.log(`Message ${messageFile.id} processed: ${messageFile.status}`);
      return result;
    } catch (error) {
      console.error(`Error processing message ${messageFile.id}:`, error);
      messageFile.status = 'error';
      messageFile.error = error.message;
      const errorPath = path.join(this.queues.errors, fileName);
      try {
        await fs.promises.writeFile(errorPath, JSON.stringify(messageFile, null, 2), 'utf8');
        // Only after the error record is safely written, drop the stale copy.
        if (currentPath !== errorPath) {
          try {
            await fs.promises.unlink(currentPath);
          } catch (unlinkError) {
            // The file may never have existed at this location.
            if (unlinkError.code !== 'ENOENT') {
              throw unlinkError;
            }
          }
        }
      } catch (cleanupError) {
        console.error(`Failed to record error for message ${messageFile.id}:`, cleanupError);
      }
      return {
        success: false,
        error: error.message
      };
    }
  }

  /**
   * Processes every `.json` file in the `incoming` queue, one after another.
   *
   * A file that cannot be read or parsed is reported as a failed result and
   * left in place; the remaining files are still processed.
   *
   * @returns {Promise<object[]>} One result per file; an empty array when the
   *   queue directory itself cannot be listed.
   */
  async processAllMessages() {
    try {
      const files = await fs.promises.readdir(this.queues.incoming);
      const jsonFiles = files.filter(file => file.endsWith('.json'));
      console.log(`Found ${jsonFiles.length} messages to process`);
      const results = [];
      for (const file of jsonFiles) {
        const filePath = path.join(this.queues.incoming, file);
        let messageFile;
        try {
          const content = await fs.promises.readFile(filePath, 'utf8');
          messageFile = JSON.parse(content);
          if (messageFile === null || typeof messageFile !== 'object' || Array.isArray(messageFile)) {
            throw new Error('Message file must contain a JSON object');
          }
        } catch (error) {
          console.error(`Skipping unreadable message file ${file}:`, error);
          results.push({
            success: false,
            error: error.message
          });
          continue;
        }
        // The file name is the queue key: processMessage builds its paths from the
        // id, so an id that disagrees with the file name could never be moved.
        const fileId = path.basename(file, '.json');
        if (messageFile.id !== fileId) {
          console.warn(`Message id ${String(messageFile.id)} does not match file ${file}; using ${fileId}`);
          messageFile.id = fileId;
        }
        console.log(`Processing message: ${messageFile.id}`);
        const result = await this.processMessage(messageFile);
        results.push(result);
      }
      return results;
    } catch (error) {
      console.error('Error processing messages:', error);
      return [];
    }
  }

  /**
   * Main entry point: parses a raw message, stores it in the `incoming`
   * queue and processes it immediately.
   *
   * Never rejects. `success: true` means the message was accepted and stored;
   * the delivery outcome is reported separately in `processingResult`.
   *
   * @param {string} message Raw "[PREFIX] text" message.
   * @returns {Promise<object>} `{success: true, messageId, targetAgent, processingResult}`
   *   or `{success: false, error}`.
   */
  async receiveMessage(message) {
    try {
      console.log(`Receiving message: ${message}`);
      const parsed = this.parseMessage(message);
      const messageFile = await this.saveMessage(parsed, 'incoming');
      const result = await this.processMessage(messageFile);
      return {
        success: true,
        messageId: messageFile.id,
        targetAgent: parsed.targetAgent,
        processingResult: result
      };
    } catch (error) {
      console.error('Error receiving message:', error);
      return {
        success: false,
        error: error.message
      };
    }
  }

  /**
   * Reports the number of `.json` files in each queue together with the
   * route count, the default route and the current time.
   *
   * A queue directory that cannot be read is reported as 0 (with a warning).
   *
   * @returns {Promise<{queues: object, routes: number, defaultRoute: string, timestamp: string}>}
   */
  async getStatus() {
    const status = {
      queues: {},
      routes: Object.keys(this.routes).length,
      defaultRoute: this.defaultRoute,
      timestamp: new Date().toISOString()
    };
    for (const [queueName, queuePath] of Object.entries(this.queues)) {
      try {
        const files = await fs.promises.readdir(queuePath);
        const jsonFiles = files.filter(file => file.endsWith('.json'));
        status.queues[queueName] = jsonFiles.length;
      } catch (error) {
        console.warn(`Could not read queue ${queueName}: ${error.message}`);
        status.queues[queueName] = 0;
      }
    }
    return status;
  }

  /**
   * Deletes `.json` files in the `completed` and `errors` queues whose
   * modification time is older than `maxAgeHours`.
   *
   * A failure on one file is logged and does not stop the remaining files.
   *
   * @param {number} [maxAgeHours=24] Maximum age in hours.
   * @returns {Promise<void>}
   */
  async cleanupOldMessages(maxAgeHours = 24) {
    const maxAgeMs = maxAgeHours * 60 * 60 * 1000;
    const now = Date.now();
    for (const queuePath of [this.queues.completed, this.queues.errors]) {
      try {
        const files = await fs.promises.readdir(queuePath);
        for (const file of files) {
          if (!file.endsWith('.json')) {
            continue;
          }
          const filePath = path.join(queuePath, file);
          try {
            const stats = await fs.promises.stat(filePath);
            const fileAge = now - stats.mtimeMs;
            if (fileAge > maxAgeMs) {
              await fs.promises.unlink(filePath);
              console.log(`Cleaned up old message: ${file}`);
            }
          } catch (fileError) {
            console.error(`Error cleaning up ${filePath}:`, fileError);
          }
        }
      } catch (error) {
        console.error(`Error cleaning up ${queuePath}:`, error);
      }
    }
  }
}
// Command-line interface: only runs when this file is executed directly.
if (require.main === module) {
  const router = new MessageRouter();
  const [, , command, ...messageWords] = process.argv;
  const message = messageWords.join(' ');

  // Prints the usage text; also used for unknown or missing commands.
  const showHelp = async () => {
    console.log(`
Message Router System - Usage:
node message-router.js receive "[PREFIX] message" # 接收并处理消息
node message-router.js process # 处理所有待处理消息
node message-router.js status # 查看系统状态
node message-router.js cleanup # 清理旧消息
node message-router.js help # 显示帮助
Supported prefixes:
${Object.keys(router.routes).join(', ')}
Example:
node message-router.js receive "[PM] 创建新任务"
`);
  };

  // One handler per sub-command.
  const handlers = new Map([
    ['receive', async () => {
      if (!message) {
        console.error('Usage: node message-router.js receive "[PREFIX] message"');
        process.exit(1);
      }
      const outcome = await router.receiveMessage(message);
      console.log(JSON.stringify(outcome, null, 2));
    }],
    ['process', async () => {
      const processed = await router.processAllMessages();
      console.log(`Processed ${processed.length} messages`);
    }],
    ['status', async () => {
      const report = await router.getStatus();
      console.log(JSON.stringify(report, null, 2));
    }],
    ['cleanup', async () => {
      await router.cleanupOldMessages();
      console.log('Cleanup completed');
    }],
    ['help', showHelp]
  ]);

  // Dispatches the requested command, falling back to the help text.
  const main = async () => {
    const handler = handlers.get(command) || showHelp;
    await handler();
  };

  main().catch(error => {
    console.error('Fatal error:', error);
    process.exit(1);
  });
}
module.exports = MessageRouter;