Files
aiagent/backend/app/agent_runtime/workflow_integration.py
renjianbo eabf90c496 feat: add AI学习助手 agent (KG+RAG ideal) and renshenguo feishu bot
- Add AI学习助手 agent creation script with all 39 tools, 3-layer KG+RAG memory
- Add renshenguo (人参果) feishu bot integration (app_service + ws_handler)
- Register renshenguo WS client in main.py startup
- Add RENSHENGUO_APP_ID / RENSHENGUO_APP_SECRET / RENSHENGUO_AGENT_ID config
- Reorganize docs from root into docs/ subdirectories
- Move startup scripts to scripts/startup/
- Various backend optimizations and tool improvements

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-06 01:37:13 +08:00

308 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent Runtime ⇄ WorkflowEngine 桥接。
让 workflow_engine.execute_node() 通过寥寥几行调用 Agent Runtime。
"""
from __future__ import annotations
import logging
from typing import Any, Dict, Optional
from app.agent_runtime.core import AgentRuntime
from app.agent_runtime.schemas import (
AgentConfig,
AgentLLMConfig,
AgentToolConfig,
AgentBudgetConfig,
AgentMemoryConfig,
)
logger = logging.getLogger(__name__)
def _node_value(mapping: Dict[str, Any], key: str, default: Any) -> Any:
    """Return ``mapping[key]``, or *default* when the key is missing or None.

    Node configuration usually originates from JSON, where an explicit
    ``null`` would otherwise reach ``int()`` / ``float()`` and raise TypeError.
    """
    value = mapping.get(key)
    return default if value is None else value


def _render_system_prompt(template: Any, variables: Dict[str, Any]) -> str:
    """Render the system prompt template against the node's input variables.

    Two placeholder syntaxes are handled:

    * ``{{name}}`` (the documented form) is replaced with ``str(variables[name])``
      when ``name`` is a key of *variables*; unknown names are left untouched.
    * ``{name}`` (legacy ``str.format`` form) is still applied to the rest of
      the template. If ``str.format`` fails for any reason the legacy pass is
      skipped, so a prompt containing literal braces never crashes the node.

    Values substituted for ``{{name}}`` never pass through ``str.format``, so
    braces inside user-provided values cannot be interpreted as placeholders.
    """
    import re  # local import keeps this block independent of the file header

    if not isinstance(template, str):
        template = str(template)

    stashed: list = []

    def _stash(match):
        # Swap a known ``{{name}}`` for a brace-free sentinel so the legacy
        # ``str.format`` pass below cannot collapse or misread it.
        name = match.group(1)
        if name not in variables:
            return match.group(0)
        stashed.append(str(variables[name]))
        return "\x00%d\x00" % (len(stashed) - 1)

    rendered = re.sub(r"\{\{\s*([A-Za-z_]\w*)\s*\}\}", _stash, template)

    try:
        rendered = rendered.format(**variables)
    except (KeyError, IndexError, ValueError, AttributeError, TypeError):
        # Unknown key, positional "{}", malformed braces, attribute/index
        # lookups on the value, or non-string keys: keep the text as-is.
        pass

    def _restore(match):
        index = int(match.group(1))
        return stashed[index] if index < len(stashed) else match.group(0)

    return re.sub(r"\x00(\d+)\x00", _restore, rendered)


async def run_agent_node(
    node_data: Dict[str, Any],
    input_data: Dict[str, Any],
    execution_logger: Optional[Any] = None,
    user_id: Optional[str] = None,
    on_tool_executed: Optional[Any] = None,
    on_llm_invocation: Optional[Any] = None,
    budget_limits: Optional[Dict[str, int]] = None,
) -> Dict[str, Any]:
    """Execute an Agent node inside a workflow.

    Fields read from ``node_data``:
        system_prompt       Agent persona / instructions; ``{{variable}}`` and
                            legacy ``{variable}`` placeholders are filled from
                            ``input_data``.
        tools               Optional tool whitelist (empty list when absent).
        exclude_tools       Optional tool blacklist.
        require_approval, approval_timeout_ms, approval_default
                            Tool approval settings.
        model, provider, temperature, max_iterations
                            LLM settings.
        api_key, base_url   Optional inline LLM credentials / endpoint.
        memory, memory_max_history, memory_persist, memory_vector_enabled,
        memory_vector_top_k, memory_learning
                            Memory settings.
        label, memory_scope_id, agent_id, self_review_enabled
                            Agent identity / behaviour flags.

    The user query is taken from ``input_data["query"]``, then ``"input"``,
    then ``"text"`` (first truthy value wins).

    Args:
        node_data: Node configuration (see above). ``None`` is treated as ``{}``.
        input_data: Upstream data and template variables. ``None`` is treated
            as ``{}``.
        execution_logger: Passed through to ``AgentRuntime``.
        user_id: Stored on the agent config.
        on_tool_executed: Passed through to ``AgentRuntime``.
        on_llm_invocation: When truthy, assigned to
            ``runtime.on_llm_invocation`` before the run.
        budget_limits: Optional workflow-level limits; the keys
            ``max_llm_invocations`` and ``max_tool_calls`` are honoured and
            clamped to a minimum of 1.

    Returns:
        On success ``{"output", "status": "success", "agent_meta"}``; otherwise
        ``{"output", "status": "error"[, "error"]}``.
    """
    node_data = node_data or {}
    input_data = input_data or {}

    # 1. Resolve the user query.
    query = (
        input_data.get("query")
        or input_data.get("input")
        or input_data.get("text", "")
    )
    if not isinstance(query, str):
        query = str(query) if query else ""
    if not query:
        return {"output": "错误Agent 节点未收到用户输入", "status": "error"}

    # 2. Render the system prompt (template variables come from input_data).
    formatted_prompt = _render_system_prompt(
        _node_value(node_data, "system_prompt", "你是一个有用的AI助手。"),
        input_data,
    )

    # 3. Build the agent configuration.
    llm_config = AgentLLMConfig(
        provider=node_data.get("provider", "openai"),
        model=node_data.get("model", "gpt-4o-mini"),
        temperature=float(_node_value(node_data, "temperature", 0.7)),
        max_iterations=int(_node_value(node_data, "max_iterations", 10)),
    )
    # A node may carry its own api_key / base_url inline.
    if node_data.get("api_key"):
        llm_config.api_key = node_data["api_key"]
    if node_data.get("base_url"):
        llm_config.base_url = node_data["base_url"]

    # 3a. Budget: apply workflow-level limits, never below 1.
    budget = AgentBudgetConfig()
    if budget_limits:
        if "max_llm_invocations" in budget_limits:
            budget.max_llm_invocations = max(1, int(budget_limits["max_llm_invocations"]))
        if "max_tool_calls" in budget_limits:
            budget.max_tool_calls = max(1, int(budget_limits["max_tool_calls"]))

    # Memory: persistence defaults to whatever the "memory" switch says.
    mem_enabled = bool(node_data.get("memory", True))
    memory_config = AgentMemoryConfig(
        enabled=mem_enabled,
        max_history_messages=int(_node_value(node_data, "memory_max_history", 20)),
        persist_to_db=bool(node_data.get("memory_persist", mem_enabled)),
        vector_memory_enabled=bool(node_data.get("memory_vector_enabled", True)),
        vector_memory_top_k=int(_node_value(node_data, "memory_vector_top_k", 5)),
        learning_enabled=bool(node_data.get("memory_learning", True)),
    )

    # Tool selection and approval settings.
    tool_config = AgentToolConfig(
        include_tools=node_data.get("tools") or [],
        exclude_tools=node_data.get("exclude_tools") or [],
        require_approval=node_data.get("require_approval") or [],
        approval_timeout_ms=int(_node_value(node_data, "approval_timeout_ms", 60000)),
        approval_default=node_data.get("approval_default", "deny"),
    )

    agent_config = AgentConfig(
        name=node_data.get("label", "agent_node"),
        system_prompt=formatted_prompt,
        llm=llm_config,
        tools=tool_config,
        memory=memory_config,
        budget=budget,
        user_id=user_id,
        memory_scope_id=node_data.get("memory_scope_id") or node_data.get("agent_id", ""),
        self_review_enabled=node_data.get("self_review_enabled", False),
    )

    # 4. Run the agent.
    runtime = AgentRuntime(
        config=agent_config,
        execution_logger=execution_logger,
        on_tool_executed=on_tool_executed,
    )
    # Hook the LLM callback so the agent's internal LLM calls are reported
    # back to the caller (used for workflow-level budgeting).
    if on_llm_invocation:
        runtime.on_llm_invocation = on_llm_invocation

    result = await runtime.run(query)

    # 5. Shape the result for the workflow engine.
    if result.success:
        return {
            "output": result.content,
            "status": "success",
            "agent_meta": {
                "iterations": result.iterations_used,
                "tool_calls": result.tool_calls_made,
                "truncated": result.truncated,
            },
        }
    return {
        "output": result.content,
        "status": "error",
        "error": result.error,
    }
def _first_llm_node_settings(workflow_config: Any) -> Dict[str, Any]:
    """Return the ``data`` dict of the first agent/llm/template node, or ``{}``.

    Nodes may be plain dicts or objects exposing ``type`` / ``data`` attributes.
    """
    if not isinstance(workflow_config, dict):
        return {}
    for node in workflow_config.get("nodes") or []:
        if isinstance(node, dict):
            node_type, data = node.get("type"), node.get("data")
        else:
            node_type, data = getattr(node, "type", None), getattr(node, "data", None)
        if node_type in ("agent", "llm", "template"):
            return data if isinstance(data, dict) else {}
    return {}


def _load_orchestrator_agent_configs(agent_ids: list, node_data: Dict[str, Any]) -> list:
    """Load the given agents from the DB and build their orchestrator configs.

    Node-level ``model`` / ``provider`` / ``temperature`` / ``max_iterations``
    take precedence over the values found in each agent's workflow config.
    Unknown agent ids are logged and skipped. The DB session is closed before
    returning, so no connection is held while the orchestrator runs.
    """
    from app.core.database import SessionLocal
    from app.models.agent import Agent
    from app.agent_runtime.orchestrator import OrchestratorAgentConfig

    override_model = node_data.get("model")
    override_provider = node_data.get("provider")
    override_temperature = node_data.get("temperature")
    override_max_iterations = node_data.get("max_iterations")
    # Explicit None/"" check: a temperature override of 0 is a valid value.
    has_temperature_override = override_temperature not in (None, "")

    configs: list = []
    # NOTE(review): this is a synchronous query inside an async call path and
    # will block the event loop while it runs — confirm that is acceptable.
    db = SessionLocal()
    try:
        for agent_id in agent_ids:
            agent = db.query(Agent).filter(Agent.id == agent_id).first()
            if not agent:
                logger.warning("Orchestrator: Agent %s 不存在，跳过", agent_id)
                continue

            # LLM settings come from the first agent/llm/template node of the
            # agent's workflow config, when there is one.
            cfg = _first_llm_node_settings(agent.workflow_config)

            if has_temperature_override:
                temperature = float(override_temperature)
            elif cfg.get("temperature") not in (None, ""):
                temperature = float(cfg["temperature"])
            else:
                temperature = 0.7

            if override_max_iterations:
                max_iterations = int(override_max_iterations)
            elif cfg.get("max_iterations") not in (None, ""):
                max_iterations = int(cfg["max_iterations"])
            else:
                max_iterations = 10

            configs.append(OrchestratorAgentConfig(
                id=agent.id,
                name=agent.name or "Agent",
                # Node prompt wins; the agent description is the fallback.
                system_prompt=cfg.get("system_prompt", "") or agent.description or "",
                model=override_model or cfg.get("model") or "deepseek-v4-flash",
                provider=override_provider or cfg.get("provider") or "deepseek",
                temperature=temperature,
                max_iterations=max_iterations,
                tools=cfg.get("tools") or [],
                description=agent.description or "",
            ))
    finally:
        db.close()
    return configs


async def run_orchestrator_node(
    node_data: Dict[str, Any],
    input_data: Dict[str, Any],
    execution_logger: Optional[Any] = None,
    user_id: Optional[str] = None,
    on_tool_executed: Optional[Any] = None,
    on_llm_invocation: Optional[Any] = None,
) -> Dict[str, Any]:
    """Execute a multi-agent orchestration node inside a workflow.

    Fields read from ``node_data``:
        mode            "route" | "sequential" | "debate" | "pipeline"
                        (case-insensitive, defaults to "debate").
        agents          List of Agent IDs; required, at least one entry.
        model           Overrides each agent's model (optional).
        provider        Overrides each agent's provider (optional).
        temperature     Overrides each agent's temperature (optional).
        max_iterations  Overrides each agent's max step count (optional).

    The user query is taken from ``input_data["query"]``, then ``"input"``,
    then ``"text"`` (first truthy value wins).

    Args:
        node_data: Node configuration (see above). ``None`` is treated as ``{}``.
        input_data: Upstream data. ``None`` is treated as ``{}``.
        execution_logger: Accepted for signature parity; not used here.
        user_id: Accepted for signature parity; not used here.
        on_tool_executed: Accepted for signature parity; not used here.
        on_llm_invocation: Forwarded to the orchestrator as ``on_llm_call``.

    Returns:
        ``{"output", "status": "success", "orchestrator_meta"}`` on success,
        ``{"output", "status": "error"}`` for invalid input or when no agent
        could be loaded, and ``{"output": None, "status": "failed", "error"}``
        when loading or running raises.
    """
    node_data = node_data or {}
    input_data = input_data or {}

    # 1. Resolve the user query.
    query = (
        input_data.get("query")
        or input_data.get("input")
        or input_data.get("text", "")
    )
    if not isinstance(query, str):
        query = str(query) if query else ""
    if not query:
        return {"output": "错误Orchestrator 节点未收到用户输入", "status": "error"}

    # 2. Resolve the orchestration mode; a null/empty value means the default.
    mode = str(node_data.get("mode") or "debate").lower()
    if mode not in ("route", "sequential", "debate", "pipeline"):
        return {"output": f"错误：不支持的编排模式 '{mode}'，可选: route, sequential, debate, pipeline", "status": "error"}

    # 3. Validate the agent list.
    agent_ids = node_data.get("agents", [])
    if not agent_ids or not isinstance(agent_ids, list):
        return {"output": "错误Orchestrator 节点需要至少 1 个 Agent", "status": "error"}

    # Imported lazily, and only once the input is known to be valid.
    from app.agent_runtime.orchestrator import AgentOrchestrator

    try:
        # 4. Load agent configs; the DB session is released before the run.
        agent_configs = _load_orchestrator_agent_configs(agent_ids, node_data)
        if not agent_configs:
            return {"output": "错误：没有找到可用的 Agent", "status": "error"}

        # 5. Build the orchestrator and run it.
        orchestrator = AgentOrchestrator(
            default_llm_config=AgentLLMConfig(
                model=node_data.get("model") or "deepseek-v4-flash",
                temperature=0.3,
            ),
        )
        result = await orchestrator.run(
            mode=mode,
            question=query,
            agents=agent_configs,
            on_llm_call=on_llm_invocation,
        )

        # 6. Structured result; step input/output are truncated to 200/500 chars.
        return {
            "output": result.final_answer,
            "status": "success",
            "orchestrator_meta": {
                "mode": result.mode,
                "agent_count": len(agent_configs),
                "steps": [
                    {
                        "agent_id": s.agent_id,
                        "agent_name": s.agent_name,
                        "input": s.input[:200] if s.input else "",
                        "output": s.output[:500] if s.output else "",
                        "iterations_used": s.iterations_used,
                        "tool_calls_made": s.tool_calls_made,
                        "error": s.error,
                    }
                    for s in result.steps
                ],
            },
        }
    except Exception as e:
        logger.error("Orchestrator 节点执行失败: %s", e, exc_info=True)
        # NOTE(review): this path reports "failed" while validation errors use
        # "error"; kept as-is because callers may depend on it — confirm.
        return {
            "output": None,
            "status": "failed",
            "error": f"Orchestrator 执行失败: {e}",
        }