#!/usr/bin/env python3 """ 创建/更新「演示:主链路 invoke_agent 委派」Agent。 拓扑(最小可运行): Start → invoke_agent(委派到子 Agent)→ End 子 Agent 默认使用 **知你客服16号**(可通过环境变量 `CHILD_AGENT_NAME` 改为轻量 E2E 子 Agent 等)。 说明: - `invoke_agent` 与 `subworkflow` 共用引擎实现;节点里写 `agent_id` + `input_mapping` 即可。 - **动态按 LLM 输出切换目标 Agent** 需引擎支持从 input 解析 `agent_id`(后续迭代);本演示为 **固定委派**,用于验证链路与父子执行记录。 用法: cd backend && .\\venv\\Scripts\\python.exe scripts/create_router_invoke_demo_agent.py 环境变量: PLATFORM_BASE_URL, PLATFORM_USERNAME, PLATFORM_PASSWORD CHILD_AGENT_NAME(默认 知你客服16号) DEMO_AGENT_NAME(默认 演示-主链路委派invoke) """ from __future__ import annotations import json import os import sys from typing import Any, Dict, List, Optional import requests BASE = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/") USER = os.getenv("PLATFORM_USERNAME", "admin") PWD = os.getenv("PLATFORM_PASSWORD", "123456") CHILD_AGENT_NAME = os.getenv("CHILD_AGENT_NAME", "知你客服16号") DEMO_AGENT_NAME = os.getenv("DEMO_AGENT_NAME", "演示-主链路委派invoke") def _sanitize_edges(edges: List[Dict[str, Any]]) -> List[Dict[str, Any]]: seen: set = set() out: List[Dict[str, Any]] = [] for e in edges or []: s, t = e.get("source"), e.get("target") if not s or not t or s == t: continue key = (s, t) if key in seen: continue seen.add(key) ne = dict(e) ne["sourceHandle"] = "right" ne["targetHandle"] = "left" if not ne.get("id"): ne["id"] = f"edge_{s}_{t}" out.append(ne) return out def _login_headers() -> Dict[str, str]: r = requests.post( f"{BASE}/api/v1/auth/login", data={"username": USER, "password": PWD}, headers={"Content-Type": "application/x-www-form-urlencoded"}, timeout=15, ) r.raise_for_status() token = r.json().get("access_token") if not token: raise RuntimeError("无 access_token") return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} def _find_agent_id(h: Dict[str, str], name: str) -> Optional[str]: r = requests.get( f"{BASE}/api/v1/agents", params={"search": name, "limit": 100}, headers=h, timeout=30 ) if r.status_code != 200: return None for a in r.json() or []: if a.get("name") == name: return a.get("id") return None def _demo_workflow(child_agent_id: str) -> Dict[str, Any]: return { "nodes": [ {"id": "start-1", "type": "start", "data": {"label": "开始"}, "position": {"x": 80, "y": 160}}, { "id": "invoke-1", "type": "invoke_agent", "data": { "label": "委派子Agent", "agent_id": child_agent_id, "input_mapping": {"query": "query", "user_id": "user_id"}, "max_subworkflow_depth": 3, }, "position": {"x": 380, "y": 160}, }, {"id": "end-1", "type": "end", "data": {"label": "结束"}, "position": {"x": 680, "y": 160}}, ], "edges": _sanitize_edges( [ { "id": "e_start_invoke", "source": "start-1", "target": "invoke-1", "sourceHandle": "right", "targetHandle": "left", }, { "id": "e_invoke_end", "source": "invoke-1", "target": "end-1", "sourceHandle": "right", "targetHandle": "left", }, ] ), } def main() -> int: h = _login_headers() child_id = _find_agent_id(h, CHILD_AGENT_NAME) if not child_id: print(f"未找到子 Agent: {CHILD_AGENT_NAME}", file=sys.stderr) return 1 wf = _demo_workflow(child_id) existing = _find_agent_id(h, DEMO_AGENT_NAME) desc = ( f"演示:Start→invoke_agent→End,委派到「{CHILD_AGENT_NAME}」;" "用于验证父子执行与链路 API;非动态路由版(目标 Agent 在画布中固定)。" ) if existing: r = requests.put( f"{BASE}/api/v1/agents/{existing}", headers=h, json={"description": desc, "workflow_config": wf}, timeout=60, ) if r.status_code != 200: print("更新失败:", r.status_code, r.text[:800], file=sys.stderr) return 1 new_id = existing print("已更新:", DEMO_AGENT_NAME, new_id) else: r = requests.post( f"{BASE}/api/v1/agents", headers=h, json={"name": DEMO_AGENT_NAME, "description": desc, "workflow_config": wf}, timeout=60, ) if r.status_code != 201: print("创建失败:", r.status_code, r.text[:800], file=sys.stderr) return 1 new_id = r.json()["id"] print("已创建:", DEMO_AGENT_NAME, new_id) print(json.dumps({"id": new_id, "name": DEMO_AGENT_NAME, "child_agent_id": child_id}, ensure_ascii=False)) return 0 if __name__ == "__main__": raise SystemExit(main())