""" 内置工具实现 """ from typing import Dict, Any, Optional, List, Tuple from pathlib import Path import httpx import json import os import logging import re import math from datetime import datetime, timedelta import platform import sys import asyncio import subprocess import shlex from sqlalchemy import text from sqlalchemy.exc import SQLAlchemyError from app.core.config import settings logger = logging.getLogger(__name__) def _local_file_workspace_root() -> Path: """file_read/file_write 允许操作的根目录(解析后路径)。""" raw = (getattr(settings, "LOCAL_FILE_TOOLS_ROOT", None) or "").strip() if raw: return Path(raw).expanduser().resolve() # backend/app/services/builtin_tools.py -> 上级 x4 = 仓库根(与历史上 join(services_dir, '../../..') 一致) return Path(__file__).resolve().parent.parent.parent.parent def _sanitize_tool_path_string(file_path: str) -> str: """ 去掉 LLM/DSML/Markdown 泄漏到路径末尾的非法字符(如 123.md<|、引号),避免 WinError 123。 """ if not isinstance(file_path, str): return str(file_path) s = file_path.replace("\ufeff", "").strip() s = s.strip('"').strip("'") # 行尾非法:Windows 文件名不能含 <>:"|?*;模型常把半角/全角尖括号粘在扩展名后 tail_bad = '<><>"|?*:"|*' while s and s[-1] in tail_bad: s = s[:-1].rstrip() # 若仍含未配对尖括号片段在行尾(如 xxx.md<) s = re.sub(r'[<<][||]?\s*$', "", s).rstrip() return s def _resolve_path_under_workspace(file_path: str) -> Tuple[Optional[Path], Optional[str]]: """将用户给定路径解析为绝对路径,且必须位于工作区内。""" file_path = _sanitize_tool_path_string(file_path) root = _local_file_workspace_root() try: p = Path(file_path).expanduser() if not p.is_absolute(): p = (root / p).resolve() else: p = p.resolve() except (OSError, ValueError) as e: return None, str(e) try: p.relative_to(root) except ValueError: return None, f"不允许访问工作区外路径,允许根目录: {root}" return p, None async def http_request_tool( url: str, method: str = "GET", headers: Optional[Dict[str, str]] = None, body: Any = None ) -> str: """ HTTP请求工具 Args: url: 请求URL method: HTTP方法 (GET, POST, PUT, DELETE) headers: 请求头 body: 请求体(POST/PUT时使用) Returns: JSON格式的响应结果 """ try: async with httpx.AsyncClient(timeout=30.0) as client: method_upper = method.upper() if method_upper == "GET": response = await client.get(url, headers=headers) elif method_upper == "POST": response = await client.post(url, json=body, headers=headers) elif method_upper == "PUT": response = await client.put(url, json=body, headers=headers) elif method_upper == "DELETE": response = await client.delete(url, headers=headers) else: raise ValueError(f"不支持的HTTP方法: {method}") # 尝试解析JSON响应 try: response_body = response.json() except: response_body = response.text result = { "status_code": response.status_code, "headers": dict(response.headers), "body": response_body } return json.dumps(result, ensure_ascii=False) except Exception as e: logger.error(f"HTTP请求工具执行失败: {str(e)}") return json.dumps({"error": str(e)}, ensure_ascii=False) _FILE_READ_TEXT_SUFFIXES = frozenset( { ".txt", ".md", ".markdown", ".csv", ".tsv", ".json", ".jsonl", ".xml", ".html", ".htm", ".css", ".js", ".mjs", ".cjs", ".ts", ".tsx", ".jsx", ".vue", ".py", ".java", ".go", ".rs", ".sql", ".sh", ".bat", ".ps1", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".log", ".rst", ".tex", ".gitignore", ".env", ".properties", } ) _FILE_READ_IMAGE_SUFFIXES = frozenset( {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tif", ".tiff"} ) def _truncate_utf8_text(s: str, max_bytes: int) -> str: raw = s.encode("utf-8") if len(raw) <= max_bytes: return s cut = raw[:max_bytes].decode("utf-8", errors="ignore") return cut + "\n\n...[内容已按字节上限截断]" def _read_pdf_text_sync(path: Path, max_bytes: int) -> str: from pypdf import PdfReader reader = PdfReader(str(path)) parts: List[str] = [] for page in reader.pages: t = page.extract_text() or "" if t.strip(): parts.append(t) body = "\n\n".join(parts) if parts else "" if not body.strip(): return "[PDF 未解析出文本,可能是扫描件或图片型 PDF;可将页导出为图片后上传,或安装 OCR 流程]" return _truncate_utf8_text(body, max_bytes) def _read_docx_text_sync(path: Path, max_bytes: int) -> str: import docx doc = docx.Document(str(path)) lines = [p.text for p in doc.paragraphs if p.text and p.text.strip()] body = "\n".join(lines) return _truncate_utf8_text(body or "[docx 中未找到段落文本]", max_bytes) def _read_xlsx_text_sync(path: Path, max_bytes: int) -> str: from openpyxl import load_workbook wb = load_workbook(filename=str(path), read_only=True, data_only=True) try: lines: List[str] = [] for ws in wb.worksheets: lines.append(f"## {ws.title}") for row in ws.iter_rows(values_only=True): cells = ["" if c is None else str(c) for c in row] if any(x.strip() for x in cells): lines.append("\t".join(cells)) body = "\n".join(lines) finally: wb.close() return _truncate_utf8_text(body or "[xlsx 中未读到单元格文本]", max_bytes) def _tessdata_dir_for_ocr() -> Optional[Path]: """返回 tessdata 目录(内含 .traineddata),传给 Tesseract --tessdata-dir。""" raw = (getattr(settings, "TESSERACT_TESSDATA_DIR", None) or "").strip() if raw: p = Path(raw).expanduser().resolve() return p if p.is_dir() else None root = _local_file_workspace_root() loc = root / "tessdata" if loc.is_dir() and any(loc.glob("*.traineddata")): return loc.resolve() return None def _read_image_ocr_sync(path: Path, max_bytes: int) -> str: from PIL import Image import pytesseract cmd = (getattr(settings, "TESSERACT_CMD", None) or "").strip() if cmd: pytesseract.pytesseract.tesseract_cmd = cmd tess_cfg = "" td = _tessdata_dir_for_ocr() if td is not None: tess_cfg = f"--tessdata-dir {shlex.quote(str(td))}" img = Image.open(path) if img.mode not in ("RGB", "L"): img = img.convert("RGB") text = "" last_err: Optional[Exception] = None for lang in ("chi_sim+eng", "eng"): try: chunk = pytesseract.image_to_string(img, lang=lang, config=tess_cfg) or "" if chunk.strip(): text = chunk break except Exception as e: last_err = e continue if not text.strip() and last_err: raise last_err if not text.strip(): return "[图片中未识别到文字;可尝试更清晰、正向拍摄的作业照片]" return _truncate_utf8_text(text, max_bytes) async def file_read_tool(file_path: str) -> str: """ 文件读取工具:UTF-8 文本、PDF 文本、docx、xlsx 单元格文本、常见图片 OCR(需本机 Tesseract)。 Args: file_path: 文件路径(相对工作区根或工作区内绝对路径) Returns: JSON:file_path, content, size;或 error """ try: max_bytes = int(getattr(settings, "LOCAL_FILE_READ_MAX_BYTES", 2_097_152) or 2_097_152) path, err = _resolve_path_under_workspace(file_path) if err: return json.dumps({"error": err}, ensure_ascii=False) if not path.is_file(): return json.dumps({"error": f"文件不存在或不是普通文件: {path}"}, ensure_ascii=False) fsize = path.stat().st_size if fsize > max_bytes: return json.dumps( {"error": f"文件过大({fsize} 字节),上限 {max_bytes}"}, ensure_ascii=False, ) suffix = path.suffix.lower() if suffix == ".doc": return json.dumps( { "error": "暂不支持旧版 .doc,请在 Word 中另存为 .docx 后上传", "file_path": str(path), }, ensure_ascii=False, ) content: str extract_mode = "text" if suffix in _FILE_READ_IMAGE_SUFFIXES: extract_mode = "image_ocr" try: content = await asyncio.to_thread(_read_image_ocr_sync, path, max_bytes) except ImportError as e: return json.dumps( { "error": f"图片识别依赖未安装: {e}。请在后端环境执行 pip install Pillow pytesseract", "file_path": str(path), }, ensure_ascii=False, ) except Exception as e: err_low = str(e).lower() if "tesseract" in err_low or "tesseractnotfound" in type(e).__name__.lower(): return json.dumps( { "error": ( "未找到 Tesseract OCR。请安装 Tesseract(Windows 可装官方安装包)," "并在 .env 中配置 TESSERACT_CMD 指向 tesseract.exe,或将其加入 PATH;" "中文识别需额外下载 chi_sim 语言包。" ), "file_path": str(path), }, ensure_ascii=False, ) logger.warning("图片 OCR 失败: %s", e) return json.dumps({"error": f"图片识别失败: {e}", "file_path": str(path)}, ensure_ascii=False) elif suffix == ".pdf": extract_mode = "pdf" try: content = await asyncio.to_thread(_read_pdf_text_sync, path, max_bytes) except ImportError as e: return json.dumps( {"error": f"PDF 解析依赖未安装: {e}。请 pip install pypdf", "file_path": str(path)}, ensure_ascii=False, ) except Exception as e: logger.warning("PDF 解析失败: %s", e) return json.dumps({"error": f"PDF 解析失败: {e}", "file_path": str(path)}, ensure_ascii=False) elif suffix == ".docx": extract_mode = "docx" try: content = await asyncio.to_thread(_read_docx_text_sync, path, max_bytes) except ImportError as e: return json.dumps( {"error": f"Word 解析依赖未安装: {e}。请 pip install python-docx", "file_path": str(path)}, ensure_ascii=False, ) except Exception as e: logger.warning("docx 解析失败: %s", e) return json.dumps({"error": f"docx 解析失败: {e}", "file_path": str(path)}, ensure_ascii=False) elif suffix == ".xlsx": extract_mode = "xlsx" try: content = await asyncio.to_thread(_read_xlsx_text_sync, path, max_bytes) except ImportError as e: return json.dumps( {"error": f"Excel 解析依赖未安装: {e}。请 pip install openpyxl", "file_path": str(path)}, ensure_ascii=False, ) except Exception as e: logger.warning("xlsx 解析失败: %s", e) return json.dumps({"error": f"xlsx 解析失败: {e}", "file_path": str(path)}, ensure_ascii=False) elif suffix in _FILE_READ_TEXT_SUFFIXES or suffix == "": extract_mode = "text" content = path.read_text(encoding="utf-8", errors="replace") content = _truncate_utf8_text(content, max_bytes) else: # 未知扩展名:仍尝试按 UTF-8 文本读(兼容无后缀文本) extract_mode = "text" content = path.read_text(encoding="utf-8", errors="replace") content = _truncate_utf8_text(content, max_bytes) out = { "file_path": str(path), "content": content, "size": len(content.encode("utf-8")), "extract_mode": extract_mode, } logger.info("file_read %s mode=%s content_len=%s", path, extract_mode, out["size"]) return json.dumps(out, ensure_ascii=False) except FileNotFoundError: return json.dumps({"error": f"文件不存在: {file_path}"}, ensure_ascii=False) except Exception as e: logger.error(f"文件读取工具执行失败: {str(e)}") return json.dumps({"error": str(e)}, ensure_ascii=False) async def file_write_tool(file_path: str, content: str, mode: str = "w") -> str: """ 文件写入工具 Args: file_path: 文件路径 content: 要写入的内容 mode: 写入模式(w=覆盖,a=追加) Returns: 写入结果 """ try: max_bytes = int(getattr(settings, "LOCAL_FILE_WRITE_MAX_BYTES", 2_097_152) or 2_097_152) raw = content if isinstance(content, str) else str(content) enc_len = len(raw.encode("utf-8")) if enc_len > max_bytes: return json.dumps( {"error": f"写入内容过大({enc_len} 字节 UTF-8),上限 {max_bytes}"}, ensure_ascii=False, ) path, err = _resolve_path_under_workspace(file_path) if err: return json.dumps({"error": err}, ensure_ascii=False) path.parent.mkdir(parents=True, exist_ok=True) write_mode = "a" if mode == "a" else "w" if write_mode == "a": with path.open("a", encoding="utf-8") as f: f.write(raw) else: path.write_text(raw, encoding="utf-8") return json.dumps( { "success": True, "file_path": str(path), "mode": write_mode, "content_length": enc_len, }, ensure_ascii=False, ) except Exception as e: logger.error(f"文件写入工具执行失败: {str(e)}") return json.dumps({"error": str(e)}, ensure_ascii=False) async def text_analyze_tool(text: str, operation: str = "count") -> str: """ 文本分析工具 Args: text: 要分析的文本 operation: 操作类型(count=统计, keywords=提取关键词, summary=摘要) Returns: 分析结果 """ try: if operation == "count": # 统计字数、字符数、行数等 char_count = len(text) char_count_no_spaces = len(text.replace(" ", "")) word_count = len(text.split()) line_count = len(text.splitlines()) paragraph_count = len([p for p in text.split("\n\n") if p.strip()]) return json.dumps({ "char_count": char_count, "char_count_no_spaces": char_count_no_spaces, "word_count": word_count, "line_count": line_count, "paragraph_count": paragraph_count }, ensure_ascii=False) elif operation == "keywords": # 简单的关键词提取(基于词频) words = re.findall(r'\b\w+\b', text.lower()) word_freq = {} for word in words: if len(word) > 2: # 忽略太短的词 word_freq[word] = word_freq.get(word, 0) + 1 # 取频率最高的10个词 top_keywords = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10] return json.dumps({ "keywords": [{"word": k, "frequency": v} for k, v in top_keywords] }, ensure_ascii=False) elif operation == "summary": # 简单的摘要(取前3句) sentences = re.split(r'[.!?。!?]\s+', text) summary = ". ".join(sentences[:3]) + "." return json.dumps({ "summary": summary, "original_length": len(text), "summary_length": len(summary) }, ensure_ascii=False) else: return json.dumps({ "error": f"不支持的操作类型: {operation}" }, ensure_ascii=False) except Exception as e: logger.error(f"文本分析工具执行失败: {str(e)}") return json.dumps({ "error": str(e) }, ensure_ascii=False) async def datetime_tool(operation: str = "now", format: str = "%Y-%m-%d %H:%M:%S", timezone: Optional[str] = None) -> str: """ 日期时间工具 Args: operation: 操作类型(now=当前时间, format=格式化, parse=解析) format: 时间格式 timezone: 时区(暂未实现) Returns: 日期时间结果 """ try: if operation == "now": now = datetime.now() return json.dumps({ "datetime": now.strftime(format), "timestamp": now.timestamp(), "iso_format": now.isoformat() }, ensure_ascii=False) elif operation == "format": now = datetime.now() return json.dumps({ "formatted": now.strftime(format) }, ensure_ascii=False) else: return json.dumps({ "error": f"不支持的操作类型: {operation}" }, ensure_ascii=False) except Exception as e: logger.error(f"日期时间工具执行失败: {str(e)}") return json.dumps({ "error": str(e) }, ensure_ascii=False) async def math_calculate_tool(expression: str) -> str: """ 数学计算工具(安全限制:只允许基本数学运算) Args: expression: 数学表达式(如 "2+2", "sqrt(16)", "sin(0)") Returns: 计算结果 """ try: # 安全检查:只允许数字、基本运算符和安全的数学函数 allowed_chars = set("0123456789+-*/.() ") allowed_functions = ['sqrt', 'sin', 'cos', 'tan', 'log', 'exp', 'abs', 'pow'] # 检查表达式是否安全 if not all(c in allowed_chars or any(f in expression for f in allowed_functions) for c in expression): return json.dumps({ "error": "表达式包含不安全的字符" }, ensure_ascii=False) # 构建安全的数学环境 safe_dict = { "__builtins__": {}, "abs": abs, "sqrt": math.sqrt, "sin": math.sin, "cos": math.cos, "tan": math.tan, "log": math.log, "exp": math.exp, "pow": pow, "pi": math.pi, "e": math.e } result = eval(expression, safe_dict) return json.dumps({ "expression": expression, "result": result, "result_type": type(result).__name__ }, ensure_ascii=False) except Exception as e: logger.error(f"数学计算工具执行失败: {str(e)}") return json.dumps({ "error": str(e) }, ensure_ascii=False) async def system_info_tool() -> str: """ 系统信息工具 Returns: 系统信息 """ try: info = { "platform": platform.system(), "platform_version": platform.version(), "architecture": platform.machine(), "processor": platform.processor(), "python_version": sys.version, "python_version_info": { "major": sys.version_info.major, "minor": sys.version_info.minor, "micro": sys.version_info.micro, }, # file_read/file_write 使用的允许根目录(与 LOCAL_FILE_TOOLS_ROOT 一致) "local_file_workspace_root": str(_local_file_workspace_root()), } return json.dumps(info, ensure_ascii=False) except Exception as e: logger.error(f"系统信息工具执行失败: {str(e)}") return json.dumps({ "error": str(e) }, ensure_ascii=False) async def json_process_tool(json_string: str, operation: str = "parse") -> str: """ JSON处理工具 Args: json_string: JSON字符串 operation: 操作类型(parse=解析, stringify=序列化, validate=验证) Returns: 处理结果 """ try: if operation == "parse": data = json.loads(json_string) return json.dumps({ "parsed": data, "type": type(data).__name__ }, ensure_ascii=False) elif operation == "stringify": # 如果输入已经是字符串,尝试解析后再序列化 try: data = json.loads(json_string) except: data = json_string return json.dumps({ "stringified": json.dumps(data, ensure_ascii=False, indent=2) }, ensure_ascii=False) elif operation == "validate": try: json.loads(json_string) return json.dumps({ "valid": True }, ensure_ascii=False) except json.JSONDecodeError as e: return json.dumps({ "valid": False, "error": str(e) }, ensure_ascii=False) else: return json.dumps({ "error": f"不支持的操作类型: {operation}" }, ensure_ascii=False) except Exception as e: logger.error(f"JSON处理工具执行失败: {str(e)}") return json.dumps({ "error": str(e) }, ensure_ascii=False) def _validate_sql_query(sql: str) -> Tuple[bool, Optional[str]]: """ 验证SQL查询的安全性 Args: sql: SQL查询语句 Returns: (是否安全, 错误信息) """ # 去除注释和多余空白 sql_clean = re.sub(r'--.*?\n', '', sql) sql_clean = re.sub(r'/\*.*?\*/', '', sql_clean, flags=re.DOTALL) sql_clean = ' '.join(sql_clean.split()) # 转换为小写以便检查 sql_lower = sql_clean.lower().strip() # 只允许SELECT查询 if not sql_lower.startswith('select'): return False, "只允许SELECT查询,不允许INSERT、UPDATE、DELETE、DROP等操作" # 检查危险关键字 dangerous_keywords = [ 'insert', 'update', 'delete', 'drop', 'truncate', 'alter', 'create', 'grant', 'revoke', 'exec', 'execute', 'call', 'commit', 'rollback', 'savepoint' ] for keyword in dangerous_keywords: # 使用单词边界匹配,避免误判(如"select"中包含"select") pattern = r'\b' + keyword + r'\b' if re.search(pattern, sql_lower): return False, f"检测到危险关键字: {keyword.upper()},查询被拒绝" # 检查是否有多个SQL语句(防止SQL注入) if ';' in sql_clean and sql_clean.count(';') > 1: return False, "不允许执行多个SQL语句" return True, None async def _execute_default_db_query(sql: str, timeout: int = 30) -> List[Dict[str, Any]]: """ 执行默认数据库查询 Args: sql: SQL查询语句 timeout: 查询超时时间(秒) Returns: 查询结果列表 """ from app.core.database import engine try: # 使用asyncio实现超时控制 loop = asyncio.get_event_loop() def _execute(): with engine.connect() as connection: result = connection.execute(text(sql)) # 获取列名 columns = result.keys() # 获取所有行 rows = result.fetchall() # 转换为字典列表 return [dict(zip(columns, row)) for row in rows] # 执行查询,带超时控制 result = await asyncio.wait_for( asyncio.to_thread(_execute), timeout=timeout ) return result except asyncio.TimeoutError: raise Exception(f"查询超时(超过{timeout}秒)") except SQLAlchemyError as e: raise Exception(f"数据库查询失败: {str(e)}") except Exception as e: raise Exception(f"执行查询时发生错误: {str(e)}") async def _execute_data_source_query(data_source_id: str, sql: str, timeout: int = 30) -> List[Dict[str, Any]]: """ 通过数据源ID执行查询 Args: data_source_id: 数据源ID sql: SQL查询语句 timeout: 查询超时时间(秒) Returns: 查询结果列表 """ from app.core.database import SessionLocal from app.models.data_source import DataSource from app.services.data_source_connector import create_connector db = SessionLocal() try: # 获取数据源配置 data_source = db.query(DataSource).filter(DataSource.id == data_source_id).first() if not data_source: raise Exception(f"数据源不存在: {data_source_id}") if data_source.status != 'active': raise Exception(f"数据源状态异常: {data_source.status}") # 创建连接器 connector = create_connector(data_source.type, data_source.config) # 执行查询(带超时控制) query_params = {'sql': sql} # 使用asyncio实现超时控制 def _execute(): return connector.query(query_params) result = await asyncio.wait_for( asyncio.to_thread(_execute), timeout=timeout ) # 确保结果是列表格式 if not isinstance(result, list): result = [result] if result else [] return result except asyncio.TimeoutError: raise Exception(f"查询超时(超过{timeout}秒)") except Exception as e: raise Exception(f"数据源查询失败: {str(e)}") finally: db.close() async def database_query_tool( query: str, database: str = "default", data_source_id: Optional[str] = None, timeout: int = 30 ) -> str: """ 数据库查询工具 Args: query: SQL查询语句(只允许SELECT) database: 数据库名称(已废弃,保留用于兼容) data_source_id: 数据源ID(可选,如果提供则使用指定数据源) timeout: 查询超时时间(秒,默认30秒) Returns: JSON格式的查询结果 """ try: # 验证SQL安全性 is_safe, error_msg = _validate_sql_query(query) if not is_safe: return json.dumps({ "error": error_msg, "query": query }, ensure_ascii=False) # 验证超时时间 if timeout <= 0 or timeout > 300: return json.dumps({ "error": "超时时间必须在1-300秒之间", "timeout": timeout }, ensure_ascii=False) # 执行查询 if data_source_id: # 使用指定的数据源 result = await _execute_data_source_query(data_source_id, query, timeout) else: # 使用默认数据库 result = await _execute_default_db_query(query, timeout) # 格式化结果 return json.dumps({ "success": True, "row_count": len(result), "data": result, "query": query }, ensure_ascii=False, default=str) except Exception as e: logger.error(f"数据库查询工具执行失败: {str(e)}") return json.dumps({ "error": str(e), "query": query }, ensure_ascii=False) async def adb_log_tool( command: str = "logcat", filter_tag: Optional[str] = None, level: Optional[str] = None, max_lines: int = 100, timeout: int = 10 ) -> str: """ ADB日志工具 - 获取Android设备日志 Args: command: ADB命令类型(logcat=获取日志, devices=列出设备, shell=执行shell命令) filter_tag: 日志标签过滤(如 "ActivityManager", "SystemServer") level: 日志级别过滤(V/D/I/W/E/F/S,如 "E" 只显示错误) max_lines: 最大返回行数(默认100行,避免输出过长) timeout: 命令执行超时时间(秒,默认10秒) Returns: JSON格式的执行结果 """ try: # 验证超时时间 if timeout <= 0 or timeout > 60: return json.dumps({ "error": "超时时间必须在1-60秒之间", "timeout": timeout }, ensure_ascii=False) # 验证最大行数 if max_lines <= 0 or max_lines > 10000: return json.dumps({ "error": "最大行数必须在1-10000之间", "max_lines": max_lines }, ensure_ascii=False) # 构建adb命令 if command == "logcat": # 获取日志 adb_cmd = ["adb", "logcat", "-d"] # -d 表示获取已缓存的日志 # 添加日志级别过滤 if level: level = level.upper() if level in ["V", "D", "I", "W", "E", "F", "S"]: adb_cmd.extend([f"*:{level}"]) else: return json.dumps({ "error": f"无效的日志级别: {level},支持: V/D/I/W/E/F/S" }, ensure_ascii=False) # 添加标签过滤 if filter_tag: adb_cmd.append(filter_tag) # 限制输出行数 adb_cmd.extend(["-t", str(max_lines)]) elif command == "devices": # 列出连接的设备 adb_cmd = ["adb", "devices", "-l"] elif command == "shell": # 执行shell命令(受限,只允许安全命令) if filter_tag: # filter_tag在这里作为shell命令 # 只允许安全的命令 safe_commands = ["getprop", "dumpsys", "pm", "am", "settings"] cmd_parts = filter_tag.split() if not cmd_parts or cmd_parts[0] not in safe_commands: return json.dumps({ "error": f"不允许执行命令: {filter_tag},只允许: {', '.join(safe_commands)}" }, ensure_ascii=False) adb_cmd = ["adb", "shell"] + cmd_parts else: return json.dumps({ "error": "shell命令需要提供filter_tag参数作为命令" }, ensure_ascii=False) else: return json.dumps({ "error": f"不支持的ADB命令: {command},支持: logcat/devices/shell" }, ensure_ascii=False) # 执行adb命令 def _execute_adb(): try: result = subprocess.run( adb_cmd, capture_output=True, text=True, timeout=timeout, encoding='utf-8', errors='replace' # 处理编码错误 ) return result except subprocess.TimeoutExpired: raise Exception(f"ADB命令执行超时(超过{timeout}秒)") except FileNotFoundError: raise Exception("未找到adb命令,请确保已安装Android SDK Platform Tools并配置PATH") except Exception as e: raise Exception(f"执行ADB命令失败: {str(e)}") # 异步执行命令 result = await asyncio.wait_for( asyncio.to_thread(_execute_adb), timeout=timeout + 2 # 额外2秒缓冲 ) # 处理结果 output_lines = result.stdout.split('\n') if result.stdout else [] error_lines = result.stderr.split('\n') if result.stderr else [] # 如果输出太长,截断 if len(output_lines) > max_lines: output_lines = output_lines[:max_lines] output_lines.append(f"... (已截断,共 {len(result.stdout.split(chr(10)))} 行)") return json.dumps({ "success": result.returncode == 0, "command": " ".join(adb_cmd), "return_code": result.returncode, "output": "\n".join(output_lines), "output_lines": len(output_lines), "error": "\n".join(error_lines) if error_lines and any(error_lines) else None, "timestamp": datetime.now().isoformat() }, ensure_ascii=False) except asyncio.TimeoutError: return json.dumps({ "error": f"ADB命令执行超时(超过{timeout}秒)", "command": " ".join(adb_cmd) if 'adb_cmd' in locals() else command }, ensure_ascii=False) except Exception as e: logger.error(f"ADB日志工具执行失败: {str(e)}") return json.dumps({ "error": str(e), "command": " ".join(adb_cmd) if 'adb_cmd' in locals() else command }, ensure_ascii=False) # 工具定义(OpenAI Function格式) HTTP_REQUEST_SCHEMA = { "type": "function", "function": { "name": "http_request", "description": "发送HTTP请求,支持GET、POST、PUT、DELETE方法。可以用于调用API、获取网页内容等。", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "请求的URL地址" }, "method": { "type": "string", "enum": ["GET", "POST", "PUT", "DELETE"], "description": "HTTP请求方法", "default": "GET" }, "headers": { "type": "object", "description": "HTTP请求头(可选)", "additionalProperties": { "type": "string" } }, "body": { "type": "object", "description": "请求体(POST/PUT时使用,可选)" } }, "required": ["url", "method"] } } } FILE_READ_SCHEMA = { "type": "function", "function": { "name": "file_read", "description": ( "读取工作区内文件并尽量提取可读文本:UTF-8 文本、.pdf 文字层、.docx 段落、.xlsx 单元格、" "常见图片(.png/.jpg 等)用 OCR 提取文字(需服务器安装 Tesseract,可选配置 TESSERACT_CMD)。" "用户上传附件后消息中会给出相对路径,请用本工具读取该路径。路径须落在 LOCAL_FILE_TOOLS_ROOT 下。" ), "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "要读取的文件路径(相对于项目根目录或绝对路径)" } }, "required": ["file_path"] } } } FILE_WRITE_SCHEMA = { "type": "function", "function": { "name": "file_write", "description": "写入本地文本文件(UTF-8,w 覆盖 / a 追加)。路径须落在工作区内;父目录不存在会自动创建。勿写入密码、密钥等大型敏感内容。", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "要写入的文件路径(相对于项目根目录或绝对路径)" }, "content": { "type": "string", "description": "要写入的内容" }, "mode": { "type": "string", "enum": ["w", "a"], "description": "写入模式:w=覆盖写入,a=追加写入", "default": "w" } }, "required": ["file_path", "content"] } } } TEXT_ANALYZE_SCHEMA = { "type": "function", "function": { "name": "text_analyze", "description": "分析文本内容,支持统计字数、提取关键词、生成摘要等功能。", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "要分析的文本内容" }, "operation": { "type": "string", "enum": ["count", "keywords", "summary"], "description": "操作类型:count=统计字数等信息,keywords=提取关键词,summary=生成摘要", "default": "count" } }, "required": ["text"] } } } DATETIME_SCHEMA = { "type": "function", "function": { "name": "datetime", "description": "获取和处理日期时间信息,支持获取当前时间、格式化时间等。", "parameters": { "type": "object", "properties": { "operation": { "type": "string", "enum": ["now", "format"], "description": "操作类型:now=获取当前时间,format=格式化时间", "default": "now" }, "format": { "type": "string", "description": "时间格式字符串(如:%Y-%m-%d %H:%M:%S)", "default": "%Y-%m-%d %H:%M:%S" } } } } } MATH_CALCULATE_SCHEMA = { "type": "function", "function": { "name": "math_calculate", "description": "执行数学计算,支持基本运算和常用数学函数(如sqrt, sin, cos, log等)。", "parameters": { "type": "object", "properties": { "expression": { "type": "string", "description": "数学表达式(如:2+2, sqrt(16), sin(0), pow(2,3))" } }, "required": ["expression"] } } } SYSTEM_INFO_SCHEMA = { "type": "function", "function": { "name": "system_info", "description": "获取系统信息(含 local_file_workspace_root:file_read/file_write 允许访问的根目录)。用户问「工作区路径」时应调用本工具并如实转述该字段。", "parameters": { "type": "object", "properties": {} } } } JSON_PROCESS_SCHEMA = { "type": "function", "function": { "name": "json_process", "description": "处理JSON数据,支持解析、序列化、验证等功能。", "parameters": { "type": "object", "properties": { "json_string": { "type": "string", "description": "JSON字符串" }, "operation": { "type": "string", "enum": ["parse", "stringify", "validate"], "description": "操作类型:parse=解析JSON,stringify=序列化为JSON,validate=验证JSON格式", "default": "parse" } }, "required": ["json_string"] } } } ADB_LOG_SCHEMA = { "type": "function", "function": { "name": "adb_log", "description": "执行ADB命令获取Android设备日志。支持logcat(获取日志)、devices(列出设备)、shell(执行shell命令)。可以过滤日志标签和级别。", "parameters": { "type": "object", "properties": { "command": { "type": "string", "enum": ["logcat", "devices", "shell"], "description": "ADB命令类型:logcat=获取日志,devices=列出连接的设备,shell=执行shell命令", "default": "logcat" }, "filter_tag": { "type": "string", "description": "日志标签过滤(如 'ActivityManager', 'SystemServer')或shell命令(当command=shell时)" }, "level": { "type": "string", "enum": ["V", "D", "I", "W", "E", "F", "S"], "description": "日志级别过滤:V=Verbose, D=Debug, I=Info, W=Warning, E=Error, F=Fatal, S=Silent" }, "max_lines": { "type": "integer", "description": "最大返回行数(默认100行,避免输出过长)", "default": 100 }, "timeout": { "type": "integer", "description": "命令执行超时时间(秒,默认10秒,最大60秒)", "default": 10 } } } } } DATABASE_QUERY_SCHEMA = { "type": "function", "function": { "name": "database_query", "description": "执行数据库查询(只允许SELECT查询,支持默认数据库和指定数据源)。可以查询工作流、Agent、执行记录等系统数据,或通过数据源ID查询外部数据库。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "SQL查询语句(只允许SELECT查询,不允许INSERT、UPDATE、DELETE等操作)" }, "data_source_id": { "type": "string", "description": "数据源ID(可选,如果提供则使用指定的数据源,否则使用默认数据库)" }, "timeout": { "type": "integer", "description": "查询超时时间(秒,默认30秒,最大300秒)", "default": 30, "minimum": 1, "maximum": 300 } }, "required": ["query"] } } }