#!/usr/bin/env python3 """ 测试工具调用可视化功能 创建一个简单的Agent,使用工具调用,然后查看执行详情 """ import requests import json import time import sys BASE_URL = "http://localhost:8037" def print_section(title): print("\n" + "=" * 80) print(f" {title}") print("=" * 80) def print_info(message): print(f"ℹ️ {message}") def print_success(message): print(f"✅ {message}") def print_error(message): print(f"❌ {message}") def login(): """用户登录""" print_section("1. 用户登录") login_data = {"username": "admin", "password": "123456"} try: response = requests.post(f"{BASE_URL}/api/v1/auth/login", data=login_data) if response.status_code != 200: print_error(f"登录失败: {response.status_code}") return None token = response.json().get("access_token") if not token: print_error("登录失败: 未获取到token") return None print_success(f"登录成功") return {"Authorization": f"Bearer {token}"} except Exception as e: print_error(f"登录异常: {str(e)}") return None def create_test_workflow(headers): """创建测试工作流(使用工具调用)""" print_section("2. 创建测试工作流") workflow_data = { "name": "工具调用可视化测试工作流", "description": "用于测试工具调用可视化功能", "nodes": [ { "id": "start", "type": "start", "position": {"x": 100, "y": 200}, "data": {"label": "开始"} }, { "id": "llm-with-tools", "type": "llm", "position": {"x": 400, "y": 200}, "data": { "label": "工具调用测试", "provider": "deepseek", "model": "deepseek-chat", "temperature": 0.7, "max_tokens": 1000, "enable_tools": True, "selected_tools": ["http_request", "datetime", "math_calculate"], "prompt": """用户请求:{{input.query}} 请根据用户需求,选择合适的工具执行任务。可以使用以下工具: - http_request: 发送HTTP请求 - datetime: 获取当前时间 - math_calculate: 执行数学计算 请分析用户需求,调用合适的工具,然后基于工具返回的结果生成回复。""" } }, { "id": "end", "type": "end", "position": {"x": 700, "y": 200}, "data": {"label": "结束"} } ], "edges": [ { "id": "e1", "source": "start", "target": "llm-with-tools", "sourceHandle": "right", "targetHandle": "left" }, { "id": "e2", "source": "llm-with-tools", "target": "end", "sourceHandle": "right", "targetHandle": "left" } ] } try: response = requests.post( f"{BASE_URL}/api/v1/workflows", headers=headers, json=workflow_data ) if response.status_code not in [200, 201]: print_error(f"创建工作流失败: {response.status_code}") print(f"响应: {response.text}") return None workflow = response.json() print_success(f"工作流创建成功: {workflow.get('id')}") return workflow except Exception as e: print_error(f"创建工作流异常: {str(e)}") return None def execute_workflow(headers, workflow_id, input_data): """执行工作流""" print_section("3. 执行工作流") execution_data = { "workflow_id": workflow_id, "input_data": input_data } try: response = requests.post( f"{BASE_URL}/api/v1/executions", headers=headers, json=execution_data ) if response.status_code not in [200, 201]: print_error(f"执行工作流失败: {response.status_code}") print(f"响应: {response.text}") return None execution = response.json() execution_id = execution.get("id") print_success(f"执行已创建: {execution_id}") return execution_id except Exception as e: print_error(f"执行工作流异常: {str(e)}") return None def wait_for_completion(headers, execution_id, timeout=60): """等待执行完成""" print_section("4. 等待执行完成") start_time = time.time() while time.time() - start_time < timeout: try: response = requests.get( f"{BASE_URL}/api/v1/executions/{execution_id}", headers=headers ) if response.status_code != 200: print_error(f"获取执行状态失败: {response.status_code}") return None execution = response.json() status = execution.get("status") print_info(f"执行状态: {status}") if status in ["completed", "failed"]: print_success(f"执行完成,状态: {status}") return execution time.sleep(2) except Exception as e: print_error(f"获取执行状态异常: {str(e)}") return None print_error("执行超时") return None def get_execution_logs(headers, execution_id): """获取执行日志""" print_section("5. 获取执行日志(包含工具调用信息)") try: response = requests.get( f"{BASE_URL}/api/v1/execution-logs/executions/{execution_id}", headers=headers, params={"limit": 100} ) if response.status_code != 200: print_error(f"获取执行日志失败: {response.status_code}") print(f"响应: {response.text}") return None logs = response.json() print_success(f"获取到 {len(logs)} 条日志") # 查找工具调用相关的日志 tool_call_logs = [] for log in logs: if not log: continue data = log.get("data") or {} if isinstance(data, str): try: data = json.loads(data) except: data = {} if data.get("tool_name") or "工具" in log.get("message", ""): tool_call_logs.append(log) if tool_call_logs: print_success(f"找到 {len(tool_call_logs)} 条工具调用日志") print("\n工具调用日志详情:") for i, log in enumerate(tool_call_logs, 1): print(f"\n{i}. {log.get('message')}") print(f" 时间: {log.get('timestamp')}") print(f" 节点: {log.get('node_id')}") data = log.get("data", {}) if data.get("tool_name"): print(f" 工具名称: {data.get('tool_name')}") print(f" 状态: {data.get('status')}") if data.get("tool_args"): print(f" 参数: {json.dumps(data.get('tool_args'), ensure_ascii=False, indent=2)}") if data.get("duration"): print(f" 耗时: {data.get('duration')}ms") else: print_info("未找到工具调用日志") return logs except Exception as e: print_error(f"获取执行日志异常: {str(e)}") return None def get_node_execution_data(headers, execution_id, node_id): """获取节点执行数据""" print_section(f"6. 获取节点执行数据 (节点: {node_id})") try: response = requests.get( f"{BASE_URL}/api/v1/execution-logs/executions/{execution_id}", headers=headers, params={"node_id": node_id, "limit": 100} ) if response.status_code != 200: print_error(f"获取节点执行数据失败: {response.status_code}") return None logs = response.json() print_success(f"获取到 {len(logs)} 条节点日志") # 显示工具调用信息 tool_calls = [] for log in logs: if not log: continue data = log.get("data") or {} if isinstance(data, str): try: data = json.loads(data) except: data = {} if data.get("tool_name"): tool_calls.append({ "tool_name": data.get("tool_name"), "status": data.get("status"), "args": data.get("tool_args"), "result": data.get("tool_result"), "duration": data.get("duration"), "timestamp": log.get("timestamp") }) if tool_calls: print_success(f"找到 {len(tool_calls)} 个工具调用") for i, call in enumerate(tool_calls, 1): print(f"\n工具调用 {i}:") print(f" 工具: {call['tool_name']}") print(f" 状态: {call['status']}") print(f" 参数: {json.dumps(call['args'], ensure_ascii=False, indent=2) if call['args'] else '无'}") if call.get('result'): result_preview = call['result'][:200] if len(call['result']) > 200 else call['result'] print(f" 结果预览: {result_preview}...") print(f" 耗时: {call.get('duration', 'N/A')}ms") print(f" 时间: {call['timestamp']}") else: print_info("该节点没有工具调用") return logs except Exception as e: print_error(f"获取节点执行数据异常: {str(e)}") return None def main(): """主函数""" print_section("工具调用可视化功能测试") # 1. 登录 headers = login() if not headers: return # 2. 创建测试工作流 workflow = create_test_workflow(headers) if not workflow: return workflow_id = workflow.get("id") # 3. 执行工作流(使用不同的测试用例) test_cases = [ { "name": "测试HTTP请求工具", "input": {"query": "请查询 https://api.github.com/users/octocat 的信息"} }, { "name": "测试时间工具", "input": {"query": "现在是什么时间?"} }, { "name": "测试数学计算工具", "input": {"query": "计算 123 * 456 的结果"} } ] for i, test_case in enumerate(test_cases, 1): print_section(f"测试用例 {i}: {test_case['name']}") # 执行工作流 execution_id = execute_workflow(headers, workflow_id, test_case["input"]) if not execution_id: continue # 等待完成 execution = wait_for_completion(headers, execution_id) if not execution: continue # 获取执行日志 logs = get_execution_logs(headers, execution_id) # 获取节点执行数据 node_logs = get_node_execution_data(headers, execution_id, "llm-with-tools") print_success(f"测试用例 {i} 完成") print("\n" + "-" * 80) print_section("测试完成") print_success("所有测试用例执行完成!") print_info("请在前端查看执行详情,验证工具调用可视化功能:") print_info(f"1. 打开执行详情页面: http://localhost:8038/executions/{execution_id}") print_info("2. 点击节点查看节点执行详情") print_info("3. 检查工具调用可视化是否正确显示") if __name__ == "__main__": main()