182 lines
5.8 KiB
Python
182 lines
5.8 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
E2E: 验证 subworkflow 真执行 + 父子 execution 关联链路。
|
|||
|
|
|
|||
|
|
用法:
|
|||
|
|
cd backend && .\\venv\\Scripts\\python.exe scripts/e2e_subworkflow_chain.py
|
|||
|
|
"""
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import time
|
|||
|
|
from typing import Any, Dict, Optional
|
|||
|
|
|
|||
|
|
import requests
|
|||
|
|
|
|||
|
|
# Connection settings. Each one can be overridden through the environment;
# the literal is the fallback used when the variable is not set.
# Trailing slashes are stripped from BASE so it can be joined with "/api/...".
BASE = os.environ.get("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
USER = os.environ.get("PLATFORM_USERNAME", "admin")
PWD = os.environ.get("PLATFORM_PASSWORD", "123456")

# Names of the two agents this script looks up, then creates or updates.
CHILD_NAME = "E2E-Subworkflow-Child"
PARENT_NAME = "E2E-Subworkflow-Parent"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _login_headers() -> Dict[str, str]:
    """Log in with the configured credentials and build request headers.

    Posts USER / PWD as form data to the login endpoint under BASE and reads
    ``access_token`` from the JSON reply.

    Returns:
        Headers carrying ``Authorization: Bearer <token>`` and a JSON
        content type.

    Raises:
        requests.HTTPError: The login request returned an error status.
        RuntimeError: The reply contained no (or an empty) ``access_token``.
    """
    login_url = f"{BASE}/api/v1/auth/login"
    credentials = {"username": USER, "password": PWD}
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}

    response = requests.post(
        login_url, data=credentials, headers=form_headers, timeout=15
    )
    response.raise_for_status()

    payload = response.json()
    access_token = payload.get("access_token")
    if access_token:
        return {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
    raise RuntimeError("登录成功但缺少 access_token")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _find_agent_id(h: Dict[str, str], name: str) -> Optional[str]:
    """Look up an agent by exact name and return its id.

    Queries the agents listing with ``search=name`` (at most 100 rows) and
    picks the first entry whose ``name`` equals *name* exactly.

    Args:
        h: Request headers (as produced by the login helper).
        name: Exact agent name to match.

    Returns:
        The ``id`` field of the first exact match, or ``None`` when the
        listing is empty or contains no exact match.

    Raises:
        requests.HTTPError: The listing request returned an error status.
    """
    query = {"search": name, "limit": 100}
    response = requests.get(
        f"{BASE}/api/v1/agents", params=query, headers=h, timeout=30
    )
    response.raise_for_status()

    # A null / empty JSON body is treated as "no agents".
    agents = response.json() or []
    match = next((agent for agent in agents if agent.get("name") == name), None)
    return None if match is None else match.get("id")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _create_or_update_agent(
    h: Dict[str, str], name: str, desc: str, wf: Dict[str, Any]
) -> str:
    """Ensure an agent called *name* exists with the given workflow.

    If no agent with that exact name is found, one is created via POST.
    Otherwise the existing agent's description and workflow config are
    overwritten via PUT (the name itself is not sent on update).

    Args:
        h: Request headers (as produced by the login helper).
        name: Agent name used for lookup and, on creation, as its name.
        desc: Agent description.
        wf: Workflow definition sent as ``workflow_config``.

    Returns:
        The id of the created or updated agent.

    Raises:
        requests.HTTPError: Any of the HTTP calls returned an error status.
        KeyError: The creation response has no ``id`` field.
    """
    existing_id = _find_agent_id(h, name)

    if not existing_id:
        creation_body = {"name": name, "description": desc, "workflow_config": wf}
        created = requests.post(
            f"{BASE}/api/v1/agents", headers=h, json=creation_body, timeout=30
        )
        created.raise_for_status()
        return created.json()["id"]

    update_body = {"description": desc, "workflow_config": wf}
    updated = requests.put(
        f"{BASE}/api/v1/agents/{existing_id}",
        headers=h,
        json=update_body,
        timeout=30,
    )
    updated.raise_for_status()
    return existing_id
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _child_workflow() -> Dict[str, Any]:
|
|||
|
|
return {
|
|||
|
|
"nodes": [
|
|||
|
|
{"id": "start-1", "type": "start", "data": {"label": "开始"}, "position": {"x": 60, "y": 120}},
|
|||
|
|
{
|
|||
|
|
"id": "code-1",
|
|||
|
|
"type": "code",
|
|||
|
|
"data": {
|
|||
|
|
"label": "子流程代码",
|
|||
|
|
"language": "python",
|
|||
|
|
"code": "q = input_data.get('query', '') if isinstance(input_data, dict) else ''\nresult = {'reply': f'子流程收到: {q}', 'child_ok': True}",
|
|||
|
|
},
|
|||
|
|
"position": {"x": 340, "y": 120},
|
|||
|
|
},
|
|||
|
|
{"id": "end-1", "type": "end", "data": {"label": "结束"}, "position": {"x": 620, "y": 120}},
|
|||
|
|
],
|
|||
|
|
"edges": [
|
|||
|
|
{"id": "e_start_code", "source": "start-1", "target": "code-1", "sourceHandle": "right", "targetHandle": "left"},
|
|||
|
|
{"id": "e_code_end", "source": "code-1", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"},
|
|||
|
|
],
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parent_workflow(child_agent_id: str) -> Dict[str, Any]:
|
|||
|
|
return {
|
|||
|
|
"nodes": [
|
|||
|
|
{"id": "start-1", "type": "start", "data": {"label": "开始"}, "position": {"x": 60, "y": 120}},
|
|||
|
|
{
|
|||
|
|
"id": "sub-1",
|
|||
|
|
"type": "subworkflow",
|
|||
|
|
"data": {
|
|||
|
|
"label": "调用子Agent",
|
|||
|
|
"agent_id": child_agent_id,
|
|||
|
|
"input_mapping": {"query": "query", "user_id": "user_id"},
|
|||
|
|
"max_subworkflow_depth": 2,
|
|||
|
|
},
|
|||
|
|
"position": {"x": 340, "y": 120},
|
|||
|
|
},
|
|||
|
|
{"id": "end-1", "type": "end", "data": {"label": "结束"}, "position": {"x": 620, "y": 120}},
|
|||
|
|
],
|
|||
|
|
"edges": [
|
|||
|
|
{"id": "e_start_sub", "source": "start-1", "target": "sub-1", "sourceHandle": "right", "targetHandle": "left"},
|
|||
|
|
{"id": "e_sub_end", "source": "sub-1", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"},
|
|||
|
|
],
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _wait_execution(h: Dict[str, str], execution_id: str, timeout_s: float = 120.0) -> Dict[str, Any]:
|
|||
|
|
t0 = time.time()
|
|||
|
|
while time.time() - t0 < timeout_s:
|
|||
|
|
r = requests.get(f"{BASE}/api/v1/executions/{execution_id}", headers=h, timeout=30)
|
|||
|
|
r.raise_for_status()
|
|||
|
|
d = r.json()
|
|||
|
|
st = d.get("status")
|
|||
|
|
if st in ("completed", "failed"):
|
|||
|
|
return d
|
|||
|
|
time.sleep(1.2)
|
|||
|
|
raise TimeoutError(f"执行超时: {execution_id}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main() -> int:
    """Run the end-to-end subworkflow check and return an exit code.

    Steps: log in, create or update the child and parent agents, start an
    execution of the parent, wait for it to finish, then fetch the
    execution chain and count the entries under ``children``.

    Returns:
        0 when the parent execution completed and at least one child
        execution is listed; 2 when the execution ended in any status other
        than ``"completed"``; 3 when no child execution is listed.
    """
    auth = _login_headers()

    # The child must exist first: the parent workflow embeds its id.
    child_agent = _create_or_update_agent(
        auth, CHILD_NAME, "E2E 子流程 Agent", _child_workflow()
    )
    parent_agent = _create_or_update_agent(
        auth,
        PARENT_NAME,
        "E2E 父流程 Agent(含 subworkflow)",
        _parent_workflow(child_agent),
    )
    print(f"child_agent={child_agent}")
    print(f"parent_agent={parent_agent}")

    # Start one execution of the parent agent.
    run_request = {
        "agent_id": parent_agent,
        "input_data": {"query": "你好", "user_id": "e2e_sub_user"},
    }
    started = requests.post(
        f"{BASE}/api/v1/executions", headers=auth, json=run_request, timeout=30
    )
    started.raise_for_status()
    execution_id = started.json()["id"]
    print(f"execution={execution_id}")

    # Block until the execution is finished (or the wait times out).
    outcome = _wait_execution(auth, execution_id)
    final_status = outcome.get("status")
    print(f"status={final_status}")
    if final_status != "completed":
        print(f"error={outcome.get('error_message')}")
        return 2

    # Verify the parent execution has at least one linked child execution.
    chain_response = requests.get(
        f"{BASE}/api/v1/execution-logs/executions/{execution_id}/chain",
        headers=auth,
        timeout=30,
    )
    chain_response.raise_for_status()
    children = chain_response.json().get("children") or []
    child_count = len(children)
    print(f"child_executions={child_count}")
    if child_count >= 1:
        print("E2E 通过")
        return 0

    print("未发现子执行记录,校验失败")
    return 3
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|