""" 简化的执行功能测试 - 使用DeepSeek """ import asyncio import sys import os import json sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.database import SessionLocal from app.models.workflow import Workflow from app.models.execution import Execution from app.services.workflow_engine import WorkflowEngine async def test_simple_execution(): """测试简单执行流程""" print("=" * 60) print("执行功能测试") print("=" * 60) db = SessionLocal() try: # 获取工作流 workflow = db.query(Workflow).first() if not workflow: print("❌ 没有找到工作流") return False print(f"✅ 找到工作流: {workflow.id}") print(f" 名称: {workflow.name}") print(f" 节点数: {len(workflow.nodes)}") print(f" 边数: {len(workflow.edges)}") # 创建一个简单的测试工作流数据(使用DeepSeek) test_workflow_data = { "nodes": [ { "id": "start-1", "type": "start", "data": {"label": "开始"} }, { "id": "llm-1", "type": "llm", "data": { "label": "DeepSeek节点", "prompt": "请用一句话总结:{input}", "provider": "deepseek", "model": "deepseek-chat", "temperature": 0.7 } }, { "id": "end-1", "type": "end", "data": {"label": "结束"} } ], "edges": [ {"id": "e1", "source": "start-1", "target": "llm-1"}, {"id": "e2", "source": "llm-1", "target": "end-1"} ] } # 创建执行记录 execution = Execution( workflow_id=str(workflow.id), input_data={"input": "人工智能是计算机科学的一个分支"}, status="pending" ) db.add(execution) db.commit() db.refresh(execution) print(f"\n✅ 创建执行记录: {execution.id}") print(f" 输入数据: {execution.input_data}") # 执行工作流 print("\n🔄 开始执行工作流(使用DeepSeek)...") engine = WorkflowEngine(str(workflow.id), test_workflow_data) result = await engine.execute(execution.input_data) # 更新执行记录 execution.status = "completed" execution.output_data = result execution.execution_time = 1000 # 模拟执行时间 db.commit() db.refresh(execution) print(f"\n✅ 执行完成") print(f" 状态: {execution.status}") print(f" 执行时间: {execution.execution_time}ms") print(f" 结果: {json.dumps(result, ensure_ascii=False, indent=2)}") # 测试获取执行列表 executions = db.query(Execution).filter( Execution.workflow_id == workflow.id ).order_by(Execution.created_at.desc()).limit(5).all() print(f"\n✅ 执行记录列表: 共 {len(executions)} 条") for i, exec in enumerate(executions[:3], 1): print(f" {i}. ID: {exec.id[:8]}..., 状态: {exec.status}") return True except Exception as e: print(f"\n❌ 测试失败: {str(e)}") import traceback traceback.print_exc() return False finally: db.close() def test_execution_model(): """测试执行模型""" print("\n" + "=" * 60) print("执行模型测试") print("=" * 60) db = SessionLocal() try: execution = db.query(Execution).order_by(Execution.created_at.desc()).first() if not execution: print("❌ 没有找到执行记录") return False print(f"✅ 执行记录模型测试") print(f" ID: {execution.id}") print(f" 工作流ID: {execution.workflow_id}") print(f" 状态: {execution.status}") print(f" 输入数据: {execution.input_data}") print(f" 输出数据: {execution.output_data or '暂无'}") print(f" 执行时间: {execution.execution_time or 'N/A'}ms") print(f" 创建时间: {execution.created_at}") print(f" 创建时间类型: {type(execution.created_at).__name__}") # 验证字段 assert hasattr(execution, 'id'), "缺少id字段" assert hasattr(execution, 'status'), "缺少status字段" assert hasattr(execution, 'created_at'), "缺少created_at字段" assert isinstance(execution.created_at, type(execution.created_at)), "created_at类型错误" print("\n✅ 所有字段验证通过") return True except Exception as e: print(f"❌ 测试失败: {str(e)}") return False finally: db.close() async def main(): """主测试函数""" print("\n🚀 开始执行功能测试\n") results = [] results.append(await test_simple_execution()) results.append(test_execution_model()) print("\n" + "=" * 60) print("测试结果汇总") print("=" * 60) passed = sum(results) total = len(results) print(f"通过: {passed}/{total}") print(f"失败: {total - passed}/{total}") if passed == total: print("\n✅ 所有测试通过!执行功能正常!") else: print(f"\n⚠️ 有 {total - passed} 个测试失败") if __name__ == "__main__": asyncio.run(main())