Files
aiagent/backend/app/api/websocket.py
2026-01-19 00:09:36 +08:00

122 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
WebSocket API
"""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.websocket.manager import websocket_manager
from app.core.database import SessionLocal
from app.models.execution import Execution
from typing import Optional
import json
import asyncio
router = APIRouter()
@router.websocket("/api/v1/ws/executions/{execution_id}")
async def websocket_execution_status(
websocket: WebSocket,
execution_id: str,
token: Optional[str] = None
):
"""
WebSocket实时推送执行状态
Args:
websocket: WebSocket连接
execution_id: 执行记录ID
token: JWT Token可选通过query参数传递
"""
# 验证token可选如果需要认证
# user = await get_current_user_optional(token)
# 建立连接
await websocket_manager.connect(websocket, execution_id)
db = SessionLocal()
try:
# 发送初始状态
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if execution:
await websocket_manager.send_personal_message({
"type": "status",
"execution_id": execution_id,
"status": execution.status,
"progress": 0,
"message": "连接已建立"
}, websocket)
else:
await websocket_manager.send_personal_message({
"type": "error",
"message": f"执行记录 {execution_id} 不存在"
}, websocket)
await websocket.close()
return
# 持续监听并推送状态更新
while True:
try:
# 接收客户端消息(心跳等)
data = await websocket.receive_text()
# 处理客户端消息
try:
message = json.loads(data)
if message.get("type") == "ping":
await websocket_manager.send_personal_message({
"type": "pong"
}, websocket)
except:
pass
except WebSocketDisconnect:
break
# 检查执行状态
db.refresh(execution)
# 如果执行完成或失败,发送最终状态并断开
if execution.status in ["completed", "failed"]:
await websocket_manager.send_personal_message({
"type": "status",
"execution_id": execution_id,
"status": execution.status,
"progress": 100,
"result": execution.output_data if execution.status == "completed" else None,
"error": execution.error_message if execution.status == "failed" else None,
"execution_time": execution.execution_time
}, websocket)
# 等待一下再断开,确保客户端收到消息
await asyncio.sleep(1)
break
# 定期发送状态更新每2秒
await asyncio.sleep(2)
# 重新查询执行状态
db.refresh(execution)
await websocket_manager.send_personal_message({
"type": "status",
"execution_id": execution_id,
"status": execution.status,
"progress": 50 if execution.status == "running" else 0,
"message": f"执行中..." if execution.status == "running" else "等待执行"
}, websocket)
except WebSocketDisconnect:
pass
except Exception as e:
print(f"WebSocket错误: {e}")
try:
await websocket_manager.send_personal_message({
"type": "error",
"message": f"发生错误: {str(e)}"
}, websocket)
except:
pass
finally:
websocket_manager.disconnect(websocket, execution_id)
db.close()