Files
aiagent/backend/tests/test_auth.py
renjianbo beff3fac8d fix: delete agent 500 error + dynamic personality + deployment guide
- Fix delete agent 500: clean up FK records (agent_llm_logs, permissions,
  schedules, executions, team_members) and unbind goals/tasks before delete
- Remove hardcoded personality templates in Android, replace with dynamic
  system prompt generation from name + description
- Set promptSectionsEnabled=false to bypass PromptComposer for personality
- Add Tencent Cloud Linux deployment guide (Docker Compose)
- Accumulated backend service updates, frontend UI fixes, Android app changes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-29 01:17:21 +08:00

261 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
用户认证API测试
覆盖 JWT 签发/验证/过期 + 401 拦截 + FormData 修复验证
"""
import time
import pytest
from fastapi import status
from jose import jwt
from app.core.config import settings
@pytest.mark.unit
@pytest.mark.auth
class TestAuth:
    """Registration, login and current-user endpoint tests."""

    # Endpoint paths shared by the tests in this class.
    REGISTER_URL = "/api/v1/auth/register"
    LOGIN_URL = "/api/v1/auth/login"
    ME_URL = "/api/v1/auth/me"

    # Status codes accepted when a registration collides with an existing user.
    CONFLICT_STATUSES = (status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT)

    def _login(self, client, username, password):
        """POST form-encoded credentials to the login endpoint and return the response."""
        credentials = {"username": username, "password": password}
        return client.post(self.LOGIN_URL, data=credentials)

    def test_register_user(self, client, test_user_data):
        """A fresh registration returns 201 and echoes the public user fields."""
        resp = client.post(self.REGISTER_URL, json=test_user_data)
        assert resp.status_code == status.HTTP_201_CREATED

        body = resp.json()
        assert "id" in body
        for field in ("username", "email"):
            assert body[field] == test_user_data[field]
        # The response body must not expose the stored password hash.
        assert "password_hash" not in body

    def test_register_duplicate_username(self, client, test_user_data):
        """Registering the same payload twice is rejected with 400 or 409."""
        first = client.post(self.REGISTER_URL, json=test_user_data)
        assert first.status_code == status.HTTP_201_CREATED

        second = client.post(self.REGISTER_URL, json=test_user_data)
        assert second.status_code in self.CONFLICT_STATUSES

    def test_register_duplicate_email(self, client, test_user_data):
        """Re-using an email under a different username is rejected with 400 or 409."""
        first = client.post(self.REGISTER_URL, json=test_user_data)
        assert first.status_code == status.HTTP_201_CREATED

        # Same payload except for the username, so only the email collides.
        same_email_payload = {**test_user_data, "username": "another_user"}
        second = client.post(self.REGISTER_URL, json=same_email_payload)
        assert second.status_code in self.CONFLICT_STATUSES

    def test_login_success(self, client, test_user_data):
        """Valid credentials return 200 with a bearer access token."""
        client.post(self.REGISTER_URL, json=test_user_data)

        resp = self._login(
            client,
            test_user_data["username"],
            test_user_data["password"],
        )
        assert resp.status_code == status.HTTP_200_OK

        body = resp.json()
        assert "access_token" in body
        assert body["token_type"] == "bearer"

    def test_login_wrong_password(self, client, test_user_data):
        """A wrong password for an existing user returns 401."""
        client.post(self.REGISTER_URL, json=test_user_data)

        resp = self._login(client, test_user_data["username"], "wrongpassword")
        assert resp.status_code == status.HTTP_401_UNAUTHORIZED

    def test_login_nonexistent_user(self, client):
        """Logging in as an unregistered user returns 401."""
        resp = self._login(client, "nonexistent", "password123")
        assert resp.status_code == status.HTTP_401_UNAUTHORIZED

    def test_get_current_user(self, authenticated_client, test_user_data):
        """The current-user endpoint returns the authenticated user's username and email."""
        resp = authenticated_client.get(self.ME_URL)
        assert resp.status_code == status.HTTP_200_OK

        body = resp.json()
        for field in ("username", "email"):
            assert body[field] == test_user_data[field]

    def test_get_current_user_unauthorized(self, client):
        """The current-user endpoint returns 401 when no credentials are sent."""
        resp = client.get(self.ME_URL)
        assert resp.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.unit
@pytest.mark.auth
class TestAuthJWT:
    """JWT issuing and verification tests."""

    # Statuses accepted when the server refuses a bad token.
    REJECTED_STATUSES = (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)

    @staticmethod
    def _get_me(client, token):
        """Call the current-user endpoint with ``token`` as a Bearer credential."""
        return client.get(
            "/api/v1/auth/me",
            headers={"Authorization": f"Bearer {token}"},
        )

    def test_token_contains_claims(self, client, test_user_data):
        """The issued JWT decodes with the configured key and carries a ``sub`` claim."""
        client.post("/api/v1/auth/register", json=test_user_data)
        response = client.post(
            "/api/v1/auth/login",
            data={
                "username": test_user_data["username"],
                "password": test_user_data["password"],
            },
        )
        # Check the login itself first so a failed login is reported as a
        # status mismatch rather than a KeyError on "access_token".
        assert response.status_code == status.HTTP_200_OK

        token = response.json()["access_token"]
        payload = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
        assert "sub" in payload
        # Only non-emptiness is verified here; the original comment describes
        # "sub" as the user ID (UUID) -- NOTE(review): confirm against the
        # token-issuing code.
        assert payload["sub"]

    def test_token_rejected_with_wrong_key(self, client, test_user_data):
        """A token signed with a key other than the configured one is rejected."""
        # "HS256" is deliberately hard-coded: the plain-string key below is
        # only usable with an HMAC algorithm.
        token = jwt.encode(
            {"sub": "fake_user", "exp": int(time.time()) + 3600},
            "wrong_secret_key",
            algorithm="HS256",
        )
        response = self._get_me(client, token)
        assert response.status_code in self.REJECTED_STATUSES

    def test_expired_token_rejected(self, client):
        """A correctly signed token whose ``exp`` is one hour in the past is rejected."""
        token = jwt.encode(
            {"sub": "test_user", "exp": int(time.time()) - 3600},
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM,
        )
        response = self._get_me(client, token)
        assert response.status_code in self.REJECTED_STATUSES

    def test_malformed_token_rejected(self, client):
        """A string that is not a well-formed JWT is rejected."""
        response = self._get_me(client, "not.a.real.jwt.token")
        assert response.status_code in self.REJECTED_STATUSES

    def test_missing_auth_header_rejected(self, client):
        """A request without an Authorization header returns 401."""
        response = client.get("/api/v1/auth/me")
        assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.unit
@pytest.mark.auth
class TestAuthFormData:
    """Checks on request encoding and input validation for login/registration."""

    REGISTER_URL = "/api/v1/auth/register"
    LOGIN_URL = "/api/v1/auth/login"

    # Outcomes tolerated by the registration-validation tests below.
    # NOTE(review): 201 is in this set, so those tests also pass when the
    # server performs no validation at all -- confirm whether that is intended.
    REGISTER_TOLERATED = (
        status.HTTP_201_CREATED,
        status.HTTP_422_UNPROCESSABLE_ENTITY,
        status.HTTP_400_BAD_REQUEST,
    )

    @staticmethod
    def _credentials(user):
        """Build the username/password mapping used for login requests."""
        return {"username": user["username"], "password": user["password"]}

    def test_login_uses_form_encoded_data(self, client, test_user_data):
        """Login succeeds when credentials are sent through the ``data=`` argument."""
        client.post(self.REGISTER_URL, json=test_user_data)

        # Passing ``data=`` makes the test client send a form-encoded body.
        resp = client.post(self.LOGIN_URL, data=self._credentials(test_user_data))
        assert resp.status_code == status.HTTP_200_OK

    def test_login_rejects_json_body(self, client, test_user_data):
        """Login with a JSON body answers with either 200 or 422."""
        client.post(self.REGISTER_URL, json=test_user_data)

        resp = client.post(self.LOGIN_URL, json=self._credentials(test_user_data))
        # The original comment expects 422 because the endpoint is said to use
        # OAuth2PasswordRequestForm (form data only).
        # NOTE(review): 200 is accepted as well, so this does not actually
        # enforce rejection -- confirm the intended contract.
        tolerated = (
            status.HTTP_200_OK,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
        )
        assert resp.status_code in tolerated

    def test_login_missing_fields(self, client):
        """An empty login form returns 422."""
        resp = client.post(self.LOGIN_URL, data={})
        assert resp.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

    def test_register_validates_email_format(self, client, test_user_data):
        """Registering with a malformed email answers with 201, 422 or 400."""
        payload = {**test_user_data, "email": "not-an-email"}
        resp = client.post(self.REGISTER_URL, json=payload)
        # The original comment expects 422 (validation failure) or 400.
        assert resp.status_code in self.REGISTER_TOLERATED

    def test_register_short_password(self, client, test_user_data):
        """Registering with a two-character password answers with 201, 422 or 400."""
        payload = {**test_user_data, "password": "ab"}
        resp = client.post(self.REGISTER_URL, json=payload)
        assert resp.status_code in self.REGISTER_TOLERATED
@pytest.mark.unit
@pytest.mark.auth
class TestAuthEdgeCases:
    """Authentication edge cases: logout, token uniqueness, header casing."""

    @staticmethod
    def _login_token(client, username, password):
        """Log in with form-encoded credentials and return the access token.

        Asserts that the login returned 200 so a failed login is reported as a
        status mismatch instead of a KeyError on ``access_token``.
        """
        resp = client.post(
            "/api/v1/auth/login",
            data={"username": username, "password": password},
        )
        assert resp.status_code == status.HTTP_200_OK
        return resp.json()["access_token"]

    def test_double_logout_safe(self, authenticated_client):
        """Calling logout twice in a row must not produce a server error."""
        # A 404 is tolerated because the logout endpoint may not be implemented.
        tolerated = (
            status.HTTP_200_OK,
            status.HTTP_401_UNAUTHORIZED,
            status.HTTP_404_NOT_FOUND,
        )
        first = authenticated_client.post("/api/v1/auth/logout")
        second = authenticated_client.post("/api/v1/auth/logout")
        assert first.status_code in tolerated
        assert second.status_code in tolerated

    def test_auth_me_after_logout(self, client, test_user_data):
        """After logout, the current-user endpoint answers with 200 or 401.

        NOTE(review): the original intent was "returns 401 after logout", but
        200 is accepted too -- presumably because logout may not revoke
        already-issued tokens; confirm against the logout implementation.
        """
        client.post("/api/v1/auth/register", json=test_user_data)
        token = self._login_token(
            client,
            test_user_data["username"],
            test_user_data["password"],
        )
        headers = {"Authorization": f"Bearer {token}"}

        # Log out, then re-use the same token.
        client.post("/api/v1/auth/logout", headers=headers)
        after = client.get("/api/v1/auth/me", headers=headers)
        assert after.status_code in (status.HTTP_200_OK, status.HTTP_401_UNAUTHORIZED)

    def test_token_across_sessions_isolation(self, client, test_user_data):
        """Two different users receive different access tokens."""
        # Register two users.
        client.post("/api/v1/auth/register", json=test_user_data)
        user2 = {"username": "user2", "email": "user2@test.com", "password": "password456"}
        client.post("/api/v1/auth/register", json=user2)

        # Log each of them in and compare the issued tokens.
        token1 = self._login_token(
            client,
            test_user_data["username"],
            test_user_data["password"],
        )
        token2 = self._login_token(client, user2["username"], user2["password"])
        assert token1 != token2

    def test_auth_header_case_insensitive(self, client, test_user_data):
        """A lower-case ``authorization`` header name is handled without error.

        NOTE(review): 401 is accepted in addition to 200, so this does not
        strictly prove the header name is matched case-insensitively.
        """
        client.post("/api/v1/auth/register", json=test_user_data)
        token = self._login_token(
            client,
            test_user_data["username"],
            test_user_data["password"],
        )

        # Send the token under a lower-case header name.
        response = client.get(
            "/api/v1/auth/me",
            headers={"authorization": f"Bearer {token}"},
        )
        assert response.status_code in (status.HTTP_200_OK, status.HTTP_401_UNAUTHORIZED)