Files
aiagent/backend/scripts/e2e_zhini7_two_rounds.py

172 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
重启 Celery WorkerWindows并对「知你客服7号」做两轮 API 测试:
1我的名字叫小七 2我叫什么名字
需:本机 API 已监听(默认 8037、Redis、LLM 配置可用。
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path
BACKEND_DIR = Path(__file__).resolve().parents[1]
VENV_PY = BACKEND_DIR / "venv" / "Scripts" / "python.exe"
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8037")
def _restart_celery() -> None:
ps = (
"Get-CimInstance Win32_Process | "
"Where-Object { $_.CommandLine -match 'celery_app' } | "
"ForEach-Object { Stop-Process -Id $_.ProcessId -Force -ErrorAction SilentlyContinue }"
)
subprocess.run(
["powershell", "-NoProfile", "-Command", ps],
cwd=str(BACKEND_DIR),
capture_output=True,
text=True,
)
time.sleep(2)
if not VENV_PY.is_file():
print("未找到 venv Python跳过启动 Celery", file=sys.stderr)
return
popen_kw: dict = {
"cwd": str(BACKEND_DIR),
"stdout": subprocess.DEVNULL,
"stderr": subprocess.STDOUT,
}
if sys.platform == "win32":
popen_kw["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]
subprocess.Popen(
[
str(VENV_PY),
"-m",
"celery",
"-A",
"app.core.celery_app",
"worker",
"--loglevel=info",
"--pool=threads",
"--concurrency=8",
],
**popen_kw,
)
print("已启动新 Celery Worker线程池等待就绪…")
time.sleep(4)
def _touch_api_reload() -> None:
"""若 uvicorn 带 --reload触发重载。"""
main_py = BACKEND_DIR / "app" / "main.py"
if main_py.is_file():
main_py.touch()
print("已 touch app/main.py 以触发 API 热重载(若启用 --reload")
def main() -> int:
os.chdir(BACKEND_DIR)
sys.path.insert(0, str(BACKEND_DIR))
_restart_celery()
_touch_api_reload()
import httpx
from app.core.database import SessionLocal
from app.core.security import create_access_token
from app.models.agent import Agent
from app.models.user import User
db = SessionLocal()
try:
agent = db.query(Agent).filter(Agent.name == "知你客服7号").first()
if not agent:
print("数据库中未找到名为「知你客服7号」的 Agent", file=sys.stderr)
return 1
owner = db.query(User).filter(User.id == agent.user_id).first()
user = owner or db.query(User).first()
if not user:
print("无可用用户,无法签发 JWT", file=sys.stderr)
return 1
token = create_access_token(data={"sub": user.id, "username": user.username})
headers = {"Authorization": f"Bearer {token}"}
uid = f"e2e_xiaoqi_{uuid.uuid4().hex[:10]}"
print(f"agent_id={agent.id} owner={user.username} user_id={uid}")
print(f"请确认工作流 Cache 键为 user_memory_{{{{user_id}}}},请求中已带 user_id={uid}\n")
def poll(client: httpx.Client, execution_id: str, timeout: float = 300.0) -> dict:
t0 = time.time()
while time.time() - t0 < timeout:
r = client.get(f"/api/v1/executions/{execution_id}", headers=headers)
r.raise_for_status()
data = r.json()
st = data.get("status")
if st == "completed":
return data
if st == "failed":
print("error:", data.get("error_message"), file=sys.stderr)
raise RuntimeError("执行失败")
time.sleep(1)
raise TimeoutError("等待执行完成超时")
with httpx.Client(base_url=API_BASE, timeout=300.0) as client:
r = client.post(
"/api/v1/executions",
json={
"agent_id": str(agent.id),
"input_data": {"query": "我的名字叫小七", "user_id": uid},
},
headers=headers,
)
if r.status_code >= 400:
print(r.text, file=sys.stderr)
r.raise_for_status()
eid1 = r.json()["id"]
print("第一轮 execution_id:", eid1)
out1 = poll(client, eid1)
print("第一轮 output_data:", json.dumps(out1.get("output_data"), ensure_ascii=False)[:1200])
r = client.post(
"/api/v1/executions",
json={
"agent_id": str(agent.id),
"input_data": {"query": "我叫什么名字?", "user_id": uid},
},
headers=headers,
)
r.raise_for_status()
eid2 = r.json()["id"]
print("\n第二轮 execution_id:", eid2)
out2 = poll(client, eid2)
print("第二轮 output_data:", json.dumps(out2.get("output_data"), ensure_ascii=False)[:1200])
# Redis 键检查
try:
from app.core.config import settings
import redis as redis_lib
url = getattr(settings, "REDIS_URL", None) or "redis://localhost:6379/0"
rc = redis_lib.from_url(url, decode_responses=True)
key = f"user_memory_{uid}"
raw = rc.get(key)
print(f"\nRedis 键 {key}:", "存在" if raw else "不存在")
if raw:
try:
mem = json.loads(raw)
print("memory.user_profile:", mem.get("user_profile"))
except Exception as ex:
print("解析 Redis 值失败:", ex)
except Exception as ex:
print("Redis 检查跳过:", ex)
finally:
db.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())