Files
aiagent/backend/app/tasks/scheduler_tasks.py
renjianbo 0bbf68d5bb feat: 实现 Agent 定时任务系统 — 按 cron 表达式周期执行 Agent
- 新增 AgentSchedule 模型、CRUD API、调度服务
- 集成 Celery Beat 每分钟检查到期任务并自动触发执行
- 支持手动触发、cron 表达式解析、执行状态跟踪
- 依赖: croniter (cron 表达式解析)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 12:14:37 +08:00

28 lines
890 B
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Celery Beat 定时调度任务 — 每分钟检查并触发到期的 Agent 定时任务。
"""
from __future__ import annotations
import logging
from app.core.tools_bootstrap import ensure_builtin_tools_registered
ensure_builtin_tools_registered()
from app.core.celery_app import celery_app
from app.services.agent_schedule_service import check_and_run_due_schedules
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=0)
def check_agent_schedules_task(self):
"""检查所有到期的 Agent 定时任务并触发执行。
由 Celery Beat 每分钟调度一次。若 Redis/Worker 短暂不可用,
下次心跳仍会捕获到期的任务next_run_at 不会因错过而丢失)。
"""
triggered = check_and_run_due_schedules()
if triggered:
logger.info("定时任务检查完成: 本次触发 %d 个任务", triggered)