Files
aiagent/backend/scripts/e2e_enterprise_multilane_agent.py

240 lines
7.9 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
测试企业场景_多线路由Agent
- 查找同名 Agent POST /api/v1/executions多条 query默认线 / 客服/研发
- 轮询 GET /api/v1/executions/{id} 直到终态 Celery Worker + LLM否则会 pending/失败
用法:
cd backend && .\\venv\\Scripts\\python.exe scripts/e2e_enterprise_multilane_agent.py
环境变量:
PLATFORM_BASE_URL 默认 http://127.0.0.1:8037
PLATFORM_USERNAME / PLATFORM_PASSWORD
AGENT_NAME 默认 企业场景_多线路由
USE_TESTCLIENT=1 不经 TCP仅验证创建执行入队轮询仍走 TestClient
POLL_TIMEOUT_S 默认 180
SKIP_POLL=1 只创建执行不等待终态
"""
from __future__ import annotations
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
# Name of the Agent under test; a blank or whitespace-only AGENT_NAME falls back to the default.
_DEFAULT_AGENT_NAME = "企业场景_多线路由"
AGENT_NAME = os.getenv("AGENT_NAME", _DEFAULT_AGENT_NAME).strip() or _DEFAULT_AGENT_NAME

# Upper bound, in seconds, on how long a single execution is polled.
POLL_TIMEOUT_S = float(os.getenv("POLL_TIMEOUT_S", "180"))

# When set to 1/true/yes (case-insensitive), executions are created but never polled.
SKIP_POLL = os.getenv("SKIP_POLL", "").strip().lower() in {"1", "true", "yes"}

# (tag, query) pairs, one per routing lane. The query markers follow the convention
# used by the Code node in create_enterprise_scenario_agents.py.
CASES: List[Tuple[str, str]] = [
    ("default", "你好,请用一句话说明你能做什么。"),
    ("cs", "【客服】订单物流一般几天能到?"),
    ("dev", "[[研发]]写一个 Python 读取 JSON 文件的最小示例思路。"),
]
def _login_requests(base: str) -> Dict[str, str]:
    """Log in over HTTP and build the headers for authenticated JSON requests.

    Credentials are read from PLATFORM_USERNAME / PLATFORM_PASSWORD (falling back
    to ``admin`` / ``123456``) and posted form-encoded to ``/api/v1/auth/login``.

    Args:
        base: Platform base URL; the endpoint path is appended to it directly.

    Returns:
        Headers containing the bearer token and a JSON content type.

    Raises:
        requests.HTTPError: If the login endpoint responds with an error status.
        RuntimeError: If the response JSON carries no ``access_token``.
    """
    import requests

    credentials = {
        "username": os.getenv("PLATFORM_USERNAME", "admin"),
        "password": os.getenv("PLATFORM_PASSWORD", "123456"),
    }
    response = requests.post(
        f"{base}/api/v1/auth/login",
        data=credentials,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=20,
    )
    response.raise_for_status()

    access_token = response.json().get("access_token")
    if access_token:
        return {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
    raise RuntimeError("无 access_token")
def _find_agent_id_requests(base: str, h: Dict[str, str]) -> Optional[str]:
    """Return the id of the first Agent whose name equals AGENT_NAME exactly.

    The server is asked to search by AGENT_NAME (up to 50 results); the exact-name
    match is then done client-side over the returned entries.

    Args:
        base: Platform base URL.
        h: Authenticated request headers.

    Returns:
        The matching entry's ``id`` value, or ``None`` when no entry matches.

    Raises:
        requests.HTTPError: If the listing endpoint responds with an error status.
    """
    import requests

    response = requests.get(
        f"{base}/api/v1/agents",
        params={"search": AGENT_NAME, "limit": 50},
        headers=h,
        timeout=45,
    )
    response.raise_for_status()

    candidates = response.json() or []
    # Non-dict entries are skipped rather than treated as errors.
    return next(
        (
            entry.get("id")
            for entry in candidates
            if isinstance(entry, dict) and entry.get("name") == AGENT_NAME
        ),
        None,
    )
def _post_execution_requests(base: str, h: Dict[str, str], agent_id: str, query: str) -> Dict[str, Any]:
    """Create one execution for the Agent and return the decoded response body.

    The query is sent under both the ``query`` and ``USER_INPUT`` input keys.

    Args:
        base: Platform base URL.
        h: Authenticated request headers.
        agent_id: Id of the Agent to run.
        query: User input for this execution.

    Returns:
        The JSON body of the create-execution response.

    Raises:
        RuntimeError: If the response status is neither 200 nor 201; the message
            includes the status code and up to 1200 characters of the body.
    """
    import requests

    payload = {
        "agent_id": agent_id,
        "input_data": {"query": query, "USER_INPUT": query},
    }
    response = requests.post(
        f"{base}/api/v1/executions",
        headers=h,
        json=payload,
        timeout=60,
    )
    if response.status_code in (200, 201):
        return response.json()
    raise RuntimeError(f"创建执行失败 {response.status_code}: {response.text[:1200]}")
def _get_execution_requests(base: str, h: Dict[str, str], eid: str) -> Dict[str, Any]:
    """Fetch the current record of one execution.

    Args:
        base: Platform base URL.
        h: Authenticated request headers.
        eid: Execution id.

    Returns:
        The JSON body of ``GET /api/v1/executions/{eid}``.

    Raises:
        requests.HTTPError: If the endpoint responds with an error status.
    """
    import requests

    url = f"{base}/api/v1/executions/{eid}"
    response = requests.get(url, headers=h, timeout=45)
    response.raise_for_status()
    return response.json()
def _poll_requests(base: str, h: Dict[str, str], eid: str) -> Dict[str, Any]:
    """Poll an execution until it reaches a state this script treats as final.

    The execution is fetched roughly every 1.2 seconds until its ``status`` is
    ``completed``, ``failed`` or ``awaiting_approval``, or until POLL_TIMEOUT_S
    seconds have elapsed.

    Args:
        base: Platform base URL.
        h: Authenticated request headers.
        eid: Execution id.

    Returns:
        The execution record as returned by the last (final-state) fetch.

    Raises:
        TimeoutError: If no final state is observed within POLL_TIMEOUT_S seconds.
        requests.HTTPError: Propagated from the underlying fetch.
    """
    final_states = ("completed", "failed", "awaiting_approval")
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + POLL_TIMEOUT_S
    while time.monotonic() < deadline:
        record = _get_execution_requests(base, h, eid)
        if record.get("status") in final_states:
            return record
        time.sleep(1.2)
    # The id and the timeout are separated so the message stays readable.
    raise TimeoutError(f"执行 {eid} 在 {POLL_TIMEOUT_S}s 内未结束")
def _run_http() -> int:
    """Run every case in CASES against a live server over HTTP.

    Steps: a best-effort ``GET /health``, login, Agent lookup by name, then for
    each case create an execution and (unless SKIP_POLL is set) poll it until a
    final state. A JSON summary of all cases is printed at the end.

    Returns:
        0 when no case failed;
        1 when login fails;
        2 when the Agent lookup fails or no Agent with that name exists;
        3 when creating an execution fails;
        4 when at least one case ended as ``failed`` or recorded an error.
    """
    import requests

    base = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")

    # The health probe is informational: a failure is reported but does not abort.
    try:
        hr = requests.get(f"{base}/health", timeout=8)
        print("GET /health", hr.status_code, hr.text[:300])
    except Exception as e:
        print("GET /health 失败:", e, file=sys.stderr)

    try:
        h = _login_requests(base)
    except Exception as e:
        print("登录失败:", e, file=sys.stderr)
        return 1

    # Handle lookup errors the same way as login/create errors instead of
    # letting an HTTP failure surface as an unhandled traceback.
    try:
        aid = _find_agent_id_requests(base, h)
    except Exception as e:
        print("查找 Agent 失败:", e, file=sys.stderr)
        return 2
    if not aid:
        print(f"未找到 Agent: {AGENT_NAME}(请先运行 create_enterprise_scenario_agents.py)", file=sys.stderr)
        return 2
    print(f"Agent: {AGENT_NAME} id={aid}")

    results: List[Dict[str, Any]] = []
    for tag, query in CASES:
        print(f"\n--- 用例 [{tag}] query 前缀: {query[:40]!r} ...")
        try:
            ex = _post_execution_requests(base, h, aid, query)
        except Exception as e:
            print("创建执行失败:", e, file=sys.stderr)
            return 3
        eid = ex.get("id")
        print(f" execution_id={eid} status={ex.get('status')} task_id={ex.get('task_id')}")

        if SKIP_POLL:
            results.append({"tag": tag, "execution_id": eid, "skipped_poll": True})
            continue

        if eid is None:
            # Without an id there is nothing to poll; record the problem rather
            # than requesting /executions/None.
            missing_id = "创建执行响应缺少 id"
            print(" ", missing_id, file=sys.stderr)
            results.append(
                {
                    "tag": tag,
                    "execution_id": None,
                    "error": missing_id,
                    "last_status": ex.get("status"),
                }
            )
            continue

        try:
            final = _poll_requests(base, h, str(eid))
        except (TimeoutError, requests.RequestException) as poll_err:
            # A timeout or HTTP error on one case must not prevent the remaining
            # cases and the summary from running.
            print(" ", poll_err, file=sys.stderr)
            results.append(
                {
                    "tag": tag,
                    "execution_id": eid,
                    "error": str(poll_err),
                    # Status from the create response, not from the last poll.
                    "last_status": ex.get("status"),
                }
            )
            continue

        err = final.get("error_message")
        print(f" 终态: {final.get('status')} error_message={err!r}")
        out = final.get("output_data")
        if isinstance(out, dict):
            preview = json.dumps(out, ensure_ascii=False)[:600]
        else:
            preview = str(out)[:600] if out else ""
        print(f" output_data 预览: {preview}")
        results.append(
            {
                "tag": tag,
                "execution_id": eid,
                "status": final.get("status"),
                "error_message": err,
                "has_output": bool(out),
            }
        )

    print("\n汇总:", json.dumps(results, ensure_ascii=False, indent=2))
    any_failed = any(x.get("status") == "failed" or x.get("error") for x in results)
    return 4 if any_failed else 0
def _run_testclient() -> int:
    """Exercise login, Agent lookup and execution creation through TestClient.

    No TCP connection is made: requests go straight to the FastAPI app imported
    from ``app.main``. Executions are only created; this mode never polls them.

    Returns:
        0 when every case's execution was created;
        1 when login fails or yields no ``access_token``;
        2 when the Agent lookup fails or no Agent with that name exists;
        3 when creating an execution fails.
    """
    from fastapi.testclient import TestClient
    from app.main import app

    c = TestClient(app)
    r = c.post(
        "/api/v1/auth/login",
        data={
            "username": os.getenv("PLATFORM_USERNAME", "admin"),
            "password": os.getenv("PLATFORM_PASSWORD", "123456"),
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    if r.status_code != 200:
        print("login:", r.status_code, r.text[:400], file=sys.stderr)
        return 1
    token = r.json().get("access_token")
    if not token:
        # Mirrors the HTTP login path: never continue with "Bearer None".
        print("login: 无 access_token", file=sys.stderr)
        return 1
    h = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    gr = c.get("/api/v1/agents", params={"search": AGENT_NAME, "limit": 50}, headers=h)
    if gr.status_code != 200:
        # Report the real cause instead of a misleading "Agent not found".
        print("agents:", gr.status_code, gr.text[:400], file=sys.stderr)
        return 2
    aid = next(
        (
            a.get("id")
            for a in gr.json() or []
            if isinstance(a, dict) and a.get("name") == AGENT_NAME
        ),
        None,
    )
    if not aid:
        print(f"未找到 Agent: {AGENT_NAME}", file=sys.stderr)
        return 2
    print(f"[TestClient] Agent: {AGENT_NAME} id={aid}")

    for tag, query in CASES:
        pr = c.post(
            "/api/v1/executions",
            headers=h,
            json={"agent_id": aid, "input_data": {"query": query, "USER_INPUT": query}},
        )
        if pr.status_code not in (200, 201):
            print(f"[{tag}] POST executions {pr.status_code}", pr.text[:800], file=sys.stderr)
            return 3
        body = pr.json()
        print(f"[{tag}] created execution_id={body.get('id')} status={body.get('status')}")

    print("TestClient 模式仅验证「创建执行」成功;完整跑图请在 USE_TESTCLIENT=0 且 Celery+LLM 可用时测试。")
    return 0
def main() -> int:
    """Pick the run mode from USE_TESTCLIENT and return that mode's exit code.

    ``1``, ``true`` or ``yes`` (case-insensitive, surrounding whitespace ignored)
    selects the in-process TestClient mode; anything else runs over HTTP.
    """
    flag = os.getenv("USE_TESTCLIENT", "").strip().lower()
    runner = _run_testclient if flag in ("1", "true", "yes") else _run_http
    return runner()
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())