diff --git a/backend/app/api/agent_schedules.py b/backend/app/api/agent_schedules.py new file mode 100644 index 0000000..0691658 --- /dev/null +++ b/backend/app/api/agent_schedules.py @@ -0,0 +1,194 @@ +"""Agent 定时任务 CRUD API""" +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field +from sqlalchemy.orm import Session + +from app.api.auth import get_current_user +from app.core.database import get_db +from app.models.agent import Agent +from app.models.agent_schedule import AgentSchedule +from app.models.user import User +from app.services.agent_schedule_service import ( + compute_next_run, + create_execution_for_schedule, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/v1/agent-schedules", tags=["agent-schedules"]) + + +# ─── Pydantic Schemas ────────────────────────────────────────────── + + +class ScheduleCreate(BaseModel): + agent_id: str + name: str = Field(..., max_length=100) + cron_expression: str = Field(..., description="标准 5 位 cron,如 0 9 * * *") + input_message: str = Field(..., description="每次触发时发给 Agent 的消息") + timezone: str = "Asia/Shanghai" + + +class ScheduleUpdate(BaseModel): + name: Optional[str] = None + cron_expression: Optional[str] = None + input_message: Optional[str] = None + timezone: Optional[str] = None + enabled: Optional[bool] = None + + +class ScheduleResponse(BaseModel): + id: str + agent_id: str + name: str + cron_expression: str + input_message: str + timezone: str + enabled: bool + last_run_at: Optional[datetime] = None + last_run_status: Optional[str] = None + next_run_at: datetime + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +# ─── API Endpoints ───────────────────────────────────────────────── + + +@router.get("", response_model=List[ScheduleResponse]) +async def list_schedules( + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取当前用户的所有定时任务。""" + schedules = ( + db.query(AgentSchedule) + .filter(AgentSchedule.user_id == current_user.id) + .order_by(AgentSchedule.created_at.desc()) + .all() + ) + return schedules + + +@router.post("", response_model=ScheduleResponse, status_code=201) +async def create_schedule( + data: ScheduleCreate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """创建定时任务。""" + # 验证 Agent 存在 + agent = db.query(Agent).filter(Agent.id == data.agent_id).first() + if not agent: + raise HTTPException(status_code=404, detail="Agent 不存在") + if agent.user_id and agent.user_id != current_user.id and current_user.role != "admin": + raise HTTPException(status_code=403, detail="无权使用该 Agent") + + # 验证 cron 表达式 + try: + next_run = compute_next_run(data.cron_expression) + except (ValueError, KeyError) as e: + raise HTTPException(status_code=400, detail=f"cron 表达式无效: {e}") + + schedule = AgentSchedule( + agent_id=data.agent_id, + name=data.name, + cron_expression=data.cron_expression, + input_message=data.input_message, + timezone=data.timezone or "Asia/Shanghai", + enabled=True, + next_run_at=next_run, + user_id=current_user.id, + ) + db.add(schedule) + db.commit() + db.refresh(schedule) + logger.info( + "定时任务创建: user=%s agent=%s name=%s cron=%s next_run=%s", + current_user.id, data.agent_id, data.name, data.cron_expression, next_run, + ) + return schedule + + +@router.put("/{schedule_id}", response_model=ScheduleResponse) +async def update_schedule( + schedule_id: str, + data: ScheduleUpdate, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """更新定时任务配置。""" + schedule = db.query(AgentSchedule).filter(AgentSchedule.id == schedule_id).first() + if not schedule: + raise HTTPException(status_code=404, detail="定时任务不存在") + if schedule.user_id != current_user.id and current_user.role != "admin": + raise HTTPException(status_code=403, detail="无权修改该定时任务") + + if data.name is not None: + schedule.name = data.name + if data.cron_expression is not None: + try: + schedule.next_run_at = compute_next_run(data.cron_expression, after=datetime.utcnow()) + schedule.cron_expression = data.cron_expression + except (ValueError, KeyError) as e: + raise HTTPException(status_code=400, detail=f"cron 表达式无效: {e}") + if data.input_message is not None: + schedule.input_message = data.input_message + if data.timezone is not None: + schedule.timezone = data.timezone + if data.enabled is not None: + schedule.enabled = data.enabled + + schedule.updated_at = datetime.utcnow() + db.commit() + db.refresh(schedule) + return schedule + + +@router.delete("/{schedule_id}") +async def delete_schedule( + schedule_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """删除定时任务。""" + schedule = db.query(AgentSchedule).filter(AgentSchedule.id == schedule_id).first() + if not schedule: + raise HTTPException(status_code=404, detail="定时任务不存在") + if schedule.user_id != current_user.id and current_user.role != "admin": + raise HTTPException(status_code=403, detail="无权删除该定时任务") + + db.delete(schedule) + db.commit() + return {"message": "定时任务已删除"} + + +@router.post("/{schedule_id}/trigger") +async def trigger_schedule( + schedule_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """手动触发一次定时任务。""" + schedule = db.query(AgentSchedule).filter(AgentSchedule.id == schedule_id).first() + if not schedule: + raise HTTPException(status_code=404, detail="定时任务不存在") + if schedule.user_id != current_user.id and current_user.role != "admin": + raise HTTPException(status_code=403, detail="无权触发该定时任务") + + execution_id = create_execution_for_schedule(db, schedule) + if not execution_id: + raise HTTPException(status_code=500, detail="触发执行失败") + + return { + "message": "定时任务已触发", + "execution_id": execution_id, + } diff --git a/backend/app/core/celery_app.py b/backend/app/core/celery_app.py index cd24470..9bfd560 100644 --- a/backend/app/core/celery_app.py +++ b/backend/app/core/celery_app.py @@ -2,13 +2,18 @@ Celery 应用配置 """ from celery import Celery +from celery.schedules import crontab from app.core.config import settings celery_app = Celery( "aiagent", broker=settings.REDIS_URL, backend=settings.REDIS_URL, - include=["app.tasks.workflow_tasks", "app.tasks.agent_tasks"] + include=[ + "app.tasks.workflow_tasks", + "app.tasks.agent_tasks", + "app.tasks.scheduler_tasks", + ] ) celery_app.conf.update( @@ -21,3 +26,11 @@ celery_app.conf.update( task_time_limit=30 * 60, # 30分钟 task_soft_time_limit=25 * 60, # 25分钟 ) + +# Celery Beat 定时调度配置 +celery_app.conf.beat_schedule = { + "check-agent-schedules-every-minute": { + "task": "app.tasks.scheduler_tasks.check_agent_schedules_task", + "schedule": crontab(minute="*"), # 每分钟检查 + }, +} diff --git a/backend/app/core/database.py b/backend/app/core/database.py index fbaa82e..fcfc8e5 100644 --- a/backend/app/core/database.py +++ b/backend/app/core/database.py @@ -49,5 +49,6 @@ def init_db(): import app.models.agent_llm_log import app.models.agent_vector_memory import app.models.agent_learning_pattern + import app.models.agent_schedule import app.models.knowledge_base Base.metadata.create_all(bind=engine) diff --git a/backend/app/main.py b/backend/app/main.py index dd76abb..3877022 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -213,7 +213,7 @@ async def startup_event(): logger.error(f"自定义工具加载失败: {e}") # 注册路由 -from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base +from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules app.include_router(auth.router) app.include_router(uploads.router) @@ -238,6 +238,7 @@ app.include_router(tools.router) app.include_router(agent_chat.router) app.include_router(agent_monitoring.router) app.include_router(knowledge_base.router) +app.include_router(agent_schedules.router) if __name__ == "__main__": import uvicorn diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 8640e50..edb070f 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -15,6 +15,7 @@ from app.models.persistent_user_memory import PersistentUserMemory from app.models.agent_llm_log import AgentLLMLog from app.models.agent_vector_memory import AgentVectorMemory from app.models.agent_learning_pattern import AgentLearningPattern +from app.models.agent_schedule import AgentSchedule from app.models.knowledge_base import KnowledgeBase, Document, DocumentChunk -__all__ = ["User", "Workflow", "WorkflowVersion", "Agent", "Execution", "ExecutionLog", "ModelConfig", "DataSource", "WorkflowTemplate", "TemplateRating", "TemplateFavorite", "NodeTemplate", "Role", "Permission", "WorkflowPermission", "AgentPermission", "AlertRule", "AlertLog", "PersistentUserMemory", "AgentLLMLog", "AgentVectorMemory", "AgentLearningPattern", "KnowledgeBase", "Document", "DocumentChunk"] \ No newline at end of file +__all__ = ["User", "Workflow", "WorkflowVersion", "Agent", "Execution", "ExecutionLog", "ModelConfig", "DataSource", "WorkflowTemplate", "TemplateRating", "TemplateFavorite", "NodeTemplate", "Role", "Permission", "WorkflowPermission", "AgentPermission", "AlertRule", "AlertLog", "PersistentUserMemory", "AgentLLMLog", "AgentVectorMemory", "AgentLearningPattern", "AgentSchedule", "KnowledgeBase", "Document", "DocumentChunk"] \ No newline at end of file diff --git a/backend/app/models/agent_schedule.py b/backend/app/models/agent_schedule.py new file mode 100644 index 0000000..3ce831e --- /dev/null +++ b/backend/app/models/agent_schedule.py @@ -0,0 +1,28 @@ +"""Agent 定时任务表:按 cron 表达式周期执行 Agent""" +import uuid +from datetime import datetime +from sqlalchemy import Column, String, Text, Integer, DateTime, ForeignKey, Boolean +from sqlalchemy.dialects.mysql import CHAR +from app.core.database import Base + + +class AgentSchedule(Base): + """Agent 定时任务 — 按 cron 表达式周期执行指定 Agent""" + __tablename__ = "agent_schedules" + + id = Column(CHAR(36), primary_key=True, default=lambda: str(uuid.uuid4())) + agent_id = Column(CHAR(36), ForeignKey("agents.id"), nullable=False, index=True, comment="关联 Agent ID") + name = Column(String(100), nullable=False, comment="任务名称") + cron_expression = Column(String(100), nullable=False, comment="cron 表达式,如 0 9 * * *") + input_message = Column(Text, nullable=False, comment="定时执行时发送的消息内容") + timezone = Column(String(64), default="Asia/Shanghai", comment="时区") + enabled = Column(Boolean, default=True, comment="是否启用") + last_run_at = Column(DateTime, nullable=True, comment="上次执行时间") + last_run_status = Column(String(32), nullable=True, comment="上次执行状态: success/failed") + next_run_at = Column(DateTime, nullable=False, comment="下次执行时间") + user_id = Column(CHAR(36), ForeignKey("users.id"), nullable=False, index=True, comment="创建者 ID") + created_at = Column(DateTime, default=datetime.utcnow, comment="创建时间") + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment="更新时间") + + def __repr__(self): + return f"" diff --git a/backend/app/services/agent_schedule_service.py b/backend/app/services/agent_schedule_service.py new file mode 100644 index 0000000..963eba7 --- /dev/null +++ b/backend/app/services/agent_schedule_service.py @@ -0,0 +1,140 @@ +"""Agent 定时任务服务:cron 解析、执行触发、下次执行时间计算""" +from __future__ import annotations + +import logging +from datetime import datetime, timezone, timedelta +from typing import Optional +from croniter import croniter +from sqlalchemy.orm import Session + +from app.core.database import SessionLocal + +logger = logging.getLogger(__name__) + + +def compute_next_run(cron_expression: str, after: Optional[datetime] = None) -> datetime: + """根据 cron 表达式计算下一次执行时间。 + + Args: + cron_expression: 标准 5 位 cron 表达式,如 "0 9 * * *" + after: 从哪个时间点开始计算,默认为当前 UTC 时间 + + Returns: + 下次执行的 UTC datetime + """ + base = after or datetime.now(timezone.utc) + # croniter 需要以 datetime 对象作为基准 + base_naive = base.replace(tzinfo=None) if base.tzinfo else base + cron = croniter(cron_expression, base_naive) + next_dt = cron.get_next(datetime) + return next_dt + + +def create_execution_for_schedule(db: Session, schedule) -> Optional[str]: + """为定时任务创建 Execution 记录并投递 Celery 任务。 + + Args: + db: 数据库会话 + schedule: AgentSchedule ORM 对象 + + Returns: + 创建的 execution_id,失败返回 None + """ + from app.models.execution import Execution + from app.models.agent import Agent + + agent = db.query(Agent).filter(Agent.id == schedule.agent_id).first() + if not agent: + logger.warning("定时任务 %s 关联的 Agent %s 不存在", schedule.id, schedule.agent_id) + return None + if not agent.workflow_config: + logger.warning("Agent %s 缺少 workflow_config,无法执行定时任务", schedule.agent_id) + return None + + # 创建执行记录 + execution = Execution( + agent_id=schedule.agent_id, + input_data={"message": schedule.input_message}, + status="pending", + ) + db.add(execution) + db.flush() # 获取 id + + # 投递到 Celery + from app.tasks.workflow_tasks import execute_workflow_task + + try: + task = execute_workflow_task.delay( + str(execution.id), + f"agent_{schedule.agent_id}", + agent.workflow_config, + {"message": schedule.input_message}, + ) + execution.task_id = task.id + execution.status = "running" + db.commit() + logger.info( + "定时任务 %s 已投递执行: execution=%s task=%s", + schedule.id, execution.id, task.id, + ) + return str(execution.id) + except Exception as e: + execution.status = "failed" + execution.error_message = f"定时任务投递失败: {e!s}" + db.commit() + logger.warning("定时任务 %s 投递失败: %s", schedule.id, e) + return str(execution.id) + + +def check_and_run_due_schedules() -> int: + """检查所有启用的定时任务,执行到期的任务。 + + 被 Celery Beat 每分钟调用一次。 + + Returns: + 本次触发的任务数 + """ + db: Optional[Session] = None + try: + db = SessionLocal() + now = datetime.utcnow() + + due_schedules = ( + db.query(AgentSchedule) + .filter( + AgentSchedule.enabled == True, # noqa: E712 + AgentSchedule.next_run_at <= now, + ) + .all() + ) + + triggered = 0 + for sched in due_schedules: + try: + # 创建执行记录 + create_execution_for_schedule(db, sched) + + # 更新定时任务状态 + sched.last_run_at = now + sched.last_run_status = "success" + # 计算下次执行时间 + sched.next_run_at = compute_next_run(sched.cron_expression, after=now) + db.commit() + triggered += 1 + logger.info( + "定时任务触发完成: name=%s next_run=%s", + sched.name, sched.next_run_at, + ) + except Exception as e: + logger.error("定时任务 %s 执行失败: %s", sched.id, e) + sched.last_run_at = now + sched.last_run_status = "failed" + db.commit() + + return triggered + except Exception as e: + logger.error("检查定时任务失败: %s", e) + return 0 + finally: + if db: + db.close() diff --git a/backend/app/tasks/scheduler_tasks.py b/backend/app/tasks/scheduler_tasks.py new file mode 100644 index 0000000..4f4da12 --- /dev/null +++ b/backend/app/tasks/scheduler_tasks.py @@ -0,0 +1,27 @@ +""" +Celery Beat 定时调度任务 — 每分钟检查并触发到期的 Agent 定时任务。 +""" +from __future__ import annotations + +import logging + +from app.core.tools_bootstrap import ensure_builtin_tools_registered + +ensure_builtin_tools_registered() + +from app.core.celery_app import celery_app +from app.services.agent_schedule_service import check_and_run_due_schedules + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True, max_retries=0) +def check_agent_schedules_task(self): + """检查所有到期的 Agent 定时任务并触发执行。 + + 由 Celery Beat 每分钟调度一次。若 Redis/Worker 短暂不可用, + 下次心跳仍会捕获到期的任务(next_run_at 不会因错过而丢失)。 + """ + triggered = check_and_run_due_schedules() + if triggered: + logger.info("定时任务检查完成: 本次触发 %d 个任务", triggered)