diff --git a/backend/app/agent_runtime/core.py b/backend/app/agent_runtime/core.py index 4f8b1bd..e8f3b0e 100644 --- a/backend/app/agent_runtime/core.py +++ b/backend/app/agent_runtime/core.py @@ -311,6 +311,25 @@ class AgentRuntime: except (json.JSONDecodeError, TypeError): targs = {} + # 工具执行前审批检查 + if tname in self.config.tools.require_approval: + from app.services.approval_manager import approval_manager as _am + logger.info("Agent 工具需审批 [%s]: %s", tname, targs) + approval_req = await _am.submit( + tool_name=tname, args=targs, + timeout_ms=self.config.tools.approval_timeout_ms, + ) + decision = approval_req.decision + if decision == "denied": + result = f"[审批拒绝] 工具 {tname} 需要人工审批但被拒绝。" + self.context.add_tool_result(tcid, tname, result) + continue + elif decision == "skip": + result = f"[审批跳过] 工具 {tname} 被跳过。" + self.context.add_tool_result(tcid, tname, result) + continue + # decision == "approved" → 继续执行 + logger.info("Agent 执行工具 [%s]: %s", tname, targs) result = await self.tool_manager.execute(tname, targs) @@ -591,6 +610,34 @@ class AgentRuntime: "iteration": self.context.iteration, } + # 工具执行前审批检查(流式:先 create → yield 事件带 ID → 等待决定) + if tname in self.config.tools.require_approval: + from app.services.approval_manager import approval_manager as _am + logger.info("Agent 工具需审批 [%s]: %s", tname, targs) + approval_req = _am.create(tool_name=tname, args=targs) + yield { + "type": "approval_required", + "approval_id": approval_req.approval_id, + "tool_name": tname, + "args": targs, + "iteration": self.context.iteration, + } + decision = await _am.wait_for_decision( + approval_req.approval_id, + timeout_ms=self.config.tools.approval_timeout_ms, + ) + if decision == "denied": + result = f"[审批拒绝] 工具 {tname} 需要人工审批但被拒绝。" + yield {"type": "tool_result", "name": tname, "result": result, "iteration": self.context.iteration} + self.context.add_tool_result(tcid, tname, result) + continue + elif decision == "skip": + result = f"[审批跳过] 工具 {tname} 被跳过。" + yield {"type": "tool_result", "name": tname, "result": result, "iteration": self.context.iteration} + self.context.add_tool_result(tcid, tname, result) + continue + # decision == "approved" → 继续执行 + logger.info("Agent 执行工具 [%s]: %s", tname, targs) result = await self.tool_manager.execute(tname, targs) diff --git a/backend/app/agent_runtime/schemas.py b/backend/app/agent_runtime/schemas.py index 5643908..bca2bc7 100644 --- a/backend/app/agent_runtime/schemas.py +++ b/backend/app/agent_runtime/schemas.py @@ -12,6 +12,9 @@ class AgentToolConfig(BaseModel): # 若为空列表则使用全部已注册工具 include_tools: List[str] = Field(default_factory=list, description="允许的工具名称白名单") exclude_tools: List[str] = Field(default_factory=list, description="排除的工具名称黑名单") + require_approval: List[str] = Field(default_factory=list, description="需要人工审批的工具名列表") + approval_timeout_ms: int = Field(default=60000, description="审批超时(毫秒),超时使用默认策略") + approval_default: str = Field(default="deny", description="超时默认策略: approve | deny | skip") class AgentMemoryConfig(BaseModel): diff --git a/backend/app/agent_runtime/workflow_integration.py b/backend/app/agent_runtime/workflow_integration.py index 0810f42..b5a7e79 100644 --- a/backend/app/agent_runtime/workflow_integration.py +++ b/backend/app/agent_runtime/workflow_integration.py @@ -129,3 +129,163 @@ async def run_agent_node( "status": "error", "error": result.error, } + + +async def run_orchestrator_node( + node_data: Dict[str, Any], + input_data: Dict[str, Any], + execution_logger: Optional[Any] = None, + user_id: Optional[str] = None, + on_tool_executed: Optional[Any] = None, + on_llm_invocation: Optional[Any] = None, +) -> Dict[str, Any]: + """ + 在工作流中执行多 Agent 编排节点。 + + node_data 支持的字段: + mode — "route" | "sequential" | "debate" | "pipeline" + agents — Agent ID 列表(必填,至少 2 个) + routing_prompt — route 模式的路由指令(可选) + aggregation_prompt— debate 模式的汇总指令(可选) + model — 覆盖各 Agent 的模型(可选) + provider — 覆盖各 Agent 的提供商(可选) + temperature — 覆盖各 Agent 的温度(可选) + max_iterations — 覆盖各 Agent 的最大步数(可选) + + input_data 中的 "query" 或 "input" 字段作为用户输入。 + """ + from sqlalchemy.orm import Session + from app.core.database import SessionLocal + from app.models.agent import Agent + from app.agent_runtime.orchestrator import ( + AgentOrchestrator, + OrchestratorAgentConfig, + ) + + # 1. 解析输入 + query = ( + input_data.get("query") + or input_data.get("input") + or input_data.get("text", "") + ) + if not isinstance(query, str): + query = str(query) if query else "" + + if not query: + return {"output": "错误:Orchestrator 节点未收到用户输入", "status": "error"} + + # 2. 解析编排模式 + mode = node_data.get("mode", "debate").lower() + if mode not in ("route", "sequential", "debate", "pipeline"): + return {"output": f"错误:不支持的编排模式 '{mode}',可选: route, sequential, debate, pipeline", "status": "error"} + + # 3. 解析 Agent 列表 + agent_ids = node_data.get("agents", []) + if not agent_ids or not isinstance(agent_ids, list) or len(agent_ids) < 1: + return {"output": "错误:Orchestrator 节点需要至少 1 个 Agent", "status": "error"} + + # 4. 从 DB 加载 Agent 配置 + db: Optional[Session] = None + try: + db = SessionLocal() + + # 覆盖配置(可选) + override_model = node_data.get("model") + override_provider = node_data.get("provider") + override_temperature = node_data.get("temperature") + override_max_iterations = node_data.get("max_iterations") + + agent_configs: list = [] + for agent_id in agent_ids: + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + logger.warning("Orchestrator: Agent %s 不存在,跳过", agent_id) + continue + + # 从 workflow_config 提取 Agent 的 LLM 配置 + wc = agent.workflow_config or {} + nodes = wc.get("nodes", []) + system_prompt = agent.description or "" + model = override_model or "deepseek-v4-flash" + provider = override_provider or "deepseek" + temperature = float(override_temperature) if override_temperature else 0.7 + max_iterations = int(override_max_iterations) if override_max_iterations else 10 + tools_whitelist: list = [] + + for n in nodes: + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + if not override_model: + model = cfg.get("model", model) + if not override_provider: + provider = cfg.get("provider", provider) + if not override_temperature: + temperature = float(cfg.get("temperature", temperature)) + if not override_max_iterations: + max_iterations = int(cfg.get("max_iterations", max_iterations)) + tools_whitelist = cfg.get("tools", tools_whitelist) + break + + agent_configs.append(OrchestratorAgentConfig( + id=agent.id, + name=agent.name or "Agent", + system_prompt=system_prompt, + model=model, + provider=provider, + temperature=temperature, + max_iterations=max_iterations, + tools=tools_whitelist, + description=agent.description or "", + )) + + if not agent_configs: + return {"output": "错误:没有找到可用的 Agent", "status": "error"} + + # 5. 创建 Orchestrator 并执行 + orchestrator = AgentOrchestrator( + default_llm_config=AgentLLMConfig( + model=override_model or "deepseek-v4-flash", + temperature=0.3, + ), + ) + result = await orchestrator.run( + mode=mode, + question=query, + agents=agent_configs, + on_llm_call=on_llm_invocation, + ) + + # 6. 返回结构化结果 + return { + "output": result.final_answer, + "status": "success", + "orchestrator_meta": { + "mode": result.mode, + "agent_count": len(agent_configs), + "steps": [ + { + "agent_id": s.agent_id, + "agent_name": s.agent_name, + "input": s.input[:200] if s.input else "", + "output": s.output[:500] if s.output else "", + "iterations_used": s.iterations_used, + "tool_calls_made": s.tool_calls_made, + "error": s.error, + } + for s in result.steps + ], + }, + } + + except Exception as e: + logger.error("Orchestrator 节点执行失败: %s", e, exc_info=True) + return { + "output": None, + "status": "failed", + "error": f"Orchestrator 执行失败: {e}", + } + finally: + if db: + db.close() diff --git a/backend/app/api/approval.py b/backend/app/api/approval.py new file mode 100644 index 0000000..de2423a --- /dev/null +++ b/backend/app/api/approval.py @@ -0,0 +1,46 @@ +"""工具审批 REST API — 前端提交审批决定""" + +from __future__ import annotations + +import logging + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from app.services.approval_manager import approval_manager + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/v1/approval", tags=["approval"]) + + +class ApprovalDecisionRequest(BaseModel): + decision: str = Field(..., description="审批决定: approved | denied | skip") + + +@router.post("/{approval_id}/resolve") +async def resolve_approval(approval_id: str, req: ApprovalDecisionRequest): + """提交工具审批决定。 + + - **approved**: 批准执行 + - **denied**: 拒绝执行(Agent 会收到拒绝提示) + - **skip**: 跳过该工具(Agent 会跳过继续) + """ + ok = approval_manager.resolve(approval_id, req.decision) + if not ok: + raise HTTPException(status_code=404, detail=f"审批请求不存在或已完成: {approval_id}") + return {"success": True, "approval_id": approval_id, "decision": req.decision} + + +@router.get("/{approval_id}") +async def get_approval(approval_id: str): + """查询待审批请求详情(用于前端展示工具名和参数)。""" + req = approval_manager.get_pending(approval_id) + if not req: + raise HTTPException(status_code=404, detail=f"审批请求不存在或已完成: {approval_id}") + return { + "approval_id": req.approval_id, + "tool_name": req.tool_name, + "args": req.args, + "decision": req.decision, + } diff --git a/backend/app/main.py b/backend/app/main.py index 81154f1..53ed6ac 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -249,7 +249,7 @@ async def startup_event(): logger.error(f"灵犀长连接启动失败: {e}") # 注册路由 -from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind +from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind, approval app.include_router(auth.router) app.include_router(uploads.router) @@ -277,6 +277,7 @@ app.include_router(knowledge_base.router) app.include_router(agent_schedules.router) app.include_router(notifications.router) app.include_router(feishu_bind.router) +app.include_router(approval.router) if __name__ == "__main__": import uvicorn diff --git a/backend/app/services/approval_manager.py b/backend/app/services/approval_manager.py new file mode 100644 index 0000000..d2b867a --- /dev/null +++ b/backend/app/services/approval_manager.py @@ -0,0 +1,94 @@ +"""工具级人工审批管理器 — asyncio.Event 驱动的异步审批等待/唤醒""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class ApprovalRequest: + approval_id: str + tool_name: str + args: Dict[str, Any] + event: asyncio.Event = field(default_factory=asyncio.Event) + decision: str = "deny" # approved | denied | skip + + +class ApprovalManager: + """全局单例 — 管理工具执行前的审批等待/唤醒。""" + + _pending: Dict[str, ApprovalRequest] = {} + + def create(self, tool_name: str, args: Dict[str, Any]) -> ApprovalRequest: + """创建审批请求(不等待),返回 ApprovalRequest(含 approval_id)。 + + 用于流式场景:先 create → yield SSE 事件(带 approval_id)→ wait_for_decision。 + """ + approval_id = str(uuid.uuid4())[:8] + req = ApprovalRequest( + approval_id=approval_id, + tool_name=tool_name, + args=args, + ) + self._pending[approval_id] = req + logger.info("审批请求已创建: id=%s tool=%s", approval_id, tool_name) + return req + + async def wait_for_decision(self, approval_id: str, timeout_ms: int = 60000) -> str: + """等待审批决定(带超时)。返回 "approved" | "denied" | "skip" """ + req = self._pending.get(approval_id) + if not req: + return "deny" + try: + await asyncio.wait_for(req.event.wait(), timeout=timeout_ms / 1000.0) + except asyncio.TimeoutError: + logger.warning("审批请求超时: id=%s", approval_id) + req.decision = "deny" + self._pending.pop(approval_id, None) + return req.decision + + async def submit(self, tool_name: str, args: Dict[str, Any], timeout_ms: int = 60000) -> ApprovalRequest: + """提交审批请求并等待决策(带超时)— 非流式场景一步完成。 + + Returns: + ApprovalRequest(含 decision 字段,调用方读取即可) + """ + req = self.create(tool_name, args) + try: + await asyncio.wait_for(req.event.wait(), timeout=timeout_ms / 1000.0) + except asyncio.TimeoutError: + logger.warning("审批请求超时: id=%s tool=%s", req.approval_id, tool_name) + req.decision = "deny" + self._pending.pop(req.approval_id, None) + return req + + def resolve(self, approval_id: str, decision: str) -> bool: + """外部(API)调用,写入审批决定并唤醒等待方。 + + Returns: + True 表示成功唤醒,False 表示审批 ID 无效或已完成。 + """ + req = self._pending.get(approval_id) + if not req: + logger.warning("审批 ID 无效或已完成: %s", approval_id) + return False + if decision not in ("approved", "denied", "skip"): + decision = "deny" + req.decision = decision + req.event.set() + logger.info("审批已解决: id=%s decision=%s", approval_id, decision) + return True + + def get_pending(self, approval_id: str) -> Optional[ApprovalRequest]: + """查询待审批请求(用于前端展示详情)。""" + return self._pending.get(approval_id) + + +# 全局单例 +approval_manager = ApprovalManager() diff --git a/backend/app/services/workflow_engine.py b/backend/app/services/workflow_engine.py index cb19b9d..698e832 100644 --- a/backend/app/services/workflow_engine.py +++ b/backend/app/services/workflow_engine.py @@ -1966,6 +1966,53 @@ class WorkflowEngine: "error": f"Agent 执行失败: {e}", } + elif node_type == 'orchestrator': + # 多 Agent 编排节点:route / sequential / debate / pipeline + if self.logger: + self.logger.info( + "Orchestrator 节点开始执行", + data={"node_id": node_id, "input": input_data}, + ) + try: + from app.agent_runtime.workflow_integration import run_orchestrator_node + + _agent_on_tool = None + if hasattr(self, '_on_tool_executed_budget'): + _agent_on_tool = self._on_tool_executed_budget + + # Orchestrator 内各 Agent 的 LLM 调用计入工作流预算 + def _on_agent_llm(): + self._llm_invocations += 1 + if self._llm_invocations > self._cap_llm: + raise WorkflowExecutionError( + detail=f"已超过 LLM 节点调用预算({self._cap_llm} 次)", + ) + + result = await run_orchestrator_node( + node_data=node.get("data", {}), + input_data=input_data, + execution_logger=self.logger, + user_id=self.trusted_model_config_user_id, + on_tool_executed=_agent_on_tool, + on_llm_invocation=_on_agent_llm, + ) + if self.logger: + duration = int((time.time() - start_time) * 1000) + self.logger.log_node_complete( + node_id, node_type, result.get("output"), duration, + ) + return result + except Exception as e: + if self.logger: + duration = int((time.time() - start_time) * 1000) + self.logger.log_node_error(node_id, node_type, e, duration) + logger.error(f"Orchestrator 节点执行失败: {e}", exc_info=True) + return { + "output": None, + "status": "failed", + "error": f"Orchestrator 执行失败: {e}", + } + elif node_type == 'condition': # 条件节点:判断分支(output 必须透传上游 dict,否则 sourceHandle true/false 下游只收到布尔值,丢失 reply/memory) condition = node.get('data', {}).get('condition', '') diff --git a/frontend/src/views/AgentChat.vue b/frontend/src/views/AgentChat.vue index 52de537..973e676 100644 --- a/frontend/src/views/AgentChat.vue +++ b/frontend/src/views/AgentChat.vue @@ -204,6 +204,22 @@ 关闭 + + + +
+ {{ approvalToolName }} + 需要人工审批才能执行 +
+
+
{{ JSON.stringify(approvalArgs, null, 2) }}
+
+ +
@@ -249,6 +265,18 @@ interface ChatState { orchestrateAgents: OrchestrateAgentForm[] } +async function resolveApproval(decision: string) { + try { + await api.post(`/api/v1/approval/${approvalId.value}/resolve`, { decision }) + } catch { + // 审批已超时或已处理,忽略 + } + showApprovalDialog.value = false + approvalId.value = '' + approvalToolName.value = '' + approvalArgs.value = {} +} + function saveState() { try { const state: ChatState = { @@ -285,6 +313,11 @@ const messages = ref>({}) const inputMessage = ref('') const loading = ref(false) const streamingActive = ref(false) +// 工具审批状态 +const showApprovalDialog = ref(false) +const approvalId = ref('') +const approvalToolName = ref('') +const approvalArgs = ref>({}) const messagesRef = ref(null) const sessionId = ref>({}) const agent = ref(null) @@ -486,6 +519,11 @@ async function sendMessage() { tool_name: data.name, tool_result: data.result, }) + } else if (eventType === 'approval_required') { + approvalId.value = data.approval_id || '' + approvalToolName.value = data.tool_name || '' + approvalArgs.value = data.args || {} + showApprovalDialog.value = true } else if (eventType === 'final') { currentMsg.content = data.content || '' currentMsg.iterations = data.iterations_used || 0