""" Agent Orchestrator — 多 Agent 编排引擎。 支持四种协作模式: - route: Router Agent 分析问题 → 分发到最合适的 Specialist Agent - sequential: Agent 流水线执行,前者输出作为后者输入 - debate: 多个 Agent 独立回答 → Aggregator 汇总为最终答案 - pipeline: Planner 制定计划 → Executor 逐步骤执行 → Reviewer 审查交付 """ from __future__ import annotations import asyncio import json import logging import uuid from typing import Any, Callable, Dict, List, Optional from pydantic import BaseModel, Field from app.agent_runtime import ( AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentResult, ) from app.agent_runtime.core import _LLMClient logger = logging.getLogger(__name__) def resolve_template_variables(template: str, context: Dict[str, Any]) -> str: """解析模板变量 {{key}} 和 {{previous.output.field}} 格式。 支持的格式: {{key}} → 从 context 顶层取值 {{previous.output}} → 上一 Agent 的完整输出 {{previous.output.field}} → 上一 Agent 输出的 JSON 字段 {{agent_name.output}} → 指定 Agent 的完整输出 {{agent_name.output.field}}→ 指定 Agent 输出的 JSON 字段 {{agent_id.output.field}} → 通过 ID 指定 Agent 的输出字段 """ import re def _resolve(match): expr = match.group(1).strip() parts = expr.split(".", 1) key = parts[0] if "." not in expr: # 简单变量 {{key}} return str(context.get(key, context.get("previous_output", ""))) # 多级路径: {{x.output}} 或 {{x.output.field}} namespace = key # "previous" or agent_name/agent_id rest = parts[1] # "output" or "output.field" value = context.get(f"{namespace}_output") if value is None: # 尝试从 agent_outputs 列表中查找 agent_outputs = context.get("agent_outputs", []) for ao in agent_outputs: if ao.get("agent_name") == namespace or ao.get("agent_id") == namespace: value = ao.get("output", "") break if value is None: return match.group(0) # 未找到,保持原样 if rest == "output": if isinstance(value, str): return value return json.dumps(value, ensure_ascii=False) # 字段路径: output.field.subfield fields = rest.split(".", 1) if fields[0] == "output": field_path = fields[1] if len(fields) > 1 else "" else: field_path = rest if not field_path: return str(value) # 尝试从 JSON 中提取字段 try: if isinstance(value, str): try: parsed = json.loads(value) except (json.JSONDecodeError, TypeError): return value else: parsed = value path_parts = field_path.split(".") for p in path_parts: if isinstance(parsed, dict) and p in parsed: parsed = parsed[p] elif isinstance(parsed, list): try: idx = int(p) parsed = parsed[idx] except (ValueError, IndexError): return match.group(0) else: return match.group(0) return str(parsed) if parsed else match.group(0) except Exception: return match.group(0) return re.sub(r"\{\{(.+?)\}\}", _resolve, template) def validate_output_schema(output: Any, schema: Optional[Dict[str, Any]]) -> Optional[str]: """根据 JSON Schema 校验 Agent 输出。返回 None 表示通过,否则返回错误信息。""" if not schema: return None try: import jsonschema jsonschema.validate(output, schema) return None except ImportError: # jsonschema 未安装时跳过校验 logger.debug("jsonschema 未安装,跳过输出校验") return None except jsonschema.ValidationError as e: return str(e) except Exception as e: return str(e) class OrchestratorAgentConfig(BaseModel): """编排中单个 Agent 的配置""" id: str = Field(..., description="Agent 标识") name: str = Field(default="Agent", description="显示名称") system_prompt: str = Field(default="你是一个有用的AI助手。") model: str = Field(default="deepseek-v4-flash") provider: str = Field(default="deepseek") temperature: float = 0.7 max_iterations: int = 10 tools: List[str] = Field(default_factory=list, description="工具白名单,空=全部") description: str = Field(default="", description="Agent 专长描述(路由模式用)") class OrchestratorStep(BaseModel): """编排中的单步执行记录""" agent_id: str agent_name: str input: str = "" output: str = "" iterations_used: int = 0 tool_calls_made: int = 0 error: Optional[str] = None class OrchestratorResult(BaseModel): """编排执行结果""" mode: str final_answer: str steps: List[OrchestratorStep] = Field(default_factory=list) agent_results: List[Dict[str, Any]] = Field(default_factory=list) _ROUTER_SYSTEM_PROMPT = """你是一个路由调度员。你的任务是从以下 Specialist Agent 中选择一个最适合处理用户问题的 Agent。 可用的 Specialist Agent: {agent_list} 请返回 JSON 格式(不要 markdown 包裹),包含: 1. "selected_agent": 选中的 Agent ID 2. "reason": 选择理由(一句话) 规则: - 选择与问题最匹配的 Agent - 如果问题涉及多个领域,选择最相关的那个 - 必须从上述列表中选择,不能编造 Agent ID""" _PLANNER_SYSTEM_PROMPT = """你是一个任务规划员。将用户的问题拆解为可执行的步骤计划。 要求: 1. 分析问题的核心目标和子任务 2. 拆分 2-5 个具体、可操作的步骤 3. 步骤之间有明确的依赖顺序 4. 每个步骤包含预期输出 返回 JSON 格式(不要 markdown 包裹),严格按照以下结构: { "plan_title": "计划标题", "steps": [ {"step": 1, "description": "第一步做什么", "expected_output": "预期产出描述"}, {"step": 2, "description": "第二步做什么", "expected_output": "预期产出描述"} ], "success_criteria": "如何判断执行成功" }""" _EXECUTOR_STEP_PROMPT = """你正在执行一个计划中的步骤。 原始问题: {original_question} 计划标题: {plan_title} 当前步骤 ({current_step}/{total_steps}): {step_description} 预期输出: {expected_output} 前序步骤结果: {previous_output} 请专注执行当前步骤,使用可用工具完成任务。完成后输出本步骤的结果。""" _REVIEWER_SYSTEM_PROMPT = """你是一个质量审查员。审查计划执行结果,输出最终答案给用户。 原始问题: {original_question} 执行计划: {plan_title} 计划步骤: {plan_steps} 各步骤执行结果: {execution_results} 请: 1. 确认每个步骤是否完成 2. 汇总各步骤结果 3. 输出完整、清晰的最终答案 4. 如有改进空间,在末尾附加"改进建议" 最终答案应直接面向用户,不要提及内部步骤细节。""" _AGGREGATOR_SYSTEM_PROMPT = """你是一个回答汇总员。多个 AI Agent 对同一个问题给出了不同的回答。 请分析所有回答,输出一份综合的最终答案。 - 如果各 Agent 回答一致,合并要点 - 如果有分歧,指出不同观点并给出你的判断 - 以专业、清晰的格式输出最终答案""" class AgentOrchestrator: """ 多 Agent 编排器。 用法: orch = AgentOrchestrator() result = await orch.run("route", question, [agent1, agent2, agent3]) """ def __init__(self, default_llm_config: Optional[AgentLLMConfig] = None): self._default_llm = default_llm_config or AgentLLMConfig( model="deepseek-v4-flash", temperature=0.3, ) async def run( self, mode: str, question: str, agents: List[OrchestratorAgentConfig], on_llm_call: Optional[Callable[[Dict[str, Any]], Any]] = None, graph_nodes: Optional[List[Dict[str, Any]]] = None, graph_edges: Optional[List[Dict[str, Any]]] = None, ) -> OrchestratorResult: """执行多 Agent 编排。 Args: mode: route / sequential / debate / pipeline / graph question: 用户问题 agents: Agent 配置列表 on_llm_call: LLM 调用回调 graph_nodes: graph 模式的节点定义(mode=graph 时必填) graph_edges: graph 模式的边定义(mode=graph 时必填) """ mode = mode.lower() if mode == "route": return await self._route(question, agents, on_llm_call) elif mode == "sequential": return await self._sequential(question, agents, on_llm_call) elif mode == "debate": return await self._debate(question, agents, on_llm_call) elif mode == "pipeline": return await self._pipeline(question, agents, on_llm_call) elif mode == "graph": if not graph_nodes: raise ValueError("graph 模式需要提供 graph_nodes 参数") return await self._graph(question, graph_nodes, graph_edges or [], on_llm_call) else: raise ValueError(f"不支持的编排模式: {mode},可选: route, sequential, debate, pipeline, graph") async def _route( self, question: str, agents: List[OrchestratorAgentConfig], on_llm_call: Optional[Callable] = None, ) -> OrchestratorResult: """路由模式:Router → Specialist。""" # 构建 Agent 列表描述 agent_lines = [] for a in agents: desc = a.description or a.name agent_lines.append(f"- id: {a.id}, name: {a.name}, description: {desc}") agent_list_str = "\n".join(agent_lines) router_prompt = _ROUTER_SYSTEM_PROMPT.format(agent_list=agent_list_str) # 创建 Router Agent router_runtime = AgentRuntime( AgentConfig( name="router", system_prompt=router_prompt, llm=AgentLLMConfig( model=self._default_llm.model, temperature=0.1, # 低温度确保确定性 ), tools=AgentToolConfig( include_tools=[], # Router 不需要工具 ), ), on_llm_call=on_llm_call, ) router_result = await router_runtime.run(question) if not router_result.success: return OrchestratorResult( mode="route", final_answer=f"路由决策失败: {router_result.content}", steps=[], ) # 解析 Router 的输出 selected_agent_id = None try: parsed = json.loads(router_result.content.strip().removeprefix("```json").removesuffix("```").strip()) selected_agent_id = parsed.get("selected_agent", "") except (json.JSONDecodeError, AttributeError): # 尝试从文本中提取 for a in agents: if a.id in router_result.content: selected_agent_id = a.id break if not selected_agent_id: # 取第一个 selected_agent_id = agents[0].id if agents else "" # 找到对应的 Specialist Agent specialist = next((a for a in agents if a.id == selected_agent_id), agents[0] if agents else None) if not specialist: return OrchestratorResult( mode="route", final_answer="没有可用的 Specialist Agent", steps=[], ) # 运行 Specialist Agent specialist_runtime = AgentRuntime( AgentConfig( name=specialist.name, system_prompt=specialist.system_prompt, llm=AgentLLMConfig( model=specialist.model, provider=specialist.provider, temperature=specialist.temperature, max_iterations=specialist.max_iterations, ), tools=AgentToolConfig( include_tools=specialist.tools, ), ), on_llm_call=on_llm_call, ) specialist_result = await specialist_runtime.run(question) return OrchestratorResult( mode="route", final_answer=specialist_result.content, steps=[ OrchestratorStep( agent_id="router", agent_name="Router", input=question, output=f"选择: {specialist.name} ({specialist.id})", ), OrchestratorStep( agent_id=specialist.id, agent_name=specialist.name, input=question, output=specialist_result.content[:300], iterations_used=specialist_result.iterations_used, tool_calls_made=specialist_result.tool_calls_made, ), ], agent_results=[ {"agent_id": specialist.id, "agent_name": specialist.name, "output": specialist_result.content}, ], ) async def _sequential( self, question: str, agents: List[OrchestratorAgentConfig], on_llm_call: Optional[Callable] = None, ) -> OrchestratorResult: """顺序模式:Agent A 输出 → Agent B 输入。""" if not agents: return OrchestratorResult(mode="sequential", final_answer="无 Agent 可执行") steps: List[OrchestratorStep] = [] current_input = question for i, agent_cfg in enumerate(agents): runtime = AgentRuntime( AgentConfig( name=agent_cfg.name, system_prompt=agent_cfg.system_prompt, llm=AgentLLMConfig( model=agent_cfg.model, provider=agent_cfg.provider, temperature=agent_cfg.temperature, max_iterations=agent_cfg.max_iterations, ), tools=AgentToolConfig( include_tools=agent_cfg.tools, ), ), on_llm_call=on_llm_call, ) # 第一个 Agent 接收原始问题,后续 Agent 接收前一个的输出 agent_input = current_input if i > 0: # 构建模板上下文 template_ctx: Dict[str, Any] = {"previous_output": current_input} for j, prev_step in enumerate(steps): template_ctx[f"{prev_step.agent_name}_output"] = prev_step.output template_ctx[f"{prev_step.agent_id}_output"] = prev_step.output template_ctx["agent_outputs"] = [ {"agent_id": s.agent_id, "agent_name": s.agent_name, "output": s.output} for s in steps ] template_ctx["original_question"] = question # 解析 system_prompt 中的模板变量 resolved_system_prompt = resolve_template_variables( agent_cfg.system_prompt, template_ctx ) agent_input = ( f"这是前一个 Agent 的处理结果,请在此基础上继续处理。\n\n" f"原始问题: {question}\n\n" f"前序输出:\n{current_input}" ) # 重新创建 runtime(如果 system_prompt 被模板修改了) if resolved_system_prompt != agent_cfg.system_prompt: runtime = AgentRuntime( AgentConfig( name=agent_cfg.name, system_prompt=resolved_system_prompt, llm=AgentLLMConfig( model=agent_cfg.model, provider=agent_cfg.provider, temperature=agent_cfg.temperature, max_iterations=agent_cfg.max_iterations, ), tools=AgentToolConfig(include_tools=agent_cfg.tools), ), on_llm_call=on_llm_call, ) result = await runtime.run(agent_input) # Schema 校验输出 if getattr(agent_cfg, "output_schema", None): try: output_data = json.loads(result.content) except (json.JSONDecodeError, TypeError): output_data = result.content validation_error = validate_output_schema(output_data, agent_cfg.output_schema) if validation_error: logger.warning( "Agent %s 输出 Schema 校验失败: %s", agent_cfg.name, validation_error, ) step = OrchestratorStep( agent_id=agent_cfg.id, agent_name=agent_cfg.name, input=agent_input[:200], output=result.content[:500], iterations_used=result.iterations_used, tool_calls_made=result.tool_calls_made, error=None if result.success else result.error, ) steps.append(step) if not result.success: break current_input = result.content final_answer = steps[-1].output if steps else "无输出" return OrchestratorResult( mode="sequential", final_answer=final_answer, steps=steps, agent_results=[ {"agent_id": s.agent_id, "agent_name": s.agent_name, "output": s.output} for s in steps ], ) async def _debate( self, question: str, agents: List[OrchestratorAgentConfig], on_llm_call: Optional[Callable] = None, ) -> OrchestratorResult: """辩论模式:多 Agent 独立回答 → Aggregator 汇总。""" if not agents: return OrchestratorResult(mode="debate", final_answer="无 Agent 可执行") steps: List[OrchestratorStep] = [] agent_outputs: List[Dict[str, Any]] = [] # 第一阶段:所有 Agent 并行独立回答 runtimes = [] for agent_cfg in agents: runtimes.append(AgentRuntime( AgentConfig( name=agent_cfg.name, system_prompt=agent_cfg.system_prompt, llm=AgentLLMConfig( model=agent_cfg.model, provider=agent_cfg.provider, temperature=agent_cfg.temperature, max_iterations=agent_cfg.max_iterations, ), tools=AgentToolConfig( include_tools=agent_cfg.tools, ), ), on_llm_call=on_llm_call, )) results = await asyncio.gather( *[rt.run(question) for rt in runtimes], return_exceptions=True, ) for i, agent_cfg in enumerate(agents): result = results[i] if isinstance(result, BaseException): step = OrchestratorStep( agent_id=agent_cfg.id, agent_name=agent_cfg.name, input=question, output="", error=str(result), ) steps.append(step) agent_outputs.append({ "agent_id": agent_cfg.id, "agent_name": agent_cfg.name, "output": f"[错误] {result}", }) continue step = OrchestratorStep( agent_id=agent_cfg.id, agent_name=agent_cfg.name, input=question, output=result.content[:500], iterations_used=result.iterations_used, tool_calls_made=result.tool_calls_made, error=None if result.success else result.error, ) steps.append(step) agent_outputs.append({ "agent_id": agent_cfg.id, "agent_name": agent_cfg.name, "output": result.content, }) # 第二阶段:Aggregator 汇总所有回答 if len(agent_outputs) >= 2: outputs_text = "\n\n---\n\n".join( f"## {ao['agent_name']} 的回答\n{ao['output']}" for ao in agent_outputs ) aggregator_prompt = ( f"用户问题: {question}\n\n" f"以下是多个 AI Agent 对该问题的回答:\n\n{outputs_text}\n\n" "请综合所有回答,输出一份完整、准确的最终答案。" ) aggregator_runtime = AgentRuntime( AgentConfig( name="aggregator", system_prompt=_AGGREGATOR_SYSTEM_PROMPT, llm=AgentLLMConfig( model=self._default_llm.model, temperature=0.3, ), tools=AgentToolConfig(include_tools=[]), ), on_llm_call=on_llm_call, ) final_result = await aggregator_runtime.run(aggregator_prompt) final_answer = final_result.content steps.append(OrchestratorStep( agent_id="aggregator", agent_name="Aggregator", input="汇总各 Agent 回答", output=final_answer[:500], )) else: final_answer = agent_outputs[0]["output"] if agent_outputs else "无回答" return OrchestratorResult( mode="debate", final_answer=final_answer, steps=steps, agent_results=agent_outputs, ) async def _pipeline( self, question: str, agents: List[OrchestratorAgentConfig], on_llm_call: Optional[Callable] = None, ) -> OrchestratorResult: """流水线模式:Planner → Executor(逐步骤) → Reviewer。 使用内置的 Planner / Reviewer Agent,将用户提供的第一个 Agent 作为 Executor。 """ steps: List[OrchestratorStep] = [] # ── 1. Planner:制定计划 ── planner_runtime = AgentRuntime( AgentConfig( name="planner", system_prompt=_PLANNER_SYSTEM_PROMPT, llm=AgentLLMConfig( model=self._default_llm.model, temperature=0.2, ), tools=AgentToolConfig(include_tools=[]), ), on_llm_call=on_llm_call, ) planner_result = await planner_runtime.run(question) steps.append(OrchestratorStep( agent_id="planner", agent_name="Planner", input=question[:200], output=planner_result.content[:500], iterations_used=planner_result.iterations_used, tool_calls_made=planner_result.tool_calls_made, error=None if planner_result.success else planner_result.error, )) if not planner_result.success: return OrchestratorResult( mode="pipeline", final_answer=f"规划失败: {planner_result.content}", steps=steps, ) # 解析计划 plan = self._parse_plan(planner_result.content) plan_steps = plan.get("steps", []) if not plan_steps: return OrchestratorResult( mode="pipeline", final_answer="规划结果中没有有效的执行步骤", steps=steps, ) # ── 2. Executor:逐步骤执行(多 Agent 轮转分配)── executor_pool = agents if agents else [ OrchestratorAgentConfig( id="executor", name="Executor", system_prompt="你是一个有用的AI助手。", ) ] previous_output = "(尚无前序步骤)" execution_results = [] for step_info in plan_steps: step_num = step_info.get("step", 0) step_desc = step_info.get("description", f"步骤 {step_num}") step_expect = step_info.get("expected_output", "") # 按步骤轮转分配 Agent:不同步骤可分配给不同 Agent(按专长匹配) executor_cfg = executor_pool[(step_num - 1) % len(executor_pool)] executor_prompt = _EXECUTOR_STEP_PROMPT.format( original_question=question, plan_title=plan.get("plan_title", ""), current_step=step_num, total_steps=len(plan_steps), step_description=step_desc, expected_output=step_expect, previous_output=previous_output, ) executor_runtime = AgentRuntime( AgentConfig( name=executor_cfg.name, system_prompt=executor_cfg.system_prompt, llm=AgentLLMConfig( model=executor_cfg.model, provider=executor_cfg.provider, temperature=executor_cfg.temperature, max_iterations=executor_cfg.max_iterations, ), tools=AgentToolConfig( include_tools=executor_cfg.tools, ), ), on_llm_call=on_llm_call, ) step_result = await executor_runtime.run(executor_prompt) step_output = OrchestratorStep( agent_id=executor_cfg.id, agent_name=f"{executor_cfg.name} (步骤{step_num})", input=f"步骤{step_num}: {step_desc}", output=step_result.content[:500], iterations_used=step_result.iterations_used, tool_calls_made=step_result.tool_calls_made, error=None if step_result.success else step_result.error, ) steps.append(step_output) execution_results.append({ "step": step_num, "description": step_desc, "agent": executor_cfg.name, "output": step_result.content, "error": step_result.error if not step_result.success else None, }) previous_output = step_result.content if step_result.success else f"(步骤{step_num}执行出错)" if not step_result.success: logger.warning(f"Pipeline 步骤{step_num} ({executor_cfg.name}) 执行失败: {step_result.error}") # ── 3. Reviewer:审查并交付 ── plan_steps_text = "\n".join( f"步骤{s['step']}: {s['description']} → 预期: {s.get('expected_output', '')}" for s in plan_steps ) execution_text = "\n\n".join( f"【步骤{r['step']}】{r['description']}\n{r['output']}" for r in execution_results ) reviewer_prompt = _REVIEWER_SYSTEM_PROMPT.format( original_question=question, plan_title=plan.get("plan_title", ""), plan_steps=plan_steps_text, execution_results=execution_text, ) reviewer_runtime = AgentRuntime( AgentConfig( name="reviewer", system_prompt=reviewer_prompt, llm=AgentLLMConfig( model=self._default_llm.model, temperature=0.3, ), tools=AgentToolConfig(include_tools=[]), ), on_llm_call=on_llm_call, ) review_result = await reviewer_runtime.run( "请审查上述执行结果,输出最终答案。" ) steps.append(OrchestratorStep( agent_id="reviewer", agent_name="Reviewer", input="审查执行结果并输出最终答案", output=review_result.content[:500], iterations_used=review_result.iterations_used, tool_calls_made=review_result.tool_calls_made, error=None if review_result.success else review_result.error, )) return OrchestratorResult( mode="pipeline", final_answer=review_result.content if review_result.success else "审查环节失败", steps=steps, agent_results=execution_results, ) async def _graph( self, question: str, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]], on_llm_call: Optional[Callable] = None, ) -> OrchestratorResult: """图编排模式:按 DAG 拓扑顺序执行节点,支持 agent 和 condition 类型。""" if not nodes: return OrchestratorResult(mode="graph", final_answer="无节点可执行") # 建立节点索引 node_map: Dict[str, Dict[str, Any]] = {n["id"]: n for n in nodes} # 建立邻接表和入度 adj: Dict[str, List[tuple]] = {} # source_id → [(target_id, source_handle)] in_degree: Dict[str, int] = {n["id"]: 0 for n in nodes} for e in edges: src = e["source"] tgt = e["target"] sh = e.get("sourceHandle", "") if src not in adj: adj[src] = [] adj[src].append((tgt, sh)) if tgt in in_degree: in_degree[tgt] += 1 # 找起始节点(入度为 0) start_ids = [nid for nid, deg in in_degree.items() if deg == 0] if not start_ids: start_ids = [nodes[0]["id"]] steps: List[OrchestratorStep] = [] node_outputs: Dict[str, str] = {} # node_id → output text # BFS 拓扑执行 from collections import deque queue = deque(start_ids) # 将初始输入注入起始节点的"上游输出" for sid in start_ids: node_outputs[f"__input__{sid}"] = question while queue: node_id = queue.popleft() node = node_map.get(node_id) if not node: continue node_type = node.get("type", "agent") node_data = node.get("data", {}) # 收集上游输出作为本节点输入 upstream_inputs = [] for e in edges: if e["target"] == node_id: src_output = node_outputs.get(e["source"], "") if src_output: upstream_inputs.append(src_output) context_input = "\n\n".join(upstream_inputs) if upstream_inputs else question if node_type == "condition": # 条件节点:根据上游输出来决定走哪个分支 condition_expr = node_data.get("condition", "") condition_field = node_data.get("field", "output") # 取最后一个上游输出作为判断依据 last_output = upstream_inputs[-1] if upstream_inputs else question # 简单条件评估:支持 contains / not_contains / equals op = node_data.get("operator", "contains") value = node_data.get("value", "") result_true = self._eval_condition(last_output, op, value) branch = "true" if result_true else "false" steps.append(OrchestratorStep( agent_id=node_id, agent_name=f"条件: {condition_expr or node_data.get('name', node_id)}", input=f"判断: {op} '{value}' → {branch}", output=branch, )) node_outputs[node_id] = branch # 只沿匹配的分支继续 for tgt, sh in adj.get(node_id, []): if sh == branch: in_degree[tgt] -= 1 if in_degree[tgt] == 0: queue.append(tgt) continue # agent 节点:构建 AgentRuntime 并执行 agent_name = node_data.get("name", node_data.get("agent_name", node.get("label", node_id))) system_prompt = node_data.get("system_prompt", "你是一个有用的AI助手。") model = node_data.get("model", "deepseek-v4-flash") provider = node_data.get("provider", "deepseek") temperature = float(node_data.get("temperature", 0.7)) max_iterations = int(node_data.get("max_iterations", 10)) tools = node_data.get("tools", []) runtime = AgentRuntime( AgentConfig( name=agent_name, system_prompt=system_prompt, llm=AgentLLMConfig( model=model, provider=provider, temperature=temperature, max_iterations=max_iterations, ), tools=AgentToolConfig(include_tools=tools if isinstance(tools, list) else []), ), on_llm_call=on_llm_call, ) # 构建带上下文的输入 if len(upstream_inputs) > 1: agent_input = f"原始问题: {question}\n\n前序步骤的输出:\n{context_input}\n\n请基于以上信息继续处理。" elif len(upstream_inputs) == 1 and upstream_inputs[0] != question: agent_input = f"原始问题: {question}\n\n前一步输出:\n{upstream_inputs[0]}\n\n请基于以上信息继续处理。" else: agent_input = question result = await runtime.run(agent_input) steps.append(OrchestratorStep( agent_id=node_id, agent_name=agent_name, input=agent_input[:200], output=result.content[:500], iterations_used=result.iterations_used, tool_calls_made=result.tool_calls_made, error=None if result.success else result.error, )) node_outputs[node_id] = result.content if not result.success: logger.warning(f"Graph 节点 {agent_name} ({node_id}) 执行失败: {result.error}") # 将下游节点的入度减 1 for tgt, sh in adj.get(node_id, []): if tgt in in_degree: in_degree[tgt] -= 1 if in_degree[tgt] == 0: queue.append(tgt) # 收集最终输出(出度为 0 的节点) out_degree: Dict[str, int] = {n["id"]: 0 for n in nodes} for e in edges: out_degree[e["source"]] = out_degree.get(e["source"], 0) + 1 end_ids = [nid for nid, deg in out_degree.items() if deg == 0] if not end_ids: end_ids = [steps[-1].agent_id] if steps else [] final_parts = [] for eid in end_ids: out = node_outputs.get(eid, "") if out and out not in ("true", "false"): final_parts.append(out) final_answer = "\n\n".join(final_parts) if final_parts else (steps[-1].output if steps else "无输出") return OrchestratorResult( mode="graph", final_answer=final_answer, steps=steps, agent_results=[ {"agent_id": s.agent_id, "agent_name": s.agent_name, "output": s.output} for s in steps ], ) @staticmethod def _eval_condition(text: str, op: str, value: str) -> bool: """评估简单条件表达式。""" if op == "contains": return value.lower() in text.lower() elif op == "not_contains": return value.lower() not in text.lower() elif op == "equals": return text.strip().lower() == value.lower() elif op == "not_equals": return text.strip().lower() != value.lower() elif op == "starts_with": return text.strip().lower().startswith(value.lower()) elif op == "ends_with": return text.strip().lower().endswith(value.lower()) return True @staticmethod def _parse_plan(text: str) -> dict: """从 Planner 输出中解析 JSON 计划。""" import re # 尝试直接解析 cleaned = text.strip() # 移除 markdown 代码块包裹 cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned) cleaned = re.sub(r'\s*```$', '', cleaned) try: return json.loads(cleaned) except json.JSONDecodeError: pass # 尝试提取 JSON 块 m = re.search(r'\{[\s\S]*\}', cleaned) if m: try: return json.loads(m.group()) except json.JSONDecodeError: pass # 兜底:返回基本结构 return { "plan_title": "执行计划", "steps": [{"step": 1, "description": text[:200], "expected_output": "完成"}], "success_criteria": text[:100], }