Files
aiagent/backend/app/services/agent_schedule_service.py
renjianbo f33bc461ff fix: resolve Feishu cross-app notification routing bug
Implement per-app open_id storage via user_feishu_open_ids table with
union_id-based cross-app user identification. WS handlers now auto-capture
open_id+union_id and resolve/associate user accounts. Schedule notifications
route through the correct bot's open_id instead of always falling back to 苹果.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-06 00:36:40 +08:00

316 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent scheduled-task service: cron parsing, execution triggering, and next-run-time computation."""
from __future__ import annotations
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional
import pytz
from croniter import croniter
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
logger = logging.getLogger(__name__)
def compute_next_run(cron_expression: str, after: Optional[datetime] = None, tz: str = "Asia/Shanghai") -> datetime:
    """Compute the next run time of a cron expression.

    The expression is evaluated against the wall-clock time of ``tz`` and the
    result is converted back to UTC.

    Args:
        cron_expression: Cron expression handed to ``croniter``,
            e.g. ``"0 9 * * *"``.
        after: Instant to search from. A naive value is treated as UTC, an
            aware value is converted to UTC. Defaults to the current UTC time.
        tz: Time zone name looked up with ``pytz.timezone``,
            e.g. ``"Asia/Shanghai"``. An unknown name falls back to UTC and a
            warning is logged.

    Returns:
        The next run time as a naive ``datetime`` expressed in UTC.

    Raises:
        Nothing is caught around ``croniter``; whatever it raises for an
        expression it rejects propagates to the caller.
    """
    base = after if after is not None else datetime.now(timezone.utc)

    # Normalize the starting point to aware UTC; a naive input is taken as UTC.
    if base.tzinfo is None:
        base = pytz.UTC.localize(base)
    else:
        base = base.astimezone(pytz.UTC)

    try:
        target_tz = pytz.timezone(tz)
    except pytz.UnknownTimeZoneError:
        # Falling back silently would shift every run of the schedule without
        # leaving any trace, so make the fallback visible in the logs.
        logger.warning("未知时区 %r,回退为 UTC 计算下次执行时间", tz)
        target_tz = pytz.UTC

    # croniter is given the naive wall-clock time of the target zone so the
    # cron fields are matched against local time there.
    base_naive = base.astimezone(target_tz).replace(tzinfo=None)
    next_naive = croniter(cron_expression, base_naive).get_next(datetime)

    # Re-attach the target zone, convert to UTC, then drop tzinfo so the value
    # is a naive UTC datetime.
    next_utc = target_tz.localize(next_naive).astimezone(pytz.UTC)
    return next_utc.replace(tzinfo=None)
def create_execution_for_schedule(db: Session, schedule) -> Optional[str]:
    """Create an Execution row for a schedule and enqueue its Celery task.

    Args:
        db: Database session used to look up the agent and persist the
            execution.
        schedule: AgentSchedule ORM object; ``id``, ``agent_id`` and
            ``input_message`` are read.

    Returns:
        ``None`` when the schedule's agent does not exist. Otherwise the id of
        the created execution as a string. This is also returned when
        enqueueing failed, in which case the row is stored with status
        ``"failed"`` and an error message.
    """
    from app.models.agent import Agent
    from app.models.execution import Execution

    agent = db.query(Agent).filter(Agent.id == schedule.agent_id).first()
    if not agent:
        logger.warning("定时任务 %s 关联的 Agent %s 不存在", schedule.id, schedule.agent_id)
        return None

    def reminder_payload() -> dict:
        # A fresh dict per call so the execution row and the task arguments
        # never share one mutable object.
        reminder_text = f"[定时任务提醒] {schedule.input_message}"
        return {
            "USER_INPUT": reminder_text,
            "query": reminder_text,
            "message": schedule.input_message,
            "is_scheduled_reminder": True,
        }

    # The execution is linked to the schedule through schedule_id and its
    # input is flagged as a scheduled reminder.
    execution = Execution(
        agent_id=schedule.agent_id,
        schedule_id=schedule.id,
        input_data=reminder_payload(),
        status="pending",
    )
    db.add(execution)
    db.flush()  # assigns execution.id without committing

    # NOTE(review): the task is enqueued before the row is committed; if the
    # worker loads the execution by id it could run before the row is
    # visible - confirm against the task implementations.
    try:
        workflow_config = agent.workflow_config
        if workflow_config and workflow_config.get("nodes"):
            # The agent has workflow nodes: run through the workflow task.
            from app.tasks.workflow_tasks import execute_workflow_task

            task = execute_workflow_task.delay(
                str(execution.id),
                f"agent_{schedule.agent_id}",
                workflow_config,
                reminder_payload(),
            )
        else:
            # No workflow nodes: run the plain agent task against the
            # execution row created above.
            from app.tasks.agent_tasks import execute_agent_task

            task = execute_agent_task.delay(
                str(schedule.agent_id),
                reminder_payload(),
                execution_id=str(execution.id),
            )

        execution.task_id = task.id
        execution.status = "running"
        db.commit()
        logger.info(
            "定时任务 %s 已投递执行: execution=%s task=%s",
            schedule.id, execution.id, task.id,
        )
        return str(execution.id)
    except Exception as e:
        # Keep the row, marked as failed, so the failed dispatch stays visible.
        execution.status = "failed"
        execution.error_message = f"定时任务投递失败: {e!s}"
        db.commit()
        logger.warning("定时任务 %s 投递失败: %s", schedule.id, e)
        return str(execution.id)
def check_and_run_due_schedules() -> int:
    """Dispatch every enabled schedule whose ``next_run_at`` has passed.

    Meant to be invoked periodically (the original note says by Celery Beat,
    once a minute). Each due schedule is handled on its own: a failure in one
    schedule is recorded on that schedule and does not stop the others.

    Returns:
        The number of schedules for which an execution record was created in
        this pass, or 0 when the check as a whole failed.
    """
    from app.models.agent_schedule import AgentSchedule

    db: Optional[Session] = None
    try:
        db = SessionLocal()
        # Naive UTC, the same form compute_next_run returns for next_run_at.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        due_schedules = (
            db.query(AgentSchedule)
            .filter(
                AgentSchedule.enabled == True,  # noqa: E712
                AgentSchedule.next_run_at <= now,
            )
            .all()
        )

        triggered = 0
        for sched in due_schedules:
            sched_id = sched.id
            try:
                sched_name = sched.name
                # Compute the next slot BEFORE dispatching. If the cron
                # expression is rejected, nothing is enqueued; previously the
                # task was dispatched first, next_run_at stayed in the past
                # and the same task was re-dispatched on every pass.
                next_run_at = compute_next_run(
                    sched.cron_expression, after=now, tz=sched.timezone or "UTC"
                )

                execution_id = create_execution_for_schedule(db, sched)

                sched.last_run_at = now
                # None means no execution could be created (agent missing).
                sched.last_run_status = "success" if execution_id else "failed"
                sched.next_run_at = next_run_at
                db.commit()

                if execution_id:
                    triggered += 1
                    logger.info(
                        "定时任务触发完成: name=%s next_run=%s",
                        sched_name, next_run_at,
                    )
            except Exception as e:
                logger.exception("定时任务 %s 执行失败: %s", sched_id, e)
                try:
                    # Discard any half-applied state first; committing on a
                    # session whose flush failed would raise again.
                    db.rollback()
                    sched.last_run_at = now
                    sched.last_run_status = "failed"
                    db.commit()
                except Exception as record_error:
                    db.rollback()
                    logger.error(
                        "定时任务 %s 失败状态写入失败: %s", sched_id, record_error
                    )
        return triggered
    except Exception as e:
        logger.exception("检查定时任务失败: %s", e)
        return 0
    finally:
        if db:
            db.close()
# Per-app routing table: (settings attribute holding the Feishu app id,
# module exposing ``send_message_to_user``, label used in the fallback log).
_FEISHU_APP_ROUTES = (
    ("LINGXI_APP_ID", "app.services.lingxi_app_service", "灵犀"),
    ("ORANGE_APP_ID", "app.services.orange_app_service", "橙子"),
    ("SUYAO_APP_ID", "app.services.suyao_app_service", "苏瑶"),
    ("TIANTIAN_APP_ID", "app.services.tiantian_app_service", "甜甜"),
)


def _load_app_settings():
    """Return the project settings object, or None when it cannot be imported."""
    try:
        from app.core.config import settings
        return settings
    except Exception:
        logger.debug("加载 settings 失败", exc_info=True)
        return None


def _execution_detail_link(settings, execution) -> Optional[str]:
    """Build the execution detail URL; None when EXTERNAL_URL is unset or unreadable."""
    try:
        if settings is not None and settings.EXTERNAL_URL:
            return f"{settings.EXTERNAL_URL}/executions/{execution.id}"
    except Exception:
        # The link is optional decoration; never let it block a notification.
        logger.debug("生成执行详情链接失败", exc_info=True)
    return None


def _schedule_result_message(schedule, execution, status: str, error_message: Optional[str]):
    """Return the ``(title, content)`` pair describing a schedule run."""
    if status != "completed":
        return (
            f"定时任务「{schedule.name}」执行失败",
            f"错误信息: {error_message or '未知错误'}",
        )

    title = f"定时任务「{schedule.name}」执行成功"
    # Prefer the agent's own output, then the schedule's reminder text, then
    # a generic completion sentence.
    output = execution.output_data
    result_text = ""
    if isinstance(output, dict):
        result_text = (
            output.get("result")
            or output.get("output")
            or output.get("text")
            or ""
        )
    elif isinstance(output, str):
        result_text = output
    if not result_text:
        result_text = schedule.input_message or ""
    return title, (result_text if result_text else "Agent 已按计划执行完成。")


def _send_via_matching_app(settings, app_id: str, open_id: str, message: dict) -> bool:
    """Send through the first routed app whose configured id equals ``app_id``.

    Returns True when a message was sent, False when no route matched or the
    matching app's send raised.
    """
    import importlib

    for setting_name, module_path, label in _FEISHU_APP_ROUTES:
        if app_id != (getattr(settings, setting_name, None) or ""):
            continue
        try:
            sender = importlib.import_module(module_path).send_message_to_user
            sender(open_id=open_id, **message)
            return True
        except Exception:
            # Keep the error detail; the caller falls back to the main app.
            logger.info("%s发送失败fallback 到主飞书应用", label, exc_info=True)
            return False
    return False


def _notify_via_feishu_app(db: Session, settings, schedule, execution, message: dict) -> None:
    """Send the result to the schedule owner through a Feishu app.

    The app mapped to the execution's agent is tried first using the user's
    open_id for that app; when that does not deliver, the main Feishu app is
    used with ``User.feishu_open_id`` if the user has one.
    """
    from app.models.user import User
    from app.services.feishu_open_id_service import get_app_id_for_agent, get_open_id_for_app

    schedule_user = db.query(User).filter(User.id == schedule.user_id).first()
    if not schedule_user:
        return

    agent_id = str(execution.agent_id) if execution.agent_id else ""
    app_id = get_app_id_for_agent(agent_id)
    per_app_open_id = get_open_id_for_app(db, schedule.user_id, app_id) if app_id else None

    delivered = False
    if per_app_open_id and app_id:
        delivered = _send_via_matching_app(settings, app_id, per_app_open_id, message)

    if not delivered and schedule_user.feishu_open_id:
        from app.services.feishu_app_service import send_message_to_user
        send_message_to_user(open_id=schedule_user.feishu_open_id, **message)


def notify_schedule_result(db: Session, execution, status: str, error_message: Optional[str] = None) -> None:
    """Notify the schedule owner about the outcome of a scheduled execution.

    Does nothing unless ``execution`` is linked to a schedule. Otherwise it
    creates an in-app notification, posts a card to the schedule's Feishu
    webhook when one is configured, and sends a Feishu app message to the
    owning user. Every step is best-effort: failures are logged and never
    raised to the caller.

    Args:
        db: Database session.
        execution: Execution ORM object; ``schedule_id``, ``agent_id``,
            ``output_data`` and ``id`` are read.
        status: ``"completed"`` for success; any other value is reported as a
            failure.
        error_message: Error text shown in the failure notification.
    """
    if not execution or not execution.schedule_id:
        return

    try:
        from app.models.agent_schedule import AgentSchedule
        from app.services.notification_service import create_notification

        schedule = db.query(AgentSchedule).filter(AgentSchedule.id == execution.schedule_id).first()
        if not schedule:
            return

        title, content = _schedule_result_message(schedule, execution, status, error_message)

        create_notification(
            db,
            user_id=schedule.user_id,
            title=title,
            content=content,
            category="schedule",
            ref_type="execution",
            ref_id=str(execution.id),
        )
        db.commit()

        # Resolve settings once, tolerating failure. Previously ``settings``
        # was bound only inside a swallowed try block yet used
        # unconditionally afterwards, so a failed import surfaced as an
        # UnboundLocalError and skipped the main-app fallback.
        settings = _load_app_settings()
        detail_link = _execution_detail_link(settings, execution)

        # Feishu webhook card, if configured; failure must not stop the rest.
        if schedule.webhook_url:
            try:
                from app.services.feishu_notifier import send_feishu_card

                send_feishu_card(
                    webhook_url=schedule.webhook_url,
                    title=title,
                    body=content,
                    status=status,
                    detail_link=detail_link,
                )
            except Exception as e:
                logger.warning("飞书 webhook 通知发送失败: %s", e)

        # Direct message through the Feishu app bound to the agent, falling
        # back to the main app.
        try:
            _notify_via_feishu_app(
                db,
                settings,
                schedule,
                execution,
                {
                    "title": title,
                    "content": content,
                    "status": status,
                    "detail_link": detail_link,
                },
            )
        except Exception as e:
            logger.warning("飞书应用通知发送失败: %s", e)
    except Exception as e:
        logger.warning("创建定时任务通知失败: %s", e)