feat: Agent 运行时、对话 API、作业助手与引擎修复及前端执行超时

- agent_runtime 模块与 agent_chat API,前端 AgentChat 视图与路由对接
- workflow_engine: code 节点命名空间与 json 引用修复
- llm_service: 工具调用 extra_body(如 DeepSeek)
- create_homework_manager_agent / _3 脚本与测试脚本扩展
- frontend: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS、AgentChatPreview/MainLayout 等
- 文档:架构说明与自主 Agent 改造完成情况

Made-with: Cursor
This commit is contained in:
renjianbo
2026-05-01 11:31:48 +08:00
parent 4366312946
commit 09467568ec
23 changed files with 2798 additions and 77 deletions

View File

@@ -983,6 +983,64 @@ class WorkflowEngine:
return ctx.get("assistant_display_name")
return None
def _format_prior_conversation_for_llm(
self, input_data: Dict[str, Any], original_prompt_template: str
) -> Optional[str]:
"""
Agent 多轮对话:执行请求若携带 conversation_history而提示词未使用
{{memory.conversation_history}} 等占位符,则在此处拼进最终 prompt避免模型「失忆」。
"""
t = original_prompt_template or ""
if "memory.conversation_history" in t or re.search(
r"\{\{[^}]*conversation_history[^}]*\}\}", t
):
return None
hist: Any = None
if isinstance(input_data, dict):
hist = input_data.get("conversation_history")
if hist is None and isinstance(input_data.get("memory"), dict):
hist = input_data["memory"].get("conversation_history")
if hist is None and isinstance(input_data.get("right"), dict):
r = input_data["right"]
hist = r.get("conversation_history")
if hist is None and isinstance(r.get("memory"), dict):
hist = r["memory"].get("conversation_history")
if not hist or not isinstance(hist, list):
return None
lines: List[str] = []
max_turns = 24
for msg in hist[-max_turns:]:
if not isinstance(msg, dict):
continue
role = msg.get("role", "")
content = msg.get("content", "")
if content is None:
continue
if not isinstance(content, str):
content = str(content)
content = content.strip()
if not content:
continue
if role == "user":
lines.append(f"用户:{content}")
elif role in ("assistant", "agent"):
lines.append(f"助手:{content}")
else:
lines.append(f"{role}{content}")
if not lines:
return None
body = "\n".join(lines)
max_chars = 12000
if len(body) > max_chars:
body = body[-max_chars:] + "\n…(更早的对话已截断)"
return f"【本轮之前的对话】\n{body}"
def _resolve_vector_db_query_embedding(
self, input_data: Any, query_vector_config: Any
) -> Optional[List[Any]]:
@@ -1605,6 +1663,8 @@ class WorkflowEngine:
logger.info(f"[rjb] 使用JSON或字符串转换: user_query={user_query}")
logger.info(f"[rjb] 最终提取的user_query: {user_query}")
history_block = self._format_prior_conversation_for_llm(input_data, prompt)
# 如果prompt中没有占位符或者仍有未填充的变量将用户输入附加到prompt
is_generic_instruction = False # 初始化变量
@@ -1633,25 +1693,43 @@ class WorkflowEngine:
if is_generic_instruction:
# 如果是通用指令直接使用用户输入作为prompt
formatted_prompt = str(user_query)
if history_block:
formatted_prompt = f"{history_block}\n\n{str(user_query)}"
else:
formatted_prompt = str(user_query)
logger.info(f"[rjb] 检测到通用指令直接使用用户输入作为prompt: {user_query[:50] if user_query else 'None'}")
else:
# 否则将用户输入附加到prompt
formatted_prompt = f"{formatted_prompt}\n\n{user_query}"
if history_block:
formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{user_query}"
else:
formatted_prompt = f"{formatted_prompt}\n\n{user_query}"
logger.info(f"[rjb] 非通用指令将用户输入附加到prompt")
else:
# 如果没有提取到用户查询附加整个input_data
formatted_prompt = f"{formatted_prompt}\n\n{json_module.dumps(input_data, ensure_ascii=False)}"
tail = json_module.dumps(input_data, ensure_ascii=False)
if history_block:
formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{tail}"
else:
formatted_prompt = f"{formatted_prompt}\n\n{tail}"
elif has_unfilled_variables or re.search(r'\{\{[^}]+\}\}', formatted_prompt):
# 如果有占位符但未填充,先尝试清理所有未填充的模板变量
# 使用正则表达式替换所有 {{...}} 格式的未填充变量
formatted_prompt = re.sub(r'\{\{[^}]+\}\}', '', formatted_prompt)
# 如果有占位符但未填充,附加用户需求说明
if user_query:
formatted_prompt = f"{formatted_prompt}\n\n用户需求:{user_query}\n\n请根据用户需求来完成任务。"
user_tail = f"用户需求:{user_query}\n\n请根据用户需求来完成任务。"
if history_block:
formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{user_tail}"
else:
formatted_prompt = f"{formatted_prompt}\n\n{user_tail}"
else:
# 如果没有用户查询附加整个input_data
formatted_prompt = f"{formatted_prompt}\n\n输入数据:{json_module.dumps(input_data, ensure_ascii=False)}\n\n请根据输入数据来完成任务。"
data_tail = f"输入数据:{json_module.dumps(input_data, ensure_ascii=False)}\n\n请根据输入数据来完成任务。"
if history_block:
formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{data_tail}"
else:
formatted_prompt = f"{formatted_prompt}\n\n{data_tail}"
logger.info(f"[rjb] LLM节点prompt格式化: node_id={node_id}, original_prompt='{prompt[:50] if len(prompt) > 50 else prompt}', has_any_placeholder={has_any_placeholder}, user_query={user_query}, is_generic_instruction={is_generic_instruction}, final_prompt前200字符='{formatted_prompt[:200] if len(formatted_prompt) > 200 else formatted_prompt}'")
prompt = formatted_prompt
@@ -1720,6 +1798,9 @@ class WorkflowEngine:
llm_extra_kw["api_key"] = api_key
if base_url is not None:
llm_extra_kw["base_url"] = base_url
_xb = node_data.get("extra_body")
if isinstance(_xb, dict) and _xb:
llm_extra_kw["extra_body"] = _xb
# 记录实际发送给LLM的prompt
logger.info(f"[rjb] 准备调用LLM: node_id={node_id}, provider={provider}, model={model}, prompt前200字符='{prompt[:200] if len(prompt) > 200 else prompt}'")
@@ -1821,7 +1902,45 @@ class WorkflowEngine:
'status': 'failed',
'error': f'LLM调用失败: {str(e)}'
}
elif node_type == 'agent':
# Agent 节点:自主 ReAct 循环,支持多步工具调用
if self.logger:
self.logger.info(
"Agent 节点开始执行",
data={"node_id": node_id, "input": input_data},
)
try:
from app.agent_runtime.workflow_integration import run_agent_node
_agent_on_tool = None
if hasattr(self, '_on_tool_executed_budget'):
_agent_on_tool = self._on_tool_executed_budget
result = await run_agent_node(
node_data=node.get("data", {}),
input_data=input_data,
execution_logger=self.logger,
user_id=self.trusted_model_config_user_id,
on_tool_executed=_agent_on_tool,
)
if self.logger:
duration = int((time.time() - start_time) * 1000)
self.logger.log_node_complete(
node_id, node_type, result.get("output"), duration,
)
return result
except Exception as e:
if self.logger:
duration = int((time.time() - start_time) * 1000)
self.logger.log_node_error(node_id, node_type, e, duration)
logger.error(f"Agent 节点执行失败: {e}", exc_info=True)
return {
"output": None,
"status": "failed",
"error": f"Agent 执行失败: {e}",
}
elif node_type == 'condition':
# 条件节点判断分支output 必须透传上游 dict否则 sourceHandle true/false 下游只收到布尔值,丢失 reply/memory
condition = node.get('data', {}).get('condition', '')
@@ -1892,6 +2011,9 @@ class WorkflowEngine:
expanded_input.update(_bp)
for _k in ('true', 'false', '_condition_result', '_condition_error'):
expanded_input.pop(_k, None)
# 展开 left双入边 transform 的上游一路常挂在 sourceHandle=left另一路为 LLM/code 的 right
if isinstance(expanded_input.get('left'), dict):
expanded_input.update(expanded_input['left'])
# 展开 rightmerge / json-parse 后 reply、user_profile 常在 right 或嵌套 JSON 字符串中
if isinstance(expanded_input.get('right'), dict):
expanded_input.update(expanded_input['right'])
@@ -2365,7 +2487,6 @@ class WorkflowEngine:
try:
import os
import json
import base64
from pathlib import Path
@@ -2806,7 +2927,6 @@ class WorkflowEngine:
if queue_type == 'rabbitmq':
# RabbitMQ实现
import aio_pika
import json
# 获取RabbitMQ配置
host = replace_variables(node_data.get('host', 'localhost'), input_data)
@@ -2885,7 +3005,6 @@ class WorkflowEngine:
elif queue_type == 'kafka':
# Kafka实现
from kafka import KafkaProducer
import json
# 获取Kafka配置
bootstrap_servers = replace_variables(node_data.get('bootstrap_servers', 'localhost:9092'), input_data)
@@ -3530,6 +3649,31 @@ class WorkflowEngine:
if not isinstance(base_up, dict):
base_up = {}
memory['user_profile'] = {**base_up, **upd}
hb_upd = input_data.get('homework_board_update')
if isinstance(hb_upd, str) and hb_upd.strip().startswith('{'):
try:
hb_upd = json_module.loads(hb_upd)
except Exception:
hb_upd = {}
if not isinstance(hb_upd, dict):
hb_upd = {}
if hb_upd:
ctx = memory.get('context')
if not isinstance(ctx, dict):
ctx = {}
base_hb = ctx.get('homework_board')
if not isinstance(base_hb, dict):
base_hb = {}
merged_hb = {**base_hb, **hb_upd}
new_items = hb_upd.get('items')
old_items = base_hb.get('items')
if isinstance(new_items, list) and len(new_items) > 0:
merged_hb['items'] = new_items
elif isinstance(old_items, list):
merged_hb['items'] = old_items
ctx['homework_board'] = merged_hb
memory['context'] = ctx
# 确保memory中有必要的字段
if 'conversation_history' not in memory:
@@ -4937,10 +5081,27 @@ class WorkflowEngine:
try:
if language.lower() == 'python':
# 受限执行环境(禁止无 __builtins__否则 isinstance 等不可用)
local_vars = {'input_data': input_data, 'result': None}
_code_globs = {'__builtins__': _CODE_NODE_SAFE_BUILTINS, 'hashlib': hashlib, 're': re}
exec(code, _code_globs, local_vars)
result = local_vars.get('result', local_vars.get('output', input_data))
# 注入 loads/dumps使用「globals == locals」同一命名空间 exec
# 避免嵌套函数 LOAD_GLOBAL 找不到仅在 locals 里的 loads以及 json 作用域异常。
_code_globs = {
'__builtins__': _CODE_NODE_SAFE_BUILTINS,
'hashlib': hashlib,
're': re,
'json': json,
}
shared_ns: Dict[str, Any] = dict(_code_globs)
shared_ns.update(
{
'input_data': input_data,
'result': None,
'loads': json.loads,
'dumps': json.dumps,
}
)
exec(code, shared_ns, shared_ns)
result = shared_ns.get(
'result', shared_ns.get('output', input_data)
)
elif language.lower() == 'javascript':
# JS 执行需要外部运行时,这里仅占位
result = {