Files
aiagent/backend/app/services/file_memory_service.py
renjianbo 7f4aeb021b fix: Feishu channel agents file_write permission blocked + memory system tests & docs
- Fix 8 Feishu agent handlers to use permission_level="acceptEdits" so file_write
  tool works without Web UI approval popup (lingxi/renshenguo/suyao/tiantian/orange/main/schedule)
- Add P5-P7 memory improvements: offline keyword fallback, team sharing, file-based memory
- Add auto_dream_service for daily memory consolidation
- Add 99 memory system test cases (basic 18 + advanced 43 + pytest 38)
- Add platform capability assessment report and unfinished project checklist

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-14 20:35:12 +08:00

420 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
文件式记忆存储 (MEMORY.md) — 参考 Claude Code memdir 架构。
提供完全离线的文件系统记忆读写,数据库不可用时作为兜底。
格式: YAML frontmatter + markdown 内容, MEMORY.md 索引。
使用方式:
store = FileMemoryStore("/path/to/memory_dir")
store.save("用户偏好", "用户喜欢用Python", mem_type="user")
results = store.search("Python") # 关键词检索
"""
from __future__ import annotations
import logging
import os
import re
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)

# Maximum length, in characters, of one MEMORY.md index line.
INDEX_LINE_MAX_LENGTH = 150
# Maximum number of index entries (heading and blank lines are not counted).
INDEX_MAX_ENTRIES = 200
# Maximum size of a single memory file, measured in characters.
MAX_FILE_SIZE = 40_000


class FileMemoryStore:
    """File-backed memory store.

    - Every memory is a ``.md`` file under ``<base_dir>/memories`` that starts
      with a small YAML-like frontmatter block.
    - ``<base_dir>/MEMORY.md`` is an index with one list entry per memory.
    - Lookup is keyword based (token-set Jaccard similarity).
    - Only the standard library and the local filesystem are used.
    """

    # Matches `- [Name](file.md) — description` or `- [Name](file.md)`.
    _INDEX_ENTRY_RE = re.compile(r'- \[(.+?)\]\((.+?)\)\s*[-—]?\s*(.*)')
    # How many trailing body characters survive when an oversized file is trimmed.
    _TAIL_KEEP_CHARS = 2000

    def __init__(self, memory_dir: str = ""):
        """Create the directory layout and the index file if they are missing.

        Args:
            memory_dir: Root directory of the store. When empty,
                ``~/.tiangong/memory`` is used.
        """
        self._base_dir = Path(memory_dir) if memory_dir else Path.home() / ".tiangong" / "memory"
        self._base_dir.mkdir(parents=True, exist_ok=True)
        self._memories_dir = self._base_dir / "memories"
        self._memories_dir.mkdir(parents=True, exist_ok=True)
        self._index_path = self._base_dir / "MEMORY.md"
        self._ensure_index()

    # ─── Public API ───

    @property
    def base_dir(self) -> str:
        """Root directory of the store, as a string."""
        return str(self._base_dir)

    @property
    def memory_count(self) -> int:
        """Number of memories currently listed in the index."""
        return len(self._parse_index())

    def save(
        self, name: str, content: str,
        mem_type: str = "reference",
        tags: Optional[List[str]] = None,
    ) -> bool:
        """Persist one memory and refresh its index entry.

        A new memory is written as frontmatter + content. When a file for the
        same (sanitised) name already exists, the new content is appended as a
        dated update section; if the result would exceed ``MAX_FILE_SIZE`` the
        old body is first cut down to its tail while the frontmatter is kept,
        so the file stays parseable.

        Args:
            name: Human-readable memory name; also determines the file name.
            content: Markdown body of the memory.
            mem_type: Free-form type label stored in frontmatter and index.
            tags: Optional list of tags written to the frontmatter.

        Returns:
            True on success, False if anything failed (the error is logged).
        """
        try:
            safe_name = self._safe_filename(name)
            file_path = self._memories_dir / f"{safe_name}.md"

            # Timestamps are rendered in UTC+8.
            now = datetime.now(timezone(timedelta(hours=8))).isoformat(timespec="seconds")
            tags_yaml = f"[{', '.join(tags)}]" if tags else "[]"

            existing = ""
            if file_path.exists():
                existing = file_path.read_text(encoding="utf-8")

            if existing.strip():
                if len(existing) + len(content) > MAX_FILE_SIZE:
                    existing = self._trim_existing(existing)
                document = existing.rstrip() + f"\n\n---\n## 更新 {now}\n\n{content}\n"
            else:
                # Brand-new memory (or an empty leftover file): write a full
                # document so the frontmatter is always present.
                document = (
                    "---\n"
                    f"name: {self._inline(name)}\n"
                    f"description: {self._one_line(content)}\n"
                    f"type: {mem_type}\n"
                    f"created: {now}\n"
                    f"tags: {tags_yaml}\n"
                    "---\n\n"
                    f"{content}\n"
                )

            file_path.write_text(document, encoding="utf-8")
            self._update_index(name, mem_type, safe_name)
            logger.debug("文件记忆已保存: %s (%s)", name, mem_type)
            return True
        except Exception as e:
            # Best-effort API: callers rely on a boolean instead of an exception.
            logger.warning("文件记忆保存失败: %s", e)
            return False

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Keyword search over all memory files.

        The query and each file are tokenised and scored with Jaccard
        similarity. A blank query returns the most recently modified memories.

        Args:
            query: Search text.
            top_k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            Result dicts with keys ``name``, ``type``, ``content``, ``score``,
            ``source`` and ``path``, best match first.
        """
        if not query or not query.strip():
            return self._recent(top_k)
        if top_k <= 0:
            return []
        tokens = self._tokenize(query)
        if not tokens:
            return []

        # Each item: (score, file_path, name, mem_type, full_text)
        scored: List[tuple] = []
        for md_file in self._memories_dir.glob("*.md"):
            try:
                text = md_file.read_text(encoding="utf-8")
            except (OSError, UnicodeError) as e:
                logger.debug("skipping unreadable memory file %s: %s", md_file, e)
                continue
            text_tokens = self._tokenize(text)
            overlap = tokens & text_tokens
            if not overlap:
                continue
            score = len(overlap) / len(tokens | text_tokens)
            frontmatter = self._parse_frontmatter(text)
            scored.append((
                score,
                str(md_file),
                frontmatter.get("name", md_file.stem),
                frontmatter.get("type", "reference"),
                text,
            ))

        # Path is the tie-breaker so equal scores come back in a stable order
        # regardless of directory iteration order.
        scored.sort(key=lambda item: (-item[0], item[1]))

        return [
            {
                "name": name,
                "type": mem_type,
                "content": self._extract_snippet(text, tokens, max_len=300),
                "score": round(score, 3),
                "source": "file",
                "path": path,
            }
            for score, path, name, mem_type, text in scored[:top_k]
        ]

    def list_by_type(self, mem_type: str = "") -> List[Dict[str, Any]]:
        """List indexed memories, optionally restricted to one type.

        Args:
            mem_type: Type label to filter on; empty means "all".

        Returns:
            Dicts with keys ``name``, ``type``, ``content`` (the frontmatter
            description, at most 300 characters; empty when the file is
            missing or unreadable) and ``source``.
        """
        entries = self._parse_index()
        if mem_type:
            entries = [e for e in entries if e.get("type") == mem_type]

        results = []
        for entry in entries:
            file_path = self._memories_dir / f"{entry.get('file', '')}.md"
            content = ""
            if file_path.exists():
                try:
                    text = file_path.read_text(encoding="utf-8")
                    content = self._parse_frontmatter(text).get("description", "")[:300]
                except (OSError, UnicodeError) as e:
                    logger.debug("could not read memory file %s: %s", file_path, e)
            results.append({
                "name": entry.get("name", ""),
                "type": entry.get("type", ""),
                "content": content,
                "source": "file",
            })
        return results

    def delete(self, name: str) -> bool:
        """Delete one memory (its file and its index entry).

        Returns:
            True on success (including when nothing existed), False on error.
        """
        try:
            file_path = self._memories_dir / f"{self._safe_filename(name)}.md"
            file_path.unlink(missing_ok=True)
            self._remove_from_index(name)
            return True
        except Exception as e:
            logger.warning("删除文件记忆失败: %s", e)
            return False

    # ─── Private helpers ───

    def _ensure_index(self) -> None:
        """Create MEMORY.md with its header if it does not exist yet."""
        if not self._index_path.exists():
            self._index_path.write_text(
                "# Memory Index\n\n"
                "> 文件式记忆存储 — 数据库不可用时的离线兜底。\n"
                "> 格式参考 Claude Code memdir 架构。\n\n",
                encoding="utf-8",
            )

    def _parse_index(self) -> List[Dict[str, str]]:
        """Parse MEMORY.md into a list of entry dicts.

        Each entry has ``name``, ``file`` (link target without ``.md``),
        ``description`` (text after the link) and ``type`` (first word of the
        description, ``"reference"`` when there is none). An unreadable index
        yields an empty list.
        """
        entries: List[Dict[str, str]] = []
        if not self._index_path.exists():
            return entries
        try:
            text = self._index_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            logger.debug("could not read index %s: %s", self._index_path, e)
            return entries

        for line in text.split("\n"):
            m = self._INDEX_ENTRY_RE.match(line)
            if not m:
                continue
            desc = m.group(3).strip()
            entries.append({
                "name": m.group(1).strip(),
                "file": m.group(2).strip().removesuffix(".md"),
                "description": desc,
                "type": desc.split(" ")[0] if desc else "reference",
            })
        return entries

    def _update_index(self, name: str, mem_type: str, safe_name: str) -> None:
        """Insert or replace the index entry for a memory.

        Any previous entry for the same name or file is dropped, the new entry
        is placed after the last existing entry, and the oldest entries are
        removed when the index exceeds ``INDEX_MAX_ENTRIES`` (the memory files
        themselves are not deleted by this trimming).
        """
        try:
            self._ensure_index()
            content = self._index_path.read_text(encoding="utf-8")
            lines = [
                line for line in content.rstrip().split("\n")
                if not self._is_entry_for(line, name, safe_name)
            ]

            # Insert after the last list item, or at the end when there is none.
            insert_at = len(lines)
            for i in range(len(lines) - 1, -1, -1):
                if lines[i].startswith("- ["):
                    insert_at = i + 1
                    break
            lines.insert(insert_at, self._format_index_line(name, mem_type, safe_name))

            # Enforce the entry cap by dropping the earliest entries first.
            excess = sum(1 for line in lines if line.startswith("- [")) - INDEX_MAX_ENTRIES
            if excess > 0:
                kept = []
                for line in lines:
                    if excess > 0 and line.startswith("- ["):
                        excess -= 1
                        continue
                    kept.append(line)
                lines = kept

            self._index_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        except Exception as e:
            logger.warning("更新 MEMORY.md 索引失败: %s", e)

    def _remove_from_index(self, name: str) -> None:
        """Remove the index entry (or entries) belonging to ``name``.

        Only real list entries are considered, matched by exact name or by the
        file the name maps to; other lines that merely contain ``[name]`` are
        left alone. The file is rewritten only when something was removed, so
        repeated calls do not change it.
        """
        try:
            if not self._index_path.exists():
                return
            lines = self._index_path.read_text(encoding="utf-8").split("\n")
            if lines and lines[-1] == "":
                # Drop the artefact of the trailing newline so it is not doubled.
                lines.pop()
            safe_name = self._safe_filename(name)
            kept = [
                line for line in lines
                if not self._is_entry_for(line, name, safe_name)
            ]
            if len(kept) != len(lines):
                self._index_path.write_text("\n".join(kept) + "\n", encoding="utf-8")
        except (OSError, UnicodeError) as e:
            logger.warning("更新 MEMORY.md 索引失败: %s", e)

    def _is_entry_for(self, line: str, name: str, safe_name: str) -> bool:
        """Return True if ``line`` is the index entry for ``name``/``safe_name``."""
        m = self._INDEX_ENTRY_RE.match(line)
        if not m:
            return False
        if m.group(1).strip() == self._inline(name):
            return True
        return m.group(2).strip().removesuffix(".md") == safe_name

    @staticmethod
    def _format_index_line(name: str, mem_type: str, safe_name: str) -> str:
        """Build one index line, shortening only the label when it is too long.

        The link target and type are never cut, so the line always remains
        parseable by ``_parse_index``. If the line is still too long after the
        label cannot be shortened any further, it is left as is.
        """
        label = FileMemoryStore._inline(name)
        line = f"- [{label}]({safe_name}.md) — {mem_type}"
        overflow = len(line) - INDEX_LINE_MAX_LENGTH
        if overflow > 0:
            keep = len(label) - overflow - 3
            if keep > 0:
                label = label[:keep] + "..."
                line = f"- [{label}]({safe_name}.md) — {mem_type}"
        return line

    @classmethod
    def _trim_existing(cls, existing: str) -> str:
        """Shrink an oversized document to its frontmatter plus the body tail."""
        header, body = cls._split_frontmatter(existing)
        if header is None:
            return existing[-cls._TAIL_KEEP_CHARS:]
        tail = body[-cls._TAIL_KEEP_CHARS:].lstrip()
        return f"---\n{header}\n---\n\n{tail}"

    def _recent(self, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return the most recently modified memories (newest first)."""
        if top_k <= 0:
            return []

        def mtime(path: Path) -> float:
            # A file may vanish between glob() and stat(); sort it last.
            try:
                return path.stat().st_mtime
            except OSError:
                return 0.0

        files = sorted(self._memories_dir.glob("*.md"), key=mtime, reverse=True)
        results = []
        for f in files[:top_k]:
            try:
                text = f.read_text(encoding="utf-8")
            except (OSError, UnicodeError) as e:
                logger.debug("skipping unreadable memory file %s: %s", f, e)
                continue
            fm = self._parse_frontmatter(text)
            results.append({
                "name": fm.get("name", f.stem),
                "type": fm.get("type", "reference"),
                "content": fm.get("description", "")[:300],
                "score": 1.0,
                "source": "file",
                "path": str(f),
            })
        return results

    @staticmethod
    def _split_frontmatter(text: str) -> tuple:
        """Split a document into ``(header, body)``.

        The frontmatter must open with a ``---`` line and close with the next
        ``---`` line. Delimiters are matched per line, so a ``---`` inside a
        value does not end the block. Returns ``(None, text)`` when there is no
        complete frontmatter.
        """
        lines = text.split("\n")
        if not lines or lines[0].strip() != "---":
            return None, text
        for i in range(1, len(lines)):
            if lines[i].strip() == "---":
                return "\n".join(lines[1:i]), "\n".join(lines[i + 1:])
        return None, text

    @staticmethod
    def _parse_frontmatter(text: str) -> Dict[str, Any]:
        """Parse the frontmatter with a minimal ``key: value`` reader (no PyYAML).

        Values of the form ``[a, b, c]`` become lists of strings. When the
        frontmatter has no ``description`` key, the first 200 characters of the
        body are used. Returns an empty dict when there is no frontmatter.
        """
        result: Dict[str, Any] = {}
        header, body = FileMemoryStore._split_frontmatter(text)
        if header is None:
            return result

        for raw in header.split("\n"):
            line = raw.strip()
            if ":" not in line:
                continue
            key, _, value = line.partition(":")
            key = key.strip()
            value = value.strip().strip("'\"")
            # Simple inline list: [a, b, c]
            if value.startswith("[") and value.endswith("]"):
                value = [v.strip().strip("'\"") for v in value[1:-1].split(",") if v.strip()]
            result[key] = value

        if "description" not in result:
            result["description"] = body.strip()[:200]
        return result

    @staticmethod
    def _safe_filename(name: str) -> str:
        """Turn a memory name into a filesystem-safe stem.

        Every character that is not a word character (``\\w``, which in Python 3
        includes CJK and other Unicode letters and digits) or a hyphen becomes
        ``_``; surrounding underscores are stripped and the result is capped at
        64 characters. Falls back to ``"memory"`` when nothing is left.
        """
        safe = re.sub(r'[^\w\u4e00-\u9fff\-]', '_', name)
        return safe.strip('_')[:64] or "memory"

    @staticmethod
    def _inline(text: str) -> str:
        """Collapse line breaks to spaces so the text fits on a single line."""
        return re.sub(r'[\r\n]+', ' ', text).strip()

    @staticmethod
    def _one_line(text: str) -> str:
        """Return the first non-empty line of ``text``, capped at 120 characters.

        Lines longer than 120 characters are cut to 117 plus ``...``. The
        result never contains a line break; blank input yields ``""``.
        """
        for raw in text.split("\n"):
            line = raw.strip()
            if line:
                return line[:117] + "..." if len(line) > 120 else line
        return ""

    @staticmethod
    def _tokenize(text: str) -> set:
        """Tokenise text for keyword matching.

        Produces lower-cased ASCII alphanumeric words of length >= 2, CJK
        single characters and bigrams, and digit runs of any length. (The
        original note says this mirrors the logic in ``embedding_service``;
        that module is not visible here.)
        """
        lowered = text.lower()
        tokens: set = set(re.findall(r'[a-z0-9]{2,}', lowered))
        for seg in re.findall(r'[\u4e00-\u9fff]+', lowered):
            tokens.update(seg)  # single characters
            tokens.update(seg[i:i + 2] for i in range(len(seg) - 1))  # bigrams
        # Digit runs, including single digits the word pattern above skips.
        tokens.update(re.findall(r'\d+', lowered))
        return tokens

    @staticmethod
    def _extract_snippet(text: str, tokens: set, max_len: int = 300) -> str:
        """Return the first paragraph of the body that contains any token.

        Frontmatter is skipped. Paragraphs are separated by blank lines and
        matched case-insensitively by substring. Long paragraphs are cut to
        ``max_len`` characters ending in ``...``. When no paragraph matches,
        the first ``max_len`` characters of the body are returned.
        """
        _, body = FileMemoryStore._split_frontmatter(text)
        body = body.strip()
        for raw in body.split("\n\n"):
            para = raw.strip()
            lowered = para.lower()
            if any(t in lowered for t in tokens):
                if len(para) > max_len:
                    return para[:max_len - 3] + "..."
                return para
        return body[:max_len]
# Module-level singleton, created lazily on the first call to
# get_file_memory_store().
_file_store: Optional[FileMemoryStore] = None


def get_file_memory_store(memory_dir: str = "") -> FileMemoryStore:
    """Return the shared FileMemoryStore, creating or replacing it as needed.

    Args:
        memory_dir: Root directory for the store. When empty, the existing
            instance is reused (or one with the default location is created).
            When it names a directory different from the current instance's,
            a new store is created and becomes the shared instance.

    Returns:
        The shared FileMemoryStore instance.
    """
    global _file_store
    if _file_store is None:
        _file_store = FileMemoryStore(memory_dir)
    # Compare as Path objects: base_dir is a normalised path string, so a raw
    # string comparison would treat "/data/mem/" and "/data/mem" as different
    # and rebuild the store on every call.
    elif memory_dir and Path(_file_store.base_dir) != Path(memory_dir):
        _file_store = FileMemoryStore(memory_dir)
    return _file_store