Files
aiagent/backend/scripts/e2e_zhini12_bbb_md.py

185 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
通过「知你客服12号」Agent 实测:在 user_data 下创建 bbb.md对应 D:\\aaa\\aiagent\\user_data\\bbb.md
依赖API(默认 8037)、Redis、Celery、LLM引擎需已修复 End 节点优先于 vector-upsert否则界面 result 可能仍不对,但 file_write 可能已执行)。
用法:
cd backend
.\\venv\\Scripts\\python.exe scripts\\e2e_zhini12_bbb_md.py
环境变量:
API_BASE 默认 http://127.0.0.1:8037
E2E_AGENT_NAME 默认 知你客服12号
E2E_REL_FILE 默认 user_data/bbb.md
E2E_FILE_CONTENT 写入内容,默认带标记行便于校验
E2E_RESTART_CELERY 默认 1设为 0 跳过重启 Worker
E2E_USE_ABSPATH_MSG 设为 1 时在用户话术中额外要求使用绝对路径 D:\\aaa\\aiagent\\user_data\\bbb.md
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path
BACKEND_DIR = Path(__file__).resolve().parents[1]
VENV_PY = BACKEND_DIR / "venv" / "Scripts" / "python.exe"
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8037")
AGENT_NAME = os.environ.get("E2E_AGENT_NAME", "知你客服12号")
REL_PATH = os.environ.get("E2E_REL_FILE", "user_data/bbb.md")
FILE_CONTENT = os.environ.get(
"E2E_FILE_CONTENT",
"# bbb\n\nzhini12 e2e bbb marker\n",
)
def _restart_celery() -> None:
ps = (
"Get-CimInstance Win32_Process | "
"Where-Object { $_.CommandLine -match 'celery_app' } | "
"ForEach-Object { Stop-Process -Id $_.ProcessId -Force -ErrorAction SilentlyContinue }"
)
subprocess.run(
["powershell", "-NoProfile", "-Command", ps],
cwd=str(BACKEND_DIR),
capture_output=True,
text=True,
)
time.sleep(2)
if not VENV_PY.is_file():
print("未找到 venv Python跳过启动 Celery", file=sys.stderr)
return
kw: dict = {"cwd": str(BACKEND_DIR), "stdout": subprocess.DEVNULL, "stderr": subprocess.STDOUT}
if sys.platform == "win32":
kw["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
popen_env = os.environ.copy()
if os.environ.get("E2E_LLM_TOOL_CHOICE", "required").strip().lower() not in (
"0",
"false",
"no",
"auto",
):
popen_env["LLM_TOOL_CHOICE"] = "required"
subprocess.Popen(
[
str(VENV_PY),
"-m",
"celery",
"-A",
"app.core.celery_app",
"worker",
"--loglevel=info",
"--pool=threads",
"--concurrency=8",
],
env=popen_env,
**kw,
)
print("已启动 Celery等待就绪…")
time.sleep(4)
def main() -> int:
os.chdir(BACKEND_DIR)
sys.path.insert(0, str(BACKEND_DIR))
# 实测 file_write部分模型在 tool_choice=auto 下不发起 tool_calls只输出伪造的 JSON 文本E2E 默认要求至少一次函数调用
if os.environ.get("E2E_LLM_TOOL_CHOICE", "required").strip().lower() not in ("0", "false", "no", "auto"):
os.environ["LLM_TOOL_CHOICE"] = "required"
if os.environ.get("E2E_RESTART_CELERY", "1").strip().lower() not in ("0", "false", "no"):
_restart_celery()
import httpx
from app.core.database import SessionLocal
from app.core.security import create_access_token
from app.models.agent import Agent
from app.models.user import User
db = SessionLocal()
try:
agent = db.query(Agent).filter(Agent.name == AGENT_NAME).first()
if not agent:
print(f"未找到「{AGENT_NAME}", file=sys.stderr)
return 1
owner = db.query(User).filter(User.id == agent.user_id).first()
user = owner or db.query(User).first()
if not user:
print("无用户", file=sys.stderr)
return 1
token = create_access_token(data={"sub": user.id, "username": user.username})
headers = {"Authorization": f"Bearer {token}"}
uid = f"bbb12_{uuid.uuid4().hex[:10]}"
abs_win = str((BACKEND_DIR.parent / REL_PATH.replace("/", os.sep)).resolve())
use_abs = os.environ.get("E2E_USE_ABSPATH_MSG", "").strip().lower() in ("1", "true", "yes")
path_hint = (
f"绝对路径 {json.dumps(abs_win, ensure_ascii=False)}"
if use_abs
else f"相对路径 {REL_PATH}(相对工作区根)"
)
_basename = os.path.basename(REL_PATH.replace("\\", "/"))
q = (
f"请在 D:\\\\aaa\\\\aiagent\\\\user_data 目录下创建 {_basename}"
f"必须通过工具 file_write 写入:优先使用 {path_hint}"
f"content 为 {json.dumps(FILE_CONTENT, ensure_ascii=False)}mode 为 w。"
f"file_write 返回的 JSON 必须原样体现在你最终 reply 的可读说明里(含 success 与 file_path"
f"最后一行仍输出单行 JSONintent、reply、user_profile。"
)
print(f"agent={agent.id} ({AGENT_NAME}) user_id={uid}\n目标文件(解析后): {abs_win}\n")
def poll(client: httpx.Client, eid: str, timeout: float = 300.0) -> dict:
t0 = time.time()
while time.time() - t0 < timeout:
r = client.get(f"/api/v1/executions/{eid}", headers=headers)
r.raise_for_status()
d = r.json()
st = d.get("status")
if st == "completed":
return d
if st == "failed":
print("failed:", d.get("error_message"), file=sys.stderr)
raise RuntimeError("执行失败")
time.sleep(1.5)
raise TimeoutError("超时")
with httpx.Client(base_url=API_BASE, timeout=300.0) as client:
r = client.post(
"/api/v1/executions",
json={"agent_id": str(agent.id), "input_data": {"query": q, "user_id": uid}},
headers=headers,
)
if r.status_code >= 400:
print(r.text, file=sys.stderr)
r.raise_for_status()
eid = r.json()["id"]
print(f"execution={eid}")
out = poll(client, eid)
od = out.get("output_data") or {}
result = od.get("result", od)
print("\n--- API output_data.result (截断 1500 字符) ---\n")
print(str(result)[:1500])
abs_file = Path(abs_win).resolve()
if not abs_file.is_file():
print(f"\n[FAIL] 磁盘未找到: {abs_file}", file=sys.stderr)
print("若 API 已 completed可能是模型未触发 file_write或路径/权限问题;可看 Celery 日志「执行工具 file_write」。", file=sys.stderr)
return 2
body = abs_file.read_text(encoding="utf-8", errors="replace")
print(f"\n[OK] 文件存在: {abs_file}\n--- 内容 ---\n{body}\n---")
marker = os.environ.get("E2E_MARKER", "zhini12 e2e bbb marker")
if marker and marker not in body and FILE_CONTENT.strip() and marker in FILE_CONTENT:
print(f"[WARN] 未在文件中发现预期标记「{marker}」,可能内容与 E2E_FILE_CONTENT 不一致", file=sys.stderr)
finally:
db.close()
print("\n完成")
return 0
if __name__ == "__main__":
raise SystemExit(main())