From 5423aca684d67f70ca0baa493528e1b3323078b9 Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Sat, 2 May 2026 11:03:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=20Pipeline=20?= =?UTF-8?q?=E6=B5=81=E6=B0=B4=E7=BA=BF=E7=BC=96=E6=8E=92=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=20(Planner=E2=86=92Executor=E2=86=92Reviewer)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增第四种编排模式 pipeline,实现规划→执行→审查的自动化流水线: - Planner 自动将问题拆解为 2-5 步 JSON 执行计划 - Executor 使用用户配置的 Agent 逐步骤执行 - Reviewer 审查全部输出并交付最终答案 - 前端编排模式选择器新增"流水线模式"选项 - 更新完善自主 AI Agent 改造完成情况文档 Co-Authored-By: Claude Sonnet 4.6 --- backend/app/agent_runtime/orchestrator.py | 243 ++++++++++++++++++- frontend/src/views/AgentChat.vue | 10 +- scripts/seed_agents.py | 269 ++++++++++++++++++++++ 自主AI Agent改造完成情况.md | 121 ++++++++-- 4 files changed, 624 insertions(+), 19 deletions(-) create mode 100644 scripts/seed_agents.py diff --git a/backend/app/agent_runtime/orchestrator.py b/backend/app/agent_runtime/orchestrator.py index 275207b..6f73c1d 100644 --- a/backend/app/agent_runtime/orchestrator.py +++ b/backend/app/agent_runtime/orchestrator.py @@ -1,10 +1,11 @@ """ Agent Orchestrator — 多 Agent 编排引擎。 -支持三种协作模式: +支持四种协作模式: - route: Router Agent 分析问题 → 分发到最合适的 Specialist Agent - sequential: Agent 流水线执行,前者输出作为后者输入 - debate: 多个 Agent 独立回答 → Aggregator 汇总为最终答案 +- pipeline: Planner 制定计划 → Executor 逐步骤执行 → Reviewer 审查交付 """ from __future__ import annotations @@ -73,6 +74,53 @@ _ROUTER_SYSTEM_PROMPT = """你是一个路由调度员。你的任务是从以 - 如果问题涉及多个领域,选择最相关的那个 - 必须从上述列表中选择,不能编造 Agent ID""" +_PLANNER_SYSTEM_PROMPT = """你是一个任务规划员。将用户的问题拆解为可执行的步骤计划。 + +要求: +1. 分析问题的核心目标和子任务 +2. 拆分 2-5 个具体、可操作的步骤 +3. 步骤之间有明确的依赖顺序 +4. 每个步骤包含预期输出 + +返回 JSON 格式(不要 markdown 包裹),严格按照以下结构: +{ + "plan_title": "计划标题", + "steps": [ + {"step": 1, "description": "第一步做什么", "expected_output": "预期产出描述"}, + {"step": 2, "description": "第二步做什么", "expected_output": "预期产出描述"} + ], + "success_criteria": "如何判断执行成功" +}""" + +_EXECUTOR_STEP_PROMPT = """你正在执行一个计划中的步骤。 + +原始问题: {original_question} +计划标题: {plan_title} + +当前步骤 ({current_step}/{total_steps}): {step_description} +预期输出: {expected_output} +前序步骤结果: +{previous_output} + +请专注执行当前步骤,使用可用工具完成任务。完成后输出本步骤的结果。""" + +_REVIEWER_SYSTEM_PROMPT = """你是一个质量审查员。审查计划执行结果,输出最终答案给用户。 + +原始问题: {original_question} +执行计划: {plan_title} +计划步骤: {plan_steps} + +各步骤执行结果: +{execution_results} + +请: +1. 确认每个步骤是否完成 +2. 汇总各步骤结果 +3. 输出完整、清晰的最终答案 +4. 如有改进空间,在末尾附加"改进建议" + +最终答案应直接面向用户,不要提及内部步骤细节。""" + _AGGREGATOR_SYSTEM_PROMPT = """你是一个回答汇总员。多个 AI Agent 对同一个问题给出了不同的回答。 请分析所有回答,输出一份综合的最终答案。 @@ -111,8 +159,10 @@ class AgentOrchestrator: return await self._sequential(question, agents, on_llm_call) elif mode == "debate": return await self._debate(question, agents, on_llm_call) + elif mode == "pipeline": + return await self._pipeline(question, agents, on_llm_call) else: - raise ValueError(f"不支持的编排模式: {mode},可选: route, sequential, debate") + raise ValueError(f"不支持的编排模式: {mode},可选: route, sequential, debate, pipeline") async def _route( self, question: str, agents: List[OrchestratorAgentConfig], @@ -375,3 +425,192 @@ class AgentOrchestrator: steps=steps, agent_results=agent_outputs, ) + + async def _pipeline( + self, question: str, agents: List[OrchestratorAgentConfig], + on_llm_call: Optional[Callable] = None, + ) -> OrchestratorResult: + """流水线模式:Planner → Executor(逐步骤) → Reviewer。 + + 使用内置的 Planner / Reviewer Agent,将用户提供的第一个 Agent 作为 Executor。 + """ + steps: List[OrchestratorStep] = [] + + # ── 1. Planner:制定计划 ── + planner_runtime = AgentRuntime( + AgentConfig( + name="planner", + system_prompt=_PLANNER_SYSTEM_PROMPT, + llm=AgentLLMConfig( + model=self._default_llm.model, + temperature=0.2, + ), + tools=AgentToolConfig(include_tools=[]), + ), + on_llm_call=on_llm_call, + ) + planner_result = await planner_runtime.run(question) + steps.append(OrchestratorStep( + agent_id="planner", agent_name="Planner", + input=question[:200], + output=planner_result.content[:500], + iterations_used=planner_result.iterations_used, + tool_calls_made=planner_result.tool_calls_made, + error=None if planner_result.success else planner_result.error, + )) + + if not planner_result.success: + return OrchestratorResult( + mode="pipeline", + final_answer=f"规划失败: {planner_result.content}", + steps=steps, + ) + + # 解析计划 + plan = self._parse_plan(planner_result.content) + plan_steps = plan.get("steps", []) + if not plan_steps: + return OrchestratorResult( + mode="pipeline", + final_answer="规划结果中没有有效的执行步骤", + steps=steps, + ) + + # ── 2. Executor:逐步骤执行 ── + executor_cfg = agents[0] if agents else OrchestratorAgentConfig( + id="executor", name="Executor", + system_prompt="你是一个有用的AI助手。", + ) + + previous_output = "(尚无前序步骤)" + execution_results = [] + + for step_info in plan_steps: + step_num = step_info.get("step", 0) + step_desc = step_info.get("description", f"步骤 {step_num}") + step_expect = step_info.get("expected_output", "") + + executor_prompt = _EXECUTOR_STEP_PROMPT.format( + original_question=question, + plan_title=plan.get("plan_title", ""), + current_step=step_num, + total_steps=len(plan_steps), + step_description=step_desc, + expected_output=step_expect, + previous_output=previous_output, + ) + + executor_runtime = AgentRuntime( + AgentConfig( + name=executor_cfg.name, + system_prompt=executor_cfg.system_prompt, + llm=AgentLLMConfig( + model=executor_cfg.model, + provider=executor_cfg.provider, + temperature=executor_cfg.temperature, + max_iterations=executor_cfg.max_iterations, + ), + tools=AgentToolConfig( + include_tools=executor_cfg.tools, + ), + ), + on_llm_call=on_llm_call, + ) + step_result = await executor_runtime.run(executor_prompt) + + step_output = OrchestratorStep( + agent_id=executor_cfg.id, + agent_name=f"{executor_cfg.name} (步骤{step_num})", + input=f"步骤{step_num}: {step_desc}", + output=step_result.content[:500], + iterations_used=step_result.iterations_used, + tool_calls_made=step_result.tool_calls_made, + error=None if step_result.success else step_result.error, + ) + steps.append(step_output) + execution_results.append({ + "step": step_num, + "description": step_desc, + "output": step_result.content, + "error": step_result.error if not step_result.success else None, + }) + + previous_output = step_result.content if step_result.success else f"(步骤{step_num}执行出错)" + + if not step_result.success: + logger.warning(f"Pipeline 步骤{step_num} 执行失败: {step_result.error}") + + # ── 3. Reviewer:审查并交付 ── + plan_steps_text = "\n".join( + f"步骤{s['step']}: {s['description']} → 预期: {s.get('expected_output', '')}" + for s in plan_steps + ) + execution_text = "\n\n".join( + f"【步骤{r['step']}】{r['description']}\n{r['output']}" + for r in execution_results + ) + + reviewer_prompt = _REVIEWER_SYSTEM_PROMPT.format( + original_question=question, + plan_title=plan.get("plan_title", ""), + plan_steps=plan_steps_text, + execution_results=execution_text, + ) + + reviewer_runtime = AgentRuntime( + AgentConfig( + name="reviewer", + system_prompt=reviewer_prompt, + llm=AgentLLMConfig( + model=self._default_llm.model, + temperature=0.3, + ), + tools=AgentToolConfig(include_tools=[]), + ), + on_llm_call=on_llm_call, + ) + review_result = await reviewer_runtime.run( + "请审查上述执行结果,输出最终答案。" + ) + steps.append(OrchestratorStep( + agent_id="reviewer", agent_name="Reviewer", + input="审查执行结果并输出最终答案", + output=review_result.content[:500], + iterations_used=review_result.iterations_used, + tool_calls_made=review_result.tool_calls_made, + error=None if review_result.success else review_result.error, + )) + + return OrchestratorResult( + mode="pipeline", + final_answer=review_result.content if review_result.success else "审查环节失败", + steps=steps, + agent_results=execution_results, + ) + + @staticmethod + def _parse_plan(text: str) -> dict: + """从 Planner 输出中解析 JSON 计划。""" + import re + # 尝试直接解析 + cleaned = text.strip() + # 移除 markdown 代码块包裹 + cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned) + cleaned = re.sub(r'\s*```$', '', cleaned) + try: + return json.loads(cleaned) + except json.JSONDecodeError: + pass + # 尝试提取 JSON 块 + m = re.search(r'\{[\s\S]*\}', cleaned) + if m: + try: + return json.loads(m.group()) + except json.JSONDecodeError: + pass + # 兜底:返回基本结构 + return { + "plan_title": "执行计划", + "steps": [{"step": 1, "description": text[:200], "expected_output": "完成"}], + "success_criteria": text[:100], + } diff --git a/frontend/src/views/AgentChat.vue b/frontend/src/views/AgentChat.vue index 1fa00a3..52de537 100644 --- a/frontend/src/views/AgentChat.vue +++ b/frontend/src/views/AgentChat.vue @@ -25,10 +25,11 @@