Files
aiagent/backend/app/api/permissions.py
2026-01-19 00:09:36 +08:00

590 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
权限管理API
支持RBAC基于角色的访问控制
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import or_
from pydantic import BaseModel
from typing import List, Optional
import logging
from app.core.database import get_db
from app.models.permission import Role, Permission, WorkflowPermission, AgentPermission
from app.models.user import User
from app.models.workflow import Workflow
from app.models.agent import Agent
from app.api.auth import get_current_user
from app.core.exceptions import NotFoundError, ConflictError, ValidationError
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/permissions", tags=["permissions"])
# ========== 角色管理 ==========
class RoleCreate(BaseModel):
"""角色创建模型"""
name: str
description: Optional[str] = None
permission_ids: Optional[List[str]] = None
class RoleUpdate(BaseModel):
"""角色更新模型"""
name: Optional[str] = None
description: Optional[str] = None
permission_ids: Optional[List[str]] = None
class RoleResponse(BaseModel):
"""角色响应模型"""
id: str
name: str
description: Optional[str]
is_system: bool
permissions: List[dict]
user_count: int
created_at: str
updated_at: str
class Config:
from_attributes = True
@router.get("/roles", response_model=List[RoleResponse])
async def get_roles(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取角色列表(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可访问")
roles = db.query(Role).offset(skip).limit(limit).all()
result = []
for role in roles:
result.append({
"id": role.id,
"name": role.name,
"description": role.description,
"is_system": role.is_system,
"permissions": [{"id": p.id, "name": p.name, "code": p.code} for p in role.permissions],
"user_count": len(role.users.all()) if hasattr(role.users, 'all') else 0,
"created_at": role.created_at.isoformat() if role.created_at else None,
"updated_at": role.updated_at.isoformat() if role.updated_at else None
})
return result
@router.post("/roles", response_model=RoleResponse)
async def create_role(
role_data: RoleCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""创建角色(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可创建角色")
# 检查角色名称是否已存在
existing = db.query(Role).filter(Role.name == role_data.name).first()
if existing:
raise ConflictError(f"角色名称 '{role_data.name}' 已存在")
# 创建角色
role = Role(
name=role_data.name,
description=role_data.description
)
db.add(role)
# 分配权限
if role_data.permission_ids:
permissions = db.query(Permission).filter(Permission.id.in_(role_data.permission_ids)).all()
role.permissions = permissions
db.commit()
db.refresh(role)
return {
"id": role.id,
"name": role.name,
"description": role.description,
"is_system": role.is_system,
"permissions": [{"id": p.id, "name": p.name, "code": p.code} for p in role.permissions],
"user_count": 0,
"created_at": role.created_at.isoformat() if role.created_at else None,
"updated_at": role.updated_at.isoformat() if role.updated_at else None
}
@router.put("/roles/{role_id}", response_model=RoleResponse)
async def update_role(
role_id: str,
role_data: RoleUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""更新角色(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可更新角色")
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise NotFoundError("角色", role_id)
if role.is_system:
raise HTTPException(status_code=400, detail="系统角色不可修改")
# 更新字段
if role_data.name is not None:
# 检查名称是否重复
existing = db.query(Role).filter(Role.name == role_data.name, Role.id != role_id).first()
if existing:
raise ConflictError(f"角色名称 '{role_data.name}' 已存在")
role.name = role_data.name
if role_data.description is not None:
role.description = role_data.description
# 更新权限
if role_data.permission_ids is not None:
permissions = db.query(Permission).filter(Permission.id.in_(role_data.permission_ids)).all()
role.permissions = permissions
db.commit()
db.refresh(role)
return {
"id": role.id,
"name": role.name,
"description": role.description,
"is_system": role.is_system,
"permissions": [{"id": p.id, "name": p.name, "code": p.code} for p in role.permissions],
"user_count": len(role.users.all()) if hasattr(role.users, 'all') else 0,
"created_at": role.created_at.isoformat() if role.created_at else None,
"updated_at": role.updated_at.isoformat() if role.updated_at else None
}
@router.delete("/roles/{role_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_role(
role_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""删除角色(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可删除角色")
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise NotFoundError("角色", role_id)
if role.is_system:
raise HTTPException(status_code=400, detail="系统角色不可删除")
db.delete(role)
db.commit()
# ========== 权限管理 ==========
class PermissionResponse(BaseModel):
"""权限响应模型"""
id: str
name: str
code: str
resource: str
action: str
description: Optional[str]
created_at: str
updated_at: str
class Config:
from_attributes = True
@router.get("/permissions", response_model=List[PermissionResponse])
async def get_permissions(
resource: Optional[str] = None,
action: Optional[str] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取权限列表"""
query = db.query(Permission)
if resource:
query = query.filter(Permission.resource == resource)
if action:
query = query.filter(Permission.action == action)
permissions = query.all()
return [
{
"id": p.id,
"name": p.name,
"code": p.code,
"resource": p.resource,
"action": p.action,
"description": p.description,
"created_at": p.created_at.isoformat() if p.created_at else None,
"updated_at": p.updated_at.isoformat() if p.updated_at else None
}
for p in permissions
]
# ========== 用户角色管理 ==========
class UserRoleAssign(BaseModel):
"""用户角色分配模型"""
user_id: str
role_ids: List[str]
@router.post("/users/{user_id}/roles")
async def assign_user_roles(
user_id: str,
role_ids: List[str],
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""为用户分配角色(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可分配角色")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise NotFoundError("用户", user_id)
roles = db.query(Role).filter(Role.id.in_(role_ids)).all()
if len(roles) != len(role_ids):
raise NotFoundError("角色", "部分角色不存在")
user.roles = roles
db.commit()
return {
"user_id": user_id,
"roles": [{"id": r.id, "name": r.name} for r in roles]
}
@router.get("/users", response_model=List[dict])
async def get_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取用户列表(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可访问")
users = db.query(User).offset(skip).limit(limit).all()
result = []
for user in users:
# 获取用户的角色
roles = user.roles.all() if hasattr(user.roles, 'all') else list(user.roles)
result.append({
"id": user.id,
"username": user.username,
"email": user.email,
"role": user.role,
"roles": [{"id": r.id, "name": r.name, "description": r.description} for r in roles],
"created_at": user.created_at.isoformat() if user.created_at else None
})
return result
@router.get("/users/{user_id}/roles")
async def get_user_roles(
user_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取用户的角色列表"""
# 用户只能查看自己的角色,管理员可以查看所有用户的角色
if current_user.id != user_id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权访问")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise NotFoundError("用户", user_id)
roles = user.roles.all() if hasattr(user.roles, 'all') else list(user.roles)
return {
"user_id": user_id,
"roles": [{"id": r.id, "name": r.name, "description": r.description} for r in roles]
}
# ========== 工作流权限管理 ==========
class WorkflowPermissionCreate(BaseModel):
"""工作流权限创建模型"""
workflow_id: str
user_id: Optional[str] = None
role_id: Optional[str] = None
permission_type: str # read/write/execute/share
@router.post("/workflows/{workflow_id}/permissions")
async def grant_workflow_permission(
workflow_id: str,
permission_data: WorkflowPermissionCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""授予工作流权限"""
workflow = db.query(Workflow).filter(Workflow.id == workflow_id).first()
if not workflow:
raise NotFoundError("工作流", workflow_id)
# 只有工作流所有者或管理员可以授权
if workflow.user_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权授权此工作流")
# 验证权限类型
if permission_data.permission_type not in ["read", "write", "execute", "share"]:
raise ValidationError("权限类型必须是: read/write/execute/share")
# user_id和role_id必须至少有一个
if not permission_data.user_id and not permission_data.role_id:
raise ValidationError("必须指定user_id或role_id")
# 检查权限是否已存在
existing = db.query(WorkflowPermission).filter(
WorkflowPermission.workflow_id == workflow_id,
WorkflowPermission.user_id == permission_data.user_id,
WorkflowPermission.role_id == permission_data.role_id,
WorkflowPermission.permission_type == permission_data.permission_type
).first()
if existing:
raise ConflictError("权限已存在")
# 创建权限
permission = WorkflowPermission(
workflow_id=workflow_id,
user_id=permission_data.user_id,
role_id=permission_data.role_id,
permission_type=permission_data.permission_type,
granted_by=current_user.id
)
db.add(permission)
db.commit()
db.refresh(permission)
return {
"id": permission.id,
"workflow_id": workflow_id,
"user_id": permission_data.user_id,
"role_id": permission_data.role_id,
"permission_type": permission_data.permission_type,
"granted_by": current_user.id
}
@router.get("/workflows/{workflow_id}/permissions")
async def get_workflow_permissions(
workflow_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取工作流权限列表"""
workflow = db.query(Workflow).filter(Workflow.id == workflow_id).first()
if not workflow:
raise NotFoundError("工作流", workflow_id)
# 只有工作流所有者或管理员可以查看权限
if workflow.user_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权查看此工作流的权限")
permissions = db.query(WorkflowPermission).filter(
WorkflowPermission.workflow_id == workflow_id
).all()
return [
{
"id": p.id,
"user_id": p.user_id,
"role_id": p.role_id,
"permission_type": p.permission_type,
"granted_by": p.granted_by,
"created_at": p.created_at.isoformat() if p.created_at else None
}
for p in permissions
]
@router.delete("/workflows/{workflow_id}/permissions/{permission_id}", status_code=status.HTTP_204_NO_CONTENT)
async def revoke_workflow_permission(
workflow_id: str,
permission_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""撤销工作流权限"""
workflow = db.query(Workflow).filter(Workflow.id == workflow_id).first()
if not workflow:
raise NotFoundError("工作流", workflow_id)
permission = db.query(WorkflowPermission).filter(
WorkflowPermission.id == permission_id,
WorkflowPermission.workflow_id == workflow_id
).first()
if not permission:
raise NotFoundError("权限", permission_id)
# 只有工作流所有者、管理员或授权人可以撤销
if (workflow.user_id != current_user.id and
current_user.role != "admin" and
permission.granted_by != current_user.id):
raise HTTPException(status_code=403, detail="无权撤销此权限")
db.delete(permission)
db.commit()
# ========== Agent权限管理 ==========
class AgentPermissionCreate(BaseModel):
"""Agent权限创建模型"""
agent_id: str
user_id: Optional[str] = None
role_id: Optional[str] = None
permission_type: str # read/write/execute/deploy
@router.post("/agents/{agent_id}/permissions")
async def grant_agent_permission(
agent_id: str,
permission_data: AgentPermissionCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""授予Agent权限"""
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if not agent:
raise NotFoundError("Agent", agent_id)
# 只有Agent所有者或管理员可以授权
if agent.user_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权授权此Agent")
# 验证权限类型
if permission_data.permission_type not in ["read", "write", "execute", "deploy"]:
raise ValidationError("权限类型必须是: read/write/execute/deploy")
# user_id和role_id必须至少有一个
if not permission_data.user_id and not permission_data.role_id:
raise ValidationError("必须指定user_id或role_id")
# 检查权限是否已存在
existing = db.query(AgentPermission).filter(
AgentPermission.agent_id == agent_id,
AgentPermission.user_id == permission_data.user_id,
AgentPermission.role_id == permission_data.role_id,
AgentPermission.permission_type == permission_data.permission_type
).first()
if existing:
raise ConflictError("权限已存在")
# 创建权限
permission = AgentPermission(
agent_id=agent_id,
user_id=permission_data.user_id,
role_id=permission_data.role_id,
permission_type=permission_data.permission_type,
granted_by=current_user.id
)
db.add(permission)
db.commit()
db.refresh(permission)
return {
"id": permission.id,
"agent_id": agent_id,
"user_id": permission_data.user_id,
"role_id": permission_data.role_id,
"permission_type": permission_data.permission_type,
"granted_by": current_user.id
}
@router.get("/agents/{agent_id}/permissions")
async def get_agent_permissions(
agent_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取Agent权限列表"""
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if not agent:
raise NotFoundError("Agent", agent_id)
# 只有Agent所有者或管理员可以查看权限
if agent.user_id != current_user.id and current_user.role != "admin":
raise HTTPException(status_code=403, detail="无权查看此Agent的权限")
permissions = db.query(AgentPermission).filter(
AgentPermission.agent_id == agent_id
).all()
return [
{
"id": p.id,
"user_id": p.user_id,
"role_id": p.role_id,
"permission_type": p.permission_type,
"granted_by": p.granted_by,
"created_at": p.created_at.isoformat() if p.created_at else None
}
for p in permissions
]
@router.delete("/agents/{agent_id}/permissions/{permission_id}", status_code=status.HTTP_204_NO_CONTENT)
async def revoke_agent_permission(
agent_id: str,
permission_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""撤销Agent权限"""
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if not agent:
raise NotFoundError("Agent", agent_id)
permission = db.query(AgentPermission).filter(
AgentPermission.id == permission_id,
AgentPermission.agent_id == agent_id
).first()
if not permission:
raise NotFoundError("权限", permission_id)
# 只有Agent所有者、管理员或授权人可以撤销
if (agent.user_id != current_user.id and
current_user.role != "admin" and
permission.granted_by != current_user.id):
raise HTTPException(status_code=403, detail="无权撤销此权限")
db.delete(permission)
db.commit()