Files
aiagent/backend/scripts/e2e_platform_capability_smoke.py
renjianbo 0608161c82 feat: 完善企业场景多线路由与执行稳定性
补齐平台模板与场景 DSL、预算控制、执行看板和企业场景脚本,增强 Windows 启动/迁移与前端代理和聊天会话记忆,修复执行创建阶段 500 与异步链路排障体验。

Made-with: Cursor
2026-04-09 21:58:53 +08:00

185 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
常用场景验证:模板化客服 Agent + 统一 DSL 执行 + 执行链可读
验证点与《90 天路线图》对齐):
- GET /api/v1/agents/scene-templates 场景模板可列举
- POST /api/v1/agents/from-scene-template 一键生成可执行 Agent
- POST /api/v1/executions 携带 scenario_dsl 的执行(引擎入口校验 + _scenario
- GET /api/v1/executions/{id} 终态
- GET /api/v1/execution-logs/executions/{id}/chain/summary 链路汇总(根执行至少 1 条)
前置: API 已启动;已执行 alembicCelery Worker 已启动(否则执行会长期 pending
用法:
cd backend && .\\venv\\Scripts\\python.exe scripts/e2e_platform_capability_smoke.py
环境变量: PLATFORM_BASE_URL, PLATFORM_USERNAME, PLATFORM_PASSWORD
"""
from __future__ import annotations
import os
import sys
import time
from typing import Any, Dict
import requests
BASE = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
USER = os.getenv("PLATFORM_USERNAME", "admin")
PWD = os.getenv("PLATFORM_PASSWORD", "123456")
def _h() -> Dict[str, str]:
r = requests.post(
f"{BASE}/api/v1/auth/login",
data={"username": USER, "password": PWD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=15,
)
r.raise_for_status()
token = r.json().get("access_token")
if not token:
raise RuntimeError("无 access_token")
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def _wait_terminal(h: Dict[str, str], eid: str, timeout_s: float = 120.0) -> Dict[str, Any]:
t0 = time.time()
while time.time() - t0 < timeout_s:
r = requests.get(f"{BASE}/api/v1/executions/{eid}", headers=h, timeout=30)
r.raise_for_status()
d = r.json()
st = d.get("status")
if st in ("completed", "failed", "awaiting_approval"):
return d
time.sleep(1.0)
raise TimeoutError(f"执行未在 {timeout_s}s 内结束: {eid}")
def main() -> int:
# 0) 健康检查
try:
hr = requests.get(f"{BASE}/health", timeout=5)
print("health:", hr.status_code, hr.text[:200])
except Exception as e:
print("health 请求失败(可忽略):", e, file=sys.stderr)
try:
h = _h()
except Exception as e:
print("登录失败:", e, file=sys.stderr)
return 1
# 1) 场景模板列表
tr = requests.get(f"{BASE}/api/v1/platform/scene-templates", headers=h, timeout=30)
if tr.status_code != 200:
print("scene-templates:", tr.status_code, tr.text[:800], file=sys.stderr)
return 2
templates = tr.json() or []
print(f"scene-templates: {len(templates)}")
tid = "template_customer_service"
ids = [x.get("id") for x in templates if isinstance(x, dict)]
if tid not in ids:
print(f"警告: 未找到 {tid},使用列表第一项", file=sys.stderr)
tid = ids[0] if ids else None
if not tid:
print("无可用模板", file=sys.stderr)
return 3
# 2) 从模板创建 Agent带轻量预算验证 DB + 合并逻辑;可按需改大)
name = os.getenv("SMOKE_AGENT_NAME") or f"冒烟_客服DSL_{int(time.time())}"
cr = requests.post(
f"{BASE}/api/v1/platform/agents/from-template",
headers=h,
json={
"template_id": tid,
"name": name,
"description": "e2e_platform_capability_smoke 自动创建",
"parameters": {"temperature": 0.35},
"budget_config": {
"max_steps": 50,
"max_llm_invocations": 5,
"max_tool_calls": 20,
},
},
timeout=60,
)
if cr.status_code not in (200, 201):
print("from-scene-template:", cr.status_code, cr.text[:1200], file=sys.stderr)
return 4
agent = cr.json()
aid = agent.get("id")
print("created agent:", aid, agent.get("name"))
# 3) 携带 scenario_dsl 的执行(统一输入契约)
user_msg = os.getenv("SMOKE_USER_QUERY") or "你好,请用一句话说明你能帮我做什么。"
body = {
"agent_id": aid,
"input_data": {
"query": user_msg,
"scenario_dsl": {
"version": "1",
"scene": "customer_service_smoke",
"goal": "验证平台 DSL + 模板 Agent 执行",
"constraints": ["回答简短", "不要编造联系方式"],
"deliverables": ["一句中文回复"],
"acceptance": ["有明确语义"],
"payload": {"channel": "smoke", "user_id": "e2e_smoke_user"},
},
},
}
er = requests.post(f"{BASE}/api/v1/executions", headers=h, json=body, timeout=30)
if er.status_code not in (200, 201):
print("executions:", er.status_code, er.text[:1200], file=sys.stderr)
return 5
eid = er.json()["id"]
print("execution:", eid, "status=pending/running等待 Celery …")
try:
fin = _wait_terminal(h, eid, timeout_s=float(os.getenv("SMOKE_WAIT_SEC", "120")))
except TimeoutError as te:
print(str(te), file=sys.stderr)
print("若长期 running请确认 Celery Worker 已启动且 REDIS 可用。", file=sys.stderr)
return 6
st = fin.get("status")
print("final status:", st)
if st == "failed":
print("error_message:", fin.get("error_message"), file=sys.stderr)
return 7
if st == "awaiting_approval":
print("停在审批节点(本冒烟未配置 approval请检查工作流。", file=sys.stderr)
return 8
out = fin.get("output_data")
preview = str(out)[:500] if out is not None else ""
print("output_data preview:", preview)
# 4) 执行链汇总(单节点链也应返回 total_executions>=1
sr = requests.get(
f"{BASE}/api/v1/execution-logs/executions/{eid}/chain/summary",
headers=h,
timeout=30,
)
if sr.status_code == 200:
s = sr.json()
print(
"chain/summary:",
"total_executions=",
s.get("total_executions"),
"status_count=",
s.get("status_count"),
"max_depth=",
s.get("max_depth"),
)
else:
print("chain/summary:", sr.status_code, sr.text[:400], file=sys.stderr)
print("OK — 常用场景(模板 + DSL + 执行 + 链)验证完成。")
return 0
if __name__ == "__main__":
raise SystemExit(main())