"""Agent 定时任务服务:cron 解析、执行触发、下次执行时间计算""" from __future__ import annotations import logging from datetime import datetime, timezone, timedelta from typing import Optional from croniter import croniter from sqlalchemy.orm import Session from app.core.database import SessionLocal logger = logging.getLogger(__name__) def compute_next_run(cron_expression: str, after: Optional[datetime] = None) -> datetime: """根据 cron 表达式计算下一次执行时间。 Args: cron_expression: 标准 5 位 cron 表达式,如 "0 9 * * *" after: 从哪个时间点开始计算,默认为当前 UTC 时间 Returns: 下次执行的 UTC datetime """ base = after or datetime.now(timezone.utc) # croniter 需要以 datetime 对象作为基准 base_naive = base.replace(tzinfo=None) if base.tzinfo else base cron = croniter(cron_expression, base_naive) next_dt = cron.get_next(datetime) return next_dt def create_execution_for_schedule(db: Session, schedule) -> Optional[str]: """为定时任务创建 Execution 记录并投递 Celery 任务。 Args: db: 数据库会话 schedule: AgentSchedule ORM 对象 Returns: 创建的 execution_id,失败返回 None """ from app.models.execution import Execution from app.models.agent import Agent agent = db.query(Agent).filter(Agent.id == schedule.agent_id).first() if not agent: logger.warning("定时任务 %s 关联的 Agent %s 不存在", schedule.id, schedule.agent_id) return None if not agent.workflow_config: logger.warning("Agent %s 缺少 workflow_config,无法执行定时任务", schedule.agent_id) return None # 创建执行记录(关联 schedule_id) execution = Execution( agent_id=schedule.agent_id, schedule_id=schedule.id, input_data={"message": schedule.input_message}, status="pending", ) db.add(execution) db.flush() # 获取 id # 投递到 Celery from app.tasks.workflow_tasks import execute_workflow_task try: task = execute_workflow_task.delay( str(execution.id), f"agent_{schedule.agent_id}", agent.workflow_config, {"message": schedule.input_message}, ) execution.task_id = task.id execution.status = "running" db.commit() logger.info( "定时任务 %s 已投递执行: execution=%s task=%s", schedule.id, execution.id, task.id, ) return str(execution.id) except Exception as e: execution.status = "failed" execution.error_message = f"定时任务投递失败: {e!s}" db.commit() logger.warning("定时任务 %s 投递失败: %s", schedule.id, e) return str(execution.id) def check_and_run_due_schedules() -> int: """检查所有启用的定时任务,执行到期的任务。 被 Celery Beat 每分钟调用一次。 Returns: 本次触发的任务数 """ from app.models.agent_schedule import AgentSchedule db: Optional[Session] = None try: db = SessionLocal() now = datetime.utcnow() due_schedules = ( db.query(AgentSchedule) .filter( AgentSchedule.enabled == True, # noqa: E712 AgentSchedule.next_run_at <= now, ) .all() ) triggered = 0 for sched in due_schedules: try: # 创建执行记录 create_execution_for_schedule(db, sched) # 更新定时任务状态 sched.last_run_at = now sched.last_run_status = "success" # 计算下次执行时间 sched.next_run_at = compute_next_run(sched.cron_expression, after=now) db.commit() triggered += 1 logger.info( "定时任务触发完成: name=%s next_run=%s", sched.name, sched.next_run_at, ) except Exception as e: logger.error("定时任务 %s 执行失败: %s", sched.id, e) sched.last_run_at = now sched.last_run_status = "failed" db.commit() return triggered except Exception as e: logger.error("检查定时任务失败: %s", e) return 0 finally: if db: db.close()