diff --git a/backend/app/services/team_orchestrator.py b/backend/app/services/team_orchestrator.py index b0bea16..cf930dd 100644 --- a/backend/app/services/team_orchestrator.py +++ b/backend/app/services/team_orchestrator.py @@ -376,6 +376,144 @@ class TeamOrchestrator: "files": [], } + async def _run_phase_stream_and_collect( + self, + phase: Dict[str, Any], + members: List[TeamMember], + context_doc: str, + project_path: Path, + prev_files: set, + event_queue: asyncio.Queue, + ) -> Dict[str, Any]: + """流式执行单个 phase —— 用 run_stream() 替代 run()。 + + 将 Agent 内部事件(think/tool_call/tool_result/final)实时放入 event_queue, + 前端可据此渲染每个 Agent 的独立执行面板。 + + Returns: + 最终结果字典(与 _run_single_phase 相同结构) + """ + phase_num = phase.get("phase", 0) + role = phase.get("role", "developer") + phase_name = phase.get("name", f"阶段 {phase_num}") + phase_desc = phase.get("description", "") + expected = phase.get("expected_output", "") + + agent = None + for m in members: + if m.role == role: + agent = m.agent + break + + base_event = { + "phase": phase_num, + "role": role, + "name": phase_name, + "agent": agent.name if agent else None, + } + + if not agent: + event_queue.put_nowait({ + **base_event, + "type": "phase_done", + "success": False, + "error": f"missing_role:{role}", + "output": f"跳过: 无 {role} 角色", + "files": [], + }) + return { + "phase": phase_num, "name": phase_name, "role": role, + "agent_name": None, + "output": f"跳过: 无 {role} 角色", + "success": False, "error": f"missing_role:{role}", + "files": [], + } + + try: + agent_config = _build_agent_config(agent, self.auto_approve_files) + runtime = AgentRuntime(agent_config) + + phase_input = ( + f"{context_doc}\n" + f"## 当前阶段: {phase_name}\n" + f"任务描述: {phase_desc}\n" + f"期望产出: {expected}\n" + f"项目文件保存目录: {project_path}\n" + f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。" + ) + + output_parts: List[str] = [] + last_iteration = 0 + tool_count = 0 + + async for agent_event in runtime.run_stream(phase_input): + event_type = agent_event.get("type", "") + + # 转发 Agent 内部事件到前端(带 phase 元数据) + event_queue.put_nowait({ + **base_event, + "type": "agent_event", + "data": agent_event, + }) + + if event_type == "final": + content = agent_event.get("content", "") + if content: + output_parts.append(content) + elif event_type == "tool_result": + tool_count += 1 + last_iteration = agent_event.get("iteration", last_iteration) + + output_text = "\n\n".join(output_parts) if output_parts else "执行完成" + + # 提取本阶段写入的文件 + # (run_stream 不返回 AgentResult,所以用目录扫描) + current_files = set(_scan_directory_files(project_path)) + phase_files = sorted(current_files - prev_files) + + result = { + "phase": phase_num, "name": phase_name, "role": role, + "agent_name": agent.name, + "output": output_text, + "success": True, + "iterations": last_iteration, + "tool_calls": tool_count, + "error": None, + "files": phase_files, + } + + event_queue.put_nowait({ + **base_event, + "type": "phase_done", + "output": output_text, + "success": True, + "iterations": last_iteration, + "tool_calls": tool_count, + "files": phase_files, + "error": None, + }) + + return result + + except Exception as e: + logger.error("团队编排 [%s]: 阶段 %s 流式异常: %s", self.team_id, phase_num, e) + result = { + "phase": phase_num, "name": phase_name, "role": role, + "agent_name": agent.name, + "output": f"执行异常: {e}", + "success": False, "error": str(e), + "files": [], + } + event_queue.put_nowait({ + **base_event, + "type": "phase_done", + "success": False, + "error": str(e), + "output": f"执行异常: {e}", + "files": [], + }) + return result + async def _execute_phases_parallel( self, phases: List[Dict[str, Any]], @@ -699,7 +837,7 @@ class TeamOrchestrator: has_dependencies = any(p.get("depends_on") for p in phases) if has_dependencies: - # ─── DAG 并行执行(流式) ─── + # ─── DAG 并行执行(流式 + 每 Agent 独立事件队列) ─── completed: Dict[int, str] = {} pending = {p["phase"]: p for p in phases} current_prev = prev_files.copy() @@ -720,7 +858,6 @@ class TeamOrchestrator: agent = self._get_agent_by_role(members, p.get("role", "")) phase_agents.append((p, agent)) - # 通知前端并行批次开始 yield { "type": "parallel_batch_start", "phases": [ @@ -729,8 +866,6 @@ class TeamOrchestrator: for p, a in phase_agents ], } - - # 各阶段启动事件 for p, agent in phase_agents: yield { "type": "phase_start", @@ -740,21 +875,36 @@ class TeamOrchestrator: "agent": agent.name if agent else None, } - # 并行执行所有就绪阶段 - tasks = [ - self._run_single_phase(p, members, current_context, project_path, current_prev) + # 并行执行:每个 phase 通过 event_queue 实时推送 Agent 内部事件 + event_queue: asyncio.Queue = asyncio.Queue() + tasks = { + asyncio.create_task( + self._run_phase_stream_and_collect( + p, members, current_context, project_path, current_prev, event_queue + ) + ): p for p, _ in phase_agents - ] - batch_results = await asyncio.gather(*tasks, return_exceptions=True) + } - for (p, agent), r in zip(phase_agents, batch_results): - if isinstance(r, Exception): + # 消费事件队列直到所有 phase 完成 + done_count = 0 + while done_count < len(tasks): + evt = await event_queue.get() + yield evt + if evt.get("type") == "phase_done": + done_count += 1 + + # 收集最终结果 + for task, p in tasks.items(): + try: + r = task.result() + except Exception as exc: r = { "phase": p["phase"], "name": p.get("name", ""), "role": p.get("role", ""), - "agent_name": agent.name if agent else None, - "output": f"并行执行异常: {r}", - "success": False, "error": str(r), + "agent_name": None, + "output": f"并行执行异常: {exc}", + "success": False, "error": str(exc), "files": [], } @@ -767,20 +917,6 @@ class TeamOrchestrator: if fp not in all_files: all_files.append(fp) current_prev.add(fp) - - yield { - "type": "phase_done", - "phase": p["phase"], - "name": r["name"], - "role": r["role"], - "agent": agent.name if agent else None, - "output": output_text, - "success": r.get("success", False), - "iterations": r.get("iterations"), - "tool_calls": r.get("tool_calls"), - "error": r.get("error") if not r.get("success") else None, - "files": r.get("files", []), - } del pending[p["phase"]] yield { @@ -791,31 +927,39 @@ class TeamOrchestrator: prev_files = current_prev context_doc = current_context else: - # ─── 顺序执行(向后兼容:无 depends_on 的旧版模板/计划) ─── + # ─── 顺序执行(流式:每 phase 实时推送 Agent 内部事件) ─── for i, phase in enumerate(phases): - phase_num = phase.get("phase", i + 1) - role = phase.get("role", "developer") - phase_name = phase.get("name", f"阶段 {phase_num}") - - agent = self._get_agent_by_role(members, role) - if not agent: - yield {"type": "phase_done", "phase": phase_num, "name": phase_name, - "role": role, "success": False, "error": f"missing_role:{role}"} - all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): 无 {role} 角色") - continue - - yield { - "type": "phase_start", - "phase": phase_num, - "name": phase_name, - "role": role, - "agent": agent.name, - } - - r = await self._run_single_phase( - phase, members, context_doc, project_path, prev_files + event_queue = asyncio.Queue() + task = asyncio.create_task( + self._run_phase_stream_and_collect( + phase, members, context_doc, project_path, prev_files, event_queue + ) ) + + # 消费事件直到 phase_done + while True: + evt = await event_queue.get() + yield evt + if evt.get("type") == "phase_done": + break + + try: + r = task.result() + except Exception as exc: + r = { + "phase": phase.get("phase", i + 1), + "name": phase.get("name", ""), + "role": phase.get("role", ""), + "agent_name": None, + "output": f"执行异常: {exc}", + "success": False, "error": str(exc), + "files": [], + } + output_text = r.get("output", "") + phase_num = r["phase"] + phase_name = r["name"] + role = r["role"] if r.get("success"): context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n" @@ -825,20 +969,6 @@ class TeamOrchestrator: all_files.append(fp) prev_files.add(fp) - yield { - "type": "phase_done", - "phase": phase_num, - "name": phase_name, - "role": role, - "agent": agent.name, - "output": output_text, - "success": r.get("success", False), - "iterations": r.get("iterations"), - "tool_calls": r.get("tool_calls"), - "error": r.get("error") if not r.get("success") else None, - "files": r.get("files", []), - } - # ─── QA 审查 ─── qa_agent = self._get_agent_by_role(members, "qa") if qa_agent and all_outputs: diff --git a/frontend/src/views/TeamBuilder.vue b/frontend/src/views/TeamBuilder.vue index 01bb67c..5b9f619 100644 --- a/frontend/src/views/TeamBuilder.vue +++ b/frontend/src/views/TeamBuilder.vue @@ -159,7 +159,68 @@