Files
aiagent/WebSocket实时推送说明.md
2026-01-19 00:09:36 +08:00

6.0 KiB
Raw Permalink Blame History

WebSocket实时推送说明

已完成

已实现WebSocket实时推送功能可以实时推送工作流执行状态。

功能特性

1. WebSocket连接管理器 (backend/app/websocket/manager.py)

  • 管理多个WebSocket连接
  • 支持按执行ID分组连接
  • 支持广播消息到特定执行的所有连接
  • 自动处理连接断开

2. WebSocket API (backend/app/api/websocket.py)

  • WebSocket端点/api/v1/ws/executions/{execution_id}
  • 实时推送执行状态更新
  • 支持心跳检测ping/pong
  • 自动断开已完成或失败的执行

WebSocket消息格式

客户端 → 服务器

心跳消息

{
  "type": "ping"
}

服务器 → 客户端

状态更新消息

{
  "type": "status",
  "execution_id": "execution-uuid",
  "status": "running",
  "progress": 50,
  "message": "执行中...",
  "result": null,
  "error": null,
  "execution_time": null
}

心跳响应

{
  "type": "pong"
}

错误消息

{
  "type": "error",
  "message": "错误描述"
}

状态值说明

  • pending: 等待执行
  • running: 执行中
  • completed: 执行完成
  • failed: 执行失败

使用方法

1. 建立WebSocket连接

const executionId = 'your-execution-id';
const ws = new WebSocket(`ws://localhost:8037/api/v1/ws/executions/${executionId}`);

ws.onopen = () => {
  console.log('WebSocket连接已建立');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log('收到消息:', message);
  
  switch (message.type) {
    case 'status':
      // 更新执行状态
      updateExecutionStatus(message);
      break;
    case 'error':
      // 显示错误
      showError(message.message);
      break;
    case 'pong':
      // 心跳响应
      break;
  }
};

ws.onerror = (error) => {
  console.error('WebSocket错误:', error);
};

ws.onclose = () => {
  console.log('WebSocket连接已关闭');
};

2. 发送心跳消息

// 定期发送心跳每30秒
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping' }));
  }
}, 30000);

3. 处理状态更新

function updateExecutionStatus(message) {
  const { status, progress, result, error, execution_time } = message;
  
  // 更新UI
  document.getElementById('status').textContent = status;
  document.getElementById('progress').style.width = `${progress}%`;
  
  if (status === 'completed') {
    // 显示结果
    displayResult(result);
  } else if (status === 'failed') {
    // 显示错误
    displayError(error);
  }
}

前端集成示例Vue 3

<template>
  <div>
    <div>状态: {{ executionStatus }}</div>
    <div>进度: {{ progress }}%</div>
    <div v-if="result">结果: {{ result }}</div>
    <div v-if="error" class="error">错误: {{ error }}</div>
  </div>
</template>

<script setup lang="ts">
import { ref, onMounted, onUnmounted } from 'vue'

const props = defineProps<{
  executionId: string
}>()

const executionStatus = ref('pending')
const progress = ref(0)
const result = ref(null)
const error = ref(null)
let ws: WebSocket | null = null

onMounted(() => {
  // 获取WebSocket URL
  const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
  const hostname = window.location.hostname
  const wsUrl = `${protocol}//${hostname}:8037/api/v1/ws/executions/${props.executionId}`
  
  ws = new WebSocket(wsUrl)
  
  ws.onopen = () => {
    console.log('WebSocket连接已建立')
  }
  
  ws.onmessage = (event) => {
    const message = JSON.parse(event.data)
    
    if (message.type === 'status') {
      executionStatus.value = message.status
      progress.value = message.progress || 0
      result.value = message.result
      error.value = message.error
    }
  }
  
  ws.onerror = (err) => {
    console.error('WebSocket错误:', err)
  }
  
  ws.onclose = () => {
    console.log('WebSocket连接已关闭')
  }
  
  // 心跳
  const heartbeat = setInterval(() => {
    if (ws?.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: 'ping' }))
    }
  }, 30000)
  
  onUnmounted(() => {
    clearInterval(heartbeat)
    ws?.close()
  })
})
</script>

完整示例

1. 执行工作流并监听状态

// 1. 创建执行任务
const response = await fetch('/api/v1/executions', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${token}`
  },
  body: JSON.stringify({
    workflow_id: 'workflow-id',
    input_data: { input: 'test' }
  })
})

const execution = await response.json()
const executionId = execution.id

// 2. 建立WebSocket连接
const ws = new WebSocket(`ws://localhost:8037/api/v1/ws/executions/${executionId}`)

ws.onmessage = (event) => {
  const message = JSON.parse(event.data)
  if (message.type === 'status') {
    console.log('执行状态:', message.status)
    console.log('进度:', message.progress)
    
    if (message.status === 'completed') {
      console.log('执行结果:', message.result)
    } else if (message.status === 'failed') {
      console.error('执行失败:', message.error)
    }
  }
}

注意事项

  1. 连接管理

    • WebSocket连接会在执行完成或失败后自动断开
    • 客户端应该处理连接断开的情况
    • 建议实现重连机制
  2. 心跳检测

    • 客户端应该定期发送ping消息
    • 服务器会响应pong消息
    • 如果长时间没有收到消息,连接可能会被关闭
  3. 错误处理

    • 处理网络错误
    • 处理执行失败的情况
    • 显示友好的错误信息
  4. 性能考虑

    • 状态更新频率每2秒更新一次
    • 多个客户端可以同时监听同一个执行
    • 连接会在执行完成后自动清理

后续计划

  • 前端WebSocket组件封装
  • 执行进度百分比计算
  • 节点级别的状态推送
  • 执行日志实时推送
  • WebSocket认证支持

状态: 后端已完成,前端集成待完成 时间: 2024年