""" 执行结果展示和WebSocket功能测试脚本 """ import asyncio import sys import os import json import time # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.database import SessionLocal from app.models.workflow import Workflow from app.models.execution import Execution from app.models.user import User from app.services.workflow_engine import WorkflowEngine import uuid def test_execution_api(): """测试执行API""" print("=" * 60) print("测试1: 执行API功能") print("=" * 60) db = SessionLocal() try: # 获取或创建测试用户 user = db.query(User).first() if not user: print("❌ 没有找到用户,请先创建用户") return False # 获取或创建测试工作流 workflow = db.query(Workflow).filter(Workflow.user_id == user.id).first() if not workflow: print("❌ 没有找到工作流,请先创建工作流") return False print(f"✅ 使用工作流: {workflow.id} ({workflow.name})") # 创建执行记录 execution = Execution( workflow_id=str(workflow.id), input_data={"input": "测试输入数据"}, status="pending" ) db.add(execution) db.commit() db.refresh(execution) print(f"✅ 创建执行记录: {execution.id}") print(f" 状态: {execution.status}") print(f" 工作流ID: {execution.workflow_id}") print(f" 输入数据: {execution.input_data}") # 测试获取执行列表 executions = db.query(Execution).filter( Execution.workflow_id == workflow.id ).limit(5).all() print(f"\n✅ 获取执行列表: 共 {len(executions)} 条记录") for i, exec in enumerate(executions[:3], 1): print(f" {i}. ID: {exec.id[:8]}..., 状态: {exec.status}, 创建时间: {exec.created_at}") # 测试获取执行详情 execution_detail = db.query(Execution).filter( Execution.id == execution.id ).first() if execution_detail: print(f"\n✅ 获取执行详情成功") print(f" ID: {execution_detail.id}") print(f" 状态: {execution_detail.status}") print(f" 输入数据: {execution_detail.input_data}") print(f" 输出数据: {execution_detail.output_data or '暂无'}") return True except Exception as e: print(f"❌ 测试失败: {str(e)}") import traceback traceback.print_exc() return False finally: db.close() async def test_workflow_execution(): """测试工作流执行""" print("\n" + "=" * 60) print("测试2: 工作流执行") print("=" * 60) db = SessionLocal() try: # 获取测试工作流 workflow = db.query(Workflow).first() if not workflow: print("❌ 没有找到工作流") return False print(f"✅ 使用工作流: {workflow.id}") print(f" 节点数: {len(workflow.nodes)}") print(f" 边数: {len(workflow.edges)}") # 创建执行记录 execution = Execution( workflow_id=str(workflow.id), input_data={"input": "测试执行"}, status="pending" ) db.add(execution) db.commit() db.refresh(execution) print(f"✅ 创建执行记录: {execution.id}") # 执行工作流 workflow_data = { 'nodes': workflow.nodes, 'edges': workflow.edges } print("\n🔄 开始执行工作流...") start_time = time.time() engine = WorkflowEngine(str(workflow.id), workflow_data) result = await engine.execute(execution.input_data) execution_time = int((time.time() - start_time) * 1000) # 更新执行记录 execution.status = "completed" execution.output_data = result execution.execution_time = execution_time db.commit() db.refresh(execution) print(f"✅ 工作流执行完成") print(f" 执行时间: {execution_time}ms") print(f" 状态: {execution.status}") print(f" 结果: {json.dumps(result, ensure_ascii=False, indent=2)[:200]}...") return True except Exception as e: print(f"❌ 执行失败: {str(e)}") import traceback traceback.print_exc() return False finally: db.close() def test_execution_status_api(): """测试执行状态API""" print("\n" + "=" * 60) print("测试3: 执行状态API") print("=" * 60) db = SessionLocal() try: # 获取执行记录 execution = db.query(Execution).order_by(Execution.created_at.desc()).first() if not execution: print("❌ 没有找到执行记录") return False print(f"✅ 测试执行记录: {execution.id}") print(f" 当前状态: {execution.status}") print(f" 执行时间: {execution.execution_time or 'N/A'}ms") print(f" 输出数据: {'有' if execution.output_data else '无'}") print(f" 错误信息: {execution.error_message or '无'}") # 测试不同状态 statuses = ['pending', 'running', 'completed', 'failed'] print(f"\n✅ 支持的状态: {', '.join(statuses)}") return True except Exception as e: print(f"❌ 测试失败: {str(e)}") return False finally: db.close() def test_execution_response_format(): """测试执行响应格式""" print("\n" + "=" * 60) print("测试4: 执行响应格式") print("=" * 60) db = SessionLocal() try: execution = db.query(Execution).order_by(Execution.created_at.desc()).first() if not execution: print("❌ 没有找到执行记录") return False # 检查响应字段 required_fields = [ 'id', 'workflow_id', 'status', 'input_data', 'output_data', 'created_at' ] print("✅ 检查响应字段:") for field in required_fields: has_field = hasattr(execution, field) value = getattr(execution, field, None) status = "✅" if has_field else "❌" print(f" {status} {field}: {type(value).__name__}") # 检查created_at类型 if execution.created_at: print(f"\n✅ created_at类型: {type(execution.created_at).__name__}") print(f" created_at值: {execution.created_at}") return True except Exception as e: print(f"❌ 测试失败: {str(e)}") import traceback traceback.print_exc() return False finally: db.close() async def main(): """主测试函数""" print("\n" + "🚀 开始执行结果展示功能测试" + "\n") results = [] # 运行测试 results.append(test_execution_api()) results.append(await test_workflow_execution()) results.append(test_execution_status_api()) results.append(test_execution_response_format()) # 汇总结果 print("\n" + "=" * 60) print("测试结果汇总") print("=" * 60) passed = sum(results) total = len(results) print(f"通过: {passed}/{total}") print(f"失败: {total - passed}/{total}") if passed == total: print("\n✅ 所有测试通过!执行结果展示功能正常!") else: print(f"\n⚠️ 有 {total - passed} 个测试失败") if __name__ == "__main__": asyncio.run(main())