""" Task API — 任务管理接口 (含原子认领 + 依赖图 + Agent 状态) 参考 Claude Code src/utils/tasks.ts 的 API 设计 """ from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any from datetime import datetime import logging from app.core.database import get_db from app.core.task_system import TaskSystem, ClaimResult, AgentState, AgentStatus from app.api.auth import get_current_user from app.models.user import User from app.services import goal_service logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/v1/tasks", tags=["tasks"], responses={ 401: {"description": "未授权"}, 404: {"description": "资源不存在"}, 400: {"description": "请求参数错误"}, }, ) # ──────────────────────────── Schemas ──────────────────────────── class TaskCreate(BaseModel): goal_id: str title: str description: str = "" priority: int = Field(default=5, ge=1, le=10) parent_task_id: Optional[str] = None depends_on: Optional[List[str]] = None assigned_agent_id: Optional[str] = None assigned_agent_name: Optional[str] = None task_config: Optional[Dict[str, Any]] = None deadline: Optional[datetime] = None requires_approval: bool = False approver_id: Optional[str] = None class TaskUpdate(BaseModel): title: Optional[str] = None description: Optional[str] = None status: Optional[str] = None priority: Optional[int] = Field(default=None, ge=1, le=10) task_config: Optional[Dict[str, Any]] = None depends_on: Optional[List[str]] = None assigned_agent_id: Optional[str] = None assigned_agent_name: Optional[str] = None result: Optional[Dict[str, Any]] = None error_message: Optional[str] = None execution_id: Optional[str] = None deadline: Optional[datetime] = None class TaskResponse(BaseModel): id: str goal_id: str title: str description: Optional[str] status: str priority: int task_config: Optional[Any] parent_task_id: Optional[str] depends_on: Optional[Any] result: Optional[Any] error_message: Optional[str] execution_id: Optional[str] assigned_agent_id: Optional[str] assigned_agent_name: Optional[str] requires_approval: bool approver_id: Optional[str] approval_status: Optional[str] started_at: Optional[datetime] completed_at: Optional[datetime] deadline: Optional[datetime] created_at: datetime updated_at: datetime class Config: from_attributes = True class TaskDependencyCheck(BaseModel): task_id: str dependencies_met: bool pending_dependencies: List[str] = [] class ClaimTaskRequest(BaseModel): agent_id: str = Field(..., description="认领任务的 Agent 标识") check_busy: bool = Field(default=True, description="是否检查 Agent 忙碌状态") class ClaimTaskResponse(BaseModel): success: bool reason: Optional[str] = None task: Optional[TaskResponse] = None busy_with_tasks: List[str] = [] blocked_by_tasks: List[str] = [] class BlockTaskRequest(BaseModel): from_task_id: str = Field(..., description="阻塞方任务ID") to_task_id: str = Field(..., description="被阻塞方任务ID") class AgentStatusResponse(BaseModel): agent_id: str status: str # idle / busy current_tasks: List[str] = [] class ReleaseTaskRequest(BaseModel): agent_id: str = Field(..., description="释放任务的 Agent 标识") class TaskCompleteRequest(BaseModel): result: Optional[Dict[str, Any]] = None class TaskFailRequest(BaseModel): error_message: str = "" # ──────────────────────────── Endpoints ──────────────────────────── @router.post("", response_model=TaskResponse, status_code=201) def create_task( data: TaskCreate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """创建新任务""" return goal_service.create_task( db=db, goal_id=data.goal_id, title=data.title, description=data.description, priority=data.priority, parent_task_id=data.parent_task_id, depends_on=data.depends_on, assigned_agent_id=data.assigned_agent_id, assigned_agent_name=data.assigned_agent_name, task_config=data.task_config, deadline=data.deadline, requires_approval=data.requires_approval, approver_id=data.approver_id, ) @router.get("", response_model=List[TaskResponse]) def list_tasks( goal_id: Optional[str] = None, status: Optional[str] = None, assigned_agent_id: Optional[str] = None, parent_task_id: Optional[str] = None, skip: int = Query(default=0, ge=0), limit: int = Query(default=50, ge=1, le=200), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """列出任务""" return goal_service.list_tasks( db=db, goal_id=goal_id, status=status, assigned_agent_id=assigned_agent_id, parent_task_id=parent_task_id, skip=skip, limit=limit, ) @router.get("/{task_id}", response_model=TaskResponse) def get_task( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """获取任务详情""" return goal_service.get_task(db, task_id) @router.put("/{task_id}", response_model=TaskResponse) def update_task( task_id: str, data: TaskUpdate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """更新任务""" return goal_service.update_task( db=db, task_id=task_id, title=data.title, description=data.description, status=data.status, priority=data.priority, task_config=data.task_config, depends_on=data.depends_on, assigned_agent_id=data.assigned_agent_id, assigned_agent_name=data.assigned_agent_name, result=data.result, error_message=data.error_message, execution_id=data.execution_id, deadline=data.deadline, ) @router.delete("/{task_id}", status_code=204) def delete_task( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """删除任务""" goal_service.delete_task(db, task_id) return None @router.get("/{task_id}/check-dependencies", response_model=TaskDependencyCheck) def check_task_dependencies( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """检查任务的前置依赖是否满足""" met = goal_service.get_task_dependencies_met(db, task_id) task = goal_service.get_task(db, task_id) pending = [] if not met: for dep_id in (task.depends_on or []): from app.models.task import Task dep = db.query(Task).filter(Task.id == dep_id).first() if dep and dep.status != "completed": pending.append(dep_id) return TaskDependencyCheck( task_id=task_id, dependencies_met=met, pending_dependencies=pending, ) @router.post("/{task_id}/approve", response_model=TaskResponse) def approve_task( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """审批通过任务""" return goal_service.update_task( db=db, task_id=task_id, status="in_progress", ) @router.post("/{task_id}/reject", response_model=TaskResponse) def reject_task( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """审批驳回任务""" return goal_service.update_task( db=db, task_id=task_id, status="failed", error_message="审批驳回", ) @router.post("/{task_id}/execute", response_model=TaskResponse) async def execute_task( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """手动执行任务 — 通过 Main Agent 服务执行单个任务""" from app.services.main_agent_service import MainAgentService goal_service.update_task(db=db, task_id=task_id, status="in_progress") try: svc = MainAgentService(db) result = await svc.execute_task(task_id) goal_service.update_task( db=db, task_id=task_id, status="completed", result=result, ) except Exception as e: logger.error("任务 %s 执行失败: %s", task_id, e) goal_service.update_task( db=db, task_id=task_id, status="failed", error_message=str(e), ) return goal_service.get_task(db, task_id) @router.post("/{task_id}/retry", response_model=TaskResponse) async def retry_task( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """重试失败任务""" task = goal_service.get_task(db, task_id) if task.status != "failed": from fastapi import HTTPException raise HTTPException(400, "只能重试失败状态的任务") goal_service.update_task( db=db, task_id=task_id, status="pending", error_message=None, result=None, ) from app.services.main_agent_service import MainAgentService goal_service.update_task(db=db, task_id=task_id, status="in_progress") try: svc = MainAgentService(db) result = await svc.execute_task(task_id) goal_service.update_task( db=db, task_id=task_id, status="completed", result=result, ) except Exception as e: logger.error("任务 %s 重试失败: %s", task_id, e) goal_service.update_task( db=db, task_id=task_id, status="failed", error_message=str(e), ) return goal_service.get_task(db, task_id) # ════════════════════ 任务系统增强 (参考 Claude Code task_system) ════════════════════ def _get_task_system(db: Session) -> TaskSystem: return TaskSystem(db) @router.post("/{task_id}/claim", response_model=ClaimTaskResponse) def claim_task( task_id: str, data: ClaimTaskRequest, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """原子认领任务 (SELECT FOR UPDATE),检查依赖+Agent忙碌""" ts = _get_task_system(db) result = ts.claim_task(task_id=task_id, agent_id=data.agent_id, check_busy=data.check_busy) return ClaimTaskResponse( success=result.success, reason=result.reason, task=TaskResponse.model_validate(result.task) if result.task else None, busy_with_tasks=result.busy_with_tasks, blocked_by_tasks=result.blocked_by_tasks, ) @router.post("/block", status_code=200) def block_task( data: BlockTaskRequest, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """设置任务依赖: from_task 阻塞 to_task""" ts = _get_task_system(db) ok = ts.block_task(data.from_task_id, data.to_task_id) if not ok: from fastapi import HTTPException raise HTTPException(404, "任务不存在") return {"message": "ok", "from": data.from_task_id, "to": data.to_task_id} @router.delete("/block", status_code=200) def unblock_task( data: BlockTaskRequest, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """移除任务依赖""" ts = _get_task_system(db) ok = ts.unblock_task(data.from_task_id, data.to_task_id) if not ok: from fastapi import HTTPException raise HTTPException(404, "任务不存在") return {"message": "ok"} @router.get("/agent/{agent_id}/status", response_model=AgentStatusResponse) def get_agent_status( agent_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """获取 Agent 忙闲状态 (idle/busy)""" ts = _get_task_system(db) state = ts.get_agent_status(agent_id) return AgentStatusResponse( agent_id=state.agent_id, status=state.status.value, current_tasks=state.current_tasks, ) @router.post("/{task_id}/release", status_code=200) def release_task( task_id: str, data: ReleaseTaskRequest, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """Agent 主动释放单个任务""" ts = _get_task_system(db) ok = ts.release_task(task_id, data.agent_id) if not ok: from fastapi import HTTPException raise HTTPException(404, "任务不存在或不属于该 Agent") return {"message": "ok", "task_id": task_id} @router.post("/agent/{agent_id}/unassign", status_code=200) def unassign_agent_tasks( agent_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """释放 Agent 所有未完成任务 (Agent 下线时调用)""" ts = _get_task_system(db) tasks = ts.unassign_agent_tasks(agent_id) return {"message": "ok", "unassigned_count": len(tasks), "task_ids": [t.id for t in tasks]} @router.post("/{task_id}/complete", response_model=TaskResponse) def complete_task( task_id: str, data: TaskCompleteRequest = TaskCompleteRequest(), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """标记任务完成 (自动检查被阻塞任务)""" ts = _get_task_system(db) task = ts.complete_task(task_id, result=data.result) if not task: from fastapi import HTTPException raise HTTPException(404, "任务不存在") return TaskResponse.model_validate(task) @router.post("/{task_id}/fail", response_model=TaskResponse) def fail_task( task_id: str, data: TaskFailRequest = TaskFailRequest(), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """标记任务失败""" ts = _get_task_system(db) task = ts.fail_task(task_id, error_message=data.error_message) if not task: from fastapi import HTTPException raise HTTPException(404, "任务不存在") return TaskResponse.model_validate(task) @router.get("/available/{goal_id}", response_model=List[TaskResponse]) def get_available_tasks( goal_id: str, limit: int = Query(default=10, ge=1, le=100), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """获取目标下所有可执行任务 (依赖满足 + 未被认领)""" ts = _get_task_system(db) tasks = ts.get_next_available_tasks(goal_id, limit=limit) return [TaskResponse.model_validate(t) for t in tasks] @router.get("/{task_id}/blockers", response_model=List[TaskResponse]) def get_task_blockers( task_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """获取任务尚未完成的阻塞依赖""" ts = _get_task_system(db) blockers = ts.get_unresolved_blockers(task_id) return [TaskResponse.model_validate(t) for t in blockers]