Files
aiagent/scripts/tools/debug_switch_node.py
renjianbo eabf90c496 feat: add AI学习助手 agent (KG+RAG ideal) and renshenguo feishu bot
- Add AI学习助手 agent creation script with all 39 tools, 3-layer KG+RAG memory
- Add renshenguo (人参果) feishu bot integration (app_service + ws_handler)
- Register renshenguo WS client in main.py startup
- Add RENSHENGUO_APP_ID / RENSHENGUO_APP_SECRET / RENSHENGUO_AGENT_ID config
- Reorganize docs from root into docs/ subdirectories
- Move startup scripts to scripts/startup/
- Various backend optimizations and tool improvements

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-06 01:37:13 +08:00

109 lines
3.6 KiB
Python

#!/usr/bin/env python3
"""
调试Switch节点的详细脚本
"""
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
from app.core.database import SessionLocal
from app.models.execution import Execution
from app.models.agent import Agent
def main():
db = SessionLocal()
try:
# 获取最近的执行记录
execution = db.query(Execution).filter(
Execution.agent_id.isnot(None)
).order_by(Execution.created_at.desc()).first()
if not execution:
print("❌ 没有找到执行记录")
return
print(f"执行ID: {execution.id}")
print(f"状态: {execution.status}")
print()
# 获取Agent配置
agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
if not agent:
print("❌ 没有找到Agent")
return
workflow_config = agent.workflow_config
nodes = workflow_config.get('nodes', [])
edges = workflow_config.get('edges', [])
# 找到Switch节点
switch_node = None
for node in nodes:
if node.get('type') == 'switch':
switch_node = node
break
if not switch_node:
print("❌ 没有找到Switch节点")
return
print("=" * 80)
print("Switch节点配置:")
print("=" * 80)
print(f"节点ID: {switch_node['id']}")
print(f"字段: {switch_node['data'].get('field')}")
print(f"Cases: {json.dumps(switch_node['data'].get('cases', {}), ensure_ascii=False, indent=2)}")
print(f"Default: {switch_node['data'].get('default')}")
print()
# 找到从Switch节点出发的边
print("=" * 80)
print("从Switch节点出发的边:")
print("=" * 80)
switch_edges = [e for e in edges if e.get('source') == switch_node['id']]
for edge in switch_edges:
print(f"边ID: {edge.get('id')}")
print(f" sourceHandle: {edge.get('sourceHandle')}")
print(f" target: {edge.get('target')}")
print()
# 查看执行结果
print("=" * 80)
print("执行结果中的节点输出:")
print("=" * 80)
if execution.output_data and 'node_results' in execution.output_data:
node_results = execution.output_data['node_results']
if switch_node['id'] in node_results:
switch_result = node_results[switch_node['id']]
print(f"Switch节点输出: {json.dumps(switch_result, ensure_ascii=False, indent=2)}")
else:
print("❌ Switch节点没有输出结果")
else:
print("❌ 没有找到节点执行结果")
# 检查哪些分支节点执行了
print()
print("=" * 80)
print("执行了的分支节点:")
print("=" * 80)
if execution.output_data and 'node_results' in execution.output_data:
node_results = execution.output_data['node_results']
for edge in switch_edges:
target_id = edge.get('target')
if target_id in node_results:
print(f"{target_id} (sourceHandle: {edge.get('sourceHandle')})")
else:
print(f"{target_id} (sourceHandle: {edge.get('sourceHandle')}) - 未执行")
except Exception as e:
print(f"❌ 错误: {str(e)}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
main()