""" 条件表达式解析器 支持更复杂的条件判断表达式 """ import re import json from typing import Dict, Any, Union class ConditionParser: """条件表达式解析器""" # 支持的运算符 OPERATORS = { '==': lambda a, b: a == b, '!=': lambda a, b: a != b, '>': lambda a, b: a > b, '>=': lambda a, b: a >= b, '<': lambda a, b: a < b, '<=': lambda a, b: a <= b, 'in': lambda a, b: a in b if isinstance(b, (list, str, dict)) else False, 'not in': lambda a, b: a not in b if isinstance(b, (list, str, dict)) else False, 'contains': lambda a, b: b in str(a) if a is not None else False, 'not contains': lambda a, b: b not in str(a) if a is not None else True, } # 逻辑运算符 LOGICAL_OPERATORS = { 'and': lambda a, b: a and b, 'or': lambda a, b: a or b, 'not': lambda a: not a, } @staticmethod def get_value(path: str, data: Dict[str, Any]) -> Any: """ 从数据中获取值(支持嵌套路径) Args: path: 路径,如 'user.name' 或 'items[0].price' data: 数据字典 Returns: 值,如果不存在返回None """ try: # 处理数组索引,如 items[0] if '[' in path and ']' in path: parts = re.split(r'\[|\]', path) value = data for part in parts: if not part: continue if part.isdigit(): value = value[int(part)] else: value = value.get(part) if isinstance(value, dict) else None if value is None: return None return value # 处理嵌套路径,如 user.name keys = path.split('.') value = data for key in keys: if isinstance(value, dict): value = value.get(key) elif isinstance(value, list) and key.isdigit(): value = value[int(key)] if int(key) < len(value) else None else: return None if value is None: return None return value except (KeyError, IndexError, TypeError, AttributeError): return None @staticmethod def parse_value(value_str: str) -> Any: """ 解析值字符串(支持字符串、数字、布尔值、JSON) Args: value_str: 值字符串 Returns: 解析后的值 """ value_str = value_str.strip() # 布尔值 if value_str.lower() == 'true': return True if value_str.lower() == 'false': return False # None if value_str.lower() == 'null' or value_str.lower() == 'none': return None # 数字 try: if '.' in value_str: return float(value_str) return int(value_str) except ValueError: pass # JSON if value_str.startswith('{') or value_str.startswith('['): try: return json.loads(value_str) except json.JSONDecodeError: pass # 字符串(移除引号) if (value_str.startswith('"') and value_str.endswith('"')) or \ (value_str.startswith("'") and value_str.endswith("'")): return value_str[1:-1] return value_str @staticmethod def evaluate_simple_condition(condition: str, data: Dict[str, Any]) -> bool: """ 评估简单条件表达式 支持的格式: - {key} == value - {key} > value - {key} in [value1, value2] - {key} contains "text" Args: condition: 条件表达式 data: 输入数据 Returns: 条件结果 """ condition = condition.strip() # 替换变量 {key} for key, value in data.items(): placeholder = f'{{{key}}}' if placeholder in condition: # 如果值是复杂类型,转换为JSON字符串 if isinstance(value, (dict, list)): condition = condition.replace(placeholder, json.dumps(value, ensure_ascii=False)) else: condition = condition.replace(placeholder, str(value)) # 尝试解析为Python表达式(安全方式) try: # 只允许安全的操作 safe_dict = { '__builtins__': {}, 'True': True, 'False': False, 'None': None, 'null': None, } # 添加数据中的值到安全字典 for key, value in data.items(): # 只添加简单的值,避免复杂对象 if isinstance(value, (str, int, float, bool, type(None))): safe_dict[key] = value # 尝试评估 result = eval(condition, safe_dict) if isinstance(result, bool): return result except: pass # 如果eval失败,尝试手动解析 # 匹配运算符 for op in ConditionParser.OPERATORS.keys(): if op in condition: parts = condition.split(op, 1) if len(parts) == 2: left = parts[0].strip() right = parts[1].strip() # 获取左侧值 if left.startswith('{') and left.endswith('}'): key = left[1:-1] left_value = ConditionParser.get_value(key, data) else: left_value = ConditionParser.parse_value(left) # 获取右侧值 right_value = ConditionParser.parse_value(right) # 执行比较 if left_value is not None: return ConditionParser.OPERATORS[op](left_value, right_value) # 默认返回False return False @staticmethod def evaluate_condition(condition: str, data: Dict[str, Any]) -> bool: """ 评估条件表达式(支持复杂表达式) 支持的格式: - 简单条件: {key} == value - 逻辑组合: {key} > 10 and {key} < 20 - 括号分组: ({key} == 'a' or {key} == 'b') and {other} > 0 Args: condition: 条件表达式 data: 输入数据 Returns: 条件结果 """ if not condition: return False condition = condition.strip() # 处理括号表达式(递归处理) def process_parentheses(expr: str) -> str: """处理括号表达式""" while '(' in expr and ')' in expr: # 找到最内层的括号 start = expr.rfind('(') end = expr.find(')', start) if end == -1: break # 提取括号内的表达式 inner_expr = expr[start+1:end] inner_result = ConditionParser.evaluate_condition(inner_expr, data) # 替换括号表达式为结果 expr = expr[:start] + str(inner_result) + expr[end+1:] return expr condition = process_parentheses(condition) # 分割逻辑运算符,按优先级处理 # 先处理 and(优先级更高) if ' and ' in condition.lower(): parts = re.split(r'\s+and\s+', condition, flags=re.IGNORECASE) results = [] for part in parts: part = part.strip() if part: results.append(ConditionParser.evaluate_simple_condition(part, data)) return all(results) # 再处理 or if ' or ' in condition.lower(): parts = re.split(r'\s+or\s+', condition, flags=re.IGNORECASE) results = [] for part in parts: part = part.strip() if part: results.append(ConditionParser.evaluate_simple_condition(part, data)) return any(results) # 处理 not if condition.lower().startswith('not '): inner = condition[4:].strip() return not ConditionParser.evaluate_simple_condition(inner, data) # 最终评估简单条件 return ConditionParser.evaluate_simple_condition(condition, data) # 全局实例 condition_parser = ConditionParser()