Files
aiagent/scripts/seed_tools.py

327 lines
17 KiB
Python
Raw Permalink Normal View History

"""为工具市场批量创建示例工具"""
import json
import urllib.request
import urllib.parse
import ssl
# Root URL of the API; req() prepends it to every request path.
BASE = "http://localhost:8037"
# TLS context handed to urlopen() by req(). Certificate validation is switched
# off entirely below, so this context must not be used for anything beyond a
# local/dev target. NOTE(review): BASE is plain http here, so the context
# presumably only matters if BASE is pointed at an https endpoint — confirm.
ssl_ctx = ssl.create_default_context()
# Order matters: check_hostname has to be cleared before verify_mode may be
# set to CERT_NONE, otherwise the ssl module raises ValueError.
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
def _decode_payload(raw):
    """Parse response bytes as JSON, falling back to a ``{"detail": text}`` dict."""
    try:
        return json.loads(raw)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError: empty bodies,
        # HTML error pages from a proxy, etc. Keep the text so callers that
        # print ``data.get("detail", data)`` still show something useful.
        return {"detail": raw.decode("utf-8", errors="replace")}


def req(method, path, headers=None, body=None, raw_body=None):
    """Send one HTTP request to the API under ``BASE`` and decode the reply.

    Args:
        method: HTTP verb such as ``"GET"`` or ``"POST"``.
        path: Path appended verbatim to ``BASE``.
        headers: Optional extra headers; they override the default JSON
            ``Content-Type``.
        body: Optional JSON-serialisable payload. Used only when
            ``raw_body`` is not supplied. An explicit empty container
            (``{}`` / ``[]``) is sent rather than dropped.
        raw_body: Optional pre-encoded bytes sent as-is; takes precedence
            over ``body``.

    Returns:
        ``(status_code, payload)``. HTTP error responses are returned the
        same way instead of being raised. ``payload`` is the decoded JSON,
        or ``{"detail": <body text>}`` when the body is not valid JSON.

    Raises:
        urllib.error.URLError: On connection-level failures (not HTTP errors).
    """
    hdrs = {"Content-Type": "application/json"}
    if headers:
        hdrs.update(headers)

    if raw_body:
        data = raw_body
    elif body is not None:
        data = json.dumps(body).encode()
    else:
        data = None

    r = urllib.request.Request(f"{BASE}{path}", data=data, headers=hdrs, method=method)
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(r, context=ssl_ctx, timeout=10) as resp:
            return resp.status, _decode_payload(resp.read())
    except urllib.request.HTTPError as e:
        # 4xx/5xx replies still carry a body the caller wants to inspect.
        try:
            return e.code, _decode_payload(e.read())
        finally:
            e.close()
# 1. Register the seed user, then log in to obtain a bearer token.
# The registration result is discarded on purpose; presumably so that re-runs
# still proceed when the account already exists.
_, _ = req("POST", "/api/v1/auth/register", body={
    "username": "tooladmin", "email": "tooladmin@test.com", "password": "test123456"
})
# The login endpoint is called with a form-encoded body instead of JSON.
status, login_data = req("POST", "/api/v1/auth/login",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    raw_body=urllib.parse.urlencode({"username": "tooladmin", "password": "test123456"}).encode())
# Fail with a readable message instead of a bare KeyError when login is rejected.
if not isinstance(login_data, dict) or "access_token" not in login_data:
    raise SystemExit(f"Login failed (HTTP {status}): {login_data}")
token = login_data["access_token"]
auth = {"Authorization": f"Bearer {token}"}
print(f"Logged in, token: {token[:20]}...")
# 2. Tool definitions
# Each entry is posted as-is to /api/v1/tools by the batch loop at the end of
# the script. Keys used by every entry:
#   name / description / category / is_public - catalogue metadata
#   implementation_type   - "code" (config carries Python source defining
#                           run(args)) or "http" (config carries url, method,
#                           timeout and headers)
#   function_schema       - JSON-schema style description of the arguments
#   implementation_config - see implementation_type
# The embedded sources are ordinary (non-raw) string literals, so every
# backslash meant for the tool code is doubled here ("\\n" -> "\n" in the tool).
# Their indentation is significant: it is part of the Python code that gets
# uploaded.
tools = [
    # ── Text processing ──
    {
        "name": "html_to_markdown",
        "description": "将HTML转换为Markdown格式支持表格、链接、图片转换",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "html_to_markdown", "description": "将HTML转换为Markdown格式", "parameters": {"type": "object", "properties": {"html": {"type": "string", "description": "HTML内容"}}, "required": ["html"]}},
        "implementation_config": {"source": """def run(args):
    import re
    html = args.get("html", "")
    html = re.sub(r"<script[^>]*>.*?</script>", "", html, flags=re.DOTALL)
    html = re.sub(r"<style[^>]*>.*?</style>", "", html, flags=re.DOTALL)
    for i in range(6, 0, -1):
        html = re.sub(rf"<h{i}[^>]*>(.*?)</h{i}>", "#" * i + r" \\1\\n", html, flags=re.DOTALL)
    html = re.sub(r"<strong>(.*?)</strong>", r"**\\1**", html)
    html = re.sub(r"<em>(.*?)</em>", r"*\\1*", html)
    html = re.sub(r'<a[^>]*href="(.+?)"[^>]*>(.*?)</a>', r"[\\2](\\1)", html)
    html = re.sub(r'<img[^>]*src="(.+?)"[^>]*alt="(.+?)"[^>]*>', r"![\\2](\\1)", html)
    html = re.sub(r"<p[^>]*>(.*?)</p>", r"\\1\\n\\n", html, flags=re.DOTALL)
    html = re.sub(r"<br\\s*/?>", "\\n", html)
    html = re.sub(r"<li>(.*?)</li>", r"- \\1\\n", html, flags=re.DOTALL)
    html = re.sub(r"<[^>]+>", "", html)
    html = re.sub(r"\\n{3,}", "\\n\\n", html)
    return html.strip()"""},
    },
    {
        "name": "extract_info",
        "description": "从文本中提取邮箱地址、电话号码、URL链接等信息",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "extract_info", "description": "提取文本中的结构化信息", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要分析的文本"}}, "required": ["text"]}},
        # The URL pattern needs \\" here: a single-escaped quote would collapse
        # to a plain quote and terminate the tool's raw string early.
        "implementation_config": {"source": """def run(args):
    import re
    text = args.get("text", "")
    emails = list(set(re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}", text)))
    phones = list(set(re.findall(r"1[3-9]\\d{9}", text)))
    urls = list(set(re.findall(r"https?://[^\\s,<>\\"']+", text)))
    ip_addrs = list(set(re.findall(r"\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}", text)))
    return {"emails": emails, "phones": phones, "urls": urls, "ip_addresses": ip_addrs}"""},
    },
    {
        "name": "text_summarize",
        "description": "对长文本进行智能摘要,可指定摘要长度",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "text_summarize", "description": "文本摘要", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要摘要的文本"}, "max_sentences": {"type": "integer", "description": "最大句子数", "default": 3}}, "required": ["text"]}},
        # max_sentences is clamped to >= 1 so zero/negative values cannot
        # produce an empty "." summary or a negative slice.
        "implementation_config": {"source": """def run(args):
    import re
    text = args.get("text", "")
    max_sent = max(1, int(args.get("max_sentences", 3)))
    sentences = re.split(r"[.!?。!?]+", text)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
    if not sentences:
        return {"summary": text[:200], "original_length": len(text)}
    # Score sentences by length (prefer medium-length sentences)
    scored = []
    avg_len = sum(len(s) for s in sentences) / max(len(sentences), 1)
    for s in sentences:
        score = 1.0 - abs(len(s) - avg_len) / max(avg_len, 1)
        scored.append((score, s))
    scored.sort(key=lambda x: -x[0])
    summary = ".".join(s for _, s in scored[:max_sent]) + "."
    return {"summary": summary, "original_length": len(text), "summary_length": len(summary)}"""},
    },
    {
        "name": "json_tool",
        "description": "JSON格式化、压缩、验证和转换支持JSON与YAML/CSV互转",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "json_tool", "description": "JSON处理工具", "parameters": {"type": "object", "properties": {"input": {"type": "string", "description": "输入内容"}, "action": {"type": "string", "enum": ["format", "compress", "validate", "to_csv", "to_yaml"], "description": "操作类型"}, "indent": {"type": "integer", "description": "缩进空格数", "default": 2}}, "required": ["input", "action"]}},
        # NOTE(review): "to_yaml" is advertised in the schema but has no
        # dedicated branch; it falls through to the indented-JSON output.
        # "to_csv" joins values with commas without quoting.
        "implementation_config": {"source": """def run(args):
    import json
    inp = args.get("input", "")
    action = args.get("action", "format")
    indent = int(args.get("indent", 2))
    try:
        data = json.loads(inp) if isinstance(inp, str) else inp
    except json.JSONDecodeError as e:
        return {"error": f"JSON解析失败: {e}"}
    if action == "validate":
        return {"valid": True, "type": str(type(data).__name__), "keys": list(data.keys()) if isinstance(data, dict) else None}
    elif action == "compress":
        return {"result": json.dumps(data, ensure_ascii=False, separators=(",", ":"))}
    elif action == "to_csv":
        if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
            keys = list(data[0].keys())
            lines = [",".join(keys)]
            for row in data:
                lines.append(",".join(str(row.get(k, "")) for k in keys))
            return {"result": "\\n".join(lines)}
        return {"error": "CSV转换仅支持对象数组"}
    else:
        return {"result": json.dumps(data, ensure_ascii=False, indent=indent)}"""},
    },
    # ── Network / HTTP ──
    {
        "name": "weather_query",
        "description": "查询指定城市的实时天气信息,包含温度、湿度、风速和天气状况",
        "category": "网络请求",
        "implementation_type": "http",
        "is_public": True,
        "function_schema": {"name": "weather_query", "description": "查询城市天气", "parameters": {"type": "object", "properties": {"city": {"type": "string", "description": "城市名称,如北京、上海"}}, "required": ["city"]}},
        "implementation_config": {"url": "https://wttr.in/{city}?format=j1", "method": "GET", "timeout": 15, "headers": {}},
    },
    {
        "name": "shorten_url",
        "description": "生成短链接方便分享长URL",
        "category": "网络请求",
        "implementation_type": "http",
        "is_public": True,
        "function_schema": {"name": "shorten_url", "description": "生成短链接", "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "要缩短的长URL"}}, "required": ["url"]}},
        "implementation_config": {"url": "https://is.gd/create.php?format=simple&url={url}", "method": "GET", "timeout": 10, "headers": {}},
    },
    {
        "name": "check_website",
        "description": "检测网站是否可访问返回HTTP状态码和响应时间",
        "category": "网络请求",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "check_website", "description": "检测网站状态", "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "网站URL"}, "timeout": {"type": "integer", "description": "超时秒数", "default": 10}}, "required": ["url"]}},
        # The response is opened in a with-block so the connection is closed
        # after the body has been read.
        "implementation_config": {"source": """def run(args):
    import urllib.request, time
    url = args.get("url", "")
    timeout = int(args.get("timeout", 10))
    if not url.startswith("http"):
        url = "https://" + url
    start = time.time()
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            elapsed = round((time.time() - start) * 1000)
            content = resp.read()
            status_code = resp.status
        return {"status_code": status_code, "elapsed_ms": elapsed, "size_bytes": len(content), "reachable": True}
    except Exception as e:
        elapsed = round((time.time() - start) * 1000)
        return {"status_code": None, "elapsed_ms": elapsed, "error": str(e), "reachable": False}"""},
    },
    # ── Files / formats ──
    {
        "name": "base64_codec",
        "description": "Base64编解码支持文本和文件内容",
        "category": "文件操作",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "base64_codec", "description": "Base64编解码", "parameters": {"type": "object", "properties": {"input": {"type": "string", "description": "输入内容"}, "action": {"type": "string", "enum": ["encode", "decode"], "description": "操作类型", "default": "encode"}}, "required": ["input"]}},
        "implementation_config": {"source": """def run(args):
    import base64
    inp = args.get("input", "")
    action = args.get("action", "encode")
    try:
        if action == "encode":
            result = base64.b64encode(inp.encode()).decode()
        else:
            result = base64.b64decode(inp).decode("utf-8", errors="replace")
        return {"result": result, "length": len(result)}
    except Exception as e:
        return {"error": str(e)}"""},
    },
    {
        "name": "csv_processor",
        "description": "CSV文件处理解析、筛选、排序、统计聚合",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "csv_processor", "description": "CSV处理工具", "parameters": {"type": "object", "properties": {"csv_text": {"type": "string", "description": "CSV文本内容"}, "operation": {"type": "string", "enum": ["parse", "stats", "filter", "sort"], "description": "操作类型"}, "column": {"type": "string", "description": "列名(用于筛选/排序)"}, "keyword": {"type": "string", "description": "筛选关键词"}}, "required": ["csv_text", "operation"]}},
        # Short rows make DictReader yield None cells, and callers may pass
        # keyword=None; filter/sort coerce those to "" instead of crashing.
        "implementation_config": {"source": """def run(args):
    import csv, io, json
    text = args.get("csv_text", "")
    op = args.get("operation", "parse")
    col = args.get("column")
    keyword = (args.get("keyword") or "").lower()
    reader = csv.DictReader(io.StringIO(text))
    rows = list(reader)
    if not rows:
        return {"error": "空CSV或格式错误", "rows": 0}
    headers = list(rows[0].keys())
    if op == "parse":
        return {"headers": headers, "rows": len(rows), "data": rows[:50], "total_rows": len(rows)}
    elif op == "stats":
        stats = {}
        for h in headers:
            vals = [r[h] for r in rows if r[h]]
            nums = []
            for v in vals:
                try:
                    nums.append(float(v))
                except (TypeError, ValueError):
                    pass
            if nums:
                stats[h] = {"min": min(nums), "max": max(nums), "avg": round(sum(nums)/len(nums), 2), "count": len(nums)}
            else:
                stats[h] = {"unique_values": len(set(vals)), "count": len(vals)}
        return {"stats": stats, "total_rows": len(rows)}
    elif op == "filter" and col:
        filtered = [r for r in rows if keyword in (r.get(col) or "").lower()]
        return {"filtered_rows": len(filtered), "data": filtered[:50], "total_rows": len(rows)}
    elif op == "sort" and col:
        sorted_rows = sorted(rows, key=lambda r: (r.get(col) or ""))
        return {"sorted": True, "column": col, "data": sorted_rows[:50], "total_rows": len(rows)}
    return {"error": "未知操作"}"""},
    },
    # ── AI / smart ──
    {
        "name": "sentiment_analysis",
        "description": "简单的情感分析,判断文本的情感倾向(正面/负面/中性无需调用LLM",
        "category": "AI服务",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "sentiment_analysis", "description": "文本情感分析", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要分析的文本"}}, "required": ["text"]}},
        # The word lists must not contain "" (it is a substring of every text
        # and would be counted as a hit for any input) nor leading spaces.
        "implementation_config": {"source": """def run(args):
    text = args.get("text", "")
    positive_words = ["优秀", "喜欢", "满意", "推荐", "开心", "beautiful", "good", "great", "excellent", "love", "amazing", "wonderful", "fantastic", "positive"]
    negative_words = ["糟糕", "讨厌", "失望", "差劲", "恶心", "生气", "bad", "terrible", "awful", "hate", "horrible", "poor", "worst", "negative", "ugly"]
    text_lower = text.lower()
    pos_count = sum(1 for w in positive_words if w in text_lower)
    neg_count = sum(1 for w in negative_words if w in text_lower)
    total = pos_count + neg_count
    if total == 0:
        sentiment, score = "中性", 0.0
    else:
        score = (pos_count - neg_count) / total
        if score > 0.2:
            sentiment = "正面"
        elif score < -0.2:
            sentiment = "负面"
        else:
            sentiment = "中性"
    return {"sentiment": sentiment, "score": round(score, 3), "positive_hits": pos_count, "negative_hits": neg_count, "text_length": len(text)}"""},
    },
    {
        "name": "uuid_generator",
        "description": "生成UUID/GUID支持单次和批量生成",
        "category": "自定义",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "uuid_generator", "description": "UUID生成器", "parameters": {"type": "object", "properties": {"count": {"type": "integer", "description": "生成数量", "default": 1}}, "required": []}},
        # count is clamped to 0..100 so the reported count always matches the
        # number of UUIDs returned.
        "implementation_config": {"source": """def run(args):
    import uuid
    count = max(0, min(int(args.get("count", 1)), 100))
    uuids = [str(uuid.uuid4()) for _ in range(count)]
    return {"uuids": uuids, "count": count}"""},
    },
    {
        "name": "timestamp",
        "description": "时间戳转换工具:获取当前时间戳,或转换时间戳为可读日期",
        "category": "自定义",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "timestamp", "description": "时间戳工具", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["now", "to_datetime", "to_timestamp"], "description": "操作类型"}, "value": {"type": "string", "description": "输入值(转换时使用)"}, "format": {"type": "string", "description": "日期格式,如 %Y-%m-%d %H:%M:%S", "default": "%Y-%m-%d %H:%M:%S"}}, "required": ["action"]}},
        # Uses datetime.now(timezone.utc) instead of the deprecated utcnow()
        # and catches only conversion errors rather than a bare except.
        "implementation_config": {"source": """def run(args):
    from datetime import datetime, timezone
    action = args.get("action", "now")
    fmt = args.get("format", "%Y-%m-%d %H:%M:%S")
    if action == "now":
        now = datetime.now()
        return {"timestamp": int(now.timestamp()), "datetime": now.strftime(fmt), "utc": datetime.now(timezone.utc).strftime(fmt)}
    elif action == "to_datetime":
        val = args.get("value", "")
        try:
            ts = int(val)
            dt = datetime.fromtimestamp(ts)
            return {"timestamp": ts, "datetime": dt.strftime(fmt)}
        except (TypeError, ValueError, OverflowError, OSError):
            return {"error": f"无效时间戳: {val}"}
    elif action == "to_timestamp":
        val = args.get("value", "")
        try:
            dt = datetime.strptime(val, fmt)
            return {"datetime": val, "timestamp": int(dt.timestamp())}
        except (TypeError, ValueError, OverflowError, OSError):
            return {"error": f"无法解析日期: {val},格式: {fmt}"}
    return {"error": "未知操作"}"""},
    },
]
# 3. Bulk creation: POST every definition and tally the outcomes.
success = 0
failed = 0
for t in tools:
    status, data = req("POST", "/api/v1/tools", headers=auth, body=t)
    # Only HTTP 201 counts as a successful creation.
    if status != 201:
        # Show the API's "detail" field when present, else the whole payload.
        print(f"{t['name']}: {data.get('detail', data)}")
        failed += 1
        continue
    print(f"{t['name']}")
    success += 1
print(f"\n创建完成: {success} 成功, {failed} 失败")