Files
aiagent/backend/test_data_transform.py

228 lines
5.6 KiB
Python
Raw Normal View History

2026-01-19 00:09:36 +08:00
"""
数据转换节点测试
"""
import asyncio
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.services.data_transformer import data_transformer
from app.services.workflow_engine import WorkflowEngine
def test_field_mapping():
"""测试字段映射"""
print("=" * 60)
print("测试1: 字段映射")
print("=" * 60)
input_data = {
"name": "张三",
"age": 25,
"email": "zhangsan@example.com"
}
mapping = {
"username": "name",
"user_age": "age",
"user_email": "email"
}
result = data_transformer.transform_mapping(input_data, mapping)
print(f"输入: {input_data}")
print(f"映射规则: {mapping}")
print(f"输出: {result}")
assert result["username"] == "张三"
assert result["user_age"] == 25
assert result["user_email"] == "zhangsan@example.com"
print("✅ 字段映射测试通过")
return True
def test_nested_mapping():
"""测试嵌套字段映射"""
print("\n" + "=" * 60)
print("测试2: 嵌套字段映射")
print("=" * 60)
input_data = {
"user": {
"name": "李四",
"profile": {
"age": 30
}
},
"items": [
{"id": 1, "price": 100},
{"id": 2, "price": 200}
]
}
mapping = {
"user_name": "user.name",
"user_age": "user.profile.age",
"first_item_price": "items[0].price"
}
result = data_transformer.transform_mapping(input_data, mapping)
print(f"输入: {input_data}")
print(f"映射规则: {mapping}")
print(f"输出: {result}")
assert result["user_name"] == "李四"
assert result["user_age"] == 30
assert result["first_item_price"] == 100
print("✅ 嵌套字段映射测试通过")
return True
def test_data_filter():
"""测试数据过滤"""
print("\n" + "=" * 60)
print("测试3: 数据过滤")
print("=" * 60)
input_data = {
"status": "active",
"count": 15,
"name": "测试"
}
filter_rules = [
{"field": "status", "operator": "==", "value": "active"},
{"field": "count", "operator": ">", "value": 10}
]
result = data_transformer.transform_filter(input_data, filter_rules)
print(f"输入: {input_data}")
print(f"过滤规则: {filter_rules}")
print(f"输出: {result}")
assert "status" in result
assert "count" in result
print("✅ 数据过滤测试通过")
return True
def test_data_compute():
"""测试数据计算"""
print("\n" + "=" * 60)
print("测试4: 数据计算")
print("=" * 60)
input_data = {
"price": 100,
"quantity": 3,
"discount": 0.1
}
compute_rules = {
"subtotal": "{price} * {quantity}",
"total": "({price} * {quantity}) * (1 - {discount})"
}
result = data_transformer.transform_compute(input_data, compute_rules)
print(f"输入: {input_data}")
print(f"计算规则: {compute_rules}")
print(f"输出: {result}")
assert result["subtotal"] == 300
assert result["total"] == 270.0
print("✅ 数据计算测试通过")
return True
async def test_workflow_transform_node():
"""测试工作流中的转换节点"""
print("\n" + "=" * 60)
print("测试5: 工作流中的转换节点")
print("=" * 60)
workflow_data = {
"nodes": [
{
"id": "start-1",
"type": "start",
"data": {"label": "开始"}
},
{
"id": "transform-1",
"type": "transform",
"data": {
"label": "数据转换",
"mode": "mapping",
"mapping": {
"new_name": "old_name",
"new_age": "old_age"
}
}
},
{
"id": "end-1",
"type": "end",
"data": {"label": "结束"}
}
],
"edges": [
{"id": "e1", "source": "start-1", "target": "transform-1"},
{"id": "e2", "source": "transform-1", "target": "end-1"}
]
}
input_data = {
"old_name": "王五",
"old_age": 28,
"other": "其他数据"
}
engine = WorkflowEngine("test-transform", workflow_data)
result = await engine.execute(input_data)
print(f"输入: {input_data}")
print(f"输出: {result.get('result')}")
assert result.get('result', {}).get('new_name') == "王五"
assert result.get('result', {}).get('new_age') == 28
print("✅ 工作流中的转换节点测试通过")
return True
async def main():
"""主测试函数"""
print("\n🚀 开始数据转换节点测试\n")
results = []
results.append(test_field_mapping())
results.append(test_nested_mapping())
results.append(test_data_filter())
results.append(test_data_compute())
results.append(await test_workflow_transform_node())
print("\n" + "=" * 60)
print("测试结果汇总")
print("=" * 60)
passed = sum(results)
total = len(results)
print(f"通过: {passed}/{total}")
print(f"失败: {total - passed}/{total}")
if passed == total:
print("\n✅ 所有测试通过!数据转换节点功能正常!")
else:
print(f"\n⚠️ 有 {total - passed} 个测试失败")
if __name__ == "__main__":
asyncio.run(main())