"""File-based memory store (MEMORY.md), modelled on the Claude Code memdir layout.

Provides fully offline, filesystem-backed memory reads and writes that can act
as a fallback when the database is unavailable.

Format: every memory is a markdown file with a YAML frontmatter header, and
MEMORY.md holds a one-line-per-memory index.

Usage::

    store = FileMemoryStore("/path/to/memory_dir")
    store.save("user preference", "The user likes Python", mem_type="user")
    results = store.search("Python")  # keyword search
"""

from __future__ import annotations

import logging
import os
import re
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# Maximum length (in characters) of a single MEMORY.md index line.
INDEX_LINE_MAX_LENGTH = 150
# Maximum number of index entries (title and blank lines are not counted).
INDEX_MAX_ENTRIES = 200
# Size cap for one memory file, in characters (it is compared to str lengths).
MAX_FILE_SIZE = 40_000

# How many trailing characters of the old body survive when a file hits the cap.
_TAIL_KEEP_CHARS = 2000
# Timestamps are written with a fixed UTC+8 offset.
_TIMESTAMP_TZ = timezone(timedelta(hours=8))
# Matches `- [Name](file.md) — description` as well as `- [Name](file.md)`.
_INDEX_ENTRY_RE = re.compile(r'- \[(.+?)\]\((.+?)\)\s*[-—]?\s*(.*)')
# Failures expected from reading/writing text files; anything else is a bug.
_IO_ERRORS = (OSError, UnicodeError)


class FileMemoryStore:
    """File-based memory store.

    - Every memory is a ``.md`` file with a YAML frontmatter header, stored
      under ``<base_dir>/memories``.
    - ``<base_dir>/MEMORY.md`` keeps an index with one line per memory.
    - Memories can be listed by type and searched by keyword.
    - Works fully offline using only the standard library.
    """

    def __init__(self, memory_dir: str = ""):
        """Create the directory layout and the index file if they are missing.

        Args:
            memory_dir: Root directory of the store. When empty, the store
                lives in ``~/.tiangong/memory``.
        """
        self._base_dir = Path(memory_dir) if memory_dir else Path.home() / ".tiangong" / "memory"
        self._base_dir.mkdir(parents=True, exist_ok=True)
        self._memories_dir = self._base_dir / "memories"
        self._memories_dir.mkdir(parents=True, exist_ok=True)
        self._index_path = self._base_dir / "MEMORY.md"
        self._ensure_index()

    # ─── Public API ───

    @property
    def base_dir(self) -> str:
        """Root directory of the store, as a string."""
        return str(self._base_dir)

    @property
    def memory_count(self) -> int:
        """Number of memories currently listed in the index."""
        return len(self._parse_index())

    def save(
        self,
        name: str,
        content: str,
        mem_type: str = "reference",
        tags: Optional[List[str]] = None,
    ) -> bool:
        """Save one memory and refresh its index entry.

        A new memory is written as frontmatter plus ``content``. When a file
        for the same (sanitised) name already exists, ``content`` is appended
        as a dated update section and the existing frontmatter is kept. If the
        file would exceed ``MAX_FILE_SIZE``, only the last
        ``_TAIL_KEEP_CHARS`` characters of the old body are retained; the
        frontmatter is never dropped.

        Args:
            name: Human-readable name; also determines the file name.
            content: Markdown body of the memory.
            mem_type: Free-form type label stored in the frontmatter and index.
            tags: Optional list of tags stored in the frontmatter.

        Returns:
            True on success, False if anything went wrong (the error is
            logged, never raised).
        """
        try:
            safe_name = self._safe_filename(name)
            file_path = self._memories_dir / f"{safe_name}.md"
            now = datetime.now(_TIMESTAMP_TZ).isoformat(timespec="seconds")
            fresh_header = self._build_frontmatter(name, content, mem_type, tags, now)

            if file_path.exists():
                existing = file_path.read_text(encoding="utf-8")
                fm_text, old_body = self._split_frontmatter(existing)
                if fm_text is None:
                    # No header (hand-written file, or one truncated by an
                    # older version): give it one so metadata lookups work.
                    header, kept = fresh_header, existing
                else:
                    header, kept = f"---\n{fm_text}\n---", old_body
                if len(existing) + len(content) > MAX_FILE_SIZE:
                    # Trim the body only; cutting the raw file would lose the
                    # frontmatter and with it the name/type/tags.
                    kept = kept[-_TAIL_KEEP_CHARS:]
                kept = kept.lstrip("\n").rstrip()
                body = f"{header}\n\n{kept}\n\n---\n## 更新 {now}\n\n{content}\n"
            else:
                body = f"{fresh_header}\n\n{content}\n"

            file_path.write_text(body, encoding="utf-8")
            self._update_index(name, mem_type, safe_name)
            logger.debug("文件记忆已保存: %s (%s)", name, mem_type)
            return True
        except Exception as e:  # top-level boundary: saving is best-effort
            logger.warning("文件记忆保存失败: %s", e)
            return False

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Keyword-search all memory files.

        The query and each file are tokenised with ``_tokenize`` and scored by
        Jaccard similarity of the two token sets. An empty or blank query
        returns the most recently modified memories instead.

        Args:
            query: Search text.
            top_k: Maximum number of results; values <= 0 yield no results.

        Returns:
            Result dicts with keys ``name``, ``type``, ``content`` (a snippet),
            ``score``, ``source`` and ``path``, best match first.
        """
        if top_k <= 0:
            return []
        if not query or not query.strip():
            return self._recent(top_k)

        tokens = self._tokenize(query)
        if not tokens:
            return []

        # (score, file_path, name, mem_type, full_text)
        scored: List[Tuple[float, str, Any, Any, str]] = []
        for md_file in self._memories_dir.glob("*.md"):
            try:
                text = md_file.read_text(encoding="utf-8")
            except _IO_ERRORS as exc:
                logger.debug("Skipping unreadable memory file %s: %s", md_file, exc)
                continue
            text_tokens = self._tokenize(text)
            overlap = tokens & text_tokens
            if not overlap:
                continue
            score = len(overlap) / len(tokens | text_tokens)
            frontmatter = self._parse_frontmatter(text)
            scored.append((
                score,
                str(md_file),
                frontmatter.get("name", md_file.stem),
                frontmatter.get("type", "reference"),
                text,
            ))

        scored.sort(key=lambda item: item[0], reverse=True)
        return [
            {
                "name": name,
                "type": mem_type,
                "content": self._extract_snippet(text, tokens, max_len=300),
                "score": round(score, 3),
                "source": "file",
                "path": path,
            }
            for score, path, name, mem_type, text in scored[:top_k]
        ]

    def list_by_type(self, mem_type: str = "") -> List[Dict[str, Any]]:
        """List indexed memories, optionally restricted to one type.

        Args:
            mem_type: Type to filter on; an empty string lists everything.

        Returns:
            Dicts with keys ``name``, ``type``, ``content`` (the frontmatter
            description, at most 300 characters; empty if the file is missing
            or unreadable) and ``source``.
        """
        entries = self._parse_index()
        if mem_type:
            entries = [e for e in entries if e.get("type") == mem_type]

        results = []
        for entry in entries:
            file_path = self._memories_dir / f"{entry.get('file', '')}.md"
            content = ""
            if file_path.exists():
                try:
                    text = file_path.read_text(encoding="utf-8")
                    content = self._parse_frontmatter(text).get("description", "")[:300]
                except _IO_ERRORS as exc:
                    logger.debug("Could not read memory file %s: %s", file_path, exc)
            results.append({
                "name": entry.get("name", ""),
                "type": entry.get("type", ""),
                "content": content,
                "source": "file",
            })
        return results

    def delete(self, name: str) -> bool:
        """Delete one memory (its file and its index entry).

        Deleting a memory that does not exist is not an error.

        Returns:
            True unless an unexpected error occurred (which is logged).
        """
        try:
            file_path = self._memories_dir / f"{self._safe_filename(name)}.md"
            file_path.unlink(missing_ok=True)
            self._remove_from_index(name)
            return True
        except Exception as e:  # top-level boundary: deleting is best-effort
            logger.warning("删除文件记忆失败: %s", e)
            return False

    # ─── Private helpers ───

    def _ensure_index(self) -> None:
        """Create MEMORY.md with its header if it does not exist yet."""
        if not self._index_path.exists():
            self._index_path.write_text(
                "# Memory Index\n\n"
                "> 文件式记忆存储 — 数据库不可用时的离线兜底。\n"
                "> 格式参考 Claude Code memdir 架构。\n\n",
                encoding="utf-8",
            )

    def _parse_index(self) -> List[Dict[str, str]]:
        """Parse MEMORY.md into a list of entries.

        Each entry has the keys ``name``, ``file`` (link target without the
        ``.md`` suffix), ``description`` and ``type`` (the first word of the
        description, or ``"reference"`` when there is none). An unreadable or
        missing index yields an empty list.
        """
        entries: List[Dict[str, str]] = []
        if not self._index_path.exists():
            return entries
        try:
            lines = self._index_path.read_text(encoding="utf-8").split("\n")
        except _IO_ERRORS as exc:
            logger.debug("Could not read MEMORY.md index: %s", exc)
            return entries

        for line in lines:
            match = _INDEX_ENTRY_RE.match(line)
            if match is None:
                continue
            desc = match.group(3).strip()
            entries.append({
                "name": match.group(1).strip(),
                "file": match.group(2).strip().removesuffix(".md"),
                "description": desc,
                "type": desc.split(" ")[0] if desc else "reference",
            })
        return entries

    def _update_index(self, name: str, mem_type: str, safe_name: str) -> None:
        """Insert or refresh the index entry for one memory.

        Any previous entry for the same name or the same file is dropped, the
        new entry is placed after the last existing entry, and the oldest
        entries are removed once the index exceeds ``INDEX_MAX_ENTRIES``
        (only the index lines are removed; the memory files stay on disk).
        """
        try:
            self._ensure_index()
            text = self._index_path.read_text(encoding="utf-8")
            lines = [
                line for line in text.rstrip().split("\n")
                if not self._is_entry_for(line, name, safe_name)
            ]

            # Insert right after the last list item, or at the end if none.
            insert_at = next(
                (i + 1 for i in range(len(lines) - 1, -1, -1) if lines[i].startswith("- [")),
                len(lines),
            )
            lines.insert(insert_at, self._format_index_line(name, mem_type, safe_name))

            # Trim the index to its cap by dropping the earliest entries.
            excess = sum(1 for line in lines if line.startswith("- [")) - INDEX_MAX_ENTRIES
            if excess > 0:
                trimmed = []
                for line in lines:
                    if excess > 0 and line.startswith("- ["):
                        excess -= 1
                        continue
                    trimmed.append(line)
                lines = trimmed

            self._index_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        except Exception as e:  # the memory file is already saved; only log
            logger.warning("更新 MEMORY.md 索引失败: %s", e)

    def _remove_from_index(self, name: str) -> None:
        """Remove the index entry (or entries) belonging to ``name``.

        The file is rewritten only when an entry was actually removed, so
        repeated calls do not change MEMORY.md.
        """
        try:
            if not self._index_path.exists():
                return
            safe_name = self._safe_filename(name)
            lines = self._index_path.read_text(encoding="utf-8").rstrip().split("\n")
            kept = [line for line in lines if not self._is_entry_for(line, name, safe_name)]
            if len(kept) != len(lines):
                self._index_path.write_text("\n".join(kept) + "\n", encoding="utf-8")
        except _IO_ERRORS as exc:
            logger.warning("Failed to remove %r from MEMORY.md index: %s", name, exc)

    def _recent(self, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return the most recently modified memories, newest first.

        Files that disappear or cannot be read while scanning are skipped.
        """
        if top_k <= 0:
            return []

        dated: List[Tuple[float, Path]] = []
        for path in self._memories_dir.glob("*.md"):
            try:
                dated.append((path.stat().st_mtime, path))
            except OSError:
                continue  # removed between glob() and stat()
        dated.sort(key=lambda pair: pair[0], reverse=True)

        results = []
        for _, path in dated[:top_k]:
            try:
                text = path.read_text(encoding="utf-8")
            except _IO_ERRORS as exc:
                logger.debug("Skipping unreadable memory file %s: %s", path, exc)
                continue
            fm = self._parse_frontmatter(text)
            results.append({
                "name": fm.get("name", path.stem),
                "type": fm.get("type", "reference"),
                "content": fm.get("description", "")[:300],
                "score": 1.0,
                "source": "file",
                "path": str(path),
            })
        return results

    def _build_frontmatter(
        self,
        name: str,
        content: str,
        mem_type: str,
        tags: Optional[List[str]],
        created: str,
    ) -> str:
        """Render the ``---`` delimited header (without a trailing newline)."""
        tags_yaml = f"[{', '.join(str(t) for t in tags)}]" if tags else "[]"
        return (
            f"---\n"
            # Name and type are flattened: a newline would end the field early.
            f"name: {self._single_line(name)}\n"
            f"description: {self._one_line(content)}\n"
            f"type: {self._single_line(mem_type)}\n"
            f"created: {created}\n"
            f"tags: {tags_yaml}\n"
            f"---"
        )

    def _format_index_line(self, name: str, mem_type: str, safe_name: str) -> str:
        """Build the index line, keeping it within ``INDEX_LINE_MAX_LENGTH``.

        An over-long line is shortened by abbreviating the label, so the
        ``](file.md)`` link stays intact and the entry remains parseable.
        """
        label = self._single_line(name) or safe_name
        kind = self._single_line(mem_type)
        line = f"- [{label}]({safe_name}.md) — {kind}"
        overflow = len(line) - INDEX_LINE_MAX_LENGTH
        if overflow > 0:
            keep = len(label) - overflow - 3  # 3 characters for the "..."
            if keep > 0:
                line = f"- [{label[:keep]}...]({safe_name}.md) — {kind}"
            else:
                # The label alone cannot absorb the overflow; hard-truncate.
                line = line[:INDEX_LINE_MAX_LENGTH - 3] + "..."
        return line

    @staticmethod
    def _is_entry_for(line: str, name: str, safe_name: str) -> bool:
        """Tell whether an index line is the entry for the given memory.

        A line matches when it is a list entry whose label is exactly
        ``name`` or whose link target is ``<safe_name>.md``. Non-entry lines
        (title, blockquote header, blanks) never match.
        """
        if not line.startswith("- ["):
            return False
        if line.startswith(f"- [{name}]("):
            return True
        match = _INDEX_ENTRY_RE.match(line)
        return match is not None and match.group(2).strip().removesuffix(".md") == safe_name

    @staticmethod
    def _split_frontmatter(text: str) -> Tuple[Optional[str], str]:
        """Split a document into (frontmatter text, body).

        The frontmatter must start on the first line with ``---`` and end at
        the next line consisting only of ``---``. Splitting on whole lines
        means a ``---`` inside a value does not end the header.

        Returns:
            ``(None, text)`` when there is no complete frontmatter block,
            otherwise the text between the delimiters and the text after them.
        """
        lines = text.split("\n")
        if lines[0].strip() != "---":
            return None, text
        for idx, line in enumerate(lines[1:], start=1):
            if line.strip() == "---":
                return "\n".join(lines[1:idx]), "\n".join(lines[idx + 1:])
        return None, text

    @staticmethod
    def _parse_frontmatter(text: str) -> Dict[str, Any]:
        """Parse the YAML frontmatter with a lightweight parser (no PyYAML).

        Only flat ``key: value`` lines are understood. Surrounding quotes are
        stripped and a value of the form ``[a, b, c]`` becomes a list of
        strings. If the header has no ``description``, the first 200
        characters of the body are used. Text without frontmatter yields an
        empty dict.
        """
        result: Dict[str, Any] = {}
        fm_text, body = FileMemoryStore._split_frontmatter(text)
        if fm_text is None:
            return result

        for line in fm_text.split("\n"):
            line = line.strip()
            if ":" not in line:
                continue
            key, _, raw = line.partition(":")
            value: Any = raw.strip().strip("'\"")
            # Simple inline list: [a, b, c]
            if value.startswith("[") and value.endswith("]"):
                value = [v.strip().strip("'\"") for v in value[1:-1].split(",") if v.strip()]
            result[key.strip()] = value

        if "description" not in result:
            result["description"] = body.strip()[:200]
        return result

    @staticmethod
    def _safe_filename(name: str) -> str:
        """Turn a name into a safe file stem (at most 64 characters).

        Everything except word characters, CJK ideographs and hyphens becomes
        an underscore; an empty result falls back to ``"memory"``.
        """
        safe = re.sub(r'[^\w\u4e00-\u9fff\-]', '_', name)
        return safe.strip('_')[:64] or "memory"

    @staticmethod
    def _single_line(text: str) -> str:
        """Join the lines of ``text`` with spaces so it fits on one line."""
        return " ".join(text.splitlines())

    @staticmethod
    def _one_line(text: str) -> str:
        """Return the first non-blank line of ``text``, capped at 120 chars.

        Lines longer than 120 characters are cut to 117 plus ``"..."``. The
        result never contains a newline; blank text yields ``""``.
        """
        for line in text.splitlines():
            line = line.strip()
            if line:
                return line[:117] + "..." if len(line) > 120 else line
        return ""

    @staticmethod
    def _tokenize(text: str) -> set:
        """Tokenise text into a set of lower-cased search tokens.

        Produces ASCII alphanumeric words of length >= 2, every digit run,
        and for each run of CJK ideographs all single characters and all
        adjacent character pairs.
        """
        lowered = text.lower()
        tokens: set = set(re.findall(r'[a-z0-9]{2,}', lowered))
        for segment in re.findall(r'[\u4e00-\u9fff]+', lowered):
            tokens.update(segment)  # single characters
            tokens.update(segment[i:i + 2] for i in range(len(segment) - 1))  # bigrams
        tokens.update(re.findall(r'\d+', lowered))
        return tokens

    @staticmethod
    def _extract_snippet(text: str, tokens: set, max_len: int = 300) -> str:
        """Return the first paragraph containing any token, capped in length.

        The frontmatter is skipped. When no paragraph matches, the first
        ``max_len`` characters of the body are returned.
        """
        fm_text, body = FileMemoryStore._split_frontmatter(text)
        if fm_text is not None:
            text = body

        for para in text.split("\n\n"):
            para_lower = para.lower()
            if any(token in para_lower for token in tokens):
                return para[:max_len - 3] + "..." if len(para) > max_len else para
        return text[:max_len]


# Process-wide singleton, created lazily on first use.
_file_store: Optional[FileMemoryStore] = None


def get_file_memory_store(memory_dir: str = "") -> FileMemoryStore:
    """Return the shared ``FileMemoryStore``, creating it when needed.

    Args:
        memory_dir: Desired root directory. When empty, the current singleton
            is returned as-is (or a store at the default location is created).
            When it names a different directory than the current singleton's,
            the singleton is replaced.

    Returns:
        The shared store instance.
    """
    global _file_store
    # Compare as paths so equivalent spellings (e.g. a trailing slash) do not
    # rebuild the store on every call.
    if _file_store is None or (memory_dir and Path(memory_dir) != Path(_file_store.base_dir)):
        _file_store = FileMemoryStore(memory_dir)
    return _file_store