From 926ec6c0a17d3ad168ea15fc00e0252ef054a7c0 Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Fri, 8 May 2026 19:58:53 +0800 Subject: [PATCH] feat: add Main Agent core service, tools, and Celery tasks (Phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 数字员工大脑 — Main Agent 核心实现: - MainAgentService: 目标分解(LLM)、任务调度、进度监控、失败重试、自主循环 - 4个 Main Agent 专有工具: create_task / assign_task / check_progress / notify_user - Celery 异步任务: decompose_goal / execute_goal / execute_task / autonomy_tick - Goal API 增强: decompose / execute-async / replan 端点 Co-Authored-By: Claude Opus 4.6 --- backend/app/api/goals.py | 92 +++- backend/app/core/tools_bootstrap.py | 14 +- backend/app/services/builtin_tools.py | 233 ++++++++++ backend/app/services/main_agent_service.py | 511 +++++++++++++++++++++ backend/app/tasks/goal_tasks.py | 132 ++++++ 5 files changed, 980 insertions(+), 2 deletions(-) create mode 100644 backend/app/services/main_agent_service.py create mode 100644 backend/app/tasks/goal_tasks.py diff --git a/backend/app/api/goals.py b/backend/app/api/goals.py index 7f444bb..e5b48d1 100644 --- a/backend/app/api/goals.py +++ b/backend/app/api/goals.py @@ -1,7 +1,7 @@ """ Goal API — 目标管理接口 """ -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, HTTPException from sqlalchemy.orm import Session from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any @@ -202,3 +202,93 @@ def get_goal_task_tree( ): """获取目标的任务树""" return goal_service.get_goal_task_tree(db, goal_id) + + +class DecomposeResponse(BaseModel): + goal_id: str + task_count: int + message: str + + +@router.post("/{goal_id}/decompose", response_model=DecomposeResponse) +def decompose_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """触发目标分解(同步调用 LLM 拆解为 Task 树)""" + import asyncio + from app.services.main_agent_service import MainAgentService + + service = MainAgentService(db) + try: + # 先验证 Goal 存在 + goal_service.get_goal(db, goal_id) + goal = asyncio.run(service.decompose_goal(goal_id)) + tasks = goal_service.list_tasks(db, goal_id=goal_id, limit=200) + return { + "goal_id": goal_id, + "task_count": len(tasks), + "message": f"目标已分解为 {len(tasks)} 个任务", + } + except Exception as e: + logger.error(f"Goal decompose failed: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"目标分解失败: {e}") + + +class ExecuteAsyncResponse(BaseModel): + goal_id: str + celery_task_id: Optional[str] + message: str + + +@router.post("/{goal_id}/execute-async", response_model=ExecuteAsyncResponse) +def execute_goal_async( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """异步执行目标(通过 Celery Worker 执行,适合长时间运行的目标)""" + from app.tasks.goal_tasks import execute_goal_task + + goal_service.get_goal(db, goal_id) + goal_service.update_goal(db, goal_id, status="active") + + task = execute_goal_task.delay(goal_id) + logger.info(f"Goal {goal_id} dispatched to Celery: {task.id}") + + return { + "goal_id": goal_id, + "celery_task_id": task.id, + "message": "目标已提交异步执行,可通过 goal 状态和 task 列表追踪进度", + } + + +@router.post("/{goal_id}/replan", response_model=DecomposeResponse) +def replan_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """重新规划目标(清除未完成任务后重新分解)""" + import asyncio + from app.services.main_agent_service import MainAgentService + from app.models.task import Task + + goal = goal_service.get_goal(db, goal_id) + + # 清除 pending 和 failed 任务(保留已完成的) + db.query(Task).filter( + Task.goal_id == goal_id, + Task.status.in_(["pending", "failed", "cancelled"]), + ).delete() + db.commit() + + service = MainAgentService(db) + goal = asyncio.run(service.decompose_goal(goal_id)) + tasks = goal_service.list_tasks(db, goal_id=goal_id, limit=200) + return { + "goal_id": goal_id, + "task_count": len(tasks), + "message": f"目标已重新规划,生成 {len(tasks)} 个新任务", + } diff --git a/backend/app/core/tools_bootstrap.py b/backend/app/core/tools_bootstrap.py index 881bd38..4c1fbec 100644 --- a/backend/app/core/tools_bootstrap.py +++ b/backend/app/core/tools_bootstrap.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) _registered = False -_EXPECTED_BUILTIN = 43 +_EXPECTED_BUILTIN = 47 def ensure_builtin_tools_registered() -> None: @@ -61,6 +61,10 @@ def ensure_builtin_tools_registered() -> None: image_vision_tool, speech_to_text_tool, text_to_speech_tool, + main_agent_create_task, + main_agent_assign_task, + main_agent_check_progress, + main_agent_notify_user, HTTP_REQUEST_SCHEMA, FILE_READ_SCHEMA, FILE_WRITE_SCHEMA, @@ -104,6 +108,10 @@ def ensure_builtin_tools_registered() -> None: IMAGE_VISION_SCHEMA, SPEECH_TO_TEXT_SCHEMA, TEXT_TO_SPEECH_SCHEMA, + MAIN_AGENT_CREATE_TASK_SCHEMA, + MAIN_AGENT_ASSIGN_TASK_SCHEMA, + MAIN_AGENT_CHECK_PROGRESS_SCHEMA, + MAIN_AGENT_NOTIFY_USER_SCHEMA, ) tool_registry.register_builtin_tool("http_request", http_request_tool, HTTP_REQUEST_SCHEMA) @@ -149,6 +157,10 @@ def ensure_builtin_tools_registered() -> None: tool_registry.register_builtin_tool("image_vision", image_vision_tool, IMAGE_VISION_SCHEMA) tool_registry.register_builtin_tool("speech_to_text", speech_to_text_tool, SPEECH_TO_TEXT_SCHEMA) tool_registry.register_builtin_tool("text_to_speech", text_to_speech_tool, TEXT_TO_SPEECH_SCHEMA) + tool_registry.register_builtin_tool("create_task", main_agent_create_task, MAIN_AGENT_CREATE_TASK_SCHEMA) + tool_registry.register_builtin_tool("assign_task", main_agent_assign_task, MAIN_AGENT_ASSIGN_TASK_SCHEMA) + tool_registry.register_builtin_tool("check_progress", main_agent_check_progress, MAIN_AGENT_CHECK_PROGRESS_SCHEMA) + tool_registry.register_builtin_tool("notify_user", main_agent_notify_user, MAIN_AGENT_NOTIFY_USER_SCHEMA) _registered = True n = tool_registry.builtin_tool_count() diff --git a/backend/app/services/builtin_tools.py b/backend/app/services/builtin_tools.py index b3c657a..96e0475 100644 --- a/backend/app/services/builtin_tools.py +++ b/backend/app/services/builtin_tools.py @@ -5161,6 +5161,239 @@ SPEECH_TO_TEXT_SCHEMA = { }, } +# ═══════════════════════════════════════════════════════════════ +# Main Agent 专有工具:Goal/Task 管理 +# ═══════════════════════════════════════════════════════════════ + + +async def main_agent_create_task( + goal_id: str, + title: str, + description: str = "", + priority: int = 5, + depends_on: Optional[List[str]] = None, + assigned_agent_id: Optional[str] = None, + assigned_agent_name: Optional[str] = None, +) -> str: + """Main Agent 工具:创建一个子任务。 + + 将目标分解为可执行的子任务,写入数据库。 + """ + from app.core.database import SessionLocal + from app.services.goal_service import create_task, get_goal + + db = None + try: + db = SessionLocal() + get_goal(db, goal_id) # 验证 Goal 存在 + task = create_task( + db=db, + goal_id=goal_id, + title=title, + description=description, + priority=priority, + depends_on=depends_on or [], + assigned_agent_id=assigned_agent_id, + assigned_agent_name=assigned_agent_name, + ) + return json.dumps({ + "task_id": task.id, + "title": task.title, + "status": task.status, + "message": f"任务 '{title}' 创建成功", + }, ensure_ascii=False) + except Exception as e: + logger.error(f"main_agent_create_task 失败: {e}", exc_info=True) + return json.dumps({"error": f"创建任务失败: {e}"}, ensure_ascii=False) + finally: + if db: + db.close() + + +async def main_agent_assign_task( + task_id: str, + assigned_agent_id: str, + assigned_agent_name: str = "", +) -> str: + """Main Agent 工具:将任务分配给特定 Agent。""" + from app.core.database import SessionLocal + from app.services.goal_service import update_task, get_task + + db = None + try: + db = SessionLocal() + get_task(db, task_id) # 验证 Task 存在 + update_task( + db=db, + task_id=task_id, + assigned_agent_id=assigned_agent_id, + assigned_agent_name=assigned_agent_name, + ) + return json.dumps({ + "task_id": task_id, + "assigned_agent_id": assigned_agent_id, + "assigned_agent_name": assigned_agent_name, + "message": f"任务已分配给 {assigned_agent_name or assigned_agent_id}", + }, ensure_ascii=False) + except Exception as e: + logger.error(f"main_agent_assign_task 失败: {e}", exc_info=True) + return json.dumps({"error": f"分配任务失败: {e}"}, ensure_ascii=False) + finally: + if db: + db.close() + + +async def main_agent_check_progress(goal_id: str) -> str: + """Main Agent 工具:查看目标下所有任务的执行进度。""" + from app.core.database import SessionLocal + from app.services.goal_service import get_goal, list_tasks, get_goal_task_tree + + db = None + try: + db = SessionLocal() + goal = get_goal(db, goal_id) + tasks = list_tasks(db, goal_id=goal_id, limit=200) + + by_status = {} + for t in tasks: + by_status.setdefault(t.status, 0) + by_status[t.status] += 1 + + task_summaries = [] + for t in tasks: + task_summaries.append({ + "id": t.id, + "title": t.title, + "status": t.status, + "assigned_agent_name": t.assigned_agent_name, + "has_error": bool(t.error_message), + }) + + return json.dumps({ + "goal_id": goal_id, + "goal_title": goal.title, + "goal_status": goal.status, + "progress": goal.progress, + "total_tasks": len(tasks), + "by_status": by_status, + "tasks": task_summaries, + }, ensure_ascii=False) + except Exception as e: + logger.error(f"main_agent_check_progress 失败: {e}", exc_info=True) + return json.dumps({"error": f"检查进度失败: {e}"}, ensure_ascii=False) + finally: + if db: + db.close() + + +async def main_agent_notify_user( + user_id: str, + message: str, + notification_type: str = "info", +) -> str: + """Main Agent 工具:向用户发送通知(站内消息)。""" + from app.core.database import SessionLocal + from app.models.notification import Notification + + db = None + try: + db = SessionLocal() + notif = Notification( + user_id=user_id, + title="Main Agent 通知", + content=message, + type=notification_type, + ref_type="goal", + ref_id="", + is_read=False, + ) + db.add(notif) + db.commit() + logger.info(f"Main Agent 通知已发送: user={user_id}, type={notification_type}, len={len(message)}") + return json.dumps({ + "sent": True, + "notification_type": notification_type, + "message_preview": message[:200], + }, ensure_ascii=False) + except Exception as e: + logger.error(f"main_agent_notify_user 失败: {e}", exc_info=True) + return json.dumps({"error": f"发送通知失败: {e}"}, ensure_ascii=False) + finally: + if db: + db.close() + + +MAIN_AGENT_CREATE_TASK_SCHEMA = { + "type": "function", + "function": { + "name": "create_task", + "description": "创建一个新任务。将目标分解为可执行的子任务,写入数据库。每个任务应具体、可衡量、有明确完成标准。", + "parameters": { + "type": "object", + "properties": { + "goal_id": {"type": "string", "description": "所属目标 ID"}, + "title": {"type": "string", "description": "任务标题"}, + "description": {"type": "string", "description": "任务详细描述"}, + "priority": {"type": "integer", "description": "优先级 1-10,数字越小越优先"}, + "depends_on": {"type": "array", "items": {"type": "string"}, "description": "前置依赖任务 ID 列表"}, + "assigned_agent_id": {"type": "string", "description": "分配的 Agent ID(可为空)"}, + "assigned_agent_name": {"type": "string", "description": "分配的 Agent 名称"}, + }, + "required": ["goal_id", "title"], + }, + }, +} + +MAIN_AGENT_ASSIGN_TASK_SCHEMA = { + "type": "function", + "function": { + "name": "assign_task", + "description": "将任务分配给特定的 Specialist Agent。只有已发布状态的 Agent 才能被分配。", + "parameters": { + "type": "object", + "properties": { + "task_id": {"type": "string", "description": "任务 ID"}, + "assigned_agent_id": {"type": "string", "description": "分配的 Agent ID"}, + "assigned_agent_name": {"type": "string", "description": "Agent 名称"}, + }, + "required": ["task_id", "assigned_agent_id"], + }, + }, +} + +MAIN_AGENT_CHECK_PROGRESS_SCHEMA = { + "type": "function", + "function": { + "name": "check_progress", + "description": "查看目标下所有任务的执行进度,包括各状态统计和每个任务的详细信息。", + "parameters": { + "type": "object", + "properties": { + "goal_id": {"type": "string", "description": "目标 ID"}, + }, + "required": ["goal_id"], + }, + }, +} + +MAIN_AGENT_NOTIFY_USER_SCHEMA = { + "type": "function", + "function": { + "name": "notify_user", + "description": "向用户发送站内通知消息。在目标完成、任务失败、需要审批等关键节点主动通知用户。", + "parameters": { + "type": "object", + "properties": { + "user_id": {"type": "string", "description": "接收通知的用户 ID"}, + "message": {"type": "string", "description": "通知内容(支持 Markdown)"}, + "notification_type": {"type": "string", "description": "通知类型: info / success / warning / error(默认 info)"}, + }, + "required": ["user_id", "message"], + }, + }, +} + + TEXT_TO_SPEECH_SCHEMA = { "type": "function", "function": { diff --git a/backend/app/services/main_agent_service.py b/backend/app/services/main_agent_service.py new file mode 100644 index 0000000..2a2c0ac --- /dev/null +++ b/backend/app/services/main_agent_service.py @@ -0,0 +1,511 @@ +""" +Main Agent 核心服务 — 数字员工的大脑 + +Main Agent 是管理 Goal 的特殊 Agent,它: +1. 理解目标 → LLM 分解为 Task 树 +2. 按优先级/依赖调度 → 选择执行策略(单Agent/编排/工作流) +3. 监控进度 → 重试失败 / 重新分配 / 升级告警 +4. 主动通知用户 → 飞书/站内/邮件 +""" +from __future__ import annotations + +import asyncio +import json +import logging +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional + +from sqlalchemy.orm import Session +from openai import AsyncOpenAI + +from app.core.config import settings +from app.core.database import SessionLocal +from app.models.goal import Goal +from app.models.task import Task +from app.models.agent import Agent +from app.models.execution import Execution +from app.services import goal_service +from app.services.tool_registry import tool_registry +from app.agent_runtime.core import AgentRuntime +from app.agent_runtime.schemas import AgentConfig, AgentLLMConfig, AgentToolConfig, AgentResult + +logger = logging.getLogger(__name__) + +# ──────────────────── Main Agent 系统提示词 ──────────────────── + +MAIN_AGENT_SYSTEM_PROMPT = """你是天工智能体平台的主控智能体(Main Agent),是一个数字员工团队的"项目经理"。 + +你的职责是: +1. **理解目标**:深入理解用户的目标(Goal),分析可行性、识别风险 +2. **分解任务**:将目标拆解为可执行的子任务(Task),明确依赖关系和优先级 +3. **组建团队**:根据任务特点选择合适的 Agent(你拥有调用其他 Agent 的能力) +4. **监控执行**:追踪每个任务的进度,失败时自动重试或重新分配 +5. **主动汇报**:进度更新、遇到阻塞、任务完成时主动通知用户 + +你有以下专用工具可使用: +- `create_task` — 创建一个子任务(写入数据库) +- `assign_task` — 将任务分配给特定 Agent +- `check_progress` — 查看目标下所有任务的执行进度 +- `invoke_agent` — 调用一个已有的 Specialist Agent 执行具体工作 +- `invoke_workflow` — 调用已有的工作流 +- `notify_user` — 发送通知给用户 +- `web_search` — 搜索互联网获取信息 + +工作原则: +- 分解任务时要具体、可执行、有明确的完成标准 +- 优先并行化无依赖的任务 +- 失败时先尝试一次重试,再考虑重新分配或降级处理 +- 保持透明:让用户知道你在做什么、为什么这样做 +- 遇到必须人工决策的事情(审批),使用 `wait_for_approval` 工具 +""" + + +class MainAgentService: + """Main Agent 服务 — 管理 Goal 的全生命周期""" + + def __init__(self, db: Session): + self.db = db + + # ──────────── LLM 客户端 ──────────── + + def _get_llm_client(self) -> AsyncOpenAI: + return AsyncOpenAI( + api_key=settings.DEEPSEEK_API_KEY or settings.OPENAI_API_KEY, + base_url=settings.DEEPSEEK_BASE_URL or settings.OPENAI_BASE_URL, + timeout=120.0, + ) + + def _get_llm_model(self) -> str: + if settings.DEEPSEEK_API_KEY: + return "deepseek-chat" + return "gpt-4o-mini" + + async def _call_llm(self, messages: List[Dict[str, str]], tools: Optional[List[Dict]] = None) -> Dict[str, Any]: + """调用 LLM,支持工具调用""" + client = self._get_llm_client() + kwargs: Dict[str, Any] = { + "model": self._get_llm_model(), + "messages": messages, + "temperature": 0.5, + "max_tokens": 4096, + } + if tools: + kwargs["tools"] = tools + kwargs["tool_choice"] = "auto" + + response = await client.chat.completions.create(**kwargs) + msg = response.choices[0].message + result = { + "content": msg.content or "", + "role": msg.role, + } + if msg.tool_calls: + result["tool_calls"] = [ + { + "id": tc.id, + "type": tc.type, + "function": { + "name": tc.function.name, + "arguments": tc.function.arguments, + }, + } + for tc in msg.tool_calls + ] + return result + + # ──────────── 目标分解 ──────────── + + async def decompose_goal(self, goal_id: str) -> Goal: + """ + 使用 LLM 将目标分解为 Task 树。 + + 步骤: + 1. 读取 Goal 的 title/description + 2. 构造提示词 → LLM 输出结构化任务列表 + 3. 为每个任务调用 create_task 写入数据库 + 4. 更新 Goal.plan 字段 + """ + goal = goal_service.get_goal(self.db, goal_id) + + # 获取平台上可用的 Agent 列表 + agents = self.db.query(Agent).filter(Agent.status == "published").all() + agent_list = "\n".join( + f" - {a.name} (id={a.id}): {a.description or '无描述'}" + for a in agents[:20] + ) + + messages = [ + {"role": "system", "content": f"""你是天工平台的任务分解专家。用户给出一个目标,你需要将其分解为可执行的子任务。 + +当前平台上可用的 Specialist Agent: +{agent_list if agent_list else '(暂无已发布的 Agent,使用通用工具处理)'} + +请分析目标并以 JSON 格式输出任务分解方案。 +输出格式: +{{ + "analysis": "对目标的分析(1-2句话)", + "tasks": [ + {{ + "title": "任务标题", + "description": "任务详细描述", + "priority": 1-10 (数字越小越优先), + "depends_on": ["依赖的 task_index(0-based),无依赖则为空数组"], + "assigned_agent_id": "匹配的 Agent ID(可为 null)", + "assigned_agent_name": "Agent 名称", + "task_config": {{"orchestration_mode": "agent"}}, + "estimated_effort": "预估工作量说明" + }} + ], + "execution_order": "推荐的执行顺序说明" +}} + +要求: +- 每个任务都要具体、可衡量、有明确完成标准 +- 优先并行化无依赖的任务 +- 尽量将任务分配给匹配的 Specialist Agent +- 任务数量控制在 3-8 个(除非目标特别复杂) +- 仅输出 JSON,不要其他文字"""}, + {"role": "user", "content": f"目标:{goal.title}\n\n描述:{goal.description or '无'}\n\n请分解此目标。"} + ] + + response = await self._call_llm(messages) + content = response.get("content", "") + + # 解析 LLM 输出的 JSON + try: + # 提取 JSON 块(排除 markdown 代码块标记) + json_str = content + if "```json" in json_str: + json_str = json_str.split("```json")[1].split("```")[0] + elif "```" in json_str: + json_str = json_str.split("```")[1].split("```")[0] + plan = json.loads(json_str) + except (json.JSONDecodeError, IndexError, KeyError) as e: + logger.error(f"Failed to parse LLM decomposition output: {e}\nRaw: {content[:500]}") + raise RuntimeError(f"目标分解失败:LLM 输出格式异常") + + tasks_data = plan.get("tasks", []) + if not tasks_data: + raise RuntimeError("目标分解失败:LLM 未输出有效任务列表") + + # 创建任务(先创建所有 task 记录再建立依赖) + created_tasks: List[Task] = [] + id_to_index: Dict[int, str] = {} + + for idx, td in enumerate(tasks_data): + task = goal_service.create_task( + db=self.db, + goal_id=goal_id, + title=td.get("title", f"任务 {idx+1}"), + description=td.get("description", ""), + priority=td.get("priority", 5), + assigned_agent_id=td.get("assigned_agent_id"), + assigned_agent_name=td.get("assigned_agent_name"), + task_config=td.get("task_config", {"orchestration_mode": "agent"}), + ) + created_tasks.append(task) + id_to_index[idx] = task.id + + # 建立依赖关系 + for idx, td in enumerate(tasks_data): + deps = td.get("depends_on", []) + if deps: + dep_ids = [id_to_index[di] for di in deps if di in id_to_index] + if dep_ids: + task = created_tasks[idx] + task.depends_on = dep_ids + self.db.add(task) + self.db.commit() + + # 更新 Goal.plan + goal_service.update_goal( + db=self.db, + goal_id=goal_id, + plan={ + "analysis": plan.get("analysis", ""), + "execution_order": plan.get("execution_order", ""), + "task_count": len(created_tasks), + "decomposed_at": datetime.now().isoformat(), + }, + ) + + logger.info(f"Goal {goal_id} decomposed into {len(created_tasks)} tasks") + return goal_service.get_goal(self.db, goal_id) + + # ──────────── 任务执行 ──────────── + + async def execute_task(self, task_id: str) -> Dict[str, Any]: + """ + 执行单个任务。 + + 根据 task_config.orchestration_mode 选择执行策略: + - "agent" (默认) → 调用单个 Agent + - "workflow" → 调用工作流引擎 + """ + task = goal_service.get_task(self.db, task_id) + + # 检查依赖 + if not goal_service.get_task_dependencies_met(self.db, task_id): + return {"status": "blocked", "reason": "前置依赖未完成"} + + # 更新状态 + goal_service.update_task(self.db, task_id, status="in_progress") + + task_config = task.task_config or {} + mode = task_config.get("orchestration_mode", "agent") + + try: + if mode == "workflow": + result = await self._execute_as_workflow(task) + else: + result = await self._execute_as_agent(task) + + # 更新结果 + goal_service.update_task( + self.db, task_id, + status="completed", + result=result, + ) + return {"status": "completed", "result": result} + + except Exception as e: + err = str(e) + logger.error(f"Task {task_id} execution failed: {err}") + goal_service.update_task( + self.db, task_id, + status="failed", + error_message=err, + ) + return {"status": "failed", "error": err} + + async def _execute_as_agent(self, task: Task) -> Dict[str, Any]: + """以 Agent 模式执行任务""" + agent_id = task.assigned_agent_id + agent = None + if agent_id: + agent = self.db.query(Agent).filter(Agent.id == agent_id).first() + + config = task.task_config or {} + + # 构建 Agent 配置 + agent_config = AgentConfig( + name=f"task_{task.id[:8]}", + system_prompt=config.get("system_prompt", + f"你需要完成以下任务:\n{task.title}\n\n{task.description or ''}\n\n请使用可用工具完成任务。"), + llm=AgentLLMConfig( + provider=config.get("provider", "deepseek"), + model=config.get("model", "deepseek-chat"), + temperature=config.get("temperature", 0.7), + max_iterations=config.get("max_iterations", 15), + ), + tools=AgentToolConfig(include_tools=task_config_tool_whitelist(config)), + ) + + runtime = AgentRuntime(agent_config) + task_input = config.get("input_data", {}) + user_input = task_input.get("user_input", + f"请完成以下任务:{task.title}\n\n{task.description or ''}") + + result: AgentResult = await runtime.run(user_input) + + return { + "content": result.content, + "success": result.success, + "iterations": result.iterations_used, + "tool_calls": result.tool_calls_made, + "steps": [s.model_dump() for s in result.steps[-5:]] if result.steps else [], + } + + async def _execute_as_workflow(self, task: Task) -> Dict[str, Any]: + """以工作流模式执行任务""" + config = task.task_config or {} + workflow_id = config.get("workflow_id") + if not workflow_id: + raise RuntimeError("工作流模式缺少 workflow_id") + + from app.models.workflow import Workflow + wf = self.db.query(Workflow).filter(Workflow.id == workflow_id).first() + if not wf: + raise RuntimeError(f"工作流 {workflow_id} 不存在") + + workflow_data = {"nodes": wf.nodes, "edges": wf.edges} + input_data = config.get("input_data", {"task_title": task.title, "task_description": task.description}) + + from app.services.workflow_engine import WorkflowEngine + engine = WorkflowEngine(str(workflow_id), workflow_data, db=self.db) + result = await asyncio.to_thread( + lambda: asyncio.run(engine.execute(input_data)) + ) + return {"output": result} + + # ──────────── 进度监控 ──────────── + + async def monitor_progress(self, goal_id: str) -> Dict[str, Any]: + """检查目标所有任务的进度,更新 Goal.progress""" + tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=200) + goal = goal_service.update_goal_progress(self.db, goal_id) + + by_status = {} + for t in tasks: + by_status.setdefault(t.status, 0) + by_status[t.status] += 1 + + # 检查是否有失败任务需要处理 + failed_tasks = [t for t in tasks if t.status == "failed"] + stuck_tasks = [t for t in tasks if t.status == "in_progress" + and t.started_at and (datetime.now() - t.started_at).total_seconds() > 3600] + + return { + "goal_id": goal_id, + "progress": goal.progress, + "total_tasks": len(tasks), + "by_status": by_status, + "failed_count": len(failed_tasks), + "stuck_count": len(stuck_tasks), + } + + async def handle_task_failure(self, task_id: str) -> Dict[str, Any]: + """ + 处理任务失败:尝试重试 → 重新分配 → 升级通知 + """ + task = goal_service.get_task(self.db, task_id) + if task.status != "failed": + return {"status": "skipped", "reason": f"任务状态为 {task.status},无需处理"} + + # 重试一次 + logger.info(f"Retrying failed task {task_id}") + result = await self.execute_task(task_id) + + if result.get("status") == "completed": + return {"status": "retry_success", "task_id": task_id} + + # 重新分配:尝试找其他匹配的 Agent + goal = goal_service.get_goal(self.db, task.goal_id) + other_agents = self.db.query(Agent).filter( + Agent.id != task.assigned_agent_id, + Agent.status == "published", + ).limit(5).all() + + if other_agents: + new_agent = other_agents[0] + goal_service.update_task( + self.db, task_id, + status="pending", + error_message=None, + assigned_agent_id=new_agent.id, + assigned_agent_name=new_agent.name, + ) + logger.info(f"Task {task_id} reassigned to {new_agent.name}") + return {"status": "reassigned", "task_id": task_id, "new_agent": new_agent.name} + + # 无法自动恢复:记录错误消息 + goal_service.update_task( + self.db, task_id, + error_message=f"{task.error_message or ''} | 自动重试和重新分配均失败,需人工介入", + ) + logger.warning(f"Task {task_id} needs manual intervention") + return {"status": "needs_manual_intervention", "task_id": task_id} + + # ──────────── 自主循环 ──────────── + + async def autonomy_tick(self, goal_id: str) -> Dict[str, Any]: + """ + 自主循环单次心跳: + 1. 检查进度 + 2. 找出可执行的任务(依赖已满足 + pending 状态) + 3. 执行一个任务 + 4. 处理失败任务 + 5. 检查目标是否完成 + 6. 通知用户 + """ + goal = goal_service.get_goal(self.db, goal_id) + if goal.status != "active": + return {"status": "skipped", "reason": f"Goal status is {goal.status}"} + + progress = await self.monitor_progress(goal_id) + + # 先处理失败任务 + for _ in range(progress["failed_count"]): + failed = self.db.query(Task).filter( + Task.goal_id == goal_id, Task.status == "failed" + ).first() + if failed: + await self.handle_task_failure(failed.id) + + # 找可执行的任务 + ready_tasks = self._find_ready_tasks(goal_id) + if ready_tasks: + task = ready_tasks[0] # 按优先级选择第一个 + result = await self.execute_task(task.id) + return {"status": "executed", "task_id": task.id, "result": result} + + # 检查是否全部完成 + progress = await self.monitor_progress(goal_id) + if progress.get("by_status", {}).get("completed", 0) == progress["total_tasks"]: + goal_service.update_goal(self.db, goal_id, status="completed") + return {"status": "goal_completed", "goal_id": goal_id} + + return {"status": "idle", "goal_id": goal_id, + "progress": progress["progress"], + "pending_review": progress.get("by_status", {}).get("failed", 0) > 0} + + def _find_ready_tasks(self, goal_id: str) -> List[Task]: + """找出所有依赖已满足的 pending 任务,按优先级排序""" + all_tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=200) + ready = [] + for t in all_tasks: + if t.status != "pending": + continue + if goal_service.get_task_dependencies_met(self.db, t.id): + ready.append(t) + ready.sort(key=lambda t: t.priority) + return ready + + # ──────────── 入口 ──────────── + + async def start_goal_execution(self, goal_id: str) -> Dict[str, Any]: + """ + 启动 Main Agent 管理目标(完整入口): + 1. 分解目标 + 2. 持续执行直到完成或阻塞 + """ + goal = goal_service.get_goal(self.db, goal_id) + if goal.status not in ("active", "pending"): + goal_service.update_goal(self.db, goal_id, status="active") + + # 如果还没有任务,先分解 + existing_tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=1) + if not existing_tasks: + logger.info(f"Decomposing goal {goal_id}") + await self.decompose_goal(goal_id) + + goal.started_at = datetime.now() + self.db.commit() + + # 执行自主循环(最多 10 轮,避免无限循环) + results = [] + for i in range(10): + tick = await self.autonomy_tick(goal_id) + results.append(tick) + if tick.get("status") in ("goal_completed", "idle"): + break + # 等 2 秒再下一次 tick + await asyncio.sleep(2) + + return { + "goal_id": goal_id, + "ticks": len(results), + "status": goal_service.get_goal(self.db, goal_id).status, + "results": results, + } + + +def task_config_tool_whitelist(config: Dict) -> Optional[List[str]]: + """从 task_config 中提取工具白名单,空则返回 None(全部工具可用)""" + tools = config.get("tools") or config.get("include_tools") + if not tools: + return None + if isinstance(tools, list): + return tools + return None diff --git a/backend/app/tasks/goal_tasks.py b/backend/app/tasks/goal_tasks.py new file mode 100644 index 0000000..ed7a0b6 --- /dev/null +++ b/backend/app/tasks/goal_tasks.py @@ -0,0 +1,132 @@ +""" +Goal/Task 异步任务 — Celery 任务定义 + +Main Agent 的目标分解、任务执行、自主循环等重量级操作通过 Celery 异步执行。 +""" +from app.core.tools_bootstrap import ensure_builtin_tools_registered + +ensure_builtin_tools_registered() + +from app.core.celery_app import celery_app +from app.core.database import SessionLocal +from app.models.goal import Goal +from app.models.execution import Execution +from app.services.main_agent_service import MainAgentService +import asyncio +import logging +import time + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True) +def decompose_goal_task(self, goal_id: str): + """ + 异步分解目标:使用 LLM 将 Goal 分解为 Task 树。 + + 在 Celery Worker 中执行,避免 API 请求超时。 + """ + db = SessionLocal() + try: + service = MainAgentService(db) + goal = asyncio.run(service.decompose_goal(goal_id)) + + return { + "status": "completed", + "goal_id": goal_id, + "goal_title": goal.title, + } + except Exception as e: + logger.error(f"Goal decomposition failed: {e}", exc_info=True) + db.rollback() + return {"status": "failed", "goal_id": goal_id, "error": str(e)} + finally: + db.close() + + +@celery_app.task(bind=True) +def execute_goal_task(self, goal_id: str): + """ + 异步执行 Goal:启动 Main Agent 管理目标全生命周期。 + + 1. 分解目标(如果尚未分解) + 2. 持续执行 task 直到完成或阻塞 + 3. 更新 Goal 状态 + """ + db = SessionLocal() + start_time = time.time() + try: + goal = db.query(Goal).filter(Goal.id == goal_id).first() + if not goal: + return {"status": "failed", "goal_id": goal_id, "error": "目标不存在"} + + # 更新状态 + goal.status = "active" + db.commit() + + service = MainAgentService(db) + result = asyncio.run(service.start_goal_execution(goal_id)) + + elapsed = int((time.time() - start_time) * 1000) + return { + "status": "completed", + "goal_id": goal_id, + "elapsed_ms": elapsed, + **result, + } + except Exception as e: + logger.error(f"Goal execution failed: {e}", exc_info=True) + goal = db.query(Goal).filter(Goal.id == goal_id).first() + if goal: + goal.status = "failed" + db.commit() + return {"status": "failed", "goal_id": goal_id, "error": str(e)} + finally: + db.close() + + +@celery_app.task(bind=True) +def execute_task_celery(self, task_id: str): + """ + 异步执行单个 Task。 + + Main Agent 创建 Execution 记录后将任务交给 Celery Worker 执行。 + """ + db = SessionLocal() + start_time = time.time() + try: + service = MainAgentService(db) + result = asyncio.run(service.execute_task(task_id)) + + elapsed = int((time.time() - start_time) * 1000) + return { + "status": result.get("status", "completed"), + "task_id": task_id, + "elapsed_ms": elapsed, + **result, + } + except Exception as e: + logger.error(f"Task execution failed: {e}", exc_info=True) + return {"status": "failed", "task_id": task_id, "error": str(e)} + finally: + db.close() + + +@celery_app.task(bind=True) +def autonomy_tick_task(self, goal_id: str): + """ + 自主循环单次心跳:检查进度 → 执行可运行任务 → 处理失败 → 通知。 + + 由 Celery Beat 定时调度(根据 Goal.autonomy_config.check_interval_minutes)。 + """ + db = SessionLocal() + try: + service = MainAgentService(db) + result = asyncio.run(service.autonomy_tick(goal_id)) + logger.info(f"Autonomy tick for goal {goal_id}: {result.get('status', 'unknown')}") + return {"status": "completed", "goal_id": goal_id, **result} + except Exception as e: + logger.error(f"Autonomy tick failed for goal {goal_id}: {e}", exc_info=True) + return {"status": "failed", "goal_id": goal_id, "error": str(e)} + finally: + db.close()