""" 工作流执行引擎测试 """ import pytest from app.services.workflow_engine import WorkflowEngine @pytest.mark.unit @pytest.mark.workflow class TestWorkflowEngine: """工作流引擎测试""" def test_build_execution_graph(self): """测试构建执行图""" workflow_data = { "nodes": [ {"id": "start-1", "type": "start"}, {"id": "llm-1", "type": "llm"}, {"id": "end-1", "type": "end"} ], "edges": [ {"id": "e1", "source": "start-1", "target": "llm-1"}, {"id": "e2", "source": "llm-1", "target": "end-1"} ] } engine = WorkflowEngine("test-workflow", workflow_data) execution_order = engine.build_execution_graph() assert "start-1" in execution_order assert "llm-1" in execution_order assert "end-1" in execution_order assert execution_order.index("start-1") < execution_order.index("llm-1") assert execution_order.index("llm-1") < execution_order.index("end-1") def test_get_node_input(self): """测试获取节点输入""" workflow_data = { "nodes": [ {"id": "start-1", "type": "start"}, {"id": "llm-1", "type": "llm"} ], "edges": [ {"id": "e1", "source": "start-1", "target": "llm-1"} ] } engine = WorkflowEngine("test-workflow", workflow_data) engine.node_outputs = { "start-1": {"input": "test data"} } input_data = engine.get_node_input("llm-1", engine.node_outputs) assert "input" in input_data assert input_data["input"] == "test data" @pytest.mark.asyncio async def test_execute_start_node(self): """测试执行开始节点""" workflow_data = { "nodes": [ {"id": "start-1", "type": "start", "data": {"label": "开始"}} ], "edges": [] } engine = WorkflowEngine("test-workflow", workflow_data) node = workflow_data["nodes"][0] input_data = {"test": "data"} result = await engine.execute_node(node, input_data) assert result["status"] == "success" assert result["output"] == input_data @pytest.mark.asyncio async def test_execute_end_node(self): """测试执行结束节点""" workflow_data = { "nodes": [ {"id": "end-1", "type": "end", "data": {"label": "结束"}} ], "edges": [] } engine = WorkflowEngine("test-workflow", workflow_data) node = workflow_data["nodes"][0] input_data = {"result": "final output"} result = await engine.execute_node(node, input_data) assert result["status"] == "success" assert result["output"] == input_data @pytest.mark.asyncio async def test_execute_condition_node(self): """测试执行条件节点""" workflow_data = { "nodes": [ { "id": "condition-1", "type": "condition", "data": { "label": "条件判断", "expression": "{value} > 10" } } ], "edges": [] } engine = WorkflowEngine("test-workflow", workflow_data) node = workflow_data["nodes"][0] # 测试条件为真 input_data = {"value": 15} result = await engine.execute_node(node, input_data) assert result["status"] == "success" assert result["branch"] == "true" # 测试条件为假 input_data = {"value": 5} result = await engine.execute_node(node, input_data) assert result["status"] == "success" assert result["branch"] == "false" @pytest.mark.asyncio async def test_execute_transform_node(self): """测试执行转换节点""" workflow_data = { "nodes": [ { "id": "transform-1", "type": "transform", "data": { "label": "数据转换", "mode": "mapping", "mapping": { "new_field": "{old_field}" } } } ], "edges": [] } engine = WorkflowEngine("test-workflow", workflow_data) node = workflow_data["nodes"][0] input_data = {"old_field": "test value"} result = await engine.execute_node(node, input_data) assert result["status"] == "success" assert "new_field" in result["output"] assert result["output"]["new_field"] == "test value"