Files
aiagent/debug_switch_node.py

109 lines
3.6 KiB
Python
Raw Permalink Normal View History

2026-01-22 09:59:02 +08:00
#!/usr/bin/env python3
"""
调试Switch节点的详细脚本
"""
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
from app.core.database import SessionLocal
from app.models.execution import Execution
from app.models.agent import Agent
def main():
db = SessionLocal()
try:
# 获取最近的执行记录
execution = db.query(Execution).filter(
Execution.agent_id.isnot(None)
).order_by(Execution.created_at.desc()).first()
if not execution:
print("❌ 没有找到执行记录")
return
print(f"执行ID: {execution.id}")
print(f"状态: {execution.status}")
print()
# 获取Agent配置
agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
if not agent:
print("❌ 没有找到Agent")
return
workflow_config = agent.workflow_config
nodes = workflow_config.get('nodes', [])
edges = workflow_config.get('edges', [])
# 找到Switch节点
switch_node = None
for node in nodes:
if node.get('type') == 'switch':
switch_node = node
break
if not switch_node:
print("❌ 没有找到Switch节点")
return
print("=" * 80)
print("Switch节点配置:")
print("=" * 80)
print(f"节点ID: {switch_node['id']}")
print(f"字段: {switch_node['data'].get('field')}")
print(f"Cases: {json.dumps(switch_node['data'].get('cases', {}), ensure_ascii=False, indent=2)}")
print(f"Default: {switch_node['data'].get('default')}")
print()
# 找到从Switch节点出发的边
print("=" * 80)
print("从Switch节点出发的边:")
print("=" * 80)
switch_edges = [e for e in edges if e.get('source') == switch_node['id']]
for edge in switch_edges:
print(f"边ID: {edge.get('id')}")
print(f" sourceHandle: {edge.get('sourceHandle')}")
print(f" target: {edge.get('target')}")
print()
# 查看执行结果
print("=" * 80)
print("执行结果中的节点输出:")
print("=" * 80)
if execution.output_data and 'node_results' in execution.output_data:
node_results = execution.output_data['node_results']
if switch_node['id'] in node_results:
switch_result = node_results[switch_node['id']]
print(f"Switch节点输出: {json.dumps(switch_result, ensure_ascii=False, indent=2)}")
else:
print("❌ Switch节点没有输出结果")
else:
print("❌ 没有找到节点执行结果")
# 检查哪些分支节点执行了
print()
print("=" * 80)
print("执行了的分支节点:")
print("=" * 80)
if execution.output_data and 'node_results' in execution.output_data:
node_results = execution.output_data['node_results']
for edge in switch_edges:
target_id = edge.get('target')
if target_id in node_results:
print(f"{target_id} (sourceHandle: {edge.get('sourceHandle')})")
else:
print(f"{target_id} (sourceHandle: {edge.get('sourceHandle')}) - 未执行")
except Exception as e:
print(f"❌ 错误: {str(e)}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
main()