""" Agent 异步执行任务 — Celery 任务,支持定时调度和手动触发。 """ from celery import Task from app.core.tools_bootstrap import ensure_builtin_tools_registered ensure_builtin_tools_registered() from app.core.celery_app import celery_app from app.core.database import SessionLocal from app.agent_runtime.core import AgentRuntime from app.agent_runtime.schemas import ( AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig, AgentBudgetConfig, ) from app.models.agent import Agent from app.models.execution import Execution import asyncio import logging import time from typing import Optional logger = logging.getLogger(__name__) def _build_agent_config_from_db(agent: Agent) -> AgentConfig: """从 DB Agent 记录构建 AgentConfig。""" wf = agent.workflow_config or {} nodes = wf.get("nodes", []) edges = wf.get("edges", []) # 从工作流节点中查找 start 节点获取 system_prompt system_prompt = "你是一个有用的AI助手。" tools_include = [] tools_exclude = [] model_name = "gpt-4o-mini" provider = "openai" temperature = 0.7 max_iterations = 10 for node in nodes: nd = node.get("data", {}) if isinstance(node, dict) else {} node_type = node.get("type", "") if isinstance(node, dict) else "" if node_type == "start": system_prompt = nd.get("system_prompt", system_prompt) elif node_type == "agent": tools_include = nd.get("tools", tools_include) tools_exclude = nd.get("exclude_tools", tools_exclude) model_name = nd.get("model", model_name) provider = nd.get("provider", provider) temperature = float(nd.get("temperature", temperature)) max_iterations = int(nd.get("max_iterations", max_iterations)) return AgentConfig( name=agent.name, system_prompt=system_prompt, user_id=str(agent.user_id) if agent.user_id else None, llm=AgentLLMConfig( provider=provider, model=model_name, temperature=temperature, max_iterations=max_iterations, ), tools=AgentToolConfig( include_tools=tools_include if tools_include else [], exclude_tools=tools_exclude if tools_exclude else [], ), memory=AgentMemoryConfig( enabled=True, persist_to_db=True, learning_enabled=True, ), budget=AgentBudgetConfig(), ) @celery_app.task(bind=True) def execute_agent_task(self, agent_id: str, input_data: dict, execution_id: Optional[str] = None): """异步执行 Agent 任务。 由定时调度 (check_agent_schedules_task) 或手动 API 触发。 如果传入 execution_id(由 schedule_service 预创建),则复用该记录; 否则创建新的 Execution 记录。运行 Agent 后更新结果并发送通知。 Args: agent_id: Agent ID input_data: 输入数据,至少包含 "message" 字段 execution_id: 可选,预创建的 execution 记录 ID(定时任务场景) """ from app.services.agent_schedule_service import notify_schedule_result db = SessionLocal() start_time = time.time() try: agent = db.query(Agent).filter(Agent.id == agent_id).first() if not agent: return {"status": "error", "detail": f"Agent {agent_id} 不存在"} user_message = input_data.get("message") or input_data.get("query") or "" if not user_message: return {"status": "error", "detail": "缺少 message 输入"} # 复用预创建的 execution 记录,或新建 if execution_id: execution = db.query(Execution).filter(Execution.id == execution_id).first() if not execution: return {"status": "error", "detail": f"Execution {execution_id} 不存在"} execution.status = "running" db.commit() else: execution = Execution( agent_id=agent_id, input_data=input_data, status="running", ) db.add(execution) db.flush() # 更新 Celery 任务状态 self.update_state( state="PROGRESS", meta={ "execution_id": str(execution.id), "agent_id": agent_id, "progress": 0, "status": "running", }, ) # 构建配置并执行 config = _build_agent_config_from_db(agent) runtime = AgentRuntime(config) try: result = asyncio.run(runtime.run(user_message)) execution_time = int((time.time() - start_time) * 1000) if result.success: execution.status = "completed" execution.output_data = { "content": result.content, "iterations_used": result.iterations_used, "tool_calls_made": result.tool_calls_made, } execution.execution_time = execution_time db.commit() # 定时任务结果通知 notify_schedule_result(db, execution, "completed") logger.info( "Agent 异步执行完成: agent=%s execution=%s time=%dms", agent_id, execution.id, execution_time, ) return { "status": "completed", "execution_id": str(execution.id), "content": result.content, "iterations_used": result.iterations_used, "tool_calls_made": result.tool_calls_made, "execution_time": execution_time, } else: execution.status = "failed" execution.error_message = result.error or "Agent 执行返回失败" execution.execution_time = int((time.time() - start_time) * 1000) db.commit() # 定时任务失败通知 notify_schedule_result(db, execution, "failed", error_message=execution.error_message) return { "status": "failed", "execution_id": str(execution.id), "error": result.error, } except Exception as run_e: execution_time = int((time.time() - start_time) * 1000) execution.status = "failed" execution.error_message = f"Agent 执行异常: {run_e!s}" execution.execution_time = execution_time db.commit() # 定时任务失败通知 notify_schedule_result(db, execution, "failed", error_message=execution.error_message) logger.error("Agent 异步执行异常: agent=%s error=%s", agent_id, run_e) raise except Exception as e: logger.error("execute_agent_task 失败: agent=%s error=%s", agent_id, e) raise finally: db.close()