609 lines
19 KiB
Python
609 lines
19 KiB
Python
"""
|
||
内置工具实现
|
||
"""
|
||
from typing import Dict, Any, Optional, List
|
||
import httpx
|
||
import json
|
||
import os
|
||
import logging
|
||
import re
|
||
import math
|
||
from datetime import datetime, timedelta
|
||
import platform
|
||
import sys
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def http_request_tool(
|
||
url: str,
|
||
method: str = "GET",
|
||
headers: Optional[Dict[str, str]] = None,
|
||
body: Any = None
|
||
) -> str:
|
||
"""
|
||
HTTP请求工具
|
||
|
||
Args:
|
||
url: 请求URL
|
||
method: HTTP方法 (GET, POST, PUT, DELETE)
|
||
headers: 请求头
|
||
body: 请求体(POST/PUT时使用)
|
||
|
||
Returns:
|
||
JSON格式的响应结果
|
||
"""
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||
method_upper = method.upper()
|
||
|
||
if method_upper == "GET":
|
||
response = await client.get(url, headers=headers)
|
||
elif method_upper == "POST":
|
||
response = await client.post(url, json=body, headers=headers)
|
||
elif method_upper == "PUT":
|
||
response = await client.put(url, json=body, headers=headers)
|
||
elif method_upper == "DELETE":
|
||
response = await client.delete(url, headers=headers)
|
||
else:
|
||
raise ValueError(f"不支持的HTTP方法: {method}")
|
||
|
||
# 尝试解析JSON响应
|
||
try:
|
||
response_body = response.json()
|
||
except:
|
||
response_body = response.text
|
||
|
||
result = {
|
||
"status_code": response.status_code,
|
||
"headers": dict(response.headers),
|
||
"body": response_body
|
||
}
|
||
|
||
return json.dumps(result, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"HTTP请求工具执行失败: {str(e)}")
|
||
return json.dumps({"error": str(e)}, ensure_ascii=False)
|
||
|
||
|
||
async def file_read_tool(file_path: str) -> str:
|
||
"""
|
||
文件读取工具
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
|
||
Returns:
|
||
文件内容或错误信息
|
||
"""
|
||
try:
|
||
# 安全检查:限制可读取的文件路径
|
||
# 只允许读取项目目录下的文件
|
||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
|
||
abs_path = os.path.abspath(file_path)
|
||
|
||
if not abs_path.startswith(project_root):
|
||
return json.dumps({
|
||
"error": f"不允许读取项目目录外的文件: {file_path}"
|
||
}, ensure_ascii=False)
|
||
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
|
||
return json.dumps({
|
||
"file_path": file_path,
|
||
"content": content,
|
||
"size": len(content)
|
||
}, ensure_ascii=False)
|
||
except FileNotFoundError:
|
||
return json.dumps({
|
||
"error": f"文件不存在: {file_path}"
|
||
}, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"文件读取工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def file_write_tool(file_path: str, content: str, mode: str = "w") -> str:
|
||
"""
|
||
文件写入工具
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
content: 要写入的内容
|
||
mode: 写入模式(w=覆盖,a=追加)
|
||
|
||
Returns:
|
||
写入结果
|
||
"""
|
||
try:
|
||
# 安全检查:限制可写入的文件路径
|
||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
|
||
abs_path = os.path.abspath(file_path)
|
||
|
||
if not abs_path.startswith(project_root):
|
||
return json.dumps({
|
||
"error": f"不允许写入项目目录外的文件: {file_path}"
|
||
}, ensure_ascii=False)
|
||
|
||
write_mode = "a" if mode == "a" else "w"
|
||
with open(file_path, write_mode, encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
return json.dumps({
|
||
"success": True,
|
||
"file_path": file_path,
|
||
"mode": write_mode,
|
||
"content_length": len(content)
|
||
}, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"文件写入工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def text_analyze_tool(text: str, operation: str = "count") -> str:
|
||
"""
|
||
文本分析工具
|
||
|
||
Args:
|
||
text: 要分析的文本
|
||
operation: 操作类型(count=统计, keywords=提取关键词, summary=摘要)
|
||
|
||
Returns:
|
||
分析结果
|
||
"""
|
||
try:
|
||
if operation == "count":
|
||
# 统计字数、字符数、行数等
|
||
char_count = len(text)
|
||
char_count_no_spaces = len(text.replace(" ", ""))
|
||
word_count = len(text.split())
|
||
line_count = len(text.splitlines())
|
||
paragraph_count = len([p for p in text.split("\n\n") if p.strip()])
|
||
|
||
return json.dumps({
|
||
"char_count": char_count,
|
||
"char_count_no_spaces": char_count_no_spaces,
|
||
"word_count": word_count,
|
||
"line_count": line_count,
|
||
"paragraph_count": paragraph_count
|
||
}, ensure_ascii=False)
|
||
|
||
elif operation == "keywords":
|
||
# 简单的关键词提取(基于词频)
|
||
words = re.findall(r'\b\w+\b', text.lower())
|
||
word_freq = {}
|
||
for word in words:
|
||
if len(word) > 2: # 忽略太短的词
|
||
word_freq[word] = word_freq.get(word, 0) + 1
|
||
|
||
# 取频率最高的10个词
|
||
top_keywords = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
|
||
|
||
return json.dumps({
|
||
"keywords": [{"word": k, "frequency": v} for k, v in top_keywords]
|
||
}, ensure_ascii=False)
|
||
|
||
elif operation == "summary":
|
||
# 简单的摘要(取前3句)
|
||
sentences = re.split(r'[.!?。!?]\s+', text)
|
||
summary = ". ".join(sentences[:3]) + "."
|
||
|
||
return json.dumps({
|
||
"summary": summary,
|
||
"original_length": len(text),
|
||
"summary_length": len(summary)
|
||
}, ensure_ascii=False)
|
||
|
||
else:
|
||
return json.dumps({
|
||
"error": f"不支持的操作类型: {operation}"
|
||
}, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"文本分析工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def datetime_tool(operation: str = "now", format: str = "%Y-%m-%d %H:%M:%S",
|
||
timezone: Optional[str] = None) -> str:
|
||
"""
|
||
日期时间工具
|
||
|
||
Args:
|
||
operation: 操作类型(now=当前时间, format=格式化, parse=解析)
|
||
format: 时间格式
|
||
timezone: 时区(暂未实现)
|
||
|
||
Returns:
|
||
日期时间结果
|
||
"""
|
||
try:
|
||
if operation == "now":
|
||
now = datetime.now()
|
||
return json.dumps({
|
||
"datetime": now.strftime(format),
|
||
"timestamp": now.timestamp(),
|
||
"iso_format": now.isoformat()
|
||
}, ensure_ascii=False)
|
||
|
||
elif operation == "format":
|
||
now = datetime.now()
|
||
return json.dumps({
|
||
"formatted": now.strftime(format)
|
||
}, ensure_ascii=False)
|
||
|
||
else:
|
||
return json.dumps({
|
||
"error": f"不支持的操作类型: {operation}"
|
||
}, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"日期时间工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def math_calculate_tool(expression: str) -> str:
|
||
"""
|
||
数学计算工具(安全限制:只允许基本数学运算)
|
||
|
||
Args:
|
||
expression: 数学表达式(如 "2+2", "sqrt(16)", "sin(0)")
|
||
|
||
Returns:
|
||
计算结果
|
||
"""
|
||
try:
|
||
# 安全检查:只允许数字、基本运算符和安全的数学函数
|
||
allowed_chars = set("0123456789+-*/.() ")
|
||
allowed_functions = ['sqrt', 'sin', 'cos', 'tan', 'log', 'exp', 'abs', 'pow']
|
||
|
||
# 检查表达式是否安全
|
||
if not all(c in allowed_chars or any(f in expression for f in allowed_functions) for c in expression):
|
||
return json.dumps({
|
||
"error": "表达式包含不安全的字符"
|
||
}, ensure_ascii=False)
|
||
|
||
# 构建安全的数学环境
|
||
safe_dict = {
|
||
"__builtins__": {},
|
||
"abs": abs,
|
||
"sqrt": math.sqrt,
|
||
"sin": math.sin,
|
||
"cos": math.cos,
|
||
"tan": math.tan,
|
||
"log": math.log,
|
||
"exp": math.exp,
|
||
"pow": pow,
|
||
"pi": math.pi,
|
||
"e": math.e
|
||
}
|
||
|
||
result = eval(expression, safe_dict)
|
||
|
||
return json.dumps({
|
||
"expression": expression,
|
||
"result": result,
|
||
"result_type": type(result).__name__
|
||
}, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"数学计算工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def system_info_tool() -> str:
|
||
"""
|
||
系统信息工具
|
||
|
||
Returns:
|
||
系统信息
|
||
"""
|
||
try:
|
||
info = {
|
||
"platform": platform.system(),
|
||
"platform_version": platform.version(),
|
||
"architecture": platform.machine(),
|
||
"processor": platform.processor(),
|
||
"python_version": sys.version,
|
||
"python_version_info": {
|
||
"major": sys.version_info.major,
|
||
"minor": sys.version_info.minor,
|
||
"micro": sys.version_info.micro
|
||
}
|
||
}
|
||
|
||
return json.dumps(info, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"系统信息工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def json_process_tool(json_string: str, operation: str = "parse") -> str:
|
||
"""
|
||
JSON处理工具
|
||
|
||
Args:
|
||
json_string: JSON字符串
|
||
operation: 操作类型(parse=解析, stringify=序列化, validate=验证)
|
||
|
||
Returns:
|
||
处理结果
|
||
"""
|
||
try:
|
||
if operation == "parse":
|
||
data = json.loads(json_string)
|
||
return json.dumps({
|
||
"parsed": data,
|
||
"type": type(data).__name__
|
||
}, ensure_ascii=False)
|
||
|
||
elif operation == "stringify":
|
||
# 如果输入已经是字符串,尝试解析后再序列化
|
||
try:
|
||
data = json.loads(json_string)
|
||
except:
|
||
data = json_string
|
||
|
||
return json.dumps({
|
||
"stringified": json.dumps(data, ensure_ascii=False, indent=2)
|
||
}, ensure_ascii=False)
|
||
|
||
elif operation == "validate":
|
||
try:
|
||
json.loads(json_string)
|
||
return json.dumps({
|
||
"valid": True
|
||
}, ensure_ascii=False)
|
||
except json.JSONDecodeError as e:
|
||
return json.dumps({
|
||
"valid": False,
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
else:
|
||
return json.dumps({
|
||
"error": f"不支持的操作类型: {operation}"
|
||
}, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"JSON处理工具执行失败: {str(e)}")
|
||
return json.dumps({
|
||
"error": str(e)
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
async def database_query_tool(query: str, database: str = "default") -> str:
|
||
"""
|
||
数据库查询工具(占位实现)
|
||
|
||
Args:
|
||
query: SQL查询语句
|
||
database: 数据库名称
|
||
|
||
Returns:
|
||
查询结果
|
||
"""
|
||
# TODO: 实现数据库查询逻辑
|
||
return json.dumps({
|
||
"error": "数据库查询工具尚未实现",
|
||
"query": query,
|
||
"database": database
|
||
}, ensure_ascii=False)
|
||
|
||
|
||
# 工具定义(OpenAI Function格式)
|
||
HTTP_REQUEST_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "http_request",
|
||
"description": "发送HTTP请求,支持GET、POST、PUT、DELETE方法。可以用于调用API、获取网页内容等。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"url": {
|
||
"type": "string",
|
||
"description": "请求的URL地址"
|
||
},
|
||
"method": {
|
||
"type": "string",
|
||
"enum": ["GET", "POST", "PUT", "DELETE"],
|
||
"description": "HTTP请求方法",
|
||
"default": "GET"
|
||
},
|
||
"headers": {
|
||
"type": "object",
|
||
"description": "HTTP请求头(可选)",
|
||
"additionalProperties": {
|
||
"type": "string"
|
||
}
|
||
},
|
||
"body": {
|
||
"type": "object",
|
||
"description": "请求体(POST/PUT时使用,可选)"
|
||
}
|
||
},
|
||
"required": ["url", "method"]
|
||
}
|
||
}
|
||
}
|
||
|
||
FILE_READ_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "file_read",
|
||
"description": "读取文件内容。只能读取项目目录下的文件,确保文件路径正确。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"file_path": {
|
||
"type": "string",
|
||
"description": "要读取的文件路径(相对于项目根目录或绝对路径)"
|
||
}
|
||
},
|
||
"required": ["file_path"]
|
||
}
|
||
}
|
||
}
|
||
|
||
FILE_WRITE_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "file_write",
|
||
"description": "写入文件内容。只能写入项目目录下的文件,确保文件路径正确。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"file_path": {
|
||
"type": "string",
|
||
"description": "要写入的文件路径(相对于项目根目录或绝对路径)"
|
||
},
|
||
"content": {
|
||
"type": "string",
|
||
"description": "要写入的内容"
|
||
},
|
||
"mode": {
|
||
"type": "string",
|
||
"enum": ["w", "a"],
|
||
"description": "写入模式:w=覆盖写入,a=追加写入",
|
||
"default": "w"
|
||
}
|
||
},
|
||
"required": ["file_path", "content"]
|
||
}
|
||
}
|
||
}
|
||
|
||
TEXT_ANALYZE_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "text_analyze",
|
||
"description": "分析文本内容,支持统计字数、提取关键词、生成摘要等功能。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"text": {
|
||
"type": "string",
|
||
"description": "要分析的文本内容"
|
||
},
|
||
"operation": {
|
||
"type": "string",
|
||
"enum": ["count", "keywords", "summary"],
|
||
"description": "操作类型:count=统计字数等信息,keywords=提取关键词,summary=生成摘要",
|
||
"default": "count"
|
||
}
|
||
},
|
||
"required": ["text"]
|
||
}
|
||
}
|
||
}
|
||
|
||
DATETIME_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "datetime",
|
||
"description": "获取和处理日期时间信息,支持获取当前时间、格式化时间等。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"operation": {
|
||
"type": "string",
|
||
"enum": ["now", "format"],
|
||
"description": "操作类型:now=获取当前时间,format=格式化时间",
|
||
"default": "now"
|
||
},
|
||
"format": {
|
||
"type": "string",
|
||
"description": "时间格式字符串(如:%Y-%m-%d %H:%M:%S)",
|
||
"default": "%Y-%m-%d %H:%M:%S"
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
MATH_CALCULATE_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "math_calculate",
|
||
"description": "执行数学计算,支持基本运算和常用数学函数(如sqrt, sin, cos, log等)。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"expression": {
|
||
"type": "string",
|
||
"description": "数学表达式(如:2+2, sqrt(16), sin(0), pow(2,3))"
|
||
}
|
||
},
|
||
"required": ["expression"]
|
||
}
|
||
}
|
||
}
|
||
|
||
SYSTEM_INFO_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "system_info",
|
||
"description": "获取系统信息,包括操作系统、Python版本等。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {}
|
||
}
|
||
}
|
||
}
|
||
|
||
JSON_PROCESS_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "json_process",
|
||
"description": "处理JSON数据,支持解析、序列化、验证等功能。",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"json_string": {
|
||
"type": "string",
|
||
"description": "JSON字符串"
|
||
},
|
||
"operation": {
|
||
"type": "string",
|
||
"enum": ["parse", "stringify", "validate"],
|
||
"description": "操作类型:parse=解析JSON,stringify=序列化为JSON,validate=验证JSON格式",
|
||
"default": "parse"
|
||
}
|
||
},
|
||
"required": ["json_string"]
|
||
}
|
||
}
|
||
}
|
||
|
||
DATABASE_QUERY_SCHEMA = {
|
||
"type": "function",
|
||
"function": {
|
||
"name": "database_query",
|
||
"description": "执行数据库查询(暂未实现)",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "SQL查询语句"
|
||
},
|
||
"database": {
|
||
"type": "string",
|
||
"description": "数据库名称",
|
||
"default": "default"
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
}
|
||
}
|