Files
aiagent/backend/app/services/main_agent_service.py
renjianbo 926ec6c0a1 feat: add Main Agent core service, tools, and Celery tasks (Phase 2)
数字员工大脑 — Main Agent 核心实现:
- MainAgentService: 目标分解(LLM)、任务调度、进度监控、失败重试、自主循环
- 4个 Main Agent 专有工具: create_task / assign_task / check_progress / notify_user
- Celery 异步任务: decompose_goal / execute_goal / execute_task / autonomy_tick
- Goal API 增强: decompose / execute-async / replan 端点

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-08 19:58:53 +08:00

512 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Main Agent 核心服务 — 数字员工的大脑
Main Agent 是管理 Goal 的特殊 Agent它:
1. 理解目标 → LLM 分解为 Task 树
2. 按优先级/依赖调度 → 选择执行策略单Agent/编排/工作流)
3. 监控进度 → 重试失败 / 重新分配 / 升级告警
4. 主动通知用户 → 飞书/站内/邮件
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy.orm import Session
from openai import AsyncOpenAI
from app.core.config import settings
from app.core.database import SessionLocal
from app.models.goal import Goal
from app.models.task import Task
from app.models.agent import Agent
from app.models.execution import Execution
from app.services import goal_service
from app.services.tool_registry import tool_registry
from app.agent_runtime.core import AgentRuntime
from app.agent_runtime.schemas import AgentConfig, AgentLLMConfig, AgentToolConfig, AgentResult
logger = logging.getLogger(__name__)
# ──────────────────── Main Agent 系统提示词 ────────────────────
MAIN_AGENT_SYSTEM_PROMPT = """你是天工智能体平台的主控智能体Main Agent是一个数字员工团队的"项目经理"
你的职责是:
1. **理解目标**深入理解用户的目标Goal分析可行性、识别风险
2. **分解任务**将目标拆解为可执行的子任务Task明确依赖关系和优先级
3. **组建团队**:根据任务特点选择合适的 Agent你拥有调用其他 Agent 的能力)
4. **监控执行**:追踪每个任务的进度,失败时自动重试或重新分配
5. **主动汇报**:进度更新、遇到阻塞、任务完成时主动通知用户
你有以下专用工具可使用:
- `create_task` — 创建一个子任务(写入数据库)
- `assign_task` — 将任务分配给特定 Agent
- `check_progress` — 查看目标下所有任务的执行进度
- `invoke_agent` — 调用一个已有的 Specialist Agent 执行具体工作
- `invoke_workflow` — 调用已有的工作流
- `notify_user` — 发送通知给用户
- `web_search` — 搜索互联网获取信息
工作原则:
- 分解任务时要具体、可执行、有明确的完成标准
- 优先并行化无依赖的任务
- 失败时先尝试一次重试,再考虑重新分配或降级处理
- 保持透明:让用户知道你在做什么、为什么这样做
- 遇到必须人工决策的事情(审批),使用 `wait_for_approval` 工具
"""
class MainAgentService:
"""Main Agent 服务 — 管理 Goal 的全生命周期"""
def __init__(self, db: Session):
self.db = db
# ──────────── LLM 客户端 ────────────
def _get_llm_client(self) -> AsyncOpenAI:
return AsyncOpenAI(
api_key=settings.DEEPSEEK_API_KEY or settings.OPENAI_API_KEY,
base_url=settings.DEEPSEEK_BASE_URL or settings.OPENAI_BASE_URL,
timeout=120.0,
)
def _get_llm_model(self) -> str:
if settings.DEEPSEEK_API_KEY:
return "deepseek-chat"
return "gpt-4o-mini"
async def _call_llm(self, messages: List[Dict[str, str]], tools: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""调用 LLM支持工具调用"""
client = self._get_llm_client()
kwargs: Dict[str, Any] = {
"model": self._get_llm_model(),
"messages": messages,
"temperature": 0.5,
"max_tokens": 4096,
}
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"
response = await client.chat.completions.create(**kwargs)
msg = response.choices[0].message
result = {
"content": msg.content or "",
"role": msg.role,
}
if msg.tool_calls:
result["tool_calls"] = [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in msg.tool_calls
]
return result
# ──────────── 目标分解 ────────────
async def decompose_goal(self, goal_id: str) -> Goal:
"""
使用 LLM 将目标分解为 Task 树。
步骤:
1. 读取 Goal 的 title/description
2. 构造提示词 → LLM 输出结构化任务列表
3. 为每个任务调用 create_task 写入数据库
4. 更新 Goal.plan 字段
"""
goal = goal_service.get_goal(self.db, goal_id)
# 获取平台上可用的 Agent 列表
agents = self.db.query(Agent).filter(Agent.status == "published").all()
agent_list = "\n".join(
f" - {a.name} (id={a.id}): {a.description or '无描述'}"
for a in agents[:20]
)
messages = [
{"role": "system", "content": f"""你是天工平台的任务分解专家。用户给出一个目标,你需要将其分解为可执行的子任务。
当前平台上可用的 Specialist Agent
{agent_list if agent_list else '(暂无已发布的 Agent使用通用工具处理'}
请分析目标并以 JSON 格式输出任务分解方案。
输出格式:
{{
"analysis": "对目标的分析1-2句话",
"tasks": [
{{
"title": "任务标题",
"description": "任务详细描述",
"priority": 1-10 (数字越小越优先),
"depends_on": ["依赖的 task_index0-based无依赖则为空数组"],
"assigned_agent_id": "匹配的 Agent ID可为 null",
"assigned_agent_name": "Agent 名称",
"task_config": {{"orchestration_mode": "agent"}},
"estimated_effort": "预估工作量说明"
}}
],
"execution_order": "推荐的执行顺序说明"
}}
要求:
- 每个任务都要具体、可衡量、有明确完成标准
- 优先并行化无依赖的任务
- 尽量将任务分配给匹配的 Specialist Agent
- 任务数量控制在 3-8 个(除非目标特别复杂)
- 仅输出 JSON不要其他文字"""},
{"role": "user", "content": f"目标:{goal.title}\n\n描述:{goal.description or ''}\n\n请分解此目标。"}
]
response = await self._call_llm(messages)
content = response.get("content", "")
# 解析 LLM 输出的 JSON
try:
# 提取 JSON 块(排除 markdown 代码块标记)
json_str = content
if "```json" in json_str:
json_str = json_str.split("```json")[1].split("```")[0]
elif "```" in json_str:
json_str = json_str.split("```")[1].split("```")[0]
plan = json.loads(json_str)
except (json.JSONDecodeError, IndexError, KeyError) as e:
logger.error(f"Failed to parse LLM decomposition output: {e}\nRaw: {content[:500]}")
raise RuntimeError(f"目标分解失败LLM 输出格式异常")
tasks_data = plan.get("tasks", [])
if not tasks_data:
raise RuntimeError("目标分解失败LLM 未输出有效任务列表")
# 创建任务(先创建所有 task 记录再建立依赖)
created_tasks: List[Task] = []
id_to_index: Dict[int, str] = {}
for idx, td in enumerate(tasks_data):
task = goal_service.create_task(
db=self.db,
goal_id=goal_id,
title=td.get("title", f"任务 {idx+1}"),
description=td.get("description", ""),
priority=td.get("priority", 5),
assigned_agent_id=td.get("assigned_agent_id"),
assigned_agent_name=td.get("assigned_agent_name"),
task_config=td.get("task_config", {"orchestration_mode": "agent"}),
)
created_tasks.append(task)
id_to_index[idx] = task.id
# 建立依赖关系
for idx, td in enumerate(tasks_data):
deps = td.get("depends_on", [])
if deps:
dep_ids = [id_to_index[di] for di in deps if di in id_to_index]
if dep_ids:
task = created_tasks[idx]
task.depends_on = dep_ids
self.db.add(task)
self.db.commit()
# 更新 Goal.plan
goal_service.update_goal(
db=self.db,
goal_id=goal_id,
plan={
"analysis": plan.get("analysis", ""),
"execution_order": plan.get("execution_order", ""),
"task_count": len(created_tasks),
"decomposed_at": datetime.now().isoformat(),
},
)
logger.info(f"Goal {goal_id} decomposed into {len(created_tasks)} tasks")
return goal_service.get_goal(self.db, goal_id)
# ──────────── 任务执行 ────────────
async def execute_task(self, task_id: str) -> Dict[str, Any]:
"""
执行单个任务。
根据 task_config.orchestration_mode 选择执行策略:
- "agent" (默认) → 调用单个 Agent
- "workflow" → 调用工作流引擎
"""
task = goal_service.get_task(self.db, task_id)
# 检查依赖
if not goal_service.get_task_dependencies_met(self.db, task_id):
return {"status": "blocked", "reason": "前置依赖未完成"}
# 更新状态
goal_service.update_task(self.db, task_id, status="in_progress")
task_config = task.task_config or {}
mode = task_config.get("orchestration_mode", "agent")
try:
if mode == "workflow":
result = await self._execute_as_workflow(task)
else:
result = await self._execute_as_agent(task)
# 更新结果
goal_service.update_task(
self.db, task_id,
status="completed",
result=result,
)
return {"status": "completed", "result": result}
except Exception as e:
err = str(e)
logger.error(f"Task {task_id} execution failed: {err}")
goal_service.update_task(
self.db, task_id,
status="failed",
error_message=err,
)
return {"status": "failed", "error": err}
async def _execute_as_agent(self, task: Task) -> Dict[str, Any]:
"""以 Agent 模式执行任务"""
agent_id = task.assigned_agent_id
agent = None
if agent_id:
agent = self.db.query(Agent).filter(Agent.id == agent_id).first()
config = task.task_config or {}
# 构建 Agent 配置
agent_config = AgentConfig(
name=f"task_{task.id[:8]}",
system_prompt=config.get("system_prompt",
f"你需要完成以下任务:\n{task.title}\n\n{task.description or ''}\n\n请使用可用工具完成任务。"),
llm=AgentLLMConfig(
provider=config.get("provider", "deepseek"),
model=config.get("model", "deepseek-chat"),
temperature=config.get("temperature", 0.7),
max_iterations=config.get("max_iterations", 15),
),
tools=AgentToolConfig(include_tools=task_config_tool_whitelist(config)),
)
runtime = AgentRuntime(agent_config)
task_input = config.get("input_data", {})
user_input = task_input.get("user_input",
f"请完成以下任务:{task.title}\n\n{task.description or ''}")
result: AgentResult = await runtime.run(user_input)
return {
"content": result.content,
"success": result.success,
"iterations": result.iterations_used,
"tool_calls": result.tool_calls_made,
"steps": [s.model_dump() for s in result.steps[-5:]] if result.steps else [],
}
async def _execute_as_workflow(self, task: Task) -> Dict[str, Any]:
"""以工作流模式执行任务"""
config = task.task_config or {}
workflow_id = config.get("workflow_id")
if not workflow_id:
raise RuntimeError("工作流模式缺少 workflow_id")
from app.models.workflow import Workflow
wf = self.db.query(Workflow).filter(Workflow.id == workflow_id).first()
if not wf:
raise RuntimeError(f"工作流 {workflow_id} 不存在")
workflow_data = {"nodes": wf.nodes, "edges": wf.edges}
input_data = config.get("input_data", {"task_title": task.title, "task_description": task.description})
from app.services.workflow_engine import WorkflowEngine
engine = WorkflowEngine(str(workflow_id), workflow_data, db=self.db)
result = await asyncio.to_thread(
lambda: asyncio.run(engine.execute(input_data))
)
return {"output": result}
# ──────────── 进度监控 ────────────
async def monitor_progress(self, goal_id: str) -> Dict[str, Any]:
"""检查目标所有任务的进度,更新 Goal.progress"""
tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=200)
goal = goal_service.update_goal_progress(self.db, goal_id)
by_status = {}
for t in tasks:
by_status.setdefault(t.status, 0)
by_status[t.status] += 1
# 检查是否有失败任务需要处理
failed_tasks = [t for t in tasks if t.status == "failed"]
stuck_tasks = [t for t in tasks if t.status == "in_progress"
and t.started_at and (datetime.now() - t.started_at).total_seconds() > 3600]
return {
"goal_id": goal_id,
"progress": goal.progress,
"total_tasks": len(tasks),
"by_status": by_status,
"failed_count": len(failed_tasks),
"stuck_count": len(stuck_tasks),
}
async def handle_task_failure(self, task_id: str) -> Dict[str, Any]:
"""
处理任务失败:尝试重试 → 重新分配 → 升级通知
"""
task = goal_service.get_task(self.db, task_id)
if task.status != "failed":
return {"status": "skipped", "reason": f"任务状态为 {task.status},无需处理"}
# 重试一次
logger.info(f"Retrying failed task {task_id}")
result = await self.execute_task(task_id)
if result.get("status") == "completed":
return {"status": "retry_success", "task_id": task_id}
# 重新分配:尝试找其他匹配的 Agent
goal = goal_service.get_goal(self.db, task.goal_id)
other_agents = self.db.query(Agent).filter(
Agent.id != task.assigned_agent_id,
Agent.status == "published",
).limit(5).all()
if other_agents:
new_agent = other_agents[0]
goal_service.update_task(
self.db, task_id,
status="pending",
error_message=None,
assigned_agent_id=new_agent.id,
assigned_agent_name=new_agent.name,
)
logger.info(f"Task {task_id} reassigned to {new_agent.name}")
return {"status": "reassigned", "task_id": task_id, "new_agent": new_agent.name}
# 无法自动恢复:记录错误消息
goal_service.update_task(
self.db, task_id,
error_message=f"{task.error_message or ''} | 自动重试和重新分配均失败,需人工介入",
)
logger.warning(f"Task {task_id} needs manual intervention")
return {"status": "needs_manual_intervention", "task_id": task_id}
# ──────────── 自主循环 ────────────
async def autonomy_tick(self, goal_id: str) -> Dict[str, Any]:
"""
自主循环单次心跳:
1. 检查进度
2. 找出可执行的任务(依赖已满足 + pending 状态)
3. 执行一个任务
4. 处理失败任务
5. 检查目标是否完成
6. 通知用户
"""
goal = goal_service.get_goal(self.db, goal_id)
if goal.status != "active":
return {"status": "skipped", "reason": f"Goal status is {goal.status}"}
progress = await self.monitor_progress(goal_id)
# 先处理失败任务
for _ in range(progress["failed_count"]):
failed = self.db.query(Task).filter(
Task.goal_id == goal_id, Task.status == "failed"
).first()
if failed:
await self.handle_task_failure(failed.id)
# 找可执行的任务
ready_tasks = self._find_ready_tasks(goal_id)
if ready_tasks:
task = ready_tasks[0] # 按优先级选择第一个
result = await self.execute_task(task.id)
return {"status": "executed", "task_id": task.id, "result": result}
# 检查是否全部完成
progress = await self.monitor_progress(goal_id)
if progress.get("by_status", {}).get("completed", 0) == progress["total_tasks"]:
goal_service.update_goal(self.db, goal_id, status="completed")
return {"status": "goal_completed", "goal_id": goal_id}
return {"status": "idle", "goal_id": goal_id,
"progress": progress["progress"],
"pending_review": progress.get("by_status", {}).get("failed", 0) > 0}
def _find_ready_tasks(self, goal_id: str) -> List[Task]:
"""找出所有依赖已满足的 pending 任务,按优先级排序"""
all_tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=200)
ready = []
for t in all_tasks:
if t.status != "pending":
continue
if goal_service.get_task_dependencies_met(self.db, t.id):
ready.append(t)
ready.sort(key=lambda t: t.priority)
return ready
# ──────────── 入口 ────────────
async def start_goal_execution(self, goal_id: str) -> Dict[str, Any]:
"""
启动 Main Agent 管理目标(完整入口):
1. 分解目标
2. 持续执行直到完成或阻塞
"""
goal = goal_service.get_goal(self.db, goal_id)
if goal.status not in ("active", "pending"):
goal_service.update_goal(self.db, goal_id, status="active")
# 如果还没有任务,先分解
existing_tasks = goal_service.list_tasks(self.db, goal_id=goal_id, limit=1)
if not existing_tasks:
logger.info(f"Decomposing goal {goal_id}")
await self.decompose_goal(goal_id)
goal.started_at = datetime.now()
self.db.commit()
# 执行自主循环(最多 10 轮,避免无限循环)
results = []
for i in range(10):
tick = await self.autonomy_tick(goal_id)
results.append(tick)
if tick.get("status") in ("goal_completed", "idle"):
break
# 等 2 秒再下一次 tick
await asyncio.sleep(2)
return {
"goal_id": goal_id,
"ticks": len(results),
"status": goal_service.get_goal(self.db, goal_id).status,
"results": results,
}
def task_config_tool_whitelist(config: Dict) -> Optional[List[str]]:
"""从 task_config 中提取工具白名单,空则返回 None全部工具可用"""
tools = config.get("tools") or config.get("include_tools")
if not tools:
return None
if isinstance(tools, list):
return tools
return None