Files

100 lines
2.8 KiB
Python
Raw Permalink Normal View History

2026-01-19 00:09:36 +08:00
"""
节点测试API
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import Dict, Any, Optional
import logging
from app.core.database import get_db
from app.api.auth import get_current_user
from app.models.user import User
from app.services.workflow_engine import WorkflowEngine
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the node endpoints; routes registered on it live under /api/v1/nodes.
router = APIRouter(prefix="/api/v1/nodes", tags=["nodes"])
class NodeTestRequest(BaseModel):
    """Request body for a single-node test run.

    Bundles the node configuration with the input data the node is
    executed against.
    """

    # Node configuration; must be non-empty or the endpoint answers 400.
    node: Dict[str, Any]

    # Input payload handed to the node when it is executed.
    input_data: Dict[str, Any]
class NodeTestResponse(BaseModel):
    """Response body describing the outcome of a single-node test run."""

    # "success" or "error", as set by the test endpoint.
    status: str

    # Whatever the node produced; None when the run raised an exception.
    output: Any

    # Elapsed execution time in milliseconds.
    execution_time: int

    # Failure description; stays None on a successful run.
    error_message: Optional[str] = None
@router.post("/test", response_model=NodeTestResponse)
async def test_node(
    request: NodeTestRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Test a single node.

    Wraps the submitted node configuration in a temporary workflow that has
    no edges, executes the node with the supplied input data and reports the
    outcome.

    Args:
        request: Node configuration plus the input data to run it with.
        db: Database session, passed on to the workflow engine.
        current_user: Resolved through the ``get_current_user`` dependency;
            not otherwise read by this handler.

    Returns:
        NodeTestResponse: ``status`` is ``"success"`` or ``"error"`` and
        ``execution_time`` is the elapsed time in milliseconds. Failures
        while building or running the node are reported in the response
        body (``status="error"``) instead of being raised.

    Raises:
        HTTPException: 400 when the node configuration is empty.
    """
    import time

    node = request.node
    input_data = request.input_data

    if not node:
        raise HTTPException(status_code=400, detail="节点配置不能为空")

    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or turn negative) when the system clock is adjusted mid-request.
    started = time.perf_counter()

    def elapsed_ms() -> int:
        """Return the milliseconds elapsed since execution started."""
        return int((time.perf_counter() - started) * 1000)

    try:
        # Build a throwaway workflow engine for this one node: only the node
        # itself is needed, not a complete workflow.
        workflow_data = {
            "nodes": [node],
            "edges": []
        }
        engine = WorkflowEngine("test-node", workflow_data, db=db)

        # Run the node.
        result = await engine.execute_node(node, input_data)
        execution_time = elapsed_ms()

        # A result without an explicit status counts as success.
        node_status = result.get("status", "success")
        if node_status in ("failed", "error"):
            # Use `or` so that an "error" key that is present but None/empty
            # still falls through to "error_message" and then the default.
            error_msg = (
                result.get("error")
                or result.get("error_message")
                or "节点执行失败"
            )
            return NodeTestResponse(
                status="error",
                output=result.get("output"),
                execution_time=execution_time,
                # The response field is Optional[str]; coerce non-string
                # error payloads so they do not fail response validation.
                error_message=str(error_msg)
            )

        # Success: prefer the "output" entry, else return the whole result.
        return NodeTestResponse(
            status="success",
            output=result.get("output", result),
            execution_time=execution_time
        )
    except Exception as e:
        # Endpoint boundary: log with traceback and report the failure in
        # the response body rather than propagating it.
        logger.exception("节点测试失败: %s", e)
        return NodeTestResponse(
            status="error",
            output=None,
            execution_time=elapsed_ms(),
            error_message=str(e)
        )