Files
aiagent/backend/scripts/e2e_subworkflow_chain.py
renjianbo 0608161c82 feat: 完善企业场景多线路由与执行稳定性
补齐平台模板与场景 DSL、预算控制、执行看板和企业场景脚本,增强 Windows 启动/迁移与前端代理和聊天会话记忆,修复执行创建阶段 500 与异步链路排障体验。

Made-with: Cursor
2026-04-09 21:58:53 +08:00

182 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
E2E: 验证 subworkflow 真执行 + 父子 execution 关联链路。
用法:
cd backend && .\\venv\\Scripts\\python.exe scripts/e2e_subworkflow_chain.py
"""
from __future__ import annotations
import os
import time
from typing import Any, Dict, Optional
import requests
# Connection settings. Each value can be overridden through the environment;
# the defaults target a local instance. A trailing "/" is stripped from the
# base URL so that request paths can be appended with a leading "/".
BASE = os.environ.get("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
USER = os.environ.get("PLATFORM_USERNAME", "admin")
PWD = os.environ.get("PLATFORM_PASSWORD", "123456")

# Names of the two agents this script creates (or updates when they exist).
CHILD_NAME = "E2E-Subworkflow-Child"
PARENT_NAME = "E2E-Subworkflow-Parent"
def _login_headers() -> Dict[str, str]:
    """Log in with the configured credentials and build request headers.

    Posts USER / PWD as a URL-encoded form to the login endpoint and reads
    ``access_token`` from the JSON response.

    Returns:
        Headers carrying a ``Bearer`` Authorization value and a JSON
        Content-Type, ready to be passed to the other API calls.

    Raises:
        requests.HTTPError: the login endpoint answered with an error status.
        RuntimeError: the response had no (or an empty) ``access_token``.
    """
    login_url = f"{BASE}/api/v1/auth/login"
    credentials = {"username": USER, "password": PWD}
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}

    resp = requests.post(login_url, data=credentials, headers=form_headers, timeout=15)
    resp.raise_for_status()

    access_token = resp.json().get("access_token")
    if access_token:
        return {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
    raise RuntimeError("登录成功但缺少 access_token")
def _find_agent_id(h: Dict[str, str], name: str) -> Optional[str]:
    """Return the id of the agent whose name equals *name*, or ``None``.

    The server-side ``search`` parameter only narrows the listing; the exact
    name comparison is done here, and the first exact match wins.

    Args:
        h: Request headers (as produced by the login step).
        name: Exact agent name to look for.

    Raises:
        requests.HTTPError: the listing endpoint answered with an error status.
    """
    resp = requests.get(
        f"{BASE}/api/v1/agents",
        params={"search": name, "limit": 100},
        headers=h,
        timeout=30,
    )
    resp.raise_for_status()

    # NOTE(review): assumes the endpoint returns a JSON list of agent objects;
    # an empty/null body is treated as "no agents".
    agents = resp.json() or []
    matching_ids = (agent.get("id") for agent in agents if agent.get("name") == name)
    return next(matching_ids, None)
def _create_or_update_agent(
    h: Dict[str, str], name: str, desc: str, wf: Dict[str, Any]
) -> str:
    """Create the agent called *name*, or update it when it already exists.

    Args:
        h: Request headers (as produced by the login step).
        name: Agent name; used to look up an existing agent.
        desc: Description stored on the agent.
        wf: Workflow definition sent as ``workflow_config``.

    Returns:
        The id of the created or updated agent.

    Raises:
        requests.HTTPError: a lookup, update or create call failed.
    """
    existing_id = _find_agent_id(h, name)

    if not existing_id:
        # No agent with this name yet: create it and use the id from the reply.
        creation_payload = {"name": name, "description": desc, "workflow_config": wf}
        created = requests.post(
            f"{BASE}/api/v1/agents", headers=h, json=creation_payload, timeout=30
        )
        created.raise_for_status()
        return created.json()["id"]

    # The agent exists: the update sends only description and workflow.
    update_payload = {"description": desc, "workflow_config": wf}
    updated = requests.put(
        f"{BASE}/api/v1/agents/{existing_id}", headers=h, json=update_payload, timeout=30
    )
    updated.raise_for_status()
    return existing_id
def _child_workflow() -> Dict[str, Any]:
    """Build the child agent's workflow: start -> code -> end.

    The code node runs a Python snippet that reads ``query`` from
    ``input_data`` (when it is a dict) and stores a reply plus a
    ``child_ok`` flag in ``result``.

    Returns:
        A dict with ``nodes`` and ``edges`` lists.
    """
    snippet = (
        "q = input_data.get('query', '') if isinstance(input_data, dict) else ''\n"
        "result = {'reply': f'子流程收到: {q}', 'child_ok': True}"
    )

    def connect(edge_id: str, source: str, target: str) -> Dict[str, Any]:
        # Every edge leaves from the right handle and enters on the left one.
        return {
            "id": edge_id,
            "source": source,
            "target": target,
            "sourceHandle": "right",
            "targetHandle": "left",
        }

    start_node = {
        "id": "start-1",
        "type": "start",
        "data": {"label": "开始"},
        "position": {"x": 60, "y": 120},
    }
    code_node = {
        "id": "code-1",
        "type": "code",
        "data": {"label": "子流程代码", "language": "python", "code": snippet},
        "position": {"x": 340, "y": 120},
    }
    end_node = {
        "id": "end-1",
        "type": "end",
        "data": {"label": "结束"},
        "position": {"x": 620, "y": 120},
    }

    return {
        "nodes": [start_node, code_node, end_node],
        "edges": [
            connect("e_start_code", "start-1", "code-1"),
            connect("e_code_end", "code-1", "end-1"),
        ],
    }
def _parent_workflow(child_agent_id: str) -> Dict[str, Any]:
    """Build the parent agent's workflow: start -> subworkflow -> end.

    Args:
        child_agent_id: Id of the agent the subworkflow node points at.

    Returns:
        A dict with ``nodes`` and ``edges`` lists; the subworkflow node maps
        ``query`` and ``user_id`` through and sets ``max_subworkflow_depth``
        to 2.
    """

    def connect(edge_id: str, source: str, target: str) -> Dict[str, Any]:
        # Every edge leaves from the right handle and enters on the left one.
        return {
            "id": edge_id,
            "source": source,
            "target": target,
            "sourceHandle": "right",
            "targetHandle": "left",
        }

    subworkflow_settings = {
        "label": "调用子Agent",
        "agent_id": child_agent_id,
        "input_mapping": {"query": "query", "user_id": "user_id"},
        "max_subworkflow_depth": 2,
    }

    nodes = [
        {
            "id": "start-1",
            "type": "start",
            "data": {"label": "开始"},
            "position": {"x": 60, "y": 120},
        },
        {
            "id": "sub-1",
            "type": "subworkflow",
            "data": subworkflow_settings,
            "position": {"x": 340, "y": 120},
        },
        {
            "id": "end-1",
            "type": "end",
            "data": {"label": "结束"},
            "position": {"x": 620, "y": 120},
        },
    ]
    edges = [
        connect("e_start_sub", "start-1", "sub-1"),
        connect("e_sub_end", "sub-1", "end-1"),
    ]
    return {"nodes": nodes, "edges": edges}
def _wait_execution(
    h: Dict[str, str],
    execution_id: str,
    timeout_s: float = 120.0,
    *,
    poll_interval_s: float = 1.2,
) -> Dict[str, Any]:
    """Poll an execution until it reports ``completed`` or ``failed``.

    Args:
        h: Request headers (as produced by the login step).
        execution_id: Id of the execution to watch.
        timeout_s: Maximum time to keep polling, in seconds. A value of zero
            or less raises ``TimeoutError`` without polling.
        poll_interval_s: Pause between two polls, in seconds.

    Returns:
        The execution's JSON body from the first poll whose ``status`` is
        ``completed`` or ``failed``.

    Raises:
        requests.HTTPError: a poll answered with an error status.
        TimeoutError: no terminal status was seen within *timeout_s*.
    """
    # Measure the timeout on the monotonic clock so that wall-clock
    # adjustments (NTP sync, manual changes) cannot shorten or stretch it.
    deadline = time.monotonic() + timeout_s

    while time.monotonic() < deadline:
        resp = requests.get(
            f"{BASE}/api/v1/executions/{execution_id}", headers=h, timeout=30
        )
        resp.raise_for_status()
        execution = resp.json()
        # NOTE(review): only these two statuses end the wait; any other
        # terminal status the API may have would run into the timeout.
        if execution.get("status") in ("completed", "failed"):
            return execution

        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval_s, remaining))

    raise TimeoutError(f"执行超时: {execution_id}")
def main() -> int:
    """Run the end-to-end subworkflow check and return a process exit code.

    Steps: log in, create or update the child and parent agents, start an
    execution of the parent, wait for it to finish, then fetch the execution
    chain and require at least one child execution in it.

    Returns:
        0 when the check passes, 2 when the execution did not end as
        ``completed``, 3 when the chain lists no child execution.

    Raises:
        requests.HTTPError: any API call answered with an error status.
        TimeoutError: the execution did not finish in time.
    """
    headers = _login_headers()

    # The child must exist first: the parent's workflow references its id.
    child_id = _create_or_update_agent(
        headers, CHILD_NAME, "E2E 子流程 Agent", _child_workflow()
    )
    parent_definition = _parent_workflow(child_id)
    parent_id = _create_or_update_agent(
        headers, PARENT_NAME, "E2E 父流程 Agent含 subworkflow", parent_definition
    )
    print(f"child_agent={child_id}")
    print(f"parent_agent={parent_id}")

    run_request = {
        "agent_id": parent_id,
        "input_data": {"query": "你好", "user_id": "e2e_sub_user"},
    }
    created = requests.post(
        f"{BASE}/api/v1/executions", headers=headers, json=run_request, timeout=30
    )
    created.raise_for_status()
    execution_id = created.json()["id"]
    print(f"execution={execution_id}")

    outcome = _wait_execution(headers, execution_id)
    final_status = outcome.get("status")
    print(f"status={final_status}")
    if final_status != "completed":
        print(f"error={outcome.get('error_message')}")
        return 2

    chain_resp = requests.get(
        f"{BASE}/api/v1/execution-logs/executions/{execution_id}/chain",
        headers=headers,
        timeout=30,
    )
    chain_resp.raise_for_status()
    children = chain_resp.json().get("children") or []
    child_count = len(children)
    print(f"child_executions={child_count}")
    if child_count == 0:
        print("未发现子执行记录,校验失败")
        return 3

    print("E2E 通过")
    return 0
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)