- Timezone: compute_next_run now correctly interprets cron in the schedule's configured timezone (e.g., "0 8 * * *" with Asia/Shanghai = 8AM Beijing, not UTC) - Notifications: agent_tasks now reuses pre-created execution records and calls notify_schedule_result on completion, so non-workflow agent schedules get DB notifications + Feishu webhook + Feishu app messages - Duplicate execution: execute_agent_task accepts optional execution_id to reuse the record created by schedule_service instead of creating a second one - Celery Beat: added to restart_backend_celery.ps1, stop_aiagent.ps1, and docker-compose.dev.yml; fixed repo-root path resolution in all PS1 scripts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
199 lines
7.0 KiB
Python
199 lines
7.0 KiB
Python
"""
|
||
Agent 异步执行任务 — Celery 任务,支持定时调度和手动触发。
|
||
"""
|
||
from celery import Task
|
||
from app.core.tools_bootstrap import ensure_builtin_tools_registered
|
||
|
||
ensure_builtin_tools_registered()
|
||
|
||
from app.core.celery_app import celery_app
|
||
from app.core.database import SessionLocal
|
||
from app.agent_runtime.core import AgentRuntime
|
||
from app.agent_runtime.schemas import (
|
||
AgentConfig,
|
||
AgentLLMConfig,
|
||
AgentToolConfig,
|
||
AgentMemoryConfig,
|
||
AgentBudgetConfig,
|
||
)
|
||
from app.models.agent import Agent
|
||
from app.models.execution import Execution
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from typing import Optional
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _build_agent_config_from_db(agent: Agent) -> AgentConfig:
|
||
"""从 DB Agent 记录构建 AgentConfig。"""
|
||
wf = agent.workflow_config or {}
|
||
nodes = wf.get("nodes", [])
|
||
edges = wf.get("edges", [])
|
||
|
||
# 从工作流节点中查找 start 节点获取 system_prompt
|
||
system_prompt = "你是一个有用的AI助手。"
|
||
tools_include = []
|
||
tools_exclude = []
|
||
model_name = "gpt-4o-mini"
|
||
provider = "openai"
|
||
temperature = 0.7
|
||
max_iterations = 10
|
||
|
||
for node in nodes:
|
||
nd = node.get("data", {}) if isinstance(node, dict) else {}
|
||
node_type = node.get("type", "") if isinstance(node, dict) else ""
|
||
if node_type == "start":
|
||
system_prompt = nd.get("system_prompt", system_prompt)
|
||
elif node_type == "agent":
|
||
tools_include = nd.get("tools", tools_include)
|
||
tools_exclude = nd.get("exclude_tools", tools_exclude)
|
||
model_name = nd.get("model", model_name)
|
||
provider = nd.get("provider", provider)
|
||
temperature = float(nd.get("temperature", temperature))
|
||
max_iterations = int(nd.get("max_iterations", max_iterations))
|
||
|
||
return AgentConfig(
|
||
name=agent.name,
|
||
system_prompt=system_prompt,
|
||
user_id=str(agent.user_id) if agent.user_id else None,
|
||
llm=AgentLLMConfig(
|
||
provider=provider,
|
||
model=model_name,
|
||
temperature=temperature,
|
||
max_iterations=max_iterations,
|
||
),
|
||
tools=AgentToolConfig(
|
||
include_tools=tools_include if tools_include else [],
|
||
exclude_tools=tools_exclude if tools_exclude else [],
|
||
),
|
||
memory=AgentMemoryConfig(
|
||
enabled=True,
|
||
persist_to_db=True,
|
||
learning_enabled=True,
|
||
),
|
||
budget=AgentBudgetConfig(),
|
||
)
|
||
|
||
|
||
@celery_app.task(bind=True)
|
||
def execute_agent_task(self, agent_id: str, input_data: dict, execution_id: Optional[str] = None):
|
||
"""异步执行 Agent 任务。
|
||
|
||
由定时调度 (check_agent_schedules_task) 或手动 API 触发。
|
||
如果传入 execution_id(由 schedule_service 预创建),则复用该记录;
|
||
否则创建新的 Execution 记录。运行 Agent 后更新结果并发送通知。
|
||
|
||
Args:
|
||
agent_id: Agent ID
|
||
input_data: 输入数据,至少包含 "message" 字段
|
||
execution_id: 可选,预创建的 execution 记录 ID(定时任务场景)
|
||
"""
|
||
from app.services.agent_schedule_service import notify_schedule_result
|
||
|
||
db = SessionLocal()
|
||
start_time = time.time()
|
||
|
||
try:
|
||
agent = db.query(Agent).filter(Agent.id == agent_id).first()
|
||
if not agent:
|
||
return {"status": "error", "detail": f"Agent {agent_id} 不存在"}
|
||
|
||
user_message = input_data.get("message") or input_data.get("query") or ""
|
||
if not user_message:
|
||
return {"status": "error", "detail": "缺少 message 输入"}
|
||
|
||
# 复用预创建的 execution 记录,或新建
|
||
if execution_id:
|
||
execution = db.query(Execution).filter(Execution.id == execution_id).first()
|
||
if not execution:
|
||
return {"status": "error", "detail": f"Execution {execution_id} 不存在"}
|
||
execution.status = "running"
|
||
db.commit()
|
||
else:
|
||
execution = Execution(
|
||
agent_id=agent_id,
|
||
input_data=input_data,
|
||
status="running",
|
||
)
|
||
db.add(execution)
|
||
db.flush()
|
||
|
||
# 更新 Celery 任务状态
|
||
self.update_state(
|
||
state="PROGRESS",
|
||
meta={
|
||
"execution_id": str(execution.id),
|
||
"agent_id": agent_id,
|
||
"progress": 0,
|
||
"status": "running",
|
||
},
|
||
)
|
||
|
||
# 构建配置并执行
|
||
config = _build_agent_config_from_db(agent)
|
||
runtime = AgentRuntime(config)
|
||
|
||
try:
|
||
result = asyncio.run(runtime.run(user_message))
|
||
execution_time = int((time.time() - start_time) * 1000)
|
||
|
||
if result.success:
|
||
execution.status = "completed"
|
||
execution.output_data = {
|
||
"content": result.content,
|
||
"iterations_used": result.iterations_used,
|
||
"tool_calls_made": result.tool_calls_made,
|
||
}
|
||
execution.execution_time = execution_time
|
||
db.commit()
|
||
|
||
# 定时任务结果通知
|
||
notify_schedule_result(db, execution, "completed")
|
||
|
||
logger.info(
|
||
"Agent 异步执行完成: agent=%s execution=%s time=%dms",
|
||
agent_id, execution.id, execution_time,
|
||
)
|
||
return {
|
||
"status": "completed",
|
||
"execution_id": str(execution.id),
|
||
"content": result.content,
|
||
"iterations_used": result.iterations_used,
|
||
"tool_calls_made": result.tool_calls_made,
|
||
"execution_time": execution_time,
|
||
}
|
||
else:
|
||
execution.status = "failed"
|
||
execution.error_message = result.error or "Agent 执行返回失败"
|
||
execution.execution_time = int((time.time() - start_time) * 1000)
|
||
db.commit()
|
||
|
||
# 定时任务失败通知
|
||
notify_schedule_result(db, execution, "failed", error_message=execution.error_message)
|
||
|
||
return {
|
||
"status": "failed",
|
||
"execution_id": str(execution.id),
|
||
"error": result.error,
|
||
}
|
||
except Exception as run_e:
|
||
execution_time = int((time.time() - start_time) * 1000)
|
||
execution.status = "failed"
|
||
execution.error_message = f"Agent 执行异常: {run_e!s}"
|
||
execution.execution_time = execution_time
|
||
db.commit()
|
||
|
||
# 定时任务失败通知
|
||
notify_schedule_result(db, execution, "failed", error_message=execution.error_message)
|
||
|
||
logger.error("Agent 异步执行异常: agent=%s error=%s", agent_id, run_e)
|
||
raise
|
||
|
||
except Exception as e:
|
||
logger.error("execute_agent_task 失败: agent=%s error=%s", agent_id, e)
|
||
raise
|
||
finally:
|
||
db.close()
|