Files
aiagent/backend/scripts/create_router_invoke_demo_agent.py

168 lines
5.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
创建/更新演示主链路 invoke_agent 委派Agent
拓扑最小可运行:
Start invoke_agent委派到子 Agent End
Agent 默认使用 **知你客服16号**可通过环境变量 `CHILD_AGENT_NAME` 改为轻量 E2E Agent
说明
- `invoke_agent` `subworkflow` 共用引擎实现节点里写 `agent_id` + `input_mapping` 即可
- **动态按 LLM 输出切换目标 Agent** 需引擎支持从 input 解析 `agent_id`后续迭代本演示为 **固定委派**用于验证链路与父子执行记录
用法:
cd backend && .\\venv\\Scripts\\python.exe scripts/create_router_invoke_demo_agent.py
环境变量:
PLATFORM_BASE_URL, PLATFORM_USERNAME, PLATFORM_PASSWORD
CHILD_AGENT_NAME默认 知你客服16号
DEMO_AGENT_NAME默认 演示-主链路委派invoke
"""
from __future__ import annotations
import json
import os
import sys
from typing import Any, Dict, List, Optional
import requests
# Platform API endpoint and credentials; each value can be overridden through
# the environment. The trailing "/" is stripped from the base URL so request
# paths can be appended as f"{BASE}/api/...".
BASE = os.environ.get("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
USER = os.environ.get("PLATFORM_USERNAME", "admin")
PWD = os.environ.get("PLATFORM_PASSWORD", "123456")

# Name of the agent the demo delegates to, and the name of the demo agent itself.
CHILD_AGENT_NAME = os.environ.get("CHILD_AGENT_NAME", "知你客服16号")
DEMO_AGENT_NAME = os.environ.get("DEMO_AGENT_NAME", "演示-主链路委派invoke")
def _sanitize_edges(edges: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen: set = set()
out: List[Dict[str, Any]] = []
for e in edges or []:
s, t = e.get("source"), e.get("target")
if not s or not t or s == t:
continue
key = (s, t)
if key in seen:
continue
seen.add(key)
ne = dict(e)
ne["sourceHandle"] = "right"
ne["targetHandle"] = "left"
if not ne.get("id"):
ne["id"] = f"edge_{s}_{t}"
out.append(ne)
return out
def _login_headers() -> Dict[str, str]:
    """Log in with the configured credentials and return request headers.

    Posts ``USER``/``PWD`` as a form to ``{BASE}/api/v1/auth/login`` and builds
    a bearer ``Authorization`` header plus a JSON ``Content-Type`` from the
    returned ``access_token``.

    Raises:
        requests.HTTPError: the login endpoint answered with an error status.
        RuntimeError: the response JSON carries no ``access_token``.
    """
    login_url = f"{BASE}/api/v1/auth/login"
    credentials = {"username": USER, "password": PWD}
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}

    response = requests.post(
        login_url,
        data=credentials,
        headers=form_headers,
        timeout=15,
    )
    response.raise_for_status()

    access_token = response.json().get("access_token")
    if access_token:
        return {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
    raise RuntimeError("无 access_token")
def _find_agent_id(h: Dict[str, str], name: str) -> Optional[str]:
    """Look up an agent by exact name and return its ``id``.

    Queries ``{BASE}/api/v1/agents`` with ``search=name`` and ``limit=100``,
    then scans the returned entries for one whose ``name`` equals ``name``.

    Args:
        h: Request headers (as produced by the login step).
        name: Exact agent name to match.

    Returns:
        The ``id`` of the first exactly-matching entry, or ``None`` when the
        request does not return HTTP 200 or no entry matches.
    """
    r = requests.get(
        f"{BASE}/api/v1/agents",
        params={"search": name, "limit": 100},
        headers=h,
        timeout=30,
    )
    if r.status_code != 200:
        # A failed lookup is otherwise indistinguishable from "no such agent"
        # for the caller, so report the real cause before returning None.
        print("查询 Agent 失败:", name, r.status_code, r.text[:800], file=sys.stderr)
        return None
    # The search may return partial matches; only an exact name match counts.
    return next(
        (a.get("id") for a in r.json() or [] if a.get("name") == name),
        None,
    )
def _demo_workflow(child_agent_id: str) -> Dict[str, Any]:
    """Build the Start -> invoke_agent -> End workflow config.

    The ``invoke_agent`` node is bound to ``child_agent_id`` and maps ``query``
    and ``user_id`` through unchanged. Edges are passed through
    ``_sanitize_edges`` before being returned.
    """

    def _link(edge_id: str, src: str, dst: str) -> Dict[str, Any]:
        # One left-to-right connection between two nodes.
        return {
            "id": edge_id,
            "source": src,
            "target": dst,
            "sourceHandle": "right",
            "targetHandle": "left",
        }

    start_node = {
        "id": "start-1",
        "type": "start",
        "data": {"label": "开始"},
        "position": {"x": 80, "y": 160},
    }
    invoke_node = {
        "id": "invoke-1",
        "type": "invoke_agent",
        "data": {
            "label": "委派子Agent",
            "agent_id": child_agent_id,
            "input_mapping": {"query": "query", "user_id": "user_id"},
            "max_subworkflow_depth": 3,
        },
        "position": {"x": 380, "y": 160},
    }
    end_node = {
        "id": "end-1",
        "type": "end",
        "data": {"label": "结束"},
        "position": {"x": 680, "y": 160},
    }

    raw_edges = [
        _link("e_start_invoke", "start-1", "invoke-1"),
        _link("e_invoke_end", "invoke-1", "end-1"),
    ]
    return {
        "nodes": [start_node, invoke_node, end_node],
        "edges": _sanitize_edges(raw_edges),
    }
def main() -> int:
    """Create the demo agent, or update it when one with that name exists.

    Logs in, resolves the child agent by ``CHILD_AGENT_NAME``, builds the demo
    workflow around it, then PUTs to the existing demo agent or POSTs a new
    one. On success a JSON summary line is printed to stdout.

    Returns:
        0 on success; 1 when the child agent is not found or the update/create
        request returns an unexpected status code.
    """
    headers = _login_headers()

    child_agent_id = _find_agent_id(headers, CHILD_AGENT_NAME)
    if not child_agent_id:
        print(f"未找到子 Agent: {CHILD_AGENT_NAME}", file=sys.stderr)
        return 1

    workflow = _demo_workflow(child_agent_id)
    existing_id = _find_agent_id(headers, DEMO_AGENT_NAME)
    description = (
        f"演示Start→invoke_agent→End委派到「{CHILD_AGENT_NAME}」;"
        "用于验证父子执行与链路 API非动态路由版目标 Agent 在画布中固定)。"
    )
    # Fields shared by the update and create payloads.
    shared_fields = {"description": description, "workflow_config": workflow}

    if existing_id:
        # Demo agent already exists: overwrite its description and workflow.
        response = requests.put(
            f"{BASE}/api/v1/agents/{existing_id}",
            headers=headers,
            json=shared_fields,
            timeout=60,
        )
        if response.status_code != 200:
            print("更新失败:", response.status_code, response.text[:800], file=sys.stderr)
            return 1
        demo_agent_id = existing_id
        print("已更新:", DEMO_AGENT_NAME, demo_agent_id)
    else:
        # No demo agent yet: create it; the new id comes from the response body.
        response = requests.post(
            f"{BASE}/api/v1/agents",
            headers=headers,
            json={"name": DEMO_AGENT_NAME, **shared_fields},
            timeout=60,
        )
        if response.status_code != 201:
            print("创建失败:", response.status_code, response.text[:800], file=sys.stderr)
            return 1
        demo_agent_id = response.json()["id"]
        print("已创建:", DEMO_AGENT_NAME, demo_agent_id)

    summary = {
        "id": demo_agent_id,
        "name": DEMO_AGENT_NAME,
        "child_agent_id": child_agent_id,
    }
    print(json.dumps(summary, ensure_ascii=False))
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())