Files
aiagent/backend/app/services/team_orchestrator.py
renjianbo e0bcfe582b feat: add multi-window real-time agent execution display
Backend: execute_stream() now uses AgentRuntime.run_stream() per phase,
forwarding agent internal events (think/tool_call/tool_result/final) as
`agent_event` SSE type with phase metadata via asyncio.Queue for DAG
parallel batches.

Frontend: new agent-panels-grid with per-phase cards showing live
activity log, tool calls, iteration count, and collapsible full output.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-18 22:06:16 +08:00

1011 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
团队编排引擎 — PM 规划 → 顺序阶段执行 → QA 审查 → 交付物
用法:
orchestrator = TeamOrchestrator(db, team_id, user_id)
result = await orchestrator.execute("做一个个人博客")
async for event in orchestrator.execute_stream("做一个个人博客"):
...
"""
from __future__ import annotations
import asyncio
import json
import re
import logging
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from app.models.team import Team, TeamMember
from app.models.agent import Agent
from app.agent_runtime.core import AgentRuntime
from app.agent_runtime.schemas import (
AgentConfig,
AgentLLMConfig,
AgentToolConfig,
AgentMemoryConfig,
AgentResult,
)
logger = logging.getLogger(__name__)
def _build_agent_config(agent: Agent, auto_approve_files: bool = True) -> AgentConfig:
"""从 Agent 的 workflow_config 构建 AgentConfig供 AgentRuntime 使用。"""
wf = agent.workflow_config or {}
nodes = wf.get("nodes", [])
llm_node = None
for n in nodes:
if n.get("type") == "llm":
llm_node = n
break
llm_data = llm_node.get("data", {}) if llm_node else {}
tools = llm_data.get("selected_tools") or llm_data.get("tools") or []
system_prompt = llm_data.get("prompt") or ""
model = llm_data.get("model", "deepseek-v4-pro")
provider = llm_data.get("provider", "deepseek")
temperature = float(llm_data.get("temperature", 0.4))
max_iterations = int(llm_data.get("max_iterations", 10))
perm = "acceptEdits" if auto_approve_files else "default"
logger.info("_build_agent_config: agent=%s tools=%d permission_level=%s", agent.name, len(tools), perm)
return AgentConfig(
name=agent.name,
system_prompt=system_prompt,
user_id=agent.user_id,
memory_scope_id=f"team_{agent.id}",
llm=AgentLLMConfig(
provider=provider,
model=model,
temperature=temperature,
max_iterations=max_iterations,
),
tools=AgentToolConfig(include_tools=tools, permission_level=perm),
memory=AgentMemoryConfig(
enabled=True,
max_history_messages=30,
persist_to_db=True,
learning_enabled=True,
),
)
def _parse_plan_json(raw_output: str) -> Dict[str, Any]:
"""从 LLM 原始输出中提取 JSON 计划(含 Markdown 表格回退)。"""
text = raw_output.strip()
# 尝试直接解析
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 从 ```json ... ``` 中提取
m = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
if m:
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
pass
# 从 { ... } 中提取(含 phases 键)
m = re.search(r'\{[\s\S]*"phases"[\s\S]*\}', text)
if m:
try:
return json.loads(m.group())
except json.JSONDecodeError:
pass
# ─── 回退:从 Markdown 表格提取 phases ───
# 匹配形如 | **phase-1** | `functional_tester` | ... | 的行
phases = _parse_phases_from_markdown(text)
if phases:
project_name = ""
m = re.search(r'(?:项目|测试).*?[:]\s*(.+?)(?:\n|$)', text)
if m:
project_name = m.group(1).strip()
return {
"project_name": project_name or "未命名项目",
"analysis": "",
"user_stories": [],
"phases": phases,
"acceptance_criteria": [],
}
logger.warning("无法从输出中解析 JSON 计划,原始输出: %.500s", text)
return {}
def _parse_phases_from_markdown(text: str) -> List[Dict[str, Any]]:
"""从 Markdown 表格中提取 phase 信息。支持格式:
| **phase-1** | `functional_tester` | 核心任务 | 工时 |
| phase-1 | functional_tester | 描述 |
"""
phases = []
# 匹配表格行: | (可选**的) phase-N (可选**) | role | description | ... |
# role 可能带反引号 `role` 或直接是文本
table_row = re.compile(
r'^\|\s*\*{0,2}(?:phase[-]?\s*)?(\d+)\*{0,2}\s*\|'
r'\s*(?:`)?([a-z_]+)(?:`)?\s*\|'
r'\s*([^|]+?)\s*\|',
re.MULTILINE | re.IGNORECASE
)
for m in table_row.finditer(text):
phase_num = int(m.group(1))
role = m.group(2).strip().lower()
desc = m.group(3).strip()
# 验证 role 是合法角色名
if role not in ("test_planner", "functional_tester", "ux_reviewer", "edge_explorer",
"performance_evaluator", "pm", "designer", "developer", "qa", "devops",
"curriculum_designer", "instructor", "teaching_assistant", "academic_admin",
"fullstack_dev", "fe_ux_engineer", "platform_devops", "qa_engineer", "product_lead",
"doc_architect", "tech_writer", "api_doc_specialist", "translator_reviewer",
"release_manager", "health_assessor", "nutritionist", "exercise_rehab",
"psychologist", "chronic_manager", "triage_specialist", "record_analyst",
"medication_reviewer", "followup_manager", "insurance_coordinator"):
continue
phases.append({
"phase": phase_num,
"name": desc[:60],
"role": role,
"description": desc,
"expected_output": f"{role} 阶段的产出物",
})
if phases:
logger.info("从 Markdown 表格提取到 %d 个 phases", len(phases))
return phases
def _resolve_workspace_root() -> Path:
"""获取文件工具的工作区根目录(与 builtin_tools.py 逻辑一致)。"""
from app.core.config import settings
raw = (getattr(settings, "LOCAL_FILE_TOOLS_ROOT", None) or "").strip()
if raw:
return Path(raw).expanduser().resolve()
# fallback: 4 级父目录 (builtin_tools.py → app/services → app → backend → repo root)
return Path(__file__).resolve().parent.parent.parent.parent
def _safe_project_name(description: str) -> str:
"""从项目描述中提取安全的目录名(英文/拼音/数字/连字符)。"""
# 取前 40 个字符,只保留英文、数字、中文转拼音首字母的近似
cleaned = description.strip().replace(" ", "-").replace("/", "-").replace("\\", "-")
# 去掉非 ASCII 和非基本标点
safe = re.sub(r'[^\w\-]', '', cleaned, flags=re.ASCII)
if not safe:
# 如果全是中文,用时间戳
from datetime import datetime
safe = f"project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
return safe[:40].strip("-")
def _scan_directory_files(directory: Path) -> List[str]:
"""扫描目录下所有文件的绝对路径列表(递归)。"""
files: List[str] = []
if not directory.exists():
return files
for p in directory.rglob("*"):
if p.is_file():
files.append(str(p))
return sorted(files)
def _extract_written_files(result: AgentResult) -> List[str]:
"""从 AgentResult.steps 中提取 file_write 成功产出的绝对路径列表。"""
files: List[str] = []
seen = set()
for step in result.steps:
if step.tool_name == "file_write":
# 优先从 tool_result JSON 中取绝对路径(仅当写入成功时)
if step.tool_result:
try:
data = json.loads(step.tool_result)
# 跳过失败的调用(权限不足 / 需确认)
if data.get("error") or data.get("requires_confirmation"):
logger.warning("file_write 未成功(%s),跳过文件追踪", data.get("error", "requires_confirmation"))
continue
fp = data.get("file_path")
if fp and fp not in seen:
files.append(fp)
seen.add(fp)
continue
except (json.JSONDecodeError, TypeError):
pass
# 回退:从 tool_input 取相对路径,再拼接工作区根(仅当 tool_result 为空时)
if step.tool_input:
fp = step.tool_input.get("file_path")
if fp:
abs_path = str(_resolve_workspace_root() / fp)
if abs_path not in seen:
files.append(abs_path)
seen.add(abs_path)
return files
class TeamOrchestrator:
"""虚拟团队编排器 — 按角色分工顺序执行软件项目。"""
def __init__(
self,
db: Session,
team_id: str,
user_id: str,
auto_approve_files: bool = True,
):
self.db = db
self.team_id = team_id
self.user_id = user_id
self.auto_approve_files = auto_approve_files
def _load_team(self) -> Team:
team = self.db.query(Team).filter(Team.id == self.team_id).first()
if not team:
raise ValueError(f"团队不存在: {self.team_id}")
return team
def _load_members(self) -> List[TeamMember]:
return (
self.db.query(TeamMember)
.filter(TeamMember.team_id == self.team_id)
.order_by(TeamMember.position)
.all()
)
def _get_agent_by_role(self, members: List[TeamMember], role: str) -> Optional[Agent]:
"""按角色查找 Agent。"""
for m in members:
if m.role == role:
return m.agent
return None
def _resolve_planner_role(self, members: List[TeamMember], workflow: str) -> str:
"""根据团队 workflow 类型解析规划者角色。
- software_company → pm
- education_training → curriculum_designer
- platform_engineering → product_lead (产品负责人做规划)
- 回退: 第一个 is_lead=True 的成员
- 最终回退: 第一个成员
"""
workflow_planner_map = {
"software_company": "pm",
"education_training": "curriculum_designer",
"platform_engineering": "product_lead",
"tech_doc": "doc_architect",
"health_management": "health_assessor",
"medical_consultation": "triage_specialist",
"user_simulation_test": "test_planner",
}
role = workflow_planner_map.get(workflow)
if role and self._get_agent_by_role(members, role):
return role
# 回退:找第一个 Leader
for m in sorted(members, key=lambda x: x.position):
if m.is_lead:
return m.role
# 最终回退:第一个成员
if members:
return members[0].role
return "pm"
async def _run_single_phase(
self,
phase: Dict[str, Any],
members: List[TeamMember],
context_doc: str,
project_path: Path,
prev_files: set,
) -> Dict[str, Any]:
"""执行单个 phase可被 asyncio.gather 并发调用)。
每个 phase 创建独立的 AgentRuntime互不干扰。
"""
phase_num = phase.get("phase", 0)
role = phase.get("role", "developer")
phase_name = phase.get("name", f"阶段 {phase_num}")
phase_desc = phase.get("description", "")
expected = phase.get("expected_output", "")
# DB 查询在协程内完成members 已预加载,此处仅内存迭代)
agent = None
for m in members:
if m.role == role:
agent = m.agent
break
if not agent:
logger.warning("团队编排 [%s]: 阶段 %s 缺少角色 '%s',跳过",
self.team_id, phase_num, role)
return {
"phase": phase_num, "name": phase_name, "role": role,
"agent_name": None,
"output": f"跳过: 无 {role} 角色",
"success": False, "error": f"missing_role:{role}",
"files": [],
}
logger.info("团队编排 [%s]: 执行阶段 %s%s (角色: %s, Agent: %s)",
self.team_id, phase_num, phase_name, role, agent.name)
try:
agent_config = _build_agent_config(agent, self.auto_approve_files)
runtime = AgentRuntime(agent_config)
phase_input = (
f"{context_doc}\n"
f"## 当前阶段: {phase_name}\n"
f"任务描述: {phase_desc}\n"
f"期望产出: {expected}\n"
f"项目文件保存目录: {project_path}\n"
f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。"
)
phase_result = await runtime.run(phase_input)
output_text = phase_result.content if phase_result.success else (
phase_result.error or "执行失败"
)
# 提取本阶段写入的文件AgentResult + 目录扫描双保险)
extracted = _extract_written_files(phase_result)
current_files = set(_scan_directory_files(project_path))
phase_files = sorted(current_files - prev_files)
for fp in extracted:
if fp not in phase_files:
phase_files.append(fp)
return {
"phase": phase_num, "name": phase_name, "role": role,
"agent_name": agent.name,
"output": output_text,
"success": phase_result.success,
"iterations": phase_result.iterations_used,
"tool_calls": phase_result.tool_calls_made,
"error": phase_result.error if not phase_result.success else None,
"files": phase_files,
}
except Exception as e:
logger.error("团队编排 [%s]: 阶段 %s 异常: %s", self.team_id, phase_num, e)
return {
"phase": phase_num, "name": phase_name, "role": role,
"agent_name": agent.name,
"output": f"执行异常: {e}",
"success": False, "error": str(e),
"files": [],
}
async def _run_phase_stream_and_collect(
self,
phase: Dict[str, Any],
members: List[TeamMember],
context_doc: str,
project_path: Path,
prev_files: set,
event_queue: asyncio.Queue,
) -> Dict[str, Any]:
"""流式执行单个 phase —— 用 run_stream() 替代 run()。
将 Agent 内部事件think/tool_call/tool_result/final实时放入 event_queue
前端可据此渲染每个 Agent 的独立执行面板。
Returns:
最终结果字典(与 _run_single_phase 相同结构)
"""
phase_num = phase.get("phase", 0)
role = phase.get("role", "developer")
phase_name = phase.get("name", f"阶段 {phase_num}")
phase_desc = phase.get("description", "")
expected = phase.get("expected_output", "")
agent = None
for m in members:
if m.role == role:
agent = m.agent
break
base_event = {
"phase": phase_num,
"role": role,
"name": phase_name,
"agent": agent.name if agent else None,
}
if not agent:
event_queue.put_nowait({
**base_event,
"type": "phase_done",
"success": False,
"error": f"missing_role:{role}",
"output": f"跳过: 无 {role} 角色",
"files": [],
})
return {
"phase": phase_num, "name": phase_name, "role": role,
"agent_name": None,
"output": f"跳过: 无 {role} 角色",
"success": False, "error": f"missing_role:{role}",
"files": [],
}
try:
agent_config = _build_agent_config(agent, self.auto_approve_files)
runtime = AgentRuntime(agent_config)
phase_input = (
f"{context_doc}\n"
f"## 当前阶段: {phase_name}\n"
f"任务描述: {phase_desc}\n"
f"期望产出: {expected}\n"
f"项目文件保存目录: {project_path}\n"
f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。"
)
output_parts: List[str] = []
last_iteration = 0
tool_count = 0
async for agent_event in runtime.run_stream(phase_input):
event_type = agent_event.get("type", "")
# 转发 Agent 内部事件到前端(带 phase 元数据)
event_queue.put_nowait({
**base_event,
"type": "agent_event",
"data": agent_event,
})
if event_type == "final":
content = agent_event.get("content", "")
if content:
output_parts.append(content)
elif event_type == "tool_result":
tool_count += 1
last_iteration = agent_event.get("iteration", last_iteration)
output_text = "\n\n".join(output_parts) if output_parts else "执行完成"
# 提取本阶段写入的文件
# (run_stream 不返回 AgentResult所以用目录扫描)
current_files = set(_scan_directory_files(project_path))
phase_files = sorted(current_files - prev_files)
result = {
"phase": phase_num, "name": phase_name, "role": role,
"agent_name": agent.name,
"output": output_text,
"success": True,
"iterations": last_iteration,
"tool_calls": tool_count,
"error": None,
"files": phase_files,
}
event_queue.put_nowait({
**base_event,
"type": "phase_done",
"output": output_text,
"success": True,
"iterations": last_iteration,
"tool_calls": tool_count,
"files": phase_files,
"error": None,
})
return result
except Exception as e:
logger.error("团队编排 [%s]: 阶段 %s 流式异常: %s", self.team_id, phase_num, e)
result = {
"phase": phase_num, "name": phase_name, "role": role,
"agent_name": agent.name,
"output": f"执行异常: {e}",
"success": False, "error": str(e),
"files": [],
}
event_queue.put_nowait({
**base_event,
"type": "phase_done",
"success": False,
"error": str(e),
"output": f"执行异常: {e}",
"files": [],
})
return result
async def _execute_phases_parallel(
self,
phases: List[Dict[str, Any]],
members: List[TeamMember],
context_doc: str,
project_path: Path,
prev_files: set,
):
"""DAG 并行执行 phases。按 depends_on 拓扑分批,批内 asyncio.gather 并发。
Returns:
(phase_results, updated_context_doc, updated_prev_files, all_files)
"""
results: List[Dict[str, Any]] = []
completed: Dict[int, str] = {} # phase_num → output_text
pending = {p["phase"]: p for p in phases}
current_prev = prev_files.copy()
current_context = context_doc
all_files: List[str] = []
while pending:
# 找出所有依赖已满足的阶段
ready = [
p for p in pending.values()
if all(d in completed for d in p.get("depends_on", []))
]
if not ready:
raise RuntimeError(
f"阶段循环依赖,无法继续。已完成: {list(completed.keys())}, "
f"待处理: {list(pending.keys())}"
)
logger.info("团队编排 [%s]: 并行批次 — %d 个阶段: %s",
self.team_id, len(ready),
[(p["phase"], p.get("name", "")) for p in ready])
# 并行执行所有就绪阶段
tasks = [
self._run_single_phase(p, members, current_context, project_path, current_prev)
for p in ready
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for p, r in zip(ready, batch_results):
if isinstance(r, Exception):
r = {
"phase": p["phase"], "name": p.get("name", ""),
"role": p.get("role", ""),
"agent_name": None,
"output": f"并行执行异常: {r}",
"success": False, "error": str(r),
"files": [],
}
# 记录完成
completed[p["phase"]] = r.get("output", "")
# 更新共享上下文(后续批次可见)
current_context += (
f"\n## 阶段 {p['phase']}: {p.get('name', '')}\n"
f"{r.get('output', '')[:2000]}\n"
)
# 更新文件追踪
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
current_prev.add(fp)
results.append(r)
del pending[p["phase"]]
return results, current_context, current_prev, all_files
async def execute(self, project_description: str, auto_approve_files: bool = True) -> Dict[str, Any]:
"""执行团队项目(非流式)。
Returns:
{
"project_name": str,
"plan": {...},
"phases": [{phase, role, agent_name, output, steps, success, error}],
"qa_review": {...} | None,
"final_deliverable": str,
"success": bool,
}
"""
self.auto_approve_files = auto_approve_files
team = self._load_team()
members = self._load_members()
if not members:
raise ValueError("团队没有成员,请先为角色分配 Agent")
# ─── 创建项目目录 ───
workspace_root = _resolve_workspace_root()
safe_name = _safe_project_name(project_description)
project_path = workspace_root / "team_projects" / self.team_id / safe_name
project_path.mkdir(parents=True, exist_ok=True)
logger.info("团队编排 [%s]: 项目目录 %s", self.team_id, project_path)
prev_files = set(_scan_directory_files(project_path))
all_files: List[str] = []
results: Dict[str, Any] = {
"team_id": self.team_id,
"team_name": team.name,
"project_description": project_description,
"project_path": str(project_path),
"files": [],
"started_at": datetime.now().isoformat(),
"plan": None,
"phases": [],
"qa_review": None,
"final_deliverable": "",
"success": True,
"error": None,
}
# ─── Phase 0: 规划阶段(由 Leader Agent 担任) ───
# 选择规划者:优先用 team.config 中指定的 workflow否则回退兼容
workflow = (team.config or {}).get("workflow", "software_company")
planner_role = self._resolve_planner_role(members, workflow)
planner_agent = self._get_agent_by_role(members, planner_role)
if not planner_agent:
raise ValueError(
f"团队缺少 Leader 角色(需要 role='{planner_role}'),无法分解项目。"
f"可用角色: {[m.role for m in members]}"
)
# 动态构建可用的角色列表(来自团队成员实际角色)
available_roles = [m.role for m in members]
roles_hint = "/".join(available_roles)
logger.info("团队编排 [%s]: 规划开始 (workflow=%s, planner=%s, role=%s)",
self.team_id, workflow, planner_agent.name, planner_role)
planner_config = _build_agent_config(planner_agent, self.auto_approve_files)
planner_runtime = AgentRuntime(planner_config)
planner_input = (
f"请分析以下需求,按 JSON 格式输出执行计划:\n\n"
f"需求描述:{project_description}\n"
f"输出文件目录:{project_path}\n\n"
f"你的团队有以下角色(每个 phase 必须使用这些角色之一):{roles_hint}\n"
f"请输出包含 phases 数组的完整 JSON 计划,每个 phase 需指定 role 为上述角色之一。"
)
planner_result = await planner_runtime.run(planner_input)
plan = _parse_plan_json(planner_result.content) if planner_result.success else {}
results["plan"] = plan
if not plan or not plan.get("phases"):
results["success"] = False
results["error"] = f"{planner_role} 未能生成有效计划"
results["final_deliverable"] = planner_result.content
return results
logger.info("团队编排 [%s]: PM 生成 %d 个阶段", self.team_id, len(plan["phases"]))
# ─── Phase 1..N: DAG 并行执行(含顺序回退) ───
all_outputs: List[str] = []
context_doc = f"# 项目: {plan.get('project_name', '未命名')}\n\n## 需求分析\n{plan.get('analysis', '')}\n\n"
phases = plan["phases"]
# 检测是否有 phase 显式指定了 depends_on 以决定是否启用并行
has_dependencies = any(p.get("depends_on") for p in phases)
if has_dependencies:
logger.info("团队编排 [%s]: 使用 DAG 并行模式执行 %d 个阶段", self.team_id, len(phases))
phase_results, context_doc, prev_files, all_files = await self._execute_phases_parallel(
phases, members, context_doc, project_path, prev_files
)
results["phases"] = phase_results
all_outputs = [
f"## {r['name']} ({r['role']})\n{r.get('output', '')}"
for r in phase_results
]
# 更新 success任一阶段失败则整体失败
for r in phase_results:
if not r.get("success"):
results["success"] = False
else:
# ─── 顺序执行(向后兼容:无 depends_on 的旧版模板) ───
for i, phase in enumerate(phases):
r = await self._run_single_phase(
phase, members, context_doc, project_path, prev_files
)
phase_num = r["phase"]
phase_name = r["name"]
role = r["role"]
output_text = r.get("output", "")
if r["success"]:
context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n"
all_outputs.append(f"## {phase_name} ({role})\n{output_text}")
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
# 更新 prev_files 供下一阶段使用
for fp in r.get("files", []):
prev_files.add(fp)
else:
all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): {r.get('error', '')}")
results["success"] = False
results["phases"].append(r)
# ─── QA 审查 ───
qa_agent = self._get_agent_by_role(members, "qa")
if qa_agent and all_outputs:
logger.info("团队编排 [%s]: QA 审查开始", self.team_id)
try:
qa_config = _build_agent_config(qa_agent, self.auto_approve_files)
qa_runtime = AgentRuntime(qa_config)
deliverables = "\n\n---\n\n".join(all_outputs)
qa_input = (
f"请审查以下项目交付物:\n\n"
f"项目: {project_description}\n\n"
f"{deliverables[:8000]}\n\n"
f"请按 JSON 格式输出审查结果,包含 pass/score/issues/overall_assessment。"
)
qa_result = await qa_runtime.run(qa_input)
qa_review = _parse_plan_json(qa_result.content) if qa_result.success else {}
results["qa_review"] = {
"output": qa_result.content,
"review": qa_review,
"success": qa_result.success,
}
except Exception as e:
logger.error("团队编排 [%s]: QA 审查异常: %s", self.team_id, e)
results["qa_review"] = {"error": str(e)}
# ─── 汇总最终交付物 ───
results["final_deliverable"] = "\n\n---\n\n".join(all_outputs)
# 最终文件列表以目录扫描为准,合并从 AgentResult 提取的
final_scan = set(_scan_directory_files(project_path))
for fp in all_files:
final_scan.add(fp)
results["files"] = sorted(final_scan)
results["completed_at"] = datetime.now().isoformat()
return results
async def execute_stream(self, project_description: str, auto_approve_files: bool = True) -> AsyncGenerator[Dict[str, Any], None]:
"""流式执行团队项目yield SSE 事件。
事件类型:
- plan_start / plan / plan_done — PM 规划阶段
- phase_start / phase_output / phase_done — 各执行阶段
- qa_start / qa_review / qa_done — QA 审查
- final — 全部完成
- error — 错误
"""
self.auto_approve_files = auto_approve_files
try:
team = self._load_team()
members = self._load_members()
except Exception as e:
yield {"type": "error", "content": str(e)}
return
if not members:
yield {"type": "error", "content": "团队没有成员"}
return
# ─── 创建项目目录 ───
workspace_root = _resolve_workspace_root()
safe_name = _safe_project_name(project_description)
project_path = workspace_root / "team_projects" / self.team_id / safe_name
project_path.mkdir(parents=True, exist_ok=True)
prev_files = set(_scan_directory_files(project_path))
all_files: List[str] = []
yield {
"type": "plan_start",
"team_name": team.name,
"member_count": len(members),
"project_path": str(project_path),
}
all_outputs: List[str] = []
# ─── 规划阶段(由 Leader Agent 担任) ───
workflow = (team.config or {}).get("workflow", "software_company")
planner_role = self._resolve_planner_role(members, workflow)
planner_agent = self._get_agent_by_role(members, planner_role)
if not planner_agent:
yield {"type": "error", "content": f"团队缺少 Leader 角色(需要 role='{planner_role}'),可用角色: {[m.role for m in members]}"}
return
try:
available_roles = [m.role for m in members]
roles_hint = "/".join(available_roles)
planner_config = _build_agent_config(planner_agent, self.auto_approve_files)
planner_runtime = AgentRuntime(planner_config)
planner_input = (
f"请分析以下需求,按 JSON 格式输出执行计划:\n\n"
f"需求描述:{project_description}\n"
f"输出文件目录:{project_path}\n"
f"你的团队有以下角色(每个 phase 必须使用这些角色之一):{roles_hint}\n"
f"请输出包含 phases 数组的完整 JSON 计划,每个 phase 需指定 role 为上述角色之一。"
)
yield {"type": "plan", "status": "running", "agent": planner_agent.name}
planner_result = await planner_runtime.run(planner_input)
plan = _parse_plan_json(planner_result.content) if planner_result.success else {}
yield {
"type": "plan_done",
"plan": plan,
"raw_output": planner_result.content[:2000],
"success": planner_result.success,
}
except Exception as e:
yield {"type": "error", "content": f"{planner_role} 规划失败: {e}"}
return
if not plan or not plan.get("phases"):
yield {"type": "error", "content": f"{planner_role} 未能生成有效计划"}
return
phases = plan["phases"]
context_doc = f"# 项目: {plan.get('project_name', '未命名')}\n\n## 需求分析\n{plan.get('analysis', '')}\n\n"
# 检测是否有 phase 显式指定了 depends_on
has_dependencies = any(p.get("depends_on") for p in phases)
if has_dependencies:
# ─── DAG 并行执行(流式 + 每 Agent 独立事件队列) ───
completed: Dict[int, str] = {}
pending = {p["phase"]: p for p in phases}
current_prev = prev_files.copy()
current_context = context_doc
while pending:
ready = [
p for p in pending.values()
if all(d in completed for d in p.get("depends_on", []))
]
if not ready:
yield {"type": "error", "content": "阶段循环依赖,无法继续"}
return
# 预解析 agent 信息
phase_agents = []
for p in ready:
agent = self._get_agent_by_role(members, p.get("role", ""))
phase_agents.append((p, agent))
yield {
"type": "parallel_batch_start",
"phases": [
{"phase": p["phase"], "name": p.get("name"), "role": p.get("role", ""),
"agent": a.name if a else None}
for p, a in phase_agents
],
}
for p, agent in phase_agents:
yield {
"type": "phase_start",
"phase": p["phase"],
"name": p.get("name"),
"role": p.get("role", ""),
"agent": agent.name if agent else None,
}
# 并行执行:每个 phase 通过 event_queue 实时推送 Agent 内部事件
event_queue: asyncio.Queue = asyncio.Queue()
tasks = {
asyncio.create_task(
self._run_phase_stream_and_collect(
p, members, current_context, project_path, current_prev, event_queue
)
): p
for p, _ in phase_agents
}
# 消费事件队列直到所有 phase 完成
done_count = 0
while done_count < len(tasks):
evt = await event_queue.get()
yield evt
if evt.get("type") == "phase_done":
done_count += 1
# 收集最终结果
for task, p in tasks.items():
try:
r = task.result()
except Exception as exc:
r = {
"phase": p["phase"], "name": p.get("name", ""),
"role": p.get("role", ""),
"agent_name": None,
"output": f"并行执行异常: {exc}",
"success": False, "error": str(exc),
"files": [],
}
output_text = r.get("output", "")
completed[p["phase"]] = output_text
current_context += f"\n## 阶段 {p['phase']}: {p.get('name', '')}\n{output_text[:2000]}\n"
all_outputs.append(f"## {r['name']} ({r['role']})\n{output_text}")
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
current_prev.add(fp)
del pending[p["phase"]]
yield {
"type": "parallel_batch_end",
"completed": [p["phase"] for p, _ in phase_agents],
}
prev_files = current_prev
context_doc = current_context
else:
# ─── 顺序执行(流式:每 phase 实时推送 Agent 内部事件) ───
for i, phase in enumerate(phases):
event_queue = asyncio.Queue()
task = asyncio.create_task(
self._run_phase_stream_and_collect(
phase, members, context_doc, project_path, prev_files, event_queue
)
)
# 消费事件直到 phase_done
while True:
evt = await event_queue.get()
yield evt
if evt.get("type") == "phase_done":
break
try:
r = task.result()
except Exception as exc:
r = {
"phase": phase.get("phase", i + 1),
"name": phase.get("name", ""),
"role": phase.get("role", ""),
"agent_name": None,
"output": f"执行异常: {exc}",
"success": False, "error": str(exc),
"files": [],
}
output_text = r.get("output", "")
phase_num = r["phase"]
phase_name = r["name"]
role = r["role"]
if r.get("success"):
context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n"
all_outputs.append(f"## {phase_name} ({role})\n{output_text}")
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
prev_files.add(fp)
# ─── QA 审查 ───
qa_agent = self._get_agent_by_role(members, "qa")
if qa_agent and all_outputs:
yield {"type": "qa_start", "agent": qa_agent.name}
try:
qa_config = _build_agent_config(qa_agent, self.auto_approve_files)
qa_runtime = AgentRuntime(qa_config)
deliverables = "\n\n---\n\n".join(all_outputs)
qa_input = (
f"请审查以下项目交付物:\n\n"
f"项目: {project_description}\n\n"
f"{deliverables[:8000]}\n\n"
f"请按 JSON 格式输出审查结果,包含 pass/score/issues/overall_assessment。"
)
qa_result = await qa_runtime.run(qa_input)
qa_review_data = _parse_plan_json(qa_result.content) if qa_result.success else {}
yield {
"type": "qa_done",
"output": qa_result.content,
"review": qa_review_data,
"success": qa_result.success,
}
except Exception as e:
logger.error("团队编排: QA 审查异常: %s", e)
yield {"type": "qa_done", "error": str(e)}
# ─── 最终交付物 ───
final = "\n\n---\n\n".join(all_outputs)
final_scan = set(_scan_directory_files(project_path))
for fp in all_files:
final_scan.add(fp)
yield {
"type": "final",
"deliverable": final,
"phase_count": len(phases),
"team_name": team.name,
"project_path": str(project_path),
"files": sorted(final_scan),
}