feat: add Goal/Task data models, service layer, and API routes (Phase 1)

Main Agent 数字员工工厂基础设施:
- 新增 Goal 和 Task SQLAlchemy 数据模型
- Agent 模型新增 agent_type / input_schema / output_schema
- Execution 模型新增 goal_id 关联
- 新增 Goal/Task CRUD 服务层(含依赖检查、任务树、进度计算)
- 新增 /api/v1/goals (9端点) + /api/v1/tasks (8端点)
- 数据库迁移 013_add_goals_tasks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-08 19:50:16 +08:00
parent 10ee7ee625
commit 02d7cf8f62
11 changed files with 1017 additions and 2 deletions

View File

@@ -0,0 +1,81 @@
"""add goals and tasks tables, agent_type, goal_id
Revision ID: 013_add_goals_tasks
Revises: 012_schedule_fk_set_null
Create Date: 2026-05-07
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import CHAR
revision = "013_add_goals_tasks"
down_revision = "012_schedule_fk_set_null"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ── goals 表 ──
op.create_table(
"goals",
sa.Column("id", CHAR(36), primary_key=True),
sa.Column("title", sa.String(500), nullable=False),
sa.Column("description", sa.Text),
sa.Column("status", sa.String(20), default="active"),
sa.Column("priority", sa.Integer, default=5),
sa.Column("progress", sa.Float, default=0.0),
sa.Column("plan", sa.JSON),
sa.Column("autonomy_config", sa.JSON),
sa.Column("creator_id", CHAR(36), sa.ForeignKey("users.id"), nullable=False),
sa.Column("main_agent_id", CHAR(36), sa.ForeignKey("agents.id"), nullable=True),
sa.Column("parent_goal_id", CHAR(36), sa.ForeignKey("goals.id"), nullable=True),
sa.Column("started_at", sa.DateTime),
sa.Column("completed_at", sa.DateTime),
sa.Column("deadline", sa.DateTime),
sa.Column("created_at", sa.DateTime, server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now()),
)
# ── tasks 表 ──
op.create_table(
"tasks",
sa.Column("id", CHAR(36), primary_key=True),
sa.Column("goal_id", CHAR(36), sa.ForeignKey("goals.id"), nullable=False),
sa.Column("title", sa.String(500), nullable=False),
sa.Column("description", sa.Text),
sa.Column("status", sa.String(20), default="pending"),
sa.Column("priority", sa.Integer, default=5),
sa.Column("task_config", sa.JSON),
sa.Column("parent_task_id", CHAR(36), sa.ForeignKey("tasks.id"), nullable=True),
sa.Column("depends_on", sa.JSON, default=list),
sa.Column("result", sa.JSON),
sa.Column("error_message", sa.Text),
sa.Column("execution_id", CHAR(36), sa.ForeignKey("executions.id"), nullable=True),
sa.Column("assigned_agent_id", CHAR(36), sa.ForeignKey("agents.id"), nullable=True),
sa.Column("assigned_agent_name", sa.String(200)),
sa.Column("requires_approval", sa.Boolean, default=False),
sa.Column("approver_id", CHAR(36), sa.ForeignKey("users.id"), nullable=True),
sa.Column("approval_status", sa.String(20)),
sa.Column("started_at", sa.DateTime),
sa.Column("completed_at", sa.DateTime),
sa.Column("deadline", sa.DateTime),
sa.Column("created_at", sa.DateTime, server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now()),
)
# ── agents 表新增字段 ──
op.add_column("agents", sa.Column("agent_type", sa.String(20), default="specialist"))
op.add_column("agents", sa.Column("input_schema", sa.JSON, nullable=True))
op.add_column("agents", sa.Column("output_schema", sa.JSON, nullable=True))
# ── executions 表新增 goal_id ──
op.add_column("executions", sa.Column("goal_id", CHAR(36), sa.ForeignKey("goals.id"), nullable=True))
def downgrade() -> None:
op.drop_column("executions", "goal_id")
op.drop_column("agents", "output_schema")
op.drop_column("agents", "input_schema")
op.drop_column("agents", "agent_type")
op.drop_table("tasks")
op.drop_table("goals")

204
backend/app/api/goals.py Normal file
View File

@@ -0,0 +1,204 @@
"""
Goal API — 目标管理接口
"""
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
import logging
from app.core.database import get_db
from app.api.auth import get_current_user
from app.models.user import User
from app.services import goal_service
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/v1/goals",
tags=["goals"],
responses={
401: {"description": "未授权"},
404: {"description": "资源不存在"},
400: {"description": "请求参数错误"},
},
)
# ──────────────────────────── Schemas ────────────────────────────
class GoalCreate(BaseModel):
title: str
description: str = ""
priority: int = Field(default=5, ge=1, le=10)
deadline: Optional[datetime] = None
main_agent_id: Optional[str] = None
parent_goal_id: Optional[str] = None
autonomy_config: Optional[Dict[str, Any]] = None
class GoalUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
status: Optional[str] = None
priority: Optional[int] = Field(default=None, ge=1, le=10)
plan: Optional[List[Dict[str, Any]]] = None
autonomy_config: Optional[Dict[str, Any]] = None
deadline: Optional[datetime] = None
main_agent_id: Optional[str] = None
class GoalResponse(BaseModel):
id: str
title: str
description: Optional[str]
status: str
priority: int
progress: float
plan: Optional[Any]
autonomy_config: Optional[Any]
creator_id: str
main_agent_id: Optional[str]
parent_goal_id: Optional[str]
started_at: Optional[datetime]
completed_at: Optional[datetime]
deadline: Optional[datetime]
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class GoalTaskTreeResponse(BaseModel):
goal: Dict[str, Any]
tasks: List[Dict[str, Any]]
# ──────────────────────────── Endpoints ────────────────────────────
@router.post("", response_model=GoalResponse, status_code=201)
def create_goal(
data: GoalCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""创建新目标"""
return goal_service.create_goal(
db=db,
creator_id=current_user.id,
title=data.title,
description=data.description,
priority=data.priority,
deadline=data.deadline,
main_agent_id=data.main_agent_id,
parent_goal_id=data.parent_goal_id,
autonomy_config=data.autonomy_config,
)
@router.get("", response_model=List[GoalResponse])
def list_goals(
status: Optional[str] = None,
skip: int = Query(default=0, ge=0),
limit: int = Query(default=20, ge=1, le=100),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""列出目标"""
return goal_service.list_goals(
db=db,
creator_id=current_user.id,
status=status,
skip=skip,
limit=limit,
)
@router.get("/{goal_id}", response_model=GoalResponse)
def get_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""获取目标详情"""
return goal_service.get_goal(db, goal_id)
@router.put("/{goal_id}", response_model=GoalResponse)
def update_goal(
goal_id: str,
data: GoalUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""更新目标"""
return goal_service.update_goal(
db=db,
goal_id=goal_id,
title=data.title,
description=data.description,
status=data.status,
priority=data.priority,
plan=data.plan,
autonomy_config=data.autonomy_config,
deadline=data.deadline,
main_agent_id=data.main_agent_id,
)
@router.delete("/{goal_id}", status_code=204)
def delete_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""删除目标"""
goal_service.delete_goal(db, goal_id)
return None
@router.post("/{goal_id}/start", response_model=GoalResponse)
def start_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""启动目标执行"""
goal = goal_service.update_goal(db, goal_id, status="active")
goal.started_at = datetime.now()
db.commit()
db.refresh(goal)
logger.info(f"Goal started: {goal_id}")
return goal
@router.post("/{goal_id}/pause", response_model=GoalResponse)
def pause_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""暂停目标执行"""
return goal_service.update_goal(db, goal_id, status="paused")
@router.post("/{goal_id}/resume", response_model=GoalResponse)
def resume_goal(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""恢复目标执行"""
return goal_service.update_goal(db, goal_id, status="active")
@router.get("/{goal_id}/tasks", response_model=GoalTaskTreeResponse)
def get_goal_task_tree(
goal_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""获取目标的任务树"""
return goal_service.get_goal_task_tree(db, goal_id)

240
backend/app/api/tasks.py Normal file
View File

@@ -0,0 +1,240 @@
"""
Task API — 任务管理接口
"""
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
import logging
from app.core.database import get_db
from app.api.auth import get_current_user
from app.models.user import User
from app.services import goal_service
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/v1/tasks",
tags=["tasks"],
responses={
401: {"description": "未授权"},
404: {"description": "资源不存在"},
400: {"description": "请求参数错误"},
},
)
# ──────────────────────────── Schemas ────────────────────────────
class TaskCreate(BaseModel):
goal_id: str
title: str
description: str = ""
priority: int = Field(default=5, ge=1, le=10)
parent_task_id: Optional[str] = None
depends_on: Optional[List[str]] = None
assigned_agent_id: Optional[str] = None
assigned_agent_name: Optional[str] = None
task_config: Optional[Dict[str, Any]] = None
deadline: Optional[datetime] = None
requires_approval: bool = False
approver_id: Optional[str] = None
class TaskUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
status: Optional[str] = None
priority: Optional[int] = Field(default=None, ge=1, le=10)
task_config: Optional[Dict[str, Any]] = None
depends_on: Optional[List[str]] = None
assigned_agent_id: Optional[str] = None
assigned_agent_name: Optional[str] = None
result: Optional[Dict[str, Any]] = None
error_message: Optional[str] = None
execution_id: Optional[str] = None
deadline: Optional[datetime] = None
class TaskResponse(BaseModel):
id: str
goal_id: str
title: str
description: Optional[str]
status: str
priority: int
task_config: Optional[Any]
parent_task_id: Optional[str]
depends_on: Optional[Any]
result: Optional[Any]
error_message: Optional[str]
execution_id: Optional[str]
assigned_agent_id: Optional[str]
assigned_agent_name: Optional[str]
requires_approval: bool
approver_id: Optional[str]
approval_status: Optional[str]
started_at: Optional[datetime]
completed_at: Optional[datetime]
deadline: Optional[datetime]
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class TaskDependencyCheck(BaseModel):
task_id: str
dependencies_met: bool
pending_dependencies: List[str] = []
# ──────────────────────────── Endpoints ────────────────────────────
@router.post("", response_model=TaskResponse, status_code=201)
def create_task(
data: TaskCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""创建新任务"""
return goal_service.create_task(
db=db,
goal_id=data.goal_id,
title=data.title,
description=data.description,
priority=data.priority,
parent_task_id=data.parent_task_id,
depends_on=data.depends_on,
assigned_agent_id=data.assigned_agent_id,
assigned_agent_name=data.assigned_agent_name,
task_config=data.task_config,
deadline=data.deadline,
requires_approval=data.requires_approval,
approver_id=data.approver_id,
)
@router.get("", response_model=List[TaskResponse])
def list_tasks(
goal_id: Optional[str] = None,
status: Optional[str] = None,
assigned_agent_id: Optional[str] = None,
parent_task_id: Optional[str] = None,
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""列出任务"""
return goal_service.list_tasks(
db=db,
goal_id=goal_id,
status=status,
assigned_agent_id=assigned_agent_id,
parent_task_id=parent_task_id,
skip=skip,
limit=limit,
)
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(
task_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""获取任务详情"""
return goal_service.get_task(db, task_id)
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
task_id: str,
data: TaskUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""更新任务"""
return goal_service.update_task(
db=db,
task_id=task_id,
title=data.title,
description=data.description,
status=data.status,
priority=data.priority,
task_config=data.task_config,
depends_on=data.depends_on,
assigned_agent_id=data.assigned_agent_id,
assigned_agent_name=data.assigned_agent_name,
result=data.result,
error_message=data.error_message,
execution_id=data.execution_id,
deadline=data.deadline,
)
@router.delete("/{task_id}", status_code=204)
def delete_task(
task_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""删除任务"""
goal_service.delete_task(db, task_id)
return None
@router.get("/{task_id}/check-dependencies", response_model=TaskDependencyCheck)
def check_task_dependencies(
task_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""检查任务的前置依赖是否满足"""
met = goal_service.get_task_dependencies_met(db, task_id)
task = goal_service.get_task(db, task_id)
pending = []
if not met:
for dep_id in (task.depends_on or []):
from app.models.task import Task
dep = db.query(Task).filter(Task.id == dep_id).first()
if dep and dep.status != "completed":
pending.append(dep_id)
return TaskDependencyCheck(
task_id=task_id,
dependencies_met=met,
pending_dependencies=pending,
)
@router.post("/{task_id}/approve", response_model=TaskResponse)
def approve_task(
task_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""审批通过任务"""
return goal_service.update_task(
db=db,
task_id=task_id,
status="in_progress",
)
@router.post("/{task_id}/reject", response_model=TaskResponse)
def reject_task(
task_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""审批驳回任务"""
return goal_service.update_task(
db=db,
task_id=task_id,
status="failed",
error_message="审批驳回",
)

View File

@@ -54,4 +54,6 @@ def init_db():
import app.models.notification
import app.models.orchestration_template
import app.models.plugin
import app.models.goal
import app.models.task
Base.metadata.create_all(bind=engine)

View File

@@ -85,6 +85,14 @@ Authorization: Bearer <your_token>
{
"name": "websocket",
"description": "WebSocket API用于实时推送执行状态。"
},
{
"name": "goals",
"description": "目标管理API — Main Agent 数字员工的目标 CRUD、启停、任务树查看。"
},
{
"name": "tasks",
"description": "任务管理API — 目标分解后的子任务 CRUD、依赖检查、审批。"
}
]
)
@@ -263,7 +271,7 @@ async def startup_event():
logger.error(f"人参果1号长连接启动失败: {e}")
# 注册路由
from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind, approval, orchestration_templates, plugins, agent_market
from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind, approval, orchestration_templates, plugins, agent_market, goals, tasks
app.include_router(auth.router)
app.include_router(uploads.router)
@@ -295,6 +303,8 @@ app.include_router(approval.router)
app.include_router(plugins.router)
app.include_router(orchestration_templates.router)
app.include_router(agent_market.router)
app.include_router(goals.router)
app.include_router(tasks.router)
if __name__ == "__main__":
import uvicorn

View File

@@ -21,5 +21,7 @@ from app.models.notification import Notification
from app.models.user_feishu_open_id import UserFeishuOpenId
from app.models.plugin import NodePlugin
from app.models.orchestration_template import OrchestrationTemplate
from app.models.goal import Goal
from app.models.task import Task
__all__ = ["User", "Workflow", "WorkflowVersion", "Agent", "GlobalKnowledge", "AgentRating", "AgentFavorite", "Execution", "ExecutionLog", "ModelConfig", "DataSource", "WorkflowTemplate", "TemplateRating", "TemplateFavorite", "NodeTemplate", "Role", "Permission", "WorkflowPermission", "AgentPermission", "AlertRule", "AlertLog", "PersistentUserMemory", "AgentLLMLog", "AgentVectorMemory", "AgentLearningPattern", "AgentSchedule", "KnowledgeBase", "Document", "DocumentChunk", "Notification", "UserFeishuOpenId", "NodePlugin", "OrchestrationTemplate"]
__all__ = ["User", "Workflow", "WorkflowVersion", "Agent", "GlobalKnowledge", "AgentRating", "AgentFavorite", "Execution", "ExecutionLog", "ModelConfig", "DataSource", "WorkflowTemplate", "TemplateRating", "TemplateFavorite", "NodeTemplate", "Role", "Permission", "WorkflowPermission", "AgentPermission", "AlertRule", "AlertLog", "PersistentUserMemory", "AgentLLMLog", "AgentVectorMemory", "AgentLearningPattern", "AgentSchedule", "KnowledgeBase", "Document", "DocumentChunk", "Notification", "UserFeishuOpenId", "NodePlugin", "OrchestrationTemplate", "Goal", "Task"]

View File

@@ -16,11 +16,14 @@ class Agent(Base):
name = Column(String(100), nullable=False, comment="智能体名称")
description = Column(Text, comment="描述")
workflow_config = Column(JSON, nullable=False, comment="工作流配置")
agent_type = Column(String(20), default="specialist", comment="Agent类型: main/specialist")
budget_config = Column(
JSON,
nullable=True,
comment="执行预算max_steps/max_llm_invocations/max_tool_calls可选覆盖全局默认",
)
input_schema = Column(JSON, nullable=True, comment="输入数据类型约束JSON Schema")
output_schema = Column(JSON, nullable=True, comment="输出数据类型约束JSON Schema")
version = Column(Integer, default=1, comment="版本号")
status = Column(String(20), default="draft", comment="状态: draft/published/running/stopped")
user_id = Column(CHAR(36), ForeignKey("users.id"), comment="创建者ID")

View File

@@ -33,6 +33,7 @@ class Execution(Base):
)
depth = Column(Integer, default=0, nullable=False, comment="执行深度根为0")
pause_state = Column(JSON, nullable=True, comment="挂起快照(审批节点 HITL恢复时消费")
goal_id = Column(CHAR(36), ForeignKey("goals.id"), nullable=True, comment="关联目标ID")
created_at = Column(DateTime, default=func.now(), comment="创建时间")
# 关系

View File

@@ -0,0 +1,45 @@
"""
目标模型 — Main Agent 管理的顶层目标
"""
from sqlalchemy import Column, String, Text, Integer, Float, DateTime, JSON, ForeignKey, func
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import relationship
from app.core.database import Base
import uuid
class Goal(Base):
"""目标表 — Main Agent 数字员工的顶层工作目标"""
__tablename__ = "goals"
id = Column(CHAR(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="目标ID")
title = Column(String(500), nullable=False, comment="目标标题")
description = Column(Text, comment="目标描述(自然语言)")
status = Column(String(20), default="active", comment="状态: active/paused/completed/failed/cancelled")
priority = Column(Integer, default=5, comment="优先级 1-10")
progress = Column(Float, default=0.0, comment="完成进度 0.0 - 1.0")
# Main Agent 分解后的结构化计划
plan = Column(JSON, comment="执行计划: [{phase, tasks:[], assigned_agent_id, deadline}]")
# 自主循环配置
autonomy_config = Column(JSON, comment="自主循环配置: {check_interval_minutes, max_idle_hours, auto_replan, notify_on_progress}")
# 关联
creator_id = Column(CHAR(36), ForeignKey("users.id"), nullable=False, comment="创建者ID")
main_agent_id = Column(CHAR(36), ForeignKey("agents.id"), nullable=True, comment="管理此目标的 Main Agent ID")
parent_goal_id = Column(CHAR(36), ForeignKey("goals.id"), nullable=True, comment="父目标ID支持目标嵌套")
# 时间
started_at = Column(DateTime, comment="开始时间")
completed_at = Column(DateTime, comment="完成时间")
deadline = Column(DateTime, comment="截止时间")
created_at = Column(DateTime, default=func.now(), comment="创建时间")
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), comment="更新时间")
# 关系
creator = relationship("User", backref="goals")
main_agent = relationship("Agent", backref="managed_goals")
def __repr__(self):
return f"<Goal(id={self.id}, title={self.title}, status={self.status})>"

View File

@@ -0,0 +1,59 @@
"""
任务模型 — Goal 拆解的子任务
"""
from sqlalchemy import Column, String, Text, Integer, DateTime, JSON, Boolean, ForeignKey, func
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import relationship
from app.core.database import Base
import uuid
class Task(Base):
"""任务表 — Main Agent 将目标分解为可执行的子任务"""
__tablename__ = "tasks"
id = Column(CHAR(36), primary_key=True, default=lambda: str(uuid.uuid4()), comment="任务ID")
goal_id = Column(CHAR(36), ForeignKey("goals.id"), nullable=False, comment="所属目标ID")
title = Column(String(500), nullable=False, comment="任务标题")
description = Column(Text, comment="任务描述")
status = Column(
String(20), default="pending",
comment="状态: pending/in_progress/awaiting_approval/completed/failed/cancelled"
)
priority = Column(Integer, default=5, comment="优先级 1-10")
# 任务编排配置
task_config = Column(JSON, comment="编排配置: {orchestration_mode, agents:[], workflow_id, input_data}")
# 依赖关系
parent_task_id = Column(CHAR(36), ForeignKey("tasks.id"), nullable=True, comment="父任务ID")
depends_on = Column(JSON, default=list, comment="前置依赖任务ID列表")
# 执行结果
result = Column(JSON, comment="执行输出结果")
error_message = Column(Text, comment="错误信息")
execution_id = Column(CHAR(36), ForeignKey("executions.id"), nullable=True, comment="关联的执行记录ID")
# 分配
assigned_agent_id = Column(CHAR(36), ForeignKey("agents.id"), nullable=True, comment="分配的 Agent ID")
assigned_agent_name = Column(String(200), comment="分配的 Agent 名称(冗余便于展示)")
# 审批
requires_approval = Column(Boolean, default=False, comment="是否需要人工审批")
approver_id = Column(CHAR(36), ForeignKey("users.id"), nullable=True, comment="审批人ID")
approval_status = Column(String(20), comment="审批状态: pending/approved/rejected")
# 时间
started_at = Column(DateTime, comment="开始时间")
completed_at = Column(DateTime, comment="完成时间")
deadline = Column(DateTime, comment="截止时间")
created_at = Column(DateTime, default=func.now(), comment="创建时间")
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), comment="更新时间")
# 关系
goal = relationship("Goal", backref="tasks")
assigned_agent = relationship("Agent", backref="assigned_tasks")
execution = relationship("Execution", backref="task")
def __repr__(self):
return f"<Task(id={self.id}, title={self.title}, status={self.status})>"

View File

@@ -0,0 +1,368 @@
"""
Goal & Task 业务服务层
"""
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from datetime import datetime
import uuid
import logging
from app.models.goal import Goal
from app.models.task import Task
from app.core.exceptions import NotFoundError, ValidationError
logger = logging.getLogger(__name__)
# ──────────────────────────── Goal CRUD ────────────────────────────
def create_goal(
db: Session,
creator_id: str,
title: str,
description: str = "",
priority: int = 5,
deadline: Optional[datetime] = None,
main_agent_id: Optional[str] = None,
parent_goal_id: Optional[str] = None,
autonomy_config: Optional[Dict[str, Any]] = None,
) -> Goal:
"""创建目标"""
goal = Goal(
id=str(uuid.uuid4()),
title=title,
description=description or "",
priority=priority,
deadline=deadline,
creator_id=creator_id,
main_agent_id=main_agent_id,
parent_goal_id=parent_goal_id,
autonomy_config=autonomy_config or {},
progress=0.0,
)
db.add(goal)
db.commit()
db.refresh(goal)
logger.info(f"Goal created: {goal.id} - {goal.title}")
return goal
def get_goal(db: Session, goal_id: str) -> Goal:
"""获取单个目标"""
goal = db.query(Goal).filter(Goal.id == goal_id).first()
if not goal:
raise NotFoundError("目标", goal_id)
return goal
def list_goals(
db: Session,
creator_id: Optional[str] = None,
status: Optional[str] = None,
skip: int = 0,
limit: int = 20,
) -> List[Goal]:
"""列出目标(支持筛选和分页)"""
q = db.query(Goal)
if creator_id:
q = q.filter(Goal.creator_id == creator_id)
if status:
q = q.filter(Goal.status == status)
return q.order_by(Goal.created_at.desc()).offset(skip).limit(limit).all()
def update_goal(
db: Session,
goal_id: str,
title: Optional[str] = None,
description: Optional[str] = None,
status: Optional[str] = None,
priority: Optional[int] = None,
plan: Optional[List[Dict[str, Any]]] = None,
autonomy_config: Optional[Dict[str, Any]] = None,
deadline: Optional[datetime] = None,
main_agent_id: Optional[str] = None,
) -> Goal:
"""更新目标"""
goal = get_goal(db, goal_id)
if title is not None:
goal.title = title
if description is not None:
goal.description = description
if status is not None:
valid_statuses = {"active", "paused", "completed", "failed", "cancelled"}
if status not in valid_statuses:
raise ValidationError(f"无效的状态: {status},有效值: {valid_statuses}")
goal.status = status
if status == "completed":
goal.completed_at = datetime.now()
goal.progress = 1.0
elif status == "failed" or status == "cancelled":
goal.completed_at = datetime.now()
if priority is not None:
if not 1 <= priority <= 10:
raise ValidationError("优先级必须在 1-10 之间")
goal.priority = priority
if plan is not None:
goal.plan = plan
if autonomy_config is not None:
goal.autonomy_config = autonomy_config
if deadline is not None:
goal.deadline = deadline
if main_agent_id is not None:
goal.main_agent_id = main_agent_id
db.commit()
db.refresh(goal)
return goal
def delete_goal(db: Session, goal_id: str) -> None:
"""删除目标及其所有任务"""
goal = get_goal(db, goal_id)
db.query(Task).filter(Task.goal_id == goal_id).delete()
db.delete(goal)
db.commit()
logger.info(f"Goal deleted: {goal_id}")
def update_goal_progress(db: Session, goal_id: str) -> Goal:
"""重新计算并更新目标进度"""
goal = get_goal(db, goal_id)
tasks = db.query(Task).filter(Task.goal_id == goal_id, Task.parent_task_id.is_(None)).all()
if not tasks:
goal.progress = 0.0
else:
completed_count = sum(1 for t in tasks if t.status == "completed")
goal.progress = round(completed_count / len(tasks), 4)
db.commit()
db.refresh(goal)
return goal
# ──────────────────────────── Task CRUD ────────────────────────────
def create_task(
db: Session,
goal_id: str,
title: str,
description: str = "",
priority: int = 5,
parent_task_id: Optional[str] = None,
depends_on: Optional[List[str]] = None,
assigned_agent_id: Optional[str] = None,
assigned_agent_name: Optional[str] = None,
task_config: Optional[Dict[str, Any]] = None,
deadline: Optional[datetime] = None,
requires_approval: bool = False,
approver_id: Optional[str] = None,
) -> Task:
"""创建任务"""
# 验证 goal 存在
get_goal(db, goal_id)
# 验证 depends_on 中的任务存在
if depends_on:
for dep_id in depends_on:
dep_task = db.query(Task).filter(Task.id == dep_id).first()
if not dep_task:
raise NotFoundError("前置依赖任务", dep_id)
task = Task(
id=str(uuid.uuid4()),
goal_id=goal_id,
title=title,
description=description or "",
priority=priority,
parent_task_id=parent_task_id,
depends_on=depends_on or [],
assigned_agent_id=assigned_agent_id,
assigned_agent_name=assigned_agent_name,
task_config=task_config or {},
deadline=deadline,
requires_approval=requires_approval,
approver_id=approver_id,
status="pending",
)
db.add(task)
db.commit()
db.refresh(task)
logger.info(f"Task created: {task.id} - {task.title} (goal={goal_id})")
return task
def get_task(db: Session, task_id: str) -> Task:
"""获取单个任务"""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise NotFoundError("任务", task_id)
return task
def list_tasks(
db: Session,
goal_id: Optional[str] = None,
status: Optional[str] = None,
assigned_agent_id: Optional[str] = None,
parent_task_id: Optional[str] = None,
skip: int = 0,
limit: int = 50,
) -> List[Task]:
"""列出任务(支持多条件筛选和分页)"""
q = db.query(Task)
if goal_id:
q = q.filter(Task.goal_id == goal_id)
if status:
q = q.filter(Task.status == status)
if assigned_agent_id:
q = q.filter(Task.assigned_agent_id == assigned_agent_id)
if parent_task_id is not None:
q = q.filter(Task.parent_task_id == parent_task_id)
return q.order_by(Task.created_at.asc()).offset(skip).limit(limit).all()
def update_task(
db: Session,
task_id: str,
title: Optional[str] = None,
description: Optional[str] = None,
status: Optional[str] = None,
priority: Optional[int] = None,
task_config: Optional[Dict[str, Any]] = None,
depends_on: Optional[List[str]] = None,
assigned_agent_id: Optional[str] = None,
assigned_agent_name: Optional[str] = None,
result: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
execution_id: Optional[str] = None,
deadline: Optional[datetime] = None,
) -> Task:
"""更新任务"""
task = get_task(db, task_id)
if title is not None:
task.title = title
if description is not None:
task.description = description
if status is not None:
valid_statuses = {"pending", "in_progress", "awaiting_approval", "completed", "failed", "cancelled"}
if status not in valid_statuses:
raise ValidationError(f"无效的任务状态: {status},有效值: {valid_statuses}")
task.status = status
if status == "in_progress" and task.started_at is None:
task.started_at = datetime.now()
elif status in ("completed", "failed", "cancelled"):
task.completed_at = datetime.now()
if priority is not None:
if not 1 <= priority <= 10:
raise ValidationError("优先级必须在 1-10 之间")
task.priority = priority
if task_config is not None:
task.task_config = task_config
if depends_on is not None:
task.depends_on = depends_on
if assigned_agent_id is not None:
task.assigned_agent_id = assigned_agent_id
if assigned_agent_name is not None:
task.assigned_agent_name = assigned_agent_name
if result is not None:
task.result = result
if error_message is not None:
task.error_message = error_message
if execution_id is not None:
task.execution_id = execution_id
if deadline is not None:
task.deadline = deadline
db.commit()
db.refresh(task)
# 任务状态变更时同步更新 Goal 进度
if status is not None:
update_goal_progress(db, task.goal_id)
return task
def delete_task(db: Session, task_id: str) -> None:
"""删除任务及其子任务"""
task = get_task(db, task_id)
goal_id = task.goal_id
# 递归删除子任务
db.query(Task).filter(Task.parent_task_id == task_id).delete()
db.delete(task)
db.commit()
# 更新进度
update_goal_progress(db, goal_id)
logger.info(f"Task deleted: {task_id}")
def get_task_dependencies_met(db: Session, task_id: str) -> bool:
"""检查任务的所有前置依赖是否已完成"""
task = get_task(db, task_id)
if not task.depends_on:
return True
for dep_id in task.depends_on:
dep_task = db.query(Task).filter(Task.id == dep_id).first()
if not dep_task or dep_task.status != "completed":
return False
return True
def get_goal_task_tree(db: Session, goal_id: str) -> Dict[str, Any]:
"""获取目标的任务树结构"""
goal = get_goal(db, goal_id)
tasks = list_tasks(db, goal_id=goal_id, limit=500)
# 构建任务树
task_map = {}
for t in tasks:
task_map[t.id] = {
"id": t.id,
"title": t.title,
"description": t.description,
"status": t.status,
"priority": t.priority,
"depends_on": t.depends_on,
"assigned_agent_id": t.assigned_agent_id,
"assigned_agent_name": t.assigned_agent_name,
"result": t.result,
"error_message": t.error_message,
"requires_approval": t.requires_approval,
"approval_status": t.approval_status,
"deadline": t.deadline.isoformat() if t.deadline else None,
"started_at": t.started_at.isoformat() if t.started_at else None,
"completed_at": t.completed_at.isoformat() if t.completed_at else None,
"children": [],
}
# 建立父子关系
roots = []
for t in tasks:
node = task_map[t.id]
if t.parent_task_id and t.parent_task_id in task_map:
task_map[t.parent_task_id]["children"].append(node)
else:
roots.append(node)
return {
"goal": {
"id": goal.id,
"title": goal.title,
"description": goal.description,
"status": goal.status,
"priority": goal.priority,
"progress": goal.progress,
"plan": goal.plan,
"autonomy_config": goal.autonomy_config,
"main_agent_id": goal.main_agent_id,
"deadline": goal.deadline.isoformat() if goal.deadline else None,
"started_at": goal.started_at.isoformat() if goal.started_at else None,
"completed_at": goal.completed_at.isoformat() if goal.completed_at else None,
"created_at": goal.created_at.isoformat() if goal.created_at else None,
},
"tasks": roots,
}