# File: aiagent/backend/app/services/condition_parser.py
# Repository viewer metadata: 277 lines, 9.0 KiB, Python
# Last modified: 2026-01-19 00:09:36 +08:00
"""
条件表达式解析器
支持更复杂的条件判断表达式
"""
import re
import json
from typing import Dict, Any, Union
class ConditionParser:
    """Parser and evaluator for condition expressions over a data dict.

    Expressions reference input data with ``{key}`` placeholders and combine
    comparisons with ``and`` / ``or`` / ``not`` and parentheses, e.g.
    ``({status} == 'a' or {status} == 'b') and {count} > 0``.
    """

    # Comparison operators: name -> callable(left, right) -> bool.
    OPERATORS = {
        '==': lambda a, b: a == b,
        '!=': lambda a, b: a != b,
        '>': lambda a, b: a > b,
        '>=': lambda a, b: a >= b,
        '<': lambda a, b: a < b,
        '<=': lambda a, b: a <= b,
        'in': lambda a, b: a in b if isinstance(b, (list, str, dict)) else False,
        'not in': lambda a, b: a not in b if isinstance(b, (list, str, dict)) else False,
        # The needle is coerced to str so a numeric needle (e.g. 40) does not
        # raise TypeError against the stringified haystack.
        'contains': lambda a, b: str(b) in str(a) if a is not None else False,
        'not contains': lambda a, b: str(b) not in str(a) if a is not None else True,
    }

    # Logical operators.
    LOGICAL_OPERATORS = {
        'and': lambda a, b: a and b,
        'or': lambda a, b: a or b,
        'not': lambda a: not a,
    }

    # One path step: either a bracketed index ("[0]") or a plain key.
    _PATH_TOKEN_RE = re.compile(r'\[(\d+)\]|([^.\[\]]+)')
    # Two-character symbols are listed first so ">=" is not read as ">".
    _SYMBOL_OPERATOR_RE = re.compile(r'==|!=|>=|<=|>|<')
    # Longest word operators first so "not in" is not read as "in".
    _WORD_OPERATOR_RE = re.compile(r'not\s+contains|not\s+in|contains|in', re.IGNORECASE)
    # Non-space characters allowed directly before / after a keyword.
    _LEADING_BOUNDARY = ')]}"\''
    _TRAILING_BOUNDARY = '([{"\''

    @staticmethod
    def get_value(path: str, data: Dict[str, Any]) -> Any:
        """Look up a possibly nested value in ``data``.

        Args:
            path: Dotted keys and/or bracketed indexes, in any combination,
                e.g. ``'user.name'``, ``'items[0].price'``, ``'user.tags[1]'``
                or ``'items.0.price'``.
            data: The data to read from.

        Returns:
            The value found, or None when the path is empty, any step is
            missing, or an index is out of range.
        """
        try:
            value = data
            matched = False
            for token in ConditionParser._PATH_TOKEN_RE.finditer(path):
                matched = True
                index, key = token.groups()
                if index is not None:
                    # Bracketed index: subscript whatever container we hold.
                    value = value[int(index)]
                elif isinstance(value, dict):
                    value = value.get(key)
                elif isinstance(value, list) and key.isdigit():
                    value = value[int(key)]
                else:
                    return None
                if value is None:
                    return None
            return value if matched else None
        except (KeyError, IndexError, TypeError, AttributeError, ValueError):
            return None

    @staticmethod
    def parse_value(value_str: str) -> Any:
        """Convert literal text into a Python value.

        Recognised forms, in order: ``true``/``false`` (case-insensitive),
        ``null``/``none``, integers, floats containing a ``.``, JSON objects
        or arrays, and quoted strings (the quotes are removed). Anything else
        is returned as the stripped string.

        Args:
            value_str: The literal text.

        Returns:
            The parsed value.
        """
        value_str = value_str.strip()
        lowered = value_str.lower()

        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        if lowered in ('null', 'none'):
            return None

        try:
            if '.' in value_str:
                return float(value_str)
            return int(value_str)
        except ValueError:
            pass

        if value_str.startswith(('{', '[')):
            try:
                return json.loads(value_str)
            except json.JSONDecodeError:
                pass

        # Require two characters so a lone quote is not "unquoted" to ''.
        if len(value_str) >= 2 and value_str[0] == value_str[-1] and value_str[0] in '"\'':
            return value_str[1:-1]
        return value_str

    @staticmethod
    def _iter_unquoted(text: str):
        """Yield ``(index, char)`` for characters outside quoted strings.

        A quote character with no matching closing quote is treated as an
        ordinary character, so a stray apostrophe does not hide the rest.
        """
        position = 0
        length = len(text)
        while position < length:
            char = text[position]
            if char in '"\'':
                closing = text.find(char, position + 1)
                if closing != -1:
                    position = closing + 1
                    continue
            yield position, char
            position += 1

    @staticmethod
    def _is_keyword_at(text: str, index: int, keyword: str) -> bool:
        """Return True if ``keyword`` stands as a separate word at ``index``."""
        end = index + len(keyword)
        if index == 0 or end >= len(text):
            return False
        if text[index:end].lower() != keyword:
            return False
        before, after = text[index - 1], text[end]
        return ((before.isspace() or before in ConditionParser._LEADING_BOUNDARY)
                and (after.isspace() or after in ConditionParser._TRAILING_BOUNDARY))

    @staticmethod
    def _split_top_level(text: str, keyword: str) -> list:
        """Split on ``keyword`` where it occurs outside quotes and brackets.

        Returns the non-empty, stripped pieces.
        """
        pieces = []
        depth = 0
        start = 0
        for index, char in ConditionParser._iter_unquoted(text):
            if char in '([{':
                depth += 1
            elif char in ')]}':
                depth = max(depth - 1, 0)
            elif depth == 0 and ConditionParser._is_keyword_at(text, index, keyword):
                pieces.append(text[start:index])
                start = index + len(keyword)
        pieces.append(text[start:])
        return [piece.strip() for piece in pieces if piece.strip()]

    @staticmethod
    def _strip_outer_parens(text: str):
        """Return the inner text if one pair of parentheses wraps all of ``text``.

        Returns None otherwise, e.g. for ``(a) == (b)``.
        """
        if not (text.startswith('(') and text.endswith(')')):
            return None
        depth = 0
        last = len(text) - 1
        for index, char in ConditionParser._iter_unquoted(text):
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
                if depth == 0:
                    return text[1:-1] if index == last else None
        return None

    @staticmethod
    def _split_comparison(text: str):
        """Find the leftmost comparison operator outside quotes and brackets.

        Returns ``(left, operator, right)`` with the operator normalised to a
        key of ``OPERATORS``, or None when no operator is present.
        """
        depth = 0
        for index, char in ConditionParser._iter_unquoted(text):
            if char in '([{':
                depth += 1
                continue
            if char in ')]}':
                depth = max(depth - 1, 0)
                continue
            if depth:
                continue
            match = ConditionParser._SYMBOL_OPERATOR_RE.match(text, index)
            if match is None and index > 0:
                before = text[index - 1]
                if before.isspace() or before in ConditionParser._LEADING_BOUNDARY:
                    word = ConditionParser._WORD_OPERATOR_RE.match(text, index)
                    # Word operators must be whole words: "in" must not match
                    # inside "inside" or at the very end of the text.
                    if word is not None and word.end() < len(text):
                        after = text[word.end()]
                        if after.isspace() or after in ConditionParser._TRAILING_BOUNDARY:
                            match = word
            if match is not None:
                operator = ' '.join(match.group(0).lower().split())
                return text[:index].strip(), operator, text[match.end():].strip()
        return None

    @staticmethod
    def _substitute_placeholders(text: str, data: Dict[str, Any]) -> str:
        """Replace each ``{key}`` for a top-level key of ``data`` with its value.

        Substitution is a single pass, so placeholder-looking text that comes
        from a data value is not expanded again. Dicts and lists are rendered
        as JSON, everything else with ``str()``.
        """
        lookup = {}
        for key, value in data.items():
            lookup.setdefault(f'{{{key}}}', value)
        if not lookup:
            return text
        alternatives = sorted(lookup, key=len, reverse=True)
        pattern = re.compile('|'.join(re.escape(item) for item in alternatives))

        def render(match):
            value = lookup[match.group(0)]
            if isinstance(value, (dict, list)):
                return json.dumps(value, ensure_ascii=False)
            return str(value)

        return pattern.sub(render, text)

    @staticmethod
    def evaluate_simple_condition(condition: str, data: Dict[str, Any]) -> bool:
        """Evaluate a single comparison with no top-level and/or handling.

        Supported forms:
            - {key} == value
            - {key} > value
            - {key} in [value1, value2]
            - {key} contains "text"

        The substituted text is first evaluated as a Python expression. If
        that fails or does not yield a bool, the condition is split at its
        operator and compared using ``OPERATORS``.

        Args:
            condition: The condition text.
            data: Input data referenced by ``{key}`` placeholders.

        Returns:
            The result of the condition. False when it cannot be evaluated,
            when the left operand resolves to None, or when the operands
            cannot be compared.
        """
        condition = condition.strip()
        rendered = ConditionParser._substitute_placeholders(condition, data)

        safe_globals = {
            '__builtins__': {},
            'True': True,
            'False': False,
            'None': None,
            'null': None,
        }
        # Expose only simple values by name; complex objects are left out.
        for key, value in data.items():
            if isinstance(value, (str, int, float, bool, type(None))):
                safe_globals[key] = value

        # SECURITY NOTE(review): eval() with empty __builtins__ is NOT a
        # sandbox. Attribute access on literals can still reach arbitrary
        # classes, and expressions such as 9**9**9 can exhaust CPU and memory.
        # Both the condition text and the substituted data values end up in
        # the evaluated string. If either can come from an untrusted source,
        # replace this with a restricted ast-based evaluator.
        try:
            result = eval(rendered, safe_globals)
        except Exception:
            # Not valid Python (e.g. "contains", unquoted text): fall through.
            result = None
        if isinstance(result, bool):
            return result

        # Manual parsing. The operator is located in the ORIGINAL text, where
        # placeholders are still opaque, so operator characters or the word
        # "in" inside data values cannot be mistaken for the operator.
        parsed = ConditionParser._split_comparison(condition)
        if parsed is None:
            return False
        left_text, operator, right_text = parsed

        left_rendered = ConditionParser._substitute_placeholders(left_text, data).strip()
        if (left_rendered == left_text
                and left_text.startswith('{') and left_text.endswith('}')):
            # Not a top-level key: treat it as a nested path, e.g. {user.name}.
            left_value = ConditionParser.get_value(left_text[1:-1], data)
        else:
            left_value = ConditionParser.parse_value(left_rendered)

        right_value = ConditionParser.parse_value(
            ConditionParser._substitute_placeholders(right_text, data)
        )

        if left_value is None:
            return False
        try:
            return bool(ConditionParser.OPERATORS[operator](left_value, right_value))
        except TypeError:
            # Operands of incomparable types, e.g. 'abc' > 5.
            return False

    @staticmethod
    def evaluate_condition(condition: str, data: Dict[str, Any]) -> bool:
        """Evaluate a condition that may combine several comparisons.

        Supported forms:
            - simple condition: {key} == value
            - logical combination: {key} > 10 and {key} < 20
            - grouping: ({key} == 'a' or {key} == 'b') and {other} > 0
            - negation: not {key} == value

        Precedence follows Python: ``or`` binds loosest, then ``and``, then
        ``not``. Keywords inside quoted strings, placeholders or brackets are
        not treated as logical operators.

        Args:
            condition: The condition text.
            data: Input data referenced by ``{key}`` placeholders.

        Returns:
            The result of the condition. False for an empty condition.
        """
        if not condition:
            return False
        expr = condition.strip()
        if not expr:
            return False

        # Lowest precedence first: "a or b and c" means "a or (b and c)".
        or_terms = ConditionParser._split_top_level(expr, 'or')
        if len(or_terms) > 1:
            return any(ConditionParser.evaluate_condition(term, data) for term in or_terms)

        and_terms = ConditionParser._split_top_level(expr, 'and')
        if len(and_terms) > 1:
            return all(ConditionParser.evaluate_condition(term, data) for term in and_terms)

        if (len(expr) > 3 and expr[:3].lower() == 'not'
                and (expr[3].isspace() or expr[3] == '(')):
            return not ConditionParser.evaluate_condition(expr[3:], data)

        # Only parentheses that wrap the whole expression are grouping. Others,
        # such as the tuple in "{x} in (1, 2)", belong to the comparison.
        inner = ConditionParser._strip_outer_parens(expr)
        if inner is not None:
            return ConditionParser.evaluate_condition(inner, data)

        return ConditionParser.evaluate_simple_condition(expr, data)
# Module-level shared instance
condition_parser = ConditionParser()