From 02d7cf8f62c9dfaf32fce571b0e5dd4a7f5a65a5 Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Fri, 8 May 2026 19:50:16 +0800 Subject: [PATCH] feat: add Goal/Task data models, service layer, and API routes (Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Main Agent 数字员工工厂基础设施: - 新增 Goal 和 Task SQLAlchemy 数据模型 - Agent 模型新增 agent_type / input_schema / output_schema - Execution 模型新增 goal_id 关联 - 新增 Goal/Task CRUD 服务层(含依赖检查、任务树、进度计算) - 新增 /api/v1/goals (9端点) + /api/v1/tasks (8端点) - 数据库迁移 013_add_goals_tasks Co-Authored-By: Claude Opus 4.6 --- .../alembic/versions/013_add_goals_tasks.py | 81 ++++ backend/app/api/goals.py | 204 ++++++++++ backend/app/api/tasks.py | 240 ++++++++++++ backend/app/core/database.py | 2 + backend/app/main.py | 12 +- backend/app/models/__init__.py | 4 +- backend/app/models/agent.py | 3 + backend/app/models/execution.py | 1 + backend/app/models/goal.py | 45 +++ backend/app/models/task.py | 59 +++ backend/app/services/goal_service.py | 368 ++++++++++++++++++ 11 files changed, 1017 insertions(+), 2 deletions(-) create mode 100644 backend/alembic/versions/013_add_goals_tasks.py create mode 100644 backend/app/api/goals.py create mode 100644 backend/app/api/tasks.py create mode 100644 backend/app/models/goal.py create mode 100644 backend/app/models/task.py create mode 100644 backend/app/services/goal_service.py diff --git a/backend/alembic/versions/013_add_goals_tasks.py b/backend/alembic/versions/013_add_goals_tasks.py new file mode 100644 index 0000000..7e73282 --- /dev/null +++ b/backend/alembic/versions/013_add_goals_tasks.py @@ -0,0 +1,81 @@ +"""add goals and tasks tables, agent_type, goal_id + +Revision ID: 013_add_goals_tasks +Revises: 012_schedule_fk_set_null +Create Date: 2026-05-07 +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.mysql import CHAR + +revision = "013_add_goals_tasks" +down_revision = "012_schedule_fk_set_null" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ── goals 表 ── + op.create_table( + "goals", + sa.Column("id", CHAR(36), primary_key=True), + sa.Column("title", sa.String(500), nullable=False), + sa.Column("description", sa.Text), + sa.Column("status", sa.String(20), default="active"), + sa.Column("priority", sa.Integer, default=5), + sa.Column("progress", sa.Float, default=0.0), + sa.Column("plan", sa.JSON), + sa.Column("autonomy_config", sa.JSON), + sa.Column("creator_id", CHAR(36), sa.ForeignKey("users.id"), nullable=False), + sa.Column("main_agent_id", CHAR(36), sa.ForeignKey("agents.id"), nullable=True), + sa.Column("parent_goal_id", CHAR(36), sa.ForeignKey("goals.id"), nullable=True), + sa.Column("started_at", sa.DateTime), + sa.Column("completed_at", sa.DateTime), + sa.Column("deadline", sa.DateTime), + sa.Column("created_at", sa.DateTime, server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now()), + ) + + # ── tasks 表 ── + op.create_table( + "tasks", + sa.Column("id", CHAR(36), primary_key=True), + sa.Column("goal_id", CHAR(36), sa.ForeignKey("goals.id"), nullable=False), + sa.Column("title", sa.String(500), nullable=False), + sa.Column("description", sa.Text), + sa.Column("status", sa.String(20), default="pending"), + sa.Column("priority", sa.Integer, default=5), + sa.Column("task_config", sa.JSON), + sa.Column("parent_task_id", CHAR(36), sa.ForeignKey("tasks.id"), nullable=True), + sa.Column("depends_on", sa.JSON, default=list), + sa.Column("result", sa.JSON), + sa.Column("error_message", sa.Text), + sa.Column("execution_id", CHAR(36), sa.ForeignKey("executions.id"), nullable=True), + sa.Column("assigned_agent_id", CHAR(36), sa.ForeignKey("agents.id"), nullable=True), + sa.Column("assigned_agent_name", sa.String(200)), + sa.Column("requires_approval", sa.Boolean, default=False), + sa.Column("approver_id", CHAR(36), sa.ForeignKey("users.id"), nullable=True), + sa.Column("approval_status", sa.String(20)), + sa.Column("started_at", sa.DateTime), + sa.Column("completed_at", sa.DateTime), + sa.Column("deadline", sa.DateTime), + sa.Column("created_at", sa.DateTime, server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now()), + ) + + # ── agents 表新增字段 ── + op.add_column("agents", sa.Column("agent_type", sa.String(20), default="specialist")) + op.add_column("agents", sa.Column("input_schema", sa.JSON, nullable=True)) + op.add_column("agents", sa.Column("output_schema", sa.JSON, nullable=True)) + + # ── executions 表新增 goal_id ── + op.add_column("executions", sa.Column("goal_id", CHAR(36), sa.ForeignKey("goals.id"), nullable=True)) + + +def downgrade() -> None: + op.drop_column("executions", "goal_id") + op.drop_column("agents", "output_schema") + op.drop_column("agents", "input_schema") + op.drop_column("agents", "agent_type") + op.drop_table("tasks") + op.drop_table("goals") diff --git a/backend/app/api/goals.py b/backend/app/api/goals.py new file mode 100644 index 0000000..7f444bb --- /dev/null +++ b/backend/app/api/goals.py @@ -0,0 +1,204 @@ +""" +Goal API — 目标管理接口 +""" +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel, Field +from typing import List, Optional, Dict, Any +from datetime import datetime +import logging + +from app.core.database import get_db +from app.api.auth import get_current_user +from app.models.user import User +from app.services import goal_service + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/api/v1/goals", + tags=["goals"], + responses={ + 401: {"description": "未授权"}, + 404: {"description": "资源不存在"}, + 400: {"description": "请求参数错误"}, + }, +) + + +# ──────────────────────────── Schemas ──────────────────────────── + +class GoalCreate(BaseModel): + title: str + description: str = "" + priority: int = Field(default=5, ge=1, le=10) + deadline: Optional[datetime] = None + main_agent_id: Optional[str] = None + parent_goal_id: Optional[str] = None + autonomy_config: Optional[Dict[str, Any]] = None + + +class GoalUpdate(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + status: Optional[str] = None + priority: Optional[int] = Field(default=None, ge=1, le=10) + plan: Optional[List[Dict[str, Any]]] = None + autonomy_config: Optional[Dict[str, Any]] = None + deadline: Optional[datetime] = None + main_agent_id: Optional[str] = None + + +class GoalResponse(BaseModel): + id: str + title: str + description: Optional[str] + status: str + priority: int + progress: float + plan: Optional[Any] + autonomy_config: Optional[Any] + creator_id: str + main_agent_id: Optional[str] + parent_goal_id: Optional[str] + started_at: Optional[datetime] + completed_at: Optional[datetime] + deadline: Optional[datetime] + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +class GoalTaskTreeResponse(BaseModel): + goal: Dict[str, Any] + tasks: List[Dict[str, Any]] + + +# ──────────────────────────── Endpoints ──────────────────────────── + +@router.post("", response_model=GoalResponse, status_code=201) +def create_goal( + data: GoalCreate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """创建新目标""" + return goal_service.create_goal( + db=db, + creator_id=current_user.id, + title=data.title, + description=data.description, + priority=data.priority, + deadline=data.deadline, + main_agent_id=data.main_agent_id, + parent_goal_id=data.parent_goal_id, + autonomy_config=data.autonomy_config, + ) + + +@router.get("", response_model=List[GoalResponse]) +def list_goals( + status: Optional[str] = None, + skip: int = Query(default=0, ge=0), + limit: int = Query(default=20, ge=1, le=100), + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """列出目标""" + return goal_service.list_goals( + db=db, + creator_id=current_user.id, + status=status, + skip=skip, + limit=limit, + ) + + +@router.get("/{goal_id}", response_model=GoalResponse) +def get_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取目标详情""" + return goal_service.get_goal(db, goal_id) + + +@router.put("/{goal_id}", response_model=GoalResponse) +def update_goal( + goal_id: str, + data: GoalUpdate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """更新目标""" + return goal_service.update_goal( + db=db, + goal_id=goal_id, + title=data.title, + description=data.description, + status=data.status, + priority=data.priority, + plan=data.plan, + autonomy_config=data.autonomy_config, + deadline=data.deadline, + main_agent_id=data.main_agent_id, + ) + + +@router.delete("/{goal_id}", status_code=204) +def delete_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """删除目标""" + goal_service.delete_goal(db, goal_id) + return None + + +@router.post("/{goal_id}/start", response_model=GoalResponse) +def start_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """启动目标执行""" + goal = goal_service.update_goal(db, goal_id, status="active") + goal.started_at = datetime.now() + db.commit() + db.refresh(goal) + logger.info(f"Goal started: {goal_id}") + return goal + + +@router.post("/{goal_id}/pause", response_model=GoalResponse) +def pause_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """暂停目标执行""" + return goal_service.update_goal(db, goal_id, status="paused") + + +@router.post("/{goal_id}/resume", response_model=GoalResponse) +def resume_goal( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """恢复目标执行""" + return goal_service.update_goal(db, goal_id, status="active") + + +@router.get("/{goal_id}/tasks", response_model=GoalTaskTreeResponse) +def get_goal_task_tree( + goal_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取目标的任务树""" + return goal_service.get_goal_task_tree(db, goal_id) diff --git a/backend/app/api/tasks.py b/backend/app/api/tasks.py new file mode 100644 index 0000000..6799db1 --- /dev/null +++ b/backend/app/api/tasks.py @@ -0,0 +1,240 @@ +""" +Task API — 任务管理接口 +""" +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel, Field +from typing import List, Optional, Dict, Any +from datetime import datetime +import logging + +from app.core.database import get_db +from app.api.auth import get_current_user +from app.models.user import User +from app.services import goal_service + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/api/v1/tasks", + tags=["tasks"], + responses={ + 401: {"description": "未授权"}, + 404: {"description": "资源不存在"}, + 400: {"description": "请求参数错误"}, + }, +) + + +# ──────────────────────────── Schemas ──────────────────────────── + +class TaskCreate(BaseModel): + goal_id: str + title: str + description: str = "" + priority: int = Field(default=5, ge=1, le=10) + parent_task_id: Optional[str] = None + depends_on: Optional[List[str]] = None + assigned_agent_id: Optional[str] = None + assigned_agent_name: Optional[str] = None + task_config: Optional[Dict[str, Any]] = None + deadline: Optional[datetime] = None + requires_approval: bool = False + approver_id: Optional[str] = None + + +class TaskUpdate(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + status: Optional[str] = None + priority: Optional[int] = Field(default=None, ge=1, le=10) + task_config: Optional[Dict[str, Any]] = None + depends_on: Optional[List[str]] = None + assigned_agent_id: Optional[str] = None + assigned_agent_name: Optional[str] = None + result: Optional[Dict[str, Any]] = None + error_message: Optional[str] = None + execution_id: Optional[str] = None + deadline: Optional[datetime] = None + + +class TaskResponse(BaseModel): + id: str + goal_id: str + title: str + description: Optional[str] + status: str + priority: int + task_config: Optional[Any] + parent_task_id: Optional[str] + depends_on: Optional[Any] + result: Optional[Any] + error_message: Optional[str] + execution_id: Optional[str] + assigned_agent_id: Optional[str] + assigned_agent_name: Optional[str] + requires_approval: bool + approver_id: Optional[str] + approval_status: Optional[str] + started_at: Optional[datetime] + completed_at: Optional[datetime] + deadline: Optional[datetime] + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +class TaskDependencyCheck(BaseModel): + task_id: str + dependencies_met: bool + pending_dependencies: List[str] = [] + + +# ──────────────────────────── Endpoints ──────────────────────────── + +@router.post("", response_model=TaskResponse, status_code=201) +def create_task( + data: TaskCreate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """创建新任务""" + return goal_service.create_task( + db=db, + goal_id=data.goal_id, + title=data.title, + description=data.description, + priority=data.priority, + parent_task_id=data.parent_task_id, + depends_on=data.depends_on, + assigned_agent_id=data.assigned_agent_id, + assigned_agent_name=data.assigned_agent_name, + task_config=data.task_config, + deadline=data.deadline, + requires_approval=data.requires_approval, + approver_id=data.approver_id, + ) + + +@router.get("", response_model=List[TaskResponse]) +def list_tasks( + goal_id: Optional[str] = None, + status: Optional[str] = None, + assigned_agent_id: Optional[str] = None, + parent_task_id: Optional[str] = None, + skip: int = Query(default=0, ge=0), + limit: int = Query(default=50, ge=1, le=200), + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """列出任务""" + return goal_service.list_tasks( + db=db, + goal_id=goal_id, + status=status, + assigned_agent_id=assigned_agent_id, + parent_task_id=parent_task_id, + skip=skip, + limit=limit, + ) + + +@router.get("/{task_id}", response_model=TaskResponse) +def get_task( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取任务详情""" + return goal_service.get_task(db, task_id) + + +@router.put("/{task_id}", response_model=TaskResponse) +def update_task( + task_id: str, + data: TaskUpdate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """更新任务""" + return goal_service.update_task( + db=db, + task_id=task_id, + title=data.title, + description=data.description, + status=data.status, + priority=data.priority, + task_config=data.task_config, + depends_on=data.depends_on, + assigned_agent_id=data.assigned_agent_id, + assigned_agent_name=data.assigned_agent_name, + result=data.result, + error_message=data.error_message, + execution_id=data.execution_id, + deadline=data.deadline, + ) + + +@router.delete("/{task_id}", status_code=204) +def delete_task( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """删除任务""" + goal_service.delete_task(db, task_id) + return None + + +@router.get("/{task_id}/check-dependencies", response_model=TaskDependencyCheck) +def check_task_dependencies( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """检查任务的前置依赖是否满足""" + met = goal_service.get_task_dependencies_met(db, task_id) + task = goal_service.get_task(db, task_id) + pending = [] + if not met: + for dep_id in (task.depends_on or []): + from app.models.task import Task + dep = db.query(Task).filter(Task.id == dep_id).first() + if dep and dep.status != "completed": + pending.append(dep_id) + return TaskDependencyCheck( + task_id=task_id, + dependencies_met=met, + pending_dependencies=pending, + ) + + +@router.post("/{task_id}/approve", response_model=TaskResponse) +def approve_task( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """审批通过任务""" + return goal_service.update_task( + db=db, + task_id=task_id, + status="in_progress", + ) + + +@router.post("/{task_id}/reject", response_model=TaskResponse) +def reject_task( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """审批驳回任务""" + return goal_service.update_task( + db=db, + task_id=task_id, + status="failed", + error_message="审批驳回", + ) diff --git a/backend/app/core/database.py b/backend/app/core/database.py index 316e079..3817ee8 100644 --- a/backend/app/core/database.py +++ b/backend/app/core/database.py @@ -54,4 +54,6 @@ def init_db(): import app.models.notification import app.models.orchestration_template import app.models.plugin + import app.models.goal + import app.models.task Base.metadata.create_all(bind=engine) diff --git a/backend/app/main.py b/backend/app/main.py index 512b798..3dd6917 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -85,6 +85,14 @@ Authorization: Bearer { "name": "websocket", "description": "WebSocket API,用于实时推送执行状态。" + }, + { + "name": "goals", + "description": "目标管理API — Main Agent 数字员工的目标 CRUD、启停、任务树查看。" + }, + { + "name": "tasks", + "description": "任务管理API — 目标分解后的子任务 CRUD、依赖检查、审批。" } ] ) @@ -263,7 +271,7 @@ async def startup_event(): logger.error(f"人参果1号长连接启动失败: {e}") # 注册路由 -from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind, approval, orchestration_templates, plugins, agent_market +from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind, approval, orchestration_templates, plugins, agent_market, goals, tasks app.include_router(auth.router) app.include_router(uploads.router) @@ -295,6 +303,8 @@ app.include_router(approval.router) app.include_router(plugins.router) app.include_router(orchestration_templates.router) app.include_router(agent_market.router) +app.include_router(goals.router) +app.include_router(tasks.router) if __name__ == "__main__": import uvicorn diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 19c890e..3799aca 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -21,5 +21,7 @@ from app.models.notification import Notification from app.models.user_feishu_open_id import UserFeishuOpenId from app.models.plugin import NodePlugin from app.models.orchestration_template import OrchestrationTemplate +from app.models.goal import Goal +from app.models.task import Task -__all__ = ["User", "Workflow", "WorkflowVersion", "Agent", "GlobalKnowledge", "AgentRating", "AgentFavorite", "Execution", "ExecutionLog", "ModelConfig", "DataSource", "WorkflowTemplate", "TemplateRating", "TemplateFavorite", "NodeTemplate", "Role", "Permission", "WorkflowPermission", "AgentPermission", "AlertRule", "AlertLog", "PersistentUserMemory", "AgentLLMLog", "AgentVectorMemory", "AgentLearningPattern", "AgentSchedule", "KnowledgeBase", "Document", "DocumentChunk", "Notification", "UserFeishuOpenId", "NodePlugin", "OrchestrationTemplate"] \ No newline at end of file +__all__ = ["User", "Workflow", "WorkflowVersion", "Agent", "GlobalKnowledge", "AgentRating", "AgentFavorite", "Execution", "ExecutionLog", "ModelConfig", "DataSource", "WorkflowTemplate", "TemplateRating", "TemplateFavorite", "NodeTemplate", "Role", "Permission", "WorkflowPermission", "AgentPermission", "AlertRule", "AlertLog", "PersistentUserMemory", "AgentLLMLog", "AgentVectorMemory", "AgentLearningPattern", "AgentSchedule", "KnowledgeBase", "Document", "DocumentChunk", "Notification", "UserFeishuOpenId", "NodePlugin", "OrchestrationTemplate", "Goal", "Task"] \ No newline at end of file diff --git a/backend/app/models/agent.py b/backend/app/models/agent.py index 33f1840..7788ece 100644 --- a/backend/app/models/agent.py +++ b/backend/app/models/agent.py @@ -16,11 +16,14 @@ class Agent(Base): name = Column(String(100), nullable=False, comment="智能体名称") description = Column(Text, comment="描述") workflow_config = Column(JSON, nullable=False, comment="工作流配置") + agent_type = Column(String(20), default="specialist", comment="Agent类型: main/specialist") budget_config = Column( JSON, nullable=True, comment="执行预算:max_steps/max_llm_invocations/max_tool_calls(可选,覆盖全局默认)", ) + input_schema = Column(JSON, nullable=True, comment="输入数据类型约束(JSON Schema)") + output_schema = Column(JSON, nullable=True, comment="输出数据类型约束(JSON Schema)") version = Column(Integer, default=1, comment="版本号") status = Column(String(20), default="draft", comment="状态: draft/published/running/stopped") user_id = Column(CHAR(36), ForeignKey("users.id"), comment="创建者ID") diff --git a/backend/app/models/execution.py b/backend/app/models/execution.py index 3da184e..cc311a2 100644 --- a/backend/app/models/execution.py +++ b/backend/app/models/execution.py @@ -33,6 +33,7 @@ class Execution(Base): ) depth = Column(Integer, default=0, nullable=False, comment="执行深度(根为0)") pause_state = Column(JSON, nullable=True, comment="挂起快照(审批节点 HITL,恢复时消费)") + goal_id = Column(CHAR(36), ForeignKey("goals.id"), nullable=True, comment="关联目标ID") created_at = Column(DateTime, default=func.now(), comment="创建时间") # 关系 diff --git a/backend/app/models/goal.py b/backend/app/models/goal.py new file mode 100644 index 0000000..4b7e558 --- /dev/null +++ b/backend/app/models/goal.py @@ -0,0 +1,45 @@ +""" +目标模型 — Main Agent 管理的顶层目标 +""" +from sqlalchemy import Column, String, Text, Integer, Float, DateTime, JSON, ForeignKey, func +from sqlalchemy.dialects.mysql import CHAR +from sqlalchemy.orm import relationship +from app.core.database import Base +import uuid + + +class Goal(Base): + """目标表 — Main Agent 数字员工的顶层工作目标""" + __tablename__ = "goals" + + id = Column(CHAR(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="目标ID") + title = Column(String(500), nullable=False, comment="目标标题") + description = Column(Text, comment="目标描述(自然语言)") + status = Column(String(20), default="active", comment="状态: active/paused/completed/failed/cancelled") + priority = Column(Integer, default=5, comment="优先级 1-10") + progress = Column(Float, default=0.0, comment="完成进度 0.0 - 1.0") + + # Main Agent 分解后的结构化计划 + plan = Column(JSON, comment="执行计划: [{phase, tasks:[], assigned_agent_id, deadline}]") + + # 自主循环配置 + autonomy_config = Column(JSON, comment="自主循环配置: {check_interval_minutes, max_idle_hours, auto_replan, notify_on_progress}") + + # 关联 + creator_id = Column(CHAR(36), ForeignKey("users.id"), nullable=False, comment="创建者ID") + main_agent_id = Column(CHAR(36), ForeignKey("agents.id"), nullable=True, comment="管理此目标的 Main Agent ID") + parent_goal_id = Column(CHAR(36), ForeignKey("goals.id"), nullable=True, comment="父目标ID,支持目标嵌套") + + # 时间 + started_at = Column(DateTime, comment="开始时间") + completed_at = Column(DateTime, comment="完成时间") + deadline = Column(DateTime, comment="截止时间") + created_at = Column(DateTime, default=func.now(), comment="创建时间") + updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), comment="更新时间") + + # 关系 + creator = relationship("User", backref="goals") + main_agent = relationship("Agent", backref="managed_goals") + + def __repr__(self): + return f"" diff --git a/backend/app/models/task.py b/backend/app/models/task.py new file mode 100644 index 0000000..f2c41ad --- /dev/null +++ b/backend/app/models/task.py @@ -0,0 +1,59 @@ +""" +任务模型 — Goal 拆解的子任务 +""" +from sqlalchemy import Column, String, Text, Integer, DateTime, JSON, Boolean, ForeignKey, func +from sqlalchemy.dialects.mysql import CHAR +from sqlalchemy.orm import relationship +from app.core.database import Base +import uuid + + +class Task(Base): + """任务表 — Main Agent 将目标分解为可执行的子任务""" + __tablename__ = "tasks" + + id = Column(CHAR(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="任务ID") + goal_id = Column(CHAR(36), ForeignKey("goals.id"), nullable=False, comment="所属目标ID") + title = Column(String(500), nullable=False, comment="任务标题") + description = Column(Text, comment="任务描述") + status = Column( + String(20), default="pending", + comment="状态: pending/in_progress/awaiting_approval/completed/failed/cancelled" + ) + priority = Column(Integer, default=5, comment="优先级 1-10") + + # 任务编排配置 + task_config = Column(JSON, comment="编排配置: {orchestration_mode, agents:[], workflow_id, input_data}") + + # 依赖关系 + parent_task_id = Column(CHAR(36), ForeignKey("tasks.id"), nullable=True, comment="父任务ID") + depends_on = Column(JSON, default=list, comment="前置依赖任务ID列表") + + # 执行结果 + result = Column(JSON, comment="执行输出结果") + error_message = Column(Text, comment="错误信息") + execution_id = Column(CHAR(36), ForeignKey("executions.id"), nullable=True, comment="关联的执行记录ID") + + # 分配 + assigned_agent_id = Column(CHAR(36), ForeignKey("agents.id"), nullable=True, comment="分配的 Agent ID") + assigned_agent_name = Column(String(200), comment="分配的 Agent 名称(冗余便于展示)") + + # 审批 + requires_approval = Column(Boolean, default=False, comment="是否需要人工审批") + approver_id = Column(CHAR(36), ForeignKey("users.id"), nullable=True, comment="审批人ID") + approval_status = Column(String(20), comment="审批状态: pending/approved/rejected") + + # 时间 + started_at = Column(DateTime, comment="开始时间") + completed_at = Column(DateTime, comment="完成时间") + deadline = Column(DateTime, comment="截止时间") + created_at = Column(DateTime, default=func.now(), comment="创建时间") + updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), comment="更新时间") + + # 关系 + goal = relationship("Goal", backref="tasks") + assigned_agent = relationship("Agent", backref="assigned_tasks") + execution = relationship("Execution", backref="task") + + def __repr__(self): + return f"" diff --git a/backend/app/services/goal_service.py b/backend/app/services/goal_service.py new file mode 100644 index 0000000..a8caf05 --- /dev/null +++ b/backend/app/services/goal_service.py @@ -0,0 +1,368 @@ +""" +Goal & Task 业务服务层 +""" +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any +from datetime import datetime +import uuid +import logging + +from app.models.goal import Goal +from app.models.task import Task +from app.core.exceptions import NotFoundError, ValidationError + +logger = logging.getLogger(__name__) + + +# ──────────────────────────── Goal CRUD ──────────────────────────── + +def create_goal( + db: Session, + creator_id: str, + title: str, + description: str = "", + priority: int = 5, + deadline: Optional[datetime] = None, + main_agent_id: Optional[str] = None, + parent_goal_id: Optional[str] = None, + autonomy_config: Optional[Dict[str, Any]] = None, +) -> Goal: + """创建目标""" + goal = Goal( + id=str(uuid.uuid4()), + title=title, + description=description or "", + priority=priority, + deadline=deadline, + creator_id=creator_id, + main_agent_id=main_agent_id, + parent_goal_id=parent_goal_id, + autonomy_config=autonomy_config or {}, + progress=0.0, + ) + db.add(goal) + db.commit() + db.refresh(goal) + logger.info(f"Goal created: {goal.id} - {goal.title}") + return goal + + +def get_goal(db: Session, goal_id: str) -> Goal: + """获取单个目标""" + goal = db.query(Goal).filter(Goal.id == goal_id).first() + if not goal: + raise NotFoundError("目标", goal_id) + return goal + + +def list_goals( + db: Session, + creator_id: Optional[str] = None, + status: Optional[str] = None, + skip: int = 0, + limit: int = 20, +) -> List[Goal]: + """列出目标(支持筛选和分页)""" + q = db.query(Goal) + if creator_id: + q = q.filter(Goal.creator_id == creator_id) + if status: + q = q.filter(Goal.status == status) + return q.order_by(Goal.created_at.desc()).offset(skip).limit(limit).all() + + +def update_goal( + db: Session, + goal_id: str, + title: Optional[str] = None, + description: Optional[str] = None, + status: Optional[str] = None, + priority: Optional[int] = None, + plan: Optional[List[Dict[str, Any]]] = None, + autonomy_config: Optional[Dict[str, Any]] = None, + deadline: Optional[datetime] = None, + main_agent_id: Optional[str] = None, +) -> Goal: + """更新目标""" + goal = get_goal(db, goal_id) + + if title is not None: + goal.title = title + if description is not None: + goal.description = description + if status is not None: + valid_statuses = {"active", "paused", "completed", "failed", "cancelled"} + if status not in valid_statuses: + raise ValidationError(f"无效的状态: {status},有效值: {valid_statuses}") + goal.status = status + if status == "completed": + goal.completed_at = datetime.now() + goal.progress = 1.0 + elif status == "failed" or status == "cancelled": + goal.completed_at = datetime.now() + if priority is not None: + if not 1 <= priority <= 10: + raise ValidationError("优先级必须在 1-10 之间") + goal.priority = priority + if plan is not None: + goal.plan = plan + if autonomy_config is not None: + goal.autonomy_config = autonomy_config + if deadline is not None: + goal.deadline = deadline + if main_agent_id is not None: + goal.main_agent_id = main_agent_id + + db.commit() + db.refresh(goal) + return goal + + +def delete_goal(db: Session, goal_id: str) -> None: + """删除目标及其所有任务""" + goal = get_goal(db, goal_id) + db.query(Task).filter(Task.goal_id == goal_id).delete() + db.delete(goal) + db.commit() + logger.info(f"Goal deleted: {goal_id}") + + +def update_goal_progress(db: Session, goal_id: str) -> Goal: + """重新计算并更新目标进度""" + goal = get_goal(db, goal_id) + tasks = db.query(Task).filter(Task.goal_id == goal_id, Task.parent_task_id.is_(None)).all() + + if not tasks: + goal.progress = 0.0 + else: + completed_count = sum(1 for t in tasks if t.status == "completed") + goal.progress = round(completed_count / len(tasks), 4) + + db.commit() + db.refresh(goal) + return goal + + +# ──────────────────────────── Task CRUD ──────────────────────────── + +def create_task( + db: Session, + goal_id: str, + title: str, + description: str = "", + priority: int = 5, + parent_task_id: Optional[str] = None, + depends_on: Optional[List[str]] = None, + assigned_agent_id: Optional[str] = None, + assigned_agent_name: Optional[str] = None, + task_config: Optional[Dict[str, Any]] = None, + deadline: Optional[datetime] = None, + requires_approval: bool = False, + approver_id: Optional[str] = None, +) -> Task: + """创建任务""" + # 验证 goal 存在 + get_goal(db, goal_id) + + # 验证 depends_on 中的任务存在 + if depends_on: + for dep_id in depends_on: + dep_task = db.query(Task).filter(Task.id == dep_id).first() + if not dep_task: + raise NotFoundError("前置依赖任务", dep_id) + + task = Task( + id=str(uuid.uuid4()), + goal_id=goal_id, + title=title, + description=description or "", + priority=priority, + parent_task_id=parent_task_id, + depends_on=depends_on or [], + assigned_agent_id=assigned_agent_id, + assigned_agent_name=assigned_agent_name, + task_config=task_config or {}, + deadline=deadline, + requires_approval=requires_approval, + approver_id=approver_id, + status="pending", + ) + db.add(task) + db.commit() + db.refresh(task) + logger.info(f"Task created: {task.id} - {task.title} (goal={goal_id})") + return task + + +def get_task(db: Session, task_id: str) -> Task: + """获取单个任务""" + task = db.query(Task).filter(Task.id == task_id).first() + if not task: + raise NotFoundError("任务", task_id) + return task + + +def list_tasks( + db: Session, + goal_id: Optional[str] = None, + status: Optional[str] = None, + assigned_agent_id: Optional[str] = None, + parent_task_id: Optional[str] = None, + skip: int = 0, + limit: int = 50, +) -> List[Task]: + """列出任务(支持多条件筛选和分页)""" + q = db.query(Task) + if goal_id: + q = q.filter(Task.goal_id == goal_id) + if status: + q = q.filter(Task.status == status) + if assigned_agent_id: + q = q.filter(Task.assigned_agent_id == assigned_agent_id) + if parent_task_id is not None: + q = q.filter(Task.parent_task_id == parent_task_id) + return q.order_by(Task.created_at.asc()).offset(skip).limit(limit).all() + + +def update_task( + db: Session, + task_id: str, + title: Optional[str] = None, + description: Optional[str] = None, + status: Optional[str] = None, + priority: Optional[int] = None, + task_config: Optional[Dict[str, Any]] = None, + depends_on: Optional[List[str]] = None, + assigned_agent_id: Optional[str] = None, + assigned_agent_name: Optional[str] = None, + result: Optional[Dict[str, Any]] = None, + error_message: Optional[str] = None, + execution_id: Optional[str] = None, + deadline: Optional[datetime] = None, +) -> Task: + """更新任务""" + task = get_task(db, task_id) + + if title is not None: + task.title = title + if description is not None: + task.description = description + if status is not None: + valid_statuses = {"pending", "in_progress", "awaiting_approval", "completed", "failed", "cancelled"} + if status not in valid_statuses: + raise ValidationError(f"无效的任务状态: {status},有效值: {valid_statuses}") + task.status = status + if status == "in_progress" and task.started_at is None: + task.started_at = datetime.now() + elif status in ("completed", "failed", "cancelled"): + task.completed_at = datetime.now() + if priority is not None: + if not 1 <= priority <= 10: + raise ValidationError("优先级必须在 1-10 之间") + task.priority = priority + if task_config is not None: + task.task_config = task_config + if depends_on is not None: + task.depends_on = depends_on + if assigned_agent_id is not None: + task.assigned_agent_id = assigned_agent_id + if assigned_agent_name is not None: + task.assigned_agent_name = assigned_agent_name + if result is not None: + task.result = result + if error_message is not None: + task.error_message = error_message + if execution_id is not None: + task.execution_id = execution_id + if deadline is not None: + task.deadline = deadline + + db.commit() + db.refresh(task) + + # 任务状态变更时同步更新 Goal 进度 + if status is not None: + update_goal_progress(db, task.goal_id) + + return task + + +def delete_task(db: Session, task_id: str) -> None: + """删除任务及其子任务""" + task = get_task(db, task_id) + goal_id = task.goal_id + # 递归删除子任务 + db.query(Task).filter(Task.parent_task_id == task_id).delete() + db.delete(task) + db.commit() + # 更新进度 + update_goal_progress(db, goal_id) + logger.info(f"Task deleted: {task_id}") + + +def get_task_dependencies_met(db: Session, task_id: str) -> bool: + """检查任务的所有前置依赖是否已完成""" + task = get_task(db, task_id) + if not task.depends_on: + return True + for dep_id in task.depends_on: + dep_task = db.query(Task).filter(Task.id == dep_id).first() + if not dep_task or dep_task.status != "completed": + return False + return True + + +def get_goal_task_tree(db: Session, goal_id: str) -> Dict[str, Any]: + """获取目标的任务树结构""" + goal = get_goal(db, goal_id) + tasks = list_tasks(db, goal_id=goal_id, limit=500) + + # 构建任务树 + task_map = {} + for t in tasks: + task_map[t.id] = { + "id": t.id, + "title": t.title, + "description": t.description, + "status": t.status, + "priority": t.priority, + "depends_on": t.depends_on, + "assigned_agent_id": t.assigned_agent_id, + "assigned_agent_name": t.assigned_agent_name, + "result": t.result, + "error_message": t.error_message, + "requires_approval": t.requires_approval, + "approval_status": t.approval_status, + "deadline": t.deadline.isoformat() if t.deadline else None, + "started_at": t.started_at.isoformat() if t.started_at else None, + "completed_at": t.completed_at.isoformat() if t.completed_at else None, + "children": [], + } + + # 建立父子关系 + roots = [] + for t in tasks: + node = task_map[t.id] + if t.parent_task_id and t.parent_task_id in task_map: + task_map[t.parent_task_id]["children"].append(node) + else: + roots.append(node) + + return { + "goal": { + "id": goal.id, + "title": goal.title, + "description": goal.description, + "status": goal.status, + "priority": goal.priority, + "progress": goal.progress, + "plan": goal.plan, + "autonomy_config": goal.autonomy_config, + "main_agent_id": goal.main_agent_id, + "deadline": goal.deadline.isoformat() if goal.deadline else None, + "started_at": goal.started_at.isoformat() if goal.started_at else None, + "completed_at": goal.completed_at.isoformat() if goal.completed_at else None, + "created_at": goal.created_at.isoformat() if goal.created_at else None, + }, + "tasks": roots, + }