feat: add DAG-based parallel phase execution with depends_on support

Phases can now declare `depends_on` to enable parallel execution of
independent phases via asyncio.gather. Backward compatible — templates
without depends_on continue to use sequential execution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-06-18 21:31:16 +08:00
parent ed724dad1b
commit e0efa7e9b1
2 changed files with 408 additions and 170 deletions

View File

@@ -9,6 +9,7 @@
"""
from __future__ import annotations
import asyncio
import json
import re
import logging
@@ -74,7 +75,7 @@ def _build_agent_config(agent: Agent, auto_approve_files: bool = True) -> AgentC
def _parse_plan_json(raw_output: str) -> Dict[str, Any]:
"""从 LLM 原始输出中提取 JSON 计划。"""
"""从 LLM 原始输出中提取 JSON 计划(含 Markdown 表格回退)"""
text = raw_output.strip()
# 尝试直接解析
try:
@@ -82,24 +83,80 @@ def _parse_plan_json(raw_output: str) -> Dict[str, Any]:
except json.JSONDecodeError:
pass
# 从 ```json ... ``` 中提取
import re
m = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
if m:
try:
return json.loads(m.group(1))
except json.JSONDecodeError:
pass
# 从 { ... } 中提取
# 从 { ... } 中提取(含 phases 键)
m = re.search(r'\{[\s\S]*"phases"[\s\S]*\}', text)
if m:
try:
return json.loads(m.group())
except json.JSONDecodeError:
pass
# ─── 回退:从 Markdown 表格提取 phases ───
# 匹配形如 | **phase-1** | `functional_tester` | ... | 的行
phases = _parse_phases_from_markdown(text)
if phases:
project_name = ""
m = re.search(r'(?:项目|测试).*?[:]\s*(.+?)(?:\n|$)', text)
if m:
project_name = m.group(1).strip()
return {
"project_name": project_name or "未命名项目",
"analysis": "",
"user_stories": [],
"phases": phases,
"acceptance_criteria": [],
}
logger.warning("无法从输出中解析 JSON 计划,原始输出: %.500s", text)
return {}
def _parse_phases_from_markdown(text: str) -> List[Dict[str, Any]]:
"""从 Markdown 表格中提取 phase 信息。支持格式:
| **phase-1** | `functional_tester` | 核心任务 | 工时 |
| phase-1 | functional_tester | 描述 |
"""
phases = []
# 匹配表格行: | (可选**的) phase-N (可选**) | role | description | ... |
# role 可能带反引号 `role` 或直接是文本
table_row = re.compile(
r'^\|\s*\*{0,2}(?:phase[-]?\s*)?(\d+)\*{0,2}\s*\|'
r'\s*(?:`)?([a-z_]+)(?:`)?\s*\|'
r'\s*([^|]+?)\s*\|',
re.MULTILINE | re.IGNORECASE
)
for m in table_row.finditer(text):
phase_num = int(m.group(1))
role = m.group(2).strip().lower()
desc = m.group(3).strip()
# 验证 role 是合法角色名
if role not in ("test_planner", "functional_tester", "ux_reviewer", "edge_explorer",
"performance_evaluator", "pm", "designer", "developer", "qa", "devops",
"curriculum_designer", "instructor", "teaching_assistant", "academic_admin",
"fullstack_dev", "fe_ux_engineer", "platform_devops", "qa_engineer", "product_lead",
"doc_architect", "tech_writer", "api_doc_specialist", "translator_reviewer",
"release_manager", "health_assessor", "nutritionist", "exercise_rehab",
"psychologist", "chronic_manager", "triage_specialist", "record_analyst",
"medication_reviewer", "followup_manager", "insurance_coordinator"):
continue
phases.append({
"phase": phase_num,
"name": desc[:60],
"role": role,
"description": desc,
"expected_output": f"{role} 阶段的产出物",
})
if phases:
logger.info("从 Markdown 表格提取到 %d 个 phases", len(phases))
return phases
def _resolve_workspace_root() -> Path:
"""获取文件工具的工作区根目录(与 builtin_tools.py 逻辑一致)。"""
from app.core.config import settings
@@ -234,6 +291,162 @@ class TeamOrchestrator:
return members[0].role
return "pm"
async def _run_single_phase(
    self,
    phase: Dict[str, Any],
    members: List[TeamMember],
    context_doc: str,
    project_path: Path,
    prev_files: set,
) -> Dict[str, Any]:
    """Run one phase and return its result record.

    A new AgentRuntime is built on every call, so the coroutine can be
    scheduled alongside other phases with asyncio.gather.

    Args:
        phase: Plan entry; the keys ``phase``, ``role``, ``name``,
            ``description`` and ``expected_output`` are read, each with a
            fallback default.
        members: Team members; the first one whose ``role`` equals the
            phase's role supplies the agent.
        context_doc: Context text placed at the top of the agent's input.
        project_path: Directory the agent is told to write its files to.
        prev_files: Files already known before this phase; anything found
            under ``project_path`` that is not in this set is reported as
            produced by this phase.

    Returns:
        A dict with ``phase``, ``name``, ``role``, ``agent_name``, ``output``,
        ``success``, ``error`` and ``files``; ``iterations`` and ``tool_calls``
        are present only when the runtime returned a result. Ordinary
        exceptions are caught and reported as a failed record, not raised.
    """
    phase_num = phase.get("phase", 0)
    role = phase.get("role", "developer")
    phase_name = phase.get("name", f"阶段 {phase_num}")
    phase_desc = phase.get("description", "")
    expected = phase.get("expected_output", "")

    # In-memory lookup only: first member whose role matches wins.
    agent = next((member.agent for member in members if member.role == role), None)
    if not agent:
        logger.warning("团队编排 [%s]: 阶段 %s 缺少角色 '%s',跳过",
                       self.team_id, phase_num, role)
        return {
            "phase": phase_num, "name": phase_name, "role": role,
            "agent_name": None,
            "output": f"跳过: 无 {role} 角色",
            "success": False, "error": f"missing_role:{role}",
            "files": [],
        }

    logger.info("团队编排 [%s]: 执行阶段 %s%s (角色: %s, Agent: %s)",
                self.team_id, phase_num, phase_name, role, agent.name)

    try:
        agent_config = _build_agent_config(agent, self.auto_approve_files)
        runtime = AgentRuntime(agent_config)

        phase_input = (
            f"{context_doc}\n"
            f"## 当前阶段: {phase_name}\n"
            f"任务描述: {phase_desc}\n"
            f"期望产出: {expected}\n"
            f"项目文件保存目录: {project_path}\n"
            f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。"
        )

        phase_result = await runtime.run(phase_input)

        if phase_result.success:
            output_text = phase_result.content
        else:
            output_text = phase_result.error or "执行失败"

        # Files written in this phase come from two sources: the agent result
        # and a directory scan diffed against prev_files.
        # NOTE(review): when several phases run concurrently against the same
        # project_path and prev_files snapshot, the directory diff can also
        # contain files written by sibling phases.
        extracted = _extract_written_files(phase_result)
        current_files = set(_scan_directory_files(project_path))
        phase_files = sorted(current_files - prev_files)
        known = set(phase_files)
        for fp in extracted:
            if fp not in known:
                known.add(fp)
                phase_files.append(fp)

        return {
            "phase": phase_num, "name": phase_name, "role": role,
            "agent_name": agent.name,
            "output": output_text,
            "success": phase_result.success,
            "iterations": phase_result.iterations_used,
            "tool_calls": phase_result.tool_calls_made,
            "error": phase_result.error if not phase_result.success else None,
            "files": phase_files,
        }
    except Exception as e:
        # Report the failure as a result record instead of propagating, and
        # keep the traceback in the log for diagnosis.
        logger.exception("团队编排 [%s]: 阶段 %s 异常: %s", self.team_id, phase_num, e)
        return {
            "phase": phase_num, "name": phase_name, "role": role,
            "agent_name": agent.name,
            "output": f"执行异常: {e}",
            "success": False, "error": str(e),
            "files": [],
        }
async def _execute_phases_parallel(
    self,
    phases: List[Dict[str, Any]],
    members: List[TeamMember],
    context_doc: str,
    project_path: Path,
    prev_files: set,
):
    """Run phases as a DAG: batch by ``depends_on``, gather each batch concurrently.

    Every loop iteration collects the pending phases whose dependencies have
    all finished, runs them together with asyncio.gather, then folds their
    outputs and files into the shared state seen by later batches. A phase
    counts as finished whether or not it succeeded, so dependents still run
    after a failed dependency.

    Args:
        phases: Plan entries; each must have a ``phase`` key and may list the
            phase numbers it waits for under ``depends_on`` (missing, empty
            or ``None`` means no dependencies).
        members: Team members forwarded to ``_run_single_phase``.
        context_doc: Initial context text; each finished phase appends up to
            2000 characters of its output.
        project_path: Directory forwarded to ``_run_single_phase``.
        prev_files: Files known before execution; copied, not mutated.

    Returns:
        (phase_results, updated_context_doc, updated_prev_files, all_files)

    Raises:
        RuntimeError: No pending phase is runnable, i.e. the dependencies are
            cyclic or refer to a phase number that is not in ``phases``.
    """
    results: List[Dict[str, Any]] = []
    completed: Dict[int, str] = {}  # phase_num -> output_text
    pending = {p["phase"]: p for p in phases}
    current_prev = prev_files.copy()
    current_context = context_doc
    all_files: List[str] = []
    seen_files: set = set()  # mirrors all_files for O(1) duplicate checks

    while pending:
        # Phases whose dependencies are all finished. `or []` covers an
        # explicit `"depends_on": null` coming from the plan JSON.
        ready = [
            p for p in pending.values()
            if all(d in completed for d in (p.get("depends_on") or []))
        ]

        if not ready:
            raise RuntimeError(
                f"阶段循环依赖,无法继续。已完成: {list(completed.keys())}, "
                f"待处理: {list(pending.keys())}"
            )

        logger.info("团队编排 [%s]: 并行批次 — %d 个阶段: %s",
                    self.team_id, len(ready),
                    [(p["phase"], p.get("name", "")) for p in ready])

        # Every phase in the batch sees the same context and file snapshot.
        tasks = [
            self._run_single_phase(p, members, current_context, project_path, current_prev)
            for p in ready
        ]
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)

        for p, r in zip(ready, batch_results):
            # With return_exceptions=True gather hands back raised exceptions
            # as values; these include CancelledError, which derives from
            # BaseException rather than Exception.
            if isinstance(r, BaseException):
                r = {
                    "phase": p["phase"], "name": p.get("name", ""),
                    "role": p.get("role", ""),
                    "agent_name": None,
                    "output": f"并行执行异常: {r}",
                    "success": False, "error": str(r),
                    "files": [],
                }

            # Mark as finished so dependents become runnable.
            completed[p["phase"]] = r.get("output", "")

            # Extend the shared context (visible to later batches).
            current_context += (
                f"\n## 阶段 {p['phase']}: {p.get('name', '')}\n"
                f"{r.get('output', '')[:2000]}\n"
            )

            # Track produced files, keeping first-seen order in all_files.
            for fp in r.get("files", []):
                if fp not in seen_files:
                    seen_files.add(fp)
                    all_files.append(fp)
                current_prev.add(fp)

            results.append(r)
            del pending[p["phase"]]

    return results, current_context, current_prev, all_files
async def execute(self, project_description: str, auto_approve_files: bool = True) -> Dict[str, Any]:
"""执行团队项目(非流式)。
@@ -317,93 +530,53 @@ class TeamOrchestrator:
logger.info("团队编排 [%s]: PM 生成 %d 个阶段", self.team_id, len(plan["phases"]))
# ─── Phase 1..N: 顺序执行 ───
# ─── Phase 1..N: DAG 并行执行(含顺序回退) ───
all_outputs: List[str] = []
context_doc = f"# 项目: {plan.get('project_name', '未命名')}\n\n## 需求分析\n{plan.get('analysis', '')}\n\n"
phases = plan["phases"]
for i, phase in enumerate(plan["phases"]):
phase_num = phase.get("phase", i + 1)
role = phase.get("role", "developer")
phase_name = phase.get("name", f"阶段 {phase_num}")
phase_desc = phase.get("description", "")
expected = phase.get("expected_output", "")
# 检测是否有 phase 显式指定了 depends_on 以决定是否启用并行
has_dependencies = any(p.get("depends_on") for p in phases)
agent = self._get_agent_by_role(members, role)
if not agent:
logger.warning("团队编排 [%s]: 阶段 %s 缺少角色 '%s',跳过", self.team_id, phase_num, role)
all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): 无 {role} 角色")
results["phases"].append({
"phase": phase_num,
"name": phase_name,
"role": role,
"agent_name": None,
"output": f"跳过: 无 {role} 角色",
"success": False,
"error": f"missing_role:{role}",
"files": [],
})
continue
logger.info("团队编排 [%s]: 执行阶段 %s%s (角色: %s, Agent: %s)",
self.team_id, phase_num, phase_name, role, agent.name)
try:
agent_config = _build_agent_config(agent, self.auto_approve_files)
runtime = AgentRuntime(agent_config)
phase_input = (
f"{context_doc}\n"
f"## 当前阶段: {phase_name}\n"
f"任务描述: {phase_desc}\n"
f"期望产出: {expected}\n"
f"项目文件保存目录: {project_path}\n"
f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。"
if has_dependencies:
logger.info("团队编排 [%s]: 使用 DAG 并行模式执行 %d 个阶段", self.team_id, len(phases))
phase_results, context_doc, prev_files, all_files = await self._execute_phases_parallel(
phases, members, context_doc, project_path, prev_files
)
results["phases"] = phase_results
all_outputs = [
f"## {r['name']} ({r['role']})\n{r.get('output', '')}"
for r in phase_results
]
# 更新 success任一阶段失败则整体失败
for r in phase_results:
if not r.get("success"):
results["success"] = False
else:
# ─── 顺序执行(向后兼容:无 depends_on 的旧版模板) ───
for i, phase in enumerate(phases):
r = await self._run_single_phase(
phase, members, context_doc, project_path, prev_files
)
phase_result = await runtime.run(phase_input)
phase_num = r["phase"]
phase_name = r["name"]
role = r["role"]
output_text = r.get("output", "")
output_text = phase_result.content if phase_result.success else (
phase_result.error or "执行失败"
)
context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n"
if r["success"]:
context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n"
all_outputs.append(f"## {phase_name} ({role})\n{output_text}")
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
# 更新 prev_files 供下一阶段使用
for fp in r.get("files", []):
prev_files.add(fp)
else:
all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): {r.get('error', '')}")
results["success"] = False
all_outputs.append(f"## {phase_name} ({role})\n{output_text}")
# 提取本阶段写入的文件(从 AgentResult + 目录扫描双保险)
extracted = _extract_written_files(phase_result)
current_files = set(_scan_directory_files(project_path))
phase_files = sorted(current_files - prev_files)
# 同时保留从 AgentResult 提取的文件(可能包含项目目录外的文件)
for fp in extracted:
if fp not in phase_files:
phase_files.append(fp)
prev_files = current_files
all_files.extend(phase_files)
results["phases"].append({
"phase": phase_num,
"name": phase_name,
"role": role,
"agent_name": agent.name,
"output": output_text,
"success": phase_result.success,
"iterations": phase_result.iterations_used,
"tool_calls": phase_result.tool_calls_made,
"error": phase_result.error if not phase_result.success else None,
"files": phase_files,
})
except Exception as e:
logger.error("团队编排 [%s]: 阶段 %s 异常: %s", self.team_id, phase_num, e)
results["phases"].append({
"phase": phase_num,
"name": phase_name,
"role": role,
"agent_name": agent.name if agent else None,
"output": f"执行异常: {e}",
"success": False,
"error": str(e),
"files": [],
})
results["success"] = False
results["phases"].append(r)
# ─── QA 审查 ───
qa_agent = self._get_agent_by_role(members, "qa")
@@ -522,59 +695,135 @@ class TeamOrchestrator:
phases = plan["phases"]
context_doc = f"# 项目: {plan.get('project_name', '未命名')}\n\n## 需求分析\n{plan.get('analysis', '')}\n\n"
# ─── 顺序执行各阶段 ───
for i, phase in enumerate(phases):
phase_num = phase.get("phase", i + 1)
role = phase.get("role", "developer")
phase_name = phase.get("name", f"阶段 {phase_num}")
phase_desc = phase.get("description", "")
expected = phase.get("expected_output", "")
# 检测是否有 phase 显式指定了 depends_on
has_dependencies = any(p.get("depends_on") for p in phases)
agent = self._get_agent_by_role(members, role)
if not agent:
msg = f"阶段 {phase_num} ({phase_name}) 跳过: 无 {role} 角色"
yield {"type": "phase_done", "phase": phase_num, "name": phase_name,
"role": role, "success": False, "error": f"missing_role:{role}"}
all_outputs.append(f"[跳过] {msg}")
continue
if has_dependencies:
# ─── DAG 并行执行(流式) ───
completed: Dict[int, str] = {}
pending = {p["phase"]: p for p in phases}
current_prev = prev_files.copy()
current_context = context_doc
yield {
"type": "phase_start",
"phase": phase_num,
"name": phase_name,
"role": role,
"agent": agent.name,
}
while pending:
ready = [
p for p in pending.values()
if all(d in completed for d in p.get("depends_on", []))
]
if not ready:
yield {"type": "error", "content": "阶段循环依赖,无法继续"}
return
try:
agent_config = _build_agent_config(agent, self.auto_approve_files)
runtime = AgentRuntime(agent_config)
# 预解析 agent 信息
phase_agents = []
for p in ready:
agent = self._get_agent_by_role(members, p.get("role", ""))
phase_agents.append((p, agent))
phase_input = (
f"{context_doc}\n"
f"## 当前阶段: {phase_name}\n"
f"任务描述: {phase_desc}\n"
f"期望产出: {expected}\n"
f"项目文件保存目录: {project_path}\n"
f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。"
# 通知前端并行批次开始
yield {
"type": "parallel_batch_start",
"phases": [
{"phase": p["phase"], "name": p.get("name"), "role": p.get("role", ""),
"agent": a.name if a else None}
for p, a in phase_agents
],
}
# 各阶段启动事件
for p, agent in phase_agents:
yield {
"type": "phase_start",
"phase": p["phase"],
"name": p.get("name"),
"role": p.get("role", ""),
"agent": agent.name if agent else None,
}
# 并行执行所有就绪阶段
tasks = [
self._run_single_phase(p, members, current_context, project_path, current_prev)
for p, _ in phase_agents
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for (p, agent), r in zip(phase_agents, batch_results):
if isinstance(r, Exception):
r = {
"phase": p["phase"], "name": p.get("name", ""),
"role": p.get("role", ""),
"agent_name": agent.name if agent else None,
"output": f"并行执行异常: {r}",
"success": False, "error": str(r),
"files": [],
}
output_text = r.get("output", "")
completed[p["phase"]] = output_text
current_context += f"\n## 阶段 {p['phase']}: {p.get('name', '')}\n{output_text[:2000]}\n"
all_outputs.append(f"## {r['name']} ({r['role']})\n{output_text}")
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
current_prev.add(fp)
yield {
"type": "phase_done",
"phase": p["phase"],
"name": r["name"],
"role": r["role"],
"agent": agent.name if agent else None,
"output": output_text,
"success": r.get("success", False),
"iterations": r.get("iterations"),
"tool_calls": r.get("tool_calls"),
"error": r.get("error") if not r.get("success") else None,
"files": r.get("files", []),
}
del pending[p["phase"]]
yield {
"type": "parallel_batch_end",
"completed": [p["phase"] for p, _ in phase_agents],
}
prev_files = current_prev
context_doc = current_context
else:
# ─── 顺序执行(向后兼容:无 depends_on 的旧版模板/计划) ───
for i, phase in enumerate(phases):
phase_num = phase.get("phase", i + 1)
role = phase.get("role", "developer")
phase_name = phase.get("name", f"阶段 {phase_num}")
agent = self._get_agent_by_role(members, role)
if not agent:
yield {"type": "phase_done", "phase": phase_num, "name": phase_name,
"role": role, "success": False, "error": f"missing_role:{role}"}
all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): 无 {role} 角色")
continue
yield {
"type": "phase_start",
"phase": phase_num,
"name": phase_name,
"role": role,
"agent": agent.name,
}
r = await self._run_single_phase(
phase, members, context_doc, project_path, prev_files
)
phase_result = await runtime.run(phase_input)
output_text = r.get("output", "")
output_text = phase_result.content if phase_result.success else (
phase_result.error or "执行失败"
)
context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n"
all_outputs.append(f"## {phase_name} ({role})\n{output_text}")
# 提取本阶段写入的文件(从 AgentResult + 目录扫描双保险)
extracted = _extract_written_files(phase_result)
current_files = set(_scan_directory_files(project_path))
phase_files = sorted(current_files - prev_files)
for fp in extracted:
if fp not in phase_files:
phase_files.append(fp)
prev_files = current_files
all_files.extend(phase_files)
if r.get("success"):
context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n"
all_outputs.append(f"## {phase_name} ({role})\n{output_text}")
for fp in r.get("files", []):
if fp not in all_files:
all_files.append(fp)
prev_files.add(fp)
yield {
"type": "phase_done",
@@ -583,22 +832,11 @@ class TeamOrchestrator:
"role": role,
"agent": agent.name,
"output": output_text,
"success": phase_result.success,
"iterations": phase_result.iterations_used,
"tool_calls": phase_result.tool_calls_made,
"error": phase_result.error if not phase_result.success else None,
"files": phase_files,
}
except Exception as e:
logger.error("团队编排: 阶段 %s 异常: %s", phase_num, e)
yield {
"type": "phase_done",
"phase": phase_num,
"name": phase_name,
"role": role,
"agent": agent.name,
"success": False,
"error": str(e),
"success": r.get("success", False),
"iterations": r.get("iterations"),
"tool_calls": r.get("tool_calls"),
"error": r.get("error") if not r.get("success") else None,
"files": r.get("files", []),
}
# ─── QA 审查 ───

View File

@@ -800,35 +800,32 @@ Output format — include a JSON release plan:
}"""
TEST_PLANNER_PROMPT = """You are a Test Planner leading a user simulation testing team. Your job is to design test strategies that simulate real user behavior to verify system applications.
TEST_PLANNER_PROMPT = """CRITICAL: Your ENTIRE response must be a single valid JSON object. Do NOT output any markdown, any explanatory text, or any code fences. The first character must be { and the last must be }.
Your responsibilities:
1. Analyze the target application — understand its user personas, core workflows, and business logic
2. Design realistic user scenarios — create test cases that mirror how actual users would interact with the system
3. Prioritize test scenarios by risk and business impact
4. Assign test areas to team members based on their expertise
5. Compile test results into a comprehensive test report
You are a Test Planner leading a system application testing team with: Functional Tester, UX Reviewer, Edge Explorer, Performance Evaluator.
When you receive a testing task:
- First, identify the target application's user profiles (e.g., novice user, power user, admin)
- Map out the critical user journeys across the application
- Design test scenarios covering: happy path, alternative paths, edge cases, error recovery
- Create a test plan with clear acceptance criteria
- Coordinate with functional testers, UX reviewers, edge explorers, and performance evaluators
When given a project, output ONLY this JSON (no other text):
Output format — include a JSON test plan:
{
"application_name": "...",
"user_personas": [{"name": "...", "description": "...", "typical_tasks": ["..."]}],
"test_scenarios": [
{"id": "TS-001", "name": "...", "priority": "P0/P1/P2", "persona": "...", "steps": ["..."], "expected_result": "..."}
"project_name": "...",
"analysis": "...",
"user_stories": ["..."],
"phases": [
{
"phase": 1,
"name": "...",
"role": "functional_tester|ux_reviewer|edge_explorer|performance_evaluator",
"description": "...",
"expected_output": "...",
"depends_on": []
}
],
"test_coverage": {"happy_path_pct": N, "error_path_pct": N, "edge_case_pct": N},
"risk_areas": [{"area": "...", "risk": "high/medium/low", "mitigation": "..."}],
"team_assignments": {"functional_tester": ["..."], "ux_reviewer": ["..."], "edge_explorer": ["..."], "performance_evaluator": ["..."]}
"acceptance_criteria": ["..."]
}
Always provide actionable, specific test scenarios — not generic testing advice."""
Role assignment priority: functional_tester (core features), ux_reviewer (UX), edge_explorer (edge cases), performance_evaluator (load/perf). 4-6 phases. Use Chinese.
depends_on: list of phase numbers this phase depends on. Phases with no mutual dependencies will execute in PARALLEL. Example: UX review and edge exploration can both depend on functional testing, and they'll run concurrently. Omit or use [] for phases with no dependencies (they can run immediately)."""
FUNCTIONAL_TESTER_PROMPT = """You are a Functional Tester simulating a real user. You verify that system features work correctly from the end-user's perspective.
@@ -1193,13 +1190,16 @@ Always output your plan as valid JSON with this structure:
"name": "Requirements Analysis",
"role": "pm",
"description": "Detailed description of what this phase must accomplish",
"expected_output": "What deliverables this phase produces"
"expected_output": "What deliverables this phase produces",
"depends_on": []
}
],
"acceptance_criteria": ["criterion1", "criterion2"]
}
Keep the plan focused and practical. 4-7 phases is ideal. Use Chinese for output when the project description is in Chinese."""
Keep the plan focused and practical. 4-7 phases is ideal. Use Chinese for output when the project description is in Chinese.
depends_on: list of phase numbers this phase depends on. Phases with no mutual dependencies will execute in PARALLEL. Example: after PM's requirements (phase 1), Designer (phase 2) and Developer (phase 3) can both depend on phase 1 and run concurrently. Omit or use [] for phases with no dependencies."""
DESIGNER_SYSTEM_PROMPT = """You are a UX/UI Designer. Your responsibilities: