#!/usr/bin/env python3
"""
E2E: verify that a subworkflow really executes and that the parent/child
executions are linked.

The script logs in, creates (or updates) a child agent and a parent agent
whose workflow calls the child through a ``subworkflow`` node, starts an
execution of the parent, polls until it reaches a terminal status, and then
requires the execution-chain endpoint to report at least one child execution.

Environment variables:
  PLATFORM_BASE_URL   base URL of the platform (default http://127.0.0.1:8037)
  PLATFORM_USERNAME   login user name (default admin)
  PLATFORM_PASSWORD   login password

Exit codes:
  0  check passed
  2  the parent execution finished with a status other than "completed"
  3  the execution chain contains no child execution

Usage:
  cd backend && .\\venv\\Scripts\\python.exe scripts/e2e_subworkflow_chain.py
"""
from __future__ import annotations

import os
import time
from typing import Any, Dict, Optional

import requests

BASE = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
USER = os.getenv("PLATFORM_USERNAME", "admin")
PWD = os.getenv("PLATFORM_PASSWORD", "123456")

CHILD_NAME = "E2E-Subworkflow-Child"
PARENT_NAME = "E2E-Subworkflow-Parent"

# Execution statuses at which polling stops.
_TERMINAL_STATUSES = ("completed", "failed")


def _login_headers() -> Dict[str, str]:
    """Log in with USER/PWD and return headers for authenticated JSON requests.

    Raises:
        requests.HTTPError: if the login endpoint answers with an error status.
        RuntimeError: if the response body carries no ``access_token``.
    """
    # The login endpoint is called with form-encoded credentials, not JSON.
    r = requests.post(
        f"{BASE}/api/v1/auth/login",
        data={"username": USER, "password": PWD},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=15,
    )
    r.raise_for_status()
    token = r.json().get("access_token")
    if not token:
        raise RuntimeError("登录成功但缺少 access_token")
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


def _find_agent_id(h: Dict[str, str], name: str) -> Optional[str]:
    """Return the id of the agent whose name equals *name* exactly, or None.

    Args:
        h: request headers, as returned by ``_login_headers``.
        name: agent name to look for.
    """
    r = requests.get(
        f"{BASE}/api/v1/agents",
        params={"search": name, "limit": 100},
        headers=h,
        timeout=30,
    )
    r.raise_for_status()
    # The name is sent as a search term, so the exact match is re-checked here.
    for a in r.json() or []:
        if a.get("name") == name:
            return a.get("id")
    return None


def _create_or_update_agent(
    h: Dict[str, str], name: str, desc: str, wf: Dict[str, Any]
) -> str:
    """Ensure an agent called *name* exists with the given workflow; return its id.

    An existing agent (looked up by exact name) gets its description and
    workflow config overwritten via PUT; otherwise a new agent is created
    via POST.

    Args:
        h: request headers, as returned by ``_login_headers``.
        name: agent name.
        desc: agent description.
        wf: workflow configuration sent as ``workflow_config``.
    """
    aid = _find_agent_id(h, name)
    if aid:
        r = requests.put(
            f"{BASE}/api/v1/agents/{aid}",
            headers=h,
            json={"description": desc, "workflow_config": wf},
            timeout=30,
        )
        r.raise_for_status()
        return aid

    r = requests.post(
        f"{BASE}/api/v1/agents",
        headers=h,
        json={"name": name, "description": desc, "workflow_config": wf},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()["id"]


def _child_workflow() -> Dict[str, Any]:
    """Build the child workflow: start -> python code node -> end."""
    return {
        "nodes": [
            {"id": "start-1", "type": "start", "data": {"label": "开始"}, "position": {"x": 60, "y": 120}},
            {
                "id": "code-1",
                "type": "code",
                "data": {
                    "label": "子流程代码",
                    "language": "python",
                    # Two-line snippet shipped to the platform as one string.
                    "code": (
                        "q = input_data.get('query', '') if isinstance(input_data, dict) else ''\n"
                        "result = {'reply': f'子流程收到: {q}', 'child_ok': True}"
                    ),
                },
                "position": {"x": 340, "y": 120},
            },
            {"id": "end-1", "type": "end", "data": {"label": "结束"}, "position": {"x": 620, "y": 120}},
        ],
        "edges": [
            {"id": "e_start_code", "source": "start-1", "target": "code-1", "sourceHandle": "right", "targetHandle": "left"},
            {"id": "e_code_end", "source": "code-1", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"},
        ],
    }


def _parent_workflow(child_agent_id: str) -> Dict[str, Any]:
    """Build the parent workflow: start -> subworkflow node -> end.

    Args:
        child_agent_id: id of the agent the subworkflow node refers to.
    """
    return {
        "nodes": [
            {"id": "start-1", "type": "start", "data": {"label": "开始"}, "position": {"x": 60, "y": 120}},
            {
                "id": "sub-1",
                "type": "subworkflow",
                "data": {
                    "label": "调用子Agent",
                    "agent_id": child_agent_id,
                    "input_mapping": {"query": "query", "user_id": "user_id"},
                    "max_subworkflow_depth": 2,
                },
                "position": {"x": 340, "y": 120},
            },
            {"id": "end-1", "type": "end", "data": {"label": "结束"}, "position": {"x": 620, "y": 120}},
        ],
        "edges": [
            {"id": "e_start_sub", "source": "start-1", "target": "sub-1", "sourceHandle": "right", "targetHandle": "left"},
            {"id": "e_sub_end", "source": "sub-1", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"},
        ],
    }


def _wait_execution(
    h: Dict[str, str],
    execution_id: str,
    timeout_s: float = 120.0,
    *,
    poll_interval_s: float = 1.2,
) -> Dict[str, Any]:
    """Poll an execution until its status is "completed" or "failed".

    Args:
        h: request headers, as returned by ``_login_headers``.
        execution_id: id of the execution to watch.
        timeout_s: maximum time to keep polling, in seconds.
        poll_interval_s: pause between two polls, in seconds.

    Returns:
        The execution payload as returned by the last poll.

    Raises:
        requests.HTTPError: if a poll answers with an error status.
        TimeoutError: if no terminal status is seen within *timeout_s*.
    """
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        r = requests.get(f"{BASE}/api/v1/executions/{execution_id}", headers=h, timeout=30)
        r.raise_for_status()
        d = r.json()
        if d.get("status") in _TERMINAL_STATUSES:
            return d
        time.sleep(poll_interval_s)
    raise TimeoutError(f"执行超时: {execution_id}")


def main() -> int:
    """Run the end-to-end check and return the process exit code (0, 2 or 3)."""
    h = _login_headers()

    # The child must exist first: the parent workflow embeds the child's id.
    child_id = _create_or_update_agent(
        h, CHILD_NAME, "E2E 子流程 Agent", _child_workflow()
    )
    parent_id = _create_or_update_agent(
        h, PARENT_NAME, "E2E 父流程 Agent(含 subworkflow)", _parent_workflow(child_id)
    )
    print(f"child_agent={child_id}")
    print(f"parent_agent={parent_id}")

    create = requests.post(
        f"{BASE}/api/v1/executions",
        headers=h,
        json={
            "agent_id": parent_id,
            "input_data": {"query": "你好", "user_id": "e2e_sub_user"},
        },
        timeout=30,
    )
    create.raise_for_status()
    execution_id = create.json()["id"]
    print(f"execution={execution_id}")

    done = _wait_execution(h, execution_id)
    print(f"status={done.get('status')}")
    if done.get("status") != "completed":
        print(f"error={done.get('error_message')}")
        return 2

    chain = requests.get(
        f"{BASE}/api/v1/execution-logs/executions/{execution_id}/chain",
        headers=h,
        timeout=30,
    )
    chain.raise_for_status()
    tree = chain.json()
    # The check passes only if the chain lists at least one child execution.
    child_count = len(tree.get("children") or [])
    print(f"child_executions={child_count}")
    if child_count < 1:
        print("未发现子执行记录,校验失败")
        return 3

    print("E2E 通过")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())