""" 条件节点表达式解析测试 """ import asyncio import sys import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.services.condition_parser import condition_parser from app.services.workflow_engine import WorkflowEngine def test_simple_conditions(): """测试简单条件""" print("=" * 60) print("测试1: 简单条件表达式") print("=" * 60) test_cases = [ ("{value} > 10", {"value": 15}, True), ("{value} > 10", {"value": 5}, False), ("{value} == 10", {"value": 10}, True), ("{value} != 10", {"value": 10}, False), ("{status} == 'active'", {"status": "active"}, True), ("{status} == 'active'", {"status": "inactive"}, False), ("{count} >= 0", {"count": 0}, True), ("{count} < 100", {"count": 50}, True), ] passed = 0 for condition, data, expected in test_cases: result = condition_parser.evaluate_condition(condition, data) status = "✅" if result == expected else "❌" print(f"{status} {condition} with {data} = {result} (expected {expected})") if result == expected: passed += 1 print(f"\n通过: {passed}/{len(test_cases)}") return passed == len(test_cases) def test_logical_conditions(): """测试逻辑组合条件""" print("\n" + "=" * 60) print("测试2: 逻辑组合条件") print("=" * 60) test_cases = [ ("{value} > 10 and {value} < 20", {"value": 15}, True), ("{value} > 10 and {value} < 20", {"value": 5}, False), ("{value} > 10 and {value} < 20", {"value": 25}, False), ("{status} == 'active' or {status} == 'pending'", {"status": "active"}, True), ("{status} == 'active' or {status} == 'pending'", {"status": "pending"}, True), ("{status} == 'active' or {status} == 'pending'", {"status": "inactive"}, False), ] passed = 0 for condition, data, expected in test_cases: result = condition_parser.evaluate_condition(condition, data) status = "✅" if result == expected else "❌" print(f"{status} {condition} with {data} = {result} (expected {expected})") if result == expected: passed += 1 print(f"\n通过: {passed}/{len(test_cases)}") return passed == len(test_cases) def test_complex_conditions(): """测试复杂条件""" print("\n" + "=" * 60) print("测试3: 复杂条件表达式") print("=" * 60) test_cases = [ ("({value} > 10 and {value} < 20) and {status} == 'active'", {"value": 15, "status": "active"}, True), ("({value} > 10 and {value} < 20) and {status} == 'active'", {"value": 15, "status": "inactive"}, False), ("({status} == 'a' or {status} == 'b') and {count} > 0", {"status": "a", "count": 5}, True), ("({status} == 'a' or {status} == 'b') and {count} > 0", {"status": "c", "count": 5}, False), ] passed = 0 for condition, data, expected in test_cases: result = condition_parser.evaluate_condition(condition, data) status = "✅" if result == expected else "❌" print(f"{status} {condition}") print(f" 数据: {data}") print(f" 结果: {result} (期望: {expected})") if result == expected: passed += 1 print(f"\n通过: {passed}/{len(test_cases)}") return passed == len(test_cases) async def test_workflow_with_condition(): """测试工作流中的条件节点""" print("\n" + "=" * 60) print("测试4: 工作流中的条件节点") print("=" * 60) # 创建工作流:开始 → 条件节点 → [True分支] → 输出1 # → [False分支] → 输出2 workflow_data = { "nodes": [ { "id": "start-1", "type": "start", "data": {"label": "开始"} }, { "id": "condition-1", "type": "condition", "data": { "label": "条件判断", "condition": "{value} > 10" } }, { "id": "output-true", "type": "output", "data": {"label": "True分支输出"} }, { "id": "output-false", "type": "output", "data": {"label": "False分支输出"} } ], "edges": [ {"id": "e1", "source": "start-1", "target": "condition-1"}, {"id": "e2", "source": "condition-1", "target": "output-true", "sourceHandle": "true"}, {"id": "e3", "source": "condition-1", "target": "output-false", "sourceHandle": "false"} ] } # 测试1: value > 10 (应该走True分支) print("\n测试用例1: value = 15 (应该走True分支)") engine1 = WorkflowEngine("test-1", workflow_data) result1 = await engine1.execute({"value": 15}) print(f"结果: {result1.get('result')}") print(f"节点结果: {list(result1.get('node_results', {}).keys())}") # 测试2: value <= 10 (应该走False分支) print("\n测试用例2: value = 5 (应该走False分支)") engine2 = WorkflowEngine("test-2", workflow_data) result2 = await engine2.execute({"value": 5}) print(f"结果: {result2.get('result')}") print(f"节点结果: {list(result2.get('node_results', {}).keys())}") return True async def main(): """主测试函数""" print("\n🚀 开始条件节点表达式解析测试\n") results = [] results.append(test_simple_conditions()) results.append(test_logical_conditions()) results.append(test_complex_conditions()) results.append(await test_workflow_with_condition()) print("\n" + "=" * 60) print("测试结果汇总") print("=" * 60) passed = sum(results) total = len(results) print(f"通过: {passed}/{total}") print(f"失败: {total - passed}/{total}") if passed == total: print("\n✅ 所有测试通过!条件节点表达式解析功能正常!") else: print(f"\n⚠️ 有 {total - passed} 个测试失败") if __name__ == "__main__": asyncio.run(main())