Files
aiagent/backend/app/services/agent_schedule_service.py

489 lines
19 KiB
Python
Raw Normal View History

"""Agent 定时任务服务cron 解析、执行触发、下次执行时间计算"""
from __future__ import annotations
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional
import pytz
from croniter import croniter
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
logger = logging.getLogger(__name__)
def compute_next_run(cron_expression: str, after: Optional[datetime] = None, tz: str = "Asia/Shanghai") -> datetime:
    """Compute the next fire time of a cron expression.

    The cron expression is evaluated against wall-clock time in ``tz``; the
    result is converted back to UTC.

    Args:
        cron_expression: Cron expression, e.g. ``"0 9 * * *"``.
        after: Instant the search starts from. A naive value is treated as
            UTC; an aware value is converted to UTC. Defaults to the current
            UTC time.
        tz: Time zone name (e.g. ``"Asia/Shanghai"``) in which the cron
            expression is interpreted. An unknown name falls back to UTC.

    Returns:
        The next run time as a naive ``datetime`` expressed in UTC.
    """
    utc = pytz.UTC

    reference = after or datetime.now(timezone.utc)
    # Normalise the reference instant to aware UTC before any zone conversion.
    reference_utc = (
        utc.localize(reference)
        if reference.tzinfo is None
        else reference.astimezone(utc)
    )

    try:
        zone = pytz.timezone(tz)
    except pytz.UnknownTimeZoneError:
        zone = utc

    # croniter is fed a naive wall-clock time in the target zone.
    local_wall_clock = reference_utc.astimezone(zone).replace(tzinfo=None)
    next_wall_clock = croniter(cron_expression, local_wall_clock).get_next(datetime)

    # Re-attach the zone, convert to UTC, then drop tzinfo for storage.
    return zone.localize(next_wall_clock).astimezone(utc).replace(tzinfo=None)
def _create_execution_for_goal_schedule(db: Session, schedule) -> Optional[str]:
    """Create a Goal for a goal-type schedule and enqueue its execution.

    Goal fields come from ``schedule.goal_config`` with fallbacks: the title
    falls back to the schedule's input message, then its name; the main agent
    is the schedule's own ``agent_id`` if set, otherwise the configured one.

    Args:
        db: Database session.
        schedule: AgentSchedule ORM object whose ``schedule_type`` is "goal".

    Returns:
        The id of the created goal as a string, or ``None`` if any step
        (creation, dispatch, status update) raised.
    """
    from app.services.goal_service import create_goal, update_goal
    from app.tasks.goal_tasks import execute_goal_task

    config = schedule.goal_config or {}
    goal_fields = {
        "title": config.get("title", schedule.input_message or schedule.name),
        "description": config.get("description", ""),
        "priority": config.get("priority", 5),
        "main_agent_id": schedule.agent_id or config.get("main_agent_id"),
    }

    created_goal_id: Optional[str] = None
    try:
        goal = create_goal(db=db, creator_id=schedule.user_id, **goal_fields)
        goal_id = str(goal.id)
        celery_task = execute_goal_task.delay(goal_id)
        update_goal(db, goal_id, status="active")
        logger.info(
            "Goal 定时任务 %s 已投递: goal_id=%s celery_task=%s",
            schedule.id, goal.id, celery_task.id,
        )
        created_goal_id = goal_id
    except Exception as exc:
        logger.error("Goal 定时任务 %s 投递失败: %s", schedule.id, exc)
    return created_goal_id
def create_execution_for_schedule(db: Session, schedule) -> Optional[str]:
    """Create an Execution row for a schedule and enqueue the matching Celery task.

    Goal-type schedules (``schedule_type == "goal"`` with a ``goal_id``) are
    delegated to ``_create_execution_for_goal_schedule``. Otherwise an
    Execution is created for the schedule's agent and dispatched either to
    the workflow engine (when the agent has a workflow config containing
    ``nodes``) or to the plain agent task.

    Args:
        db: Database session. It is committed by this function.
        schedule: AgentSchedule ORM object.

    Returns:
        The execution id as a string. If dispatching fails the Execution is
        marked "failed" and its id is still returned. ``None`` is returned
        when the schedule's agent does not exist; for goal-type schedules the
        delegate's result (goal id or ``None``) is returned.
    """
    from app.models.execution import Execution
    from app.models.agent import Agent

    # Goal-type schedule: create a Goal and dispatch it instead.
    if getattr(schedule, "schedule_type", "agent") == "goal" and schedule.goal_id:
        return _create_execution_for_goal_schedule(db, schedule)

    agent = db.query(Agent).filter(Agent.id == schedule.agent_id).first()
    if not agent:
        logger.warning("定时任务 %s 关联的 Agent %s 不存在", schedule.id, schedule.agent_id)
        return None

    def build_payload() -> dict:
        # A fresh dict per use so the stored input and the task argument
        # never share a mutable object.
        reminder = f"[定时任务提醒] {schedule.input_message}"
        return {
            "USER_INPUT": reminder,
            "query": reminder,
            "message": schedule.input_message,
            "is_scheduled_reminder": True,
        }

    execution = Execution(
        agent_id=schedule.agent_id,
        schedule_id=schedule.id,
        input_data=build_payload(),
        status="pending",
    )
    db.add(execution)
    # Commit (not merely flush) before dispatching: a worker runs on its own
    # connection and cannot see an uncommitted row, so enqueueing first races
    # the worker against this transaction.
    db.commit()
    execution_id = str(execution.id)

    try:
        workflow_config = agent.workflow_config
        if workflow_config and workflow_config.get("nodes"):
            # Agent has a workflow: run it through the workflow engine.
            from app.tasks.workflow_tasks import execute_workflow_task
            task = execute_workflow_task.delay(
                execution_id,
                f"agent_{schedule.agent_id}",
                workflow_config,
                build_payload(),
            )
        else:
            # No workflow: plain agent task, reusing the execution created above.
            from app.tasks.agent_tasks import execute_agent_task
            task = execute_agent_task.delay(
                str(schedule.agent_id),
                build_payload(),
                execution_id=execution_id,
            )

        execution.task_id = task.id
        db.flush()
        # Only move pending -> running. The row is already visible to the
        # worker, so an unconditional write could clobber a status the
        # worker has set in the meantime.
        (
            db.query(Execution)
            .filter(Execution.id == execution.id, Execution.status == "pending")
            .update({"status": "running"}, synchronize_session=False)
        )
        db.commit()
        logger.info(
            "定时任务 %s 已投递执行: execution=%s task=%s",
            schedule.id, execution.id, task.id,
        )
        return execution_id
    except Exception as e:
        # If the failure came from the session itself it must be rolled back
        # before it can be used again; after a dispatch failure this is a no-op.
        db.rollback()
        execution.status = "failed"
        execution.error_message = f"定时任务投递失败: {e!s}"
        db.commit()
        logger.warning("定时任务 %s 投递失败: %s", schedule.id, e)
        return execution_id
def check_and_run_due_schedules() -> int:
    """Dispatch every enabled schedule whose ``next_run_at`` has passed.

    Per the original author's note, Celery Beat invokes this once a minute.
    For each due schedule an execution is created via
    ``create_execution_for_schedule`` and ``next_run_at`` is advanced using
    the schedule's own time zone (UTC when unset). Schedules of type
    ``goal_autonomy`` are excluded here and left to
    ``check_and_run_autonomy_ticks``, which is called at the end of the pass.

    Returns:
        Number of schedules dispatched in this pass plus the number of
        autonomy ticks triggered; 0 if the pass itself fails.
    """
    from sqlalchemy import or_
    from app.models.agent_schedule import AgentSchedule

    db: Optional[Session] = None
    try:
        db = SessionLocal()
        now = datetime.now(timezone.utc).replace(tzinfo=None)  # naive UTC, same as stored values
        due_schedules = (
            db.query(AgentSchedule)
            .filter(
                AgentSchedule.enabled == True,  # noqa: E712
                AgentSchedule.next_run_at <= now,
                # Autonomy schedules have their own dispatcher. Picking them
                # up here would run them as plain agent reminders and advance
                # next_run_at, so the autonomy pass would never see them due.
                # NULL is matched explicitly because `!=` is not true for NULL.
                or_(
                    AgentSchedule.schedule_type.is_(None),
                    AgentSchedule.schedule_type != "goal_autonomy",
                ),
            )
            .all()
        )

        triggered = 0
        for sched in due_schedules:
            sched_id = None
            try:
                sched_id = sched.id
                execution_ref = create_execution_for_schedule(db, sched)

                sched.last_run_at = now
                # None means nothing was dispatched (missing agent, or the
                # goal dispatch failed), so do not record it as a success.
                sched.last_run_status = "success" if execution_ref else "failed"
                # Advance using the schedule's configured time zone.
                sched.next_run_at = compute_next_run(
                    sched.cron_expression, after=now, tz=sched.timezone or "UTC"
                )
                db.commit()

                if execution_ref:
                    triggered += 1
                    logger.info(
                        "定时任务触发完成: name=%s next_run=%s",
                        sched.name, sched.next_run_at,
                    )
                else:
                    logger.warning(
                        "定时任务未能触发: name=%s next_run=%s",
                        sched.name, sched.next_run_at,
                    )
            except Exception as e:
                # A failed flush/commit leaves the session unusable until it
                # is rolled back; without this one bad schedule would abort
                # the whole pass.
                db.rollback()
                logger.error("定时任务 %s 执行失败: %s", sched_id, e, exc_info=True)
                try:
                    sched.last_run_at = now
                    sched.last_run_status = "failed"
                    db.commit()
                except Exception:
                    db.rollback()
                    logger.exception("定时任务 %s 失败状态保存失败", sched_id)

        return triggered + check_and_run_autonomy_ticks()
    except Exception as e:
        logger.error("检查定时任务失败: %s", e, exc_info=True)
        return 0
    finally:
        if db:
            db.close()
def sync_autonomy_schedule_for_goal(db: Session, goal_id: str) -> Optional[str]:
    """Rebuild the ``goal_autonomy`` schedule row for a goal.

    Any existing ``goal_autonomy`` schedule for the goal is deleted first. A
    new one is created only when the goal is "active" and its
    ``autonomy_config.check_interval_minutes`` is a positive number.

    The interval is mapped to cron as ``*/N * * * *`` for N < 60 minutes, and
    as ``0 */H * * *`` with ``H = N // 60`` otherwise, so intervals of an hour
    or more are rounded down to whole hours.

    Args:
        db: Database session. It is committed by this function except when
            the goal does not exist.
        goal_id: Goal id.

    Returns:
        The id of the created schedule as a string, or ``None`` when the goal
        is missing, not active, or has no positive interval.
    """
    from app.models.goal import Goal
    from app.models.agent_schedule import AgentSchedule

    goal = db.query(Goal).filter(Goal.id == goal_id).first()
    if not goal:
        return None

    # Always start from a clean slate: drop whatever autonomy schedule exists.
    existing = db.query(AgentSchedule).filter(
        AgentSchedule.schedule_type == "goal_autonomy",
        AgentSchedule.goal_id == goal_id,
    )
    existing.delete()

    if goal.status != "active":
        db.commit()
        logger.info("Goal %s 非活跃状态,已移除自主循环调度", goal_id)
        return None

    autonomy = goal.autonomy_config or {}
    interval_minutes = int(autonomy.get("check_interval_minutes", 0) or 0)
    if interval_minutes <= 0:
        db.commit()
        return None

    # Cron expression for "every N minutes" / "every H hours".
    cron_expr = (
        f"*/{interval_minutes} * * * *"
        if interval_minutes < 60
        else f"0 */{interval_minutes // 60} * * *"
    )

    zone_name = autonomy.get("timezone", "Asia/Shanghai")
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    first_run = compute_next_run(cron_expr, after=utc_now, tz=zone_name)

    goal_snapshot = {
        "title": goal.title,
        "description": goal.description or "",
        "priority": goal.priority,
        "main_agent_id": goal.main_agent_id,
        "autonomy_config": autonomy,
    }
    schedule = AgentSchedule(
        agent_id=goal.main_agent_id,
        schedule_type="goal_autonomy",
        goal_id=goal_id,
        goal_config=goal_snapshot,
        name=f"自主循环: {goal.title}",
        cron_expression=cron_expr,
        input_message=f"Autonomy tick for goal: {goal.title}",
        timezone=zone_name,
        enabled=True,
        next_run_at=first_run,
        user_id=goal.creator_id,
    )
    db.add(schedule)
    db.commit()

    logger.info(
        "Goal %s 自主循环调度已创建: schedule=%s cron=%s interval=%dmin",
        goal_id, schedule.id, cron_expr, interval_minutes,
    )
    return str(schedule.id)
def check_and_run_autonomy_ticks() -> int:
    """Enqueue an autonomy tick for every due ``goal_autonomy`` schedule.

    For each enabled ``goal_autonomy`` schedule whose ``next_run_at`` has
    passed, ``autonomy_tick_task`` is enqueued for its goal and
    ``next_run_at`` is advanced using the schedule's time zone (UTC when
    unset). Schedules without a ``goal_id`` are skipped.

    Returns:
        Number of goals for which a tick was enqueued; 0 if the pass itself
        fails.
    """
    from app.models.agent_schedule import AgentSchedule

    db: Optional[Session] = None
    try:
        db = SessionLocal()
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        due_schedules = (
            db.query(AgentSchedule)
            .filter(
                AgentSchedule.schedule_type == "goal_autonomy",
                AgentSchedule.enabled == True,  # noqa: E712
                AgentSchedule.next_run_at <= now,
            )
            .all()
        )

        triggered = 0
        for sched in due_schedules:
            if not sched.goal_id:
                continue
            # Capture identifiers up front so the failure path can log them
            # without touching a possibly-broken session.
            goal_id, sched_id = sched.goal_id, sched.id
            try:
                # Imported inside the per-schedule try so an import failure is
                # recorded on the schedule like any other dispatch failure.
                from app.tasks.goal_tasks import autonomy_tick_task

                autonomy_tick_task.delay(str(goal_id))
                sched.last_run_at = now
                sched.last_run_status = "success"
                sched.next_run_at = compute_next_run(
                    sched.cron_expression, after=now, tz=sched.timezone or "UTC"
                )
                db.commit()
                triggered += 1
                logger.info("自主循环触发: goal=%s schedule=%s", goal_id, sched_id)
            except Exception as e:
                # Roll back first: after a failed flush/commit the session
                # rejects further work, which would otherwise abort the
                # remaining schedules in this pass.
                db.rollback()
                logger.error(
                    "自主循环触发失败: goal=%s error=%s", goal_id, e, exc_info=True
                )
                try:
                    sched.last_run_at = now
                    sched.last_run_status = "failed"
                    db.commit()
                except Exception:
                    db.rollback()
                    logger.exception("自主循环调度 %s 失败状态保存失败", sched_id)

        return triggered
    except Exception as e:
        logger.error("检查自主循环调度失败: %s", e, exc_info=True)
        return 0
    finally:
        if db:
            db.close()
# Per-agent Feishu apps, checked in this order. Each entry is:
# (settings attribute holding the app id,
#  service module exposing send_message_to_user,
#  label used in the fallback log message).
_SCHEDULE_NOTIFY_FEISHU_APPS = (
    ("LINGXI_APP_ID", "app.services.lingxi_app_service", "灵犀"),
    ("ORANGE_APP_ID", "app.services.orange_app_service", "橙子"),
    ("SUYAO_APP_ID", "app.services.suyao_app_service", "苏瑶"),
    ("TIANTIAN_APP_ID", "app.services.tiantian_app_service", "甜甜"),
)


def _schedule_notify_load_settings():
    """Return the application settings object, or None if it cannot be imported."""
    try:
        from app.core.config import settings
    except Exception:
        logger.debug("app.core.config.settings unavailable", exc_info=True)
        return None
    return settings


def _schedule_notify_result_text(output_data):
    """Pick the text to show from an execution's output, or "" if there is none."""
    if not output_data:
        return ""
    if isinstance(output_data, dict):
        return (
            output_data.get("result")
            or output_data.get("output")
            or output_data.get("text")
            or ""
        )
    if isinstance(output_data, str):
        return output_data
    return ""


def _schedule_notify_detail_link(settings, execution_id) -> Optional[str]:
    """Build the execution detail URL, or None when no external URL is configured."""
    base_url = getattr(settings, "EXTERNAL_URL", None)
    return f"{base_url}/executions/{execution_id}" if base_url else None


def _schedule_notify_send_via_agent_app(
    settings, app_id, open_id, *, title, content, status, detail_link
) -> bool:
    """Send through the Feishu app bound to the agent; return True only on success."""
    import importlib

    if settings is None:
        return False
    for setting_name, module_path, label in _SCHEDULE_NOTIFY_FEISHU_APPS:
        if app_id != (getattr(settings, setting_name, None) or ""):
            continue
        try:
            send = importlib.import_module(module_path).send_message_to_user
            send(
                open_id=open_id,
                title=title,
                content=content,
                status=status,
                detail_link=detail_link,
            )
            return True
        except Exception:
            # Best effort: the caller falls back to the main Feishu app.
            logger.info("%s发送失败fallback 到主飞书应用", label, exc_info=True)
            return False
    return False


def notify_schedule_result(db: Session, execution, status: str, error_message: Optional[str] = None) -> None:
    """Notify the schedule owner about the outcome of a scheduled execution.

    Does nothing unless ``execution`` is linked to a schedule. Otherwise it
    creates an in-app notification (and commits), then, best effort:
    posts a Feishu card to the schedule's webhook if one is configured, and
    sends a Feishu message to the schedule's user, preferring the app bound
    to the execution's agent and falling back to the main Feishu app.
    Failures in any step are logged and never raised.

    Args:
        db: Database session.
        execution: Execution ORM object.
        status: "completed" for success; any other value is reported as a
            failure.
        error_message: Error text shown when the execution failed.
    """
    if not execution or not execution.schedule_id:
        return
    try:
        from app.models.agent_schedule import AgentSchedule
        from app.services.notification_service import create_notification

        schedule = db.query(AgentSchedule).filter(AgentSchedule.id == execution.schedule_id).first()
        if not schedule:
            return

        if status == "completed":
            title = f"定时任务「{schedule.name}」执行成功"
            # Prefer the agent's own output, then the schedule's reminder
            # text, then a generic message.
            content = (
                _schedule_notify_result_text(execution.output_data)
                or schedule.input_message
                or "Agent 已按计划执行完成。"
            )
        else:
            title = f"定时任务「{schedule.name}」执行失败"
            content = f"错误信息: {error_message or '未知错误'}"

        create_notification(
            db,
            user_id=schedule.user_id,
            title=title,
            content=content,
            category="schedule",
            ref_type="execution",
            ref_id=str(execution.id),
        )
        db.commit()

        # Resolved once and shared by both channels. A missing config only
        # disables the detail link and per-app routing, not the sends.
        settings = _schedule_notify_load_settings()
        detail_link = _schedule_notify_detail_link(settings, execution.id)

        # Feishu webhook, if configured; failure must not affect the rest.
        if schedule.webhook_url:
            try:
                from app.services.feishu_notifier import send_feishu_card
                send_feishu_card(
                    webhook_url=schedule.webhook_url,
                    title=title,
                    body=content,
                    status=status,
                    detail_link=detail_link,
                )
            except Exception as e:
                logger.warning("飞书 webhook 通知发送失败: %s", e)

        # Direct Feishu message to the schedule's user.
        try:
            from app.models.user import User
            from app.services.feishu_open_id_service import get_app_id_for_agent, get_open_id_for_app

            schedule_user = db.query(User).filter(User.id == schedule.user_id).first()
            if schedule_user:
                agent_id = str(execution.agent_id) if execution.agent_id else ""
                app_id = get_app_id_for_agent(agent_id)
                per_app_open_id = get_open_id_for_app(db, schedule.user_id, app_id) if app_id else None

                send_ok = False
                # The user has an open_id under the agent's own app: use that app.
                if per_app_open_id and app_id:
                    send_ok = _schedule_notify_send_via_agent_app(
                        settings,
                        app_id,
                        per_app_open_id,
                        title=title,
                        content=content,
                        status=status,
                        detail_link=detail_link,
                    )

                # Fallback: the main Feishu app, using the user's main open_id.
                if not send_ok and schedule_user.feishu_open_id:
                    from app.services.feishu_app_service import send_message_to_user
                    send_message_to_user(
                        open_id=schedule_user.feishu_open_id,
                        title=title,
                        content=content,
                        status=status,
                        detail_link=detail_link,
                    )
        except Exception as e:
            logger.warning("飞书应用通知发送失败: %s", e)
    except Exception as e:
        logger.warning("创建定时任务通知失败: %s", e)