// Message routing system - basic version
// File location: D:\openclaw-multi-agent\scripts\message-router.js
// Purpose: simple file-system-based message routing

const { exec } = require('child_process');
const crypto = require('crypto');
const fs = require('fs');
const path = require('path');
const { promisify } = require('util');

const execAsync = promisify(exec);

/**
 * File-system-backed message router.
 *
 * A message of the form "[PREFIX] text" is parsed, stored as a JSON file in
 * the `incoming` queue directory, moved to `processing` while it is handed to
 * the target agent through the `openclaw` command line, and finally moved to
 * either `completed` or `errors`.
 */
class MessageRouter {
  /**
   * @param {string} [basePath] Root directory; the queue directories are
   *   created under `<basePath>/shared/notifications`.
   */
  constructor(basePath = 'D:\\openclaw-multi-agent') {
    this.basePath = basePath;
    this.notificationsPath = path.join(basePath, 'shared', 'notifications');
    this.queues = {
      incoming: path.join(this.notificationsPath, 'incoming'),
      processing: path.join(this.notificationsPath, 'processing'),
      completed: path.join(this.notificationsPath, 'completed'),
      errors: path.join(this.notificationsPath, 'errors')
    };

    // Maps a bracketed message prefix to the agent name passed to `openclaw`.
    this.routes = {
      '[PM]': 'pm',
      '[BE]': 'backend',
      '[FE]': 'frontend',
      '[QA]': 'qa',
      '[UI]': 'ui-design',
      '[OPS]': 'devops',
      '[SALES]': 'sales',
      '[CEO]': 'ceo',
      '[CTO]': 'cto',
      '[CPO]': 'cpo',
      '[COO]': 'coo',
      '[CS]': 'customer-success',
      '[MKT]': 'marketing',
      '[FINANCE]': 'finance',
      '[LEGAL]': 'legal',
      '[HR]': 'hr'
    };

    // Agent used when a message carries a prefix that is not in `routes`.
    this.defaultRoute = 'pm';

    // Make sure the queue directories exist.
    this.ensureDirectories();
  }

  /**
   * Synchronously creates every queue directory that does not exist yet.
   */
  ensureDirectories() {
    Object.values(this.queues).forEach(dir => {
      if (!fs.existsSync(dir)) {
        fs.mkdirSync(dir, { recursive: true });
        console.log(`Created directory: ${dir}`);
      }
    });
  }

  /**
   * Generates a message ID of the form `msg-<epoch ms>-<12 hex chars>`.
   *
   * The suffix comes from `crypto.randomBytes` so that messages created in
   * the same millisecond do not collide and overwrite each other's files.
   *
   * @returns {string} The new message ID.
   */
  generateMessageId() {
    const timestamp = Date.now();
    const suffix = crypto.randomBytes(6).toString('hex');
    return `msg-${timestamp}-${suffix}`;
  }

  /**
   * Splits a "[PREFIX] content" message and resolves the target agent.
   *
   * @param {string} message Raw message text.
   * @returns {{prefix: string, content: string, targetAgent: string, originalMessage: string}}
   * @throws {Error} When `message` is not a string or does not start with a
   *   bracketed word prefix.
   */
  parseMessage(message) {
    // `[\s\S]*` (rather than `.*`) keeps every line of a multi-line message.
    const prefixMatch = typeof message === 'string'
      ? message.match(/^\[(\w+)\]\s*([\s\S]*)/)
      : null;

    if (!prefixMatch) {
      throw new Error(`Invalid message format. Expected: [PREFIX] message. Got: ${message}`);
    }

    const prefix = prefixMatch[1];
    const content = prefixMatch[2];
    const routeKey = `[${prefix}]`;

    // Look up the target agent; unknown prefixes fall back to the default route.
    let targetAgent = this.routes[routeKey];

    if (!targetAgent) {
      console.warn(`Unknown prefix: ${prefix}, using default route: ${this.defaultRoute}`);
      targetAgent = this.defaultRoute;
    }

    return {
      prefix,
      content,
      targetAgent,
      originalMessage: message
    };
  }

  /**
   * Writes a new message record as `<id>.json` into the given queue.
   *
   * @param {object} messageData Fields merged into the stored record
   *   (typically the result of `parseMessage`).
   * @param {string} queueType Key of `this.queues` to write into.
   * @returns {Promise<object>} The record that was written.
   * @throws {Error} When `queueType` is not a known queue.
   */
  async saveMessage(messageData, queueType) {
    if (!Object.prototype.hasOwnProperty.call(this.queues, queueType)) {
      throw new Error(`Unknown queue type: ${queueType}`);
    }

    const messageId = this.generateMessageId();
    const messageFile = {
      id: messageId,
      timestamp: new Date().toISOString(),
      ...messageData,
      status: 'pending',
      queue: queueType,
      retryCount: 0
    };

    const filePath = path.join(this.queues[queueType], `${messageId}.json`);

    await fs.promises.writeFile(
      filePath,
      JSON.stringify(messageFile, null, 2),
      'utf8'
    );

    console.log(`Message saved: ${filePath}`);
    return messageFile;
  }

  /**
   * Delivers a message to an agent by running
   * `openclaw agent --agent <agent> --message "<message>"` in `basePath`.
   *
   * Never rejects: failures are reported through the returned object.
   *
   * @param {string} agent Agent name; must consist of letters, digits, `_` or `-`.
   * @param {string} message Message text to deliver.
   * @returns {Promise<{success: boolean, agent: string, messageId?: null, output?: string, error?: string, stderr?: string}>}
   */
  async sendToAgent(agent, message) {
    try {
      // `agent` is interpolated unquoted into a shell command, so refuse
      // anything that is not a plain identifier.
      if (typeof agent !== 'string' || !/^[\w-]+$/.test(agent)) {
        throw new Error(`Invalid agent name: ${agent}`);
      }

      // NOTE(review): only double quotes are escaped here. Other shell
      // metacharacters in `message` still reach the shell started by `exec`,
      // so untrusted message text can alter the command. Passing the
      // arguments as an array (execFile/spawn) would avoid this, but changes
      // how the `openclaw` executable is resolved; confirm before switching.
      const escapedMessage = message.replace(/"/g, '\\"');
      const command = `openclaw agent --agent ${agent} --message "${escapedMessage}"`;

      console.log(`Sending to agent ${agent}: ${message}`);

      const { stdout, stderr } = await execAsync(command, {
        cwd: this.basePath,
        timeout: 30000 // 30 second timeout
      });

      // stderr output mentioning 'feishu_doc' is not logged; presumably it is
      // known benign noise (TODO confirm).
      if (stderr && !stderr.includes('feishu_doc')) {
        console.warn(`Agent ${agent} stderr: ${stderr}`);
      }

      return {
        success: true,
        agent,
        messageId: null,
        output: stdout
      };
    } catch (error) {
      console.error(`Failed to send to agent ${agent}:`, error.message);
      return {
        success: false,
        agent,
        error: error.message,
        stderr: error.stderr
      };
    }
  }

  /**
   * Processes one message that currently sits in the `incoming` queue:
   * moves it to `processing`, sends it to its agent, then moves it to
   * `completed` or `errors` with the outcome recorded in the file.
   *
   * The passed `messageFile` object is updated in place. On an unexpected
   * failure the record is written to `errors` and the copy left behind in
   * the queue it had reached is removed, so the message is not picked up again.
   *
   * @param {object} messageFile Message record as returned by `saveMessage`.
   * @returns {Promise<object>} The `sendToAgent` result, or
   *   `{success: false, error}` when processing itself failed.
   */
  async processMessage(messageFile) {
    const { targetAgent, originalMessage } = messageFile;
    const fileName = `${messageFile.id}.json`;

    // Tracks where the message file lives right now, for cleanup on failure.
    let currentPath = path.join(this.queues.incoming, fileName);

    try {
      // Move the message into the processing queue.
      const processingPath = path.join(this.queues.processing, fileName);

      messageFile.status = 'processing';
      messageFile.processingStart = new Date().toISOString();

      await fs.promises.rename(currentPath, processingPath);
      currentPath = processingPath;
      await fs.promises.writeFile(processingPath, JSON.stringify(messageFile, null, 2), 'utf8');

      // Hand the message to the agent.
      const result = await this.sendToAgent(targetAgent, originalMessage);

      // Record the outcome.
      messageFile.status = result.success ? 'completed' : 'failed';
      messageFile.processingEnd = new Date().toISOString();
      messageFile.result = result;

      // Move to the completed or errors queue.
      const finalQueue = result.success ? 'completed' : 'errors';
      const finalPath = path.join(this.queues[finalQueue], fileName);

      await fs.promises.rename(processingPath, finalPath);
      currentPath = finalPath;
      await fs.promises.writeFile(finalPath, JSON.stringify(messageFile, null, 2), 'utf8');

      console.log(`Message ${messageFile.id} processed: ${messageFile.status}`);
      return result;
    } catch (error) {
      console.error(`Error processing message ${messageFile.id}:`, error);

      messageFile.status = 'error';
      messageFile.error = error.message;

      const errorPath = path.join(this.queues.errors, fileName);

      try {
        await fs.promises.writeFile(errorPath, JSON.stringify(messageFile, null, 2), 'utf8');

        // Drop the stale copy from whichever queue the message had reached.
        if (currentPath !== errorPath) {
          await fs.promises.unlink(currentPath).catch(unlinkError => {
            if (unlinkError.code !== 'ENOENT') {
              throw unlinkError;
            }
          });
        }
      } catch (persistError) {
        console.error(`Failed to persist error state for message ${messageFile.id}:`, persistError);
      }

      return {
        success: false,
        error: error.message
      };
    }
  }

  /**
   * Moves a file that could not be read as a message into the `errors`
   * queue so it is not retried on every run. Failures are logged only.
   *
   * @param {string} filePath Current location of the file.
   * @param {string} fileName File name to use inside the `errors` queue.
   * @returns {Promise<void>}
   */
  async quarantineFile(filePath, fileName) {
    try {
      await fs.promises.rename(filePath, path.join(this.queues.errors, fileName));
    } catch (moveError) {
      console.error(`Failed to move ${fileName} to the errors queue:`, moveError);
    }
  }

  /**
   * Processes every `.json` file in the `incoming` queue, one after another.
   *
   * A file that cannot be read or parsed, or that has no string `id`, is
   * moved to the `errors` queue and reported as a failed result; it does not
   * stop the remaining files from being processed.
   *
   * @returns {Promise<object[]>} One result per file; an empty array when
   *   the `incoming` directory itself cannot be listed.
   */
  async processAllMessages() {
    let jsonFiles;

    try {
      const files = await fs.promises.readdir(this.queues.incoming);
      jsonFiles = files.filter(file => file.endsWith('.json'));
    } catch (error) {
      console.error('Error processing messages:', error);
      return [];
    }

    console.log(`Found ${jsonFiles.length} messages to process`);

    const results = [];

    for (const file of jsonFiles) {
      const filePath = path.join(this.queues.incoming, file);

      try {
        const content = await fs.promises.readFile(filePath, 'utf8');
        const messageFile = JSON.parse(content);

        // processMessage locates the file by `id`, so it must be present.
        if (!messageFile || typeof messageFile.id !== 'string') {
          throw new Error(`Message file ${file} has no id`);
        }

        console.log(`Processing message: ${messageFile.id}`);
        results.push(await this.processMessage(messageFile));
      } catch (error) {
        console.error(`Error processing message file ${file}:`, error);
        await this.quarantineFile(filePath, file);
        results.push({ success: false, error: error.message, file });
      }
    }

    return results;
  }

  /**
   * Main entry point: parses a raw message, stores it in `incoming` and
   * processes it immediately.
   *
   * Never rejects. `success` reports whether the message was accepted;
   * the delivery outcome is in `processingResult`.
   *
   * @param {string} message Raw "[PREFIX] text" message.
   * @returns {Promise<{success: boolean, messageId?: string, targetAgent?: string, processingResult?: object, error?: string}>}
   */
  async receiveMessage(message) {
    try {
      console.log(`Receiving message: ${message}`);

      const parsed = this.parseMessage(message);
      const messageFile = await this.saveMessage(parsed, 'incoming');
      const result = await this.processMessage(messageFile);

      return {
        success: true,
        messageId: messageFile.id,
        targetAgent: parsed.targetAgent,
        processingResult: result
      };
    } catch (error) {
      console.error('Error receiving message:', error);
      return {
        success: false,
        error: error.message
      };
    }
  }

  /**
   * Reports how many `.json` files each queue holds, plus route metadata.
   *
   * @returns {Promise<{queues: Object<string, number>, routes: number, defaultRoute: string, timestamp: string}>}
   */
  async getStatus() {
    const status = {
      queues: {},
      routes: Object.keys(this.routes).length,
      defaultRoute: this.defaultRoute,
      timestamp: new Date().toISOString()
    };

    for (const [queueName, queuePath] of Object.entries(this.queues)) {
      try {
        const files = await fs.promises.readdir(queuePath);
        const jsonFiles = files.filter(file => file.endsWith('.json'));
        status.queues[queueName] = jsonFiles.length;
      } catch (error) {
        // Best effort: a queue directory that cannot be listed counts as empty.
        status.queues[queueName] = 0;
      }
    }

    return status;
  }

  /**
   * Deletes `.json` files from the `completed` and `errors` queues whose
   * modification time is older than `maxAgeHours`.
   *
   * @param {number} [maxAgeHours=24] Maximum file age in hours.
   * @returns {Promise<void>}
   */
  async cleanupOldMessages(maxAgeHours = 24) {
    const maxAgeMs = maxAgeHours * 60 * 60 * 1000;
    const now = Date.now();

    for (const queuePath of [this.queues.completed, this.queues.errors]) {
      try {
        const files = await fs.promises.readdir(queuePath);

        for (const file of files) {
          if (file.endsWith('.json')) {
            const filePath = path.join(queuePath, file);
            const stats = await fs.promises.stat(filePath);
            const fileAge = now - stats.mtimeMs;

            if (fileAge > maxAgeMs) {
              await fs.promises.unlink(filePath);
              console.log(`Cleaned up old message: ${file}`);
            }
          }
        }
      } catch (error) {
        console.error(`Error cleaning up ${queuePath}:`, error);
      }
    }
  }
}

// Command-line interface: runs only when this file is executed directly.
if (require.main === module) {
  const router = new MessageRouter();

  const command = process.argv[2];
  // Everything after the command is joined back into one message string.
  const message = process.argv.slice(3).join(' ');

  /**
   * Dispatches the requested sub-command. Unknown commands print the help
   * text. The process exit code is set to 1 when `receive` reports failure.
   */
  const main = async () => {
    switch (command) {
      case 'receive': {
        if (!message) {
          console.error('Usage: node message-router.js receive "[PREFIX] message"');
          process.exit(1);
        }
        const result = await router.receiveMessage(message);
        console.log(JSON.stringify(result, null, 2));
        if (!result.success) {
          // Let shell callers detect that the message was not accepted.
          process.exitCode = 1;
        }
        break;
      }

      case 'process': {
        const results = await router.processAllMessages();
        console.log(`Processed ${results.length} messages`);
        break;
      }

      case 'status': {
        const status = await router.getStatus();
        console.log(JSON.stringify(status, null, 2));
        break;
      }

      case 'cleanup': {
        await router.cleanupOldMessages();
        console.log('Cleanup completed');
        break;
      }

      case 'help':
      default: {
        console.log(`
Message Router System - Usage:
node message-router.js receive "[PREFIX] message" # 接收并处理消息
node message-router.js process # 处理所有待处理消息
node message-router.js status # 查看系统状态
node message-router.js cleanup # 清理旧消息
node message-router.js help # 显示帮助

Supported prefixes:
${Object.keys(router.routes).join(', ')}

Example:
node message-router.js receive "[PM] 创建新任务"
`);
        break;
      }
    }
  };

  main().catch(error => {
    console.error('Fatal error:', error);
    process.exit(1);
  });
}

module.exports = MessageRouter;