feat: Phase 1 - output quality verification + node-level auto-retry

- Add enterprise_review tool (35th builtin) for LLM-based quality assessment
- Add evaluator workflow node type for quality gating in DAG
- Add AgentRuntime built-in self-review with auto-correction loop
- Rewrite error_handler node from stub to real retry mechanism
- Add engine-level per-node retry with configurable max_retries/delay/on_exhausted
- Add AgentExtension model for extension tracking
- Enhance validation in agent_create_tool and tool_register_tool
- Update 全能助手 system prompt with self-evolution workflow
- Docs: 缺失能力.md and 解决缺失能力计划.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-04 22:05:28 +08:00
parent 244ed31274
commit b8b01254ca
13 changed files with 1821 additions and 60 deletions

View File

@@ -4066,57 +4066,62 @@ class WorkflowEngine:
}
elif node_type == 'error_handler':
# 错误处理节点:捕获错误、错误重试、错误通知
# 注意:这个节点需要特殊处理,因为它应该包装其他节点的执行
# 这里我们实现一个简化版本,主要用于错误重试和通知
# 错误处理节点:捕获上游节点失败,执行重试/通知/停止策略
node_data = node.get('data', {})
retry_count = node_data.get('retry_count', 3)
retry_delay = node_data.get('retry_delay', 1000) # 毫秒
on_error = node_data.get('on_error', 'notify') # notify, retry, stop
error_handler_workflow = node_data.get('error_handler_workflow', '')
# 这个节点通常用于包装其他节点,但在这里我们只处理输入数据中的错误
            # 找到上游节点(从 edges 反查 source)
predecessor_id = None
for e in self.active_edges if hasattr(self, 'active_edges') else []:
if e.get("target") == node_id:
predecessor_id = e.get("source")
break
if not predecessor_id:
for e in self.edges:
if e.get("target") == node_id:
predecessor_id = e.get("source")
break
try:
# 检查输入数据中是否有错误
if isinstance(input_data, dict) and input_data.get('status') == 'failed':
error = input_data.get('error', '未知错误')
if on_error == 'retry' and retry_count > 0:
# 重试逻辑(这里简化处理,实际应该重新执行前一个节点)
logger.warning(f"错误处理节点检测到错误,将重试: {error}")
# 注意:实际重试需要重新执行前一个节点,这里只记录
if on_error == 'retry':
logger.warning(f"error_handler 检测到错误,请求重试前驱节点 {predecessor_id}: {error}")
exec_result = {
'output': input_data,
'status': 'retry',
'status': 'retry_predecessor',
'predecessor_id': predecessor_id,
'retry_count': retry_count,
'error': error
'retry_delay_ms': retry_delay,
'error': error,
}
elif on_error == 'notify':
# 通知错误(记录日志)
logger.error(f"错误处理节点捕获错误: {error}")
logger.error(f"error_handler 捕获错误并通知: {error}")
exec_result = {
'output': input_data,
'status': 'error_handled',
'error': error,
'notified': True
'notified': True,
}
else:
# 停止执行
else: # stop
exec_result = {
'output': input_data,
'status': 'failed',
'error': error,
'stopped': True
'stopped': True,
}
else:
# 没有错误,正常通过
# 上游正常,透传
exec_result = {'output': input_data, 'status': 'success'}
if self.logger:
duration = int((time.time() - start_time) * 1000)
self.logger.log_node_complete(node_id, node_type, exec_result.get('output'), duration)
return exec_result
except Exception as e:
if self.logger:
duration = int((time.time() - start_time) * 1000)
@@ -4124,7 +4129,7 @@ class WorkflowEngine:
return {
'output': input_data,
'status': 'failed',
'error': f'错误处理失败: {str(e)}'
'error': f'错误处理失败: {str(e)}',
}
elif node_type == 'csv':
@@ -5427,6 +5432,89 @@ class WorkflowEngine:
self.logger.log_node_complete(node_id, node_type, final_output, duration)
return result
elif node_type == 'evaluator':
# 输出质量评估节点:用 LLM 评判上游节点的输出质量
nd = node.get('data', {}) or {}
criteria = nd.get('criteria', '回答必须准确、完整、切题')
pass_threshold = float(nd.get('pass_threshold', 0.6))
# 从输入数据提取待评估内容
if isinstance(input_data, dict):
content = input_data.get('output', input_data.get('content', str(input_data)))
else:
content = str(input_data) if input_data else ""
# 调用 LLM 评判
try:
from app.services.llm_service import llm_service
judge_prompt = (
"你是严格的内容质量评审专家。请根据以下标准对内容进行评分。\n\n"
f"【评判标准】\n{criteria}\n\n"
f"【待评审内容】\n{str(content)[:8000]}\n\n"
"请以 JSON 格式返回评审结果(严格只返回 JSON不要任何其他文字\n"
'{"score": 0.75, "passed": true, "issues": ["问题1"], '
'"suggestions": ["建议1"], "summary": "一句话总结"}\n\n'
"评分规则1.0完美 0.8良好 0.6基本满足 0.4大部分未满足 0.2完全不满足\n"
"score >= 0.6 时 passed=true否则 passed=false\n"
)
messages = [{"role": "user", "content": judge_prompt}]
resp = llm_service.call_openai_with_tools(
provider="deepseek",
model="deepseek-v4-flash",
messages=messages,
temperature=0.1,
max_tokens=600,
)
judge_text = ""
if isinstance(resp, dict):
choices = resp.get("choices", [])
if choices:
judge_text = choices[0].get("message", {}).get("content", "")
elif hasattr(resp, 'choices'):
judge_text = resp.choices[0].message.content or ""
# 解析 JSON
import re as _eval_re
judge_clean = judge_text.strip()
if judge_clean.startswith("```"):
lines = judge_clean.split("\n")
judge_clean = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
try:
review = json.loads(judge_clean)
except json.JSONDecodeError:
m = _eval_re.search(r'\{[^{}]*"score"\s*:\s*[\d.]+[^{}]*\}', judge_text, _eval_re.DOTALL)
if m:
review = json.loads(m.group())
else:
review = {"score": 0.5, "passed": False, "issues": ["评审解析失败"], "suggestions": []}
score = float(review.get("score", 0.5))
passed = score >= pass_threshold
exec_result = {
'output': {
'score': score,
'passed': passed,
'threshold': pass_threshold,
'issues': review.get('issues', []),
'suggestions': review.get('suggestions', []),
'summary': review.get('summary', ''),
'content': content if isinstance(content, str) else str(content),
},
'status': 'success' if passed else 'quality_failed',
}
except Exception as e:
logger.error(f"evaluator 节点评审失败: {e}")
exec_result = {
'output': {'score': 0, 'passed': False, 'error': str(e), 'content': str(content)},
'status': 'failed',
}
if self.logger:
duration = int((time.time() - start_time) * 1000)
self.logger.log_node_complete(node_id, node_type, exec_result.get('output'), duration)
return exec_result
else:
# 未知节点类型
logger.warning(f"[rjb] 未知节点类型: node_id={node_id}, node_type={node_type}, node keys={list(node.keys())}")
@@ -5751,9 +5839,84 @@ class WorkflowEngine:
# 继续查找循环体内的节点
self._mark_loop_body_executed(target_id, executed_nodes, active_edges)
else:
# 执行失败,停止工作流
# 执行失败或质量不达标 — 支持重试
failed_status = result.get('status', 'failed')
error_msg = result.get('error', '未知错误')
node_type = node.get('type', 'unknown')
# 处理 error_handler 返回的 retry_predecessor
if failed_status == 'retry_predecessor':
pred_id = result.get('predecessor_id')
eh_retry_count = result.get('retry_count', 1)
eh_retry_delay_ms = result.get('retry_delay_ms', 1000)
                            # 检查是否已有重试计数(多次经过 error_handler)
prev_counter = self.node_outputs.get(f"_eh_retry_{pred_id}", {})
remaining = prev_counter.get("remaining", eh_retry_count)
if pred_id and pred_id in self.nodes and remaining > 0:
remaining -= 1
logger.warning(
f"error_handler 请求重试前驱节点 {pred_id},剩余 {remaining}"
)
self.executed_nodes.discard(pred_id)
self.node_outputs.pop(pred_id, None)
self.node_outputs[f"_eh_retry_{pred_id}"] = {
"remaining": remaining,
"delay_ms": eh_retry_delay_ms,
"error_handler_id": next_node_id,
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
await asyncio.sleep(eh_retry_delay_ms / 1000.0)
continue
elif remaining <= 0:
logger.error(f"error_handler 重试次数耗尽,前驱节点 {pred_id} 停止")
raise WorkflowExecutionError(
detail=f"error_handler 重试次数耗尽: {error_msg}",
node_id=pred_id,
)
# 检查节点级 retry_config
node_data = node.get('data', {}) or {}
retry_cfg = node_data.get('retry_config', {})
max_retries = retry_cfg.get('max_retries', 0) if isinstance(retry_cfg, dict) else 0
retry_delay_ms = retry_cfg.get('retry_delay_ms', 1000) if isinstance(retry_cfg, dict) else 1000
on_exhausted = retry_cfg.get('on_exhausted', 'stop') if isinstance(retry_cfg, dict) else 'stop'
retry_key = f"_retry_{next_node_id}"
retries_done = self.node_outputs.get(retry_key, 0)
if max_retries > 0 and retries_done < max_retries:
self.node_outputs[retry_key] = retries_done + 1
logger.warning(
f"节点 {next_node_id} ({node_type}) 执行失败,重试 {retries_done + 1}/{max_retries}: {error_msg}"
)
await asyncio.sleep(retry_delay_ms / 1000.0)
continue # 不标记已执行,下次循环重新执行
# 重试耗尽或未配置重试
if retries_done >= max_retries and max_retries > 0:
if on_exhausted == 'skip':
logger.warning(f"节点 {next_node_id} 重试耗尽,跳过: {error_msg}")
self.node_outputs[next_node_id] = {
'status': 'skipped', 'error': error_msg
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
continue
elif on_exhausted == 'notify':
logger.error(f"节点 {next_node_id} 重试耗尽,已通知: {error_msg}")
self.node_outputs[next_node_id] = {
'status': 'error_notified', 'error': error_msg
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
continue
# 默认:停止工作流
logger.error(f"工作流执行失败 - 节点: {next_node_id} ({node_type}), 错误: {error_msg}")
raise WorkflowExecutionError(
detail=error_msg,