import pytest

from app.services.workflow_engine import WorkflowEngine


def _make_engine_with_node(node):
    """Build a workflow engine whose workflow contains only the given node.

    The workflow has no edges; the engine is created with the fixed
    workflow id ``"wf_test"``.
    """
    wf_data = {"nodes": [node], "edges": []}
    return WorkflowEngine(workflow_id="wf_test", workflow_data=wf_data)


@pytest.mark.asyncio
async def test_subworkflow_mapping():
    """A subworkflow node reports its child id and the mapped input value."""
    node = {
        "id": "sub-1",
        "type": "subworkflow",
        "data": {
            "workflow_id": "child_wf",
            "input_mapping": {"mapped": "source"},
        },
    }
    engine = _make_engine_with_node(node)

    result = await engine.execute_node(node, {"source": 123, "other": 1})

    assert result["status"] == "success"
    assert result["output"]["workflow_id"] == "child_wf"
    assert result["output"]["input"]["mapped"] == 123


@pytest.mark.asyncio
async def test_code_python_success():
    """A python code node exposes the value assigned to ``result``."""
    node = {
        "id": "code-1",
        "type": "code",
        "data": {
            "language": "python",
            "code": "result = input_data['x'] * 2",
        },
    }
    engine = _make_engine_with_node(node)

    result = await engine.execute_node(node, {"x": 3})

    assert result["status"] == "success"
    assert result["output"] == 6


@pytest.mark.asyncio
async def test_code_unsupported_language():
    """An unsupported language yields status "success" with an error output."""
    node = {
        "id": "code-2",
        "type": "code",
        "data": {"language": "go", "code": "result = 1"},
    }
    engine = _make_engine_with_node(node)

    result = await engine.execute_node(node, {})

    # The node itself is reported as successful; the problem is carried in
    # the output's "error" field (message text is a runtime string).
    assert result["status"] == "success"
    assert "不支持的语言" in result["output"]["error"]


@pytest.mark.asyncio
async def test_oauth_mock_token():
    """An oauth node returns a mock Bearer token named after the provider."""
    node = {
        "id": "oauth-1",
        "type": "oauth",
        "data": {"provider": "google", "client_id": "id", "client_secret": "sec"},
    }
    engine = _make_engine_with_node(node)

    result = await engine.execute_node(node, {})

    assert result["status"] == "success"
    token = result["output"]
    assert token["access_token"].startswith("mock_access_token_google")
    assert token["token_type"] == "Bearer"


@pytest.mark.asyncio
async def test_validator_reject_and_continue():
    """A validator node fails on "reject" and warns on "continue"."""
    # reject branch -> failed
    node_reject = {
        "id": "val-1",
        "type": "validator",
        "data": {"schema": {"type": "object"}, "on_error": "reject"},
    }
    engine = _make_engine_with_node(node_reject)
    res_reject = await engine.execute_node(node_reject, "bad_type")
    assert res_reject["status"] == "failed"

    # continue branch -> success, with a warning attached
    node_continue = {
        "id": "val-2",
        "type": "validator",
        "data": {"schema": {"type": "object"}, "on_error": "continue"},
    }
    engine = _make_engine_with_node(node_continue)
    res_continue = await engine.execute_node(node_continue, "bad_type")
    assert res_continue["status"] == "success"
    assert "warning" in res_continue


@pytest.mark.asyncio
async def test_batch_split_group_aggregate():
    """A batch node supports the "split", "group" and "aggregate" modes."""
    data = list(range(5))

    # split
    node_split = {
        "id": "batch-1",
        "type": "batch",
        "data": {"batch_size": 2, "mode": "split"},
    }
    engine = _make_engine_with_node(node_split)
    res_split = await engine.execute_node(node_split, data)
    assert res_split["status"] == "success"
    assert res_split["output"][0] == [0, 1]
    assert res_split["output"][1] == [2, 3]
    assert res_split["output"][2] == [4]

    # group (same logic as split)
    node_group = {
        "id": "batch-2",
        "type": "batch",
        "data": {"batch_size": 3, "mode": "group"},
    }
    engine = _make_engine_with_node(node_group)
    res_group = await engine.execute_node(node_group, data)
    assert res_group["status"] == "success"
    assert res_group["output"][0] == [0, 1, 2]
    assert res_group["output"][1] == [3, 4]

    # aggregate
    node_agg = {
        "id": "batch-3",
        "type": "batch",
        "data": {"mode": "aggregate"},
    }
    engine = _make_engine_with_node(node_agg)
    res_agg = await engine.execute_node(node_agg, data)
    assert res_agg["status"] == "success"
    assert res_agg["output"]["count"] == 5
    assert res_agg["output"]["samples"][:2] == [0, 1]