Files
aiagent/backend/app/tasks/agent_tasks.py
renjianbo 0606137d57 fix: schedule timezone bug + missing notifications + celery beat startup
- Timezone: compute_next_run now correctly interprets cron in the schedule's
  configured timezone (e.g., "0 8 * * *" with Asia/Shanghai = 8AM Beijing, not UTC)
- Notifications: agent_tasks now reuses pre-created execution records and calls
  notify_schedule_result on completion, so non-workflow agent schedules get
  DB notifications + Feishu webhook + Feishu app messages
- Duplicate execution: execute_agent_task accepts optional execution_id to reuse
  the record created by schedule_service instead of creating a second one
- Celery Beat: added to restart_backend_celery.ps1, stop_aiagent.ps1, and
  docker-compose.dev.yml; fixed repo-root path resolution in all PS1 scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-05 08:57:00 +08:00

199 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent 异步执行任务 — Celery 任务,支持定时调度和手动触发。
"""
from celery import Task
from app.core.tools_bootstrap import ensure_builtin_tools_registered
ensure_builtin_tools_registered()
from app.core.celery_app import celery_app
from app.core.database import SessionLocal
from app.agent_runtime.core import AgentRuntime
from app.agent_runtime.schemas import (
AgentConfig,
AgentLLMConfig,
AgentToolConfig,
AgentMemoryConfig,
AgentBudgetConfig,
)
from app.models.agent import Agent
from app.models.execution import Execution
import asyncio
import logging
import time
from typing import Optional
logger = logging.getLogger(__name__)
def _build_agent_config_from_db(agent: Agent) -> AgentConfig:
    """Build an ``AgentConfig`` from a DB ``Agent`` record.

    Scans the agent's workflow graph: a ``"start"`` node supplies the system
    prompt, an ``"agent"`` node supplies LLM and tool settings. Anything the
    workflow does not declare falls back to the defaults below.

    Args:
        agent: ORM Agent row; only ``workflow_config``, ``name`` and
            ``user_id`` are read.

    Returns:
        A fully populated ``AgentConfig`` ready to hand to ``AgentRuntime``.
    """
    wf = agent.workflow_config or {}
    nodes = wf.get("nodes", [])
    # NOTE: the workflow's "edges" are irrelevant here — only node payloads
    # carry configuration — so they are intentionally not read.

    # Defaults, overridden below by whatever the workflow nodes declare.
    system_prompt = "你是一个有用的AI助手。"
    tools_include: list = []
    tools_exclude: list = []
    model_name = "gpt-4o-mini"
    provider = "openai"
    temperature = 0.7
    max_iterations = 10

    for node in nodes:
        # Defensively tolerate malformed node entries (non-dict items).
        nd = node.get("data", {}) if isinstance(node, dict) else {}
        node_type = node.get("type", "") if isinstance(node, dict) else ""
        if node_type == "start":
            system_prompt = nd.get("system_prompt", system_prompt)
        elif node_type == "agent":
            tools_include = nd.get("tools", tools_include)
            tools_exclude = nd.get("exclude_tools", tools_exclude)
            model_name = nd.get("model", model_name)
            provider = nd.get("provider", provider)
            temperature = float(nd.get("temperature", temperature))
            max_iterations = int(nd.get("max_iterations", max_iterations))

    return AgentConfig(
        name=agent.name,
        system_prompt=system_prompt,
        user_id=str(agent.user_id) if agent.user_id else None,
        llm=AgentLLMConfig(
            provider=provider,
            model=model_name,
            temperature=temperature,
            max_iterations=max_iterations,
        ),
        tools=AgentToolConfig(
            include_tools=tools_include if tools_include else [],
            exclude_tools=tools_exclude if tools_exclude else [],
        ),
        memory=AgentMemoryConfig(
            enabled=True,
            persist_to_db=True,
            learning_enabled=True,
        ),
        budget=AgentBudgetConfig(),
    )
@celery_app.task(bind=True)
def execute_agent_task(self, agent_id: str, input_data: dict, execution_id: Optional[str] = None):
    """Execute an Agent asynchronously (Celery task).

    Triggered by the scheduler (``check_agent_schedules_task``) or by a
    manual API call. When ``execution_id`` is given (pre-created by
    ``schedule_service``) that record is reused; otherwise a new Execution
    row is created. After the run, the record is updated and a schedule
    notification is sent via ``notify_schedule_result``.

    Args:
        agent_id: Agent ID.
        input_data: Input payload; must contain a "message" (or "query") field.
        execution_id: Optional pre-created execution record ID (scheduled runs).

    Returns:
        A status dict; re-raises on execution failure so Celery records it.
    """
    # Imported lazily to avoid a circular import with the schedule service.
    from app.services.agent_schedule_service import notify_schedule_result

    db = SessionLocal()
    start_time = time.time()
    execution = None  # tracked so the outer handler can recover stuck records
    try:
        agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if not agent:
            return {"status": "error", "detail": f"Agent {agent_id} 不存在"}

        user_message = input_data.get("message") or input_data.get("query") or ""
        if not user_message:
            return {"status": "error", "detail": "缺少 message 输入"}

        # Reuse the pre-created execution record, or create a new one.
        if execution_id:
            execution = db.query(Execution).filter(Execution.id == execution_id).first()
            if not execution:
                return {"status": "error", "detail": f"Execution {execution_id} 不存在"}
            execution.status = "running"
            db.commit()
        else:
            execution = Execution(
                agent_id=agent_id,
                input_data=input_data,
                status="running",
            )
            db.add(execution)
            db.flush()  # flush (not commit) so execution.id is assigned

        # Surface progress on the Celery task itself.
        self.update_state(
            state="PROGRESS",
            meta={
                "execution_id": str(execution.id),
                "agent_id": agent_id,
                "progress": 0,
                "status": "running",
            },
        )

        # Build the config and run the agent.
        config = _build_agent_config_from_db(agent)
        runtime = AgentRuntime(config)
        try:
            result = asyncio.run(runtime.run(user_message))
            execution_time = int((time.time() - start_time) * 1000)
            if result.success:
                execution.status = "completed"
                execution.output_data = {
                    "content": result.content,
                    "iterations_used": result.iterations_used,
                    "tool_calls_made": result.tool_calls_made,
                }
                execution.execution_time = execution_time
                db.commit()
                # Schedule-result notification (DB + Feishu channels).
                notify_schedule_result(db, execution, "completed")
                logger.info(
                    "Agent 异步执行完成: agent=%s execution=%s time=%dms",
                    agent_id, execution.id, execution_time,
                )
                return {
                    "status": "completed",
                    "execution_id": str(execution.id),
                    "content": result.content,
                    "iterations_used": result.iterations_used,
                    "tool_calls_made": result.tool_calls_made,
                    "execution_time": execution_time,
                }
            else:
                execution.status = "failed"
                execution.error_message = result.error or "Agent 执行返回失败"
                execution.execution_time = int((time.time() - start_time) * 1000)
                db.commit()
                # Schedule-failure notification.
                notify_schedule_result(db, execution, "failed", error_message=execution.error_message)
                return {
                    "status": "failed",
                    "execution_id": str(execution.id),
                    "error": result.error,
                }
        except Exception as run_e:
            execution_time = int((time.time() - start_time) * 1000)
            execution.status = "failed"
            execution.error_message = f"Agent 执行异常: {run_e!s}"
            execution.execution_time = execution_time
            db.commit()
            # Schedule-failure notification.
            notify_schedule_result(db, execution, "failed", error_message=execution.error_message)
            logger.error("Agent 异步执行异常: agent=%s error=%s", agent_id, run_e)
            raise
    except Exception as e:
        logger.error("execute_agent_task 失败: agent=%s error=%s", agent_id, e)
        # Fix: an exception between record creation and the inner try (config
        # build, AgentRuntime init, update_state) previously left the record
        # stuck in "running" forever with no notification. Best-effort
        # recovery: mark it failed and notify. Guarded so a record the inner
        # handler already committed as "failed" is not touched again.
        if execution is not None:
            try:
                db.rollback()  # session may be in a failed transaction state
                if execution.status == "running":
                    execution.status = "failed"
                    execution.error_message = f"Agent 执行异常: {e!s}"
                    execution.execution_time = int((time.time() - start_time) * 1000)
                    db.commit()
                    notify_schedule_result(db, execution, "failed", error_message=execution.error_message)
            except Exception:
                # Recovery itself failed — log and fall through to re-raise
                # the original error; never mask it.
                logger.exception("标记 execution 失败状态时出错: agent=%s", agent_id)
        raise
    finally:
        db.close()