Files
aiagent/backend/app/services/agent_schedule_service.py
renjianbo 9454dee976 feat: complete remaining plan items — all 4 phases fully implemented
- Task API: add execute and retry endpoints
- Agent API: add create-main-agent endpoint and execute with graph/debate/pipeline modes
- Feishu tools: add read_messages, create_sheet, upload_file (54 builtin tools total)
- Feishu events: group @mention handling, approval callback, auto daily reporting
- Feishu app service: add send_plain_text_to_group for group chat replies
- Typed Data Ports: template variable injection {{previous.output.field}} + output schema validation
- GoalDetail.vue: Gantt timeline view + real-time progress polling (10s)
- Autonomy loop: per-goal Celery Beat scheduling via sync_autonomy_schedule_for_goal

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-08 22:36:03 +08:00

489 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Agent 定时任务服务cron 解析、执行触发、下次执行时间计算"""
from __future__ import annotations
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional
import pytz
from croniter import croniter
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
logger = logging.getLogger(__name__)
def compute_next_run(cron_expression: str, after: Optional[datetime] = None, tz: str = "Asia/Shanghai") -> datetime:
"""根据 cron 表达式计算下一次执行时间。
Args:
cron_expression: 标准 5 位 cron 表达式,如 "0 9 * * *"
after: 从哪个时间点开始计算,默认为当前 UTC 时间
tz: 时区名称,如 "Asia/Shanghai"cron 表达式在该时区下解释
Returns:
下次执行的 UTC datetimenaive存储时与 datetime.utcnow() 对齐)
"""
base = after or datetime.now(timezone.utc)
# 确保 base 是 timezone-aware UTCdatetime.utcnow() 返回 naive需要先 localize
if base.tzinfo is None:
base = pytz.UTC.localize(base)
else:
base = base.astimezone(pytz.UTC)
# 将 UTC 时间转换为目标时区cron 表达式在目标时区下解释
try:
target_tz = pytz.timezone(tz)
except pytz.UnknownTimeZoneError:
target_tz = pytz.UTC
base_in_tz = base.astimezone(target_tz)
# croniter 需要 naive datetime在目标时区下计算
base_naive = base_in_tz.replace(tzinfo=None)
cron = croniter(cron_expression, base_naive)
next_naive = cron.get_next(datetime)
# 将结果 localize 回目标时区,再转 UTC
next_in_tz = target_tz.localize(next_naive)
next_utc = next_in_tz.astimezone(pytz.UTC)
# 返回 naive UTC与 datetime.utcnow() 对齐)
return next_utc.replace(tzinfo=None)
def _create_execution_for_goal_schedule(db: Session, schedule) -> Optional[str]:
"""为 Goal 类型定时任务创建 Goal 并投递执行。
Args:
db: 数据库会话
schedule: AgentSchedule ORM 对象 (schedule_type == "goal")
Returns:
创建的 goal_id失败返回 None
"""
from app.services.goal_service import create_goal, update_goal
from app.tasks.goal_tasks import execute_goal_task
gc = schedule.goal_config or {}
title = gc.get("title", schedule.input_message or schedule.name)
description = gc.get("description", "")
priority = gc.get("priority", 5)
main_agent_id = schedule.agent_id or gc.get("main_agent_id")
try:
goal = create_goal(
db=db,
creator_id=schedule.user_id,
title=title,
description=description,
priority=priority,
main_agent_id=main_agent_id,
)
task = execute_goal_task.delay(str(goal.id))
update_goal(db, str(goal.id), status="active")
logger.info(
"Goal 定时任务 %s 已投递: goal_id=%s celery_task=%s",
schedule.id, goal.id, task.id,
)
return str(goal.id)
except Exception as e:
logger.error("Goal 定时任务 %s 投递失败: %s", schedule.id, e)
return None
def create_execution_for_schedule(db: Session, schedule) -> Optional[str]:
"""为定时任务创建 Execution 记录并投递 Celery 任务。
Args:
db: 数据库会话
schedule: AgentSchedule ORM 对象
Returns:
创建的 execution_id失败返回 None
"""
from app.models.execution import Execution
from app.models.agent import Agent
# Goal 类型调度:创建 Goal 并投递执行
if getattr(schedule, "schedule_type", "agent") == "goal" and schedule.goal_id:
return _create_execution_for_goal_schedule(db, schedule)
agent = db.query(Agent).filter(Agent.id == schedule.agent_id).first()
if not agent:
logger.warning("定时任务 %s 关联的 Agent %s 不存在", schedule.id, schedule.agent_id)
return None
# 创建执行记录(关联 schedule_id标记为定时任务提醒
execution = Execution(
agent_id=schedule.agent_id,
schedule_id=schedule.id,
input_data={
"USER_INPUT": f"[定时任务提醒] {schedule.input_message}",
"query": f"[定时任务提醒] {schedule.input_message}",
"message": schedule.input_message,
"is_scheduled_reminder": True,
},
status="pending",
)
db.add(execution)
db.flush() # 获取 id
try:
if agent.workflow_config and agent.workflow_config.get("nodes"):
# 有工作流配置:走完整工作流引擎
from app.tasks.workflow_tasks import execute_workflow_task
task = execute_workflow_task.delay(
str(execution.id),
f"agent_{schedule.agent_id}",
agent.workflow_config,
{
"USER_INPUT": f"[定时任务提醒] {schedule.input_message}",
"query": f"[定时任务提醒] {schedule.input_message}",
"message": schedule.input_message,
"is_scheduled_reminder": True,
},
)
else:
# 无工作流配置:走简单 Agent 异步执行(传入已创建的 execution_id
from app.tasks.agent_tasks import execute_agent_task
task = execute_agent_task.delay(
str(schedule.agent_id),
{
"USER_INPUT": f"[定时任务提醒] {schedule.input_message}",
"query": f"[定时任务提醒] {schedule.input_message}",
"message": schedule.input_message,
"is_scheduled_reminder": True,
},
execution_id=str(execution.id),
)
execution.task_id = task.id
execution.status = "running"
db.commit()
logger.info(
"定时任务 %s 已投递执行: execution=%s task=%s",
schedule.id, execution.id, task.id,
)
return str(execution.id)
except Exception as e:
execution.status = "failed"
execution.error_message = f"定时任务投递失败: {e!s}"
db.commit()
logger.warning("定时任务 %s 投递失败: %s", schedule.id, e)
return str(execution.id)
def check_and_run_due_schedules() -> int:
"""检查所有启用的定时任务,执行到期的任务。
被 Celery Beat 每分钟调用一次。
Returns:
本次触发的任务数
"""
from app.models.agent_schedule import AgentSchedule
db: Optional[Session] = None
try:
db = SessionLocal()
now = datetime.now(timezone.utc).replace(tzinfo=None) # naive UTC与 DB 存储一致
due_schedules = (
db.query(AgentSchedule)
.filter(
AgentSchedule.enabled == True, # noqa: E712
AgentSchedule.next_run_at <= now,
)
.all()
)
triggered = 0
for sched in due_schedules:
try:
# 创建执行记录
create_execution_for_schedule(db, sched)
# 更新定时任务状态
sched.last_run_at = now
sched.last_run_status = "success"
# 计算下次执行时间(使用定时任务配置的时区)
sched.next_run_at = compute_next_run(sched.cron_expression, after=now, tz=sched.timezone or "UTC")
db.commit()
triggered += 1
logger.info(
"定时任务触发完成: name=%s next_run=%s",
sched.name, sched.next_run_at,
)
except Exception as e:
logger.error("定时任务 %s 执行失败: %s", sched.id, e)
sched.last_run_at = now
sched.last_run_status = "failed"
db.commit()
return triggered + check_and_run_autonomy_ticks()
except Exception as e:
logger.error("检查定时任务失败: %s", e)
return 0
finally:
if db:
db.close()
def sync_autonomy_schedule_for_goal(db: Session, goal_id: str) -> Optional[str]:
"""根据 Goal 的 autonomy_config.check_interval_minutes 创建/更新 Celery Beat 调度。
当 Goal 状态为 active 且有有效 check_interval 时,确保存在对应的定时调度;
当 Goal 状态非 active 时,删除对应的定时调度。
Args:
db: 数据库会话
goal_id: Goal ID
Returns:
创建的 schedule_id不适用返回 None
"""
from app.models.goal import Goal
from app.models.agent_schedule import AgentSchedule
goal = db.query(Goal).filter(Goal.id == goal_id).first()
if not goal:
return None
# 删除现有的自主循环调度
db.query(AgentSchedule).filter(
AgentSchedule.schedule_type == "goal_autonomy",
AgentSchedule.goal_id == goal_id,
).delete()
if goal.status != "active":
db.commit()
logger.info("Goal %s 非活跃状态,已移除自主循环调度", goal_id)
return None
ac = goal.autonomy_config or {}
interval_minutes = int(ac.get("check_interval_minutes", 0) or 0)
if interval_minutes <= 0:
db.commit()
return None
# 构造 cron 表达式(每 N 分钟)
if interval_minutes < 60:
cron_expr = f"*/{interval_minutes} * * * *"
else:
hours = interval_minutes // 60
cron_expr = f"0 */{hours} * * *"
tz = ac.get("timezone", "Asia/Shanghai")
now = datetime.now(timezone.utc).replace(tzinfo=None)
next_run = compute_next_run(cron_expr, after=now, tz=tz)
schedule = AgentSchedule(
agent_id=goal.main_agent_id,
schedule_type="goal_autonomy",
goal_id=goal_id,
goal_config={
"title": goal.title,
"description": goal.description or "",
"priority": goal.priority,
"main_agent_id": goal.main_agent_id,
"autonomy_config": ac,
},
name=f"自主循环: {goal.title}",
cron_expression=cron_expr,
input_message=f"Autonomy tick for goal: {goal.title}",
timezone=tz,
enabled=True,
next_run_at=next_run,
user_id=goal.creator_id,
)
db.add(schedule)
db.commit()
logger.info(
"Goal %s 自主循环调度已创建: schedule=%s cron=%s interval=%dmin",
goal_id, schedule.id, cron_expr, interval_minutes,
)
return str(schedule.id)
def check_and_run_autonomy_ticks() -> int:
"""检查所有到期的 Goal 自主循环调度,触发 autonomy_tick。
Returns:
本次触发的 Goal 数量
"""
from app.models.agent_schedule import AgentSchedule
db: Optional[Session] = None
try:
db = SessionLocal()
now = datetime.now(timezone.utc).replace(tzinfo=None)
due_schedules = (
db.query(AgentSchedule)
.filter(
AgentSchedule.schedule_type == "goal_autonomy",
AgentSchedule.enabled == True,
AgentSchedule.next_run_at <= now,
)
.all()
)
triggered = 0
for sched in due_schedules:
if not sched.goal_id:
continue
try:
from app.tasks.goal_tasks import autonomy_tick_task
autonomy_tick_task.delay(str(sched.goal_id))
sched.last_run_at = now
sched.last_run_status = "success"
sched.next_run_at = compute_next_run(
sched.cron_expression, after=now, tz=sched.timezone or "UTC"
)
db.commit()
triggered += 1
logger.info("自主循环触发: goal=%s schedule=%s", sched.goal_id, sched.id)
except Exception as e:
logger.error("自主循环触发失败: goal=%s error=%s", sched.goal_id, e)
sched.last_run_at = now
sched.last_run_status = "failed"
db.commit()
return triggered
except Exception as e:
logger.error("检查自主循环调度失败: %s", e)
return 0
finally:
if db:
db.close()
def notify_schedule_result(db: Session, execution, status: str, error_message: Optional[str] = None) -> None:
"""如果 execution 关联了定时任务,创建通知并推送飞书消息。
Args:
db: 数据库会话
execution: Execution ORM 对象
status: "completed""failed"
error_message: 失败时的错误信息
"""
if not execution or not execution.schedule_id:
return
try:
from app.models.agent_schedule import AgentSchedule
from app.services.notification_service import create_notification
schedule = db.query(AgentSchedule).filter(AgentSchedule.id == execution.schedule_id).first()
if not schedule:
return
if status == "completed":
title = f"定时任务「{schedule.name}」执行成功"
# 优先使用 Agent 执行结果,其次使用定时任务的提醒内容
result_text = ""
if execution.output_data:
if isinstance(execution.output_data, dict):
result_text = (
execution.output_data.get("result")
or execution.output_data.get("output")
or execution.output_data.get("text")
or ""
)
elif isinstance(execution.output_data, str):
result_text = execution.output_data
if not result_text:
result_text = schedule.input_message or ""
content = result_text if result_text else "Agent 已按计划执行完成。"
else:
title = f"定时任务「{schedule.name}」执行失败"
content = f"错误信息: {error_message or '未知错误'}"
create_notification(
db,
user_id=schedule.user_id,
title=title,
content=content,
category="schedule",
ref_type="execution",
ref_id=str(execution.id),
)
db.commit()
# 如果配置了飞书 webhook发送飞书通知非阻塞失败不影响主流程
if schedule.webhook_url:
try:
from app.services.feishu_notifier import send_feishu_card
detail_link = None
try:
from app.core.config import settings
if settings.EXTERNAL_URL:
detail_link = f"{settings.EXTERNAL_URL}/executions/{execution.id}"
except Exception:
pass
send_feishu_card(
webhook_url=schedule.webhook_url,
title=title,
body=content,
status=status,
detail_link=detail_link,
)
except Exception as e:
logger.warning("飞书 webhook 通知发送失败: %s", e)
# 如果用户绑定了飞书账号,通过对应的飞书应用发送通知
try:
from app.models.user import User
from app.services.feishu_open_id_service import get_app_id_for_agent, get_open_id_for_app
schedule_user = db.query(User).filter(User.id == schedule.user_id).first()
if schedule_user:
detail_link = None
try:
from app.core.config import settings
if settings.EXTERNAL_URL:
detail_link = f"{settings.EXTERNAL_URL}/executions/{execution.id}"
except Exception:
pass
agent_id = str(execution.agent_id) if execution.agent_id else ""
app_id = get_app_id_for_agent(agent_id)
per_app_open_id = get_open_id_for_app(db, schedule.user_id, app_id) if app_id else None
send_ok = False
# 如果找到了该用户在此应用下的专属 open_id直接用对应应用发送
if per_app_open_id and app_id:
if app_id == (settings.LINGXI_APP_ID or ""):
try:
from app.services.lingxi_app_service import send_message_to_user as send_msg
send_msg(open_id=per_app_open_id, title=title, content=content, status=status, detail_link=detail_link)
send_ok = True
except Exception:
logger.info("灵犀发送失败fallback 到主飞书应用")
elif app_id == (settings.ORANGE_APP_ID or ""):
try:
from app.services.orange_app_service import send_message_to_user as send_msg
send_msg(open_id=per_app_open_id, title=title, content=content, status=status, detail_link=detail_link)
send_ok = True
except Exception:
logger.info("橙子发送失败fallback 到主飞书应用")
elif app_id == (settings.SUYAO_APP_ID or ""):
try:
from app.services.suyao_app_service import send_message_to_user as send_msg
send_msg(open_id=per_app_open_id, title=title, content=content, status=status, detail_link=detail_link)
send_ok = True
except Exception:
logger.info("苏瑶发送失败fallback 到主飞书应用")
elif app_id == (settings.TIANTIAN_APP_ID or ""):
try:
from app.services.tiantian_app_service import send_message_to_user as send_msg
send_msg(open_id=per_app_open_id, title=title, content=content, status=status, detail_link=detail_link)
send_ok = True
except Exception:
logger.info("甜甜发送失败fallback 到主飞书应用")
# Fallback: 使用主飞书应用(苹果)的 open_id 发送
if not send_ok and schedule_user.feishu_open_id:
from app.services.feishu_app_service import send_message_to_user as send_msg
send_msg(open_id=schedule_user.feishu_open_id, title=title, content=content, status=status, detail_link=detail_link)
except Exception as e:
logger.warning("飞书应用通知发送失败: %s", e)
except Exception as e:
logger.warning("创建定时任务通知失败: %s", e)