"""Agent schedule service: cron parsing, execution triggering, and next-run-time computation."""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Optional

import pytz
from croniter import croniter
from sqlalchemy.orm import Session

from app.core.database import SessionLocal

logger = logging.getLogger(__name__)


def compute_next_run(
    cron_expression: str,
    after: Optional[datetime] = None,
    tz: str = "Asia/Shanghai",
) -> datetime:
    """Compute the next run time from a cron expression.

    Args:
        cron_expression: Standard 5-field cron expression, e.g. "0 9 * * *".
        after: Point in time to compute from; defaults to the current UTC time.
            A naive datetime is treated as UTC.
        tz: Time zone name, e.g. "Asia/Shanghai". The cron expression is
            interpreted in this time zone; unknown names fall back to UTC.

    Returns:
        The next run time as a naive UTC datetime (consistent with values
        produced by datetime.utcnow() when stored).
    """
    base = after or datetime.now(timezone.utc)

    # Make sure base is timezone-aware UTC. datetime.utcnow() returns a naive
    # value, which has to be localized first.
    if base.tzinfo is None:
        base = pytz.UTC.localize(base)
    else:
        base = base.astimezone(pytz.UTC)

    # Convert the UTC time to the target time zone, where the cron expression
    # is interpreted.
    try:
        target_tz = pytz.timezone(tz)
    except pytz.UnknownTimeZoneError:
        logger.warning("Unknown time zone %r, falling back to UTC", tz)
        target_tz = pytz.UTC
    base_in_tz = base.astimezone(target_tz)

    # Run croniter on a naive datetime expressed in the target time zone.
    base_naive = base_in_tz.replace(tzinfo=None)
    cron = croniter(cron_expression, base_naive)
    next_naive = cron.get_next(datetime)

    # Localize the result back to the target time zone, then convert to UTC.
    next_in_tz = target_tz.localize(next_naive)
    next_utc = next_in_tz.astimezone(pytz.UTC)

    # Return naive UTC (consistent with datetime.utcnow()).
    return next_utc.replace(tzinfo=None)


def create_execution_for_schedule(db: Session, schedule) -> Optional[str]:
    """Create an Execution record for a schedule and dispatch the Celery task.

    Args:
        db: Database session.
        schedule: AgentSchedule ORM object.

    Returns:
        The id of the created execution, or None if the associated Agent does
        not exist. If dispatching the Celery task fails, the execution is
        marked "failed" and its id is still returned.
    """
    from app.models.agent import Agent
    from app.models.execution import Execution

    agent = db.query(Agent).filter(Agent.id == schedule.agent_id).first()
    if not agent:
        logger.warning(
            "Agent %s referenced by schedule %s does not exist",
            schedule.agent_id,
            schedule.id,
        )
        return None

    # Payload shared by the execution record and the Celery task; it is
    # flagged as a scheduled-task reminder.
    input_data = {
        "USER_INPUT": f"[Scheduled task reminder] {schedule.input_message}",
        "query": f"[Scheduled task reminder] {schedule.input_message}",
        "message": schedule.input_message,
        "is_scheduled_reminder": True,
    }

    # Create the execution record, linked to the schedule via schedule_id.
    execution = Execution(
        agent_id=schedule.agent_id,
        schedule_id=schedule.id,
        input_data=input_data,
        status="pending",
    )
    db.add(execution)
    db.flush()  # populate execution.id

    try:
        if agent.workflow_config and agent.workflow_config.get("nodes"):
            # Workflow configured: run through the full workflow engine.
            from app.tasks.workflow_tasks import execute_workflow_task

            task = execute_workflow_task.delay(
                str(execution.id),
                f"agent_{schedule.agent_id}",
                agent.workflow_config,
                input_data,
            )
        else:
            # No workflow configured: run the simple asynchronous Agent task,
            # passing in the execution_id created above.
            from app.tasks.agent_tasks import execute_agent_task

            task = execute_agent_task.delay(
                str(schedule.agent_id),
                input_data,
                execution_id=str(execution.id),
            )

        execution.task_id = task.id
        execution.status = "running"
        db.commit()

        logger.info(
            "Schedule %s dispatched: execution=%s task=%s",
            schedule.id,
            execution.id,
            task.id,
        )
        return str(execution.id)
    except Exception as e:
        execution.status = "failed"
        execution.error_message = f"Failed to dispatch scheduled task: {e!s}"
        db.commit()
        logger.warning("Failed to dispatch schedule %s: %s", schedule.id, e)
        return str(execution.id)


def check_and_run_due_schedules() -> int:
    """Check all enabled schedules and run the ones that are due.

    Called by Celery Beat once per minute.

    Returns:
        The number of schedules triggered in this run.
    """
    from app.models.agent_schedule import AgentSchedule

    db: Optional[Session] = None
    try:
        db = SessionLocal()
        # Naive UTC, consistent with what is stored in the database.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        due_schedules = (
            db.query(AgentSchedule)
            .filter(
                AgentSchedule.enabled == True,  # noqa: E712
                AgentSchedule.next_run_at <= now,
            )
            .all()
        )

        triggered = 0
        for sched in due_schedules:
            try:
                # Create the execution record. None means no execution could
                # be created (the Agent is missing), so record a failure.
                execution_id = create_execution_for_schedule(db, sched)

                # Update the schedule state.
                sched.last_run_at = now
                sched.last_run_status = "success" if execution_id else "failed"

                # Compute the next run time in the schedule's own time zone.
                sched.next_run_at = compute_next_run(
                    sched.cron_expression,
                    after=now,
                    tz=sched.timezone or "UTC",
                )
                db.commit()

                if execution_id:
                    triggered += 1
                logger.info(
                    "Schedule processed: name=%s next_run=%s",
                    sched.name,
                    sched.next_run_at,
                )
            except Exception as e:
                logger.error("Schedule %s failed to run: %s", sched.id, e)
                # Discard any half-applied changes before recording the
                # failure. next_run_at is left unchanged, so the schedule is
                # picked up again on the next check.
                db.rollback()
                sched.last_run_at = now
                sched.last_run_status = "failed"
                db.commit()

        return triggered
    except Exception as e:
        logger.error("Failed to check schedules: %s", e)
        return 0
    finally:
        if db:
            db.close()


def notify_schedule_result(
    db: Session,
    execution,
    status: str,
    error_message: Optional[str] = None,
) -> None:
    """Create a notification and push Feishu messages for a scheduled execution.

    Does nothing if the execution is not linked to a schedule.

    Args:
        db: Database session.
        execution: Execution ORM object.
        status: "completed" or "failed".
        error_message: Error details when the execution failed.
    """
    if not execution or not execution.schedule_id:
        return

    try:
        from app.models.agent_schedule import AgentSchedule
        from app.services.notification_service import create_notification

        schedule = (
            db.query(AgentSchedule)
            .filter(AgentSchedule.id == execution.schedule_id)
            .first()
        )
        if not schedule:
            return

        if status == "completed":
            title = f"Scheduled task '{schedule.name}' succeeded"
            # Prefer the Agent's execution result, then fall back to the
            # schedule's reminder text.
            result_text = ""
            if execution.output_data:
                if isinstance(execution.output_data, dict):
                    result_text = (
                        execution.output_data.get("result")
                        or execution.output_data.get("output")
                        or execution.output_data.get("text")
                        or ""
                    )
                elif isinstance(execution.output_data, str):
                    result_text = execution.output_data
            if not result_text:
                result_text = schedule.input_message or ""
            content = result_text if result_text else "The Agent finished running as scheduled."
        else:
            title = f"Scheduled task '{schedule.name}' failed"
            content = f"Error: {error_message or 'unknown error'}"

        create_notification(
            db,
            user_id=schedule.user_id,
            title=title,
            content=content,
            category="schedule",
            ref_type="execution",
            ref_id=str(execution.id),
        )
        db.commit()

        # Build the detail link once; both Feishu channels below use it.
        from app.core.config import settings

        external_url = getattr(settings, "EXTERNAL_URL", None)
        detail_link = f"{external_url}/executions/{execution.id}" if external_url else None

        # If a Feishu webhook is configured, send a card through it. A failure
        # here is logged and does not affect the rest of the flow.
        if schedule.webhook_url:
            try:
                from app.services.feishu_notifier import send_feishu_card

                send_feishu_card(
                    webhook_url=schedule.webhook_url,
                    title=title,
                    body=content,
                    status=status,
                    detail_link=detail_link,
                )
            except Exception as e:
                logger.warning("Failed to send Feishu webhook notification: %s", e)

        # If the user has a bound Feishu account, send the notification
        # through the matching Feishu app.
        try:
            from app.models.user import User
            from app.services.feishu_open_id_service import (
                get_app_id_for_agent,
                get_open_id_for_app,
            )

            schedule_user = db.query(User).filter(User.id == schedule.user_id).first()
            if schedule_user:
                agent_id = str(execution.agent_id) if execution.agent_id else ""
                app_id = get_app_id_for_agent(agent_id)
                per_app_open_id = (
                    get_open_id_for_app(db, schedule.user_id, app_id) if app_id else None
                )

                send_ok = False
                # If the user has a dedicated open_id under this app, send
                # directly through that app.
                if per_app_open_id and app_id:
                    if app_id == (settings.LINGXI_APP_ID or ""):
                        try:
                            from app.services.lingxi_app_service import (
                                send_message_to_user as send_msg,
                            )

                            send_msg(
                                open_id=per_app_open_id,
                                title=title,
                                content=content,
                                status=status,
                                detail_link=detail_link,
                            )
                            send_ok = True
                        except Exception:
                            logger.info("Lingxi send failed, falling back to the main Feishu app")
                    elif app_id == (settings.ORANGE_APP_ID or ""):
                        try:
                            from app.services.orange_app_service import (
                                send_message_to_user as send_msg,
                            )

                            send_msg(
                                open_id=per_app_open_id,
                                title=title,
                                content=content,
                                status=status,
                                detail_link=detail_link,
                            )
                            send_ok = True
                        except Exception:
                            logger.info("Orange send failed, falling back to the main Feishu app")
                    elif app_id == (settings.SUYAO_APP_ID or ""):
                        try:
                            from app.services.suyao_app_service import (
                                send_message_to_user as send_msg,
                            )

                            send_msg(
                                open_id=per_app_open_id,
                                title=title,
                                content=content,
                                status=status,
                                detail_link=detail_link,
                            )
                            send_ok = True
                        except Exception:
                            logger.info("Suyao send failed, falling back to the main Feishu app")
                    elif app_id == (settings.TIANTIAN_APP_ID or ""):
                        try:
                            from app.services.tiantian_app_service import (
                                send_message_to_user as send_msg,
                            )

                            send_msg(
                                open_id=per_app_open_id,
                                title=title,
                                content=content,
                                status=status,
                                detail_link=detail_link,
                            )
                            send_ok = True
                        except Exception:
                            logger.info("Tiantian send failed, falling back to the main Feishu app")

                # Fallback: send with the open_id of the main Feishu app (Apple).
                if not send_ok and schedule_user.feishu_open_id:
                    from app.services.feishu_app_service import (
                        send_message_to_user as send_msg,
                    )

                    send_msg(
                        open_id=schedule_user.feishu_open_id,
                        title=title,
                        content=content,
                        status=status,
                        detail_link=detail_link,
                    )
        except Exception as e:
            logger.warning("Failed to send Feishu app notification: %s", e)

    except Exception as e:
        logger.warning("Failed to create schedule notification: %s", e)