Files
aiagent/backend/app/services/condition_parser.py
2026-01-19 00:09:36 +08:00

277 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
条件表达式解析器
支持更复杂的条件判断表达式
"""
import re
import json
from typing import Dict, Any, Union
class ConditionParser:
"""条件表达式解析器"""
# 支持的运算符
OPERATORS = {
'==': lambda a, b: a == b,
'!=': lambda a, b: a != b,
'>': lambda a, b: a > b,
'>=': lambda a, b: a >= b,
'<': lambda a, b: a < b,
'<=': lambda a, b: a <= b,
'in': lambda a, b: a in b if isinstance(b, (list, str, dict)) else False,
'not in': lambda a, b: a not in b if isinstance(b, (list, str, dict)) else False,
'contains': lambda a, b: b in str(a) if a is not None else False,
'not contains': lambda a, b: b not in str(a) if a is not None else True,
}
# 逻辑运算符
LOGICAL_OPERATORS = {
'and': lambda a, b: a and b,
'or': lambda a, b: a or b,
'not': lambda a: not a,
}
@staticmethod
def get_value(path: str, data: Dict[str, Any]) -> Any:
"""
从数据中获取值(支持嵌套路径)
Args:
path: 路径,如 'user.name''items[0].price'
data: 数据字典
Returns:
如果不存在返回None
"""
try:
# 处理数组索引,如 items[0]
if '[' in path and ']' in path:
parts = re.split(r'\[|\]', path)
value = data
for part in parts:
if not part:
continue
if part.isdigit():
value = value[int(part)]
else:
value = value.get(part) if isinstance(value, dict) else None
if value is None:
return None
return value
# 处理嵌套路径,如 user.name
keys = path.split('.')
value = data
for key in keys:
if isinstance(value, dict):
value = value.get(key)
elif isinstance(value, list) and key.isdigit():
value = value[int(key)] if int(key) < len(value) else None
else:
return None
if value is None:
return None
return value
except (KeyError, IndexError, TypeError, AttributeError):
return None
@staticmethod
def parse_value(value_str: str) -> Any:
"""
解析值字符串支持字符串、数字、布尔值、JSON
Args:
value_str: 值字符串
Returns:
解析后的值
"""
value_str = value_str.strip()
# 布尔值
if value_str.lower() == 'true':
return True
if value_str.lower() == 'false':
return False
# None
if value_str.lower() == 'null' or value_str.lower() == 'none':
return None
# 数字
try:
if '.' in value_str:
return float(value_str)
return int(value_str)
except ValueError:
pass
# JSON
if value_str.startswith('{') or value_str.startswith('['):
try:
return json.loads(value_str)
except json.JSONDecodeError:
pass
# 字符串(移除引号)
if (value_str.startswith('"') and value_str.endswith('"')) or \
(value_str.startswith("'") and value_str.endswith("'")):
return value_str[1:-1]
return value_str
@staticmethod
def evaluate_simple_condition(condition: str, data: Dict[str, Any]) -> bool:
"""
评估简单条件表达式
支持的格式:
- {key} == value
- {key} > value
- {key} in [value1, value2]
- {key} contains "text"
Args:
condition: 条件表达式
data: 输入数据
Returns:
条件结果
"""
condition = condition.strip()
# 替换变量 {key}
for key, value in data.items():
placeholder = f'{{{key}}}'
if placeholder in condition:
# 如果值是复杂类型转换为JSON字符串
if isinstance(value, (dict, list)):
condition = condition.replace(placeholder, json.dumps(value, ensure_ascii=False))
else:
condition = condition.replace(placeholder, str(value))
# 尝试解析为Python表达式安全方式
try:
# 只允许安全的操作
safe_dict = {
'__builtins__': {},
'True': True,
'False': False,
'None': None,
'null': None,
}
# 添加数据中的值到安全字典
for key, value in data.items():
# 只添加简单的值,避免复杂对象
if isinstance(value, (str, int, float, bool, type(None))):
safe_dict[key] = value
# 尝试评估
result = eval(condition, safe_dict)
if isinstance(result, bool):
return result
except:
pass
# 如果eval失败尝试手动解析
# 匹配运算符
for op in ConditionParser.OPERATORS.keys():
if op in condition:
parts = condition.split(op, 1)
if len(parts) == 2:
left = parts[0].strip()
right = parts[1].strip()
# 获取左侧值
if left.startswith('{') and left.endswith('}'):
key = left[1:-1]
left_value = ConditionParser.get_value(key, data)
else:
left_value = ConditionParser.parse_value(left)
# 获取右侧值
right_value = ConditionParser.parse_value(right)
# 执行比较
if left_value is not None:
return ConditionParser.OPERATORS[op](left_value, right_value)
# 默认返回False
return False
@staticmethod
def evaluate_condition(condition: str, data: Dict[str, Any]) -> bool:
"""
评估条件表达式(支持复杂表达式)
支持的格式:
- 简单条件: {key} == value
- 逻辑组合: {key} > 10 and {key} < 20
- 括号分组: ({key} == 'a' or {key} == 'b') and {other} > 0
Args:
condition: 条件表达式
data: 输入数据
Returns:
条件结果
"""
if not condition:
return False
condition = condition.strip()
# 处理括号表达式(递归处理)
def process_parentheses(expr: str) -> str:
"""处理括号表达式"""
while '(' in expr and ')' in expr:
# 找到最内层的括号
start = expr.rfind('(')
end = expr.find(')', start)
if end == -1:
break
# 提取括号内的表达式
inner_expr = expr[start+1:end]
inner_result = ConditionParser.evaluate_condition(inner_expr, data)
# 替换括号表达式为结果
expr = expr[:start] + str(inner_result) + expr[end+1:]
return expr
condition = process_parentheses(condition)
# 分割逻辑运算符,按优先级处理
# 先处理 and优先级更高
if ' and ' in condition.lower():
parts = re.split(r'\s+and\s+', condition, flags=re.IGNORECASE)
results = []
for part in parts:
part = part.strip()
if part:
results.append(ConditionParser.evaluate_simple_condition(part, data))
return all(results)
# 再处理 or
if ' or ' in condition.lower():
parts = re.split(r'\s+or\s+', condition, flags=re.IGNORECASE)
results = []
for part in parts:
part = part.strip()
if part:
results.append(ConditionParser.evaluate_simple_condition(part, data))
return any(results)
# 处理 not
if condition.lower().startswith('not '):
inner = condition[4:].strip()
return not ConditionParser.evaluate_simple_condition(inner, data)
# 最终评估简单条件
return ConditionParser.evaluate_simple_condition(condition, data)
# 全局实例
condition_parser = ConditionParser()