From e0efa7e9b15536e29df9696a4fb47bf1418057fd Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Thu, 18 Jun 2026 21:31:16 +0800 Subject: [PATCH] feat: add DAG-based parallel phase execution with depends_on support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phases can now declare `depends_on` to enable parallel execution of independent phases via asyncio.gather. Backward compatible — templates without depends_on continue to use sequential execution. Co-Authored-By: Claude Opus 4.6 --- backend/app/services/team_orchestrator.py | 530 ++++++++++++++++------ backend/app/services/team_service.py | 48 +- 2 files changed, 408 insertions(+), 170 deletions(-) diff --git a/backend/app/services/team_orchestrator.py b/backend/app/services/team_orchestrator.py index 2d2f229..b0bea16 100644 --- a/backend/app/services/team_orchestrator.py +++ b/backend/app/services/team_orchestrator.py @@ -9,6 +9,7 @@ """ from __future__ import annotations +import asyncio import json import re import logging @@ -74,7 +75,7 @@ def _build_agent_config(agent: Agent, auto_approve_files: bool = True) -> AgentC def _parse_plan_json(raw_output: str) -> Dict[str, Any]: - """从 LLM 原始输出中提取 JSON 计划。""" + """从 LLM 原始输出中提取 JSON 计划(含 Markdown 表格回退)。""" text = raw_output.strip() # 尝试直接解析 try: @@ -82,24 +83,80 @@ def _parse_plan_json(raw_output: str) -> Dict[str, Any]: except json.JSONDecodeError: pass # 从 ```json ... ``` 中提取 - import re m = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text) if m: try: return json.loads(m.group(1)) except json.JSONDecodeError: pass - # 从 { ... } 中提取 + # 从 { ... } 中提取(含 phases 键) m = re.search(r'\{[\s\S]*"phases"[\s\S]*\}', text) if m: try: return json.loads(m.group()) except json.JSONDecodeError: pass + + # ─── 回退:从 Markdown 表格提取 phases ─── + # 匹配形如 | **phase-1** | `functional_tester` | ... | 的行 + phases = _parse_phases_from_markdown(text) + if phases: + project_name = "" + m = re.search(r'(?:项目|测试).*?[::]\s*(.+?)(?:\n|$)', text) + if m: + project_name = m.group(1).strip() + return { + "project_name": project_name or "未命名项目", + "analysis": "", + "user_stories": [], + "phases": phases, + "acceptance_criteria": [], + } + logger.warning("无法从输出中解析 JSON 计划,原始输出: %.500s", text) return {} +def _parse_phases_from_markdown(text: str) -> List[Dict[str, Any]]: + """从 Markdown 表格中提取 phase 信息。支持格式: + | **phase-1** | `functional_tester` | 核心任务 | 工时 | + | phase-1 | functional_tester | 描述 | + """ + phases = [] + # 匹配表格行: | (可选**的) phase-N (可选**) | role | description | ... | + # role 可能带反引号 `role` 或直接是文本 + table_row = re.compile( + r'^\|\s*\*{0,2}(?:phase[-–]?\s*)?(\d+)\*{0,2}\s*\|' + r'\s*(?:`)?([a-z_]+)(?:`)?\s*\|' + r'\s*([^|]+?)\s*\|', + re.MULTILINE | re.IGNORECASE + ) + for m in table_row.finditer(text): + phase_num = int(m.group(1)) + role = m.group(2).strip().lower() + desc = m.group(3).strip() + # 验证 role 是合法角色名 + if role not in ("test_planner", "functional_tester", "ux_reviewer", "edge_explorer", + "performance_evaluator", "pm", "designer", "developer", "qa", "devops", + "curriculum_designer", "instructor", "teaching_assistant", "academic_admin", + "fullstack_dev", "fe_ux_engineer", "platform_devops", "qa_engineer", "product_lead", + "doc_architect", "tech_writer", "api_doc_specialist", "translator_reviewer", + "release_manager", "health_assessor", "nutritionist", "exercise_rehab", + "psychologist", "chronic_manager", "triage_specialist", "record_analyst", + "medication_reviewer", "followup_manager", "insurance_coordinator"): + continue + phases.append({ + "phase": phase_num, + "name": desc[:60], + "role": role, + "description": desc, + "expected_output": f"{role} 阶段的产出物", + }) + if phases: + logger.info("从 Markdown 表格提取到 %d 个 phases", len(phases)) + return phases + + def _resolve_workspace_root() -> Path: """获取文件工具的工作区根目录(与 builtin_tools.py 逻辑一致)。""" from app.core.config import settings @@ -234,6 +291,162 @@ class TeamOrchestrator: return members[0].role return "pm" + async def _run_single_phase( + self, + phase: Dict[str, Any], + members: List[TeamMember], + context_doc: str, + project_path: Path, + prev_files: set, + ) -> Dict[str, Any]: + """执行单个 phase(可被 asyncio.gather 并发调用)。 + + 每个 phase 创建独立的 AgentRuntime,互不干扰。 + """ + phase_num = phase.get("phase", 0) + role = phase.get("role", "developer") + phase_name = phase.get("name", f"阶段 {phase_num}") + phase_desc = phase.get("description", "") + expected = phase.get("expected_output", "") + + # DB 查询在协程内完成(members 已预加载,此处仅内存迭代) + agent = None + for m in members: + if m.role == role: + agent = m.agent + break + + if not agent: + logger.warning("团队编排 [%s]: 阶段 %s 缺少角色 '%s',跳过", + self.team_id, phase_num, role) + return { + "phase": phase_num, "name": phase_name, "role": role, + "agent_name": None, + "output": f"跳过: 无 {role} 角色", + "success": False, "error": f"missing_role:{role}", + "files": [], + } + + logger.info("团队编排 [%s]: 执行阶段 %s — %s (角色: %s, Agent: %s)", + self.team_id, phase_num, phase_name, role, agent.name) + + try: + agent_config = _build_agent_config(agent, self.auto_approve_files) + runtime = AgentRuntime(agent_config) + + phase_input = ( + f"{context_doc}\n" + f"## 当前阶段: {phase_name}\n" + f"任务描述: {phase_desc}\n" + f"期望产出: {expected}\n" + f"项目文件保存目录: {project_path}\n" + f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。" + ) + phase_result = await runtime.run(phase_input) + + output_text = phase_result.content if phase_result.success else ( + phase_result.error or "执行失败" + ) + + # 提取本阶段写入的文件(AgentResult + 目录扫描双保险) + extracted = _extract_written_files(phase_result) + current_files = set(_scan_directory_files(project_path)) + phase_files = sorted(current_files - prev_files) + for fp in extracted: + if fp not in phase_files: + phase_files.append(fp) + + return { + "phase": phase_num, "name": phase_name, "role": role, + "agent_name": agent.name, + "output": output_text, + "success": phase_result.success, + "iterations": phase_result.iterations_used, + "tool_calls": phase_result.tool_calls_made, + "error": phase_result.error if not phase_result.success else None, + "files": phase_files, + } + except Exception as e: + logger.error("团队编排 [%s]: 阶段 %s 异常: %s", self.team_id, phase_num, e) + return { + "phase": phase_num, "name": phase_name, "role": role, + "agent_name": agent.name, + "output": f"执行异常: {e}", + "success": False, "error": str(e), + "files": [], + } + + async def _execute_phases_parallel( + self, + phases: List[Dict[str, Any]], + members: List[TeamMember], + context_doc: str, + project_path: Path, + prev_files: set, + ): + """DAG 并行执行 phases。按 depends_on 拓扑分批,批内 asyncio.gather 并发。 + + Returns: + (phase_results, updated_context_doc, updated_prev_files, all_files) + """ + results: List[Dict[str, Any]] = [] + completed: Dict[int, str] = {} # phase_num → output_text + pending = {p["phase"]: p for p in phases} + current_prev = prev_files.copy() + current_context = context_doc + all_files: List[str] = [] + + while pending: + # 找出所有依赖已满足的阶段 + ready = [ + p for p in pending.values() + if all(d in completed for d in p.get("depends_on", [])) + ] + if not ready: + raise RuntimeError( + f"阶段循环依赖,无法继续。已完成: {list(completed.keys())}, " + f"待处理: {list(pending.keys())}" + ) + + logger.info("团队编排 [%s]: 并行批次 — %d 个阶段: %s", + self.team_id, len(ready), + [(p["phase"], p.get("name", "")) for p in ready]) + + # 并行执行所有就绪阶段 + tasks = [ + self._run_single_phase(p, members, current_context, project_path, current_prev) + for p in ready + ] + batch_results = await asyncio.gather(*tasks, return_exceptions=True) + + for p, r in zip(ready, batch_results): + if isinstance(r, Exception): + r = { + "phase": p["phase"], "name": p.get("name", ""), + "role": p.get("role", ""), + "agent_name": None, + "output": f"并行执行异常: {r}", + "success": False, "error": str(r), + "files": [], + } + + # 记录完成 + completed[p["phase"]] = r.get("output", "") + # 更新共享上下文(后续批次可见) + current_context += ( + f"\n## 阶段 {p['phase']}: {p.get('name', '')}\n" + f"{r.get('output', '')[:2000]}\n" + ) + # 更新文件追踪 + for fp in r.get("files", []): + if fp not in all_files: + all_files.append(fp) + current_prev.add(fp) + results.append(r) + del pending[p["phase"]] + + return results, current_context, current_prev, all_files + async def execute(self, project_description: str, auto_approve_files: bool = True) -> Dict[str, Any]: """执行团队项目(非流式)。 @@ -317,93 +530,53 @@ class TeamOrchestrator: logger.info("团队编排 [%s]: PM 生成 %d 个阶段", self.team_id, len(plan["phases"])) - # ─── Phase 1..N: 顺序执行 ─── + # ─── Phase 1..N: DAG 并行执行(含顺序回退) ─── all_outputs: List[str] = [] context_doc = f"# 项目: {plan.get('project_name', '未命名')}\n\n## 需求分析\n{plan.get('analysis', '')}\n\n" + phases = plan["phases"] - for i, phase in enumerate(plan["phases"]): - phase_num = phase.get("phase", i + 1) - role = phase.get("role", "developer") - phase_name = phase.get("name", f"阶段 {phase_num}") - phase_desc = phase.get("description", "") - expected = phase.get("expected_output", "") + # 检测是否有 phase 显式指定了 depends_on 以决定是否启用并行 + has_dependencies = any(p.get("depends_on") for p in phases) - agent = self._get_agent_by_role(members, role) - if not agent: - logger.warning("团队编排 [%s]: 阶段 %s 缺少角色 '%s',跳过", self.team_id, phase_num, role) - all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): 无 {role} 角色") - results["phases"].append({ - "phase": phase_num, - "name": phase_name, - "role": role, - "agent_name": None, - "output": f"跳过: 无 {role} 角色", - "success": False, - "error": f"missing_role:{role}", - "files": [], - }) - continue - - logger.info("团队编排 [%s]: 执行阶段 %s — %s (角色: %s, Agent: %s)", - self.team_id, phase_num, phase_name, role, agent.name) - - try: - agent_config = _build_agent_config(agent, self.auto_approve_files) - runtime = AgentRuntime(agent_config) - - phase_input = ( - f"{context_doc}\n" - f"## 当前阶段: {phase_name}\n" - f"任务描述: {phase_desc}\n" - f"期望产出: {expected}\n" - f"项目文件保存目录: {project_path}\n" - f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。" + if has_dependencies: + logger.info("团队编排 [%s]: 使用 DAG 并行模式执行 %d 个阶段", self.team_id, len(phases)) + phase_results, context_doc, prev_files, all_files = await self._execute_phases_parallel( + phases, members, context_doc, project_path, prev_files + ) + results["phases"] = phase_results + all_outputs = [ + f"## {r['name']} ({r['role']})\n{r.get('output', '')}" + for r in phase_results + ] + # 更新 success(任一阶段失败则整体失败) + for r in phase_results: + if not r.get("success"): + results["success"] = False + else: + # ─── 顺序执行(向后兼容:无 depends_on 的旧版模板) ─── + for i, phase in enumerate(phases): + r = await self._run_single_phase( + phase, members, context_doc, project_path, prev_files ) - phase_result = await runtime.run(phase_input) + phase_num = r["phase"] + phase_name = r["name"] + role = r["role"] + output_text = r.get("output", "") - output_text = phase_result.content if phase_result.success else ( - phase_result.error or "执行失败" - ) - context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n" + if r["success"]: + context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n" + all_outputs.append(f"## {phase_name} ({role})\n{output_text}") + for fp in r.get("files", []): + if fp not in all_files: + all_files.append(fp) + # 更新 prev_files 供下一阶段使用 + for fp in r.get("files", []): + prev_files.add(fp) + else: + all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): {r.get('error', '')}") + results["success"] = False - all_outputs.append(f"## {phase_name} ({role})\n{output_text}") - - # 提取本阶段写入的文件(从 AgentResult + 目录扫描双保险) - extracted = _extract_written_files(phase_result) - current_files = set(_scan_directory_files(project_path)) - phase_files = sorted(current_files - prev_files) - # 同时保留从 AgentResult 提取的文件(可能包含项目目录外的文件) - for fp in extracted: - if fp not in phase_files: - phase_files.append(fp) - prev_files = current_files - all_files.extend(phase_files) - - results["phases"].append({ - "phase": phase_num, - "name": phase_name, - "role": role, - "agent_name": agent.name, - "output": output_text, - "success": phase_result.success, - "iterations": phase_result.iterations_used, - "tool_calls": phase_result.tool_calls_made, - "error": phase_result.error if not phase_result.success else None, - "files": phase_files, - }) - except Exception as e: - logger.error("团队编排 [%s]: 阶段 %s 异常: %s", self.team_id, phase_num, e) - results["phases"].append({ - "phase": phase_num, - "name": phase_name, - "role": role, - "agent_name": agent.name if agent else None, - "output": f"执行异常: {e}", - "success": False, - "error": str(e), - "files": [], - }) - results["success"] = False + results["phases"].append(r) # ─── QA 审查 ─── qa_agent = self._get_agent_by_role(members, "qa") @@ -522,59 +695,135 @@ class TeamOrchestrator: phases = plan["phases"] context_doc = f"# 项目: {plan.get('project_name', '未命名')}\n\n## 需求分析\n{plan.get('analysis', '')}\n\n" - # ─── 顺序执行各阶段 ─── - for i, phase in enumerate(phases): - phase_num = phase.get("phase", i + 1) - role = phase.get("role", "developer") - phase_name = phase.get("name", f"阶段 {phase_num}") - phase_desc = phase.get("description", "") - expected = phase.get("expected_output", "") + # 检测是否有 phase 显式指定了 depends_on + has_dependencies = any(p.get("depends_on") for p in phases) - agent = self._get_agent_by_role(members, role) - if not agent: - msg = f"阶段 {phase_num} ({phase_name}) 跳过: 无 {role} 角色" - yield {"type": "phase_done", "phase": phase_num, "name": phase_name, - "role": role, "success": False, "error": f"missing_role:{role}"} - all_outputs.append(f"[跳过] {msg}") - continue + if has_dependencies: + # ─── DAG 并行执行(流式) ─── + completed: Dict[int, str] = {} + pending = {p["phase"]: p for p in phases} + current_prev = prev_files.copy() + current_context = context_doc - yield { - "type": "phase_start", - "phase": phase_num, - "name": phase_name, - "role": role, - "agent": agent.name, - } + while pending: + ready = [ + p for p in pending.values() + if all(d in completed for d in p.get("depends_on", [])) + ] + if not ready: + yield {"type": "error", "content": "阶段循环依赖,无法继续"} + return - try: - agent_config = _build_agent_config(agent, self.auto_approve_files) - runtime = AgentRuntime(agent_config) + # 预解析 agent 信息 + phase_agents = [] + for p in ready: + agent = self._get_agent_by_role(members, p.get("role", "")) + phase_agents.append((p, agent)) - phase_input = ( - f"{context_doc}\n" - f"## 当前阶段: {phase_name}\n" - f"任务描述: {phase_desc}\n" - f"期望产出: {expected}\n" - f"项目文件保存目录: {project_path}\n" - f"请将所有产出文件写入此目录,完成此阶段的工作并输出你的交付物。" + # 通知前端并行批次开始 + yield { + "type": "parallel_batch_start", + "phases": [ + {"phase": p["phase"], "name": p.get("name"), "role": p.get("role", ""), + "agent": a.name if a else None} + for p, a in phase_agents + ], + } + + # 各阶段启动事件 + for p, agent in phase_agents: + yield { + "type": "phase_start", + "phase": p["phase"], + "name": p.get("name"), + "role": p.get("role", ""), + "agent": agent.name if agent else None, + } + + # 并行执行所有就绪阶段 + tasks = [ + self._run_single_phase(p, members, current_context, project_path, current_prev) + for p, _ in phase_agents + ] + batch_results = await asyncio.gather(*tasks, return_exceptions=True) + + for (p, agent), r in zip(phase_agents, batch_results): + if isinstance(r, Exception): + r = { + "phase": p["phase"], "name": p.get("name", ""), + "role": p.get("role", ""), + "agent_name": agent.name if agent else None, + "output": f"并行执行异常: {r}", + "success": False, "error": str(r), + "files": [], + } + + output_text = r.get("output", "") + completed[p["phase"]] = output_text + current_context += f"\n## 阶段 {p['phase']}: {p.get('name', '')}\n{output_text[:2000]}\n" + all_outputs.append(f"## {r['name']} ({r['role']})\n{output_text}") + + for fp in r.get("files", []): + if fp not in all_files: + all_files.append(fp) + current_prev.add(fp) + + yield { + "type": "phase_done", + "phase": p["phase"], + "name": r["name"], + "role": r["role"], + "agent": agent.name if agent else None, + "output": output_text, + "success": r.get("success", False), + "iterations": r.get("iterations"), + "tool_calls": r.get("tool_calls"), + "error": r.get("error") if not r.get("success") else None, + "files": r.get("files", []), + } + del pending[p["phase"]] + + yield { + "type": "parallel_batch_end", + "completed": [p["phase"] for p, _ in phase_agents], + } + + prev_files = current_prev + context_doc = current_context + else: + # ─── 顺序执行(向后兼容:无 depends_on 的旧版模板/计划) ─── + for i, phase in enumerate(phases): + phase_num = phase.get("phase", i + 1) + role = phase.get("role", "developer") + phase_name = phase.get("name", f"阶段 {phase_num}") + + agent = self._get_agent_by_role(members, role) + if not agent: + yield {"type": "phase_done", "phase": phase_num, "name": phase_name, + "role": role, "success": False, "error": f"missing_role:{role}"} + all_outputs.append(f"[跳过] 阶段 {phase_num} ({phase_name}): 无 {role} 角色") + continue + + yield { + "type": "phase_start", + "phase": phase_num, + "name": phase_name, + "role": role, + "agent": agent.name, + } + + r = await self._run_single_phase( + phase, members, context_doc, project_path, prev_files ) - phase_result = await runtime.run(phase_input) + output_text = r.get("output", "") - output_text = phase_result.content if phase_result.success else ( - phase_result.error or "执行失败" - ) - context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n" - all_outputs.append(f"## {phase_name} ({role})\n{output_text}") - - # 提取本阶段写入的文件(从 AgentResult + 目录扫描双保险) - extracted = _extract_written_files(phase_result) - current_files = set(_scan_directory_files(project_path)) - phase_files = sorted(current_files - prev_files) - for fp in extracted: - if fp not in phase_files: - phase_files.append(fp) - prev_files = current_files - all_files.extend(phase_files) + if r.get("success"): + context_doc += f"\n## 阶段 {phase_num}: {phase_name}\n{output_text[:2000]}\n" + all_outputs.append(f"## {phase_name} ({role})\n{output_text}") + for fp in r.get("files", []): + if fp not in all_files: + all_files.append(fp) + prev_files.add(fp) yield { "type": "phase_done", @@ -583,22 +832,11 @@ class TeamOrchestrator: "role": role, "agent": agent.name, "output": output_text, - "success": phase_result.success, - "iterations": phase_result.iterations_used, - "tool_calls": phase_result.tool_calls_made, - "error": phase_result.error if not phase_result.success else None, - "files": phase_files, - } - except Exception as e: - logger.error("团队编排: 阶段 %s 异常: %s", phase_num, e) - yield { - "type": "phase_done", - "phase": phase_num, - "name": phase_name, - "role": role, - "agent": agent.name, - "success": False, - "error": str(e), + "success": r.get("success", False), + "iterations": r.get("iterations"), + "tool_calls": r.get("tool_calls"), + "error": r.get("error") if not r.get("success") else None, + "files": r.get("files", []), } # ─── QA 审查 ─── diff --git a/backend/app/services/team_service.py b/backend/app/services/team_service.py index 4646033..e4bfec0 100644 --- a/backend/app/services/team_service.py +++ b/backend/app/services/team_service.py @@ -800,35 +800,32 @@ Output format — include a JSON release plan: }""" -TEST_PLANNER_PROMPT = """You are a Test Planner leading a user simulation testing team. Your job is to design test strategies that simulate real user behavior to verify system applications. +TEST_PLANNER_PROMPT = """CRITICAL: Your ENTIRE response must be a single valid JSON object. Do NOT output any markdown, any explanatory text, or any code fences. The first character must be { and the last must be }. -Your responsibilities: -1. Analyze the target application — understand its user personas, core workflows, and business logic -2. Design realistic user scenarios — create test cases that mirror how actual users would interact with the system -3. Prioritize test scenarios by risk and business impact -4. Assign test areas to team members based on their expertise -5. Compile test results into a comprehensive test report +You are a Test Planner leading a system application testing team with: Functional Tester, UX Reviewer, Edge Explorer, Performance Evaluator. -When you receive a testing task: -- First, identify the target application's user profiles (e.g., novice user, power user, admin) -- Map out the critical user journeys across the application -- Design test scenarios covering: happy path, alternative paths, edge cases, error recovery -- Create a test plan with clear acceptance criteria -- Coordinate with functional testers, UX reviewers, edge explorers, and performance evaluators +When given a project, output ONLY this JSON (no other text): -Output format — include a JSON test plan: { - "application_name": "...", - "user_personas": [{"name": "...", "description": "...", "typical_tasks": ["..."]}], - "test_scenarios": [ - {"id": "TS-001", "name": "...", "priority": "P0/P1/P2", "persona": "...", "steps": ["..."], "expected_result": "..."} + "project_name": "...", + "analysis": "...", + "user_stories": ["..."], + "phases": [ + { + "phase": 1, + "name": "...", + "role": "functional_tester|ux_reviewer|edge_explorer|performance_evaluator", + "description": "...", + "expected_output": "...", + "depends_on": [] + } ], - "test_coverage": {"happy_path_pct": N, "error_path_pct": N, "edge_case_pct": N}, - "risk_areas": [{"area": "...", "risk": "high/medium/low", "mitigation": "..."}], - "team_assignments": {"functional_tester": ["..."], "ux_reviewer": ["..."], "edge_explorer": ["..."], "performance_evaluator": ["..."]} + "acceptance_criteria": ["..."] } -Always provide actionable, specific test scenarios — not generic testing advice.""" +Role assignment priority: functional_tester (core features), ux_reviewer (UX), edge_explorer (edge cases), performance_evaluator (load/perf). 4-6 phases. Use Chinese. + +depends_on: list of phase numbers this phase depends on. Phases with no mutual dependencies will execute in PARALLEL. Example: UX review and edge exploration can both depend on functional testing, and they'll run concurrently. Omit or use [] for phases with no dependencies (they can run immediately).""" FUNCTIONAL_TESTER_PROMPT = """You are a Functional Tester simulating a real user. You verify that system features work correctly from the end-user's perspective. @@ -1193,13 +1190,16 @@ Always output your plan as valid JSON with this structure: "name": "Requirements Analysis", "role": "pm", "description": "Detailed description of what this phase must accomplish", - "expected_output": "What deliverables this phase produces" + "expected_output": "What deliverables this phase produces", + "depends_on": [] } ], "acceptance_criteria": ["criterion1", "criterion2"] } -Keep the plan focused and practical. 4-7 phases is ideal. Use Chinese for output when the project description is in Chinese.""" +Keep the plan focused and practical. 4-7 phases is ideal. Use Chinese for output when the project description is in Chinese. + +depends_on: list of phase numbers this phase depends on. Phases with no mutual dependencies will execute in PARALLEL. Example: after PM's requirements (phase 1), Designer (phase 2) and Developer (phase 3) can both depend on phase 1 and run concurrently. Omit or use [] for phases with no dependencies.""" DESIGNER_SYSTEM_PROMPT = """You are a UX/UI Designer. Your responsibilities: