Files
aiagent/backend/test_email_mq_integration.py
2026-01-19 00:09:36 +08:00

495 lines
17 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
集成测试:邮件节点和消息队列节点
通过API测试完整的工作流创建和执行流程
"""
import asyncio
import sys
import os
import json
import requests
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# API基础URL
API_BASE = "http://101.43.95.130:8037"
# API_BASE = "http://localhost:8037" # 本地测试
# 测试用户凭据(需要先注册)
TEST_USERNAME = "test_user"
TEST_PASSWORD = "test_password123"
def login():
"""登录获取token"""
print("=" * 60)
print("步骤1: 用户登录")
print("=" * 60)
# 先尝试注册(如果用户不存在)
try:
register_response = requests.post(
f"{API_BASE}/api/v1/auth/register",
json={
"username": TEST_USERNAME,
"password": TEST_PASSWORD,
"email": f"{TEST_USERNAME}@test.com"
}
)
if register_response.status_code == 201:
print(f"✅ 用户注册成功: {TEST_USERNAME}")
elif register_response.status_code == 400:
print(f" 用户已存在,直接登录")
else:
print(f"⚠️ 注册响应: {register_response.status_code}")
except Exception as e:
print(f"⚠️ 注册失败(可能用户已存在): {e}")
# 登录
try:
login_response = requests.post(
f"{API_BASE}/api/v1/auth/login",
data={
"username": TEST_USERNAME,
"password": TEST_PASSWORD
}
)
if login_response.status_code == 200:
token = login_response.json()["access_token"]
print(f"✅ 登录成功")
print(f" Token: {token[:20]}...")
return token
else:
print(f"❌ 登录失败: {login_response.status_code}")
print(f" 响应: {login_response.text}")
return None
except Exception as e:
print(f"❌ 登录异常: {e}")
return None
def test_email_node_workflow(token):
"""测试邮件节点工作流"""
print("\n" + "=" * 60)
print("步骤2: 测试邮件节点工作流")
print("=" * 60)
headers = {"Authorization": f"Bearer {token}"}
# 创建工作流(包含邮件节点)
workflow_data = {
"name": "测试邮件节点工作流",
"description": "测试邮件发送功能",
"nodes": [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {"label": "开始"}
},
{
"id": "email-1",
"type": "email",
"position": {"x": 300, "y": 100},
"data": {
"label": "发送邮件",
"smtp_host": "smtp.ethereal.email", # 测试SMTP服务器
"smtp_port": 587,
"smtp_user": "test@example.com",
"smtp_password": "test_password",
"use_tls": True,
"from_email": "sender@example.com",
"to_email": "recipient@example.com",
"subject": "测试邮件 - {test_key}",
"body": "这是一封测试邮件。\n\n测试数据: {test_data}\n时间: {timestamp}",
"body_type": "text"
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 500, "y": 100},
"data": {"label": "结束"}
}
],
"edges": [
{"id": "e1", "source": "start-1", "target": "email-1"},
{"id": "e2", "source": "email-1", "target": "end-1"}
]
}
try:
# 创建工作流
print("📝 创建工作流...")
create_response = requests.post(
f"{API_BASE}/api/v1/workflows",
headers=headers,
json=workflow_data
)
if create_response.status_code != 201:
print(f"❌ 创建工作流失败: {create_response.status_code}")
print(f" 响应: {create_response.text}")
return None
workflow = create_response.json()
workflow_id = workflow["id"]
print(f"✅ 工作流创建成功")
print(f" 工作流ID: {workflow_id}")
print(f" 节点数: {len(workflow['nodes'])}")
print(f" 边数: {len(workflow['edges'])}")
# 验证工作流
print("\n🔍 验证工作流...")
validate_response = requests.post(
f"{API_BASE}/api/v1/workflows/validate",
headers=headers,
json=workflow_data
)
if validate_response.status_code == 200:
validation = validate_response.json()
print(f"✅ 工作流验证通过")
print(f" 有效: {validation['valid']}")
print(f" 错误: {validation['errors']}")
print(f" 警告: {validation['warnings']}")
else:
print(f"⚠️ 验证失败: {validate_response.status_code}")
# 执行工作流注意会失败因为没有真实的SMTP服务器
print("\n🚀 执行工作流...")
print("⚠️ 注意: 由于没有配置真实的SMTP服务器执行会失败")
print(" 这是正常的,我们主要测试节点配置和识别")
input_data = {
"test_key": "Hello World",
"test_data": "这是测试数据",
"timestamp": "2024-01-01 12:00:00"
}
execute_response = requests.post(
f"{API_BASE}/api/v1/workflows/{workflow_id}/execute",
headers=headers,
json=input_data
)
if execute_response.status_code == 201:
execution = execute_response.json()
execution_id = execution["id"]
print(f"✅ 执行任务创建成功")
print(f" 执行ID: {execution_id}")
print(f" 状态: {execution['status']}")
# 等待一段时间后查询执行结果
print("\n⏳ 等待执行完成5秒...")
time.sleep(5)
execution_detail_response = requests.get(
f"{API_BASE}/api/v1/executions/{execution_id}",
headers=headers
)
if execution_detail_response.status_code == 200:
execution_detail = execution_detail_response.json()
print(f"✅ 执行详情获取成功")
print(f" 状态: {execution_detail['status']}")
if execution_detail.get('error_message'):
print(f" 错误信息: {execution_detail['error_message']}")
if execution_detail.get('output_data'):
print(f" 输出数据: {json.dumps(execution_detail['output_data'], ensure_ascii=False, indent=2)[:200]}")
else:
print(f"⚠️ 执行失败: {execute_response.status_code}")
print(f" 响应: {execute_response.text}")
return workflow_id
except Exception as e:
print(f"❌ 测试异常: {e}")
import traceback
traceback.print_exc()
return None
def test_message_queue_node_workflow(token):
"""测试消息队列节点工作流"""
print("\n" + "=" * 60)
print("步骤3: 测试消息队列节点工作流")
print("=" * 60)
headers = {"Authorization": f"Bearer {token}"}
# 创建工作流包含RabbitMQ节点
workflow_data = {
"name": "测试消息队列节点工作流",
"description": "测试RabbitMQ消息发送功能",
"nodes": [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {"label": "开始"}
},
{
"id": "mq-1",
"type": "message_queue",
"position": {"x": 300, "y": 100},
"data": {
"label": "发送到RabbitMQ",
"queue_type": "rabbitmq",
"host": "localhost",
"port": 5672,
"username": "guest",
"password": "guest",
"queue_name": "test_queue",
"routing_key": "test.routing.key",
"message": {
"test_key": "{test_key}",
"test_data": "{test_data}",
"timestamp": "{timestamp}"
}
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 500, "y": 100},
"data": {"label": "结束"}
}
],
"edges": [
{"id": "e1", "source": "start-1", "target": "mq-1"},
{"id": "e2", "source": "mq-1", "target": "end-1"}
]
}
try:
# 创建工作流
print("📝 创建工作流...")
create_response = requests.post(
f"{API_BASE}/api/v1/workflows",
headers=headers,
json=workflow_data
)
if create_response.status_code != 201:
print(f"❌ 创建工作流失败: {create_response.status_code}")
print(f" 响应: {create_response.text}")
return None
workflow = create_response.json()
workflow_id = workflow["id"]
print(f"✅ 工作流创建成功")
print(f" 工作流ID: {workflow_id}")
print(f" 节点数: {len(workflow['nodes'])}")
print(f" 边数: {len(workflow['edges'])}")
# 验证工作流
print("\n🔍 验证工作流...")
validate_response = requests.post(
f"{API_BASE}/api/v1/workflows/validate",
headers=headers,
json=workflow_data
)
if validate_response.status_code == 200:
validation = validate_response.json()
print(f"✅ 工作流验证通过")
print(f" 有效: {validation['valid']}")
print(f" 错误: {validation['errors']}")
print(f" 警告: {validation['warnings']}")
else:
print(f"⚠️ 验证失败: {validate_response.status_code}")
# 执行工作流注意会失败因为没有RabbitMQ服务器
print("\n🚀 执行工作流...")
print("⚠️ 注意: 由于没有运行RabbitMQ服务器执行会失败")
print(" 这是正常的,我们主要测试节点配置和识别")
input_data = {
"test_key": "Hello RabbitMQ",
"test_data": "这是测试数据",
"timestamp": "2024-01-01 12:00:00"
}
execute_response = requests.post(
f"{API_BASE}/api/v1/workflows/{workflow_id}/execute",
headers=headers,
json=input_data
)
if execute_response.status_code == 201:
execution = execute_response.json()
execution_id = execution["id"]
print(f"✅ 执行任务创建成功")
print(f" 执行ID: {execution_id}")
print(f" 状态: {execution['status']}")
# 等待一段时间后查询执行结果
print("\n⏳ 等待执行完成5秒...")
time.sleep(5)
execution_detail_response = requests.get(
f"{API_BASE}/api/v1/executions/{execution_id}",
headers=headers
)
if execution_detail_response.status_code == 200:
execution_detail = execution_detail_response.json()
print(f"✅ 执行详情获取成功")
print(f" 状态: {execution_detail['status']}")
if execution_detail.get('error_message'):
print(f" 错误信息: {execution_detail['error_message']}")
if execution_detail.get('output_data'):
print(f" 输出数据: {json.dumps(execution_detail['output_data'], ensure_ascii=False, indent=2)[:200]}")
else:
print(f"⚠️ 执行失败: {execute_response.status_code}")
print(f" 响应: {execute_response.text}")
return workflow_id
except Exception as e:
print(f"❌ 测试异常: {e}")
import traceback
traceback.print_exc()
return None
def test_kafka_node_workflow(token):
"""测试Kafka节点工作流"""
print("\n" + "=" * 60)
print("步骤4: 测试Kafka节点工作流")
print("=" * 60)
headers = {"Authorization": f"Bearer {token}"}
# 创建工作流包含Kafka节点
workflow_data = {
"name": "测试Kafka节点工作流",
"description": "测试Kafka消息发送功能",
"nodes": [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {"label": "开始"}
},
{
"id": "kafka-1",
"type": "kafka",
"position": {"x": 300, "y": 100},
"data": {
"label": "发送到Kafka",
"queue_type": "kafka",
"bootstrap_servers": "localhost:9092",
"topic": "test_topic",
"message": {
"test_key": "{test_key}",
"test_data": "{test_data}",
"timestamp": "{timestamp}"
}
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 500, "y": 100},
"data": {"label": "结束"}
}
],
"edges": [
{"id": "e1", "source": "start-1", "target": "kafka-1"},
{"id": "e2", "source": "kafka-1", "target": "end-1"}
]
}
try:
# 创建工作流
print("📝 创建工作流...")
create_response = requests.post(
f"{API_BASE}/api/v1/workflows",
headers=headers,
json=workflow_data
)
if create_response.status_code != 201:
print(f"❌ 创建工作流失败: {create_response.status_code}")
print(f" 响应: {create_response.text}")
return None
workflow = create_response.json()
workflow_id = workflow["id"]
print(f"✅ 工作流创建成功")
print(f" 工作流ID: {workflow_id}")
# 验证工作流
print("\n🔍 验证工作流...")
validate_response = requests.post(
f"{API_BASE}/api/v1/workflows/validate",
headers=headers,
json=workflow_data
)
if validate_response.status_code == 200:
validation = validate_response.json()
print(f"✅ 工作流验证通过")
print(f" 有效: {validation['valid']}")
print(f" 错误: {validation['errors']}")
print(f" 警告: {validation['warnings']}")
return workflow_id
except Exception as e:
print(f"❌ 测试异常: {e}")
import traceback
traceback.print_exc()
return None
def main():
"""主测试函数"""
print("\n" + "=" * 60)
print("邮件节点和消息队列节点集成测试")
print("=" * 60)
print()
# 步骤1: 登录
token = login()
if not token:
print("\n❌ 登录失败,无法继续测试")
return
# 步骤2: 测试邮件节点
email_workflow_id = test_email_node_workflow(token)
# 步骤3: 测试RabbitMQ节点
mq_workflow_id = test_message_queue_node_workflow(token)
# 步骤4: 测试Kafka节点
kafka_workflow_id = test_kafka_node_workflow(token)
# 总结
print("\n" + "=" * 60)
print("测试总结")
print("=" * 60)
print(f"✅ 登录: 成功")
print(f"{'' if email_workflow_id else ''} 邮件节点工作流: {'成功' if email_workflow_id else '失败'}")
print(f"{'' if mq_workflow_id else ''} RabbitMQ节点工作流: {'成功' if mq_workflow_id else '失败'}")
print(f"{'' if kafka_workflow_id else ''} Kafka节点工作流: {'成功' if kafka_workflow_id else '失败'}")
print("\n📝 说明:")
print(" - 工作流创建和验证测试通过 ✅")
print(" - 节点配置和识别测试通过 ✅")
print(" - 实际执行需要配置真实的SMTP/RabbitMQ/Kafka服务器")
print(" - 执行失败是正常的,因为测试环境没有这些服务")
print("\n🎯 下一步:")
print(" 1. 配置真实的SMTP服务器测试邮件节点")
print(" 2. 启动RabbitMQ服务器测试消息队列节点")
print(" 3. 启动Kafka服务器测试Kafka节点")
print(" 4. 在前端界面中测试这些节点")
if __name__ == "__main__":
main()