feat: Phase 3 - parallel execution, progress reporting, result caching + AgentChat bug fixes

Phase 3 能力:
- DAG 并行执行 (workflow_engine): asyncio.gather 并行执行就绪节点
- Debate 并行 (orchestrator): for 循环改为 asyncio.gather
- 粒度进度上报 (workflow_engine + tasks + websocket): Redis 推送 + DB 降级
- 工具结果缓存 (tool_manager): 确定性工具默认开启缓存
- LLM 响应缓存 (core): messages[-4:] + model 哈希,5min TTL

AgentChat bug 修复 (Gitea #1-#5):
- #1 SSE 降级重复空消息: fallback POST 前移除占位消息
- #2 streamTimeout 泄漏: while 正常退出后 clearTimeout
- #3 loading 闪烁: final/error 事件中提前设 loading=false
- #4 SSE 事件类型对齐: 确认匹配,未知类型加 console.warn
- #5 retryMessage 流式残留: 重试时清理占位消息

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-05 00:00:51 +08:00
parent f3cb35c460
commit 7e00b027d4
8 changed files with 605 additions and 345 deletions

View File

@@ -5589,14 +5589,18 @@ class WorkflowEngine:
self,
input_data: Dict[str, Any],
resume_snapshot: Optional[Dict[str, Any]] = None,
execution_id: Optional[str] = None,
on_progress: Optional[callable] = None,
) -> Dict[str, Any]:
"""
执行完整工作流
Args:
input_data: 初始输入数据(恢复执行时须包含 __hil_decision 等)
resume_snapshot: 从挂起快照恢复(与 pause_state 一致)
execution_id: 执行记录 ID，用于进度上报
on_progress: 进度回调 async def(execution_id, progress_data)
Returns:
执行结果
"""
@@ -5641,6 +5645,8 @@ class WorkflowEngine:
self._tool_calls_used = 0
results = {}
total_nodes = len(self.nodes)
# 按拓扑顺序执行节点(动态构建执行图)
while True:
# 构建当前活跃的执行图
@@ -5656,12 +5662,12 @@ class WorkflowEngine:
f"[rjb] 当前执行图: {execution_order}, 活跃边数: {len(active_edges)}, 已执行节点: {executed_nodes}"
)
next_node_id = None
# 收集所有就绪节点（DAG 并行）
ready_nodes: list = []
for node_id in pending_ids:
can_execute = True
incoming_edges = [e for e in active_edges if e["target"] == node_id]
if not incoming_edges:
# 没有入边：仅允许 Start；孤立节点跳过
if node_id not in [n["id"] for n in self.nodes.values() if n.get("type") == "start"]:
logger.debug(f"[rjb] 节点 {node_id} 没有入边,跳过执行")
continue
@@ -5669,7 +5675,6 @@ class WorkflowEngine:
for edge in incoming_edges:
src = edge["source"]
if src not in forward_reachable:
# 条件分支裁剪后不可达的前驱，不参与 gate（OR-join）
continue
if src not in executed_nodes:
can_execute = False
@@ -5678,298 +5683,327 @@ class WorkflowEngine:
)
break
if can_execute:
next_node_id = node_id
ready_nodes.append(node_id)
logger.info(
f"[rjb] 选择执行节点: {next_node_id}, 类型: {self.nodes[next_node_id].get('type')}, 入边数: {len(incoming_edges)}"
f"[rjb] 就绪节点: {node_id}, 类型: {self.nodes[node_id].get('type')}"
)
break
if not next_node_id:
if not ready_nodes:
break # 没有更多节点可执行
node = self.nodes[next_node_id]
is_approval = node.get("type") == "approval"
if not is_approval:
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
# 调试:检查节点数据结构
if node.get('type') == 'llm':
logger.debug(f"[rjb] 执行LLM节点: node_id={next_node_id}, node keys={list(node.keys())}, data keys={list(node.get('data', {}).keys()) if node.get('data') else []}")
# 获取节点输入(使用活跃的边)
node_input = self.get_node_input(next_node_id, self.node_outputs, active_edges)
# 如果是起始节点,使用初始输入
if node.get('type') == 'start' and not node_input:
node_input = input_data
logger.info(f"[rjb] Start节点使用初始输入: node_id={next_node_id}, node_input={node_input}")
# 调试:记录节点输入数据
if node.get('type') == 'llm':
logger.info(f"[rjb] LLM节点输入: node_id={next_node_id}, node_input={node_input}, node_outputs keys={list(self.node_outputs.keys())}")
if 'start-1' in self.node_outputs:
logger.info(f"[rjb] Start节点输出内容: {self.node_outputs['start-1']}")
# 标记所有就绪节点为已执行（approval 节点除外）
for nid in ready_nodes:
node = self.nodes[nid]
if node.get("type") != "approval":
executed_nodes.add(nid)
execution_sequence.append(nid)
# 单执行步数预算(每执行一个节点计 1 步)
self._steps_used += 1
# 获取所有就绪节点的输入
node_inputs: dict = {}
for nid in ready_nodes:
node = self.nodes[nid]
node_input = self.get_node_input(nid, self.node_outputs, active_edges)
if node.get('type') == 'start' and not node_input:
node_input = input_data
node_inputs[nid] = node_input
# 预算检查(整批节点)
self._steps_used += len(ready_nodes)
if self._steps_used > self._cap_steps:
raise WorkflowExecutionError(
detail=f"已超过单执行预算上限({self._cap_steps} 步),已熔断",
node_id=next_node_id,
node_id=ready_nodes[0],
)
# 执行节点
result = await self.execute_node(node, node_input)
# 并行执行所有就绪节点
async def _exec_one(nid: str):
try:
return await self.execute_node(self.nodes[nid], node_inputs[nid])
except Exception as exc:
return {"status": "failed", "error": str(exc)}
if result.get("status") == "awaiting_approval":
self._steps_used -= 1
snap = self._build_pause_snapshot(
next_node_id, active_edges, executed_nodes, execution_sequence, results
)
raise WorkflowPaused(snap)
if is_approval:
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
# 保存节点输出
if result.get('status') == 'success':
output_value = result.get('output', {})
self.node_outputs[next_node_id] = output_value
if node.get('type') == 'start':
logger.info(f"[rjb] Start节点输出已保存: node_id={next_node_id}, output={output_value}, output_type={type(output_value)}")
# 如果是条件节点或Switch节点根据分支结果过滤边
if node.get('type') == 'condition':
branch = result.get('branch', 'false')
logger.info(f"[rjb] 条件节点分支过滤: node_id={next_node_id}, branch={branch}")
# 移除不符合条件的边
# 只保留1) 不是从条件节点出发的边,或 2) 从条件节点出发且sourceHandle匹配分支的边
edges_to_remove = []
edges_to_keep = []
for edge in active_edges:
if edge['source'] == next_node_id:
# 这是从条件节点出发的边
edge_handle = edge.get('sourceHandle')
if edge_handle == branch:
# sourceHandle匹配分支保留
edges_to_keep.append(edge)
logger.info(f"[rjb] 保留边: {edge.get('id')} (sourceHandle={edge_handle} == branch={branch})")
else:
# sourceHandle不匹配或为None移除
edges_to_remove.append(edge)
logger.info(f"[rjb] 移除边: {edge.get('id')} (sourceHandle={edge_handle} != branch={branch})")
else:
# 不是从条件节点出发的边,保留
edges_to_keep.append(edge)
active_edges = edges_to_keep
elif node.get('type') == 'switch':
branch = result.get('branch', 'default')
logger.info(f"[rjb] Switch节点分支过滤: node_id={next_node_id}, branch={branch}")
# 记录过滤前的边信息
edges_before = [e for e in active_edges if e['source'] == next_node_id]
logger.info(f"[rjb] Switch节点过滤前: 从节点出发的边有{len(edges_before)}")
for edge in edges_before:
logger.info(f"[rjb] 边 {edge.get('id')}: sourceHandle={edge.get('sourceHandle')}, target={edge.get('target')}")
# 移除不匹配的边
edges_to_keep = []
edges_removed_count = 0
removed_source_nodes = set() # 记录被移除边的源节点
for edge in active_edges:
if edge['source'] == next_node_id:
# 这是从Switch节点出发的边
edge_handle = edge.get('sourceHandle')
if edge_handle == branch:
# sourceHandle匹配分支保留
edges_to_keep.append(edge)
logger.info(f"[rjb] ✅ 保留边: {edge.get('id')} (sourceHandle={edge_handle} == branch={branch})")
else:
# sourceHandle不匹配移除
edges_removed_count += 1
target_id = edge.get('target')
removed_source_nodes.add(target_id) # 记录目标节点(这些节点将不再可达)
logger.info(f"[rjb] ❌ 移除边: {edge.get('id')} (sourceHandle={edge_handle} != branch={branch}, target={target_id})")
else:
# 不是从Switch节点出发的边保留
edges_to_keep.append(edge)
# 重要移除那些指向被过滤节点的边这些边来自被过滤的LLM节点
# 例如如果llm-question被过滤了那么llm-question → merge-response的边也应该被移除
additional_removed = 0
for edge in list(edges_to_keep): # 使用list副本因为我们要修改原列表
if edge['source'] in removed_source_nodes:
# 这条边来自被过滤的节点,也应该被移除
edges_to_keep.remove(edge)
additional_removed += 1
logger.info(f"[rjb] ❌ 移除来自被过滤节点的边: {edge.get('id')} ({edge.get('source')}{edge.get('target')})")
edges_removed_count += additional_removed
active_edges = edges_to_keep
filter_info = {
'branch': branch,
'edges_before': len(edges_before),
'edges_kept': len([e for e in edges_to_keep if e['source'] == next_node_id]),
'edges_removed': edges_removed_count
}
logger.info(f"[rjb] Switch节点过滤后: 保留{len(active_edges)}条边其中从Switch节点出发的{filter_info['edges_kept']}条),移除{edges_removed_count}条边")
# 记录过滤后的活跃边
remaining_switch_edges = [e for e in active_edges if e['source'] == next_node_id]
logger.info(f"[rjb] Switch节点过滤后剩余的边: {[e.get('id') + '->' + e.get('target') for e in remaining_switch_edges]}")
# 重要:找出那些不再可达的节点(这些节点只通过被移除的边连接)
removed_targets = set()
for edge in edges_before:
if edge not in edges_to_keep:
target_id = edge.get('target')
removed_targets.add(target_id)
logger.info(f"[rjb] ❌ 节点 {target_id} 的边已被移除,该节点将不会被执行")
# 关键修复:立即重新构建执行图,确保不再可达的节点不在执行图中
# 这样在下次循环时,这些节点就不会被选择执行
logger.info(f"[rjb] Switch节点过滤后重新构建执行图排除 {len(removed_targets)} 个不再可达的节点)")
# 同时记录到数据库
if self.logger:
self.logger.info(
f"Switch节点分支过滤: branch={branch}, 保留{filter_info['edges_kept']}条边,移除{edges_removed_count}条边",
node_id=next_node_id,
node_type='switch',
data=filter_info
)
elif node.get('type') == 'approval':
branch = result.get('branch', 'approved')
logger.info(f"[rjb] Approval节点分支过滤: node_id={next_node_id}, branch={branch}")
edges_to_keep = []
removed_source_nodes = set()
for edge in active_edges:
if edge['source'] == next_node_id:
edge_handle = edge.get('sourceHandle')
if edge_handle == branch:
edges_to_keep.append(edge)
else:
removed_source_nodes.add(edge.get('target'))
else:
edges_to_keep.append(edge)
for edge in list(edges_to_keep):
if edge['source'] in removed_source_nodes:
edges_to_keep.remove(edge)
active_edges = edges_to_keep
if self.logger:
self.logger.info(
f"Approval节点分支过滤: branch={branch}",
node_id=next_node_id,
node_type='approval',
)
# 如果是循环节点,跳过循环体的节点(循环体已在节点内部执行)
if node.get('type') in ['loop', 'foreach']:
# 标记循环体的节点为已执行(简化处理)
for edge in active_edges[:]: # 使用切片复制列表
if edge.get('source') == next_node_id:
target_id = edge.get('target')
if target_id in self.nodes:
# 检查是否是循环结束节点
target_node = self.nodes[target_id]
if target_node.get('type') not in ['loop_end', 'end']:
# 标记为已执行(循环体已在循环节点内部执行)
executed_nodes.add(target_id)
# 继续查找循环体内的节点
self._mark_loop_body_executed(target_id, executed_nodes, active_edges)
if len(ready_nodes) == 1:
batch_results = {ready_nodes[0]: await _exec_one(ready_nodes[0])}
else:
# 执行失败或质量不达标 — 支持重试
failed_status = result.get('status', 'failed')
error_msg = result.get('error', '未知错误')
node_type = node.get('type', 'unknown')
logger.info(f"[rjb] 并行执行 {len(ready_nodes)} 个节点: {ready_nodes}")
tasks = [_exec_one(nid) for nid in ready_nodes]
gathered = await asyncio.gather(*tasks, return_exceptions=True)
batch_results = {}
for i, nid in enumerate(ready_nodes):
r = gathered[i]
if isinstance(r, BaseException):
batch_results[nid] = {"status": "failed", "error": str(r)}
else:
batch_results[nid] = r
# 处理 error_handler 返回的 retry_predecessor
if failed_status == 'retry_predecessor':
pred_id = result.get('predecessor_id')
eh_retry_count = result.get('retry_count', 1)
eh_retry_delay_ms = result.get('retry_delay_ms', 1000)
# 逐一处理各节点结果
for next_node_id in ready_nodes:
node = self.nodes[next_node_id]
result = batch_results[next_node_id]
is_approval = node.get("type") == "approval"
# 检查是否已有重试计数(多次经过 error_handler
prev_counter = self.node_outputs.get(f"_eh_retry_{pred_id}", {})
remaining = prev_counter.get("remaining", eh_retry_count)
if pred_id and pred_id in self.nodes and remaining > 0:
remaining -= 1
logger.warning(
f"error_handler 请求重试前驱节点 {pred_id},剩余 {remaining}"
)
self.executed_nodes.discard(pred_id)
self.node_outputs.pop(pred_id, None)
self.node_outputs[f"_eh_retry_{pred_id}"] = {
"remaining": remaining,
"delay_ms": eh_retry_delay_ms,
"error_handler_id": next_node_id,
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
await asyncio.sleep(eh_retry_delay_ms / 1000.0)
continue
elif remaining <= 0:
logger.error(f"error_handler 重试次数耗尽,前驱节点 {pred_id} 停止")
raise WorkflowExecutionError(
detail=f"error_handler 重试次数耗尽: {error_msg}",
node_id=pred_id,
)
# 检查节点级 retry_config
node_data = node.get('data', {}) or {}
retry_cfg = node_data.get('retry_config', {})
max_retries = retry_cfg.get('max_retries', 0) if isinstance(retry_cfg, dict) else 0
retry_delay_ms = retry_cfg.get('retry_delay_ms', 1000) if isinstance(retry_cfg, dict) else 1000
on_exhausted = retry_cfg.get('on_exhausted', 'stop') if isinstance(retry_cfg, dict) else 'stop'
retry_key = f"_retry_{next_node_id}"
retries_done = self.node_outputs.get(retry_key, 0)
if max_retries > 0 and retries_done < max_retries:
self.node_outputs[retry_key] = retries_done + 1
logger.warning(
f"节点 {next_node_id} ({node_type}) 执行失败,重试 {retries_done + 1}/{max_retries}: {error_msg}"
if result.get("status") == "awaiting_approval":
self._steps_used -= 1
snap = self._build_pause_snapshot(
next_node_id, active_edges, executed_nodes, execution_sequence, results
)
await asyncio.sleep(retry_delay_ms / 1000.0)
continue # 不标记已执行,下次循环重新执行
raise WorkflowPaused(snap)
# 重试耗尽或未配置重试
if retries_done >= max_retries and max_retries > 0:
if on_exhausted == 'skip':
logger.warning(f"节点 {next_node_id} 重试耗尽,跳过: {error_msg}")
self.node_outputs[next_node_id] = {
'status': 'skipped', 'error': error_msg
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
continue
elif on_exhausted == 'notify':
logger.error(f"节点 {next_node_id} 重试耗尽,已通知: {error_msg}")
self.node_outputs[next_node_id] = {
'status': 'error_notified', 'error': error_msg
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
continue
if is_approval:
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
# 保存节点输出
if result.get('status') == 'success':
output_value = result.get('output', {})
self.node_outputs[next_node_id] = output_value
if node.get('type') == 'start':
logger.info(f"[rjb] Start节点输出已保存: node_id={next_node_id}, output={output_value}, output_type={type(output_value)}")
# 如果是条件节点或Switch节点根据分支结果过滤边
if node.get('type') == 'condition':
branch = result.get('branch', 'false')
logger.info(f"[rjb] 条件节点分支过滤: node_id={next_node_id}, branch={branch}")
# 移除不符合条件的边
# 只保留1) 不是从条件节点出发的边,或 2) 从条件节点出发且sourceHandle匹配分支的边
edges_to_remove = []
edges_to_keep = []
for edge in active_edges:
if edge['source'] == next_node_id:
# 这是从条件节点出发的边
edge_handle = edge.get('sourceHandle')
if edge_handle == branch:
# sourceHandle匹配分支保留
edges_to_keep.append(edge)
logger.info(f"[rjb] 保留边: {edge.get('id')} (sourceHandle={edge_handle} == branch={branch})")
else:
# sourceHandle不匹配或为None移除
edges_to_remove.append(edge)
logger.info(f"[rjb] 移除边: {edge.get('id')} (sourceHandle={edge_handle} != branch={branch})")
else:
# 不是从条件节点出发的边,保留
edges_to_keep.append(edge)
active_edges = edges_to_keep
elif node.get('type') == 'switch':
branch = result.get('branch', 'default')
logger.info(f"[rjb] Switch节点分支过滤: node_id={next_node_id}, branch={branch}")
# 记录过滤前的边信息
edges_before = [e for e in active_edges if e['source'] == next_node_id]
logger.info(f"[rjb] Switch节点过滤前: 从节点出发的边有{len(edges_before)}")
for edge in edges_before:
logger.info(f"[rjb] 边 {edge.get('id')}: sourceHandle={edge.get('sourceHandle')}, target={edge.get('target')}")
# 移除不匹配的边
edges_to_keep = []
edges_removed_count = 0
removed_source_nodes = set() # 记录被移除边的源节点
for edge in active_edges:
if edge['source'] == next_node_id:
# 这是从Switch节点出发的边
edge_handle = edge.get('sourceHandle')
if edge_handle == branch:
# sourceHandle匹配分支保留
edges_to_keep.append(edge)
logger.info(f"[rjb] ✅ 保留边: {edge.get('id')} (sourceHandle={edge_handle} == branch={branch})")
else:
# sourceHandle不匹配移除
edges_removed_count += 1
target_id = edge.get('target')
removed_source_nodes.add(target_id) # 记录目标节点(这些节点将不再可达)
logger.info(f"[rjb] ❌ 移除边: {edge.get('id')} (sourceHandle={edge_handle} != branch={branch}, target={target_id})")
else:
# 不是从Switch节点出发的边保留
edges_to_keep.append(edge)
# 重要移除那些指向被过滤节点的边这些边来自被过滤的LLM节点
# 例如如果llm-question被过滤了那么llm-question → merge-response的边也应该被移除
additional_removed = 0
for edge in list(edges_to_keep): # 使用list副本因为我们要修改原列表
if edge['source'] in removed_source_nodes:
# 这条边来自被过滤的节点,也应该被移除
edges_to_keep.remove(edge)
additional_removed += 1
logger.info(f"[rjb] ❌ 移除来自被过滤节点的边: {edge.get('id')} ({edge.get('source')}{edge.get('target')})")
edges_removed_count += additional_removed
active_edges = edges_to_keep
filter_info = {
'branch': branch,
'edges_before': len(edges_before),
'edges_kept': len([e for e in edges_to_keep if e['source'] == next_node_id]),
'edges_removed': edges_removed_count
}
logger.info(f"[rjb] Switch节点过滤后: 保留{len(active_edges)}条边其中从Switch节点出发的{filter_info['edges_kept']}条),移除{edges_removed_count}条边")
# 记录过滤后的活跃边
remaining_switch_edges = [e for e in active_edges if e['source'] == next_node_id]
logger.info(f"[rjb] Switch节点过滤后剩余的边: {[e.get('id') + '->' + e.get('target') for e in remaining_switch_edges]}")
# 重要:找出那些不再可达的节点(这些节点只通过被移除的边连接)
removed_targets = set()
for edge in edges_before:
if edge not in edges_to_keep:
target_id = edge.get('target')
removed_targets.add(target_id)
logger.info(f"[rjb] ❌ 节点 {target_id} 的边已被移除,该节点将不会被执行")
# 关键修复:立即重新构建执行图,确保不再可达的节点不在执行图中
# 这样在下次循环时,这些节点就不会被选择执行
logger.info(f"[rjb] Switch节点过滤后重新构建执行图排除 {len(removed_targets)} 个不再可达的节点)")
# 同时记录到数据库
if self.logger:
self.logger.info(
f"Switch节点分支过滤: branch={branch}, 保留{filter_info['edges_kept']}条边,移除{edges_removed_count}条边",
node_id=next_node_id,
node_type='switch',
data=filter_info
)
elif node.get('type') == 'approval':
branch = result.get('branch', 'approved')
logger.info(f"[rjb] Approval节点分支过滤: node_id={next_node_id}, branch={branch}")
edges_to_keep = []
removed_source_nodes = set()
for edge in active_edges:
if edge['source'] == next_node_id:
edge_handle = edge.get('sourceHandle')
if edge_handle == branch:
edges_to_keep.append(edge)
else:
removed_source_nodes.add(edge.get('target'))
else:
edges_to_keep.append(edge)
for edge in list(edges_to_keep):
if edge['source'] in removed_source_nodes:
edges_to_keep.remove(edge)
active_edges = edges_to_keep
if self.logger:
self.logger.info(
f"Approval节点分支过滤: branch={branch}",
node_id=next_node_id,
node_type='approval',
)
# 如果是循环节点,跳过循环体的节点(循环体已在节点内部执行)
if node.get('type') in ['loop', 'foreach']:
# 标记循环体的节点为已执行(简化处理)
for edge in active_edges[:]: # 使用切片复制列表
if edge.get('source') == next_node_id:
target_id = edge.get('target')
if target_id in self.nodes:
# 检查是否是循环结束节点
target_node = self.nodes[target_id]
if target_node.get('type') not in ['loop_end', 'end']:
# 标记为已执行(循环体已在循环节点内部执行)
executed_nodes.add(target_id)
# 继续查找循环体内的节点
self._mark_loop_body_executed(target_id, executed_nodes, active_edges)
else:
# 执行失败或质量不达标 — 支持重试
failed_status = result.get('status', 'failed')
error_msg = result.get('error', '未知错误')
node_type = node.get('type', 'unknown')
# 处理 error_handler 返回的 retry_predecessor
if failed_status == 'retry_predecessor':
pred_id = result.get('predecessor_id')
eh_retry_count = result.get('retry_count', 1)
eh_retry_delay_ms = result.get('retry_delay_ms', 1000)
# 检查是否已有重试计数(多次经过 error_handler
prev_counter = self.node_outputs.get(f"_eh_retry_{pred_id}", {})
remaining = prev_counter.get("remaining", eh_retry_count)
if pred_id and pred_id in self.nodes and remaining > 0:
remaining -= 1
logger.warning(
f"error_handler 请求重试前驱节点 {pred_id},剩余 {remaining}"
)
executed_nodes.discard(pred_id)
self.node_outputs.pop(pred_id, None)
self.node_outputs[f"_eh_retry_{pred_id}"] = {
"remaining": remaining,
"delay_ms": eh_retry_delay_ms,
"error_handler_id": next_node_id,
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
await asyncio.sleep(eh_retry_delay_ms / 1000.0)
break # 退出 for 循环while 循环下次迭代处理重试
elif remaining <= 0:
logger.error(f"error_handler 重试次数耗尽,前驱节点 {pred_id} 停止")
raise WorkflowExecutionError(
detail=f"error_handler 重试次数耗尽: {error_msg}",
node_id=pred_id,
)
# 检查节点级 retry_config
node_data = node.get('data', {}) or {}
retry_cfg = node_data.get('retry_config', {})
max_retries = retry_cfg.get('max_retries', 0) if isinstance(retry_cfg, dict) else 0
retry_delay_ms = retry_cfg.get('retry_delay_ms', 1000) if isinstance(retry_cfg, dict) else 1000
on_exhausted = retry_cfg.get('on_exhausted', 'stop') if isinstance(retry_cfg, dict) else 'stop'
retry_key = f"_retry_{next_node_id}"
retries_done = self.node_outputs.get(retry_key, 0)
if max_retries > 0 and retries_done < max_retries:
self.node_outputs[retry_key] = retries_done + 1
logger.warning(
f"节点 {next_node_id} ({node_type}) 执行失败,重试 {retries_done + 1}/{max_retries}: {error_msg}"
)
await asyncio.sleep(retry_delay_ms / 1000.0)
executed_nodes.discard(next_node_id)
break # 退出 for 循环while 循环下次迭代重新执行
# 重试耗尽或未配置重试
if retries_done >= max_retries and max_retries > 0:
if on_exhausted == 'skip':
logger.warning(f"节点 {next_node_id} 重试耗尽,跳过: {error_msg}")
self.node_outputs[next_node_id] = {
'status': 'skipped', 'error': error_msg
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
continue
elif on_exhausted == 'notify':
logger.error(f"节点 {next_node_id} 重试耗尽,已通知: {error_msg}")
self.node_outputs[next_node_id] = {
'status': 'error_notified', 'error': error_msg
}
executed_nodes.add(next_node_id)
execution_sequence.append(next_node_id)
results[next_node_id] = result
continue
# 默认:停止工作流
logger.error(f"工作流执行失败 - 节点: {next_node_id} ({node_type}), 错误: {error_msg}")
raise WorkflowExecutionError(
detail=error_msg,
node_id=next_node_id
)
# 上报批次进度
if on_progress and execution_id:
try:
pct = int(len(executed_nodes) / total_nodes * 100) if total_nodes else 100
await on_progress(execution_id, {
"current": len(executed_nodes),
"total": total_nodes,
"progress": min(pct, 99),
"status": "running",
})
except Exception:
pass
# 默认:停止工作流
logger.error(f"工作流执行失败 - 节点: {next_node_id} ({node_type}), 错误: {error_msg}")
raise WorkflowExecutionError(
detail=error_msg,
node_id=next_node_id
)
# 返回最终结果:优先取 End 类型且无出边的节点,避免向量写入等侧链与 End 同为 sink 时
# 因 executed_nodes 为 set 迭代顺序不确定而错误返回 upsert 元数据。
if executed_nodes:
@@ -6052,7 +6086,19 @@ class WorkflowEngine:
# 记录工作流执行完成
if self.logger:
self.logger.info("工作流执行完成", data={"result": final_result.get('result')})
# 上报 100% 完成进度
if on_progress and execution_id:
try:
await on_progress(execution_id, {
"current": len(executed_nodes),
"total": total_nodes,
"progress": 100,
"status": "completed",
})
except Exception:
pass
return final_result
if self.logger: