"""Smoke-test the minimal chat flow of the "Student Homework Management Assistant" agent.

Steps:
    1) Log in.
    2) Look up the agent by name.
    3) Create an execution and send a greeting ("你好" by default).
    4) Poll until the execution finishes, then print the assistant's reply.

Example:
    python scripts/test_homework_agent_hello.py --password 123456
"""

from __future__ import annotations

import argparse
import sys
import time
from typing import Any, Dict, List, Optional

import requests

# Execution statuses at which this script stops polling. Only "completed"
# counts as success; every other status here ends the run with exit code 2.
_STOP_POLLING_STATUSES = ("completed", "failed", "cancelled", "awaiting_approval")


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the smoke test.

    Returns:
        The parsed namespace with ``base_url``, ``username``, ``password``,
        ``agent_name``, ``message``, ``timeout_seconds``, ``poll_interval``
        and ``request_timeout``.
    """
    parser = argparse.ArgumentParser(description="测试学生作业管理助手发送“你好”")
    parser.add_argument("--base-url", default="http://127.0.0.1:8037", help="后端地址")
    parser.add_argument("--username", default="admin", help="登录用户名")
    parser.add_argument("--password", default="123456", help="登录密码")
    parser.add_argument("--agent-name", default="学生作业管理助手", help="目标 Agent 名称")
    parser.add_argument("--message", default="你好", help="发送内容")
    parser.add_argument("--timeout-seconds", type=int, default=90, help="轮询超时秒数")
    parser.add_argument("--poll-interval", type=float, default=1.5, help="轮询间隔秒")
    parser.add_argument("--request-timeout", type=int, default=30, help="单次HTTP请求超时秒数")
    return parser.parse_args()


def login(base_url: str, username: str, password: str, timeout: int = 20) -> str:
    """Log in with form-encoded credentials and return the access token.

    Args:
        base_url: Backend base URL without a trailing slash.
        username: Login user name.
        password: Login password.
        timeout: Per-request timeout in seconds.

    Returns:
        The ``access_token`` value from the login response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the response carries no ``access_token``.
    """
    url = f"{base_url}/api/v1/auth/login"
    resp = requests.post(
        url,
        data={"username": username, "password": password},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    token = data.get("access_token")
    if not token:
        raise RuntimeError(f"登录成功但未返回 access_token: {data}")
    return token


def pick_agent_id(base_url: str, headers: Dict[str, str], agent_name: str, timeout: int = 20) -> str:
    """Search agents by name and return the id of the best match.

    An agent whose stripped name equals ``agent_name`` is preferred. If there
    is none, the first search result is used and a warning is printed so the
    caller can see that a different agent was selected.

    Args:
        base_url: Backend base URL without a trailing slash.
        headers: Request headers (carrying the authorization token).
        agent_name: Name of the agent to look for.
        timeout: Per-request timeout in seconds.

    Returns:
        The selected agent's id, converted to ``str``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the response is not a list, no agent is found, or
            the selected agent has no ``id``.
    """
    url = f"{base_url}/api/v1/agents"
    resp = requests.get(
        url,
        headers=headers,
        params={"search": agent_name, "limit": 100},
        timeout=timeout,
    )
    resp.raise_for_status()

    payload = resp.json() or []
    if not isinstance(payload, list):
        # Iterating a dict/str here would fail later with an opaque
        # AttributeError; report the unexpected shape directly instead.
        raise RuntimeError(f"Agent 列表响应格式异常: {payload}")
    agents: List[Dict[str, Any]] = [a for a in payload if isinstance(a, dict)]

    target: Optional[Dict[str, Any]] = next(
        (a for a in agents if (a.get("name") or "").strip() == agent_name),
        None,
    )
    if target is None and agents:
        target = agents[0]
        if target:
            print(f"[WARN] 未找到名称完全匹配的 Agent,改用搜索结果第一项: {target.get('name')}")
    if not target:
        raise RuntimeError(f"未找到 Agent: {agent_name}")

    agent_id = target.get("id")
    if agent_id is None:
        raise RuntimeError(f"Agent 数据缺少 id: {target}")
    return str(agent_id)


def create_execution(
    base_url: str, headers: Dict[str, str], agent_id: str, message: str, timeout: int = 30
) -> Dict[str, Any]:
    """Create an execution for ``agent_id`` with ``message`` as the user input.

    Args:
        base_url: Backend base URL without a trailing slash.
        headers: Request headers (carrying the authorization token).
        agent_id: Id of the agent to run.
        message: Text sent as both ``USER_INPUT`` and ``query``.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded JSON body of the create-execution response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"{base_url}/api/v1/executions"
    payload = {
        "agent_id": agent_id,
        "input_data": {
            "USER_INPUT": message,
            "query": message,
            "user_id": f"preview_{agent_id}",
            "attachments": [],
        },
    }
    resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
    resp.raise_for_status()
    return resp.json()


def extract_reply(exec_data: Dict[str, Any]) -> str:
    """Extract the assistant's reply text from an execution record.

    Args:
        exec_data: Execution detail as returned by the backend.

    Returns:
        ``""`` when ``output_data`` is missing or empty; the string itself
        when it is a string; for a dict, the first non-``None`` value among
        the keys ``result``, ``output``, ``response``, ``text`` (stringified
        if needed); otherwise ``str(output_data)``.
    """
    output_data = exec_data.get("output_data")
    if not output_data:
        return ""
    if isinstance(output_data, str):
        return output_data
    if isinstance(output_data, dict):
        for key in ("result", "output", "response", "text"):
            value = output_data.get(key)
            if value is not None:
                return value if isinstance(value, str) else str(value)
    return str(output_data)


def get_execution(base_url: str, headers: Dict[str, str], execution_id: str, timeout: int = 20) -> Dict[str, Any]:
    """Fetch the current detail of an execution.

    Args:
        base_url: Backend base URL without a trailing slash.
        headers: Request headers (carrying the authorization token).
        execution_id: Id of the execution to fetch.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded JSON body of the execution detail response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    url = f"{base_url}/api/v1/executions/{execution_id}"
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    return resp.json()


def main() -> int:
    """Run the login -> find agent -> send message -> poll flow.

    Returns:
        Process exit code: ``0`` when the execution completed, ``2`` when it
        stopped in any other stop status, ``3`` on polling timeout, ``4`` on
        an HTTP error status, ``5`` on any other error.
    """
    args = parse_args()
    base_url = args.base_url.rstrip("/")
    poll_interval = max(0.2, args.poll_interval)

    try:
        token = login(base_url, args.username, args.password, timeout=args.request_timeout)
        headers = {"Authorization": f"Bearer {token}"}
        print(f"[OK] 登录成功: {args.username}")

        agent_id = pick_agent_id(base_url, headers, args.agent_name, timeout=args.request_timeout)
        print(f"[OK] 找到 Agent: {args.agent_name} ({agent_id})")

        created = create_execution(
            base_url, headers, agent_id, args.message, timeout=args.request_timeout
        )
        raw_execution_id = created.get("id") if isinstance(created, dict) else None
        if raw_execution_id is None:
            # A bare KeyError would only print "'id'"; say what is missing.
            raise RuntimeError(f"创建执行成功但未返回 id: {created}")
        execution_id = str(raw_execution_id)
        print(f"[OK] 已发送消息: {args.message}")
        print(f"[INFO] execution_id: {execution_id}")

        # Use the monotonic clock so system clock adjustments cannot shorten
        # or extend the polling window.
        deadline = time.monotonic() + max(1, args.timeout_seconds)
        while time.monotonic() < deadline:
            detail = get_execution(base_url, headers, execution_id, timeout=args.request_timeout)
            status = detail.get("status")
            if status in _STOP_POLLING_STATUSES:
                print(f"[INFO] 最终状态: {status}")
                if status == "completed":
                    print("[AGENT_REPLY]")
                    print(extract_reply(detail) or "(空回复)")
                    return 0
                print(detail.get("error_message") or "(无错误信息)")
                return 2
            print(f"[POLL] status={status}")
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(poll_interval, remaining)))

        print("[TIMEOUT] 轮询超时,执行尚未完成")
        return 3
    except requests.HTTPError as e:
        body = ""
        if e.response is not None:
            try:
                body = e.response.text[:500]
            except Exception:
                body = "<无法读取响应内容>"
        print(f"[HTTP_ERROR] {e} {body}")
        return 4
    except Exception as e:  # noqa: BLE001
        print(f"[ERROR] {e}")
        return 5


if __name__ == "__main__":
    sys.exit(main())