287 lines
6.0 KiB
Markdown
287 lines
6.0 KiB
Markdown
|
|
# WebSocket实时推送说明
|
|||
|
|
|
|||
|
|
## ✅ 已完成
|
|||
|
|
|
|||
|
|
已实现WebSocket实时推送功能,可以实时推送工作流执行状态。
|
|||
|
|
|
|||
|
|
## 功能特性
|
|||
|
|
|
|||
|
|
### 1. WebSocket连接管理器 (`backend/app/websocket/manager.py`)
|
|||
|
|
|
|||
|
|
- 管理多个WebSocket连接
|
|||
|
|
- 支持按执行ID分组连接
|
|||
|
|
- 支持广播消息到特定执行的所有连接
|
|||
|
|
- 自动处理连接断开
|
|||
|
|
|
|||
|
|
### 2. WebSocket API (`backend/app/api/websocket.py`)
|
|||
|
|
|
|||
|
|
- WebSocket端点:`/api/v1/ws/executions/{execution_id}`
|
|||
|
|
- 实时推送执行状态更新
|
|||
|
|
- 支持心跳检测(ping/pong)
|
|||
|
|
- 自动断开已完成或失败的执行
|
|||
|
|
|
|||
|
|
## WebSocket消息格式
|
|||
|
|
|
|||
|
|
### 客户端 → 服务器
|
|||
|
|
|
|||
|
|
#### 心跳消息
|
|||
|
|
```json
|
|||
|
|
{
|
|||
|
|
"type": "ping"
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 服务器 → 客户端
|
|||
|
|
|
|||
|
|
#### 状态更新消息
|
|||
|
|
```json
|
|||
|
|
{
|
|||
|
|
"type": "status",
|
|||
|
|
"execution_id": "execution-uuid",
|
|||
|
|
"status": "running",
|
|||
|
|
"progress": 50,
|
|||
|
|
"message": "执行中...",
|
|||
|
|
"result": null,
|
|||
|
|
"error": null,
|
|||
|
|
"execution_time": null
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 心跳响应
|
|||
|
|
```json
|
|||
|
|
{
|
|||
|
|
"type": "pong"
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 错误消息
|
|||
|
|
```json
|
|||
|
|
{
|
|||
|
|
"type": "error",
|
|||
|
|
"message": "错误描述"
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 状态值说明
|
|||
|
|
|
|||
|
|
- `pending`: 等待执行
|
|||
|
|
- `running`: 执行中
|
|||
|
|
- `completed`: 执行完成
|
|||
|
|
- `failed`: 执行失败
|
|||
|
|
|
|||
|
|
## 使用方法
|
|||
|
|
|
|||
|
|
### 1. 建立WebSocket连接
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
const executionId = 'your-execution-id';
|
|||
|
|
const ws = new WebSocket(`ws://localhost:8037/api/v1/ws/executions/${executionId}`);
|
|||
|
|
|
|||
|
|
ws.onopen = () => {
|
|||
|
|
console.log('WebSocket连接已建立');
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
ws.onmessage = (event) => {
|
|||
|
|
const message = JSON.parse(event.data);
|
|||
|
|
console.log('收到消息:', message);
|
|||
|
|
|
|||
|
|
switch (message.type) {
|
|||
|
|
case 'status':
|
|||
|
|
// 更新执行状态
|
|||
|
|
updateExecutionStatus(message);
|
|||
|
|
break;
|
|||
|
|
case 'error':
|
|||
|
|
// 显示错误
|
|||
|
|
showError(message.message);
|
|||
|
|
break;
|
|||
|
|
case 'pong':
|
|||
|
|
// 心跳响应
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
ws.onerror = (error) => {
|
|||
|
|
console.error('WebSocket错误:', error);
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
ws.onclose = () => {
|
|||
|
|
console.log('WebSocket连接已关闭');
|
|||
|
|
};
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 2. 发送心跳消息
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
// 定期发送心跳(每30秒)
|
|||
|
|
setInterval(() => {
|
|||
|
|
if (ws.readyState === WebSocket.OPEN) {
|
|||
|
|
ws.send(JSON.stringify({ type: 'ping' }));
|
|||
|
|
}
|
|||
|
|
}, 30000);
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3. 处理状态更新
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
function updateExecutionStatus(message) {
|
|||
|
|
const { status, progress, result, error, execution_time } = message;
|
|||
|
|
|
|||
|
|
// 更新UI
|
|||
|
|
document.getElementById('status').textContent = status;
|
|||
|
|
document.getElementById('progress').style.width = `${progress}%`;
|
|||
|
|
|
|||
|
|
if (status === 'completed') {
|
|||
|
|
// 显示结果
|
|||
|
|
displayResult(result);
|
|||
|
|
} else if (status === 'failed') {
|
|||
|
|
// 显示错误
|
|||
|
|
displayError(error);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 前端集成示例(Vue 3)
|
|||
|
|
|
|||
|
|
```vue
|
|||
|
|
<template>
|
|||
|
|
<div>
|
|||
|
|
<div>状态: {{ executionStatus }}</div>
|
|||
|
|
<div>进度: {{ progress }}%</div>
|
|||
|
|
<div v-if="result">结果: {{ result }}</div>
|
|||
|
|
<div v-if="error" class="error">错误: {{ error }}</div>
|
|||
|
|
</div>
|
|||
|
|
</template>
|
|||
|
|
|
|||
|
|
<script setup lang="ts">
|
|||
|
|
import { ref, onMounted, onUnmounted } from 'vue'
|
|||
|
|
|
|||
|
|
const props = defineProps<{
|
|||
|
|
executionId: string
|
|||
|
|
}>()
|
|||
|
|
|
|||
|
|
const executionStatus = ref('pending')
|
|||
|
|
const progress = ref(0)
|
|||
|
|
const result = ref(null)
|
|||
|
|
const error = ref(null)
|
|||
|
|
let ws: WebSocket | null = null
|
|||
|
|
|
|||
|
|
onMounted(() => {
|
|||
|
|
// 获取WebSocket URL
|
|||
|
|
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
|
|||
|
|
const hostname = window.location.hostname
|
|||
|
|
const wsUrl = `${protocol}//${hostname}:8037/api/v1/ws/executions/${props.executionId}`
|
|||
|
|
|
|||
|
|
ws = new WebSocket(wsUrl)
|
|||
|
|
|
|||
|
|
ws.onopen = () => {
|
|||
|
|
console.log('WebSocket连接已建立')
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ws.onmessage = (event) => {
|
|||
|
|
const message = JSON.parse(event.data)
|
|||
|
|
|
|||
|
|
if (message.type === 'status') {
|
|||
|
|
executionStatus.value = message.status
|
|||
|
|
progress.value = message.progress || 0
|
|||
|
|
result.value = message.result
|
|||
|
|
error.value = message.error
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ws.onerror = (err) => {
|
|||
|
|
console.error('WebSocket错误:', err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ws.onclose = () => {
|
|||
|
|
console.log('WebSocket连接已关闭')
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 心跳
|
|||
|
|
const heartbeat = setInterval(() => {
|
|||
|
|
if (ws?.readyState === WebSocket.OPEN) {
|
|||
|
|
ws.send(JSON.stringify({ type: 'ping' }))
|
|||
|
|
}
|
|||
|
|
}, 30000)
|
|||
|
|
|
|||
|
|
onUnmounted(() => {
|
|||
|
|
clearInterval(heartbeat)
|
|||
|
|
ws?.close()
|
|||
|
|
})
|
|||
|
|
})
|
|||
|
|
</script>
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 完整示例
|
|||
|
|
|
|||
|
|
### 1. 执行工作流并监听状态
|
|||
|
|
|
|||
|
|
```javascript
|
|||
|
|
// 1. 创建执行任务
|
|||
|
|
const response = await fetch('/api/v1/executions', {
|
|||
|
|
method: 'POST',
|
|||
|
|
headers: {
|
|||
|
|
'Content-Type': 'application/json',
|
|||
|
|
'Authorization': `Bearer ${token}`
|
|||
|
|
},
|
|||
|
|
body: JSON.stringify({
|
|||
|
|
workflow_id: 'workflow-id',
|
|||
|
|
input_data: { input: 'test' }
|
|||
|
|
})
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
const execution = await response.json()
|
|||
|
|
const executionId = execution.id
|
|||
|
|
|
|||
|
|
// 2. 建立WebSocket连接
|
|||
|
|
const ws = new WebSocket(`ws://localhost:8037/api/v1/ws/executions/${executionId}`)
|
|||
|
|
|
|||
|
|
ws.onmessage = (event) => {
|
|||
|
|
const message = JSON.parse(event.data)
|
|||
|
|
if (message.type === 'status') {
|
|||
|
|
console.log('执行状态:', message.status)
|
|||
|
|
console.log('进度:', message.progress)
|
|||
|
|
|
|||
|
|
if (message.status === 'completed') {
|
|||
|
|
console.log('执行结果:', message.result)
|
|||
|
|
} else if (message.status === 'failed') {
|
|||
|
|
console.error('执行失败:', message.error)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 注意事项
|
|||
|
|
|
|||
|
|
1. **连接管理**:
|
|||
|
|
- WebSocket连接会在执行完成或失败后自动断开
|
|||
|
|
- 客户端应该处理连接断开的情况
|
|||
|
|
- 建议实现重连机制
|
|||
|
|
|
|||
|
|
2. **心跳检测**:
|
|||
|
|
- 客户端应该定期发送ping消息
|
|||
|
|
- 服务器会响应pong消息
|
|||
|
|
- 如果长时间没有收到消息,连接可能会被关闭
|
|||
|
|
|
|||
|
|
3. **错误处理**:
|
|||
|
|
- 处理网络错误
|
|||
|
|
- 处理执行失败的情况
|
|||
|
|
- 显示友好的错误信息
|
|||
|
|
|
|||
|
|
4. **性能考虑**:
|
|||
|
|
- 状态更新频率:每2秒更新一次
|
|||
|
|
- 多个客户端可以同时监听同一个执行
|
|||
|
|
- 连接会在执行完成后自动清理
|
|||
|
|
|
|||
|
|
## 后续计划
|
|||
|
|
|
|||
|
|
- [ ] 前端WebSocket组件封装
|
|||
|
|
- [ ] 执行进度百分比计算
|
|||
|
|
- [ ] 节点级别的状态推送
|
|||
|
|
- [ ] 执行日志实时推送
|
|||
|
|
- [ ] WebSocket认证支持
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
**状态**: ✅ 后端已完成,前端集成待完成
|
|||
|
|
**时间**: 2024年
|