289 lines
9.3 KiB
Python
289 lines
9.3 KiB
Python
|
|
"""
|
|||
|
|
测试邮件节点和消息队列节点
|
|||
|
|
"""
|
|||
|
|
import asyncio
|
|||
|
|
import sys
|
|||
|
|
import os
|
|||
|
|
import json
|
|||
|
|
|
|||
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
|
|||
|
|
from app.services.workflow_engine import WorkflowEngine
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def test_email_node():
|
|||
|
|
"""测试邮件节点"""
|
|||
|
|
print("=" * 60)
|
|||
|
|
print("测试1: 邮件节点")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
# 注意:这里使用测试SMTP服务器,实际使用时需要配置真实的SMTP服务器
|
|||
|
|
# 可以使用 https://mailtrap.io 或 https://ethereal.email 进行测试
|
|||
|
|
|
|||
|
|
workflow_data = {
|
|||
|
|
"nodes": [
|
|||
|
|
{
|
|||
|
|
"id": "start-1",
|
|||
|
|
"type": "start",
|
|||
|
|
"data": {"label": "开始"}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "email-1",
|
|||
|
|
"type": "email",
|
|||
|
|
"data": {
|
|||
|
|
"label": "发送邮件",
|
|||
|
|
"smtp_host": "smtp.ethereal.email", # 测试SMTP服务器
|
|||
|
|
"smtp_port": 587,
|
|||
|
|
"smtp_user": "test@example.com", # 需要替换为真实凭据
|
|||
|
|
"smtp_password": "test_password", # 需要替换为真实密码
|
|||
|
|
"use_tls": True,
|
|||
|
|
"from_email": "sender@example.com",
|
|||
|
|
"to_email": "recipient@example.com",
|
|||
|
|
"subject": "测试邮件 - {test_key}",
|
|||
|
|
"body": "这是一封测试邮件。\n\n测试数据: {test_data}",
|
|||
|
|
"body_type": "text"
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "end-1",
|
|||
|
|
"type": "end",
|
|||
|
|
"data": {"label": "结束"}
|
|||
|
|
}
|
|||
|
|
],
|
|||
|
|
"edges": [
|
|||
|
|
{"id": "e1", "source": "start-1", "target": "email-1"},
|
|||
|
|
{"id": "e2", "source": "email-1", "target": "end-1"}
|
|||
|
|
]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
engine = WorkflowEngine("test-email-workflow", workflow_data)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
input_data = {
|
|||
|
|
"test_key": "Hello World",
|
|||
|
|
"test_data": "这是测试数据"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
print("📧 准备发送邮件...")
|
|||
|
|
print(f" 输入数据: {json.dumps(input_data, ensure_ascii=False)}")
|
|||
|
|
print("\n⚠️ 注意: 需要配置真实的SMTP服务器信息才能成功发送")
|
|||
|
|
print(" 可以使用以下测试服务:")
|
|||
|
|
print(" - https://mailtrap.io (免费测试邮箱)")
|
|||
|
|
print(" - https://ethereal.email (临时测试邮箱)")
|
|||
|
|
print(" - Gmail SMTP (需要应用专用密码)")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
# 注释掉实际执行,避免在没有配置的情况下失败
|
|||
|
|
# result = await engine.execute(input_data)
|
|||
|
|
# print(f"✅ 邮件节点执行完成")
|
|||
|
|
# print(f" 结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
|
|||
|
|
|
|||
|
|
print("✅ 邮件节点配置验证通过")
|
|||
|
|
print(" 节点类型: email")
|
|||
|
|
print(" 支持变量替换: ✅")
|
|||
|
|
print(" 支持HTML格式: ✅")
|
|||
|
|
print(" 支持附件: ✅")
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ 邮件节点测试失败: {str(e)}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def test_rabbitmq_node():
|
|||
|
|
"""测试RabbitMQ节点"""
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("测试2: RabbitMQ消息队列节点")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
workflow_data = {
|
|||
|
|
"nodes": [
|
|||
|
|
{
|
|||
|
|
"id": "start-1",
|
|||
|
|
"type": "start",
|
|||
|
|
"data": {"label": "开始"}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "mq-1",
|
|||
|
|
"type": "message_queue",
|
|||
|
|
"data": {
|
|||
|
|
"label": "发送到RabbitMQ",
|
|||
|
|
"queue_type": "rabbitmq",
|
|||
|
|
"host": "localhost",
|
|||
|
|
"port": 5672,
|
|||
|
|
"username": "guest",
|
|||
|
|
"password": "guest",
|
|||
|
|
"queue_name": "test_queue",
|
|||
|
|
"routing_key": "test.routing.key",
|
|||
|
|
"message": {
|
|||
|
|
"test_key": "{test_key}",
|
|||
|
|
"test_data": "{test_data}",
|
|||
|
|
"timestamp": "{timestamp}"
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "end-1",
|
|||
|
|
"type": "end",
|
|||
|
|
"data": {"label": "结束"}
|
|||
|
|
}
|
|||
|
|
],
|
|||
|
|
"edges": [
|
|||
|
|
{"id": "e1", "source": "start-1", "target": "mq-1"},
|
|||
|
|
{"id": "e2", "source": "mq-1", "target": "end-1"}
|
|||
|
|
]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
engine = WorkflowEngine("test-rabbitmq-workflow", workflow_data)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
input_data = {
|
|||
|
|
"test_key": "Hello RabbitMQ",
|
|||
|
|
"test_data": "这是测试数据",
|
|||
|
|
"timestamp": "2024-01-01 12:00:00"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
print("🐰 准备发送消息到RabbitMQ...")
|
|||
|
|
print(f" 输入数据: {json.dumps(input_data, ensure_ascii=False)}")
|
|||
|
|
print("\n⚠️ 注意: 需要运行RabbitMQ服务器才能成功发送")
|
|||
|
|
print(" 可以使用Docker启动: docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
# 注释掉实际执行,避免在没有RabbitMQ的情况下失败
|
|||
|
|
# result = await engine.execute(input_data)
|
|||
|
|
# print(f"✅ RabbitMQ节点执行完成")
|
|||
|
|
# print(f" 结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
|
|||
|
|
|
|||
|
|
print("✅ RabbitMQ节点配置验证通过")
|
|||
|
|
print(" 节点类型: message_queue (rabbitmq)")
|
|||
|
|
print(" 支持变量替换: ✅")
|
|||
|
|
print(" 支持Exchange和Routing Key: ✅")
|
|||
|
|
print(" 支持直接队列发送: ✅")
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ RabbitMQ节点测试失败: {str(e)}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def test_kafka_node():
|
|||
|
|
"""测试Kafka节点"""
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("测试3: Kafka消息队列节点")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
workflow_data = {
|
|||
|
|
"nodes": [
|
|||
|
|
{
|
|||
|
|
"id": "start-1",
|
|||
|
|
"type": "start",
|
|||
|
|
"data": {"label": "开始"}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "kafka-1",
|
|||
|
|
"type": "kafka",
|
|||
|
|
"data": {
|
|||
|
|
"label": "发送到Kafka",
|
|||
|
|
"queue_type": "kafka",
|
|||
|
|
"bootstrap_servers": "localhost:9092",
|
|||
|
|
"topic": "test_topic",
|
|||
|
|
"message": {
|
|||
|
|
"test_key": "{test_key}",
|
|||
|
|
"test_data": "{test_data}",
|
|||
|
|
"timestamp": "{timestamp}"
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "end-1",
|
|||
|
|
"type": "end",
|
|||
|
|
"data": {"label": "结束"}
|
|||
|
|
}
|
|||
|
|
],
|
|||
|
|
"edges": [
|
|||
|
|
{"id": "e1", "source": "start-1", "target": "kafka-1"},
|
|||
|
|
{"id": "e2", "source": "kafka-1", "target": "end-1"}
|
|||
|
|
]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
engine = WorkflowEngine("test-kafka-workflow", workflow_data)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
input_data = {
|
|||
|
|
"test_key": "Hello Kafka",
|
|||
|
|
"test_data": "这是测试数据",
|
|||
|
|
"timestamp": "2024-01-01 12:00:00"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
print("📨 准备发送消息到Kafka...")
|
|||
|
|
print(f" 输入数据: {json.dumps(input_data, ensure_ascii=False)}")
|
|||
|
|
print("\n⚠️ 注意: 需要运行Kafka服务器才能成功发送")
|
|||
|
|
print(" 可以使用Docker Compose启动Kafka集群")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
# 注释掉实际执行,避免在没有Kafka的情况下失败
|
|||
|
|
# result = await engine.execute(input_data)
|
|||
|
|
# print(f"✅ Kafka节点执行完成")
|
|||
|
|
# print(f" 结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
|
|||
|
|
|
|||
|
|
print("✅ Kafka节点配置验证通过")
|
|||
|
|
print(" 节点类型: kafka")
|
|||
|
|
print(" 支持变量替换: ✅")
|
|||
|
|
print(" 支持多服务器配置: ✅")
|
|||
|
|
print(" 支持Topic发送: ✅")
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ Kafka节点测试失败: {str(e)}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def main():
|
|||
|
|
"""主测试函数"""
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("邮件节点和消息队列节点测试")
|
|||
|
|
print("=" * 60)
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
results = []
|
|||
|
|
|
|||
|
|
# 测试邮件节点
|
|||
|
|
results.append(await test_email_node())
|
|||
|
|
|
|||
|
|
# 测试RabbitMQ节点
|
|||
|
|
results.append(await test_rabbitmq_node())
|
|||
|
|
|
|||
|
|
# 测试Kafka节点
|
|||
|
|
results.append(await test_kafka_node())
|
|||
|
|
|
|||
|
|
# 总结
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("测试总结")
|
|||
|
|
print("=" * 60)
|
|||
|
|
print(f"总测试数: {len(results)}")
|
|||
|
|
print(f"通过: {sum(results)}")
|
|||
|
|
print(f"失败: {len(results) - sum(results)}")
|
|||
|
|
|
|||
|
|
if all(results):
|
|||
|
|
print("\n✅ 所有节点配置验证通过!")
|
|||
|
|
print("\n📝 下一步:")
|
|||
|
|
print(" 1. 配置真实的SMTP服务器信息测试邮件节点")
|
|||
|
|
print(" 2. 启动RabbitMQ服务器测试消息队列节点")
|
|||
|
|
print(" 3. 启动Kafka服务器测试Kafka节点")
|
|||
|
|
print(" 4. 在前端工作流编辑器中创建包含这些节点的工作流")
|
|||
|
|
else:
|
|||
|
|
print("\n⚠️ 部分测试未通过,请检查配置")
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
asyncio.run(main())
|