feat: add Main Agent core service, tools, and Celery tasks (Phase 2)

数字员工大脑 — Main Agent 核心实现:
- MainAgentService: 目标分解(LLM)、任务调度、进度监控、失败重试、自主循环
- 4个 Main Agent 专有工具: create_task / assign_task / check_progress / notify_user
- Celery 异步任务: decompose_goal / execute_goal / execute_task / autonomy_tick
- Goal API 增强: decompose / execute-async / replan 端点

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-08 19:58:53 +08:00
parent 02d7cf8f62
commit 926ec6c0a1
5 changed files with 980 additions and 2 deletions

View File

@@ -1,7 +1,7 @@
"""
Goal API — 目标管理接口
"""
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
@@ -202,3 +202,93 @@ def get_goal_task_tree(
):
"""获取目标的任务树"""
return goal_service.get_goal_task_tree(db, goal_id)
class DecomposeResponse(BaseModel):
goal_id: str
task_count: int
message: str
@router.post("/{goal_id}/decompose", response_model=DecomposeResponse)
def decompose_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""触发目标分解(同步调用 LLM 拆解为 Task 树)"""
import asyncio
from app.services.main_agent_service import MainAgentService
service = MainAgentService(db)
try:
# 先验证 Goal 存在
goal_service.get_goal(db, goal_id)
goal = asyncio.run(service.decompose_goal(goal_id))
tasks = goal_service.list_tasks(db, goal_id=goal_id, limit=200)
return {
"goal_id": goal_id,
"task_count": len(tasks),
"message": f"目标已分解为 {len(tasks)} 个任务",
}
except Exception as e:
logger.error(f"Goal decompose failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"目标分解失败: {e}")
class ExecuteAsyncResponse(BaseModel):
goal_id: str
celery_task_id: Optional[str]
message: str
@router.post("/{goal_id}/execute-async", response_model=ExecuteAsyncResponse)
def execute_goal_async(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""异步执行目标(通过 Celery Worker 执行,适合长时间运行的目标)"""
from app.tasks.goal_tasks import execute_goal_task
goal_service.get_goal(db, goal_id)
goal_service.update_goal(db, goal_id, status="active")
task = execute_goal_task.delay(goal_id)
logger.info(f"Goal {goal_id} dispatched to Celery: {task.id}")
return {
"goal_id": goal_id,
"celery_task_id": task.id,
"message": "目标已提交异步执行,可通过 goal 状态和 task 列表追踪进度",
}
@router.post("/{goal_id}/replan", response_model=DecomposeResponse)
def replan_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""重新规划目标(清除未完成任务后重新分解)"""
import asyncio
from app.services.main_agent_service import MainAgentService
from app.models.task import Task
goal = goal_service.get_goal(db, goal_id)
# 清除 pending 和 failed 任务(保留已完成的)
db.query(Task).filter(
Task.goal_id == goal_id,
Task.status.in_(["pending", "failed", "cancelled"]),
).delete()
db.commit()
service = MainAgentService(db)
goal = asyncio.run(service.decompose_goal(goal_id))
tasks = goal_service.list_tasks(db, goal_id=goal_id, limit=200)
return {
"goal_id": goal_id,
"task_count": len(tasks),
"message": f"目标已重新规划,生成 {len(tasks)} 个新任务",
}

View File

@@ -8,7 +8,7 @@ logger = logging.getLogger(__name__)
_registered = False
_EXPECTED_BUILTIN = 43
_EXPECTED_BUILTIN = 47
def ensure_builtin_tools_registered() -> None:
@@ -61,6 +61,10 @@ def ensure_builtin_tools_registered() -> None:
image_vision_tool,
speech_to_text_tool,
text_to_speech_tool,
main_agent_create_task,
main_agent_assign_task,
main_agent_check_progress,
main_agent_notify_user,
HTTP_REQUEST_SCHEMA,
FILE_READ_SCHEMA,
FILE_WRITE_SCHEMA,
@@ -104,6 +108,10 @@ def ensure_builtin_tools_registered() -> None:
IMAGE_VISION_SCHEMA,
SPEECH_TO_TEXT_SCHEMA,
TEXT_TO_SPEECH_SCHEMA,
MAIN_AGENT_CREATE_TASK_SCHEMA,
MAIN_AGENT_ASSIGN_TASK_SCHEMA,
MAIN_AGENT_CHECK_PROGRESS_SCHEMA,
MAIN_AGENT_NOTIFY_USER_SCHEMA,
)
tool_registry.register_builtin_tool("http_request", http_request_tool, HTTP_REQUEST_SCHEMA)
@@ -149,6 +157,10 @@ def ensure_builtin_tools_registered() -> None:
tool_registry.register_builtin_tool("image_vision", image_vision_tool, IMAGE_VISION_SCHEMA)
tool_registry.register_builtin_tool("speech_to_text", speech_to_text_tool, SPEECH_TO_TEXT_SCHEMA)
tool_registry.register_builtin_tool("text_to_speech", text_to_speech_tool, TEXT_TO_SPEECH_SCHEMA)
tool_registry.register_builtin_tool("create_task", main_agent_create_task, MAIN_AGENT_CREATE_TASK_SCHEMA)
tool_registry.register_builtin_tool("assign_task", main_agent_assign_task, MAIN_AGENT_ASSIGN_TASK_SCHEMA)
tool_registry.register_builtin_tool("check_progress", main_agent_check_progress, MAIN_AGENT_CHECK_PROGRESS_SCHEMA)
tool_registry.register_builtin_tool("notify_user", main_agent_notify_user, MAIN_AGENT_NOTIFY_USER_SCHEMA)
_registered = True
n = tool_registry.builtin_tool_count()

View File

@@ -5161,6 +5161,239 @@ SPEECH_TO_TEXT_SCHEMA = {
},
}
# ═══════════════════════════════════════════════════════════════
# Main Agent 专有工具Goal/Task 管理
# ═══════════════════════════════════════════════════════════════
async def main_agent_create_task(
goal_id: str,
title: str,
description: str = "",
priority: int = 5,
depends_on: Optional[List[str]] = None,
assigned_agent_id: Optional[str] = None,
assigned_agent_name: Optional[str] = None,
) -> str:
"""Main Agent 工具:创建一个子任务。
将目标分解为可执行的子任务,写入数据库。
"""
from app.core.database import SessionLocal
from app.services.goal_service import create_task, get_goal
db = None
try:
db = SessionLocal()
get_goal(db, goal_id) # 验证 Goal 存在
task = create_task(
db=db,
goal_id=goal_id,
title=title,
description=description,
priority=priority,
depends_on=depends_on or [],
assigned_agent_id=assigned_agent_id,
assigned_agent_name=assigned_agent_name,
)
return json.dumps({
"task_id": task.id,
"title": task.title,
"status": task.status,
"message": f"任务 '{title}' 创建成功",
}, ensure_ascii=False)
except Exception as e:
logger.error(f"main_agent_create_task 失败: {e}", exc_info=True)
return json.dumps({"error": f"创建任务失败: {e}"}, ensure_ascii=False)
finally:
if db:
db.close()
async def main_agent_assign_task(
task_id: str,
assigned_agent_id: str,
assigned_agent_name: str = "",
) -> str:
"""Main Agent 工具:将任务分配给特定 Agent。"""
from app.core.database import SessionLocal
from app.services.goal_service import update_task, get_task
db = None
try:
db = SessionLocal()
get_task(db, task_id) # 验证 Task 存在
update_task(
db=db,
task_id=task_id,
assigned_agent_id=assigned_agent_id,
assigned_agent_name=assigned_agent_name,
)
return json.dumps({
"task_id": task_id,
"assigned_agent_id": assigned_agent_id,
"assigned_agent_name": assigned_agent_name,
"message": f"任务已分配给 {assigned_agent_name or assigned_agent_id}",
}, ensure_ascii=False)
except Exception as e:
logger.error(f"main_agent_assign_task 失败: {e}", exc_info=True)
return json.dumps({"error": f"分配任务失败: {e}"}, ensure_ascii=False)
finally:
if db:
db.close()
async def main_agent_check_progress(goal_id: str) -> str:
"""Main Agent 工具:查看目标下所有任务的执行进度。"""
from app.core.database import SessionLocal
from app.services.goal_service import get_goal, list_tasks, get_goal_task_tree
db = None
try:
db = SessionLocal()
goal = get_goal(db, goal_id)
tasks = list_tasks(db, goal_id=goal_id, limit=200)
by_status = {}
for t in tasks:
by_status.setdefault(t.status, 0)
by_status[t.status] += 1
task_summaries = []
for t in tasks:
task_summaries.append({
"id": t.id,
"title": t.title,
"status": t.status,
"assigned_agent_name": t.assigned_agent_name,
"has_error": bool(t.error_message),
})
return json.dumps({
"goal_id": goal_id,
"goal_title": goal.title,
"goal_status": goal.status,
"progress": goal.progress,
"total_tasks": len(tasks),
"by_status": by_status,
"tasks": task_summaries,
}, ensure_ascii=False)
except Exception as e:
logger.error(f"main_agent_check_progress 失败: {e}", exc_info=True)
return json.dumps({"error": f"检查进度失败: {e}"}, ensure_ascii=False)
finally:
if db:
db.close()
async def main_agent_notify_user(
user_id: str,
message: str,
notification_type: str = "info",
) -> str:
"""Main Agent 工具:向用户发送通知(站内消息)。"""
from app.core.database import SessionLocal
from app.models.notification import Notification
db = None
try:
db = SessionLocal()
notif = Notification(
user_id=user_id,
title="Main Agent 通知",
content=message,
type=notification_type,
ref_type="goal",
ref_id="",
is_read=False,
)
db.add(notif)
db.commit()
logger.info(f"Main Agent 通知已发送: user={user_id}, type={notification_type}, len={len(message)}")
return json.dumps({
"sent": True,
"notification_type": notification_type,
"message_preview": message[:200],
}, ensure_ascii=False)
except Exception as e:
logger.error(f"main_agent_notify_user 失败: {e}", exc_info=True)
return json.dumps({"error": f"发送通知失败: {e}"}, ensure_ascii=False)
finally:
if db:
db.close()
MAIN_AGENT_CREATE_TASK_SCHEMA = {
"type": "function",
"function": {
"name": "create_task",
"description": "创建一个新任务。将目标分解为可执行的子任务,写入数据库。每个任务应具体、可衡量、有明确完成标准。",
"parameters": {
"type": "object",
"properties": {
"goal_id": {"type": "string", "description": "所属目标 ID"},
"title": {"type": "string", "description": "任务标题"},
"description": {"type": "string", "description": "任务详细描述"},
"priority": {"type": "integer", "description": "优先级 1-10数字越小越优先"},
"depends_on": {"type": "array", "items": {"type": "string"}, "description": "前置依赖任务 ID 列表"},
"assigned_agent_id": {"type": "string", "description": "分配的 Agent ID可为空"},
"assigned_agent_name": {"type": "string", "description": "分配的 Agent 名称"},
},
"required": ["goal_id", "title"],
},
},
}
MAIN_AGENT_ASSIGN_TASK_SCHEMA = {
"type": "function",
"function": {
"name": "assign_task",
"description": "将任务分配给特定的 Specialist Agent。只有已发布状态的 Agent 才能被分配。",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "任务 ID"},
"assigned_agent_id": {"type": "string", "description": "分配的 Agent ID"},
"assigned_agent_name": {"type": "string", "description": "Agent 名称"},
},
"required": ["task_id", "assigned_agent_id"],
},
},
}
MAIN_AGENT_CHECK_PROGRESS_SCHEMA = {
"type": "function",
"function": {
"name": "check_progress",
"description": "查看目标下所有任务的执行进度,包括各状态统计和每个任务的详细信息。",
"parameters": {
"type": "object",
"properties": {
"goal_id": {"type": "string", "description": "目标 ID"},
},
"required": ["goal_id"],
},
},
}
MAIN_AGENT_NOTIFY_USER_SCHEMA = {
"type": "function",
"function": {
"name": "notify_user",
"description": "向用户发送站内通知消息。在目标完成、任务失败、需要审批等关键节点主动通知用户。",
"parameters": {
"type": "object",
"properties": {
"user_id": {"type": "string", "description": "接收通知的用户 ID"},
"message": {"type": "string", "description": "通知内容(支持 Markdown"},
"notification_type": {"type": "string", "description": "通知类型: info / success / warning / error默认 info"},
},
"required": ["user_id", "message"],
},
},
}
TEXT_TO_SPEECH_SCHEMA = {
"type": "function",
"function": {

View File

@@ -0,0 +1,511 @@
"""
Main Agent 核心服务 — 数字员工的大脑
Main Agent 是管理 Goal 的特殊 Agent它:
1. 理解目标 → LLM 分解为 Task 树
2. 按优先级/依赖调度 → 选择执行策略单Agent/编排/工作流)
3. 监控进度 → 重试失败 / 重新分配 / 升级告警
4. 主动通知用户 → 飞书/站内/邮件
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy.orm import Session
from openai import AsyncOpenAI
from app.core.config import settings
from app.core.database import SessionLocal
from app.models.goal import Goal
from app.models.task import Task
from app.models.agent import Agent
from app.models.execution import Execution
from app.services import goal_service
from app.services.tool_registry import tool_registry
from app.agent_runtime.core import AgentRuntime
from app.agent_runtime.schemas import AgentConfig, AgentLLMConfig, AgentToolConfig, AgentResult
logger = logging.getLogger(__name__)
# ──────────────────── Main Agent 系统提示词 ────────────────────
MAIN_AGENT_SYSTEM_PROMPT = """你是天工智能体平台的主控智能体Main Agent是一个数字员工团队的"项目经理"
你的职责是:
1. **理解目标**深入理解用户的目标Goal分析可行性、识别风险
2. **分解任务**将目标拆解为可执行的子任务Task明确依赖关系和优先级
3. **组建团队**:根据任务特点选择合适的 Agent你拥有调用其他 Agent 的能力)
4. **监控执行**:追踪每个任务的进度,失败时自动重试或重新分配
5. **主动汇报**:进度更新、遇到阻塞、任务完成时主动通知用户
你有以下专用工具可使用:
- `create_task` — 创建一个子任务(写入数据库)
- `assign_task` — 将任务分配给特定 Agent
- `check_progress` — 查看目标下所有任务的执行进度
- `invoke_agent` — 调用一个已有的 Specialist Agent 执行具体工作
- `invoke_workflow` — 调用已有的工作流
- `notify_user` — 发送通知给用户
- `web_search` — 搜索互联网获取信息
工作原则:
- 分解任务时要具体、可执行、有明确的完成标准
- 优先并行化无依赖的任务
- 失败时先尝试一次重试,再考虑重新分配或降级处理
- 保持透明:让用户知道你在做什么、为什么这样做
- 遇到必须人工决策的事情(审批),使用 `wait_for_approval` 工具
"""
class MainAgentService:
"""Main Agent 服务 — 管理 Goal 的全生命周期"""
def __init__(self, db: Session):
self.db = db
# ──────────── LLM 客户端 ────────────
def _get_llm_client(self) -> AsyncOpenAI:
return AsyncOpenAI(
api_key=settings.DEEPSEEK_API_KEY or settings.OPENAI_API_KEY,
base_url=settings.DEEPSEEK_BASE_URL or settings.OPENAI_BASE_URL,
timeout=120.0,
)
def _get_llm_model(self) -> str:
if settings.DEEPSEEK_API_KEY:
return "deepseek-chat"
return "gpt-4o-mini"
async def _call_llm(self, messages: List[Dict[str, str]], tools: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""调用 LLM支持工具调用"""
client = self._get_llm_client()
kwargs: Dict[str, Any] = {
"model": self._get_llm_model(),
"messages": messages,
"temperature": 0.5,
"max_tokens": 4096,
}
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"
response = await client.chat.completions.create(**kwargs)
msg = response.choices[0].message
result = {
"content": msg.content or "",
"role": msg.role,
}
if msg.tool_calls:
result["tool_calls"] = [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in msg.tool_calls
]
return result
# ──────────── 目标分解 ────────────
async def decompose_goal(self, goal_id: str) -> Goal:
"""
使用 LLM 将目标分解为 Task 树。
步骤:
1. 读取 Goal 的 title/description
2. 构造提示词 → LLM 输出结构化任务列表
3. 为每个任务调用 create_task 写入数据库
4. 更新 Goal.plan 字段
"""
goal = goal_service.get_goal(self.db, goal_id)
# 获取平台上可用的 Agent 列表
agents = self.db.query(Agent).filter(Agent.status == "published").all()
agent_list = "\n".join(
f" - {a.name} (id={a.id}): {a.description or '无描述'}"
for a in agents[:20]
)
messages = [
{"role": "system", "content": f"""你是天工平台的任务分解专家。用户给出一个目标,你需要将其分解为可执行的子任务。
当前平台上可用的 Specialist Agent
{agent_list if agent_list else '(暂无已发布的 Agent使用通用工具处理'}
请分析目标并以 JSON 格式输出任务分解方案。
输出格式:
{{
"analysis": "对目标的分析1-2句话",
"tasks": [
{{
"title": "任务标题",
"description": "任务详细描述",
"priority": 1-10 (数字越小越优先),
"depends_on": ["依赖的 task_index0-based无依赖则为空数组"],
"assigned_agent_id": "匹配的 Agent ID可为 null",
"assigned_agent_name": "Agent 名称",
"task_config": {{"orchestration_mode": "agent"}},
"estimated_effort": "预估工作量说明"
}}
],
"execution_order": "推荐的执行顺序说明"
}}
要求:
- 每个任务都要具体、可衡量、有明确完成标准
- 优先并行化无依赖的任务
- 尽量将任务分配给匹配的 Specialist Agent
- 任务数量控制在 3-8 个(除非目标特别复杂)
- 仅输出 JSON不要其他文字"""},
{"role": "user", "content": f"目标:{goal.title}\n\n描述:{goal.description or ''}\n\n请分解此目标。"}
]
response = await self._call_llm(messages)
content = response.get("content", "")
# 解析 LLM 输出的 JSON
try:
# 提取 JSON 块(排除 markdown 代码块标记)
json_str = content
if "```json" in json_str:
json_str = json_str.split("```json")[1].split("```")[0]
elif "```" in json_str:
json_str = json_str.split("```")[1].split("```")[0]
plan = json.loads(json_str)
except (json.JSONDecodeError, IndexError, KeyError) as e:
logger.error(f"Failed to parse LLM decomposition output: {e}\nRaw: {content[:500]}")
raise RuntimeError(f"目标分解失败LLM 输出格式异常")
tasks_data = plan.get("tasks", [])
if not tasks_data:
raise RuntimeError("目标分解失败LLM 未输出有效任务列表")
# 创建任务(先创建所有 task 记录再建立依赖)
created_tasks: List[Task] = []
id_to_index: Dict[int, str] = {}
for idx, td in enumerate(tasks_data):
task = goal_service.create_task(
db=self.db,
goal_id=goal_id,
title=td.get("title", f"任务 {idx+1}"),
description=td.get("description", ""),
priority=td.get("priority", 5),
assigned_agent_id=td.get("assigned_agent_id"),
assigned_agent_name=td.get("assigned_agent_name"),
task_config=td.get("task_config", {"orchestration_mode": "agent"}),
)
created_tasks.append(task)
id_to_index[idx] = task.id
# 建立依赖关系
for idx, td in enumerate(tasks_data):
deps = td.get("depends_on", [])
if deps:
dep_ids = [id_to_index[di] for di in deps if di in id_to_index]
if dep_ids:
task = created_tasks[idx]
task.depends_on = dep_ids
self.db.add(task)
self.db.commit()
# 更新 Goal.plan
goal_service.update_goal(
db=self.db,
goal_id=goal_id,
plan={
"analysis": plan.get("analysis", ""),
"execution_order": plan.get("execution_order", ""),
"task_count": len(created_tasks),
"decomposed_at": datetime.now().isoformat(),
},
)
logger.info(f"Goal {goal_id} decomposed into {len(created_tasks)} tasks")
return goal_service.get_goal(self.db, goal_id)
# ──────────── 任务执行 ────────────
async def execute_task(self, task_id: str) -> Dict[str, Any]:
"""
执行单个任务。
根据 task_config.orchestration_mode 选择执行策略:
- "agent" (默认) → 调用单个 Agent
- "workflow" → 调用工作流引擎
"""
task = goal_service.get_task(self.db, task_id)
# 检查依赖
if not goal_service.get_task_dependencies_met(self.db, task_id):
return {"status": "blocked", "reason": "前置依赖未完成"}
# 更新状态
goal_service.update_task(self.db, task_id, status="in_progress")
task_config = task.task_config or {}
mode = task_config.get("orchestration_mode", "agent")
try:
if mode == "workflow":
result = await self._execute_as_workflow(task)
else:
result = await self._execute_as_agent(task)
# 更新结果
goal_service.update_task(
self.db, task_id,
status="completed",
result=result,
)
return {"status": "completed", "result": result}
except Exception as e:
err = str(e)
logger.error(f"Task {task_id} execution failed: {err}")
goal_service.update_task(
self.db, task_id,
status="failed",
error_message=err,
)
return {"status": "failed", "error": err}
async def _execute_as_agent(self, task: Task) -> Dict[str, Any]:
"""以 Agent 模式执行任务"""
agent_id = task.assigned_agent_id
agent = None
if agent_id:
agent = self.db.query(Agent).filter(Agent.id == agent_id).first()
config = task.task_config or {}
# 构建 Agent 配置
agent_config = AgentConfig(
name=f"task_{task.id[:8]}",
system_prompt=config.get("system_prompt",
f"你需要完成以下任务:\n{task.title}\n\n{task.description or ''}\n\n请使用可用工具完成任务。"),
llm=AgentLLMConfig(
provider=config.get("provider", "deepseek"),
model=config.get("model", "deepseek-chat"),
temperature=config.get("temperature", 0.7),
max_iterations=config.get("max_iterations", 15),
),
tools=AgentToolConfig(include_tools=task_config_tool_whitelist(config)),
)
runtime = AgentRuntime(agent_config)
task_input = config.get("input_data", {})
user_input = task_input.get("user_input",
f"请完成以下任务:{task.title}\n\n{task.description or ''}")
result: AgentResult = await runtime.run(user_input)
return {
"content": result.content,
"success": result.success,
"iterations": result.iterations_used,
"tool_calls": result.tool_calls_made,
"steps": [s.model_dump() for s in result.steps[-5:]] if result.steps else [],
}
async def _execute_as_workflow(self, task: Task) -> Dict[str, Any]:
"""以工作流模式执行任务"""
config = task.task_config or {}
workflow_id = config.get("workflow_id")
if not workflow_id:
raise RuntimeError("工作流模式缺少 workflow_id")
from app.models.workflow import Workflow
wf = self.db.query(Workflow).filter(Workflow.id == workflow_id).first()
if not wf:
raise RuntimeError(f"工作流 {workflow_id} 不存在")
workflow_data = {"nodes": wf.nodes, "edges": wf.edges}
input_data = config.get("input_data", {"task_title": task.title, "task_description": task.description})
from app.services.workflow_engine import WorkflowEngine
engine = WorkflowEngine(str(workflow_id), workflow_data, db=self.db)
result = await asyncio.to_thread(
lambda: asyncio.run(engine.execute(input_data))
)
return {"output": result}
# ──────────── 进度监控 ────────────
async def monitor_progress(self, goal_id: str) -> Dict[str, Any]:
"""检查目标所有任务的进度,更新 Goal.progress"""
tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=200)
goal = goal_service.update_goal_progress(self.db, goal_id)
by_status = {}
for t in tasks:
by_status.setdefault(t.status, 0)
by_status[t.status] += 1
# 检查是否有失败任务需要处理
failed_tasks = [t for t in tasks if t.status == "failed"]
stuck_tasks = [t for t in tasks if t.status == "in_progress"
and t.started_at and (datetime.now() - t.started_at).total_seconds() > 3600]
return {
"goal_id": goal_id,
"progress": goal.progress,
"total_tasks": len(tasks),
"by_status": by_status,
"failed_count": len(failed_tasks),
"stuck_count": len(stuck_tasks),
}
async def handle_task_failure(self, task_id: str) -> Dict[str, Any]:
"""
处理任务失败:尝试重试 → 重新分配 → 升级通知
"""
task = goal_service.get_task(self.db, task_id)
if task.status != "failed":
return {"status": "skipped", "reason": f"任务状态为 {task.status},无需处理"}
# 重试一次
logger.info(f"Retrying failed task {task_id}")
result = await self.execute_task(task_id)
if result.get("status") == "completed":
return {"status": "retry_success", "task_id": task_id}
# 重新分配:尝试找其他匹配的 Agent
goal = goal_service.get_goal(self.db, task.goal_id)
other_agents = self.db.query(Agent).filter(
Agent.id != task.assigned_agent_id,
Agent.status == "published",
).limit(5).all()
if other_agents:
new_agent = other_agents[0]
goal_service.update_task(
self.db, task_id,
status="pending",
error_message=None,
assigned_agent_id=new_agent.id,
assigned_agent_name=new_agent.name,
)
logger.info(f"Task {task_id} reassigned to {new_agent.name}")
return {"status": "reassigned", "task_id": task_id, "new_agent": new_agent.name}
# 无法自动恢复:记录错误消息
goal_service.update_task(
self.db, task_id,
error_message=f"{task.error_message or ''} | 自动重试和重新分配均失败,需人工介入",
)
logger.warning(f"Task {task_id} needs manual intervention")
return {"status": "needs_manual_intervention", "task_id": task_id}
# ──────────── 自主循环 ────────────
async def autonomy_tick(self, goal_id: str) -> Dict[str, Any]:
"""
自主循环单次心跳:
1. 检查进度
2. 找出可执行的任务(依赖已满足 + pending 状态)
3. 执行一个任务
4. 处理失败任务
5. 检查目标是否完成
6. 通知用户
"""
goal = goal_service.get_goal(self.db, goal_id)
if goal.status != "active":
return {"status": "skipped", "reason": f"Goal status is {goal.status}"}
progress = await self.monitor_progress(goal_id)
# 先处理失败任务
for _ in range(progress["failed_count"]):
failed = self.db.query(Task).filter(
Task.goal_id == goal_id, Task.status == "failed"
).first()
if failed:
await self.handle_task_failure(failed.id)
# 找可执行的任务
ready_tasks = self._find_ready_tasks(goal_id)
if ready_tasks:
task = ready_tasks[0] # 按优先级选择第一个
result = await self.execute_task(task.id)
return {"status": "executed", "task_id": task.id, "result": result}
# 检查是否全部完成
progress = await self.monitor_progress(goal_id)
if progress.get("by_status", {}).get("completed", 0) == progress["total_tasks"]:
goal_service.update_goal(self.db, goal_id, status="completed")
return {"status": "goal_completed", "goal_id": goal_id}
return {"status": "idle", "goal_id": goal_id,
"progress": progress["progress"],
"pending_review": progress.get("by_status", {}).get("failed", 0) > 0}
def _find_ready_tasks(self, goal_id: str) -> List[Task]:
"""找出所有依赖已满足的 pending 任务,按优先级排序"""
all_tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=200)
ready = []
for t in all_tasks:
if t.status != "pending":
continue
if goal_service.get_task_dependencies_met(self.db, t.id):
ready.append(t)
ready.sort(key=lambda t: t.priority)
return ready
# ──────────── 入口 ────────────
async def start_goal_execution(self, goal_id: str) -> Dict[str, Any]:
"""
启动 Main Agent 管理目标(完整入口):
1. 分解目标
2. 持续执行直到完成或阻塞
"""
goal = goal_service.get_goal(self.db, goal_id)
if goal.status not in ("active", "pending"):
goal_service.update_goal(self.db, goal_id, status="active")
# 如果还没有任务,先分解
existing_tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=1)
if not existing_tasks:
logger.info(f"Decomposing goal {goal_id}")
await self.decompose_goal(goal_id)
goal.started_at = datetime.now()
self.db.commit()
# 执行自主循环(最多 10 轮,避免无限循环)
results = []
for i in range(10):
tick = await self.autonomy_tick(goal_id)
results.append(tick)
if tick.get("status") in ("goal_completed", "idle"):
break
# 等 2 秒再下一次 tick
await asyncio.sleep(2)
return {
"goal_id": goal_id,
"ticks": len(results),
"status": goal_service.get_goal(self.db, goal_id).status,
"results": results,
}
def task_config_tool_whitelist(config: Dict) -> Optional[List[str]]:
"""从 task_config 中提取工具白名单,空则返回 None全部工具可用"""
tools = config.get("tools") or config.get("include_tools")
if not tools:
return None
if isinstance(tools, list):
return tools
return None

View File

@@ -0,0 +1,132 @@
"""
Goal/Task 异步任务 — Celery 任务定义
Main Agent 的目标分解、任务执行、自主循环等重量级操作通过 Celery 异步执行。
"""
from app.core.tools_bootstrap import ensure_builtin_tools_registered
ensure_builtin_tools_registered()
from app.core.celery_app import celery_app
from app.core.database import SessionLocal
from app.models.goal import Goal
from app.models.execution import Execution
from app.services.main_agent_service import MainAgentService
import asyncio
import logging
import time
logger = logging.getLogger(__name__)
@celery_app.task(bind=True)
def decompose_goal_task(self, goal_id: str):
"""
异步分解目标:使用 LLM 将 Goal 分解为 Task 树。
在 Celery Worker 中执行,避免 API 请求超时。
"""
db = SessionLocal()
try:
service = MainAgentService(db)
goal = asyncio.run(service.decompose_goal(goal_id))
return {
"status": "completed",
"goal_id": goal_id,
"goal_title": goal.title,
}
except Exception as e:
logger.error(f"Goal decomposition failed: {e}", exc_info=True)
db.rollback()
return {"status": "failed", "goal_id": goal_id, "error": str(e)}
finally:
db.close()
@celery_app.task(bind=True)
def execute_goal_task(self, goal_id: str):
"""
异步执行 Goal启动 Main Agent 管理目标全生命周期。
1. 分解目标(如果尚未分解)
2. 持续执行 task 直到完成或阻塞
3. 更新 Goal 状态
"""
db = SessionLocal()
start_time = time.time()
try:
goal = db.query(Goal).filter(Goal.id == goal_id).first()
if not goal:
return {"status": "failed", "goal_id": goal_id, "error": "目标不存在"}
# 更新状态
goal.status = "active"
db.commit()
service = MainAgentService(db)
result = asyncio.run(service.start_goal_execution(goal_id))
elapsed = int((time.time() - start_time) * 1000)
return {
"status": "completed",
"goal_id": goal_id,
"elapsed_ms": elapsed,
**result,
}
except Exception as e:
logger.error(f"Goal execution failed: {e}", exc_info=True)
goal = db.query(Goal).filter(Goal.id == goal_id).first()
if goal:
goal.status = "failed"
db.commit()
return {"status": "failed", "goal_id": goal_id, "error": str(e)}
finally:
db.close()
@celery_app.task(bind=True)
def execute_task_celery(self, task_id: str):
"""
异步执行单个 Task。
Main Agent 创建 Execution 记录后将任务交给 Celery Worker 执行。
"""
db = SessionLocal()
start_time = time.time()
try:
service = MainAgentService(db)
result = asyncio.run(service.execute_task(task_id))
elapsed = int((time.time() - start_time) * 1000)
return {
"status": result.get("status", "completed"),
"task_id": task_id,
"elapsed_ms": elapsed,
**result,
}
except Exception as e:
logger.error(f"Task execution failed: {e}", exc_info=True)
return {"status": "failed", "task_id": task_id, "error": str(e)}
finally:
db.close()
@celery_app.task(bind=True)
def autonomy_tick_task(self, goal_id: str):
"""
自主循环单次心跳:检查进度 → 执行可运行任务 → 处理失败 → 通知。
由 Celery Beat 定时调度(根据 Goal.autonomy_config.check_interval_minutes
"""
db = SessionLocal()
try:
service = MainAgentService(db)
result = asyncio.run(service.autonomy_tick(goal_id))
logger.info(f"Autonomy tick for goal {goal_id}: {result.get('status', 'unknown')}")
return {"status": "completed", "goal_id": goal_id, **result}
except Exception as e:
logger.error(f"Autonomy tick failed for goal {goal_id}: {e}", exc_info=True)
return {"status": "failed", "goal_id": goal_id, "error": str(e)}
finally:
db.close()