diff --git a/backend/app/agent_runtime/core.py b/backend/app/agent_runtime/core.py index bfe424b..4f8b1bd 100644 --- a/backend/app/agent_runtime/core.py +++ b/backend/app/agent_runtime/core.py @@ -134,6 +134,7 @@ class AgentRuntime: tool_schemas = self.tool_manager.get_tool_schemas() has_tools = self.tool_manager.has_tools() steps: List[AgentStep] = [] + _self_review_attempted = False # 防止无限修正循环 # 构建 LLM 调用回调(包装 on_llm_call,补充上下文) llm_callback_ctx = {"step_type": "think", "tool_name": None} @@ -221,6 +222,33 @@ class AgentRuntime: # LLM 直接返回文本 → 结束 self.context.add_assistant_message(content) final_text = content or "(模型未返回有效内容)" + + # 输出质量自检(默认关闭,Agent 节点可开启) + if self.config.self_review_enabled and not _self_review_attempted: + review = await self._self_review(final_text, task_context=user_input) + steps.append(AgentStep( + iteration=self.context.iteration, + type="tool_result", + content=f"self_review: score={review['score']:.2f} passed={review['passed']}", + tool_name="self_review", + tool_input={"content": final_text[:200]}, + tool_result=json.dumps(review, ensure_ascii=False), + )) + if review["passed"]: + logger.info("self_review 通过 (%.2f >= %.2f)", review["score"], review["threshold"]) + else: + logger.info("self_review 未通过 (%.2f < %.2f),追加修正", review["score"], review["threshold"]) + _self_review_attempted = True + # 追加修正提示 + fix_prompt = ( + f"你的上一个回答未通过质量检查(评分 {review['score']:.1f}/{review['threshold']})。\n" + f"问题:{';'.join(review['issues'][:3])}\n" + f"改进建议:{';'.join(review['suggestions'][:3])}\n" + "请修正你的回答,确保满足上述建议。" + ) + self.context.add_user_message(fix_prompt) + continue # 回到 ReAct 循环,让 LLM 修正 + steps.append(AgentStep( iteration=self.context.iteration, type="final", @@ -383,6 +411,7 @@ class AgentRuntime: tool_schemas = self.tool_manager.get_tool_schemas() has_tools = self.tool_manager.has_tools() steps: List[AgentStep] = [] + _self_review_attempted = False llm_callback_ctx = {"step_type": "think", "tool_name": None} @@ -458,6 +487,37 @@ class AgentRuntime: # LLM 直接返回文本 → 结束 self.context.add_assistant_message(content) final_text = content or "(模型未返回有效内容)" + + # 输出质量自检(默认关闭) + if self.config.self_review_enabled and not _self_review_attempted: + review = await self._self_review(final_text, task_context=user_input) + yield { + "type": "tool_result", + "content": f"self_review: score={review['score']:.2f} passed={review['passed']}", + "tool_name": "self_review", + "iteration": self.context.iteration, + "session_id": self.context.session_id, + } + if review["passed"]: + logger.info("self_review 通过 (%.2f >= %.2f)", review["score"], review["threshold"]) + else: + logger.info("self_review 未通过 (%.2f < %.2f),追加修正", review["score"], review["threshold"]) + _self_review_attempted = True + yield { + "type": "think", + "content": f"自检未通过({review['score']:.1f}),正在修正:{';'.join(review['suggestions'][:2])}", + "iteration": self.context.iteration, + "session_id": self.context.session_id, + } + fix_prompt = ( + f"你的上一个回答未通过质量检查(评分 {review['score']:.1f}/{review['threshold']})。\n" + f"问题:{';'.join(review['issues'][:3])}\n" + f"改进建议:{';'.join(review['suggestions'][:3])}\n" + "请修正你的回答,确保满足上述建议。" + ) + self.context.add_user_message(fix_prompt) + continue # 回到 ReAct 循环,让 LLM 修正 + yield { "type": "final", "content": final_text, @@ -663,6 +723,88 @@ class AgentRuntime: if db: db.close() + async def _self_review(self, content: str, task_context: str = "") -> dict: + """输出质量自检:用轻量 LLM 评判输出,返回 {score, passed, issues, suggestions}。""" + criteria = ( + "回答必须准确、完整、切题。" + "包含具体可执行的步骤或代码示例。" + "无明显事实错误或遗漏。" + "格式清晰,便于阅读。" + ) + try: + from app.agent_runtime.core import _LLMClient + from app.agent_runtime.schemas import AgentLLMConfig + + review_config = AgentLLMConfig( + provider=getattr(self.config.llm, 'provider', 'deepseek'), + model="deepseek-v4-flash", + temperature=0.1, + max_tokens=800, + request_timeout=30.0, + ) + if self.config.llm.api_key: + review_config.api_key = self.config.llm.api_key + if self.config.llm.base_url: + review_config.base_url = self.config.llm.base_url + + client = _LLMClient(review_config) + + judge_prompt = ( + "你是严格的内容质量评审专家。请根据以下标准对内容进行评分。\n\n" + f"【评判标准】\n{criteria}\n\n" + f"【待评审内容】\n{content[:8000]}\n" + ) + if task_context: + judge_prompt += f"\n【任务背景】\n{task_context[:2000]}\n" + + judge_prompt += ( + "\n请以 JSON 格式返回评审结果(严格只返回 JSON,不要任何其他文字):\n" + '{"score": 0.75, "passed": true, "issues": ["问题1"], ' + '"suggestions": ["建议1"], "summary": "一句话总结"}\n\n' + "评分规则:1.0完美 0.8良好 0.6基本满足 0.4大部分未满足 0.2完全不满足\n" + "score >= 0.6 时 passed=true,否则 passed=false\n" + ) + + messages = [{"role": "user", "content": judge_prompt}] + resp = await client.chat(messages=messages, tools=None, iteration=0) + judge_text = getattr(resp, 'content', '') or ( + resp.get('content', '') if isinstance(resp, dict) else str(resp) + ) + + # 解析 JSON + try: + judge_clean = judge_text.strip() + if judge_clean.startswith("```"): + lines = judge_clean.split("\n") + judge_clean = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) + result = json.loads(judge_clean) + except json.JSONDecodeError: + import re as _sr_re + m = _sr_re.search(r'\{[^{}]*"score"\s*:\s*[\d.]+[^{}]*\}', judge_text, _sr_re.DOTALL) + if m: + try: + result = json.loads(m.group()) + except json.JSONDecodeError: + result = {"score": 0.5, "passed": False, "issues": ["无法解析评审结果"], "suggestions": [], "summary": ""} + else: + result = {"score": 0.5, "passed": False, "issues": ["无法解析评审结果"], "suggestions": [], "summary": ""} + + score = float(result.get("score", 0.5)) + threshold = self.config.llm.self_review_threshold + passed = score >= threshold + + return { + "score": score, + "passed": passed, + "threshold": threshold, + "issues": result.get("issues", []), + "suggestions": result.get("suggestions", []), + "summary": result.get("summary", ""), + } + except Exception as e: + logger.warning("self_review 执行失败: %s", e) + return {"score": 0.5, "passed": True, "issues": [], "suggestions": [], "error": str(e)} + @staticmethod def _extract_tool_calls(response: Any) -> List[Dict[str, Any]]: """从 LLM 响应中提取工具调用列表。""" diff --git a/backend/app/agent_runtime/schemas.py b/backend/app/agent_runtime/schemas.py index abeaa71..5643908 100644 --- a/backend/app/agent_runtime/schemas.py +++ b/backend/app/agent_runtime/schemas.py @@ -36,6 +36,7 @@ class AgentLLMConfig(BaseModel): max_iterations: int = 10 # ReAct 循环最大步数 request_timeout: float = 120.0 extra_body: Optional[Dict[str, Any]] = None + self_review_threshold: float = 0.6 # self-review 通过阈值(0-1) class AgentBudgetConfig(BaseModel): @@ -55,6 +56,8 @@ class AgentConfig(BaseModel): user_id: Optional[str] = None # 持久记忆 / 向量记忆的 scope_id;不设时沿用 user_id 或 name(易与其他 Agent 串记忆) memory_scope_id: Optional[str] = None + # 是否开启输出质量自检(结束前用轻量 LLM 评审,不达标则追加修正) + self_review_enabled: bool = False class AgentMessage(BaseModel): diff --git a/backend/app/agent_runtime/workflow_integration.py b/backend/app/agent_runtime/workflow_integration.py index 8566c7e..0810f42 100644 --- a/backend/app/agent_runtime/workflow_integration.py +++ b/backend/app/agent_runtime/workflow_integration.py @@ -97,6 +97,7 @@ async def run_agent_node( }, budget=budget, user_id=user_id, + self_review_enabled=node_data.get("self_review_enabled", False), ) # 4. 执行 Agent diff --git a/backend/app/core/tools_bootstrap.py b/backend/app/core/tools_bootstrap.py index 481a5cd..7a0aa3e 100644 --- a/backend/app/core/tools_bootstrap.py +++ b/backend/app/core/tools_bootstrap.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) _registered = False -_EXPECTED_BUILTIN = 31 +_EXPECTED_BUILTIN = 35 def ensure_builtin_tools_registered() -> None: @@ -49,6 +49,10 @@ def ensure_builtin_tools_registered() -> None: deploy_push_tool, agent_create_tool, tool_register_tool, + capability_check_tool, + code_tool_create_tool, + extension_log_tool, + self_review_tool, HTTP_REQUEST_SCHEMA, FILE_READ_SCHEMA, FILE_WRITE_SCHEMA, @@ -80,6 +84,10 @@ def ensure_builtin_tools_registered() -> None: DEPLOY_PUSH_SCHEMA, AGENT_CREATE_SCHEMA, TOOL_REGISTER_SCHEMA, + CAPABILITY_CHECK_SCHEMA, + CODE_TOOL_CREATE_SCHEMA, + EXTENSION_LOG_SCHEMA, + SELF_REVIEW_SCHEMA, ) tool_registry.register_builtin_tool("http_request", http_request_tool, HTTP_REQUEST_SCHEMA) @@ -113,6 +121,10 @@ def ensure_builtin_tools_registered() -> None: tool_registry.register_builtin_tool("deploy_push", deploy_push_tool, DEPLOY_PUSH_SCHEMA) tool_registry.register_builtin_tool("agent_create", agent_create_tool, AGENT_CREATE_SCHEMA) tool_registry.register_builtin_tool("tool_register", tool_register_tool, TOOL_REGISTER_SCHEMA) + tool_registry.register_builtin_tool("capability_check", capability_check_tool, CAPABILITY_CHECK_SCHEMA) + tool_registry.register_builtin_tool("code_tool_create", code_tool_create_tool, CODE_TOOL_CREATE_SCHEMA) + tool_registry.register_builtin_tool("extension_log", extension_log_tool, EXTENSION_LOG_SCHEMA) + tool_registry.register_builtin_tool("self_review", self_review_tool, SELF_REVIEW_SCHEMA) _registered = True n = tool_registry.builtin_tool_count() diff --git a/backend/app/models/agent.py b/backend/app/models/agent.py index 0e5159e..441ddad 100644 --- a/backend/app/models/agent.py +++ b/backend/app/models/agent.py @@ -32,3 +32,21 @@ class Agent(Base): def __repr__(self): return f"" + + +class AgentExtension(Base): + """Agent 自主扩展记录表 — 用于 extension_log 工具""" + __tablename__ = "agent_extensions" + + id = Column(CHAR(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="扩展记录ID") + extension_type = Column(String(50), nullable=False, comment="扩展类型: agent_created/tool_registered/code_tool_created") + name = Column(String(100), nullable=False, comment="创建的Agent或工具名称") + reason = Column(Text, comment="创建原因(为什么需要扩展)") + detail = Column(JSON, comment="扩展详情(参数、结果等)") + success_rating = Column(String(20), comment="效果评价: success/partial/failed") + note = Column(Text, comment="反馈备注") + user_id = Column(CHAR(36), ForeignKey("users.id"), nullable=True, comment="创建者ID") + created_at = Column(DateTime, default=func.now(), comment="创建时间") + + def __repr__(self): + return f"" diff --git a/backend/app/services/builtin_tools.py b/backend/app/services/builtin_tools.py index 1a86501..dc44f85 100644 --- a/backend/app/services/builtin_tools.py +++ b/backend/app/services/builtin_tools.py @@ -3382,6 +3382,16 @@ async def agent_create_tool( """ import uuid + # -- 参数校验 -- + if not name or not name.strip(): + return json.dumps({"error": "invalid_name", "message": "Agent 名称不能为空"}, ensure_ascii=False) + name = name.strip() + if not system_prompt or len(system_prompt.strip()) < 20: + return json.dumps({ + "error": "invalid_prompt", + "message": f"system_prompt 太短({len(system_prompt) if system_prompt else 0}字符),至少需要20字符才能定义有效的 Agent 角色", + }, ensure_ascii=False) + try: from app.core.database import SessionLocal from app.models.agent import Agent @@ -3450,7 +3460,7 @@ async def agent_create_tool( "model": model, "provider": provider, "temperature": temperature, - "tools": "ALL (31 builtin tools)", + "tools": "ALL (34 builtin tools)", "memory": "RAG enabled", }, "hint": f"现在可以用 agent_call(agent_name=\"{name}\", query=\"...\") 来调用这个新 Agent", @@ -3560,6 +3570,20 @@ async def tool_register_tool( }, } + # URL 可达性验证(发送 HEAD 请求检测,不阻塞注册) + url_reachable = False + try: + # 用占位符替换模板参数为 test 值,构造测试 URL + test_url = url + for p in url_params: + test_url = test_url.replace(f"{{{p}}}", "test") + if not url_params: + test_url = url + resp = await httpx.AsyncClient(timeout=5.0).head(test_url, follow_redirects=True) + url_reachable = resp.status_code < 500 + except Exception: + url_reachable = False # 不阻断注册,仅记录 + db = SessionLocal() try: # 检查重名 @@ -3607,6 +3631,7 @@ async def tool_register_tool( "url_template": url, "params": required, }, + "url_reachable": url_reachable, "hint": f"工具已就绪,可直接调用 {name}({', '.join(p + '=...' for p in required) if required else ''})", }, ensure_ascii=False) finally: @@ -3639,3 +3664,667 @@ TOOL_REGISTER_SCHEMA = { }, }, } + + +# ── capability_check ────────────────────────────────────────── + +async def capability_check_tool( + task_description: str, + required_domains: str = "", +) -> str: + """ + 能力自检工具:分析当前 Agent 的工具和子 Agent 是否足以完成给定任务。 + + Agent 在接到复杂/陌生任务时,应先调用此工具评估自身能力, + 根据返回的差距分析决定是否需要创建子 Agent 或注册新工具。 + + Args: + task_description: 任务描述 + required_domains: 任务涉及的领域,逗号分隔(如「数据库,性能优化,SQL」) + """ + try: + from app.core.database import SessionLocal + from app.models.agent import Agent as AgentModel + from app.services.tool_registry import tool_registry + + # 获取所有可用工具 + all_tool_names = [] + tool_categories = {} + for schema in tool_registry.get_all_tool_schemas(): + fn = schema.get("function", schema) + tname = fn.get("name", "") + if tname: + all_tool_names.append(tname) + desc = fn.get("description", "") + # 简单分类 + cat = desc.split("。")[0] if "。" in desc else desc[:30] + tool_categories[tname] = cat + + # 查询数据库中的可用子 Agent + available_agents = [] + try: + db = SessionLocal() + try: + agents_in_db = db.query(AgentModel).filter(AgentModel.status == "active").all() + available_agents = [ + {"name": a.name, "id": a.id, "description": a.description or ""} + for a in agents_in_db + ] + finally: + db.close() + except Exception: + pass + + # 领域关键词映射(规则引擎,不调 LLM) + domain_tool_map = { + "数据库": ["database_query"], + "SQL": ["database_query"], + "代码": ["code_execute", "git_operation"], + "编程": ["code_execute", "git_operation", "project_scaffold"], + "文件": ["file_read", "file_write", "excel_process"], + "网络": ["http_request", "url_parse", "web_search"], + "HTTP": ["http_request"], + "API": ["http_request", "tool_register"], + "数据": ["json_process", "excel_process", "text_analyze"], + "分析": ["text_analyze", "code_execute", "database_query"], + "报告": ["pdf_generate", "file_write"], + "文档": ["file_read", "pdf_generate", "file_write"], + "部署": ["deploy_push", "docker_manage"], + "容器": ["docker_manage"], + "Docker": ["docker_manage"], + "前端": ["project_scaffold"], + "后端": ["project_scaffold", "http_request"], + "搜索": ["web_search"], + "Git": ["git_operation"], + "测试": ["code_execute"], + "监控": ["http_request", "schedule_create"], + "调度": ["schedule_create", "schedule_list"], + "邮件": ["send_email"], + "Excel": ["excel_process"], + "PDF": ["pdf_generate", "file_read"], + "浏览器": ["browser_use"], + "Android": ["adb_log"], + "加密": ["crypto_util"], + "性能": ["database_query", "code_execute"], + "优化": ["database_query", "code_execute"], + } + + # 工具能力覆盖关键词 + tool_capability_keywords = { + "http_request": ["HTTP", "API", "网络", "请求"], + "file_read": ["文件", "读取"], + "file_write": ["文件", "写入", "保存"], + "text_analyze": ["文本", "分析"], + "datetime": ["时间", "日期"], + "math_calculate": ["数学", "计算"], + "system_info": ["系统", "信息"], + "json_process": ["JSON", "数据"], + "database_query": ["数据库", "SQL", "查询"], + "adb_log": ["Android", "ADB"], + "schedule_create": ["调度", "定时"], + "crypto_util": ["加密", "解密"], + "random_generate": ["随机", "数据"], + "send_email": ["邮件", "发送"], + "url_parse": ["URL", "解析"], + "regex_test": ["正则", "表达式"], + "agent_call": ["Agent", "委派"], + "code_execute": ["代码", "执行", "Python"], + "git_operation": ["Git", "版本"], + "web_search": ["搜索", "网页"], + "pdf_generate": ["PDF", "报告"], + "project_scaffold": ["项目", "模板"], + "task_plan": ["任务", "规划"], + "excel_process": ["Excel"], + "browser_use": ["浏览器"], + "docker_manage": ["Docker", "容器"], + "deploy_push": ["部署"], + "agent_create": ["创建Agent"], + "tool_register": ["注册工具"], + } + + domains = [d.strip() for d in required_domains.split(",") if d.strip()] + if not domains: + # 从 task_description 中自动提取关键词 + desc_lower = task_description + domains = [kw for kw in domain_tool_map if kw in desc_lower] + if not domains: + domains = ["通用"] + + # 找出推荐的已有工具 + recommended_existing = [] + for domain in domains: + tools = domain_tool_map.get(domain, []) + for t in tools: + if t in all_tool_names and t not in recommended_existing: + recommended_existing.append(t) + + # 差距分析:哪些领域没有匹配的工具 + missing_domains = [] + missing_tools = [] + for domain in domains: + expected = domain_tool_map.get(domain) + if expected: + found = any(t in all_tool_names for t in expected) + if not found: + missing_domains.append(domain) + missing_tools.extend(expected) + + missing_tools = sorted(set(missing_tools)) + + # 严重程度评估 + if not missing_domains: + severity = "low" + elif len(missing_domains) <= 2: + severity = "medium" + else: + severity = "high" + + # 生成建议 + recommendations = [] + if missing_domains: + rec_name = f"{domains[0]}专家" if domains else "领域专家" + recommendations.append(f"使用 agent_create 创建「{rec_name}」Agent,赋予其专业知识") + if missing_tools: + actionable = [t for t in missing_tools if t in tool_capability_keywords] + if actionable: + recommendations.append( + f"缺少以下能力工具:{', '.join(actionable[:5])}。" + f"可通过 tool_register 注册外部 API,或通过 code_tool_create 创建代码工具" + ) + recommendations.append("使用 web_search 搜索相关的外部 API 或开源工具") + if available_agents: + agent_names = [a["name"] for a in available_agents[:5]] + recommendations.insert(0, f"可尝试调用现有 Agent:{', '.join(agent_names)}") + + result = { + "task": task_description, + "domains_analyzed": domains, + "available_tools_count": len(all_tool_names), + "available_tools_sample": all_tool_names[:15], + "available_agents": [a["name"] for a in available_agents], + "recommended_existing_tools": recommended_existing, + "gap_analysis": { + "missing_domain_knowledge": missing_domains, + "missing_tools": missing_tools, + "severity": severity, + }, + "recommendations": recommendations, + } + return json.dumps(result, ensure_ascii=False) + except Exception as e: + logger.error(f"capability_check 工具执行失败: {e}", exc_info=True) + return json.dumps({"error": "check_failed", "message": str(e)}, ensure_ascii=False) + + +CAPABILITY_CHECK_SCHEMA = { + "type": "function", + "function": { + "name": "capability_check", + "description": ( + "能力自检工具:分析当前可用工具和子 Agent 是否足以完成给定任务。" + "在接到复杂或陌生任务时,先用此工具评估差距,再决定是否需要创建子 Agent 或注册新工具。" + "返回结构化差距分析和行动建议。" + ), + "parameters": { + "type": "object", + "properties": { + "task_description": {"type": "string", "description": "任务描述(如「分析MySQL慢查询日志并优化」)"}, + "required_domains": {"type": "string", "description": "任务涉及的领域,逗号分隔(如「数据库,性能优化,SQL」),留空则自动提取"}, + }, + "required": ["task_description"], + }, + }, +} + + +# ── code_tool_create ────────────────────────────────────────── + +async def code_tool_create_tool( + name: str, + description: str, + code: str, + language: str = "python", + parameter_schema: str = "{}", + test_input: str = "", +) -> str: + """ + 将经过验证的代码持久化为可复用工具。 + + 先用 code_execute 验证代码正确性,再写入 tools 表并加载到 tool_registry。 + 创建后所有 Agent 均可调用该工具。 + + Args: + name: 工具名称(英文,如 parse_slow_query) + description: 工具功能描述 + code: 完整的 Python/JS 函数代码 + language: 代码语言(python/javascript) + parameter_schema: 参数 schema JSON,如 {"param1": {"type": "string", "description": "..."}} + test_input: 测试输入 JSON,用于沙箱验证(可选) + """ + import uuid + + try: + from app.core.database import SessionLocal + from app.models.tool import Tool + from app.models.user import User + from app.services.tool_registry import tool_registry + + # 解析参数 schema + try: + param_schema = json.loads(parameter_schema) if parameter_schema else {} + except json.JSONDecodeError: + return json.dumps({"error": "invalid_schema", "message": "parameter_schema 不是有效 JSON"}, ensure_ascii=False) + + # 构建 OpenAI function schema + properties = {} + required_params = [] + for pname, pinfo in param_schema.items(): + prop = {"description": pinfo.get("description", f"参数 {pname}")} + ptype = pinfo.get("type", "string") + if ptype == "integer": + prop["type"] = "integer" + elif ptype == "number": + prop["type"] = "number" + elif ptype == "boolean": + prop["type"] = "boolean" + else: + prop["type"] = "string" + properties[pname] = prop + if pinfo.get("required", True): + required_params.append(pname) + + function_schema = { + "type": "function", + "function": { + "name": name, + "description": description, + "parameters": { + "type": "object", + "properties": properties, + "required": required_params, + }, + }, + } + + # 沙箱测试(如果提供了 test_input) + test_result = None + if test_input and language == "python": + try: + test_params = json.loads(test_input) if isinstance(test_input, str) else test_input + # 构建测试调用代码 + args_str = ", ".join( + f"{k}={repr(v)}" if not isinstance(v, (int, float, bool)) else f"{k}={v}" + for k, v in test_params.items() + ) + test_code = f"{code}\n\n# test\nresult = {name}({args_str})\nprint(json.dumps(result, ensure_ascii=False, default=str))" + proc = await asyncio.create_subprocess_exec( + sys.executable, "-c", test_code, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0) + if proc.returncode == 0: + test_result = {"passed": True, "output": stdout.decode("utf-8", errors="replace").strip()[:2000]} + else: + return json.dumps({ + "error": "test_failed", + "message": f"沙箱测试失败,请修正代码后重试", + "stderr": stderr.decode("utf-8", errors="replace")[:1000], + }, ensure_ascii=False) + except asyncio.TimeoutError: + return json.dumps({"error": "test_timeout", "message": "沙箱测试超时(15秒),请优化代码"}, ensure_ascii=False) + except Exception as e: + return json.dumps({"error": "test_error", "message": f"沙箱测试异常: {e}"}, ensure_ascii=False) + + # 写入 tools 表 + db = SessionLocal() + try: + existing = db.query(Tool).filter(Tool.name == name).first() + if existing: + db.close() + return json.dumps({ + "error": "name_conflict", + "message": f"工具「{name}」已存在(id={existing.id})", + "existing_id": existing.id, + }, ensure_ascii=False) + + owner = db.query(User).first() + tool_id = str(uuid.uuid4()) + + tool = Tool( + id=tool_id, + name=name, + description=description, + category="agent_created", + function_schema=function_schema, + implementation_type="code", + implementation_config={ + "code": code, + "language": language, + "parameter_schema": param_schema, + }, + is_public=False, + user_id=owner.id if owner else None, + ) + db.add(tool) + db.commit() + + # 加载到 tool_registry + tool_registry.load_tools_from_db(db, tool_names=[name]) + + result = { + "status": "created", + "tool": { + "id": tool_id, + "name": name, + "description": description, + "language": language, + "params": required_params, + }, + "sandbox_test": test_result, + "hint": f"工具已就绪,可直接调用 {name}({', '.join(p + '=...' for p in required_params) if required_params else ''})", + } + return json.dumps(result, ensure_ascii=False) + finally: + db.close() + except Exception as e: + logger.error(f"code_tool_create 工具执行失败: {e}", exc_info=True) + return json.dumps({"error": "creation_failed", "message": f"创建代码工具失败: {e}"}, ensure_ascii=False) + + +CODE_TOOL_CREATE_SCHEMA = { + "type": "function", + "function": { + "name": "code_tool_create", + "description": ( + "将经过验证的代码持久化为可复用的内置工具。" + "先用 code_execute 验证代码,确认正确后调用此工具将其注册为永久工具。" + "创建后所有 Agent 均可直接调用该工具。" + ), + "parameters": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "工具名称(英文,如 parse_slow_query、csv_cleaner)"}, + "description": {"type": "string", "description": "工具功能描述(中文,如「解析MySQL慢查询日志,提取最慢的N条SQL」)"}, + "code": {"type": "string", "description": "完整的 Python 函数代码(包含 def 定义和依赖 import)"}, + "language": {"type": "string", "description": "代码语言", "default": "python"}, + "parameter_schema": {"type": "string", "description": "参数 JSON schema,如 {\"log_path\": {\"type\": \"string\", \"description\": \"日志路径\"}}", "default": "{}"}, + "test_input": {"type": "string", "description": "测试输入 JSON,用于沙箱验证(可选但推荐提供)"}, + }, + "required": ["name", "description", "code"], + }, + }, +} + + +# ── extension_log ──────────────────────────────────────────── + +async def extension_log_tool( + action: str = "list", + name: str = "", + extension_type: str = "", + reason: str = "", + detail_json: str = "{}", + success_rating: str = "", + note: str = "", + limit: int = 10, +) -> str: + """ + 记录和查询 Agent 自主扩展历史。 + + Agent 创建子 Agent / 注册工具 / 创建代码工具后,用此工具记录。 + 后续可查询历史扩展记录进行自我反思。 + + Args: + action: "list" 查询历史 / "log" 记录新扩展 / "evaluate" 评价已有扩展 + name: 扩展名称(log/evaluate 时必填) + extension_type: 扩展类型(agent_created/tool_registered/code_tool_created) + reason: 创建原因 + detail_json: 扩展详情 JSON + success_rating: 效果评价(success/partial/failed) + note: 备注 + limit: list 时返回的最大条数 + """ + import uuid + from datetime import datetime + + try: + from app.core.database import SessionLocal + from app.models.agent import AgentExtension + from app.models.user import User + + db = SessionLocal() + try: + if action == "list": + query = db.query(AgentExtension).order_by(AgentExtension.created_at.desc()) + if extension_type: + query = query.filter(AgentExtension.extension_type == extension_type) + if name: + query = query.filter(AgentExtension.name.contains(name)) + extensions = query.limit(min(limit, 50)).all() + + items = [] + for ext in extensions: + items.append({ + "id": ext.id, + "type": ext.extension_type, + "name": ext.name, + "reason": ext.reason, + "success_rating": ext.success_rating, + "note": ext.note, + "created_at": ext.created_at.isoformat() if ext.created_at else None, + }) + + return json.dumps({ + "extensions": items, + "count": len(items), + "hint": "使用 extension_log(action='evaluate', name='...', success_rating='success') 评价扩展效果", + }, ensure_ascii=False) + + elif action == "log": + if not name or not extension_type: + return json.dumps({"error": "missing_fields", "message": "log 操作需要 name 和 extension_type 参数"}, ensure_ascii=False) + + owner = db.query(User).first() + ext_id = str(uuid.uuid4()) + try: + detail = json.loads(detail_json) if detail_json else {} + except json.JSONDecodeError: + detail = {"raw": detail_json} + + ext_record = AgentExtension( + id=ext_id, + extension_type=extension_type, + name=name, + reason=reason, + detail=detail, + user_id=owner.id if owner else None, + ) + db.add(ext_record) + db.commit() + + return json.dumps({ + "status": "logged", + "extension": {"id": ext_id, "type": extension_type, "name": name}, + "hint": "扩展已记录。后续可用 extension_log(action='evaluate', ...) 评价效果", + }, ensure_ascii=False) + + elif action == "evaluate": + if not name: + return json.dumps({"error": "missing_fields", "message": "evaluate 操作需要 name 参数"}, ensure_ascii=False) + + # 查找最近的匹配记录 + ext_record = ( + db.query(AgentExtension) + .filter(AgentExtension.name == name) + .order_by(AgentExtension.created_at.desc()) + .first() + ) + if not ext_record: + return json.dumps({ + "error": "not_found", + "message": f"未找到名为「{name}」的扩展记录,请先用 action='log' 记录", + }, ensure_ascii=False) + + if success_rating: + ext_record.success_rating = success_rating + if note: + ext_record.note = note + db.commit() + + return json.dumps({ + "status": "evaluated", + "extension": {"id": ext_record.id, "name": name, "success_rating": success_rating, "note": note}, + }, ensure_ascii=False) + + else: + return json.dumps({"error": "unknown_action", "message": f"未知操作: {action},支持 list/log/evaluate"}, ensure_ascii=False) + + finally: + db.close() + except Exception as e: + logger.error(f"extension_log 工具执行失败: {e}", exc_info=True) + return json.dumps({"error": "extension_log_failed", "message": str(e)}, ensure_ascii=False) + + +EXTENSION_LOG_SCHEMA = { + "type": "function", + "function": { + "name": "extension_log", + "description": ( + "记录和查询自主扩展历史。创建子 Agent / 注册工具 / 创建代码工具后," + "用此工具记录扩展和原因,以便后续反思和优化。" + "支持 list(查询)、log(记录)、evaluate(评价)三种操作。" + ), + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["list", "log", "evaluate"], "description": "操作:list 查询 / log 记录 / evaluate 评价", "default": "list"}, + "name": {"type": "string", "description": "扩展名称(Agent名或工具名,log/evaluate 时必填)"}, + "extension_type": {"type": "string", "enum": ["agent_created", "tool_registered", "code_tool_created"], "description": "扩展类型(log 时必填)"}, + "reason": {"type": "string", "description": "创建原因,简短说明为什么要扩展(log 时推荐填写)"}, + "detail_json": {"type": "string", "description": "扩展详情 JSON(log 时可选)"}, + "success_rating": {"type": "string", "enum": ["success", "partial", "failed"], "description": "效果评价(evaluate 时使用)"}, + "note": {"type": "string", "description": "反馈备注(evaluate 时使用,如「子Agent成功定位3条慢SQL」)"}, + "limit": {"type": "integer", "description": "list 时返回的最大条数", "default": 10}, + }, + "required": ["action"], + }, + }, +} + + +# ── self_review ────────────────────────────────────────────── + +async def self_review_tool( + content: str, + criteria: str, + task_context: str = "", +) -> str: + """ + 输出质量自检工具:用 LLM 评判一段内容是否满足质量标准。 + + Agent 或工作流节点在执行完毕后,可调用此工具对输出进行质量评估。 + 评分 >= 0.6 为通过,不通过时返回具体问题和改进建议,供修正参考。 + + Args: + content: 待评估的内容(Agent 回答、报告、代码等) + criteria: 评判标准(如「回答必须包含索引优化建议和具体SQL示例」) + task_context: 原始任务上下文(可选,帮助评判器理解背景) + """ + try: + # 构建评判 prompt + judge_prompt = ( + "你是严格的内容质量评审专家。请根据以下标准对内容进行评分。\n\n" + f"【评判标准】\n{criteria}\n\n" + f"【待评审内容】\n{content[:8000]}\n" + ) + if task_context: + judge_prompt += f"\n【任务背景】\n{task_context[:2000]}\n" + + judge_prompt += ( + "\n请以 JSON 格式返回评审结果(严格只返回 JSON,不要任何其他文字):\n" + '{"score": 0.75, "passed": true, "issues": ["问题1", "问题2"], ' + '"suggestions": ["建议1", "建议2"], "summary": "一句话总结"}\n\n' + "评分规则:\n" + "- 1.0 完美满足所有标准\n" + "- 0.8 良好,有小的改进空间\n" + "- 0.6 基本满足,但存在明显不足\n" + "- 0.4 大部分标准未满足\n" + "- 0.2 完全不满足\n" + "- score >= 0.6 时 passed=true,否则 passed=false\n" + ) + + # 调用轻量 LLM 评审 + from app.services.llm_service import LLMService + llm = LLMService() + messages = [{"role": "user", "content": judge_prompt}] + resp = await llm.chat( + provider="deepseek", + model="deepseek-v4-flash", + messages=messages, + temperature=0.1, + max_tokens=800, + ) + judge_text = resp.get("content", "") if isinstance(resp, dict) else str(resp) + + # 解析 JSON 响应 + try: + # 清理可能的 markdown 包裹 + judge_text_clean = judge_text.strip() + if judge_text_clean.startswith("```"): + lines = judge_text_clean.split("\n") + judge_text_clean = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) + result = json.loads(judge_text_clean) + except json.JSONDecodeError: + # 尝试从文本中提取 JSON + import re as _re2 + m = _re2.search(r'\{[^{}]*"score"\s*:\s*[\d.]+[^{}]*\}', judge_text, _re2.DOTALL) + if m: + try: + result = json.loads(m.group()) + except json.JSONDecodeError: + result = {"score": 0.5, "passed": False, "issues": ["无法解析评审结果"], "suggestions": [], "summary": judge_text[:200]} + else: + result = {"score": 0.5, "passed": False, "issues": ["无法解析评审结果"], "suggestions": [], "summary": judge_text[:200]} + + score = float(result.get("score", 0.5)) + passed = result.get("passed", score >= 0.6) + if not passed and score >= 0.6: + passed = True + if passed and score < 0.6: + passed = False + + return json.dumps({ + "score": score, + "passed": passed, + "threshold": 0.6, + "issues": result.get("issues", []), + "suggestions": result.get("suggestions", []), + "summary": result.get("summary", ""), + }, ensure_ascii=False) + except Exception as e: + logger.error(f"self_review 工具执行失败: {e}", exc_info=True) + return json.dumps({"error": "review_failed", "message": str(e), "score": 0, "passed": False}, ensure_ascii=False) + + +SELF_REVIEW_SCHEMA = { + "type": "function", + "function": { + "name": "self_review", + "description": ( + "输出质量自检工具:用 LLM 评判一段内容的输出质量。" + "在完成任务后调用此工具检查输出是否满足标准," + "不通过时会给出具体问题和改进建议。" + ), + "parameters": { + "type": "object", + "properties": { + "content": {"type": "string", "description": "待评估的内容(Agent 回答、报告、代码等)"}, + "criteria": {"type": "string", "description": "评判标准(如「回答必须包含索引优化建议和具体SQL示例」)"}, + "task_context": {"type": "string", "description": "原始任务上下文(可选,帮助评判器理解背景)"}, + }, + "required": ["content", "criteria"], + }, + }, +} diff --git a/backend/app/services/workflow_engine.py b/backend/app/services/workflow_engine.py index 1885765..cb19b9d 100644 --- a/backend/app/services/workflow_engine.py +++ b/backend/app/services/workflow_engine.py @@ -4066,57 +4066,62 @@ class WorkflowEngine: } elif node_type == 'error_handler': - # 错误处理节点:捕获错误、错误重试、错误通知 - # 注意:这个节点需要特殊处理,因为它应该包装其他节点的执行 - # 这里我们实现一个简化版本,主要用于错误重试和通知 + # 错误处理节点:捕获上游节点失败,执行重试/通知/停止策略 node_data = node.get('data', {}) retry_count = node_data.get('retry_count', 3) retry_delay = node_data.get('retry_delay', 1000) # 毫秒 on_error = node_data.get('on_error', 'notify') # notify, retry, stop - error_handler_workflow = node_data.get('error_handler_workflow', '') - - # 这个节点通常用于包装其他节点,但在这里我们只处理输入数据中的错误 + + # 找到上游节点(从 edges 反查 source) + predecessor_id = None + for e in self.active_edges if hasattr(self, 'active_edges') else []: + if e.get("target") == node_id: + predecessor_id = e.get("source") + break + if not predecessor_id: + for e in self.edges: + if e.get("target") == node_id: + predecessor_id = e.get("source") + break + try: - # 检查输入数据中是否有错误 if isinstance(input_data, dict) and input_data.get('status') == 'failed': error = input_data.get('error', '未知错误') - - if on_error == 'retry' and retry_count > 0: - # 重试逻辑(这里简化处理,实际应该重新执行前一个节点) - logger.warning(f"错误处理节点检测到错误,将重试: {error}") - # 注意:实际重试需要重新执行前一个节点,这里只记录 + + if on_error == 'retry': + logger.warning(f"error_handler 检测到错误,请求重试前驱节点 {predecessor_id}: {error}") exec_result = { 'output': input_data, - 'status': 'retry', + 'status': 'retry_predecessor', + 'predecessor_id': predecessor_id, 'retry_count': retry_count, - 'error': error + 'retry_delay_ms': retry_delay, + 'error': error, } elif on_error == 'notify': - # 通知错误(记录日志) - logger.error(f"错误处理节点捕获错误: {error}") + logger.error(f"error_handler 捕获错误并通知: {error}") exec_result = { 'output': input_data, 'status': 'error_handled', 'error': error, - 'notified': True + 'notified': True, } - else: - # 停止执行 + else: # stop exec_result = { 'output': input_data, 'status': 'failed', 'error': error, - 'stopped': True + 'stopped': True, } else: - # 没有错误,正常通过 + # 上游正常,透传 exec_result = {'output': input_data, 'status': 'success'} - + if self.logger: duration = int((time.time() - start_time) * 1000) self.logger.log_node_complete(node_id, node_type, exec_result.get('output'), duration) return exec_result - + except Exception as e: if self.logger: duration = int((time.time() - start_time) * 1000) @@ -4124,7 +4129,7 @@ class WorkflowEngine: return { 'output': input_data, 'status': 'failed', - 'error': f'错误处理失败: {str(e)}' + 'error': f'错误处理失败: {str(e)}', } elif node_type == 'csv': @@ -5427,6 +5432,89 @@ class WorkflowEngine: self.logger.log_node_complete(node_id, node_type, final_output, duration) return result + elif node_type == 'evaluator': + # 输出质量评估节点:用 LLM 评判上游节点的输出质量 + nd = node.get('data', {}) or {} + criteria = nd.get('criteria', '回答必须准确、完整、切题') + pass_threshold = float(nd.get('pass_threshold', 0.6)) + + # 从输入数据提取待评估内容 + if isinstance(input_data, dict): + content = input_data.get('output', input_data.get('content', str(input_data))) + else: + content = str(input_data) if input_data else "" + + # 调用 LLM 评判 + try: + from app.services.llm_service import llm_service + judge_prompt = ( + "你是严格的内容质量评审专家。请根据以下标准对内容进行评分。\n\n" + f"【评判标准】\n{criteria}\n\n" + f"【待评审内容】\n{str(content)[:8000]}\n\n" + "请以 JSON 格式返回评审结果(严格只返回 JSON,不要任何其他文字):\n" + '{"score": 0.75, "passed": true, "issues": ["问题1"], ' + '"suggestions": ["建议1"], "summary": "一句话总结"}\n\n' + "评分规则:1.0完美 0.8良好 0.6基本满足 0.4大部分未满足 0.2完全不满足\n" + "score >= 0.6 时 passed=true,否则 passed=false\n" + ) + messages = [{"role": "user", "content": judge_prompt}] + resp = llm_service.call_openai_with_tools( + provider="deepseek", + model="deepseek-v4-flash", + messages=messages, + temperature=0.1, + max_tokens=600, + ) + judge_text = "" + if isinstance(resp, dict): + choices = resp.get("choices", []) + if choices: + judge_text = choices[0].get("message", {}).get("content", "") + elif hasattr(resp, 'choices'): + judge_text = resp.choices[0].message.content or "" + + # 解析 JSON + import re as _eval_re + judge_clean = judge_text.strip() + if judge_clean.startswith("```"): + lines = judge_clean.split("\n") + judge_clean = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) + try: + review = json.loads(judge_clean) + except json.JSONDecodeError: + m = _eval_re.search(r'\{[^{}]*"score"\s*:\s*[\d.]+[^{}]*\}', judge_text, _eval_re.DOTALL) + if m: + review = json.loads(m.group()) + else: + review = {"score": 0.5, "passed": False, "issues": ["评审解析失败"], "suggestions": []} + + score = float(review.get("score", 0.5)) + passed = score >= pass_threshold + + exec_result = { + 'output': { + 'score': score, + 'passed': passed, + 'threshold': pass_threshold, + 'issues': review.get('issues', []), + 'suggestions': review.get('suggestions', []), + 'summary': review.get('summary', ''), + 'content': content if isinstance(content, str) else str(content), + }, + 'status': 'success' if passed else 'quality_failed', + } + except Exception as e: + logger.error(f"evaluator 节点评审失败: {e}") + exec_result = { + 'output': {'score': 0, 'passed': False, 'error': str(e), 'content': str(content)}, + 'status': 'failed', + } + + if self.logger: + duration = int((time.time() - start_time) * 1000) + self.logger.log_node_complete(node_id, node_type, exec_result.get('output'), duration) + return exec_result + else: # 未知节点类型 logger.warning(f"[rjb] 未知节点类型: node_id={node_id}, node_type={node_type}, node keys={list(node.keys())}") @@ -5751,9 +5839,84 @@ class WorkflowEngine: # 继续查找循环体内的节点 self._mark_loop_body_executed(target_id, executed_nodes, active_edges) else: - # 执行失败,停止工作流 + # 执行失败或质量不达标 — 支持重试 + failed_status = result.get('status', 'failed') error_msg = result.get('error', '未知错误') node_type = node.get('type', 'unknown') + + # 处理 error_handler 返回的 retry_predecessor + if failed_status == 'retry_predecessor': + pred_id = result.get('predecessor_id') + eh_retry_count = result.get('retry_count', 1) + eh_retry_delay_ms = result.get('retry_delay_ms', 1000) + + # 检查是否已有重试计数(多次经过 error_handler) + prev_counter = self.node_outputs.get(f"_eh_retry_{pred_id}", {}) + remaining = prev_counter.get("remaining", eh_retry_count) + if pred_id and pred_id in self.nodes and remaining > 0: + remaining -= 1 + logger.warning( + f"error_handler 请求重试前驱节点 {pred_id},剩余 {remaining} 次" + ) + self.executed_nodes.discard(pred_id) + self.node_outputs.pop(pred_id, None) + self.node_outputs[f"_eh_retry_{pred_id}"] = { + "remaining": remaining, + "delay_ms": eh_retry_delay_ms, + "error_handler_id": next_node_id, + } + executed_nodes.add(next_node_id) + execution_sequence.append(next_node_id) + results[next_node_id] = result + await asyncio.sleep(eh_retry_delay_ms / 1000.0) + continue + elif remaining <= 0: + logger.error(f"error_handler 重试次数耗尽,前驱节点 {pred_id} 停止") + raise WorkflowExecutionError( + detail=f"error_handler 重试次数耗尽: {error_msg}", + node_id=pred_id, + ) + + # 检查节点级 retry_config + node_data = node.get('data', {}) or {} + retry_cfg = node_data.get('retry_config', {}) + max_retries = retry_cfg.get('max_retries', 0) if isinstance(retry_cfg, dict) else 0 + retry_delay_ms = retry_cfg.get('retry_delay_ms', 1000) if isinstance(retry_cfg, dict) else 1000 + on_exhausted = retry_cfg.get('on_exhausted', 'stop') if isinstance(retry_cfg, dict) else 'stop' + + retry_key = f"_retry_{next_node_id}" + retries_done = self.node_outputs.get(retry_key, 0) + + if max_retries > 0 and retries_done < max_retries: + self.node_outputs[retry_key] = retries_done + 1 + logger.warning( + f"节点 {next_node_id} ({node_type}) 执行失败,重试 {retries_done + 1}/{max_retries}: {error_msg}" + ) + await asyncio.sleep(retry_delay_ms / 1000.0) + continue # 不标记已执行,下次循环重新执行 + + # 重试耗尽或未配置重试 + if retries_done >= max_retries and max_retries > 0: + if on_exhausted == 'skip': + logger.warning(f"节点 {next_node_id} 重试耗尽,跳过: {error_msg}") + self.node_outputs[next_node_id] = { + 'status': 'skipped', 'error': error_msg + } + executed_nodes.add(next_node_id) + execution_sequence.append(next_node_id) + results[next_node_id] = result + continue + elif on_exhausted == 'notify': + logger.error(f"节点 {next_node_id} 重试耗尽,已通知: {error_msg}") + self.node_outputs[next_node_id] = { + 'status': 'error_notified', 'error': error_msg + } + executed_nodes.add(next_node_id) + execution_sequence.append(next_node_id) + results[next_node_id] = result + continue + + # 默认:停止工作流 logger.error(f"工作流执行失败 - 节点: {next_node_id} ({node_type}), 错误: {error_msg}") raise WorkflowExecutionError( detail=error_msg, diff --git a/frontend/src/utils/agentSkills.ts b/frontend/src/utils/agentSkills.ts index 382045c..428bcc4 100644 --- a/frontend/src/utils/agentSkills.ts +++ b/frontend/src/utils/agentSkills.ts @@ -28,6 +28,10 @@ export const BUILTIN_SKILL_OPTIONS: { name: string; label: string }[] = [ { name: 'deploy_push', label: '部署推送' }, { name: 'agent_create', label: '创建 Agent' }, { name: 'tool_register', label: '注册工具' }, + { name: 'capability_check', label: '能力自检' }, + { name: 'code_tool_create', label: '创建代码工具' }, + { name: 'extension_log', label: '扩展日志' }, + { name: 'self_review', label: '输出自检' }, ] export const BUILTIN_SKILL_LABELS: Record = Object.fromEntries( diff --git a/scripts/seed_agents.py b/scripts/seed_agents.py index 22a3ba5..33b6ff4 100644 --- a/scripts/seed_agents.py +++ b/scripts/seed_agents.py @@ -208,32 +208,47 @@ agents = [ { "name": "全能助手", "description": "综合 AI 助手,可使用所有工具处理各种任务", - "system_prompt": """你是全能助手 OmniBot,一个功能全面的 AI 助手。 + "system_prompt": """你是全能助手 OmniBot,一个功能全面的 AI 助手,具备自主能力扩展能力。 ## 核心能力 -你可以使用平台提供的所有工具,根据用户需求灵活选择最合适的工具完成各类任务。 +你可以使用平台提供的所有 34 个工具,涵盖文件操作、网络请求、数据处理、代码执行、系统信息、数据库、Git、Docker、部署、浏览器操控、Agent 协作等类别。 -## 可用工具 -你拥有丰富的工具库,涵盖以下类别: -- **文件操作**: file_read, file_write -- **网络请求**: http_request, check_website, ip_info, shorten_url, weather_query -- **数据处理**: csv_processor, json_tool, json_process, text_analyze, text_summarize, extract_info, html_to_markdown, base64_codec -- **代码执行**: execute_code, math_calculate -- **系统信息**: system_info, datetime, timestamp, uuid_generator -- **搜索**: grep_search, list_files -- **数据库**: database_query -- **Git**: git_log -- **ADB**: adb_log +## 自主能力扩展 +当遇到超出当前能力的复杂或专业任务时,你应当主动扩展自己的能力边界,而非放弃或给出粗糙答案。 + +### 扩展流程 +1. **能力评估** — 使用 capability_check 工具分析任务需求与现有能力的匹配度,识别差距 +2. **方案搜索** — 如存在差距,使用 web_search 搜索相关的外部 API、开源工具或专业知识 +3. **能力创建** — 根据差距类型选择扩展方式: + - 缺少领域知识 → 使用 agent_create 创建专业子 Agent(如「SQL优化专家」),然后用 agent_call 委派任务 + - 缺少外部 API → 使用 tool_register 注册新的 HTTP 工具 + - 缺少代码工具 → 使用 code_execute 编写验证代码,再用 code_tool_create 持久化为可复用工具 +4. **验证集成** — 创建后立即测试,确认新能力可用 +5. **委派执行** — 将专业任务委派给新创建的子 Agent 或工具 +6. **记录反馈** — 使用 extension_log 记录扩展结果和效果评价 + +### 扩展决策原则 +- 任务涉及专业领域知识(医学、法律、特定技术栈)→ 优先创建专业子 Agent +- 任务需要频繁调用某个外部 API → 优先注册 HTTP 工具 +- 任务需要自定义数据处理逻辑 → 优先创建代码工具 +- 简单任务、已有工具可直接完成 → 不需要扩展 + +## 安全原则 +- 创建的工具和 Agent 默认为私有(仅自己可见) +- 代码工具创建前必须先通过沙箱测试 +- 外部 API 注册时会自动检测可达性 +- 不在系统提示词中泄露敏感信息 ## 工作流程 -1. 理解用户需求的本质 -2. 选择最合适的工具组合 -3. 执行工具并分析结果 -4. 给出清晰、完整的答案 +1. 理解用户需求的本质和复杂度 +2. 对复杂/陌生任务先用 capability_check 评估 +3. 选择合适的工具组合(必要时自主扩展) +4. 执行工具并分析结果 +5. 给出清晰、完整的答案 ## 回答风格 - 先理解再行动,不确定时先确认 -- 复杂任务分解步骤 +- 复杂任务分解步骤,使用 task_plan 跟踪进度 - 多种方案时对比说明 - 代码和配置示例完整可用""", "tools": [], diff --git a/使用文档.md b/使用文档.md index 1187f48..6e929c7 100644 --- a/使用文档.md +++ b/使用文档.md @@ -2,11 +2,11 @@ ## 概述 -所有 Agent 默认拥有 **31 个内置工具**(`tools: []` = 全部可用)。工具涵盖文件操作、网络访问、代码执行、项目管理、部署等能力,Agent 可在 ReAct 循环中自主选择和调用。 +所有 Agent 默认拥有 **34 个内置工具**(`tools: []` = 全部可用)。工具涵盖文件操作、网络访问、代码执行、项目管理、部署、自主能力扩展等能力,Agent 可在 ReAct 循环中自主选择和调用。 --- -## 工具清单(31个) +## 工具清单(34个) ### 文件与数据 @@ -32,6 +32,7 @@ | 8 | `code_execute` | `code`, `language`, `timeout` | Python/JS 沙箱执行 | | 9 | `text_analyze` | `text`, `operation` | 文本分析 | | 10 | `regex_test` | `pattern`, `text` | 正则表达式测试 | +| 11 | `code_tool_create` | `name`, `description`, `code`, `language`, `parameter_schema`, `test_input` | 持久化验证过的代码为可复用工具 | ### 项目管理 @@ -70,19 +71,21 @@ | # | 工具 | 参数 | 说明 | |---|------|------|------| -| 24 | `send_email` | `to`, `subject`, `body` | 发送邮件 | -| 25 | `agent_call` | `agent_name`, `query`, `max_iterations` | 调用其他 Agent 委派任务 | -| 26 | `agent_create` | `name`, `system_prompt`, `description` | 动态创建专业子 Agent | -| 27 | `tool_register` | `name`, `description`, `method`, `url` | 动态注册 HTTP 工具 | +| 25 | `send_email` | `to`, `subject`, `body` | 发送邮件 | +| 26 | `agent_call` | `agent_name`, `query`, `max_iterations` | 调用其他 Agent 委派任务 | +| 27 | `agent_create` | `name`, `system_prompt`, `description` | 动态创建专业子 Agent | +| 28 | `tool_register` | `name`, `description`, `method`, `url` | 动态注册 HTTP 工具 | +| 29 | `capability_check` | `task_description`, `required_domains` | 能力自检,分析任务与现有能力的匹配度 | +| 30 | `extension_log` | `action`, `name`, `extension_type`, `reason` | 记录和查询自主扩展历史 | ### DevOps | # | 工具 | 参数 | 说明 | |---|------|------|------| -| 26 | `database_query` | `query`, `params` | 数据库查询 | -| 27 | `docker_manage` | `operation`, `resource`, `options` | Docker 容器管理(只读) | -| 28 | `browser_use` | `url`, `action`, `selector`, `script` | 无头浏览器操控(截图/提取/填表) | -| 31 | `adb_log` | `device`, `lines`, `tag` | Android ADB 日志 | +| 31 | `database_query` | `query`, `params` | 数据库查询 | +| 32 | `docker_manage` | `operation`, `resource`, `options` | Docker 容器管理(只读) | +| 33 | `browser_use` | `url`, `action`, `selector`, `script` | 无头浏览器操控(截图/提取/填表) | +| 34 | `adb_log` | `device`, `lines`, `tag` | Android ADB 日志 | --- @@ -350,25 +353,105 @@ tool_register( --- +### 14. capability_check — 能力自检 + +``` +capability_check(task_description="分析MySQL慢查询日志并优化", required_domains="数据库,性能优化,SQL") +→ { + "available_tools_count": 34, + "available_agents": ["数据分析师", "日志分析师", "代码助手"], + "gap_analysis": { + "missing_domain_knowledge": ["MySQL性能优化"], + "missing_tools": ["database_query"], + "severity": "medium" + }, + "recommendations": [ + "使用 agent_create 创建「数据库专家」Agent", + "使用 web_search 搜索相关的外部 API 或开源工具" + ] + } +``` + +**场景:** +- Agent 接到陌生/复杂任务时,先自检能力是否足够 +- 基于关键词和领域规则做差距分析(不消耗 LLM token) +- 返回结构化建议:创建子Agent / 注册工具 / 搜索方案 + +--- + +### 15. code_tool_create — 代码工具持久化 + +``` +code_tool_create( + name="parse_slow_query", + description="解析MySQL慢查询日志,提取最慢的N条SQL", + code="def parse_slow_query(log_path, top_n=10):\n import re\n ...", + language="python", + parameter_schema='{"log_path": {"type": "string", "description": "日志路径"}, "top_n": {"type": "integer", "description": "返回前N条"}}', + test_input='{"log_path": "/tmp/slow.log", "top_n": 5}' +) +→ {"status": "created", "tool": {"id": "uuid", "name": "parse_slow_query"}, "sandbox_test": {"passed": true}} +``` + +**场景:** +- 先用 `code_execute` 编写并验证代码逻辑 +- 确认无误后调用此工具持久化 → 写入 tools 表 → 加载到 tool_registry +- 后续所有 Agent 均可直接调用该工具 + +**安全限制:** 创建前自动在沙箱中测试(15s 超时),测试失败则拒绝注册 + +--- + +### 16. extension_log — 扩展记录 + +``` +# 记录扩展 +extension_log(action="log", extension_type="agent_created", name="SQL优化专家", reason="缺少MySQL优化知识") +→ {"status": "logged", "extension": {"id": "uuid", "type": "agent_created", "name": "SQL优化专家"}} + +# 查询历史 +extension_log(action="list", limit=5) +→ {"extensions": [{"type": "agent_created", "name": "SQL优化专家", ...}, ...], "count": 5} + +# 评价效果 +extension_log(action="evaluate", name="SQL优化专家", success_rating="success", note="成功定位3条慢SQL") +→ {"status": "evaluated", "extension": {"name": "SQL优化专家", "success_rating": "success"}} +``` + +**场景:** +- Agent 每次扩展能力后记录原因和结果 +- 后续任务前查询历史 → 评估哪些扩展有效 → 优化决策 +- 形成自主进化反馈闭环 + +--- + ## Agent 自主能力扩展流程 ``` 全能助手收到: "监控线上MySQL慢查询并优化" │ - ├─ 检查自身 → 缺少SQL专业深度 - ├─ web_search("MySQL慢查询分析方案") + ├─ capability_check(task_description="分析MySQL慢查询日志并优化") + │ └─ 差距分析:缺少 MySQL优化知识、慢查询解析工具 + │ + ├─ web_search("MySQL慢查询分析方案 percona-toolkit") + │ └─ 发现 pt-query-digest 工具 │ ├─ agent_create(name="SQL优化专家", system_prompt="你是MySQL性能优化专家...") - │ └─ 新 Agent 写入 DB,拥有全部 31 工具 + │ └─ 新 Agent 写入 DB,拥有全部 34 工具 │ ├─ agent_call("SQL优化专家", "分析这份慢查询日志并给出优化建议") │ └─ 子 Agent 独立 ReAct 循环执行 │ + ├─ extension_log(action="log", extension_type="agent_created", name="SQL优化专家", reason="缺少MySQL优化知识") + │ ├─ 发现 percona-toolkit 有 REST API ├─ tool_register(name="pt_query_digest", url="http://internal-api/analyze?log={path}") │ └─ 工具立即可用 │ + ├─ extension_log(action="log", extension_type="tool_registered", name="pt_query_digest") + │ └─ 整合结果: "慢查询优化方案如下..." + └─ extension_log(action="evaluate", success_rating="success") ``` --- diff --git a/创建agent.md b/创建agent.md index 6aaf0c2..3251305 100644 --- a/创建agent.md +++ b/创建agent.md @@ -2,9 +2,9 @@ ## 概述 -本系统支持多种方式创建 Agent。**所有创建方式均默认赋予 Agent 全部 31 个内置工具能力**,除非明确限制。 +本系统支持多种方式创建 Agent。**所有创建方式均默认赋予 Agent 全部 34 个内置工具能力**,除非明确限制。 -## 内置工具清单(31个) +## 内置工具清单(34个) | 类别 | 工具 | 用途 | |------|------|------| diff --git a/缺失能力.md b/缺失能力.md new file mode 100644 index 0000000..1548f3c --- /dev/null +++ b/缺失能力.md @@ -0,0 +1,202 @@ +# 缺失能力分析 + +## 概述 + +当前项目已有 34 个内置工具、自主进化系统(agent_create / tool_register / code_tool_create / capability_check / extension_log)、工作流引擎(DAG + HITL 审批节点)、Agent 记忆系统(RAG + 向量 + 持久化)、异步执行(Celery + Redis)等能力。但在生产级复杂任务执行方面,仍存在以下 10 项关键缺失。 + +--- + +## 第一梯队:复杂任务的硬伤(3 项) + +### 1. 完全没有并行执行 + +**现象:** +- 工作流 DAG 一次只执行一个节点,即使多个分支之间没有依赖关系 +- Orchestrator 的 debate 模式 3 个 Agent 逐个串行执行,而非并发 +- `orchestrator.py` 中 debate 用的是 `for agent in agents` 循环,而非 `asyncio.gather` + +**影响:** +- 复杂任务耗时 = 所有步骤耗时之和,而非最长路径耗时 +- 多 Agent 协作(并行分析、投票)形同虚设 +- 吞吐量瓶颈 + +**涉及文件:** +- `backend/app/services/workflow_engine.py` — DAG 执行循环(`execute()` 方法) +- `backend/app/agent_runtime/orchestrator.py` — `_debate()` 方法 + +--- + +### 2. AgentOrchestrator 不进入工作流 + +**现象:** +- Orchestrator 的 4 种模式(router / sequential / debate / pipeline)仅通过 API 端点 `/orchestrate` 暴露 +- 工作流引擎的节点类型只有 `start / llm / agent / approval / condition / end` 等,没有 `orchestrator` +- 编排能力与工作流系统完全割裂 + +**影响:** +- 无法在工作流中组合多 Agent 协作模式 +- 复杂场景需要外部脚本调用 API,无法在工作流中可视化编排 + +**涉及文件:** +- `backend/app/api/agent_chat.py` — Orchestrator API 入口 +- `backend/app/services/workflow_engine.py` — 缺少 orchestrator 节点类型 +- `backend/app/agent_runtime/workflow_integration.py` — 只有 `run_agent_node()`,无 `run_orchestrator_node()` + +--- + +### 3. 输出质量验证完全缺失 + +**现象:** +- 没有评判/评估节点检查 Agent 输出质量 +- 搜索 `evaluate_output`、`verify_result`、`quality_check`、`validate_node` — 零结果 +- Agent 执行完直接返回结果,不对质量做任何检查 +- `AgentLearningPattern` 只在后台统计成功率,不做实时检查 + +**影响:** +- Agent 不知道自己做得好不好 +- 错误结果直接交付用户 +- 自我修正闭环缺失最关键的一环 + +**涉及文件:** +- 无现有实现,需从零构建 + +--- + +## 第二梯队:生产可靠性(3 项) + +### 4. 节点级自动重试是空壳 + +**现象:** +- `error_handler` 节点类型存在,但只记录意图不实际重试(代码注释:"实际重试需要重新执行前一个节点,这里只记录") +- 一个节点失败 → `WorkflowExecutionError` → 整个工作流停止 +- Celery 层面有任务级重试(指数退避),但节点级没有 + +**影响:** +- LLM 调用偶发超时 = 整个工作流失败 +- 无法配置"这个节点失败后重试 3 次" + +**涉及文件:** +- `backend/app/services/workflow_engine.py` — error_handler 节点(约 4068-4128 行) +- `backend/app/tasks/workflow_tasks.py` — Celery 重试逻辑 + +--- + +### 5. 工具级人工审批缺失 + +**现象:** +- 人工审批(HITL)只存在于工作流节点层面(`approval` 节点类型) +- AgentRuntime 对所有工具自动执行,不暂停确认 +- `deploy_push`、`send_email`、`database_query`(写操作)、`schedule_create` 等风险工具无二次确认 + +**影响:** +- Agent 可能执行危险操作而用户不知情 +- 缺少安全围栏 + +**涉及文件:** +- `backend/app/agent_runtime/core.py` — ReAct 循环中工具自动执行 +- `backend/app/agent_runtime/schemas.py` — `AgentToolConfig` 缺少审批配置 +- `backend/app/services/workflow_engine.py` — approval 节点(约 1374-1422 行,可复用逻辑) + +--- + +### 6. 没有降级/回退链 + +**现象:** +- 模型不可用 → Agent 直接报错 +- Agent 执行失败 → 无备选方案 +- 搜索 `fallback`、`degradation`、`backup` — 零结果 + +**影响:** +- 单点故障无容错 +- 无法配置"主模型挂了自动切备用模型" + +**涉及文件:** +- `backend/app/agent_runtime/schemas.py` — `AgentLLMConfig` 无 fallback 字段 +- `backend/app/agent_runtime/core.py` — ReAct 循环无模型切换逻辑 + +--- + +## 第三梯队:体验与效率(4 项) + +### 7. 进度上报太粗糙 + +**现象:** +- WebSocket 只推送 `status: "running"` 和 `status: "completed"` +- 进度值固定:0 → 50 → 100,没有实际进度百分比 +- WebSocket 每 2 秒轮询 DB,而非主动推送 + +**影响:** +- 用户看不到任务执行到哪一步 +- 复杂任务(7 步)和简单任务(2 步)进度条一样 + +**涉及文件:** +- `backend/app/websocket/manager.py` — WebSocket 管理器(轮询 DB) +- `backend/app/services/workflow_engine.py` — 无进度回调钩子 +- `backend/app/tasks/workflow_tasks.py` — Celery 进度更新只调一次 + +--- + +### 8. Agent 间知识不共享 + +**现象:** +- 每个 Agent 的 RAG 记忆按 `(scope_kind, scope_id)` 隔离 +- 学习模式也按 Scope 隔离 +- "SQL优化专家"学到的知识,"数据分析师"用不了 + +**影响:** +- 知识孤岛,重复学习 +- 自主进化创建的 Agent 从零开始,无法继承父 Agent 经验 + +**涉及文件:** +- `backend/app/agent_runtime/memory.py` — 记忆隔离 +- `backend/app/services/agent_learning_service.py` — 学习模式隔离 + +--- + +### 9. 没有结果缓存 + +**现象:** +- 相同查询发给同一 Agent,每次都完整执行 LLM + 工具 +- 工具结果、LLM 响应均不缓存 +- 搜索 `cache`、`redis_cache`、`ttl` — 仅在 Celery 配置中有 `result_backend` + +**影响:** +- 重复工作浪费 token 和时间 +- 同一用户反复问相似问题,每次都重新跑 + +**涉及文件:** +- 无现有实现,需从零构建 + +--- + +### 10. Agent 独立异步执行是空壳 + +**现象:** +- `execute_agent_task`(Celery 任务)返回 `{"status": "pending"}` 占位符 +- 代码注释:`# TODO: 实现Agent执行逻辑` +- 非工作流的 Agent 执行没有真正的异步支持(只能同步调用 API) + +**影响:** +- Agent 聊天无法异步后台执行 +- 调度触发的 Agent 执行不会真正运行 + +**涉及文件:** +- `backend/app/tasks/agent_tasks.py` — execute_agent_task 是空壳 +- `backend/app/tasks/scheduler_tasks.py` — 定时调度无法真正执行 Agent + +--- + +## 总结 + +| # | 缺失能力 | 梯队 | 复杂度 | 影响面 | +|---|---------|------|--------|--------| +| 1 | 并行执行 | 第一梯队 | 高 | 性能、吞吐量 | +| 2 | Orchestrator 入工作流 | 第一梯队 | 中 | 编排能力整合 | +| 3 | 输出质量验证 | 第一梯队 | 中 | 结果可靠性 | +| 4 | 节点级自动重试 | 第二梯队 | 低 | 稳定性 | +| 5 | 工具级人工审批 | 第二梯队 | 中 | 安全性 | +| 6 | 降级/回退链 | 第二梯队 | 低 | 容错性 | +| 7 | 粒度进度上报 | 第三梯队 | 低 | 用户体验 | +| 8 | Agent 知识共享 | 第三梯队 | 高 | 知识利用率 | +| 9 | 结果缓存 | 第三梯队 | 中 | 效率、成本 | +| 10 | Agent 异步执行 | 第三梯队 | 低 | 调度可用性 | diff --git a/解决缺失能力计划.md b/解决缺失能力计划.md new file mode 100644 index 0000000..8d78e27 --- /dev/null +++ b/解决缺失能力计划.md @@ -0,0 +1,429 @@ +# 解决缺失能力计划 + +## 总体策略 + +按梯队分阶段实施。每个阶段聚焦 2-3 个能力,完成即可独立提升系统质量。 +每阶段包含:涉及文件、核心改动、验证方式。 + +--- + +## 第一阶段:闭环质量(1-2 周) + +优先解决"输出质量验证",这是连接自主进化系统的最后一环。 + +### 1.1 输出质量验证 → 新增 `evaluator` 节点 + `self_review` 工具 + +**目标:** Agent 执行完后自动检查输出质量,不达标则自我修正。 + +**方案:** + +#### A. 新增 `self_review` 工具(第 35 个内置工具) + +``` +self_review(content="...", criteria="回答必须包含SQL优化建议和执行计划") +→ { + "score": 0.75, + "passed": true, + "issues": ["缺少具体执行计划"], + "suggestions": ["补充EXPLAIN输出分析"] + } +``` + +**实现:** 用 LLM 作为评判器(deepseek-v4-flash,轻量调用),根据 criteria 打分 0-1。 + +#### B. 新增 `evaluator` 工作流节点类型 + +在工作流引擎中添加 evaluator 节点: +- 接收上游节点输出 + 评判标准(criteria) +- 调用 LLM 评判输出质量 +- 输出 `{score, passed, issues, suggestions}` +- 配合 condition 节点:`passed=false` → 走修正分支 → 重试 + +#### C. AgentRuntime 内建 self-review + +在 AgentRuntime ReAct 循环末尾添加可选的 self-review 步骤: +- `AgentConfig` 添加 `self_review: bool = False` +- 启用后,Agent 在返回最终答案前自动调用 `self_review` +- 评分 < 0.6 则追加一轮修正迭代 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/services/builtin_tools.py` | 新增 `self_review_tool` + `SELF_REVIEW_SCHEMA` | +| `backend/app/core/tools_bootstrap.py` | 34→35,import + register | +| `backend/app/services/workflow_engine.py` | 新增 `evaluator` 节点类型 | +| `backend/app/agent_runtime/core.py` | 末尾添加 self-review 步骤 | +| `backend/app/agent_runtime/schemas.py` | AgentConfig 添加 `self_review` 字段 | +| `frontend/src/utils/agentSkills.ts` | 新增条目 | + +**验证:** +1. `self_review("SELECT * FROM t", "需要包含索引建议")` → score < 0.5 +2. 工作流:agent → evaluator(criteria="...") → condition(passed?) → 正常结束 / 修正重试 +3. 全能助手对话测试:问一个复杂问题,开启 self_review,确认输出质量提升 + +--- + +### 1.2 节点级自动重试 + +**目标:** error_handler 从空壳变为真正的重试机制。 + +**方案:** + +修改 `error_handler` 节点执行逻辑: +``` +当上游节点 status == "failed" 时: + 1. 检查 retry_count(剩余重试次数) + 2. 等待 retry_delay 秒 + 3. 重新执行上游节点(复用 get_node_input + execute_node) + 4. 成功 → 继续流程 + 5. 失败且 retry_count > 0 → 循环 + 6. 失败且 retry_count == 0 → 触发 on_error 动作 +``` + +`on_error` 动作: +- `stop`:抛出错误停止工作流 +- `notify`:发送告警继续 +- `skip`:跳过该节点继续 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/services/workflow_engine.py` | 重写 error_handler 节点逻辑 | +| `backend/app/core/exceptions.py` | 添加 `NodeRetryExhausted` 异常 | + +**验证:** +1. 模拟 LLM 超时 → error_handler 重试 3 次 → 第 3 次成功 → 继续 +2. 模拟连续失败 → error_handler 耗尽重试 → 触发 on_error=notify → 告警记录 + +--- + +## 第二阶段:编排整合(2-3 周) + +### 2.1 Orchestrator 进入工作流 + +**目标:** 4 种编排模式变为工作流节点类型。 + +**方案:** + +新增 `orchestrator` 工作流节点类型,数据配置: +```json +{ + "mode": "route | sequential | debate | pipeline", + "agents": ["agent_id_1", "agent_id_2", ...], + "routing_prompt": "用于 route 模式的路由指令", + "aggregation_prompt": "用于 debate 模式的汇总指令" +} +``` + +`workflow_integration.py` 添加 `run_orchestrator_node()`: +- 解析 node_data 中的 agent 列表 +- 从 DB 加载 Agent 配置 +- 创建 AgentOrchestrator 实例 +- 执行对应模式,返回结构化结果 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/agent_runtime/workflow_integration.py` | 新增 `run_orchestrator_node()` | +| `backend/app/services/workflow_engine.py` | 新增 orchestrator 节点类型 | +| `frontend/src/utils/agentSkills.ts` | 无需改动(后端节点类型) | + +**验证:** +1. 创建工作流:start → orchestrator(debate, [AgentA, AgentB, AgentC]) → end +2. 执行:确认 3 个 Agent 均被调用,结果被汇总 +3. 创建工作流:start → orchestrator(route, [代码助手, 数据分析师]) → end,确认按路由分发 + +--- + +### 2.2 工具级人工审批 + +**目标:** 危险工具执行前暂停等待用户确认。 + +**方案:** + +#### A. AgentToolConfig 扩展 +```python +class AgentToolConfig(BaseModel): + include_tools: List[str] = [] + exclude_tools: List[str] = [] + require_approval: List[str] = [] # 新增:需要审批的工具名列表 +``` + +#### B. AgentRuntime 审批拦截 +在 ReAct 循环的工具执行环节(`core.py` 约 287 行): +``` +if tool_name in config.tools.require_approval: + yield ApprovalRequest(tool_name, tool_args) # 暂停,等待外部确认 + decision = await wait_for_approval() # 阻塞等待 + if decision != "approved": + continue # 跳过该工具调用 +``` + +#### C. WebSocket 审批通道 +- AgentRuntime 暂停时通过 WebSocket 推送审批请求 +- 用户通过 WebSocket 回复 approve/deny +- 超时默认策略可配置(approve/deny/skip) + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/agent_runtime/schemas.py` | AgentToolConfig 添加 require_approval | +| `backend/app/agent_runtime/core.py` | 工具执行前审批拦截 | +| `backend/app/websocket/manager.py` | 添加审批消息类型 | + +**验证:** +1. 配置 Agent: `require_approval: ["deploy_push"]` +2. Agent 调用 deploy_push → WebSocket 收到审批请求 +3. 用户 approve → Agent 继续执行 +4. 用户 deny → Agent 跳过该工具 + +--- + +## 第三阶段:性能与效率(2-3 周) + +### 3.1 并行执行 + +**目标:** 无依赖节点并行执行,Orchestrator debate 并行。 + +**方案:** + +#### A. 工作流 DAG 并行 +修改 `execute()` 主循环: +``` +当前:每轮取 1 个节点执行 +改为:每轮取所有"前置节点全部完成"的节点,asyncio.gather 并行执行 +``` + +注意事项: +- 并行节点不能有共享状态依赖 +- 需要分别捕获每个并行节点的异常 +- WebSocket 需支持多节点同时推送进度 + +#### B. Orchestrator debate 并行 +```python +# 改前 +for agent in agents: + result = await agent.run(query) + +# 改后 +results = await asyncio.gather(*[agent.run(query) for agent in agents]) +``` + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/services/workflow_engine.py` | execute() 并行执行 | +| `backend/app/agent_runtime/orchestrator.py` | debate/sequential 并行 | +| `backend/app/websocket/manager.py` | 支持多节点并发推送 | + +**验证:** +1. 3 个独立分支的 DAG → 总耗时 ≈ 最长分支耗时(而非三者之和) +2. debate 模式 3 Agent → 总耗时 ≈ 最慢 Agent 耗时 + +--- + +### 3.2 粒度进度上报 + +**目标:** 实时推送"第 3/7 步完成"到前端。 + +**方案:** + +#### A. WorkflowEngine 进度回调 +在 `execute()` 循环中添加钩子: +```python +async def on_progress(self, current_step, total_steps, node_name, status): + # 推送到 WebSocket + await ws_manager.broadcast(execution_id, { + "type": "progress", + "current": current_step, + "total": total_steps, + "node": node_name, + "status": status, + "percent": int(current_step / total_steps * 100), + }) +``` + +#### B. 统计 DAG 总步数 +执行前遍历拓扑排序结果,计算可达节点总数作为 total_steps。 + +#### C. WebSocket 改为推送模式 +当前是轮询 DB,改为 WorkflowEngine 主动调用 WebSocket manager 推送。 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/services/workflow_engine.py` | 添加 on_progress 回调 + 步数统计 | +| `backend/app/websocket/manager.py` | 添加 execution 进度推送方法 | +| `backend/app/api/websocket.py` | 适配新的推送模式 | + +**验证:** +1. 执行 7 节点的 DAG → WebSocket 收到 7 条进度消息 +2. 前端进度条从 0% 逐步递增到 100% + +--- + +### 3.3 结果缓存 + +**目标:** 相同输入不重复计算。 + +**方案:** + +添加缓存层,使用 Redis: +```python +# 工具结果缓存 +cache_key = f"tool:{tool_name}:{hash(json.dumps(args))}" +cached = await redis.get(cache_key) +if cached: + return cached.decode() + +# LLM 响应缓存(相同 prompt + 相同 messages) +cache_key = f"llm:{hash(messages_json)}" +``` + +配置: +- TTL 默认 1 小时 +- 工具维度可选开启/关闭 +- 确定性工具(file_read、math_calculate)默认开启 +- 非确定性工具(web_search、http_request)默认关闭 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/agent_runtime/tool_manager.py` | 工具执行前后加缓存层 | +| `backend/app/agent_runtime/core.py` | LLM 调用前后加缓存层 | +| `backend/app/core/config.py` | 添加缓存配置项 | + +**验证:** +1. 连续两次相同 file_read → 第二次命中缓存(耗时 < 1ms) +2. 连续两次相同 math_calculate → 第二次命中缓存 + +--- + +## 第四阶段:容错与共享(2-3 周) + +### 4.1 降级/回退链 + +**目标:** 模型/Agent 失败自动切换备用方案。 + +**方案:** + +AgentLLMConfig 扩展: +```python +class AgentLLMConfig(BaseModel): + provider: str = "openai" + model: str = "gpt-4o-mini" + fallback_llm: Optional['AgentLLMConfig'] = None # 降级模型 +``` + +Agent 节点 data 扩展: +```json +{ + "fallback_agent": "备选Agent的ID" // 可选 +} +``` + +执行逻辑: +1. 主模型调用失败 → 切换 fallback_llm 重试 +2. Agent 执行失败 → 查找 fallback_agent 重试 +3. 所有备用方案都失败 → 抛出错误 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/agent_runtime/schemas.py` | AgentLLMConfig 添加 fallback_llm | +| `backend/app/agent_runtime/core.py` | LLM 调用失败后切换 fallback | +| `backend/app/services/workflow_engine.py` | Agent 节点失败后切换 fallback_agent | + +--- + +### 4.2 Agent 间知识共享 + +**目标:** 打破记忆隔离,Agent 间共享知识。 + +**方案:** + +添加全局知识索引层: +``` +每个 Agent 执行完成后 → 提取关键知识 → 写入 global_knowledge 表 +Agent 初始化时 → 从 global_knowledge 加载相关条目 +``` + +实现: +- 新增 `GlobalKnowledge` 模型(content, embedding, source_agent_id, tags, created_at) +- `AgentMemory.initialize()` 添加全局知识检索步骤 +- 自主进化创建的 Agent 自动继承创建者的知识 + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/models/agent.py` | 新增 GlobalKnowledge 模型 | +| `backend/app/agent_runtime/memory.py` | initialize() 添加全局知识加载 | +| `backend/app/agent_runtime/core.py` | 执行完毕提取知识写入全局 | + +--- + +### 4.3 Agent 异步执行实现 + +**目标:** 填空 `execute_agent_task`,支持真正异步 Agent 执行。 + +**方案:** + +`execute_agent_task` 当前是空壳,需要实现: +```python +@celery_app.task +def execute_agent_task(agent_id, query, user_id): + db = SessionLocal() + agent = db.query(Agent).filter(Agent.id == agent_id).first() + config = build_agent_config_from_db(agent) + runtime = AgentRuntime(config) + result = asyncio.run(runtime.run(query)) + # 更新 execution 记录 + return {"status": "completed", "output": result.content} +``` + +**涉及文件:** + +| 文件 | 改动 | +|------|------| +| `backend/app/tasks/agent_tasks.py` | 实现 execute_agent_task | +| `backend/app/tasks/scheduler_tasks.py` | 调度触发真正的异步执行 | + +--- + +## 实施路线图总览 + +``` +第一阶段(1-2周) + ├── 1.1 输出质量验证:self_review工具 + evaluator节点 + AgentRuntime自检 + └── 1.2 节点级自动重试:error_handler从空壳到真正重试 + +第二阶段(2-3周) + ├── 2.1 Orchestrator进入工作流:新增orchestrator节点类型 + └── 2.2 工具级人工审批:AgentToolConfig审批标记 + WebSocket审批通道 + +第三阶段(2-3周) + ├── 3.1 并行执行:DAG并行 + Orchestrator debate并行 + ├── 3.2 粒度进度上报:on_progress回调 + WebSocket推送 + └── 3.3 结果缓存:Redis缓存层 + +第四阶段(2-3周) + ├── 4.1 降级/回退链:fallback_llm + fallback_agent + ├── 4.2 Agent知识共享:GlobalKnowledge + 跨Agent检索 + └── 4.3 Agent异步执行:填空execute_agent_task +``` + +## 优先级逻辑 + +- **第一阶段优先**:质量验证是自主进化的最后一环,没有它进化无方向;重试是稳定性基础 +- **第二阶段其次**:编排整合让复杂任务可视化、安全化 +- **第三阶段提速**:并行 + 缓存大幅提升性能(生产部署前必备) +- **第四阶段收尾**:容错和共享让系统长期运行可持续