# aiagent/backend/app/agent_runtime/core.py

"""
Agent Runtime 核心 自主 ReAct 循环
流程
1. 接收用户输入 追加到消息列表
2. 调用 LLM携带 tools schema
3. 如果 LLM 返回工具调用 执行工具 结果追加到消息列表 回到 2
4. 如果 LLM 返回文本 作为最终回答返回
5. 超过 max_iterations 强制终止
"""
from __future__ import annotations
import json
import logging
import time
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, TypedDict
from app.agent_runtime.schemas import (
AgentConfig,
AgentResult,
AgentStep,
)
from app.agent_runtime.context import AgentContext
from app.agent_runtime.memory import AgentMemory
from app.agent_runtime.tool_manager import AgentToolManager
from app.core.exceptions import WorkflowExecutionError
logger = logging.getLogger(__name__)
class LLMCallMetrics(TypedDict, total=False):
"""一次 LLM 调用的度量数据"""
agent_id: Optional[str]
session_id: str
user_id: Optional[str]
model: str
provider: Optional[str]
prompt_tokens: int
completion_tokens: int
total_tokens: int
latency_ms: int
iteration_number: int
step_type: str # think / final
tool_name: Optional[str]
status: str # success / error
error_message: Optional[str]
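# Illustrative LLMCallMetrics payload as delivered to the on_llm_call callback
# (field names come from the TypedDict above; the values are made up):
#   {"session_id": "sess-42", "model": "deepseek-chat", "provider": "deepseek",
#    "prompt_tokens": 812, "completion_tokens": 64, "total_tokens": 876,
#    "latency_ms": 1430, "iteration_number": 2, "step_type": "think",
#    "status": "success"}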
# API error signatures that are considered retryable
_RETRYABLE_ERRORS = (
"timed out",
"timeout",
"connection error",
"temporarily unavailable",
"server disconnected",
"rate limit",
"too many requests",
"internal server error",
"service unavailable",
)
class AgentRuntime:
"""
自主 Agent 运行时
用法
runtime = AgentRuntime(config)
result = await runtime.run("帮我写个Python脚本")
"""
def __init__(
self,
config: Optional[AgentConfig] = None,
context: Optional[AgentContext] = None,
memory: Optional[AgentMemory] = None,
tool_manager: Optional[AgentToolManager] = None,
execution_logger: Optional[Any] = None,
on_tool_executed: Optional[Callable[[str], Any]] = None,
on_llm_call: Optional[Callable[[Dict[str, Any]], Any]] = None,
):
self.config = config or AgentConfig()
self.context = context or AgentContext(
system_prompt=self.config.system_prompt,
user_id=self.config.user_id,
)
self.memory = memory or AgentMemory(
scope_id=self.config.user_id or self.config.name,
max_history=self.config.memory.max_history_messages,
persist=self.config.memory.persist_to_db,
)
self.tool_manager = tool_manager or AgentToolManager(
include_tools=self.config.tools.include_tools,
exclude_tools=self.config.tools.exclude_tools,
)
self.execution_logger = execution_logger
self.on_tool_executed = on_tool_executed
self.on_llm_call = on_llm_call
self._memory_context_loaded = False
self._llm_invocations = 0
        # Budget callback: injected by the WorkflowEngine so the agent's
        # internal LLM-call count also counts against the workflow budget.
        # Returning True means the budget is fine; returning False or raising
        # means the limit was exceeded.
        self.on_llm_invocation: Optional[Callable[[], Any]] = None
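        # Example caller-side wiring (hypothetical: the engine object and its
        # counters are illustrative, not defined in this module):
        #
        #   def _count_llm_call():
        #       engine.llm_calls += 1
        #       if engine.llm_calls > engine.max_llm_calls:
        #           raise WorkflowExecutionError("workflow LLM budget exceeded")
        #       return True
        #
        #   runtime.on_llm_invocation = _count_llm_call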
async def run(self, user_input: str) -> AgentResult:
"""
执行 Agent 单轮对话
流程加载记忆 追加用户消息 ReAct 循环 保存记忆 返回结果
"""
max_iter = max(1, self.config.llm.max_iterations)
self.context.iteration = 0
self.context.tool_calls_made = 0
        # 1. On first run, load long-term memory into the system prompt
        if not self._memory_context_loaded:
            await self._inject_memory_context(user_input)
            self._memory_context_loaded = True
        # 2. Append the user message
        self.context.add_user_message(user_input)
        # 3. ReAct loop
        llm = _LLMClient(self.config.llm)
        tool_schemas = self.tool_manager.get_tool_schemas()
        has_tools = self.tool_manager.has_tools()
        steps: List[AgentStep] = []
        # Build the LLM-call callback (wraps on_llm_call and adds context fields)
        llm_callback_ctx = {"step_type": "think", "tool_name": None}
def _llm_callback(metrics: Dict[str, Any]):
if self.on_llm_call:
metrics.update({
"session_id": self.context.session_id,
"user_id": self.config.user_id,
"step_type": llm_callback_ctx["step_type"],
"tool_name": llm_callback_ctx["tool_name"],
})
self.on_llm_call(metrics)
while self.context.iteration < max_iter:
self.context.iteration += 1
            # Trim overly long history
            messages = self.memory.trim_messages(self.context.messages)
            # Budget check: LLM call count (checked before calling the LLM,
            # so quota is not wasted)
            budget = self.config.budget
            if self._llm_invocations >= budget.max_llm_invocations:
                err = f"LLM call budget exceeded ({budget.max_llm_invocations} calls)"
logger.warning(err)
steps.append(AgentStep(iteration=self.context.iteration, type="final", content=err))
await self.memory.save_context(user_input, err, self.context.messages)
return AgentResult(success=False, content=err, truncated=True,
iterations_used=self.context.iteration,
tool_calls_made=self.context.tool_calls_made,
steps=steps, error=err)
            # Invoke the external LLM budget callback (injected by the WorkflowEngine,
            # so the agent's LLM calls count against the workflow budget)
            if self.on_llm_invocation:
                try:
                    self.on_llm_invocation()
                except Exception as e:
                    err = f"LLM call exceeded the workflow budget: {e}"
logger.warning(err)
steps.append(AgentStep(iteration=self.context.iteration, type="final", content=err))
await self.memory.save_context(user_input, err, self.context.messages)
return AgentResult(success=False, content=err, truncated=True,
iterations_used=self.context.iteration,
tool_calls_made=self.context.tool_calls_made,
steps=steps, error=str(e))
            # Call the LLM
            try:
                response = await llm.chat(
                    messages=messages,
                    tools=tool_schemas if has_tools else None,
                    iteration=self.context.iteration,
                    on_completion=_llm_callback,
                )
            except Exception as e:
                err_str = str(e)
                logger.error("LLM call failed (iteration=%s): %s", self.context.iteration, err_str)
                if self.context.iteration < max_iter and self._is_retryable(err_str):
                    steps.append(AgentStep(
                        iteration=self.context.iteration,
                        type="tool_result",
                        content=f"LLM call failed (retryable): {err_str}",
                    ))
                    continue
                return AgentResult(
                    success=False,
                    content=f"LLM call failed: {err_str}",
                    iterations_used=self.context.iteration,
                    tool_calls_made=self.context.tool_calls_made,
                    error=err_str,
                )
            # Record the LLM call (internal counter)
            self._llm_invocations += 1
            # Parse tool calls from the response
tool_calls = self._extract_tool_calls(response)
content = self._extract_content(response)
reasoning = getattr(response, "reasoning_content", None) or (
response.get("reasoning_content") if isinstance(response, dict) else None
)
            if not tool_calls:
                # The LLM returned plain text -> finish
                self.context.add_assistant_message(content)
                final_text = content or "(the model returned no usable content)"
steps.append(AgentStep(
iteration=self.context.iteration,
type="final",
content=final_text,
reasoning=reasoning,
))
                # Persist memory
await self.memory.save_context(user_input, final_text, self.context.messages)
return AgentResult(
success=True,
content=final_text,
iterations_used=self.context.iteration,
tool_calls_made=self.context.tool_calls_made,
steps=steps,
)
            # Tool calls present -> first record the assistant message (with tool_calls)
            self.context.add_assistant_message(content or "", tool_calls, reasoning)
            # Record the thinking step (including tool-call intent)
tc_names = [tc["function"]["name"] for tc in tool_calls]
tc_args_list = []
for tc in tool_calls:
try:
tc_args_list.append(json.loads(tc["function"].get("arguments", "{}")))
except (json.JSONDecodeError, TypeError):
tc_args_list.append({})
steps.append(AgentStep(
iteration=self.context.iteration,
type="think",
content=content or f"调用工具: {', '.join(tc_names)}",
reasoning=reasoning,
tool_name=tc_names[0] if len(tc_names) == 1 else None,
tool_input=tc_args_list[0] if len(tc_args_list) == 1 else None,
))
            if self.execution_logger:
                self.execution_logger.info(
                    f"Agent is calling {len(tool_calls)} tool(s)",
                    data={"tool_calls": tc_names,
                          "iteration": self.context.iteration},
                )
            # Execute the tools one by one
for tc in tool_calls:
tfn = tc.get("function", {})
tname = tfn.get("name", "unknown")
tcid = tc.get("id", f"call_{self.context.iteration}_{self.context.tool_calls_made}")
try:
targs = json.loads(tfn.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
targs = {}
logger.info("Agent 执行工具 [%s]: %s", tname, targs)
result = await self.tool_manager.execute(tname, targs)
steps.append(AgentStep(
iteration=self.context.iteration,
type="tool_result",
content=f"工具 {tname} 返回结果",
tool_name=tname,
tool_input=targs,
tool_result=result[:500] + "..." if len(result) > 500 else result,
))
self.context.add_tool_result(tcid, tname, result)
self.context.tool_calls_made += 1
                # Budget check: tool call count
                if self.context.tool_calls_made > budget.max_tool_calls:
                    err = f"Tool call budget exceeded ({budget.max_tool_calls} calls)"
logger.warning(err)
steps.append(AgentStep(iteration=self.context.iteration, type="tool_result",
content=err, tool_name=tname))
return AgentResult(success=False, content=err, truncated=True,
iterations_used=self.context.iteration,
tool_calls_made=self.context.tool_calls_made,
steps=steps, error=err)
                if self.on_tool_executed:
                    try:
                        await self.on_tool_executed(tname)
                    except WorkflowExecutionError:
                        raise
                    except Exception:
                        # Callback failures must not break the agent loop
                        pass
                if self.execution_logger:
                    preview = result[:300] + "..." if len(result) > 300 else result
                    self.execution_logger.info(
                        f"Tool {tname} finished",
                        data={"tool_name": tname, "result_preview": preview},
                    )
        # Maximum number of iterations reached
last_content = ""
for m in reversed(self.context.messages):
if m.get("role") == "assistant" and m.get("content"):
last_content = m["content"]
break
logger.warning("Agent 达到最大迭代次数 (%s)", max_iter)
await self.memory.save_context(user_input, last_content or "(已达最大迭代次数)", self.context.messages)
if last_content:
steps.append(AgentStep(
iteration=self.context.iteration,
type="final",
content=last_content,
))
return AgentResult(
success=True,
content=last_content or "已达最大迭代次数,但模型未返回最终回答。",
truncated=True,
iterations_used=self.context.iteration,
tool_calls_made=self.context.tool_calls_made,
steps=steps,
)
async def run_stream(self, user_input: str) -> AsyncGenerator[dict, None]:
"""
流式执行 Agent 单轮对话
run() 逻辑相同但在每个关键步骤 yield SSE 事件
- think: LLM 思考中准备调用工具
- tool_call: 即将执行工具
- tool_result: 工具执行完毕
- final: 最终回答
- error: 出错/预算超限
"""
max_iter = max(1, self.config.llm.max_iterations)
self.context.iteration = 0
self.context.tool_calls_made = 0
        # 1. On first run, load long-term memory into the system prompt
        if not self._memory_context_loaded:
            await self._inject_memory_context(user_input)
            self._memory_context_loaded = True
        # 2. Append the user message
        self.context.add_user_message(user_input)
        # 3. ReAct loop
llm = _LLMClient(self.config.llm)
tool_schemas = self.tool_manager.get_tool_schemas()
has_tools = self.tool_manager.has_tools()
steps: List[AgentStep] = []
llm_callback_ctx = {"step_type": "think", "tool_name": None}
def _llm_callback(metrics: Dict[str, Any]):
if self.on_llm_call:
metrics.update({
"session_id": self.context.session_id,
"user_id": self.config.user_id,
"step_type": llm_callback_ctx["step_type"],
"tool_name": llm_callback_ctx["tool_name"],
})
self.on_llm_call(metrics)
while self.context.iteration < max_iter:
self.context.iteration += 1
messages = self.memory.trim_messages(self.context.messages)
            # Budget check: LLM call count (checked before calling the LLM,
            # so quota is not wasted)
            budget = self.config.budget
            if self._llm_invocations >= budget.max_llm_invocations:
                err = f"LLM call budget exceeded ({budget.max_llm_invocations} calls)"
logger.warning(err)
yield {"type": "error", "content": err, "iteration": self.context.iteration,
"truncated": True}
await self.memory.save_context(user_input, err, self.context.messages)
return
            # Invoke the external LLM budget callback (injected by the WorkflowEngine)
            if self.on_llm_invocation:
                try:
                    self.on_llm_invocation()
                except Exception as e:
                    err = f"LLM call exceeded the workflow budget: {e}"
logger.warning(err)
yield {"type": "error", "content": err, "iteration": self.context.iteration,
"truncated": True}
return
            # Call the LLM
            try:
                response = await llm.chat(
                    messages=messages,
                    tools=tool_schemas if has_tools else None,
                    iteration=self.context.iteration,
                    on_completion=_llm_callback,
                )
            except Exception as e:
                err_str = str(e)
                logger.error("LLM call failed (iteration=%s): %s", self.context.iteration, err_str)
                if self.context.iteration < max_iter and self._is_retryable(err_str):
                    yield {"type": "error", "content": f"LLM call failed (retryable): {err_str}",
                           "iteration": self.context.iteration}
                    continue
                yield {"type": "error", "content": f"LLM call failed: {err_str}",
                       "iteration": self.context.iteration}
                return
            # Record the LLM call (internal counter)
            self._llm_invocations += 1
            # Parse tool calls from the response
tool_calls = self._extract_tool_calls(response)
content = self._extract_content(response)
reasoning = getattr(response, "reasoning_content", None) or (
response.get("reasoning_content") if isinstance(response, dict) else None
)
            if not tool_calls:
                # The LLM returned plain text -> finish
                self.context.add_assistant_message(content)
                final_text = content or "(the model returned no usable content)"
yield {
"type": "final",
"content": final_text,
"reasoning": reasoning,
"iteration": self.context.iteration,
"iterations_used": self.context.iteration,
"tool_calls_made": self.context.tool_calls_made,
"session_id": self.context.session_id,
}
await self.memory.save_context(user_input, final_text, self.context.messages)
return
            # Tool calls present -> first record the assistant message
            self.context.add_assistant_message(content or "", tool_calls, reasoning)
            # Yield the think event
tc_names = [tc["function"]["name"] for tc in tool_calls]
tc_args_list = []
for tc in tool_calls:
try:
tc_args_list.append(json.loads(tc["function"].get("arguments", "{}")))
except (json.JSONDecodeError, TypeError):
tc_args_list.append({})
yield {
"type": "think",
"content": content or f"调用工具: {', '.join(tc_names)}",
"reasoning": reasoning,
"tool_names": tc_names,
"iteration": self.context.iteration,
}
steps.append(AgentStep(
iteration=self.context.iteration,
type="think",
content=content or f"调用工具: {', '.join(tc_names)}",
reasoning=reasoning,
tool_name=tc_names[0] if len(tc_names) == 1 else None,
tool_input=tc_args_list[0] if len(tc_args_list) == 1 else None,
))
            if self.execution_logger:
                self.execution_logger.info(
                    f"Agent is calling {len(tool_calls)} tool(s)",
                    data={"tool_calls": tc_names, "iteration": self.context.iteration},
                )
            # Execute the tools one by one
for tc in tool_calls:
tfn = tc.get("function", {})
tname = tfn.get("name", "unknown")
tcid = tc.get("id", f"call_{self.context.iteration}_{self.context.tool_calls_made}")
try:
targs = json.loads(tfn.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
targs = {}
                # Yield the tool_call event
yield {
"type": "tool_call",
"name": tname,
"input": targs,
"iteration": self.context.iteration,
}
logger.info("Agent 执行工具 [%s]: %s", tname, targs)
result = await self.tool_manager.execute(tname, targs)
                # Yield the tool_result event
yield {
"type": "tool_result",
"name": tname,
"result": result[:500] + "..." if len(result) > 500 else result,
"iteration": self.context.iteration,
}
steps.append(AgentStep(
iteration=self.context.iteration,
type="tool_result",
content=f"工具 {tname} 返回结果",
tool_name=tname,
tool_input=targs,
tool_result=result[:500] + "..." if len(result) > 500 else result,
))
self.context.add_tool_result(tcid, tname, result)
self.context.tool_calls_made += 1
                # Budget check: tool call count
                if self.context.tool_calls_made > budget.max_tool_calls:
                    err = f"Tool call budget exceeded ({budget.max_tool_calls} calls)"
logger.warning(err)
yield {"type": "error", "content": err, "iteration": self.context.iteration,
"truncated": True}
return
                if self.on_tool_executed:
                    try:
                        await self.on_tool_executed(tname)
                    except WorkflowExecutionError:
                        raise
                    except Exception:
                        # Callback failures must not break the agent loop
                        pass
                if self.execution_logger:
                    preview = result[:300] + "..." if len(result) > 300 else result
                    self.execution_logger.info(
                        f"Tool {tname} finished",
                        data={"tool_name": tname, "result_preview": preview},
                    )
        # Maximum number of iterations reached
last_content = ""
for m in reversed(self.context.messages):
if m.get("role") == "assistant" and m.get("content"):
last_content = m["content"]
break
logger.warning("Agent 达到最大迭代次数 (%s)", max_iter)
await self.memory.save_context(user_input, last_content or "(已达最大迭代次数)", self.context.messages)
yield {
"type": "final",
"content": last_content or "已达最大迭代次数,但模型未返回最终回答。",
"iteration": self.context.iteration,
"iterations_used": self.context.iteration,
"tool_calls_made": self.context.tool_calls_made,
"truncated": True,
"session_id": self.context.session_id,
}
async def _inject_memory_context(self, query: str = "") -> None:
"""加载长期记忆并注入 system prompt。"""
mem_text = await self.memory.initialize(query=query)
if mem_text:
enriched = (
self.config.system_prompt.rstrip("\n")
+ "\n\n"
+ mem_text
)
self.context.set_system_prompt(enriched)
logger.info("Agent 已注入长期记忆上下文")
@staticmethod
def _extract_tool_calls(response: Any) -> List[Dict[str, Any]]:
"""从 LLM 响应中提取工具调用列表。"""
if response is None:
return []
        # OpenAI SDK format
if hasattr(response, "tool_calls") and response.tool_calls:
result = []
for tc in response.tool_calls:
result.append({
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
})
return result
        # Dict format
if isinstance(response, dict):
tc_list = response.get("tool_calls") or []
if tc_list:
return tc_list
            # Check whether DSML is embedded in the content
content = response.get("content") or ""
if "invoke" in content or "function_call" in content:
from app.services.llm_service import _parse_dsml_tool_invocations
dsml = _parse_dsml_tool_invocations(content)
if dsml:
return [
{
"id": f"dsml-{i}",
"type": "function",
"function": {
"name": inv["name"],
"arguments": json.dumps(inv["arguments"], ensure_ascii=False),
},
}
for i, inv in enumerate(dsml)
]
return []
@staticmethod
def _extract_content(response: Any) -> str:
"""从 LLM 响应中提取文本内容。"""
if response is None:
return ""
if hasattr(response, "content"):
return response.content or ""
if isinstance(response, dict):
return response.get("content") or ""
return str(response)
@staticmethod
def _is_retryable(err_str: str) -> bool:
"""判断错误是否可重试。"""
err_lower = err_str.lower()
return any(kw in err_lower for kw in _RETRYABLE_ERRORS)
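        # e.g. _is_retryable("Connection error: server disconnected") -> True
        #      _is_retryable("Invalid API key")                       -> False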
class _LLMClient:
"""轻量 LLM 客户端包装,复用已有 LLMService 能力。"""
def __init__(self, config: Any):
from app.services.llm_service import llm_service
self._service = llm_service
self._config = config
async def chat(
self,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
iteration: int = 1,
on_completion: Optional[Callable[[Dict[str, Any]], Any]] = None,
) -> Any:
"""
调用 LLM
优先使用 llm_service.call_openai_with_tools支持 ReAct 的多次工具调用
但为避免外层 ReAct 与内部 ReAct 冲突
- 1 使用标准 chat无内部 ReAct由外层 AgentRuntime 控制循环
- 后续轮次也使用标准 chat仅追加工具结果
"""
        # Call the OpenAI/DeepSeek SDK directly; AgentRuntime drives the loop
from openai import AsyncOpenAI
from app.core.config import settings
        # Prefer explicit config, then settings (loaded from .env), then os.environ
api_key = self._config.api_key or settings.OPENAI_API_KEY or ""
base_url = self._config.base_url or settings.OPENAI_BASE_URL or ""
if not api_key or api_key == "your-openai-api-key":
            # Fall back to DeepSeek
api_key = self._config.api_key or settings.DEEPSEEK_API_KEY or ""
base_url = self._config.base_url or settings.DEEPSEEK_BASE_URL or "https://api.deepseek.com"
if not api_key:
raise ValueError("未配置 API Key")
client = AsyncOpenAI(api_key=api_key, base_url=base_url)
kwargs: Dict[str, Any] = {
"model": self._config.model,
"messages": messages,
"temperature": self._config.temperature,
"timeout": self._config.request_timeout,
}
if self._config.max_tokens:
kwargs["max_tokens"] = self._config.max_tokens
if self._config.extra_body:
kwargs["extra_body"] = self._config.extra_body
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"
start_time = time.perf_counter()
try:
response = await client.chat.completions.create(**kwargs)
latency_ms = int((time.perf_counter() - start_time) * 1000)
message = response.choices[0].message
            # Extract token usage
usage = getattr(response, "usage", None)
prompt_tokens = usage.prompt_tokens if usage else 0
completion_tokens = usage.completion_tokens if usage else 0
total_tokens = usage.total_tokens if usage else 0
            # Invoke the completion callback
if on_completion:
on_completion({
"model": self._config.model,
"provider": self._config.provider,
"prompt_tokens": prompt_tokens or 0,
"completion_tokens": completion_tokens or 0,
"total_tokens": total_tokens or 0,
"latency_ms": latency_ms,
"iteration_number": iteration,
"status": "success",
})
return message
except Exception as e:
latency_ms = int((time.perf_counter() - start_time) * 1000)
if on_completion:
on_completion({
"model": self._config.model,
"provider": self._config.provider,
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
"latency_ms": latency_ms,
"iteration_number": iteration,
"status": "error",
"error_message": str(e),
})
raise