feat: 新增 Pipeline 流水线编排模式 (Planner→Executor→Reviewer)

新增第四种编排模式 pipeline,实现规划→执行→审查的自动化流水线:
- Planner 自动将问题拆解为 2-5 步 JSON 执行计划
- Executor 使用用户配置的 Agent 逐步骤执行
- Reviewer 审查全部输出并交付最终答案
- 前端编排模式选择器新增"流水线模式"选项
- 更新完善自主 AI Agent 改造完成情况文档

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-02 11:03:51 +08:00
parent 7aba0f9bc5
commit 5423aca684
4 changed files with 624 additions and 19 deletions

View File

@@ -1,10 +1,11 @@
"""
Agent Orchestrator — 多 Agent 编排引擎。
支持种协作模式:
支持种协作模式:
- route: Router Agent 分析问题 → 分发到最合适的 Specialist Agent
- sequential: Agent 流水线执行,前者输出作为后者输入
- debate: 多个 Agent 独立回答 → Aggregator 汇总为最终答案
- pipeline: Planner 制定计划 → Executor 逐步骤执行 → Reviewer 审查交付
"""
from __future__ import annotations
@@ -73,6 +74,53 @@ _ROUTER_SYSTEM_PROMPT = """你是一个路由调度员。你的任务是从以
- 如果问题涉及多个领域,选择最相关的那个
- 必须从上述列表中选择,不能编造 Agent ID"""
_PLANNER_SYSTEM_PROMPT = """你是一个任务规划员。将用户的问题拆解为可执行的步骤计划。
要求:
1. 分析问题的核心目标和子任务
2. 拆分 2-5 个具体、可操作的步骤
3. 步骤之间有明确的依赖顺序
4. 每个步骤包含预期输出
返回 JSON 格式(不要 markdown 包裹),严格按照以下结构:
{
"plan_title": "计划标题",
"steps": [
{"step": 1, "description": "第一步做什么", "expected_output": "预期产出描述"},
{"step": 2, "description": "第二步做什么", "expected_output": "预期产出描述"}
],
"success_criteria": "如何判断执行成功"
}"""
_EXECUTOR_STEP_PROMPT = """你正在执行一个计划中的步骤。
原始问题: {original_question}
计划标题: {plan_title}
当前步骤 ({current_step}/{total_steps}): {step_description}
预期输出: {expected_output}
前序步骤结果:
{previous_output}
请专注执行当前步骤,使用可用工具完成任务。完成后输出本步骤的结果。"""
_REVIEWER_SYSTEM_PROMPT = """你是一个质量审查员。审查计划执行结果,输出最终答案给用户。
原始问题: {original_question}
执行计划: {plan_title}
计划步骤: {plan_steps}
各步骤执行结果:
{execution_results}
请:
1. 确认每个步骤是否完成
2. 汇总各步骤结果
3. 输出完整、清晰的最终答案
4. 如有改进空间,在末尾附加"改进建议"
最终答案应直接面向用户,不要提及内部步骤细节。"""
_AGGREGATOR_SYSTEM_PROMPT = """你是一个回答汇总员。多个 AI Agent 对同一个问题给出了不同的回答。
请分析所有回答,输出一份综合的最终答案。
@@ -111,8 +159,10 @@ class AgentOrchestrator:
return await self._sequential(question, agents, on_llm_call)
elif mode == "debate":
return await self._debate(question, agents, on_llm_call)
elif mode == "pipeline":
return await self._pipeline(question, agents, on_llm_call)
else:
raise ValueError(f"不支持的编排模式: {mode},可选: route, sequential, debate")
raise ValueError(f"不支持的编排模式: {mode},可选: route, sequential, debate, pipeline")
async def _route(
self, question: str, agents: List[OrchestratorAgentConfig],
@@ -375,3 +425,192 @@ class AgentOrchestrator:
steps=steps,
agent_results=agent_outputs,
)
async def _pipeline(
self, question: str, agents: List[OrchestratorAgentConfig],
on_llm_call: Optional[Callable] = None,
) -> OrchestratorResult:
"""流水线模式Planner → Executor逐步骤 → Reviewer。
使用内置的 Planner / Reviewer Agent将用户提供的第一个 Agent 作为 Executor。
"""
steps: List[OrchestratorStep] = []
# ── 1. Planner制定计划 ──
planner_runtime = AgentRuntime(
AgentConfig(
name="planner",
system_prompt=_PLANNER_SYSTEM_PROMPT,
llm=AgentLLMConfig(
model=self._default_llm.model,
temperature=0.2,
),
tools=AgentToolConfig(include_tools=[]),
),
on_llm_call=on_llm_call,
)
planner_result = await planner_runtime.run(question)
steps.append(OrchestratorStep(
agent_id="planner", agent_name="Planner",
input=question[:200],
output=planner_result.content[:500],
iterations_used=planner_result.iterations_used,
tool_calls_made=planner_result.tool_calls_made,
error=None if planner_result.success else planner_result.error,
))
if not planner_result.success:
return OrchestratorResult(
mode="pipeline",
final_answer=f"规划失败: {planner_result.content}",
steps=steps,
)
# 解析计划
plan = self._parse_plan(planner_result.content)
plan_steps = plan.get("steps", [])
if not plan_steps:
return OrchestratorResult(
mode="pipeline",
final_answer="规划结果中没有有效的执行步骤",
steps=steps,
)
# ── 2. Executor逐步骤执行 ──
executor_cfg = agents[0] if agents else OrchestratorAgentConfig(
id="executor", name="Executor",
system_prompt="你是一个有用的AI助手。",
)
previous_output = "(尚无前序步骤)"
execution_results = []
for step_info in plan_steps:
step_num = step_info.get("step", 0)
step_desc = step_info.get("description", f"步骤 {step_num}")
step_expect = step_info.get("expected_output", "")
executor_prompt = _EXECUTOR_STEP_PROMPT.format(
original_question=question,
plan_title=plan.get("plan_title", ""),
current_step=step_num,
total_steps=len(plan_steps),
step_description=step_desc,
expected_output=step_expect,
previous_output=previous_output,
)
executor_runtime = AgentRuntime(
AgentConfig(
name=executor_cfg.name,
system_prompt=executor_cfg.system_prompt,
llm=AgentLLMConfig(
model=executor_cfg.model,
provider=executor_cfg.provider,
temperature=executor_cfg.temperature,
max_iterations=executor_cfg.max_iterations,
),
tools=AgentToolConfig(
include_tools=executor_cfg.tools,
),
),
on_llm_call=on_llm_call,
)
step_result = await executor_runtime.run(executor_prompt)
step_output = OrchestratorStep(
agent_id=executor_cfg.id,
agent_name=f"{executor_cfg.name} (步骤{step_num})",
input=f"步骤{step_num}: {step_desc}",
output=step_result.content[:500],
iterations_used=step_result.iterations_used,
tool_calls_made=step_result.tool_calls_made,
error=None if step_result.success else step_result.error,
)
steps.append(step_output)
execution_results.append({
"step": step_num,
"description": step_desc,
"output": step_result.content,
"error": step_result.error if not step_result.success else None,
})
previous_output = step_result.content if step_result.success else f"(步骤{step_num}执行出错)"
if not step_result.success:
logger.warning(f"Pipeline 步骤{step_num} 执行失败: {step_result.error}")
# ── 3. Reviewer审查并交付 ──
plan_steps_text = "\n".join(
f"步骤{s['step']}: {s['description']} → 预期: {s.get('expected_output', '')}"
for s in plan_steps
)
execution_text = "\n\n".join(
f"【步骤{r['step']}{r['description']}\n{r['output']}"
for r in execution_results
)
reviewer_prompt = _REVIEWER_SYSTEM_PROMPT.format(
original_question=question,
plan_title=plan.get("plan_title", ""),
plan_steps=plan_steps_text,
execution_results=execution_text,
)
reviewer_runtime = AgentRuntime(
AgentConfig(
name="reviewer",
system_prompt=reviewer_prompt,
llm=AgentLLMConfig(
model=self._default_llm.model,
temperature=0.3,
),
tools=AgentToolConfig(include_tools=[]),
),
on_llm_call=on_llm_call,
)
review_result = await reviewer_runtime.run(
"请审查上述执行结果,输出最终答案。"
)
steps.append(OrchestratorStep(
agent_id="reviewer", agent_name="Reviewer",
input="审查执行结果并输出最终答案",
output=review_result.content[:500],
iterations_used=review_result.iterations_used,
tool_calls_made=review_result.tool_calls_made,
error=None if review_result.success else review_result.error,
))
return OrchestratorResult(
mode="pipeline",
final_answer=review_result.content if review_result.success else "审查环节失败",
steps=steps,
agent_results=execution_results,
)
@staticmethod
def _parse_plan(text: str) -> dict:
"""从 Planner 输出中解析 JSON 计划。"""
import re
# 尝试直接解析
cleaned = text.strip()
# 移除 markdown 代码块包裹
cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned)
cleaned = re.sub(r'\s*```$', '', cleaned)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
pass
# 尝试提取 JSON 块
m = re.search(r'\{[\s\S]*\}', cleaned)
if m:
try:
return json.loads(m.group())
except json.JSONDecodeError:
pass
# 兜底:返回基本结构
return {
"plan_title": "执行计划",
"steps": [{"step": 1, "description": text[:200], "expected_output": "完成"}],
"success_criteria": text[:100],
}