- Fix 8 Feishu agent handlers to use permission_level="acceptEdits" so file_write tool works without Web UI approval popup (lingxi/renshenguo/suyao/tiantian/orange/main/schedule) - Add P5-P7 memory improvements: offline keyword fallback, team sharing, file-based memory - Add auto_dream_service for daily memory consolidation - Add 99 memory system test cases (basic 18 + advanced 43 + pytest 38) - Add platform capability assessment report and unfinished project checklist Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1068 lines
37 KiB
Python
1068 lines
37 KiB
Python
"""
|
||
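天工 (Tiangong) Agent memory system — advanced / edge-case test cases.

Covers: file-memory update / trimming / concurrency, vector-search filtering /
team queries, offline-search edge cases, global-knowledge deduplication,
embedding serialization edge cases, schema validation, tool-call pairing when
trimming messages, and memory-lifecycle stress tests.

Run with: cd backend && python tests/test_memory_advanced.py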
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import time
|
||
import tempfile
|
||
import shutil
|
||
import os
|
||
import json
|
||
import threading
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
# ─── 测试框架 ───
|
||
PASS = 0
|
||
FAIL = 0
|
||
SKIP = 0
|
||
|
||
def test(name: str):
    """Build a decorator that runs a test case immediately and records the outcome.

    The decorated callable is invoked once, at decoration time. A coroutine
    result is driven to completion with ``asyncio.run``. A result of ``False``
    counts as a failure; ``True`` or any other value counts as a pass (other
    values are echoed after the test name). Any exception counts as a failure
    and its traceback is printed.

    Args:
        name: Human-readable label printed next to the PASS/FAIL marker.

    Returns:
        A decorator that returns the original callable unchanged.
    """

    def decorator(fn):
        global PASS, FAIL, SKIP
        try:
            outcome = fn()
            if asyncio.iscoroutine(outcome):
                outcome = asyncio.run(outcome)

            if outcome is False:
                FAIL += 1
                print(f" FAIL {name}")
            else:
                PASS += 1
                # Only non-boolean results carry a detail suffix.
                detail = "" if outcome is True else f" ({outcome})"
                print(f" PASS {name}{detail}")
        except Exception as exc:
            import traceback

            FAIL += 1
            print(f" FAIL {name}: {exc}")
            traceback.print_exc()
        return fn

    return decorator
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# A. 文件式记忆高级测试 (P7)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("A1. 文件记忆重复保存(追加更新)")
def test_file_memory_append():
    """Saving twice under one name must keep a single entry whose new text is searchable."""
    from app.services.file_memory_service import FileMemoryStore

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(workdir)
        entry_name = "测试条目"

        # First save creates the entry.
        store.save(entry_name, "第一版本内容", mem_type="user")
        assert store.memory_count == 1

        # Second save under the same name must not create another entry.
        store.save(entry_name, "第二版本更新内容", mem_type="user")
        assert store.memory_count == 1, "同名条目不应增加计数"

        # The updated text must be discoverable.
        hits = store.search("第二版本")
        assert len(hits) > 0, "应能找到更新的内容"

        return f"OK count={store.memory_count}"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("A2. 文件记忆索引裁剪(超过200条)")
def test_file_memory_index_trimming():
    """Writing more entries than the index limit must trim the index to the limit.

    The number of writes is derived from ``INDEX_MAX_ENTRIES`` (limit + 5)
    instead of a hard-coded 205, which only exceeded the limit while the
    limit stayed below 205.
    """
    from app.services.file_memory_service import FileMemoryStore, INDEX_MAX_ENTRIES

    tmpdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(tmpdir)

        # Overshoot the configured limit by five entries.
        for i in range(INDEX_MAX_ENTRIES + 5):
            store.save(f"记忆条目_{i:04d}", f"这是第{i}条记忆的内容数据", mem_type="reference")

        count = store.memory_count
        assert count <= INDEX_MAX_ENTRIES, f"索引应被裁剪到 {INDEX_MAX_ENTRIES},实际 {count}"
        assert count == INDEX_MAX_ENTRIES, f"裁剪后应为 {INDEX_MAX_ENTRIES},实际 {count}"

        # The oldest entry may have been trimmed; the search only has to run
        # without raising and without changing the entry count.
        store.search("记忆条目_0000")
        assert store.memory_count == INDEX_MAX_ENTRIES

        return f"OK trimmed to {count}"
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
|
||
@test("A3. 文件记忆按类型列出")
def test_file_memory_list_by_type():
    """``list_by_type`` must return the right number of entries per memory type."""
    from app.services.file_memory_service import FileMemoryStore

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(workdir)

        seed = (
            ("用户偏好A", "喜欢Python", "user"),
            ("用户偏好B", "喜欢Vue", "user"),
            ("项目信息", "天工平台开发", "project"),
            ("反馈记录", "API超时问题", "feedback"),
            ("参考文档", "数据库配置", "reference"),
        )
        for title, body, kind in seed:
            store.save(title, body, mem_type=kind)

        # Checked in the same order as listed; "" is expected to list everything.
        expected_counts = (
            ("user", 2),
            ("project", 1),
            ("feedback", 1),
            ("reference", 1),
            ("nonexistent", 0),
            ("", 5),
        )
        for kind, expected in expected_counts:
            assert len(store.list_by_type(kind)) == expected

        return "OK 4 types verified"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("A4. 文件记忆空查询返回最近条目")
def test_file_memory_empty_query():
    """An empty query must still return entries, all tagged with source ``"file"``."""
    from app.services.file_memory_service import FileMemoryStore

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(workdir)
        for title, body, kind in (("条目1", "内容A", "user"), ("条目2", "内容B", "project")):
            store.save(title, body, mem_type=kind)

        hits = store.search("")
        assert len(hits) >= 1, "空查询应返回最近条目"
        # No hit may come from anywhere other than the file store.
        assert not [hit for hit in hits if hit["source"] != "file"]

        return f"OK {len(hits)} recent"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("A5. 文件记忆安全文件名生成")
def test_file_memory_safe_filename():
    """Exercise ``_safe_filename`` with special, ASCII, mixed, long and symbol-only input."""
    from app.services.file_memory_service import FileMemoryStore

    sanitize = FileMemoryStore._safe_filename

    # Special characters must not survive.
    cleaned = sanitize("用户: 偏好/设置?*")
    assert not any(ch in cleaned for ch in (":", "/", "?", "*"))

    # Plain ASCII identifiers pass through unchanged.
    cleaned = sanitize("user_profile")
    assert cleaned == "user_profile"

    # Mixed Chinese/English keeps both parts.
    cleaned = sanitize("用户user_偏好pref")
    assert "用户" in cleaned and "user" in cleaned

    # Over-long names are truncated.
    cleaned = sanitize("A" * 100)
    assert len(cleaned) <= 64

    # Nothing usable left: fall back to the default name.
    cleaned = sanitize("!@#$%")
    assert cleaned == "memory"

    return "OK"
|
||
|
||
|
||
@test("A6. 文件记忆前端格式解析")
def test_file_memory_frontmatter_parsing():
    """Check ``_parse_frontmatter`` on well-formed, missing and unterminated frontmatter."""
    from app.services.file_memory_service import FileMemoryStore

    parse = FileMemoryStore._parse_frontmatter

    # Well-formed frontmatter followed by a blank line and the body.
    # NOTE(review): lines are assumed to start at column 0 in the original
    # literal — the captured source lost its indentation; confirm.
    document = "\n".join([
        "---",
        "name: 测试记忆",
        "description: 这是一条测试记忆",
        "type: user",
        "created: 2026-06-14T10:00:00+08:00",
        "tags: [python, vue]",
        "---",
        "",
        "这是正文内容,包含重要信息。",
    ])

    meta = parse(document)
    assert meta["name"] == "测试记忆"
    assert meta["type"] == "user"
    assert meta["tags"] == ["python", "vue"]
    assert meta["created"] == "2026-06-14T10:00:00+08:00"

    # No frontmatter at all.
    meta = parse("纯文本内容,没有frontmatter")
    assert meta == {} or "description" in meta, "无frontmatter应返回空dict或仅有description"

    # Opening delimiter without a closing one must not raise.
    meta = parse("---\n只有开始没有结束")
    assert isinstance(meta, dict), "应返回dict而非报错"

    return "OK"
|
||
|
||
|
||
@test("A7. 文件记忆删除不存在的条目")
def test_file_memory_delete_nonexistent():
    """Deleting an entry that was never saved must return ``True`` without raising."""
    from app.services.file_memory_service import FileMemoryStore

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        outcome = FileMemoryStore(workdir).delete("不存在的条目")
        assert outcome is True
        return "OK"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("A8. 文件记忆并发安全写入")
def test_file_memory_concurrent_writes():
    """Five threads each save 20 entries into one store; no save may raise.

    The original loop rebound its own loop variable ``t`` to the ``Thread``
    object; the threads are now built with a distinct index name.
    """
    from app.services.file_memory_service import FileMemoryStore

    tmpdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(tmpdir)
        errors = []

        def write_batch(prefix, count):
            """Save ``count`` entries named after ``prefix``, recording any exception."""
            try:
                for i in range(count):
                    store.save(f"{prefix}_{i}", f"{prefix}内容第{i}条", mem_type="user")
            except Exception as e:
                # Collected rather than raised so the main thread can assert on it.
                errors.append(str(e))

        workers = [
            threading.Thread(target=write_batch, args=(f"thread{idx}", 20))
            for idx in range(5)
        ]

        # Start every worker before joining any so the writes overlap.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert len(errors) == 0, f"并发写入出错: {errors}"
        assert store.memory_count > 0, "应有写入成功的记忆"

        return f"OK {store.memory_count} entries"
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
|
||
@test("A9. 文件记忆搜索无匹配时返回空")
def test_file_memory_search_no_match():
    """A keyword that matches nothing must produce an empty result."""
    from app.services.file_memory_service import FileMemoryStore

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(workdir)
        store.save("数据库配置", "MySQL在101.43.95.130", mem_type="reference")

        hits = store.search("xyznonexistent12345")
        assert len(hits) == 0, "无匹配关键词应返回空"

        return "OK"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# B. 离线关键词搜索边界 (P5)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("B1. 离线搜索空查询")
def test_offline_search_empty_query():
    """An empty query must not return more than ``top_k`` entries."""
    from app.services.embedding_service import embedding_service

    only_entry = {"id": "1", "content_text": "测试内容", "embedding": [], "metadata": {}}
    hits = embedding_service.keyword_search("", [only_entry], top_k=5)
    assert len(hits) <= 5

    return f"OK {len(hits)} entries"
|
||
|
||
|
||
@test("B2. 离线搜索空条目列表")
def test_offline_search_empty_entries():
    """Searching an empty entry list must return an empty list."""
    from app.services.embedding_service import embedding_service

    hits = embedding_service.keyword_search("Python", [], top_k=5)
    assert hits == []

    return "OK"
|
||
|
||
|
||
@test("B3. 离线搜索特殊字符")
def test_offline_search_special_chars():
    """Keyword search must cope with URL and e-mail style content."""
    from app.services.embedding_service import embedding_service

    texts = (
        "URL: http://101.43.95.130:3001/api",
        "正常中文内容无特殊字符",
        "email@example.com 联系方式",
    )
    entries = [
        {"id": str(pos), "content_text": text, "embedding": [], "metadata": {}}
        for pos, text in enumerate(texts, start=1)
    ]

    # URL fragments should rank the URL entry first.
    hits = embedding_service.keyword_search("http 101.43.95.130", entries, top_k=3)
    assert len(hits) > 0, "特殊字符搜索应有结果"
    assert "http" in hits[0]["content_text"].lower(), f"应找到URL条目: {hits[0]['content_text'][:50]}"

    # E-mail fragments should match something.
    hits = embedding_service.keyword_search("email example", entries, top_k=3)
    assert len(hits) > 0, "邮箱搜索应有结果"

    return "OK URL+email search"
|
||
|
||
|
||
@test("B4. 离线搜索长文本")
def test_offline_search_long_text():
    """A short exact match must outrank a long text; the long text needs a lower threshold."""
    from app.services.embedding_service import embedding_service

    repeated = "支持多Agent编排、工具调用、知识库管理。" * 20
    long_content = "天工智能体平台是一个企业级AI平台," + repeated
    short_content = "简短的Python记忆"
    entries = [
        {"id": "1", "content_text": long_content, "embedding": [], "metadata": {}},
        {"id": "2", "content_text": short_content, "embedding": [], "metadata": {}},
    ]

    hits = embedding_service.keyword_search("Python", entries, top_k=3)
    assert len(hits) > 0
    assert hits[0]["content_text"] == "简短的Python记忆"

    # Per the original author's note, the long text gets a low Jaccard score
    # (large union), hence the lowered min_score here.
    hits = embedding_service.keyword_search("Agent编排", entries, top_k=3, min_score=0.001)
    assert len(hits) > 0, "降低阈值后应该能找到长文本记忆"

    return "OK long text searched"
|
||
|
||
|
||
@test("B5. 分词器中英混合精确性")
def test_tokenizer_mixed_text():
    """Check ``_tokenize`` on Chinese, English, digits, empty and symbol-only input."""
    from app.services.embedding_service import embedding_service

    tokenize = embedding_service._tokenize

    # Pure Chinese: both bigrams and single characters are expected.
    tokens = tokenize("你好世界")
    assert all(tok in tokens for tok in ("你好", "世界")), f"中文二元组: {tokens}"
    assert all(ch in tokens for ch in ("你", "好", "世", "界"))

    # Pure English: whole words.
    tokens = tokenize("hello world python")
    assert all(word in tokens for word in ("hello", "world", "python"))

    # Digits embedded in Chinese text.
    tokens = tokenize("端口8037版本3.11")
    assert "8037" in tokens, f"数字token: {tokens}"

    # Empty string yields nothing.
    tokens = tokenize("")
    assert len(tokens) == 0

    # Symbols only yield nothing.
    tokens = tokenize("!@#$%^&*()")
    assert len(tokens) == 0, f"纯符号应无token: {tokens}"

    return "OK all edge cases"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# C. AgentMemory 高级测试
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("C1. AgentMemory 不持久化时文件记忆仍可写入")
def test_memory_file_store_no_persist():
    """With ``persist=False`` the file store must still be available and writable."""
    from app.agent_runtime.memory import AgentMemory

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        memory = AgentMemory(
            scope_id="test_no_persist",
            persist=False,
            memory_dir_enabled=True,
            memory_dir_path=workdir,
        )

        file_store = memory._get_file_store()
        assert file_store is not None, "文件存储应可用"

        # A single save must show up in the entry count.
        file_store.save("测试", "测试内容", mem_type="user")
        assert file_store.memory_count == 1

        return f"OK count={file_store.memory_count}"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("C2. AgentMemory 初始化包含文件记忆搜索结果")
def test_memory_initialize_with_file_memory():
    """``initialize`` must return a string when file memories were seeded beforehand.

    The result only has to be a ``str``; whether the seeded memories appear in
    it merely changes the reported detail, not the pass/fail outcome.
    """
    from app.agent_runtime.memory import AgentMemory

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        # Seed the directory through the file store directly.
        from app.services.file_memory_service import FileMemoryStore

        seeded = FileMemoryStore(workdir)
        seeded.save("用户偏好", "用户喜欢用Python写后端API", mem_type="user")
        seeded.save("数据库信息", "MySQL地址101.43.95.130端口24936", mem_type="reference")

        memory = AgentMemory(
            scope_id="test_init_file",
            persist=False,
            memory_dir_enabled=True,
            memory_dir_path=workdir,
        )

        context = asyncio.run(memory.initialize("Python后端开发"))
        assert isinstance(context, str)

        # Report whether the file memory made it into the context.
        mentions_file_memory = "文件记忆" in context or "Python" in context
        if not mentions_file_memory:
            return f"OK init returned (len={len(context)}, no file mem match for query)"
        return f"OK file memory in context (len={len(context)})"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("C3. AgentMemory 懒加载文件存储")
def test_memory_lazy_file_store_init():
    """The file store starts unset, stays ``None`` when disabled, and exists when enabled."""
    from app.agent_runtime.memory import AgentMemory

    disabled = AgentMemory(scope_id="test_lazy")
    assert disabled._file_store is None
    assert disabled._get_file_store() is None, "未启用时应返回None"

    enabled = AgentMemory(scope_id="test_lazy2", memory_dir_enabled=True)
    assert enabled._get_file_store() is not None, "启用后应返回实例"

    return "OK"
|
||
|
||
|
||
@test("C4. Memory type filter 在向量搜索中的应用")
def test_memory_type_filter_search():
    """``memory_type_filter`` must be stored as given and default to ``None``."""
    from app.agent_runtime.memory import AgentMemory

    single = AgentMemory(
        scope_id="test_filter",
        vector_memory_enabled=True,
        memory_type_filter=["user"],
    )
    assert single.memory_type_filter == ["user"]

    every_type = ["user", "project", "feedback", "reference"]
    full = AgentMemory(
        scope_id="test_filter2",
        vector_memory_enabled=True,
        memory_type_filter=every_type,
    )
    assert len(full.memory_type_filter) == 4

    # Omitting the argument leaves the filter unset.
    unfiltered = AgentMemory(scope_id="test_filter3")
    assert unfiltered.memory_type_filter is None

    return "OK"
|
||
|
||
|
||
@test("C5. 消息裁剪 - 多 tool_calls 配对")
def test_trim_messages_multi_tool_calls():
    """After trimming, no tool message may follow anything but an assistant/tool message."""
    from app.agent_runtime.memory import AgentMemory

    memory = AgentMemory(scope_id="test", max_history=5)

    paired_calls = [
        {"name": "get_weather", "id": "1"},
        {"name": "get_stock", "id": "2"},
    ]
    conversation = [
        {"role": "system", "content": "你是助手"},
        {"role": "user", "content": "查天气和股票"},
        {"role": "assistant", "content": "", "tool_calls": paired_calls},
        {"role": "tool", "content": "晴天", "tool_call_id": "1"},
        {"role": "tool", "content": "涨了", "tool_call_id": "2"},
        {"role": "assistant", "content": "天气晴,股票涨"},
        {"role": "user", "content": "谢谢"},
        {"role": "assistant", "content": "不客气"},
    ]

    trimmed = memory.trim_messages(conversation)
    assert trimmed[0]["role"] == "system"

    # A tool message must be preceded by its assistant call or a sibling tool reply.
    for idx, msg in enumerate(trimmed):
        orphaned = (
            msg["role"] == "tool"
            and idx > 0
            and trimmed[idx - 1]["role"] not in ("assistant", "tool")
        )
        assert not orphaned, f"tool消息不能跟在{trimmed[idx-1]['role']}之后 (index={idx})"

    return f"OK trimmed to {len(trimmed)}"
|
||
|
||
|
||
@test("C6. 消息裁剪 - 仅系统消息时")
def test_trim_messages_system_only():
    """Trimming a conversation holding only the system message must keep it."""
    from app.agent_runtime.memory import AgentMemory

    memory = AgentMemory(scope_id="test", max_history=5)
    trimmed = memory.trim_messages([{"role": "system", "content": "你是助手"}])

    assert len(trimmed) == 1
    assert trimmed[0]["role"] == "system"

    return "OK"
|
||
|
||
|
||
@test("C7. 消息裁剪 - 大量 tool 消息连续性")
def test_trim_messages_tool_chain():
    """With a small history limit, a tool message must not sit right after the system message."""
    from app.agent_runtime.memory import AgentMemory

    memory = AgentMemory(scope_id="test", max_history=3)

    def call(call_id, fn_name):
        """Build one assistant message carrying a single function tool call."""
        return {
            "role": "assistant",
            "content": "",
            "tool_calls": [{
                "id": call_id,
                "name": fn_name,
                "type": "function",
                "function": {"name": fn_name, "arguments": "{}"},
            }],
        }

    conversation = [
        {"role": "system", "content": "你是助手"},
        {"role": "user", "content": "Q1"},
        call("t1", "f1"),
        {"role": "tool", "content": "R1", "tool_call_id": "t1"},
        {"role": "user", "content": "Q2"},
        call("t2", "f2"),
        {"role": "tool", "content": "R2", "tool_call_id": "t2"},
    ]

    trimmed = memory.trim_messages(conversation)
    assert trimmed[0]["role"] == "system"

    # A tool message at index 1 (right after the system message) would mean
    # the assistant message that issued the call was trimmed away.
    for idx, msg in enumerate(trimmed):
        assert not (msg["role"] == "tool" and idx == 1), f"孤立tool消息: index={idx}"

    return f"OK trimmed to {len(trimmed)}"
|
||
|
||
|
||
@test("C8. 保存上下文时 fire-and-forget 异步")
def test_save_context_fire_and_forget():
    """``save_context`` must return within five seconds for a ten-message history.

    The elapsed time is measured with ``time.perf_counter()``, a monotonic
    clock, so a system clock adjustment during the call cannot distort it
    (the original used wall-clock ``time.time()``).
    """
    from app.agent_runtime.memory import AgentMemory

    mem = AgentMemory(scope_id="test_ff", persist=False)

    # Five user/assistant rounds of prior conversation.
    messages = []
    for i in range(5):
        messages.append({"role": "user", "content": f"消息{i}"})
        messages.append({"role": "assistant", "content": f"回复{i}"})

    start = time.perf_counter()
    asyncio.run(mem.save_context(
        "新用户消息",
        "新助手回复",
        messages=messages,
    ))
    elapsed = time.perf_counter() - start

    # Anything slower than five seconds is treated as blocking.
    assert elapsed < 5.0, f"save_context 耗时 {elapsed:.2f}s,疑似阻塞"

    return f"OK elapsed={elapsed:.3f}s"
|
||
|
||
|
||
@test("C9. 全局知识去重逻辑")
def test_global_knowledge_dedup():
    """Check that ``save_global_knowledge`` exists and is callable.

    Only the presence of the method is verified here; nothing is saved.
    """
    from app.agent_runtime.memory import AgentMemory

    memory = AgentMemory(scope_id="test_dedup")
    assert hasattr(memory, "save_global_knowledge")
    assert callable(memory.save_global_knowledge)

    return "OK"
|
||
|
||
|
||
@test("C10. 记忆压缩摘要向量化方法可用")
def test_compressed_memory_save_method():
    """``_save_compressed_memories`` must exist and accept summary, facts and topics."""
    from app.agent_runtime.memory import AgentMemory

    memory = AgentMemory(scope_id="test_compress")
    assert hasattr(memory, "_save_compressed_memories")

    # Inspect the bound method's signature for the expected parameter names.
    import inspect

    param_names = list(inspect.signature(memory._save_compressed_memories).parameters)
    assert "summary" in param_names
    assert "facts" in param_names
    assert "topics" in param_names

    return "OK"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# D. Embedding 服务高级测试
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("D1. 大向量序列化往返")
def test_embedding_large_serialization():
    """A 1536-element vector must survive a serialize/deserialize round trip."""
    from app.services.embedding_service import embedding_service

    vector = [0.1] * 1536
    restored = embedding_service.deserialize_embedding(
        embedding_service.serialize_embedding(vector)
    )

    assert len(restored) == 1536
    assert restored[0] == 0.1
    assert restored[-1] == 0.1

    return f"OK dims={len(restored)}"
|
||
|
||
|
||
@test("D2. 空列表序列化")
def test_embedding_empty_serialization():
    """An empty vector must round-trip to an empty list."""
    from app.services.embedding_service import embedding_service

    payload = embedding_service.serialize_embedding([])
    assert embedding_service.deserialize_embedding(payload) == []

    return "OK"
|
||
|
||
|
||
@test("D3. 相似度搜索 min_score 边界")
def test_similarity_search_min_score_edge():
    """``min_score`` of 0.9 keeps only the closest entry; 0.0 keeps all three."""
    from app.services.embedding_service import embedding_service

    entries = [
        {"id": "1", "content_text": "close", "embedding": [1.0, 0.0]},
        {"id": "2", "content_text": "medium", "embedding": [0.5, 0.5]},
        {"id": "3", "content_text": "far", "embedding": [0.0, 1.0]},
    ]
    query_emb = [1.0, 0.0]

    def search(threshold):
        """Run the async similarity search to completion with the given threshold."""
        return asyncio.run(embedding_service.similarity_search(
            query_emb, entries, top_k=5, min_score=threshold
        ))

    # High threshold: only the closest entry remains.
    strict = search(0.9)
    assert len(strict) == 1, f"高阈值应只有1条,实际{len(strict)}"
    assert strict[0]["content_text"] == "close"

    # Zero threshold: nothing is filtered out.
    lenient = search(0.0)
    assert len(lenient) == 3, f"零阈值应保留全部,实际{len(lenient)}"

    return "OK"
|
||
|
||
|
||
@test("D4. 相似度搜索 top_k 截断")
def test_similarity_search_topk_truncation():
    """``top_k=3`` over 20 entries must return three results in descending-id order."""
    from app.services.embedding_service import embedding_service

    # Entry i points along [i, 20 - i], so every entry has a distinct direction.
    entries = []
    for i in range(20):
        entries.append(
            {"id": str(i), "content_text": f"entry_{i}", "embedding": [float(i), float(20-i)]}
        )
    query_emb = [1.0, 0.0]  # aligned with the X axis

    hits = asyncio.run(embedding_service.similarity_search(
        query_emb, entries, top_k=3, min_score=0.0
    ))
    assert len(hits) == 3

    # cos([i, 20 - i], [1, 0]) = i / sqrt(i^2 + (20 - i)^2) grows with i, so
    # results sorted by similarity also have their ids in descending order.
    hit_ids = [int(hit["id"]) for hit in hits]
    assert hit_ids == sorted(hit_ids, reverse=True), f"应按相似度降序: {hit_ids}"

    return "OK"
|
||
|
||
|
||
@test("D5. Embedding 服务离线可用性")
def test_embedding_offline_available():
    """``embedding_service.offline_available`` must be ``True``."""
    from app.services.embedding_service import embedding_service

    available = embedding_service.offline_available
    assert available is True, "离线关键词搜索应始终可用"

    return "OK"
|
||
|
||
|
||
@test("D6. 相似度搜索跳过无 embedding 的条目")
def test_similarity_search_skip_empty_emb():
    """Entries with an empty embedding must be left out of similarity results."""
    from app.services.embedding_service import embedding_service

    candidates = [
        {"id": "1", "content_text": "has emb", "embedding": [1.0, 0.0]},
        {"id": "2", "content_text": "no emb", "embedding": []},
        {"id": "3", "content_text": "also no emb", "embedding": []},
    ]

    pending = embedding_service.similarity_search(
        [1.0, 0.0], candidates, top_k=5, min_score=0.0
    )
    hits = asyncio.run(pending)

    assert len(hits) == 1
    assert hits[0]["content_text"] == "has emb"

    return "OK"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# E. Schema 验证测试
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("E1. AgentMemoryConfig 所有字段默认值")
def test_schema_memory_config_defaults():
    """A default-constructed ``AgentMemoryConfig`` must expose the expected defaults."""
    from app.agent_runtime.schemas import AgentMemoryConfig

    cfg = AgentMemoryConfig()

    # (field, expected, compare_by_identity) — identity for True/False/None,
    # equality for numbers and strings; checked in the listed order.
    expectations = (
        ("enabled", True, True),
        ("max_history_messages", 20, False),
        ("persist_to_db", True, True),
        ("vector_memory_enabled", True, True),
        ("vector_memory_top_k", 5, False),
        ("vector_memory_rerank", False, True),
        ("memory_type_filter", None, True),
        ("team_id", None, True),
        ("team_share_enabled", False, True),
        ("learning_enabled", True, True),
        ("memory_dir_enabled", False, True),
        ("memory_dir_path", "", False),
    )
    for field, expected, by_identity in expectations:
        actual = getattr(cfg, field)
        if by_identity:
            assert actual is expected
        else:
            assert actual == expected

    return "OK"
|
||
|
||
|
||
@test("E2. AgentConfig 嵌套创建")
def test_schema_agent_config_nested():
    """An ``AgentConfig`` built from nested sub-configs must expose their values."""
    from app.agent_runtime.schemas import (
        AgentConfig, AgentLLMConfig, AgentMemoryConfig,
        AgentToolConfig, AgentBudgetConfig,
    )

    # Build the sub-configs first, in the same order they are passed below.
    llm_cfg = AgentLLMConfig(model="deepseek-v4-flash")
    memory_cfg = AgentMemoryConfig(
        vector_memory_top_k=10,
        team_id="team_test",
        memory_dir_enabled=True,
    )
    tool_cfg = AgentToolConfig(include_tools=["read_file"])
    budget_cfg = AgentBudgetConfig(max_tool_calls=100)

    agent_cfg = AgentConfig(
        name="测试Agent",
        system_prompt="自定义提示词",
        llm=llm_cfg,
        memory=memory_cfg,
        tools=tool_cfg,
        budget=budget_cfg,
    )

    assert agent_cfg.name == "测试Agent"
    assert agent_cfg.llm.model == "deepseek-v4-flash"
    assert agent_cfg.memory.vector_memory_top_k == 10
    assert agent_cfg.memory.team_id == "team_test"
    assert agent_cfg.memory.memory_dir_enabled is True
    assert agent_cfg.tools.include_tools == ["read_file"]
    assert agent_cfg.budget.max_tool_calls == 100

    return "OK"
|
||
|
||
|
||
@test("E3. AgentToolConfig None 值强制转换")
def test_schema_tool_config_none_coercion():
    """``None`` passed for the list fields must come back as empty lists."""
    from app.agent_runtime.schemas import AgentToolConfig

    tool_cfg = AgentToolConfig(include_tools=None, exclude_tools=None, require_approval=None)

    assert tool_cfg.include_tools == []
    assert tool_cfg.exclude_tools == []
    assert tool_cfg.require_approval == []

    return "OK"
|
||
|
||
|
||
@test("E4. AgentResult 字段验证")
def test_schema_agent_result():
    """A failed ``AgentResult`` with two steps must expose the values it was built with."""
    from app.agent_runtime.schemas import AgentResult, AgentStep

    think_step = AgentStep(iteration=1, type="think", content="思考中")
    final_step = AgentStep(iteration=2, type="final", content="最终结果")

    outcome = AgentResult(
        success=False,
        content="出错了",
        error="连接超时",
        iterations_used=5,
        tool_calls_made=10,
        truncated=True,
        steps=[think_step, final_step],
    )

    assert outcome.success is False
    assert outcome.error == "连接超时"
    assert outcome.truncated is True
    assert len(outcome.steps) == 2
    assert outcome.steps[0].type == "think"
    assert outcome.steps[1].type == "final"

    return "OK"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# F. 压力测试
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("F1. 批量文件记忆写入性能")
def test_file_memory_bulk_write():
    """One hundred saves must finish within five seconds and leave at least 90 entries.

    The elapsed time is measured with ``time.perf_counter()``, a monotonic
    clock, instead of wall-clock ``time.time()``, so a system clock
    adjustment cannot distort the timing budget.
    """
    from app.services.file_memory_service import FileMemoryStore

    tmpdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(tmpdir)
        start = time.perf_counter()

        # Alternate between the "user" and "reference" memory types.
        for i in range(100):
            store.save(
                f"记忆_{i:04d}",
                f"这是第{i}条记忆的内容,包含关键词Python、天工、Agent等",
                mem_type="user" if i % 2 == 0 else "reference",
            )

        elapsed = time.perf_counter() - start
        assert store.memory_count >= 90, f"应有至少90条,实际{store.memory_count}"

        # Budget: 100 writes in under five seconds.
        assert elapsed < 5.0, f"100条写入耗时{elapsed:.2f}s,太慢"

        return f"OK {store.memory_count} entries in {elapsed:.2f}s"
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
|
||
@test("F2. 批量搜索性能")
def test_file_memory_bulk_search():
    """Sixty searches over fifty stored entries must finish within two seconds.

    The elapsed time is measured with ``time.perf_counter()``, a monotonic
    clock, instead of wall-clock ``time.time()``, so a system clock
    adjustment cannot distort the timing budget.
    """
    from app.services.file_memory_service import FileMemoryStore

    tmpdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        store = FileMemoryStore(tmpdir)

        # Seed fifty entries, cycling through three topics.
        for i in range(50):
            store.save(
                f"记忆_{i:04d}",
                f"内容_{i}: " + f"主题{'Python' if i%3==0 else '天工' if i%3==1 else 'Agent'}相关记忆" * 5,
                mem_type="reference",
            )

        # Twenty rounds of three searches each; only the search loop is timed.
        start = time.perf_counter()
        for _ in range(20):
            store.search("Python")
            store.search("天工")
            store.search("Agent")
        elapsed = time.perf_counter() - start

        # Budget: 60 searches in under two seconds.
        assert elapsed < 2.0, f"60次搜索耗时{elapsed:.2f}s"

        return f"OK 60 searches in {elapsed:.3f}s"
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
|
||
@test("F3. AgentMemory 全配置场景构造")
def test_full_config_scenario():
    """Construct ``AgentMemory`` with every option set and verify each one is stored."""
    from app.agent_runtime.memory import AgentMemory

    workdir = tempfile.mkdtemp(prefix="tmem_")
    try:
        settings = {
            "scope_kind": "agent",
            "scope_id": "prod_agent_001",
            "session_key": "user_session_abc",
            "persist": False,
            "max_history": 30,
            "vector_memory_enabled": True,
            "vector_memory_top_k": 10,
            "vector_memory_rerank": True,
            "memory_type_filter": ["user", "project", "reference"],
            "team_id": "production_team",
            "team_share_enabled": True,
            "memory_dir_enabled": True,
            "memory_dir_path": workdir,
        }
        memory = AgentMemory(**settings)

        # Every configured value must be readable back from the instance.
        assert memory.scope_kind == "agent"
        assert memory.scope_id == "prod_agent_001"
        assert memory.max_history == 30
        assert memory.vector_memory_top_k == 10
        assert memory.vector_memory_rerank is True
        assert len(memory.memory_type_filter) == 3
        assert memory.team_id == "production_team"
        assert memory.team_share_enabled is True
        assert memory.memory_dir_enabled is True

        # The file store must be available when the memory directory is enabled.
        assert memory._get_file_store() is not None

        # The four memory types, in their declared order.
        assert memory.MEMORY_TYPES == ("user", "feedback", "project", "reference")

        return "OK all params verified"
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
|
||
|
||
|
||
@test("F4. 记忆类型推断覆盖所有类型")
def test_memory_type_inference_all_types():
    """``_infer_memory_type`` must classify sample exchanges into all four types."""
    from app.agent_runtime.memory import AgentMemory

    # Each case is (user_message, assistant_reply, expected_type).
    user_cases = [
        ("我喜欢Python", "好的", "user"),
        ("记住我不喜欢辣", "记住了", "user"),
        ("我的名字是小明", "你好小明", "user"),
    ]
    feedback_cases = [
        ("这个报错了", "让我检查", "feedback"),
        ("不对,应该是8080端口", "修正了", "feedback"),
        ("不要这样写代码", "好的我改", "feedback"),
        ("能不能换个方式", "可以", "feedback"),
    ]
    reference_cases = [
        ("数据库地址是什么", "101.43.95.130", "reference"),
        ("API的URL是什么", "http://api.test.com", "reference"),
        ("密码是多少", "123456", "reference"),
    ]
    project_cases = [
        ("这个任务进度怎么样", "完成了80%", "project"),
        ("需求文档写好了吗", "写好了", "project"),
        ("新功能测试通过了吗", "通过了", "project"),
        ("项目上线时间定了吗", "下周一", "project"),
    ]
    cases = user_cases + feedback_cases + reference_cases + project_cases

    for user_msg, reply, expected in cases:
        inferred = AgentMemory._infer_memory_type(user_msg, reply)
        assert inferred == expected, f"'{user_msg[:30]}' 期望 {expected} 实际 {inferred}"

    return f"OK {len(cases)} cases"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# G. Auto Dream 高级测试
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("G1. Auto Dream 每日触发检查")
def test_auto_dream_daily_check():
    """``_should_dream_today`` must return ``False`` unless the UTC+8 hour is 3."""
    from app.services.auto_dream_service import _should_dream_today
    from datetime import datetime, timedelta, timezone

    utc_plus_8 = timezone(timedelta(hours=8))
    now = datetime.now(timezone.utc).astimezone(utc_plus_8)
    should_dream = _should_dream_today()

    # During hour 3 either answer is accepted (it may or may not already
    # have run today); at any other hour it must not trigger.
    if now.hour != 3:
        assert should_dream is False, f"非凌晨3点不应触发 (当前{now.hour}点)"

    return f"OK (hour={now.hour})"
|
||
|
||
|
||
@test("G2. Auto Dream 整合函数可调用")
def test_auto_dream_runnable():
    """``run_auto_dream`` and ``_do_consolidate`` must be importable and callable.

    Neither function is actually invoked here.
    """
    from app.services.auto_dream_service import run_auto_dream, _do_consolidate

    for entry_point in (run_auto_dream, _do_consolidate):
        assert callable(entry_point)

    return "OK"
|
||
|
||
|
||
@test("G3. Auto Dream 合并阈值边界检查")
def test_auto_dream_threshold_validation():
    """``MERGE_SIMILARITY_THRESHOLD`` must be a float inside the accepted bands."""
    from app.services.auto_dream_service import MERGE_SIMILARITY_THRESHOLD

    threshold = MERGE_SIMILARITY_THRESHOLD
    assert isinstance(threshold, float)
    assert 0.0 < threshold <= 1.0
    # Narrower band treated as reasonable for merging (the original note cites 85%).
    assert 0.80 <= threshold <= 0.95

    return f"OK threshold={MERGE_SIMILARITY_THRESHOLD}"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# H. 团队共享记忆边界测试 (P6)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@test("H1. 团队共享配置完整性")
def test_team_sharing_config_full():
    """``team_id`` / ``team_share_enabled`` must be stored as passed and default to off."""
    from app.agent_runtime.memory import AgentMemory

    # Team set and sharing switched on.
    sharing = AgentMemory(scope_id="agent_a", team_id="team_1", team_share_enabled=True)
    assert sharing.team_id == "team_1"
    assert sharing.team_share_enabled is True

    # Team set but sharing switched off.
    private = AgentMemory(scope_id="agent_b", team_id="team_1", team_share_enabled=False)
    assert private.team_id == "team_1"
    assert private.team_share_enabled is False

    # No team at all.
    solo = AgentMemory(scope_id="agent_c")
    assert solo.team_id is None
    assert solo.team_share_enabled is False

    return "OK"
|
||
|
||
|
||
@test("H2. 团队共享 schema 默认值")
def test_team_sharing_schema_defaults():
    """Team fields on ``AgentMemoryConfig`` default to off and accept explicit values."""
    from app.agent_runtime.schemas import AgentMemoryConfig

    defaults = AgentMemoryConfig()
    assert defaults.team_id is None
    assert defaults.team_share_enabled is False

    explicit = AgentMemoryConfig(team_id="my_team", team_share_enabled=True)
    assert explicit.team_id == "my_team"
    assert explicit.team_share_enabled is True

    return "OK"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 运行
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
if __name__ == "__main__":
    # The test cases already ran when their @test decorators were applied,
    # so this block only prints the banner and the summary of the counters.
    rule = "=" * 60

    print(rule)
    print("天工 Agent 记忆系统 — 高级/边界测试")
    print(rule)
    print()

    total = PASS + FAIL + SKIP
    print()
    print(rule)
    print(f"测试结果: {PASS} 通过 / {FAIL} 失败 / {SKIP} 跳过 (共 {total})")
    print(rule)

    # A non-zero exit status signals failure to the caller.
    if FAIL > 0:
        sys.exit(1)
    print("\n全部高级测试通过!")
|