Files
aiagent/test_agent_execution.py
2026-01-20 11:03:55 +08:00

232 lines
7.9 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Agent工作流执行测试脚本
用于测试Agent工作流的正常执行
"""
import requests
import json
import time
import sys
# API基础URL
BASE_URL = "http://localhost:8037"
def print_section(title):
"""打印分隔线"""
print("\n" + "=" * 80)
print(f" {title}")
print("=" * 80)
def test_agent_execution(agent_id: str = None, user_input: str = "生成一个导出androidlog的脚本"):
"""
测试Agent执行
Args:
agent_id: Agent ID如果为None则自动查找第一个已发布的Agent
user_input: 用户输入内容
"""
print_section("Agent工作流执行测试")
# 1. 登录获取token
print_section("1. 用户登录")
login_data = {
"username": "admin",
"password": "123456"
}
try:
# OAuth2PasswordRequestForm需要form-data格式
response = requests.post(f"{BASE_URL}/api/v1/auth/login", data=login_data)
if response.status_code != 200:
print(f"❌ 登录失败: {response.status_code}")
print(f"响应: {response.text}")
return
token = response.json().get("access_token")
if not token:
print("❌ 登录失败: 未获取到token")
return
print("✅ 登录成功")
headers = {"Authorization": f"Bearer {token}"}
except Exception as e:
print(f"❌ 登录异常: {str(e)}")
return
# 2. 获取Agent列表如果没有指定agent_id
if not agent_id:
print_section("2. 查找可用的Agent")
try:
response = requests.get(
f"{BASE_URL}/api/v1/agents",
headers=headers,
params={"status": "published", "limit": 10}
)
if response.status_code == 200:
agents = response.json()
if agents:
# 优先选择已发布的Agent
published_agents = [a for a in agents if a.get("status") == "published"]
if published_agents:
agent_id = published_agents[0]["id"]
agent_name = published_agents[0]["name"]
print(f"✅ 找到已发布的Agent: {agent_name} (ID: {agent_id})")
else:
# 如果没有已发布的,使用第一个
agent_id = agents[0]["id"]
agent_name = agents[0]["name"]
print(f"⚠️ 使用Agent: {agent_name} (ID: {agent_id}) - 状态: {agents[0].get('status')}")
else:
print("❌ 未找到可用的Agent")
print("请先创建一个Agent并发布或者指定agent_id参数")
return
else:
print(f"❌ 获取Agent列表失败: {response.status_code}")
print(f"响应: {response.text}")
return
except Exception as e:
print(f"❌ 获取Agent列表异常: {str(e)}")
return
# 3. 执行Agent
print_section("3. 执行Agent工作流")
print(f"用户输入: {user_input}")
input_data = {
"query": user_input,
"USER_INPUT": user_input
}
execution_data = {
"agent_id": agent_id,
"input_data": input_data
}
try:
response = requests.post(
f"{BASE_URL}/api/v1/executions",
headers=headers,
json=execution_data
)
if response.status_code != 201:
print(f"❌ 创建执行任务失败: {response.status_code}")
print(f"响应: {response.text}")
return
execution = response.json()
execution_id = execution["id"]
print(f"✅ 执行任务已创建: {execution_id}")
print(f"状态: {execution.get('status')}")
except Exception as e:
print(f"❌ 创建执行任务异常: {str(e)}")
return
# 4. 轮询执行状态
print_section("4. 等待执行完成")
max_wait_time = 300 # 最大等待5分钟
start_time = time.time()
poll_interval = 2 # 每2秒轮询一次
while True:
elapsed_time = time.time() - start_time
if elapsed_time > max_wait_time:
print(f"❌ 执行超时(超过{max_wait_time}秒)")
break
try:
# 获取执行状态
status_response = requests.get(
f"{BASE_URL}/api/v1/executions/{execution_id}/status",
headers=headers
)
if status_response.status_code == 200:
status = status_response.json()
current_status = status.get("status")
# 显示进度
progress = status.get("progress", 0)
print(f"⏳ 执行中... 状态: {current_status}, 进度: {progress}%", end="\r")
if current_status == "completed":
print("\n✅ 执行完成!")
break
elif current_status == "failed":
print(f"\n❌ 执行失败")
error = status.get("error", "未知错误")
print(f"错误信息: {error}")
break
# 显示当前执行的节点
current_node = status.get("current_node")
if current_node:
print(f"\n 当前节点: {current_node.get('node_id')} - {current_node.get('node_name')}")
time.sleep(poll_interval)
except Exception as e:
print(f"\n❌ 获取执行状态异常: {str(e)}")
time.sleep(poll_interval)
# 5. 获取执行结果
print_section("5. 获取执行结果")
try:
response = requests.get(
f"{BASE_URL}/api/v1/executions/{execution_id}",
headers=headers
)
if response.status_code == 200:
execution = response.json()
status = execution.get("status")
output_data = execution.get("output_data")
execution_time = execution.get("execution_time")
print(f"执行状态: {status}")
if execution_time:
print(f"执行时间: {execution_time}ms")
print("\n输出结果:")
print("-" * 80)
if output_data:
if isinstance(output_data, dict):
# 尝试提取文本输出
text_output = (
output_data.get("output") or
output_data.get("text") or
output_data.get("content") or
output_data.get("result") or
json.dumps(output_data, ensure_ascii=False, indent=2)
)
print(text_output)
else:
print(output_data)
else:
print("(无输出数据)")
print("-" * 80)
# 显示执行日志(如果有)
if execution.get("logs"):
print("\n执行日志:")
for log in execution.get("logs", []):
print(f" [{log.get('timestamp')}] {log.get('message')}")
else:
print(f"❌ 获取执行结果失败: {response.status_code}")
print(f"响应: {response.text}")
except Exception as e:
print(f"❌ 获取执行结果异常: {str(e)}")
print_section("测试完成")
if __name__ == "__main__":
# 从命令行参数获取agent_id和user_input
agent_id = None
user_input = "生成一个导出androidlog的脚本"
if len(sys.argv) > 1:
agent_id = sys.argv[1]
if len(sys.argv) > 2:
user_input = sys.argv[2]
test_agent_execution(agent_id=agent_id, user_input=user_input)