"""为工具市场批量创建示例工具""" import json import urllib.request import urllib.parse import ssl BASE = "http://localhost:8037" ssl_ctx = ssl.create_default_context() ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE def req(method, path, headers=None, body=None, raw_body=None): hdrs = {"Content-Type": "application/json"} if headers: hdrs.update(headers) data = raw_body if raw_body else (json.dumps(body).encode() if body else None) r = urllib.request.Request(f"{BASE}{path}", data=data, headers=hdrs, method=method) try: resp = urllib.request.urlopen(r, context=ssl_ctx, timeout=10) return resp.status, json.loads(resp.read()) except urllib.request.HTTPError as e: return e.code, json.loads(e.read()) # 1. 注册用户并登录 _, _ = req("POST", "/api/v1/auth/register", body={ "username": "tooladmin", "email": "tooladmin@test.com", "password": "test123456" }) status, login_data = req("POST", "/api/v1/auth/login", headers={"Content-Type": "application/x-www-form-urlencoded"}, raw_body=urllib.parse.urlencode({"username": "tooladmin", "password": "test123456"}).encode()) token = login_data["access_token"] auth = {"Authorization": f"Bearer {token}"} print(f"Logged in, token: {token[:20]}...") # 2. 创建工具定义 tools = [ # ── 文本处理 ── { "name": "html_to_markdown", "description": "将HTML转换为Markdown格式,支持表格、链接、图片转换", "category": "数据处理", "implementation_type": "code", "is_public": True, "function_schema": {"name": "html_to_markdown", "description": "将HTML转换为Markdown格式", "parameters": {"type": "object", "properties": {"html": {"type": "string", "description": "HTML内容"}}, "required": ["html"]}}, "implementation_config": {"source": """def run(args): import re html = args.get("html", "") html = re.sub(r"]*>.*?", "", html, flags=re.DOTALL) html = re.sub(r"]*>.*?", "", html, flags=re.DOTALL) for i in range(6, 0, -1): html = re.sub(rf"]*>(.*?)", "#" * i + r" \\1\\n", html, flags=re.DOTALL) html = re.sub(r"(.*?)", r"**\\1**", html) html = re.sub(r"(.*?)", r"*\\1*", html) html = re.sub(r']*href="(.+?)"[^>]*>(.*?)', r"[\\2](\\1)", html) html = re.sub(r']*src="(.+?)"[^>]*alt="(.+?)"[^>]*>', r"![\\2](\\1)", html) html = re.sub(r"]*>(.*?)

", r"\\1\\n\\n", html, flags=re.DOTALL) html = re.sub(r"", "\\n", html) html = re.sub(r"
  • (.*?)
  • ", r"- \\1\\n", html, flags=re.DOTALL) html = re.sub(r"<[^>]+>", "", html) html = re.sub(r"\\n{3,}", "\\n\\n", html) return html.strip()"""}, }, { "name": "extract_info", "description": "从文本中提取邮箱地址、电话号码、URL链接等信息", "category": "数据处理", "implementation_type": "code", "is_public": True, "function_schema": {"name": "extract_info", "description": "提取文本中的结构化信息", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要分析的文本"}}, "required": ["text"]}}, "implementation_config": {"source": """def run(args): import re text = args.get("text", "") emails = list(set(re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}", text))) phones = list(set(re.findall(r"1[3-9]\\d{9}", text))) urls = list(set(re.findall(r"https?://[^\\s,<>\"']+", text))) ip_addrs = list(set(re.findall(r"\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}", text))) return {"emails": emails, "phones": phones, "urls": urls, "ip_addresses": ip_addrs}"""}, }, { "name": "text_summarize", "description": "对长文本进行智能摘要,可指定摘要长度", "category": "数据处理", "implementation_type": "code", "is_public": True, "function_schema": {"name": "text_summarize", "description": "文本摘要", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要摘要的文本"}, "max_sentences": {"type": "integer", "description": "最大句子数", "default": 3}}, "required": ["text"]}}, "implementation_config": {"source": """def run(args): import re text = args.get("text", "") max_sent = int(args.get("max_sentences", 3)) sentences = re.split(r"[.!?。!?]+", text) sentences = [s.strip() for s in sentences if len(s.strip()) > 5] if not sentences: return {"summary": text[:200], "original_length": len(text)} # Score sentences by length (prefer medium-length sentences) scored = [] avg_len = sum(len(s) for s in sentences) / max(len(sentences), 1) for s in sentences: score = 1.0 - abs(len(s) - avg_len) / max(avg_len, 1) scored.append((score, s)) scored.sort(key=lambda x: -x[0]) summary = ".".join(s for _, s in scored[:max_sent]) + "." return {"summary": summary, "original_length": len(text), "summary_length": len(summary)}"""}, }, { "name": "json_tool", "description": "JSON格式化、压缩、验证和转换,支持JSON与YAML/CSV互转", "category": "数据处理", "implementation_type": "code", "is_public": True, "function_schema": {"name": "json_tool", "description": "JSON处理工具", "parameters": {"type": "object", "properties": {"input": {"type": "string", "description": "输入内容"}, "action": {"type": "string", "enum": ["format", "compress", "validate", "to_csv", "to_yaml"], "description": "操作类型"}, "indent": {"type": "integer", "description": "缩进空格数", "default": 2}}, "required": ["input", "action"]}}, "implementation_config": {"source": """def run(args): import json inp = args.get("input", "") action = args.get("action", "format") indent = int(args.get("indent", 2)) try: data = json.loads(inp) if isinstance(inp, str) else inp except json.JSONDecodeError as e: return {"error": f"JSON解析失败: {e}"} if action == "validate": return {"valid": True, "type": str(type(data).__name__), "keys": list(data.keys()) if isinstance(data, dict) else None} elif action == "compress": return {"result": json.dumps(data, ensure_ascii=False, separators=(",", ":"))} elif action == "to_csv": if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): keys = list(data[0].keys()) lines = [",".join(keys)] for row in data: lines.append(",".join(str(row.get(k, "")) for k in keys)) return {"result": "\\n".join(lines)} return {"error": "CSV转换仅支持对象数组"} else: return {"result": json.dumps(data, ensure_ascii=False, indent=indent)}"""}, }, # ── 网络/HTTP ── { "name": "weather_query", "description": "查询指定城市的实时天气信息,包含温度、湿度、风速和天气状况", "category": "网络请求", "implementation_type": "http", "is_public": True, "function_schema": {"name": "weather_query", "description": "查询城市天气", "parameters": {"type": "object", "properties": {"city": {"type": "string", "description": "城市名称,如北京、上海"}}, "required": ["city"]}}, "implementation_config": {"url": "https://wttr.in/{city}?format=j1", "method": "GET", "timeout": 15, "headers": {}}, }, { "name": "shorten_url", "description": "生成短链接,方便分享长URL", "category": "网络请求", "implementation_type": "http", "is_public": True, "function_schema": {"name": "shorten_url", "description": "生成短链接", "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "要缩短的长URL"}}, "required": ["url"]}}, "implementation_config": {"url": "https://is.gd/create.php?format=simple&url={url}", "method": "GET", "timeout": 10, "headers": {}}, }, { "name": "check_website", "description": "检测网站是否可访问,返回HTTP状态码和响应时间", "category": "网络请求", "implementation_type": "code", "is_public": True, "function_schema": {"name": "check_website", "description": "检测网站状态", "parameters": {"type": "object", "properties": {"url": {"type": "string", "description": "网站URL"}, "timeout": {"type": "integer", "description": "超时秒数", "default": 10}}, "required": ["url"]}}, "implementation_config": {"source": """def run(args): import urllib.request, time url = args.get("url", "") timeout = int(args.get("timeout", 10)) if not url.startswith("http"): url = "https://" + url start = time.time() try: resp = urllib.request.urlopen(url, timeout=timeout) elapsed = round((time.time() - start) * 1000) content = resp.read() return {"status_code": resp.status, "elapsed_ms": elapsed, "size_bytes": len(content), "reachable": True} except Exception as e: elapsed = round((time.time() - start) * 1000) return {"status_code": None, "elapsed_ms": elapsed, "error": str(e), "reachable": False}"""}, }, # ── 文件/格式 ── { "name": "base64_codec", "description": "Base64编解码,支持文本和文件内容", "category": "文件操作", "implementation_type": "code", "is_public": True, "function_schema": {"name": "base64_codec", "description": "Base64编解码", "parameters": {"type": "object", "properties": {"input": {"type": "string", "description": "输入内容"}, "action": {"type": "string", "enum": ["encode", "decode"], "description": "操作类型", "default": "encode"}}, "required": ["input"]}}, "implementation_config": {"source": """def run(args): import base64 inp = args.get("input", "") action = args.get("action", "encode") try: if action == "encode": result = base64.b64encode(inp.encode()).decode() else: result = base64.b64decode(inp).decode("utf-8", errors="replace") return {"result": result, "length": len(result)} except Exception as e: return {"error": str(e)}"""}, }, { "name": "csv_processor", "description": "CSV文件处理:解析、筛选、排序、统计聚合", "category": "数据处理", "implementation_type": "code", "is_public": True, "function_schema": {"name": "csv_processor", "description": "CSV处理工具", "parameters": {"type": "object", "properties": {"csv_text": {"type": "string", "description": "CSV文本内容"}, "operation": {"type": "string", "enum": ["parse", "stats", "filter", "sort"], "description": "操作类型"}, "column": {"type": "string", "description": "列名(用于筛选/排序)"}, "keyword": {"type": "string", "description": "筛选关键词"}}, "required": ["csv_text", "operation"]}}, "implementation_config": {"source": """def run(args): import csv, io, json text = args.get("csv_text", "") op = args.get("operation", "parse") col = args.get("column") keyword = args.get("keyword", "").lower() reader = csv.DictReader(io.StringIO(text)) rows = list(reader) if not rows: return {"error": "空CSV或格式错误", "rows": 0} headers = list(rows[0].keys()) if op == "parse": return {"headers": headers, "rows": len(rows), "data": rows[:50], "total_rows": len(rows)} elif op == "stats": stats = {} for h in headers: vals = [r[h] for r in rows if r[h]] nums = [] for v in vals: try: nums.append(float(v)) except: pass if nums: stats[h] = {"min": min(nums), "max": max(nums), "avg": round(sum(nums)/len(nums), 2), "count": len(nums)} else: stats[h] = {"unique_values": len(set(vals)), "count": len(vals)} return {"stats": stats, "total_rows": len(rows)} elif op == "filter" and col: filtered = [r for r in rows if keyword in r.get(col, "").lower()] return {"filtered_rows": len(filtered), "data": filtered[:50], "total_rows": len(rows)} elif op == "sort" and col: sorted_rows = sorted(rows, key=lambda r: (r.get(col, "") or "")) return {"sorted": True, "column": col, "data": sorted_rows[:50], "total_rows": len(rows)} return {"error": "未知操作"}"""}, }, # ── AI/智能 ── { "name": "sentiment_analysis", "description": "简单的情感分析,判断文本的情感倾向(正面/负面/中性),无需调用LLM", "category": "AI服务", "implementation_type": "code", "is_public": True, "function_schema": {"name": "sentiment_analysis", "description": "文本情感分析", "parameters": {"type": "object", "properties": {"text": {"type": "string", "description": "要分析的文本"}}, "required": ["text"]}}, "implementation_config": {"source": """def run(args): text = args.get("text", "") positive_words = ["好", "棒", "优秀", "喜欢", "满意", "推荐", "赞", "开心", " beautiful", "good", "great", "excellent", "love", "amazing", "wonderful", "fantastic", "positive"] negative_words = ["差", "烂", "糟糕", "讨厌", "失望", "差劲", "恶心", "生气", "bad", "terrible", "awful", "hate", "horrible", "poor", "worst", "negative", "ugly"] text_lower = text.lower() pos_count = sum(1 for w in positive_words if w in text_lower) neg_count = sum(1 for w in negative_words if w in text_lower) total = pos_count + neg_count if total == 0: sentiment, score = "中性", 0.0 else: score = (pos_count - neg_count) / total if score > 0.2: sentiment = "正面" elif score < -0.2: sentiment = "负面" else: sentiment = "中性" return {"sentiment": sentiment, "score": round(score, 3), "positive_hits": pos_count, "negative_hits": neg_count, "text_length": len(text)}"""}, }, { "name": "uuid_generator", "description": "生成UUID/GUID,支持单次和批量生成", "category": "自定义", "implementation_type": "code", "is_public": True, "function_schema": {"name": "uuid_generator", "description": "UUID生成器", "parameters": {"type": "object", "properties": {"count": {"type": "integer", "description": "生成数量", "default": 1}}, "required": []}}, "implementation_config": {"source": """def run(args): import uuid count = min(int(args.get("count", 1)), 100) uuids = [str(uuid.uuid4()) for _ in range(count)] return {"uuids": uuids, "count": count}"""}, }, { "name": "timestamp", "description": "时间戳转换工具:获取当前时间戳,或转换时间戳为可读日期", "category": "自定义", "implementation_type": "code", "is_public": True, "function_schema": {"name": "timestamp", "description": "时间戳工具", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["now", "to_datetime", "to_timestamp"], "description": "操作类型"}, "value": {"type": "string", "description": "输入值(转换时使用)"}, "format": {"type": "string", "description": "日期格式,如 %Y-%m-%d %H:%M:%S", "default": "%Y-%m-%d %H:%M:%S"}}, "required": ["action"]}}, "implementation_config": {"source": """def run(args): from datetime import datetime action = args.get("action", "now") fmt = args.get("format", "%Y-%m-%d %H:%M:%S") if action == "now": now = datetime.now() return {"timestamp": int(now.timestamp()), "datetime": now.strftime(fmt), "utc": datetime.utcnow().strftime(fmt)} elif action == "to_datetime": val = args.get("value", "") try: ts = int(val) dt = datetime.fromtimestamp(ts) return {"timestamp": ts, "datetime": dt.strftime(fmt)} except: return {"error": f"无效时间戳: {val}"} elif action == "to_timestamp": val = args.get("value", "") try: dt = datetime.strptime(val, fmt) return {"datetime": val, "timestamp": int(dt.timestamp())} except: return {"error": f"无法解析日期: {val},格式: {fmt}"} return {"error": "未知操作"}"""}, }, ] # 3. 批量创建 success = 0 failed = 0 for t in tools: status, data = req("POST", "/api/v1/tools", headers=auth, body=t) if status == 201: print(f" ✓ {t['name']}") success += 1 else: print(f" ✗ {t['name']}: {data.get('detail', data)}") failed += 1 print(f"\n创建完成: {success} 成功, {failed} 失败")