Files
aiagent/scripts/seed_tools.py
renjianbo 342f3fcb16 chore: 添加工具市场种子脚本
批量创建 12 个实用工具:HTML2Markdown、信息提取、文本摘要、JSON处理、
天气查询、短链接、网站检测、Base64编解码、CSV处理、情感分析、UUID生成、时间戳

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 23:06:08 +08:00

327 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""为工具市场批量创建示例工具"""
import json
import urllib.request
import urllib.parse
import ssl
# Base URL that every request path in this script is appended to.
BASE = "http://localhost:8037"
# TLS context passed to every urlopen() call made by req().
# NOTE(review): certificate and hostname verification are both switched off
# below. BASE is plain http here, so this looks like a convenience for
# pointing the script at a self-signed https endpoint - confirm before reuse.
ssl_ctx = ssl.create_default_context()
# Order matters: check_hostname has to be cleared before verify_mode may be
# set to CERT_NONE (the ssl module rejects the reverse order with ValueError).
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
def _decode_payload(raw):
    """Decode a response body as JSON, tolerating empty and non-JSON bodies.

    Returns ``{}`` for an empty body and ``{"detail": <text>}`` when the body
    is not valid JSON (for example an HTML error page from a proxy).
    """
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return {"detail": raw.decode("utf-8", errors="replace")}


def req(method, path, headers=None, body=None, raw_body=None):
    """Send one HTTP request to ``BASE + path`` and return the decoded reply.

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        path: Path appended verbatim to ``BASE``.
        headers: Optional extra headers; they override the default
            ``Content-Type: application/json``.
        body: Optional JSON-serialisable payload, used when ``raw_body`` is
            not given.
        raw_body: Optional pre-encoded bytes sent as-is; takes precedence
            over ``body``.

    Returns:
        A ``(status_code, payload)`` tuple. HTTP error statuses are returned
        the same way instead of being raised.

    Raises:
        urllib.error.URLError: when no HTTP response was received at all
            (connection refused, DNS failure, timeout).
    """
    hdrs = {"Content-Type": "application/json"}
    if headers:
        hdrs.update(headers)
    # Compare against None so that an empty payload such as {} is still sent.
    if raw_body is not None:
        data = raw_body
    elif body is not None:
        data = json.dumps(body).encode()
    else:
        data = None
    r = urllib.request.Request(f"{BASE}{path}", data=data, headers=hdrs, method=method)
    try:
        with urllib.request.urlopen(r, context=ssl_ctx, timeout=10) as resp:
            return resp.status, _decode_payload(resp.read())
    except urllib.request.HTTPError as e:
        # HTTPError doubles as the response object; read it, then release it.
        try:
            return e.code, _decode_payload(e.read())
        finally:
            e.close()
# 1. Register the seed user, then log in.
# The registration result is deliberately ignored: presumably the account
# already exists on a re-run and the call is rejected - the login below is
# what actually has to succeed.
req("POST", "/api/v1/auth/register", body={
    "username": "tooladmin", "email": "tooladmin@test.com", "password": "test123456"
})
status, login_data = req(
    "POST", "/api/v1/auth/login",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    raw_body=urllib.parse.urlencode({"username": "tooladmin", "password": "test123456"}).encode(),
)
# Fail with a readable message instead of a bare KeyError when login is rejected.
token = login_data.get("access_token") if isinstance(login_data, dict) else None
if status >= 400 or not token:
    raise SystemExit(f"Login failed (HTTP {status}): {login_data}")
# Authorization header reused by every tool-creation request.
auth = {"Authorization": f"Bearer {token}"}
print(f"Logged in, token: {token[:20]}...")
# 2. Tool definitions.
# Each entry is one JSON payload for POST /api/v1/tools. Entries with
# implementation_type "code" carry Python source defining run(args) under
# implementation_config["source"]; entries with "http" carry a URL template.
# The sources live in ordinary (non-raw) triple-quoted strings, so every
# backslash meant for the tool code is written doubled here.
tools = [
    # -- Text processing --
    {
        "name": "html_to_markdown",
        "description": "将HTML转换为Markdown格式支持表格、链接、图片转换",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "html_to_markdown", "description": "将HTML转换为Markdown格式", "parameters": {"type": "object", "properties": {"html": {"type": "string", "description": "HTML内容"}}, "required": ["html"]}},
        "implementation_config": {"source": """def run(args):
    import re
    html = args.get("html", "")
    html = re.sub(r"<script[^>]*>.*?</script>", "", html, flags=re.DOTALL)
    html = re.sub(r"<style[^>]*>.*?</style>", "", html, flags=re.DOTALL)
    for i in range(6, 0, -1):
        html = re.sub(rf"<h{i}[^>]*>(.*?)</h{i}>", "#" * i + r" \\1\\n", html, flags=re.DOTALL)
    html = re.sub(r"<strong>(.*?)</strong>", r"**\\1**", html)
    html = re.sub(r"<em>(.*?)</em>", r"*\\1*", html)
    html = re.sub(r'<a[^>]*href="(.+?)"[^>]*>(.*?)</a>', r"[\\2](\\1)", html)
    html = re.sub(r'<img[^>]*src="(.+?)"[^>]*alt="(.+?)"[^>]*>', r"![\\2](\\1)", html)
    html = re.sub(r"<p[^>]*>(.*?)</p>", r"\\1\\n\\n", html, flags=re.DOTALL)
    html = re.sub(r"<br\\s*/?>", "\\n", html)
    html = re.sub(r"<li>(.*?)</li>", r"- \\1\\n", html, flags=re.DOTALL)
    html = re.sub(r"<[^>]+>", "", html)
    html = re.sub(r"\\n{3,}", "\\n\\n", html)
    return html.strip()"""},
    },
    # The URL pattern spells the double quote as \x22: a literal quote would
    # end the raw string inside the tool source and make it a SyntaxError.
    # Results are sorted so the output order is deterministic.
    {
        "name": "extract_info",
        "description": "从文本中提取邮箱地址、电话号码、URL链接等信息",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "extract_info", "description": "提取文本中的结构化信息", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要分析的文本"}}, "required": ["text"]}},
        "implementation_config": {"source": """def run(args):
    import re
    text = args.get("text", "")
    emails = sorted(set(re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}", text)))
    phones = sorted(set(re.findall(r"1[3-9]\\d{9}", text)))
    urls = sorted(set(re.findall(r"https?://[^\\s,<>\\x22']+", text)))
    ip_addrs = sorted(set(re.findall(r"\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}", text)))
    return {"emails": emails, "phones": phones, "urls": urls, "ip_addresses": ip_addrs}"""},
    },
    {
        "name": "text_summarize",
        "description": "对长文本进行智能摘要,可指定摘要长度",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "text_summarize", "description": "文本摘要", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要摘要的文本"}, "max_sentences": {"type": "integer", "description": "最大句子数", "default": 3}}, "required": ["text"]}},
        "implementation_config": {"source": """def run(args):
    import re
    text = args.get("text", "")
    max_sent = int(args.get("max_sentences", 3))
    sentences = re.split(r"[.!?。!?]+", text)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
    if not sentences:
        return {"summary": text[:200], "original_length": len(text)}
    # Score sentences by length (prefer medium-length sentences)
    scored = []
    avg_len = sum(len(s) for s in sentences) / max(len(sentences), 1)
    for s in sentences:
        score = 1.0 - abs(len(s) - avg_len) / max(avg_len, 1)
        scored.append((score, s))
    scored.sort(key=lambda x: -x[0])
    summary = ".".join(s for _, s in scored[:max_sent]) + "."
    return {"summary": summary, "original_length": len(text), "summary_length": len(summary)}"""},
    },
    # to_csv goes through csv.writer so cells containing commas, quotes or
    # newlines are quoted instead of corrupting the row.
    {
        "name": "json_tool",
        "description": "JSON格式化、压缩、验证和转换支持JSON与YAML/CSV互转",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "json_tool", "description": "JSON处理工具", "parameters": {"type": "object", "properties": {"input": {"type": "string", "description": "输入内容"}, "action": {"type": "string", "enum": ["format", "compress", "validate", "to_csv", "to_yaml"], "description": "操作类型"}, "indent": {"type": "integer", "description": "缩进空格数", "default": 2}}, "required": ["input", "action"]}},
        "implementation_config": {"source": """def run(args):
    import csv
    import io
    import json
    inp = args.get("input", "")
    action = args.get("action", "format")
    indent = int(args.get("indent", 2))
    try:
        data = json.loads(inp) if isinstance(inp, str) else inp
    except json.JSONDecodeError as e:
        return {"error": f"JSON解析失败: {e}"}
    if action == "validate":
        return {"valid": True, "type": str(type(data).__name__), "keys": list(data.keys()) if isinstance(data, dict) else None}
    elif action == "compress":
        return {"result": json.dumps(data, ensure_ascii=False, separators=(",", ":"))}
    elif action == "to_csv":
        if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
            keys = list(data[0].keys())
            buf = io.StringIO()
            writer = csv.writer(buf, lineterminator="\\n")
            writer.writerow(keys)
            for row in data:
                writer.writerow([str(row.get(k, "")) for k in keys])
            # Drop only the final line terminator added by the writer.
            return {"result": buf.getvalue()[:-1]}
        return {"error": "CSV转换仅支持对象数组"}
    else:
        return {"result": json.dumps(data, ensure_ascii=False, indent=indent)}"""},
    },
    # -- Network / HTTP --
    {
        "name": "weather_query",
        "description": "查询指定城市的实时天气信息,包含温度、湿度、风速和天气状况",
        "category": "网络请求",
        "implementation_type": "http",
        "is_public": True,
        "function_schema": {"name": "weather_query", "description": "查询城市天气", "parameters": {"type": "object", "properties": {"city": {"type": "string", "description": "城市名称,如北京、上海"}}, "required": ["city"]}},
        "implementation_config": {"url": "https://wttr.in/{city}?format=j1", "method": "GET", "timeout": 15, "headers": {}},
    },
    {
        "name": "shorten_url",
        "description": "生成短链接方便分享长URL",
        "category": "网络请求",
        "implementation_type": "http",
        "is_public": True,
        "function_schema": {"name": "shorten_url", "description": "生成短链接", "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "要缩短的长URL"}}, "required": ["url"]}},
        "implementation_config": {"url": "https://is.gd/create.php?format=simple&url={url}", "method": "GET", "timeout": 10, "headers": {}},
    },
    # An HTTP error status (4xx/5xx) still means the site answered, so its
    # status code is reported instead of being collapsed into "unreachable".
    {
        "name": "check_website",
        "description": "检测网站是否可访问返回HTTP状态码和响应时间",
        "category": "网络请求",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "check_website", "description": "检测网站状态", "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "网站URL"}, "timeout": {"type": "integer", "description": "超时秒数", "default": 10}}, "required": ["url"]}},
        "implementation_config": {"source": """def run(args):
    import time
    import urllib.request
    url = args.get("url", "")
    timeout = int(args.get("timeout", 10))
    if not url.startswith("http"):
        url = "https://" + url
    start = time.time()
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            elapsed = round((time.time() - start) * 1000)
            content = resp.read()
            return {"status_code": resp.status, "elapsed_ms": elapsed, "size_bytes": len(content), "reachable": True}
    except Exception as e:
        elapsed = round((time.time() - start) * 1000)
        # HTTPError carries the server's status in .code; other failures have none.
        code = getattr(e, "code", None)
        return {"status_code": code, "elapsed_ms": elapsed, "error": str(e), "reachable": code is not None}"""},
    },
    # -- Files / formats --
    {
        "name": "base64_codec",
        "description": "Base64编解码支持文本和文件内容",
        "category": "文件操作",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "base64_codec", "description": "Base64编解码", "parameters": {"type": "object", "properties": {"input": {"type": "string", "description": "输入内容"}, "action": {"type": "string", "enum": ["encode", "decode"], "description": "操作类型", "default": "encode"}}, "required": ["input"]}},
        "implementation_config": {"source": """def run(args):
    import base64
    inp = args.get("input", "")
    action = args.get("action", "encode")
    try:
        if action == "encode":
            result = base64.b64encode(inp.encode()).decode()
        else:
            result = base64.b64decode(inp).decode("utf-8", errors="replace")
        return {"result": result, "length": len(result)}
    except Exception as e:
        return {"error": str(e)}"""},
    },
    # DictReader yields None for cells missing from a short row, and callers
    # may pass keyword=None; both are normalised to "" before .lower().
    {
        "name": "csv_processor",
        "description": "CSV文件处理解析、筛选、排序、统计聚合",
        "category": "数据处理",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "csv_processor", "description": "CSV处理工具", "parameters": {"type": "object", "properties": {"csv_text": {"type": "string", "description": "CSV文本内容"}, "operation": {"type": "string", "enum": ["parse", "stats", "filter", "sort"], "description": "操作类型"}, "column": {"type": "string", "description": "列名(用于筛选/排序)"}, "keyword": {"type": "string", "description": "筛选关键词"}}, "required": ["csv_text", "operation"]}},
        "implementation_config": {"source": """def run(args):
    import csv
    import io
    text = args.get("csv_text", "")
    op = args.get("operation", "parse")
    col = args.get("column")
    keyword = (args.get("keyword") or "").lower()
    reader = csv.DictReader(io.StringIO(text))
    rows = list(reader)
    if not rows:
        return {"error": "空CSV或格式错误", "rows": 0}
    headers = list(rows[0].keys())
    if op == "parse":
        return {"headers": headers, "rows": len(rows), "data": rows[:50], "total_rows": len(rows)}
    elif op == "stats":
        stats = {}
        for h in headers:
            vals = [r[h] for r in rows if r[h]]
            nums = []
            for v in vals:
                try:
                    nums.append(float(v))
                except (TypeError, ValueError):
                    pass
            if nums:
                stats[h] = {"min": min(nums), "max": max(nums), "avg": round(sum(nums)/len(nums), 2), "count": len(nums)}
            else:
                stats[h] = {"unique_values": len(set(vals)), "count": len(vals)}
        return {"stats": stats, "total_rows": len(rows)}
    elif op == "filter" and col:
        filtered = [r for r in rows if keyword in (r.get(col) or "").lower()]
        return {"filtered_rows": len(filtered), "data": filtered[:50], "total_rows": len(rows)}
    elif op == "sort" and col:
        sorted_rows = sorted(rows, key=lambda r: (r.get(col, "") or ""))
        return {"sorted": True, "column": col, "data": sorted_rows[:50], "total_rows": len(rows)}
    return {"error": "未知操作"}"""},
    },
    # -- AI / smart tools --
    # The word lists must not contain "": the empty string is a substring of
    # every text and would count as a hit on every call.
    {
        "name": "sentiment_analysis",
        "description": "简单的情感分析,判断文本的情感倾向(正面/负面/中性无需调用LLM",
        "category": "AI服务",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "sentiment_analysis", "description": "文本情感分析", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要分析的文本"}}, "required": ["text"]}},
        "implementation_config": {"source": """def run(args):
    text = args.get("text", "")
    positive_words = ["优秀", "喜欢", "满意", "推荐", "开心", "beautiful", "good", "great", "excellent", "love", "amazing", "wonderful", "fantastic", "positive"]
    negative_words = ["糟糕", "讨厌", "失望", "差劲", "恶心", "生气", "bad", "terrible", "awful", "hate", "horrible", "poor", "worst", "negative", "ugly"]
    text_lower = text.lower()
    pos_count = sum(1 for w in positive_words if w in text_lower)
    neg_count = sum(1 for w in negative_words if w in text_lower)
    total = pos_count + neg_count
    if total == 0:
        sentiment, score = "中性", 0.0
    else:
        score = (pos_count - neg_count) / total
        if score > 0.2:
            sentiment = "正面"
        elif score < -0.2:
            sentiment = "负面"
        else:
            sentiment = "中性"
    return {"sentiment": sentiment, "score": round(score, 3), "positive_hits": pos_count, "negative_hits": neg_count, "text_length": len(text)}"""},
    },
    # count is clamped to 0..100 so the reported count always matches the
    # number of UUIDs returned.
    {
        "name": "uuid_generator",
        "description": "生成UUID/GUID支持单次和批量生成",
        "category": "自定义",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "uuid_generator", "description": "UUID生成器", "parameters": {"type": "object", "properties": {"count": {"type": "integer", "description": "生成数量", "default": 1}}, "required": []}},
        "implementation_config": {"source": """def run(args):
    import uuid
    count = max(0, min(int(args.get("count", 1)), 100))
    uuids = [str(uuid.uuid4()) for _ in range(count)]
    return {"uuids": uuids, "count": count}"""},
    },
    # Conversion errors are caught by type rather than with a bare except, and
    # the UTC time comes from an aware datetime instead of utcnow().
    {
        "name": "timestamp",
        "description": "时间戳转换工具:获取当前时间戳,或转换时间戳为可读日期",
        "category": "自定义",
        "implementation_type": "code",
        "is_public": True,
        "function_schema": {"name": "timestamp", "description": "时间戳工具", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["now", "to_datetime", "to_timestamp"], "description": "操作类型"}, "value": {"type": "string", "description": "输入值(转换时使用)"}, "format": {"type": "string", "description": "日期格式,如 %Y-%m-%d %H:%M:%S", "default": "%Y-%m-%d %H:%M:%S"}}, "required": ["action"]}},
        "implementation_config": {"source": """def run(args):
    from datetime import datetime, timezone
    action = args.get("action", "now")
    fmt = args.get("format", "%Y-%m-%d %H:%M:%S")
    if action == "now":
        now = datetime.now()
        return {"timestamp": int(now.timestamp()), "datetime": now.strftime(fmt), "utc": datetime.now(timezone.utc).strftime(fmt)}
    elif action == "to_datetime":
        val = args.get("value", "")
        try:
            ts = int(val)
            dt = datetime.fromtimestamp(ts)
            return {"timestamp": ts, "datetime": dt.strftime(fmt)}
        except (TypeError, ValueError, OverflowError, OSError):
            return {"error": f"无效时间戳: {val}"}
    elif action == "to_timestamp":
        val = args.get("value", "")
        try:
            dt = datetime.strptime(val, fmt)
            return {"datetime": val, "timestamp": int(dt.timestamp())}
        except (TypeError, ValueError, OverflowError, OSError):
            return {"error": f"无法解析日期: {val},格式: {fmt}"}
    return {"error": "未知操作"}"""},
    },
]
# 3. Create the tools one by one and tally the outcome.
success = failed = 0
for tool_def in tools:
    code, payload = req("POST", "/api/v1/tools", headers=auth, body=tool_def)
    if code != 201:
        # Anything other than 201 counts as a failure; show the API's detail when present.
        print(f"{tool_def['name']}: {payload.get('detail', payload)}")
        failed += 1
        continue
    print(f"{tool_def['name']}")
    success += 1
print(f"\n创建完成: {success} 成功, {failed} 失败")