"""
Goal API — goal management endpoints.

Exposes CRUD operations on goals plus lifecycle actions (start / pause /
resume), task-tree retrieval, LLM-driven decomposition / replanning, and
asynchronous execution through Celery.
"""

from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
import logging

from app.core.database import get_db
from app.api.auth import get_current_user
from app.models.user import User
from app.services import goal_service

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/v1/goals",
    tags=["goals"],
    responses={
        401: {"description": "未授权"},
        404: {"description": "资源不存在"},
        400: {"description": "请求参数错误"},
    },
)


# ──────────────────────────── Schemas ────────────────────────────
#
# The schema classes are documented with comments rather than docstrings so
# that the generated JSON schema does not gain extra "description" fields.


# Request body for creating a goal. ``priority`` must lie in 1..10 and
# defaults to 5; every field except ``title`` is optional.
class GoalCreate(BaseModel):
    title: str
    description: str = ""
    priority: int = Field(default=5, ge=1, le=10)
    deadline: Optional[datetime] = None
    main_agent_id: Optional[str] = None
    parent_goal_id: Optional[str] = None
    autonomy_config: Optional[Dict[str, Any]] = None


# Request body for updating a goal. Every field defaults to None; all of
# them are forwarded to ``goal_service.update_goal`` as-is.
class GoalUpdate(BaseModel):
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    priority: Optional[int] = Field(default=None, ge=1, le=10)
    plan: Optional[List[Dict[str, Any]]] = None
    autonomy_config: Optional[Dict[str, Any]] = None
    deadline: Optional[datetime] = None
    main_agent_id: Optional[str] = None


# Response representation of a goal. ``from_attributes`` lets the model be
# populated from attribute access on the object returned by the service.
class GoalResponse(BaseModel):
    id: str
    title: str
    description: Optional[str]
    status: str
    priority: int
    progress: float
    plan: Optional[Any]
    autonomy_config: Optional[Any]
    creator_id: str
    main_agent_id: Optional[str]
    parent_goal_id: Optional[str]
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    deadline: Optional[datetime]
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True


# Response for the task-tree endpoint: the goal and its tasks as plain dicts.
class GoalTaskTreeResponse(BaseModel):
    goal: Dict[str, Any]
    tasks: List[Dict[str, Any]]


# ──────────────────────────── Helpers ────────────────────────────


def _sync_autonomy_schedule(db: Session, goal_id: str) -> None:
    """Re-sync the autonomy schedule of a goal after its status changed.

    The import is deferred to call time rather than done at module load
    (presumably to avoid a circular import — TODO confirm).
    """
    from app.services.agent_schedule_service import sync_autonomy_schedule_for_goal

    sync_autonomy_schedule_for_goal(db, goal_id)


def _decompose_and_count_tasks(db: Session, goal_id: str) -> int:
    """Run the main agent's goal decomposition and return the task count.

    The decomposition coroutine is driven to completion with ``asyncio.run``,
    which requires that no event loop is already running in the calling
    thread. The callers are plain ``def`` endpoints, not ``async def``.

    Returns:
        The number of tasks returned by ``goal_service.list_tasks`` when
        called with ``limit=200`` (so the count is presumably capped at 200).
    """
    import asyncio
    from app.services.main_agent_service import MainAgentService

    service = MainAgentService(db)
    asyncio.run(service.decompose_goal(goal_id))
    tasks = goal_service.list_tasks(db, goal_id=goal_id, limit=200)
    return len(tasks)


# ──────────────────────────── Endpoints ────────────────────────────
#
# Each route passes its user-facing text through ``description=`` so the
# published OpenAPI description stays exactly as before, independent of the
# developer-facing docstring.
#
# NOTE(review): only create_goal and list_goals use ``current_user.id``. The
# other endpoints require authentication but pass no user to goal_service —
# confirm whether per-goal ownership checks are intended.


@router.post(
    "",
    response_model=GoalResponse,
    status_code=201,
    description="创建新目标",
)
def create_goal(
    data: GoalCreate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Create a new goal owned by the authenticated user."""
    return goal_service.create_goal(
        db=db,
        creator_id=current_user.id,
        title=data.title,
        description=data.description,
        priority=data.priority,
        deadline=data.deadline,
        main_agent_id=data.main_agent_id,
        parent_goal_id=data.parent_goal_id,
        autonomy_config=data.autonomy_config,
    )


@router.get(
    "",
    response_model=List[GoalResponse],
    description="列出目标",
)
def list_goals(
    status: Optional[str] = None,
    skip: int = Query(default=0, ge=0),
    limit: int = Query(default=20, ge=1, le=100),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List the authenticated user's goals.

    Args:
        status: Optional status filter, forwarded to the service.
        skip: Pagination offset (>= 0).
        limit: Page size (1..100, default 20).
    """
    return goal_service.list_goals(
        db=db,
        creator_id=current_user.id,
        status=status,
        skip=skip,
        limit=limit,
    )


@router.get(
    "/{goal_id}",
    response_model=GoalResponse,
    description="获取目标详情",
)
def get_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the goal identified by ``goal_id``."""
    return goal_service.get_goal(db, goal_id)


@router.put(
    "/{goal_id}",
    response_model=GoalResponse,
    description="更新目标",
)
def update_goal(
    goal_id: str,
    data: GoalUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Update a goal with the fields of the request body.

    Every field of ``data`` is forwarded to the service, including those
    left at ``None``.
    """
    return goal_service.update_goal(
        db=db,
        goal_id=goal_id,
        title=data.title,
        description=data.description,
        status=data.status,
        priority=data.priority,
        plan=data.plan,
        autonomy_config=data.autonomy_config,
        deadline=data.deadline,
        main_agent_id=data.main_agent_id,
    )


@router.delete(
    "/{goal_id}",
    status_code=204,
    description="删除目标",
)
def delete_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Delete the goal identified by ``goal_id``; responds with 204."""
    goal_service.delete_goal(db, goal_id)
    return None


@router.post(
    "/{goal_id}/start",
    response_model=GoalResponse,
    description="启动目标执行(同步创建自主循环调度)",
)
def start_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Mark a goal as active, stamp its start time, and sync its schedule."""
    goal = goal_service.update_goal(db, goal_id, status="active")

    # NOTE(review): datetime.now() is a naive local timestamp — confirm it
    # matches the timezone convention of the started_at column.
    goal.started_at = datetime.now()
    db.commit()
    db.refresh(goal)

    _sync_autonomy_schedule(db, goal_id)
    logger.info("Goal started: %s", goal_id)
    return goal


@router.post(
    "/{goal_id}/pause",
    response_model=GoalResponse,
    description="暂停目标执行(移除自主循环调度)",
)
def pause_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Set a goal's status to ``paused`` and re-sync its autonomy schedule."""
    paused_goal = goal_service.update_goal(db, goal_id, status="paused")
    _sync_autonomy_schedule(db, goal_id)
    return paused_goal


@router.post(
    "/{goal_id}/resume",
    response_model=GoalResponse,
    description="恢复目标执行(重新创建自主循环调度)",
)
def resume_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Set a goal's status back to ``active`` and re-sync its schedule."""
    resumed_goal = goal_service.update_goal(db, goal_id, status="active")
    _sync_autonomy_schedule(db, goal_id)
    return resumed_goal


@router.get(
    "/{goal_id}/tasks",
    response_model=GoalTaskTreeResponse,
    description="获取目标的任务树",
)
def get_goal_task_tree(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the task tree of the goal identified by ``goal_id``."""
    return goal_service.get_goal_task_tree(db, goal_id)


# Response for the decompose / replan endpoints.
class DecomposeResponse(BaseModel):
    goal_id: str
    task_count: int
    message: str


@router.post(
    "/{goal_id}/decompose",
    response_model=DecomposeResponse,
    description="触发目标分解(同步调用 LLM 拆解为 Task 树)",
)
def decompose_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Decompose a goal into tasks synchronously and report the task count.

    Raises:
        HTTPException: Any HTTP error raised while looking up or decomposing
            the goal is propagated unchanged; every other failure is logged
            and reported as a 500 carrying the error text.
    """
    try:
        # Verify the goal exists before invoking the agent.
        goal_service.get_goal(db, goal_id)
        task_count = _decompose_and_count_tasks(db, goal_id)
    except HTTPException:
        # Deliberate HTTP errors (such as a not-found response) must reach
        # the client as-is instead of being masked as a 500 below.
        raise
    except Exception as e:
        logger.exception("Goal decompose failed: %s", e)
        raise HTTPException(status_code=500, detail=f"目标分解失败: {e}") from e

    return {
        "goal_id": goal_id,
        "task_count": task_count,
        "message": f"目标已分解为 {task_count} 个任务",
    }


# Response for the async-execution endpoint.
class ExecuteAsyncResponse(BaseModel):
    goal_id: str
    celery_task_id: Optional[str]
    message: str


@router.post(
    "/{goal_id}/execute-async",
    response_model=ExecuteAsyncResponse,
    description="异步执行目标(通过 Celery Worker 执行,适合长时间运行的目标)",
)
def execute_goal_async(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Mark a goal as active and dispatch its execution to a Celery worker.

    Returns the Celery task id so the caller can correlate the submission.
    """
    from app.tasks.goal_tasks import execute_goal_task

    # Look the goal up first so a missing goal fails before any state change.
    goal_service.get_goal(db, goal_id)
    goal_service.update_goal(db, goal_id, status="active")

    task = execute_goal_task.delay(goal_id)
    logger.info("Goal %s dispatched to Celery: %s", goal_id, task.id)

    return {
        "goal_id": goal_id,
        "celery_task_id": task.id,
        "message": "目标已提交异步执行,可通过 goal 状态和 task 列表追踪进度",
    }


@router.post(
    "/{goal_id}/replan",
    response_model=DecomposeResponse,
    description="重新规划目标(清除未完成任务后重新分解)",
)
def replan_goal(
    goal_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Delete a goal's unfinished tasks, then decompose the goal again.

    NOTE(review): the deletion is committed before decomposition runs, so a
    decomposition failure leaves the removed tasks deleted.
    """
    from app.models.task import Task

    # Look the goal up first so a missing goal fails before anything is deleted.
    goal_service.get_goal(db, goal_id)

    # Remove pending, failed and cancelled tasks; tasks in any other status
    # are kept.
    db.query(Task).filter(
        Task.goal_id == goal_id,
        Task.status.in_(["pending", "failed", "cancelled"]),
    ).delete()
    db.commit()

    task_count = _decompose_and_count_tasks(db, goal_id)

    return {
        "goal_id": goal_id,
        "task_count": task_count,
        "message": f"目标已重新规划,生成 {task_count} 个新任务",
    }