Files
aiagent/backend/app/agent_runtime/permissions.py
renjianbo 7f4aeb021b fix: Feishu channel agents file_write permission blocked + memory system tests & docs
- Fix 8 Feishu agent handlers to use permission_level="acceptEdits" so file_write
  tool works without Web UI approval popup (lingxi/renshenguo/suyao/tiantian/orange/main/schedule)
- Add P5-P7 memory improvements: offline keyword fallback, team sharing, file-based memory
- Add auto_dream_service for daily memory consolidation
- Add 99 memory system test cases (basic 18 + advanced 43 + pytest 38)
- Add platform capability assessment report and unfinished project checklist

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-14 20:35:12 +08:00

229 lines
8.5 KiB
Python

"""
工具安全分级与权限检查
参考 Claude Code Tool.ts 的 checkPermissions / PermissionResult 设计:
- 4 级权限: bypass > acceptEdits > default > plan
- 工具标记: is_read_only / is_destructive
- 自动批准规则: 基于工具名 + 参数模式匹配
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
import re
import logging
logger = logging.getLogger(__name__)
# ──────────────────────────── 权限级别 ────────────────────────────
class PermissionLevel(str, Enum):
"""权限级别 — 参考 Claude Code PermissionMode"""
BYPASS = "bypass" # 完全跳过权限检查
ACCEPT_EDITS = "acceptEdits" # 自动批准文件编辑(读+写)
DEFAULT = "default" # 每次询问(写操作需确认)
PLAN = "plan" # 只读 + 计划工具
# ──────────────────────────── 权限结果 ────────────────────────────
class PermissionAction(str, Enum):
ALLOW = "allow"
DENY = "deny"
ASK = "ask" # 需要用户确认
@dataclass
class PermissionResult:
"""权限检查结果"""
action: PermissionAction
message: str = ""
updated_input: Optional[Dict[str, Any]] = None # Hook 可修改参数
# ──────────────────────────── 工具安全标记 ────────────────────────────
# 只读工具 — PLAN 模式下仍然可用
READ_ONLY_TOOLS: set = {
"file_read", "grep", "glob", "web_search", "web_fetch",
"list_files", "read_lints", "codebase_search",
"math_calculate", "text", "json", "csv",
"database_query", "agent_list", "knowledge_base_search",
}
# 破坏性工具 — 不可逆操作
DESTRUCTIVE_TOOLS: set = {
"file_write", "file_delete", "command_exec", "shell_exec",
"docker_manage", "git_push", "git_reset_hard",
"database_execute", "deploy_push", "agent_delete",
}
# 编辑工具 — ACCEPT_EDITS 级别自动批准
EDIT_TOOLS: set = {
"file_edit", "file_write", "notebook_edit",
}
def is_read_only_tool(tool_name: str) -> bool:
"""判断工具是否只读"""
return tool_name in READ_ONLY_TOOLS
def is_destructive_tool(tool_name: str) -> bool:
"""判断工具是否具有破坏性"""
return tool_name in DESTRUCTIVE_TOOLS
def is_edit_tool(tool_name: str) -> bool:
"""判断工具是否为编辑类"""
return tool_name in EDIT_TOOLS
# ──────────────────────────── 自动批准规则 ────────────────────────────
@dataclass
class AutoApproveRule:
"""自动批准规则 — 参考 Claude Code alwaysAllowRules"""
tool_pattern: str # 工具名匹配 (支持 * 通配符)
param_conditions: Optional[Dict[str, Any]] = None # 参数条件
description: str = ""
def matches(self, tool_name: str, params: Optional[Dict[str, Any]] = None) -> bool:
"""检查工具是否匹配此规则"""
# 通配符匹配
if self.tool_pattern == "*":
return True
if self.tool_pattern.endswith("*"):
prefix = self.tool_pattern[:-1]
if not tool_name.startswith(prefix):
return False
elif tool_name != self.tool_pattern:
return False
# 参数条件匹配
if self.param_conditions and params:
for key, expected in self.param_conditions.items():
actual = params.get(key)
if isinstance(expected, str) and expected.startswith("regex:"):
pattern = expected[6:]
if not re.search(pattern, str(actual)):
return False
elif actual != expected:
return False
return True
# 默认自动批准规则
DEFAULT_AUTO_APPROVE_RULES: List[AutoApproveRule] = [
AutoApproveRule(tool_pattern="file_read", description="读取文件总是安全"),
AutoApproveRule(tool_pattern="grep", description="代码搜索总是安全"),
AutoApproveRule(tool_pattern="glob", description="文件搜索总是安全"),
AutoApproveRule(tool_pattern="web_search", description="网页搜索只读"),
AutoApproveRule(tool_pattern="web_fetch", description="网页抓取只读"),
AutoApproveRule(tool_pattern="math_calculate", description="数学计算无副作用"),
AutoApproveRule(tool_pattern="list_files", description="列出文件无副作用"),
AutoApproveRule(tool_pattern="read_lints", description="读取 lint 结果无副作用"),
AutoApproveRule(tool_pattern="knowledge_base_search", description="知识库搜索只读"),
]
# ──────────────────────────── 权限检查器 ────────────────────────────
class PermissionChecker:
"""
工具权限检查器 — 参考 Claude Code useCanUseTool 流程。
检查顺序:
1. BYPASS 模式 → 直接放行
2. 拒绝列表 → 直接拒绝
3. 自动批准规则 → 放行
4. PLAN 模式 → 只允许只读工具
5. ACCEPT_EDITS 模式 → 只读 + 编辑工具自动放行
6. DEFAULT 模式 → 编辑/破坏性工具需确认
"""
def __init__(
self,
level: PermissionLevel = PermissionLevel.DEFAULT,
auto_approve_rules: Optional[List[AutoApproveRule]] = None,
deny_rules: Optional[List[str]] = None,
):
self.level = level
self.auto_approve_rules = auto_approve_rules or list(DEFAULT_AUTO_APPROVE_RULES)
self.deny_tools: set = set(deny_rules or [])
def check(
self,
tool_name: str,
params: Optional[Dict[str, Any]] = None,
) -> PermissionResult:
"""
检查工具调用权限。
Returns:
PermissionResult 指示 allow / deny / ask
"""
# 1. BYPASS — 完全放行
if self.level == PermissionLevel.BYPASS:
return PermissionResult(action=PermissionAction.ALLOW)
# 2. 拒绝列表
if tool_name in self.deny_tools:
return PermissionResult(
action=PermissionAction.DENY,
message=f"工具 {tool_name} 已被管理员禁用",
)
# 3. 自动批准规则
for rule in self.auto_approve_rules:
if rule.matches(tool_name, params):
logger.debug(f"工具 {tool_name} 匹配自动批准规则: {rule.description}")
return PermissionResult(action=PermissionAction.ALLOW)
# 4. PLAN 模式 — 只允许只读
if self.level == PermissionLevel.PLAN:
if is_read_only_tool(tool_name):
return PermissionResult(action=PermissionAction.ALLOW)
return PermissionResult(
action=PermissionAction.DENY,
message=f"PLAN 模式下不允许使用 {tool_name}(仅支持只读工具)",
)
# 5. ACCEPT_EDITS — 只读 + 编辑自动放行
if self.level == PermissionLevel.ACCEPT_EDITS:
if is_read_only_tool(tool_name) or is_edit_tool(tool_name):
return PermissionResult(action=PermissionAction.ALLOW)
# 6. DEFAULT — 破坏性工具需确认
if is_destructive_tool(tool_name):
return PermissionResult(
action=PermissionAction.ASK,
message=f"工具 {tool_name} 可能产生不可逆操作,是否继续?",
)
# 编辑工具在 DEFAULT 下也需确认
if is_edit_tool(tool_name):
return PermissionResult(
action=PermissionAction.ASK,
message=f"确认编辑操作: {tool_name}",
)
# 未知工具默认放行
return PermissionResult(action=PermissionAction.ALLOW)
def add_auto_approve_rule(self, rule: AutoApproveRule):
"""添加自动批准规则"""
self.auto_approve_rules.append(rule)
def add_deny_tool(self, tool_name: str):
"""添加拒绝工具"""
self.deny_tools.add(tool_name)
def set_level(self, level: PermissionLevel):
"""切换权限级别"""
logger.info(f"权限级别切换: {self.level.value}{level.value}")
self.level = level