#!/usr/bin/env python3
"""
Create the "enterprise complex orchestration" sample Agent (single DAG, acyclic):

  Start → Condition (does the user query contain the trigger marker?)
    ├─ True:  Code (preflight / annotate) → LLM (several built-in tools)
    └─ False: LLM (lightweight, direct)
  → Merge → End

Triggering the deep path: include **[[深度]]** (without quotes) anywhere in the
user question, for example:
  "[[深度]]请用 http_request 拉取 https://httpbin.org/get 的 JSON 并摘要"

Without the marker the lightweight branch is taken (the LLM is still called,
but tools are disabled by default).

Usage:
  cd backend && .\\venv\\Scripts\\python.exe scripts/create_complex_enterprise_agent.py
  USE_TESTCLIENT=1   # in-memory requests against FastAPI, no TCP
                     # (recommended when several local instances fight for a port)

Environment variables:
  PLATFORM_BASE_URL, PLATFORM_USERNAME, PLATFORM_PASSWORD
  COMPLEX_AGENT_NAME (default: 企业复杂编排_深度分流)
"""
from __future__ import annotations

import json
import os
import sys
from typing import Any, Dict, List

# Marker that routes a query into the "deep" branch of the workflow.
DEEP_MARKER = "[[深度]]"
# Name used both to look up an existing agent and to create a new one.
DEFAULT_NAME = os.getenv("COMPLEX_AGENT_NAME", "企业复杂编排_深度分流")

# Tool identifiers enabled on the deep-branch LLM node.
TOOLS_DEEP = [
    "http_request",
    "text_analyze",
    "json_process",
    "datetime",
    "math_calculate",
    "file_read",
    "system_info",
]

PROMPT_DEEP = f"""你是企业级「深度分析」助手。用户消息中可能含有触发标记「{DEEP_MARKER}」,回答时请忽略该标记本身,专注实质需求。
可调用工具完成:HTTP 请求、文本分析、JSON 处理、时间、数学、读文件、系统信息。按需调用,勿编造工具结果。
最后用简洁中文总结结论。"""

PROMPT_FAST = """你是企业助手「快速通道」。用户未走深度编排;请直接、简洁地回答,避免冗长。若信息不足先追问一句。"""

# Source of the code node on the deep branch. It copies the incoming dict into
# ``result`` and adds three bookkeeping keys.
# NOTE: the marker literal below is hard-coded; keep it in sync with DEEP_MARKER.
CODE_PREFLIGHT = """
d = input_data if isinstance(input_data, dict) else {}
q = str(d.get("query", "") or "")
result = dict(d)
result["_deep_preflight"] = True
result["_query_chars"] = len(q)
result["_marker_detected"] = "[[深度]]" in q
""".strip()


def _sanitize_edges(edges: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a cleaned copy of *edges*.

    Edges without a source or target, self-loops, and duplicates (same source,
    target and ``sourceHandle``) are dropped. Surviving edges are shallow-copied
    and given a default ``targetHandle`` of ``"left"`` and a generated ``id``
    when those keys are missing or empty. Input dicts are not mutated.
    """
    seen: set = set()
    out: List[Dict[str, Any]] = []
    for e in edges or []:
        s, t = e.get("source"), e.get("target")
        if not s or not t or s == t:
            continue
        key = (s, t, e.get("sourceHandle") or "")
        if key in seen:
            continue
        seen.add(key)
        ne = dict(e)
        if not ne.get("targetHandle"):
            ne["targetHandle"] = "left"
        if not ne.get("id"):
            # "r" is only a filler for the id when no sourceHandle is given.
            sh = ne.get("sourceHandle") or "r"
            ne["id"] = f"e_{s}_{t}_{sh}"
        out.append(ne)
    return out


def build_complex_workflow() -> Dict[str, Any]:
    """Build the workflow config (``{"nodes": [...], "edges": [...]}``).

    The condition expression follows the engine's form:
    ``{{query}} contains "..."``.
    """
    # Plain concatenation on purpose: inside an f-string ``{{query}}`` collapses
    # to ``{query}``, which would not match the double-brace form stated above.
    # NOTE(review): the engine is not visible from this script; confirm that it
    # expects the double-brace placeholder.
    cond_expr = '{{query}} contains "' + DEEP_MARKER + '"'
    nodes: List[Dict[str, Any]] = [
        {"id": "start-1", "type": "start", "position": {"x": 40, "y": 260}, "data": {"label": "开始"}},
        {
            "id": "cond-1",
            "type": "condition",
            "position": {"x": 280, "y": 260},
            "data": {"label": "深度分流", "condition": cond_expr},
        },
        {
            "id": "code-pf",
            "type": "code",
            "position": {"x": 520, "y": 120},
            "data": {
                "label": "预检/标注",
                "language": "python",
                "code": CODE_PREFLIGHT,
                "timeout": 15,
            },
        },
        {
            "id": "llm-deep",
            "type": "llm",
            "position": {"x": 760, "y": 120},
            "data": {
                "label": "深度 LLM+工具",
                "prompt": PROMPT_DEEP,
                "temperature": 0.25,
                "enable_tools": True,
                # Independent copies so editing one list (or the returned
                # workflow) never leaks into the module-level constant.
                "tools": list(TOOLS_DEEP),
                "selected_tools": list(TOOLS_DEEP),
            },
        },
        {
            "id": "llm-fast",
            "type": "llm",
            "position": {"x": 520, "y": 400},
            "data": {
                "label": "快速 LLM",
                "prompt": PROMPT_FAST,
                "temperature": 0.35,
                "enable_tools": False,
                "tools": [],
                "selected_tools": [],
            },
        },
        {
            "id": "merge-1",
            "type": "merge",
            "position": {"x": 1000, "y": 260},
            "data": {"label": "合并输出", "mode": "merge_all", "strategy": "object"},
        },
        {"id": "end-1", "type": "end", "position": {"x": 1240, "y": 260}, "data": {"label": "结束"}},
    ]
    edges = _sanitize_edges(
        [
            {"source": "start-1", "target": "cond-1", "sourceHandle": "right", "targetHandle": "left"},
            {"source": "cond-1", "target": "code-pf", "sourceHandle": "true", "targetHandle": "left"},
            {"source": "cond-1", "target": "llm-fast", "sourceHandle": "false", "targetHandle": "left"},
            {"source": "code-pf", "target": "llm-deep", "sourceHandle": "right", "targetHandle": "left"},
            {"source": "llm-deep", "target": "merge-1", "sourceHandle": "right", "targetHandle": "left"},
            {"source": "llm-fast", "target": "merge-1", "sourceHandle": "right", "targetHandle": "left"},
            {"source": "merge-1", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"},
        ]
    )
    return {"nodes": nodes, "edges": edges}


def _ensure_backend_importable() -> None:
    """Make the ``app`` package importable when the script is run by path.

    ``python scripts/<this file>`` puts the script's own directory on
    ``sys.path``, not the current directory, so ``app`` is not found unless it
    is installed or on ``PYTHONPATH``. This is a no-op when ``app`` already
    resolves.
    """
    import importlib.util

    try:
        if importlib.util.find_spec("app") is not None:
            return
    except (ImportError, ValueError):
        # Treat a failed lookup as "not found" and fall through to the path fix.
        pass
    # Assumes this file lives one directory below the backend root (as in the
    # usage line of the module docstring) — verify if the script is moved.
    backend_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    if backend_root not in sys.path:
        sys.path.insert(0, backend_root)


def _validate_local(wf: Dict[str, Any]) -> None:
    """Validate *wf* with the project's workflow validator.

    Raises:
        ValueError: when the validator result has a falsy ``"valid"`` entry;
            the message joins the reported ``"errors"``.
    """
    _ensure_backend_importable()
    from app.services.workflow_validator import validate_workflow

    r = validate_workflow(wf.get("nodes") or [], wf.get("edges") or [])
    if not r.get("valid"):
        errs = r.get("errors") or []
        # str() each entry so non-string error objects cannot break the join.
        raise ValueError("工作流校验失败: " + "; ".join(str(e) for e in errs))


def _match_agent_id(agents: Any) -> Any:
    """Return the ``id`` of the first entry named ``DEFAULT_NAME``, else ``None``.

    The search endpoint is queried with ``search=DEFAULT_NAME``; the exact-name
    comparison here filters out any other entries it returns.
    """
    for a in agents or []:
        if a.get("name") == DEFAULT_NAME:
            return a.get("id")
    return None


def _via_requests() -> int:
    """Create or update the agent over HTTP using ``requests``.

    Reads ``PLATFORM_BASE_URL``, ``PLATFORM_USERNAME`` and ``PLATFORM_PASSWORD``
    (with local-development defaults), logs in, then updates the agent named
    ``DEFAULT_NAME`` if it exists or creates it otherwise.

    Returns:
        0 on success, 1 on any failed request (details go to stderr).
    """
    import requests

    base = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/")
    user = os.getenv("PLATFORM_USERNAME", "admin")
    pwd = os.getenv("PLATFORM_PASSWORD", "123456")

    wf = build_complex_workflow()
    _validate_local(wf)

    lr = requests.post(
        f"{base}/api/v1/auth/login",
        data={"username": user, "password": pwd},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=20,
    )
    if lr.status_code != 200:
        print("登录失败:", lr.status_code, lr.text[:500], file=sys.stderr)
        return 1
    token = lr.json().get("access_token")
    if not token:
        print("无 access_token", file=sys.stderr)
        return 1
    h = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    def find_id() -> str | None:
        rr = requests.get(
            f"{base}/api/v1/agents",
            params={"search": DEFAULT_NAME, "limit": 50},
            headers=h,
            timeout=30,
        )
        # A failed lookup is treated as "not found", which leads to a create.
        if rr.status_code != 200:
            return None
        return _match_agent_id(rr.json())

    desc = (
        f"复杂编排示例:条件分流(query 含「{DEEP_MARKER}」→ 代码预检 + 多工具 LLM),否则快速 LLM;Merge 汇总后结束。"
        " 适合演示画布、引擎分支与预算。"
    )
    budget = {"max_steps": 120, "max_llm_invocations": 6, "max_tool_calls": 48}

    existing = find_id()
    if existing:
        ur = requests.put(
            f"{base}/api/v1/agents/{existing}",
            headers=h,
            json={"description": desc, "workflow_config": wf, "budget_config": budget},
            timeout=120,
        )
        if ur.status_code != 200:
            print("更新失败:", ur.status_code, ur.text[:800], file=sys.stderr)
            return 1
        aid = existing
        print("已更新:", DEFAULT_NAME, aid)
    else:
        cr = requests.post(
            f"{base}/api/v1/agents",
            headers=h,
            json={
                "name": DEFAULT_NAME,
                "description": desc,
                "workflow_config": wf,
                "budget_config": budget,
            },
            timeout=120,
        )
        if cr.status_code != 201:
            print("创建失败:", cr.status_code, cr.text[:800], file=sys.stderr)
            return 1
        aid = cr.json()["id"]
        print("已创建:", DEFAULT_NAME, aid)

    print(
        json.dumps(
            {
                "id": aid,
                "name": DEFAULT_NAME,
                "deep_marker": DEEP_MARKER,
                "hint": f"执行时在 query 中加入 {DEEP_MARKER} 可走深度分支",
            },
            ensure_ascii=False,
        )
    )
    return 0


def _via_testclient() -> int:
    """Create or update the agent through FastAPI's in-process ``TestClient``.

    Same flow as the HTTP variant, but requests go straight to the imported
    application object, so no server has to be listening.

    Returns:
        0 on success, 1 on any failed request (details go to stderr).
    """
    _ensure_backend_importable()
    from fastapi.testclient import TestClient

    from app.main import app

    wf = build_complex_workflow()
    _validate_local(wf)

    c = TestClient(app)
    lr = c.post(
        "/api/v1/auth/login",
        data={
            "username": os.getenv("PLATFORM_USERNAME", "admin"),
            "password": os.getenv("PLATFORM_PASSWORD", "123456"),
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    if lr.status_code != 200:
        print("login:", lr.status_code, lr.text[:400], file=sys.stderr)
        return 1
    token = lr.json().get("access_token")
    if not token:
        # Same guard as the HTTP path: never send "Bearer None".
        print("无 access_token", file=sys.stderr)
        return 1
    h = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    gr = c.get("/api/v1/agents", params={"search": DEFAULT_NAME, "limit": 50}, headers=h)
    # A failed lookup is treated as "not found", which leads to a create.
    existing = _match_agent_id(gr.json()) if gr.status_code == 200 else None

    desc = (
        f"复杂编排示例:条件分流(query 含「{DEEP_MARKER}」→ 代码预检 + 多工具 LLM),否则快速 LLM;Merge 汇总后结束。"
    )
    budget = {"max_steps": 120, "max_llm_invocations": 6, "max_tool_calls": 48}

    if existing:
        ur = c.put(
            f"/api/v1/agents/{existing}",
            headers=h,
            json={"description": desc, "workflow_config": wf, "budget_config": budget},
        )
        if ur.status_code != 200:
            print("put:", ur.status_code, ur.text[:800], file=sys.stderr)
            return 1
        aid = existing
        print("已更新(TestClient):", DEFAULT_NAME, aid)
    else:
        cr = c.post(
            "/api/v1/agents",
            headers=h,
            json={
                "name": DEFAULT_NAME,
                "description": desc,
                "workflow_config": wf,
                "budget_config": budget,
            },
        )
        if cr.status_code != 201:
            print("post:", cr.status_code, cr.text[:800], file=sys.stderr)
            return 1
        aid = cr.json()["id"]
        print("已创建(TestClient):", DEFAULT_NAME, aid)

    print(
        json.dumps(
            {"id": aid, "name": DEFAULT_NAME, "deep_marker": DEEP_MARKER},
            ensure_ascii=False,
        )
    )
    return 0


def main() -> int:
    """Dispatch to the in-process or the HTTP variant based on ``USE_TESTCLIENT``.

    ``1``, ``true`` and ``yes`` (case-insensitive) select the TestClient path.
    """
    if os.getenv("USE_TESTCLIENT", "").strip().lower() in ("1", "true", "yes"):
        return _via_testclient()
    return _via_requests()


if __name__ == "__main__":
    raise SystemExit(main())