- Fix 8 Feishu agent handlers to use permission_level="acceptEdits" so file_write tool works without Web UI approval popup (lingxi/renshenguo/suyao/tiantian/orange/main/schedule) - Add P5-P7 memory improvements: offline keyword fallback, team sharing, file-based memory - Add auto_dream_service for daily memory consolidation - Add 99 memory system test cases (basic 18 + advanced 43 + pytest 38) - Add platform capability assessment report and unfinished project checklist Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
422 lines
14 KiB
Python
422 lines
14 KiB
Python
"""
|
||
天工 Agent 记忆系统 — 全功能测试用例
|
||
|
||
覆盖:P0 分类 / P1 向量化 / P2 Rerank / P3 异步压缩 / P4 Auto Dream
|
||
P5 离线兜底 / P6 团队共享 / P7 文件记忆 / 核心嵌入 / 压缩 / 知识池
|
||
|
||
运行:cd backend && python tests/test_memory_system.py
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import time
|
||
import tempfile
|
||
import shutil
|
||
import os
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
# ─── 测试框架 ───
|
||
PASS = 0
|
||
FAIL = 0
|
||
SKIP = 0
|
||
|
||
def test(name: str):
|
||
"""装饰器风格的测试标记"""
|
||
def decorator(fn):
|
||
global PASS, FAIL, SKIP
|
||
try:
|
||
result = fn()
|
||
if asyncio.iscoroutine(result):
|
||
result = asyncio.run(result)
|
||
if result is False:
|
||
FAIL += 1
|
||
print(f" FAIL {name}")
|
||
elif result is True:
|
||
PASS += 1
|
||
print(f" PASS {name}")
|
||
else:
|
||
PASS += 1
|
||
print(f" PASS {name} ({result})")
|
||
except Exception as e:
|
||
FAIL += 1
|
||
print(f" FAIL {name}: {e}")
|
||
return fn
|
||
return decorator
|
||
|
||
|
||
# ─── 测试用例 ───
|
||
|
||
@test("1.1 Embedding 服务 (SiliconFlow BGE-M3)")
|
||
def test_embedding_generation():
|
||
from app.services.embedding_service import embedding_service
|
||
emb = asyncio.run(embedding_service.generate_embedding("天工智能体平台记忆测试"))
|
||
assert emb and len(emb) == 1024, f"期望 1024 维,实际 {len(emb) if emb else 0}"
|
||
return f"OK dims=1024"
|
||
|
||
|
||
@test("1.2 离线关键词分词器")
|
||
def test_offline_tokenizer():
|
||
from app.services.embedding_service import embedding_service
|
||
|
||
# 中文二元组
|
||
tokens = embedding_service._tokenize("今天天气真好")
|
||
assert "今天" in tokens or "天气" in tokens, "中文二元组缺失"
|
||
assert len(tokens) > 2, f"tokens太少: {len(tokens)}"
|
||
|
||
# 混合中英文
|
||
tokens = embedding_service._tokenize("Python写代码")
|
||
assert "python" in tokens, f"英文token缺失: {tokens}"
|
||
|
||
# 数字提取
|
||
tokens = embedding_service._tokenize("IP: 101.43.95.130")
|
||
assert "101" in tokens and "130" in tokens, f"数字token缺失: {tokens}"
|
||
|
||
return f"OK tokens={len(tokens)}"
|
||
|
||
|
||
@test("1.3 离线关键词搜索")
|
||
def test_keyword_search():
|
||
from app.services.embedding_service import embedding_service
|
||
|
||
entries = [
|
||
{"id": "1", "content_text": "数据库地址是101.43.95.130", "embedding": [], "metadata": {}},
|
||
{"id": "2", "content_text": "今天天气很好适合出去玩", "embedding": [], "metadata": {}},
|
||
{"id": "3", "content_text": "Python是一门很好的编程语言", "embedding": [], "metadata": {}},
|
||
{"id": "4", "content_text": "天工平台有7个飞书机器人", "embedding": [], "metadata": {}},
|
||
]
|
||
|
||
results = embedding_service.keyword_search("Python编程", entries, top_k=2)
|
||
assert len(results) > 0, "关键词搜索无结果"
|
||
assert "Python" in results[0]["content_text"], f"首条结果不相关: {results[0]['content_text'][:50]}"
|
||
|
||
results = embedding_service.keyword_search("飞书机器人", entries, top_k=2)
|
||
assert any("飞书机器人" in r["content_text"] for r in results), "飞书搜索失败"
|
||
|
||
return f"OK 命中{len(results)}条"
|
||
|
||
|
||
@test("2.1 记忆类型推断 (P0)")
|
||
def test_memory_type_inference():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
cases = [
|
||
("我喜欢用Python写代码", "好的", "user"),
|
||
("这个功能报错了,不对", "让我看看", "feedback"),
|
||
("数据库的地址是什么?", "地址是101.43.95.130", "reference"),
|
||
("这个任务的进度怎么样了?", "任务完成80%", "project"),
|
||
("帮我提交一下代码", "已提交", "project"),
|
||
("记住我不喜欢吃辣", "记住了", "user"),
|
||
]
|
||
for um, ar, expected in cases:
|
||
result = AgentMemory._infer_memory_type(um, ar)
|
||
assert result == expected, f"\"{um[:20]}\" 期望 {expected},实际 {result}"
|
||
|
||
return f"OK {len(cases)} cases"
|
||
|
||
|
||
@test("2.2 记忆类型过滤")
|
||
def test_memory_type_filter():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="test_filter", memory_type_filter=["user", "feedback"])
|
||
assert mem.memory_type_filter == ["user", "feedback"]
|
||
assert mem.MEMORY_TYPES == ("user", "feedback", "project", "reference")
|
||
|
||
# 无过滤
|
||
mem2 = AgentMemory(scope_id="test_nofilter")
|
||
assert mem2.memory_type_filter is None
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("3.1 LLM Rerank 配置 (P2)")
|
||
def test_rerank_config():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="test_rerank", vector_memory_rerank=True)
|
||
assert mem.vector_memory_rerank is True
|
||
assert hasattr(mem, "_llm_rerank"), "缺少 _llm_rerank 方法"
|
||
|
||
mem2 = AgentMemory(scope_id="test_norerank")
|
||
assert mem2.vector_memory_rerank is False
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("3.2 消息裁剪保留配对完整性")
|
||
def test_trim_messages():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="test", max_history=4)
|
||
|
||
# 构造含 tool_calls + tool_result 的消息序列
|
||
msgs = [
|
||
{"role": "system", "content": "你是助手"},
|
||
{"role": "user", "content": "查天气"},
|
||
{"role": "assistant", "content": "好的", "tool_calls": [{"name": "get_weather", "id": "1"}]},
|
||
{"role": "tool", "content": "晴天 25度", "tool_call_id": "1"},
|
||
{"role": "assistant", "content": "今天晴天25度"},
|
||
{"role": "user", "content": "谢谢"},
|
||
]
|
||
|
||
trimmed = mem.trim_messages(msgs)
|
||
# system msg 应保留
|
||
assert trimmed[0]["role"] == "system", "system消息应保留"
|
||
# 不应有孤立的 tool 消息开头
|
||
assert trimmed[1]["role"] != "tool", "裁剪后首条不应是孤立 tool 消息"
|
||
|
||
return f"OK trimmed to {len(trimmed)}"
|
||
|
||
|
||
@test("4.1 后台异步压缩结构完整 (P3)")
|
||
def test_background_compress_structure():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="test_bg")
|
||
assert hasattr(mem, "_background_compress_and_save"), "缺少 _background_compress_and_save"
|
||
assert hasattr(mem, "_compress_and_summarize"), "缺少 _compress_and_summarize"
|
||
assert hasattr(mem, "_save_compressed_memories"), "缺少 _save_compressed_memories (P1)"
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("5.1 Auto Dream 阈值配置 (P4)")
|
||
def test_auto_dream_config():
|
||
from app.services.auto_dream_service import MERGE_SIMILARITY_THRESHOLD, _should_dream_today
|
||
|
||
assert 0.8 <= MERGE_SIMILARITY_THRESHOLD <= 0.95, "合并阈值不合理"
|
||
# 非凌晨3点不应触发
|
||
assert _should_dream_today() is False, "非凌晨3点不应触发 dream"
|
||
|
||
return f"OK threshold={MERGE_SIMILARITY_THRESHOLD}"
|
||
|
||
|
||
@test("5.2 Auto Dream 服务导入正常")
|
||
def test_auto_dream_import():
|
||
from app.services.auto_dream_service import run_auto_dream, _should_dream_today
|
||
assert callable(run_auto_dream)
|
||
assert callable(_should_dream_today)
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("6.1 团队共享记忆 (P6)")
|
||
def test_team_sharing():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="agent_1", team_id="team_alpha", team_share_enabled=True)
|
||
assert mem.team_id == "team_alpha"
|
||
assert mem.team_share_enabled is True
|
||
|
||
mem2 = AgentMemory(scope_id="agent_2", team_id="team_alpha", team_share_enabled=False)
|
||
assert mem2.team_id == "team_alpha"
|
||
assert mem2.team_share_enabled is False
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("7.1 文件式记忆存储 (P7)")
|
||
def test_file_memory_store():
|
||
from app.services.file_memory_service import FileMemoryStore
|
||
|
||
tmpdir = tempfile.mkdtemp(prefix="tmem_")
|
||
try:
|
||
store = FileMemoryStore(tmpdir)
|
||
|
||
# 保存
|
||
store.save("用户偏好", "用户喜欢用Python开发", mem_type="user")
|
||
store.save("数据库配置", "MySQL地址101.43.95.130", mem_type="reference")
|
||
store.save("项目信息", "天工平台有7个飞书机器人", mem_type="project")
|
||
|
||
# 计数
|
||
assert store.memory_count == 3, f"期望 3,实际 {store.memory_count}"
|
||
|
||
# 搜索
|
||
results = store.search("Python")
|
||
assert len(results) > 0, "Python搜索无结果"
|
||
|
||
results = store.search("飞书机器人")
|
||
assert len(results) > 0, "飞书搜索无结果"
|
||
|
||
# 按类型列出
|
||
user_items = store.list_by_type("user")
|
||
assert len(user_items) >= 1, "user类型缺失"
|
||
|
||
# 删除
|
||
store.delete("用户偏好")
|
||
assert store.memory_count == 2, f"删除后期望 2,实际 {store.memory_count}"
|
||
|
||
# MEMORY.md 存在
|
||
index_path = os.path.join(tmpdir, "MEMORY.md")
|
||
assert os.path.exists(index_path), "MEMORY.md 不存在"
|
||
|
||
return f"OK saved=3 searched=2 deleted=1"
|
||
finally:
|
||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
|
||
@test("7.2 文件记忆读取")
|
||
def test_file_memory_read():
|
||
from app.services.file_memory_service import FileMemoryStore
|
||
|
||
tmpdir = tempfile.mkdtemp(prefix="tmem_")
|
||
try:
|
||
store = FileMemoryStore(tmpdir)
|
||
store.save("测试记忆", "这是一条测试记忆内容,包含关键词Python和天工", mem_type="reference")
|
||
|
||
# 通过搜索读取
|
||
results = store.search("Python")
|
||
assert len(results) == 1
|
||
assert results[0]["source"] == "file"
|
||
assert "Python" in results[0]["content"]
|
||
|
||
return f"OK content={results[0]['content'][:30]}"
|
||
finally:
|
||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
|
||
@test("8.1 余弦相似度计算")
|
||
def test_cosine_similarity():
|
||
from app.services.embedding_service import embedding_service
|
||
|
||
# 相同向量
|
||
sim = embedding_service.cosine_similarity([1.0, 2.0, 3.0], [1.0, 2.0, 3.0])
|
||
assert abs(sim - 1.0) < 0.001, f"相同向量相似度应为1.0,实际{sim}"
|
||
|
||
# 正交向量
|
||
sim = embedding_service.cosine_similarity([1.0, 0.0], [0.0, 1.0])
|
||
assert abs(sim - 0.0) < 0.001, f"正交向量相似度应为0.0,实际{sim}"
|
||
|
||
# 维度不同
|
||
sim = embedding_service.cosine_similarity([1.0], [1.0, 2.0])
|
||
assert sim == 0.0, f"不同维度应返回0"
|
||
|
||
# 空向量
|
||
sim = embedding_service.cosine_similarity([], [1.0, 2.0])
|
||
assert sim == 0.0, "空向量应返回0"
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("9.1 压缩记忆向量化可调用 (P1)")
|
||
def test_compressed_memory_vectorization():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="test_cmv")
|
||
assert hasattr(mem, "_save_compressed_memories")
|
||
assert callable(mem._save_compressed_memories)
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("9.2 全局知识保存结构")
|
||
def test_global_knowledge_structure():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(scope_id="test_gk")
|
||
assert hasattr(mem, "save_global_knowledge")
|
||
assert hasattr(mem, "_global_knowledge_search")
|
||
|
||
return "OK"
|
||
|
||
|
||
@test("10.1 完整记忆生命周期模拟")
|
||
def test_full_lifecycle():
|
||
"""模拟一次完整的记忆生命周期:创建 → 检索 → 保存 → 压缩 → 整合"""
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(
|
||
scope_id="test_lifecycle",
|
||
vector_memory_enabled=True,
|
||
vector_memory_top_k=3,
|
||
vector_memory_rerank=False,
|
||
memory_type_filter=None,
|
||
team_id="test_team",
|
||
team_share_enabled=True,
|
||
memory_dir_enabled=True,
|
||
memory_dir_path=tempfile.mkdtemp(prefix="tlife_"),
|
||
)
|
||
|
||
# 初始化
|
||
text = asyncio.run(mem.initialize("Python开发"))
|
||
assert isinstance(text, str), "initialize 应返回字符串"
|
||
|
||
# 保存上下文
|
||
asyncio.run(mem.save_context(
|
||
"我喜欢用Python写代码",
|
||
"Python确实是很好的选择",
|
||
))
|
||
|
||
# 消息裁剪
|
||
msgs = [
|
||
{"role": "system", "content": "你是助手"},
|
||
{"role": "user", "content": "你好"},
|
||
{"role": "assistant", "content": "你好!"},
|
||
{"role": "user", "content": "帮我写Python"},
|
||
{"role": "assistant", "content": "好的"},
|
||
{"role": "user", "content": "谢谢"},
|
||
]
|
||
trimmed = mem.trim_messages(msgs)
|
||
assert len(trimmed) <= mem.max_history + 1, "裁剪后应不超过 max_history"
|
||
|
||
# 清理文件记忆目录
|
||
mp = mem.memory_dir_path
|
||
if mp and os.path.exists(mp):
|
||
shutil.rmtree(mp, ignore_errors=True)
|
||
|
||
return "OK init+save+trim"
|
||
|
||
|
||
@test("10.2 AgentMemory 配置全量传递")
|
||
def test_full_config_wiring():
|
||
from app.agent_runtime.memory import AgentMemory
|
||
|
||
mem = AgentMemory(
|
||
scope_kind="agent",
|
||
scope_id="agent_78ba9dfb",
|
||
session_key="session_001",
|
||
persist=True,
|
||
max_history=15,
|
||
vector_memory_enabled=True,
|
||
vector_memory_top_k=8,
|
||
vector_memory_rerank=True,
|
||
memory_type_filter=["user", "project"],
|
||
team_id="team_feishu",
|
||
team_share_enabled=True,
|
||
memory_dir_enabled=True,
|
||
memory_dir_path="/tmp/tiangong_mem",
|
||
)
|
||
|
||
assert mem.scope_kind == "agent"
|
||
assert mem.scope_id == "agent_78ba9dfb"
|
||
assert mem.max_history == 15
|
||
assert mem.vector_memory_top_k == 8
|
||
assert mem.vector_memory_rerank is True
|
||
assert mem.memory_type_filter == ["user", "project"]
|
||
assert mem.team_id == "team_feishu"
|
||
assert mem.team_share_enabled is True
|
||
assert mem.memory_dir_enabled is True
|
||
assert mem.memory_dir_path == "/tmp/tiangong_mem"
|
||
|
||
return "OK all 12 params"
|
||
|
||
|
||
# ─── 运行 ───
|
||
if __name__ == "__main__":
|
||
print("=" * 60)
|
||
print("天工 Agent 记忆系统 — 全功能测试")
|
||
print("=" * 60)
|
||
print()
|
||
|
||
# 所有 @test 装饰器在 import 时自动执行
|
||
total = PASS + FAIL + SKIP
|
||
print()
|
||
print("=" * 60)
|
||
print(f"测试结果: {PASS} 通过 / {FAIL} 失败 / {SKIP} 跳过 (共 {total})")
|
||
print("=" * 60)
|
||
|
||
if FAIL > 0:
|
||
sys.exit(1)
|
||
else:
|
||
print("\n全部测试通过!")
|