#!/usr/bin/env python3 """ Agent工作流执行测试脚本 用于测试Agent工作流的正常执行 """ import requests import json import time import sys # API基础URL BASE_URL = "http://localhost:8037" def print_section(title): """打印分隔线""" print("\n" + "=" * 80) print(f" {title}") print("=" * 80) def test_agent_execution(agent_id: str = None, user_input: str = "生成一个导出androidlog的脚本"): """ 测试Agent执行 Args: agent_id: Agent ID,如果为None则自动查找第一个已发布的Agent user_input: 用户输入内容 """ print_section("Agent工作流执行测试") # 1. 登录获取token print_section("1. 用户登录") login_data = { "username": "admin", "password": "123456" } try: # OAuth2PasswordRequestForm需要form-data格式 response = requests.post(f"{BASE_URL}/api/v1/auth/login", data=login_data) if response.status_code != 200: print(f"❌ 登录失败: {response.status_code}") print(f"响应: {response.text}") return token = response.json().get("access_token") if not token: print("❌ 登录失败: 未获取到token") return print("✅ 登录成功") headers = {"Authorization": f"Bearer {token}"} except Exception as e: print(f"❌ 登录异常: {str(e)}") return # 2. 获取Agent列表(如果没有指定agent_id) if not agent_id: print_section("2. 查找可用的Agent") try: response = requests.get( f"{BASE_URL}/api/v1/agents", headers=headers, params={"status": "published", "limit": 10} ) if response.status_code == 200: agents = response.json() if agents: # 优先选择已发布的Agent published_agents = [a for a in agents if a.get("status") == "published"] if published_agents: agent_id = published_agents[0]["id"] agent_name = published_agents[0]["name"] print(f"✅ 找到已发布的Agent: {agent_name} (ID: {agent_id})") else: # 如果没有已发布的,使用第一个 agent_id = agents[0]["id"] agent_name = agents[0]["name"] print(f"⚠️ 使用Agent: {agent_name} (ID: {agent_id}) - 状态: {agents[0].get('status')}") else: print("❌ 未找到可用的Agent") print("请先创建一个Agent并发布,或者指定agent_id参数") return else: print(f"❌ 获取Agent列表失败: {response.status_code}") print(f"响应: {response.text}") return except Exception as e: print(f"❌ 获取Agent列表异常: {str(e)}") return # 3. 执行Agent print_section("3. 执行Agent工作流") print(f"用户输入: {user_input}") input_data = { "query": user_input, "USER_INPUT": user_input } execution_data = { "agent_id": agent_id, "input_data": input_data } try: response = requests.post( f"{BASE_URL}/api/v1/executions", headers=headers, json=execution_data ) if response.status_code != 201: print(f"❌ 创建执行任务失败: {response.status_code}") print(f"响应: {response.text}") return execution = response.json() execution_id = execution["id"] print(f"✅ 执行任务已创建: {execution_id}") print(f"状态: {execution.get('status')}") except Exception as e: print(f"❌ 创建执行任务异常: {str(e)}") return # 4. 轮询执行状态 print_section("4. 等待执行完成") max_wait_time = 300 # 最大等待5分钟 start_time = time.time() poll_interval = 2 # 每2秒轮询一次 while True: elapsed_time = time.time() - start_time if elapsed_time > max_wait_time: print(f"❌ 执行超时(超过{max_wait_time}秒)") break try: # 获取执行状态 status_response = requests.get( f"{BASE_URL}/api/v1/executions/{execution_id}/status", headers=headers ) if status_response.status_code == 200: status = status_response.json() current_status = status.get("status") # 显示进度 progress = status.get("progress", 0) print(f"⏳ 执行中... 状态: {current_status}, 进度: {progress}%", end="\r") if current_status == "completed": print("\n✅ 执行完成!") break elif current_status == "failed": print(f"\n❌ 执行失败") error = status.get("error", "未知错误") print(f"错误信息: {error}") break # 显示当前执行的节点 current_node = status.get("current_node") if current_node: print(f"\n 当前节点: {current_node.get('node_id')} - {current_node.get('node_name')}") time.sleep(poll_interval) except Exception as e: print(f"\n❌ 获取执行状态异常: {str(e)}") time.sleep(poll_interval) # 5. 获取执行结果 print_section("5. 获取执行结果") try: response = requests.get( f"{BASE_URL}/api/v1/executions/{execution_id}", headers=headers ) if response.status_code == 200: execution = response.json() status = execution.get("status") output_data = execution.get("output_data") execution_time = execution.get("execution_time") print(f"执行状态: {status}") if execution_time: print(f"执行时间: {execution_time}ms") print("\n输出结果:") print("-" * 80) if output_data: if isinstance(output_data, dict): # 尝试提取文本输出 text_output = ( output_data.get("output") or output_data.get("text") or output_data.get("content") or output_data.get("result") or json.dumps(output_data, ensure_ascii=False, indent=2) ) print(text_output) else: print(output_data) else: print("(无输出数据)") print("-" * 80) # 显示执行日志(如果有) if execution.get("logs"): print("\n执行日志:") for log in execution.get("logs", []): print(f" [{log.get('timestamp')}] {log.get('message')}") else: print(f"❌ 获取执行结果失败: {response.status_code}") print(f"响应: {response.text}") except Exception as e: print(f"❌ 获取执行结果异常: {str(e)}") print_section("测试完成") if __name__ == "__main__": # 从命令行参数获取agent_id和user_input agent_id = None user_input = "生成一个导出androidlog的脚本" if len(sys.argv) > 1: agent_id = sys.argv[1] if len(sys.argv) > 2: user_input = sys.argv[2] test_agent_execution(agent_id=agent_id, user_input=user_input)