Files
aiagent/backend/scripts/e2e_zhini12_bbbb_md.py

240 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
E2E: agent "知你客服12号" — user utterance "create bbbb.md under D:\\aaa\\aiagent\\user_data".

Also verifies: the built-in tools reported by /health, that the file lands inside the
workspace (under the user_data sub-path), and, optionally, file_write traces in node_results.

Usage (from the backend directory):
    .\\venv\\Scripts\\python.exe scripts\\e2e_zhini12_bbbb_md.py

Environment variables: API_BASE, E2E_AGENT_NAME, E2E_RESTART_CELERY, E2E_LLM_TOOL_CHOICE.

Note: user_data/bbbb.md and the body/marker constants are fixed; E2E_REL_FILE,
E2E_FILE_CONTENT and E2E_MARKER are not read (to avoid false results caused by stale
shell values).
E2E_LLM_TOOL_CHOICE defaults to auto.
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path
# Filesystem layout, derived from this script's own location:
# BACKEND_DIR is the parent of the directory containing this file.
BACKEND_DIR = Path(__file__).resolve().parent.parent
REPO_ROOT = BACKEND_DIR.parent
USER_DATA_ROOT = (REPO_ROOT / "user_data").resolve()
VENV_PY = BACKEND_DIR.joinpath("venv", "Scripts", "python.exe")

# Settings that may be overridden through the environment.
API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8037")
AGENT_NAME = os.getenv("E2E_AGENT_NAME", "知你客服12号")

# The target is pinned to bbbb.md. E2E_REL_FILE is deliberately not consulted:
# stale values (e.g. ccc/bbb) tend to linger in PowerShell sessions.
REL_PATH = "user_data/bbbb.md"

# Fixed body and marker. E2E_FILE_CONTENT / E2E_MARKER environment variables are
# not used, so leftover PowerShell values cannot leak into the run.
E2E_MARKER = "e2e bbbb permission marker"
FILE_CONTENT = "# bbbb\n\ne2e bbbb permission marker\n"
def _restart_celery() -> None:
    """Stop any running Celery worker and start a fresh one in the background.

    Steps, in order:

    1. Run a PowerShell pipeline that force-stops every process whose command
       line matches ``celery_app`` (its output is captured and discarded).
    2. If the venv interpreter ``VENV_PY`` does not exist, print a notice to
       stderr and return without starting a worker.
    3. Otherwise spawn ``python -m celery -A app.core.celery_app worker`` with
       a thread pool, its output sent to the null device, and with
       ``LLM_TOOL_CHOICE=required`` in its environment unless
       ``E2E_LLM_TOOL_CHOICE`` is one of ``0``/``false``/``no``/``auto``.

    The fixed sleeps (2 s after the kill, 4 s after the spawn) are simple
    waits; the worker's readiness is not actually probed here.
    """
    kill_script = (
        "Get-CimInstance Win32_Process | "
        "Where-Object { $_.CommandLine -match 'celery_app' } | "
        "ForEach-Object { Stop-Process -Id $_.ProcessId -Force -ErrorAction SilentlyContinue }"
    )
    backend_cwd = str(BACKEND_DIR)
    subprocess.run(
        ["powershell", "-NoProfile", "-Command", kill_script],
        cwd=backend_cwd,
        capture_output=True,
        text=True,
    )
    time.sleep(2)

    if not VENV_PY.is_file():
        print("未找到 venv Python跳过启动 Celery", file=sys.stderr)
        return

    # The worker inherits this process's environment, optionally forcing tool use.
    worker_env = os.environ.copy()
    tool_choice = os.environ.get("E2E_LLM_TOOL_CHOICE", "auto").strip().lower()
    if tool_choice not in ("0", "false", "no", "auto"):
        worker_env["LLM_TOOL_CHOICE"] = "required"

    popen_kwargs: dict = {
        "cwd": backend_cwd,
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.STDOUT,
        "env": worker_env,
    }
    if sys.platform == "win32":
        # Put the worker in its own process group on Windows.
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore[attr-defined]

    worker_cmd = [
        str(VENV_PY),
        "-m",
        "celery",
        "-A",
        "app.core.celery_app",
        "worker",
        "--loglevel=info",
        "--pool=threads",
        "--concurrency=8",
    ]
    subprocess.Popen(worker_cmd, **popen_kwargs)
    print("已启动 Celery等待就绪…")
    time.sleep(4)
def _health_check(client) -> bool:
    """Fetch ``/health`` from the API and report built-in tool readiness.

    Args:
        client: HTTP client exposing ``get(url, timeout=...)``; ``main`` passes
            an ``httpx.Client``.

    Returns:
        ``True`` when the response has no (or an empty) ``checks`` field, in
        which case only an informational note is printed. Otherwise ``True``
        only if both ``builtin_tools_ready`` and ``file_agent_core_ready`` are
        truthy; if not, a warning is printed to stderr and ``False`` is returned.

    Raises:
        Whatever ``raise_for_status()`` raises for an HTTP error response.
    """
    health_url = f"{API_BASE.rstrip('/')}/health"
    response = client.get(health_url, timeout=15.0)
    response.raise_for_status()
    payload = response.json()

    print("\n--- GET /health ---")
    pretty = json.dumps(payload, ensure_ascii=False, indent=2)
    # Cap the dump so a verbose health payload does not flood the console.
    print(pretty[:2500])

    checks = payload.get("checks")
    if not checks:
        print(
            "\n[INFO] /health 无 checks 字段(可能 API 未更新);请以 Worker 日志「内置工具就绪」为准。",
            file=sys.stderr,
        )
        return True

    ready = bool(checks.get("builtin_tools_ready") and checks.get("file_agent_core_ready"))
    if ready:
        return True

    print(
        "\n[WARN] API 进程 builtin_tools 未完全就绪。请同步重启 API 与 Celery。",
        file=sys.stderr,
    )
    return False
def _path_allowed(abs_file: Path) -> tuple[bool, str]:
    """Check that ``abs_file`` is a legal ``file_write`` target for this test.

    The resolved path must be inside the repository root and, for this
    scenario, inside ``USER_DATA_ROOT``.

    Args:
        abs_file: Path of the file the agent is expected to have written.

    Returns:
        ``(True, "")`` when both containment checks pass, otherwise
        ``(False, reason)`` where ``reason`` describes the first failed check
        (or the ``OSError`` raised while resolving the paths).
    """
    try:
        target = abs_file.resolve()
        repo_root = REPO_ROOT.resolve()
    except OSError as exc:
        return False, str(exc)

    def _inside(base: Path) -> bool:
        # relative_to() raises ValueError when target is not located under base.
        try:
            target.relative_to(base)
        except ValueError:
            return False
        return True

    if not _inside(repo_root):
        return False, f"文件不在仓库根内: {target} vs root {repo_root}"
    if not _inside(USER_DATA_ROOT):
        return False, f"文件不在 user_data 下: {target} vs {USER_DATA_ROOT}"
    return True, ""
def _scan_file_write_in_node_results(od: dict) -> None:
    """Print the nodes in ``od["node_results"]`` whose output hints at a file_write call.

    A node counts as a hit when its ``output`` (used as-is if it is a string,
    JSON-encoded otherwise) contains ``file_write`` together with either
    ``success`` (case-insensitive) or ``file_path``. Up to 400 characters of
    each hit are printed.

    This is a diagnostic aid only and never influences the test verdict, so it
    is tolerant of unexpected shapes: a missing or non-dict ``node_results`` is
    treated as empty, non-dict node payloads are skipped, and outputs that are
    not natively JSON-serialisable are stringified instead of raising.

    Args:
        od: The execution's ``output_data`` mapping.
    """
    node_results = od.get("node_results") if isinstance(od, dict) else None
    if not isinstance(node_results, dict):
        # Covers None, lists and any other unexpected shape.
        node_results = {}

    hits = []
    for node_id, payload in node_results.items():
        if not isinstance(payload, dict):
            continue
        output = payload.get("output")
        if isinstance(output, str):
            text = output
        else:
            # default=str keeps the scan alive on values json cannot encode natively.
            text = json.dumps(output, ensure_ascii=False, default=str)
        if "file_write" in text and ("success" in text.lower() or "file_path" in text):
            hits.append((node_id, text[:400]))

    print("\n--- node_results 中含 file_write 线索的节点 ---")
    if not hits:
        print("(未扫到明显 JSON可能结果只在 LLM 正文或日志中)")
    for node_id, fragment in hits:
        print(f" {node_id}: {fragment}...")
def main() -> int:
    """Run the bbbb.md end-to-end scenario and return a process exit code.

    Flow:

    1. ``chdir`` into the backend directory, put it on ``sys.path`` and
       optionally force ``LLM_TOOL_CHOICE=required`` / restart Celery according
       to ``E2E_LLM_TOOL_CHOICE`` and ``E2E_RESTART_CELERY``.
    2. Print the ``/health`` report (its result is informational only).
    3. Look up the agent named ``AGENT_NAME`` and a user (the agent's owner, or
       else the first user), and mint an access token for that user.
    4. Record the target file's modification time, submit the execution asking
       the agent to create the file through ``file_write``, and poll until the
       execution completes.
    5. Verify the target path is inside ``user_data``, that the file exists and
       that it was actually (re)written by this run.

    Because the target path is fixed, a file left over from an earlier run would
    otherwise satisfy the existence check even if the agent wrote nothing; the
    modification-time comparison in step 5 guards against that false positive.

    Returns:
        ``0`` on success; ``1`` if the agent or a user cannot be found; ``2`` if
        the file is missing or was not modified by this run; ``3`` if the target
        path fails the permission check. A missing marker only yields a warning.

    Raises:
        RuntimeError: The execution reported status ``failed``.
        TimeoutError: The execution did not finish within the polling timeout.
        Exceptions from ``raise_for_status()`` on HTTP error responses.
    """
    os.chdir(BACKEND_DIR)
    # Makes the backend's ``app`` package importable for the deferred imports below.
    sys.path.insert(0, str(BACKEND_DIR))
    if os.environ.get("E2E_LLM_TOOL_CHOICE", "auto").strip().lower() not in ("0", "false", "no", "auto"):
        os.environ["LLM_TOOL_CHOICE"] = "required"
    if os.environ.get("E2E_RESTART_CELERY", "1").strip().lower() not in ("0", "false", "no"):
        _restart_celery()

    # Deferred on purpose: the app.* imports rely on the sys.path entry added above.
    import httpx
    from app.core.database import SessionLocal
    from app.core.security import create_access_token
    from app.models.agent import Agent
    from app.models.user import User

    db = SessionLocal()
    try:
        with httpx.Client(timeout=30.0) as hclient:
            _health_check(hclient)

        agent = db.query(Agent).filter(Agent.name == AGENT_NAME).first()
        if not agent:
            print(f"未找到「{AGENT_NAME}", file=sys.stderr)
            return 1
        owner = db.query(User).filter(User.id == agent.user_id).first()
        user = owner or db.query(User).first()
        if not user:
            print("无用户", file=sys.stderr)
            return 1

        token = create_access_token(data={"sub": user.id, "username": user.username})
        headers = {"Authorization": f"Bearer {token}"}
        # Fresh user_id per run.
        uid = f"bbbb12_{uuid.uuid4().hex[:10]}"
        abs_win = str((REPO_ROOT / REL_PATH.replace("/", os.sep)).resolve())
        abs_file = Path(abs_win).resolve()
        _basename = os.path.basename(REL_PATH.replace("\\", "/"))
        q = (
            f"请在 D:\\\\aaa\\\\aiagent\\\\user_data 目录下创建 {_basename}"
            f"必须通过工具 file_write 写入:优先使用 相对路径 {REL_PATH}(相对工作区根);"
            f"content 为 {json.dumps(FILE_CONTENT, ensure_ascii=False)}mode 为 w。"
            f"file_write 返回的 JSON 必须原样体现在你最终 reply 的可读说明里(含 success 与 file_path"
            f"最后一行仍输出单行 JSONintent、reply、user_profile。"
        )
        print(f"\nagent={agent.id} ({AGENT_NAME}) user_id={uid}")
        print(f"目标文件(解析后): {abs_win}")
        print(f"权限校验: 须位于 {USER_DATA_ROOT}")

        # Snapshot the target's mtime so a stale file from a previous run cannot
        # be mistaken for one written by this execution. None means "absent".
        try:
            mtime_before = abs_file.stat().st_mtime_ns
        except OSError:
            mtime_before = None

        def poll(client: httpx.Client, eid: str, timeout: float = 300.0) -> dict:
            """Poll the execution until it completes, fails or ``timeout`` seconds elapse."""
            t0 = time.time()
            while time.time() - t0 < timeout:
                r = client.get(f"/api/v1/executions/{eid}", headers=headers)
                r.raise_for_status()
                d = r.json()
                st = d.get("status")
                if st == "completed":
                    return d
                if st == "failed":
                    print("failed:", d.get("error_message"), file=sys.stderr)
                    raise RuntimeError("执行失败")
                time.sleep(1.5)
            raise TimeoutError("超时")

        with httpx.Client(base_url=API_BASE, timeout=300.0) as client:
            r = client.post(
                "/api/v1/executions",
                json={"agent_id": str(agent.id), "input_data": {"query": q, "user_id": uid}},
                headers=headers,
            )
            if r.status_code >= 400:
                # Show the error body before raise_for_status() turns it into an exception.
                print(r.text, file=sys.stderr)
            r.raise_for_status()
            eid = r.json()["id"]
            print(f"\nexecution={eid}")
            out = poll(client, eid)

        od = out.get("output_data") or {}
        result = od.get("result", od)
        print("\n--- API output_data.result (截断 1500 字符) ---\n")
        print(str(result)[:1500])
        _scan_file_write_in_node_results(od)

        allowed, reason = _path_allowed(abs_file)
        if not allowed:
            print(f"\n[FAIL] 路径权限校验: {reason}", file=sys.stderr)
            return 3
        if not abs_file.is_file():
            print(f"\n[FAIL] 磁盘未找到: {abs_file}", file=sys.stderr)
            print("若 completed可能未触发 file_write查 Celery「执行工具 file_write」。", file=sys.stderr)
            return 2
        if mtime_before is not None and abs_file.stat().st_mtime_ns == mtime_before:
            # The file predates this run and was not touched: the agent did not write it.
            print(f"\n[FAIL] 文件在本次执行前已存在且本次未被改写: {abs_file}", file=sys.stderr)
            return 2

        body = abs_file.read_text(encoding="utf-8", errors="replace")
        print(f"\n[OK] 文件存在且路径合法: {abs_file}\n--- 内容 ---\n{body}\n---")
        # Soft check: a missing marker is reported but does not fail the run.
        if E2E_MARKER and E2E_MARKER not in body and FILE_CONTENT.strip() and E2E_MARKER in FILE_CONTENT:
            print(f"[WARN] 未在文件中发现标记「{E2E_MARKER}", file=sys.stderr)
    finally:
        db.close()

    print("\n[OK] E2E bbbb.md + 路径权限 通过\n完成")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())