Files
aiagent/backend/tests/test_memory_advanced.py
renjianbo 7f4aeb021b fix: Feishu channel agents file_write permission blocked + memory system tests & docs
- Fix 8 Feishu agent handlers to use permission_level="acceptEdits" so file_write
  tool works without Web UI approval popup (lingxi/renshenguo/suyao/tiantian/orange/main/schedule)
- Add P5-P7 memory improvements: offline keyword fallback, team sharing, file-based memory
- Add auto_dream_service for daily memory consolidation
- Add 99 memory system test cases (basic 18 + advanced 43 + pytest 38)
- Add platform capability assessment report and unfinished project checklist

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-14 20:35:12 +08:00

1068 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
天工 Agent 记忆系统 — 高级/边界测试用例
覆盖:文件记忆更新/裁剪/并发、向量搜索过滤/团队查询、
离线搜索边界、全局知识去重、Embedding 序列化边界、
Schema 验证、消息裁剪工具配对、记忆生命周期压力测试
运行cd backend && python tests/test_memory_advanced.py
"""
import asyncio
import sys
import time
import tempfile
import shutil
import os
import json
import threading
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# ─── 测试框架 ───
PASS = 0
FAIL = 0
SKIP = 0
def test(name: str):
def decorator(fn):
global PASS, FAIL, SKIP
try:
result = fn()
if asyncio.iscoroutine(result):
result = asyncio.run(result)
if result is False:
FAIL += 1
print(f" FAIL {name}")
elif result is True:
PASS += 1
print(f" PASS {name}")
else:
PASS += 1
print(f" PASS {name} ({result})")
except Exception as e:
FAIL += 1
import traceback
print(f" FAIL {name}: {e}")
traceback.print_exc()
return fn
return decorator
# ═══════════════════════════════════════════════════════════════
# A. 文件式记忆高级测试 (P7)
# ═══════════════════════════════════════════════════════════════
@test("A1. 文件记忆重复保存(追加更新)")
def test_file_memory_append():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
# 第一次保存
store.save("测试条目", "第一版本内容", mem_type="user")
assert store.memory_count == 1
# 同名再次保存(应追加而非创建新文件)
store.save("测试条目", "第二版本更新内容", mem_type="user")
assert store.memory_count == 1, "同名条目不应增加计数"
# 搜索能找到更新的内容
results = store.search("第二版本")
assert len(results) > 0, "应能找到更新的内容"
return f"OK count={store.memory_count}"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("A2. 文件记忆索引裁剪超过200条")
def test_file_memory_index_trimming():
from app.services.file_memory_service import FileMemoryStore, INDEX_MAX_ENTRIES
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
# 写入 205 条记忆
for i in range(205):
store.save(f"记忆条目_{i:04d}", f"这是第{i}条记忆的内容数据", mem_type="reference")
count = store.memory_count
assert count <= INDEX_MAX_ENTRIES, f"索引应被裁剪到 {INDEX_MAX_ENTRIES},实际 {count}"
assert count == INDEX_MAX_ENTRIES, f"裁剪后应为 {INDEX_MAX_ENTRIES},实际 {count}"
# 最早的那批应该被移除
results = store.search("记忆条目_0000")
# 可能被裁剪掉,但搜索功能应正常
assert store.memory_count == INDEX_MAX_ENTRIES
return f"OK trimmed to {count}"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("A3. 文件记忆按类型列出")
def test_file_memory_list_by_type():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
store.save("用户偏好A", "喜欢Python", mem_type="user")
store.save("用户偏好B", "喜欢Vue", mem_type="user")
store.save("项目信息", "天工平台开发", mem_type="project")
store.save("反馈记录", "API超时问题", mem_type="feedback")
store.save("参考文档", "数据库配置", mem_type="reference")
assert len(store.list_by_type("user")) == 2
assert len(store.list_by_type("project")) == 1
assert len(store.list_by_type("feedback")) == 1
assert len(store.list_by_type("reference")) == 1
assert len(store.list_by_type("nonexistent")) == 0
assert len(store.list_by_type("")) == 5
return f"OK 4 types verified"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("A4. 文件记忆空查询返回最近条目")
def test_file_memory_empty_query():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
store.save("条目1", "内容A", mem_type="user")
store.save("条目2", "内容B", mem_type="project")
results = store.search("")
assert len(results) >= 1, "空查询应返回最近条目"
assert all(r["source"] == "file" for r in results)
return f"OK {len(results)} recent"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("A5. 文件记忆安全文件名生成")
def test_file_memory_safe_filename():
from app.services.file_memory_service import FileMemoryStore
# 特殊字符
safe = FileMemoryStore._safe_filename("用户: 偏好/设置?*")
assert ":" not in safe and "/" not in safe and "?" not in safe and "*" not in safe
# 纯英文
safe = FileMemoryStore._safe_filename("user_profile")
assert safe == "user_profile"
# 中英混合
safe = FileMemoryStore._safe_filename("用户user_偏好pref")
assert "用户" in safe and "user" in safe
# 超长截断
long_name = "A" * 100
safe = FileMemoryStore._safe_filename(long_name)
assert len(safe) <= 64
# 全部特殊字符
safe = FileMemoryStore._safe_filename("!@#$%")
assert safe == "memory" # 兜底名称
return "OK"
@test("A6. 文件记忆前端格式解析")
def test_file_memory_frontmatter_parsing():
from app.services.file_memory_service import FileMemoryStore
# 标准 frontmatter
text = """---
name: 测试记忆
description: 这是一条测试记忆
type: user
created: 2026-06-14T10:00:00+08:00
tags: [python, vue]
---
这是正文内容,包含重要信息。"""
fm = FileMemoryStore._parse_frontmatter(text)
assert fm["name"] == "测试记忆"
assert fm["type"] == "user"
assert fm["tags"] == ["python", "vue"]
assert fm["created"] == "2026-06-14T10:00:00+08:00"
# 无 frontmatter
fm = FileMemoryStore._parse_frontmatter("纯文本内容没有frontmatter")
assert fm == {} or "description" in fm, "无frontmatter应返回空dict或仅有description"
# 格式不完整的 frontmatter
fm = FileMemoryStore._parse_frontmatter("---\n只有开始没有结束")
assert isinstance(fm, dict), "应返回dict而非报错"
return "OK"
@test("A7. 文件记忆删除不存在的条目")
def test_file_memory_delete_nonexistent():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
result = store.delete("不存在的条目")
assert result is True # 删除不存在条目不报错
return "OK"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("A8. 文件记忆并发安全写入")
def test_file_memory_concurrent_writes():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
errors = []
def write_batch(prefix, count):
try:
for i in range(count):
store.save(f"{prefix}_{i}", f"{prefix}内容第{i}", mem_type="user")
except Exception as e:
errors.append(str(e))
threads = []
for t in range(5):
t = threading.Thread(target=write_batch, args=(f"thread{t}", 20))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"并发写入出错: {errors}"
assert store.memory_count > 0, "应有写入成功的记忆"
return f"OK {store.memory_count} entries"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("A9. 文件记忆搜索无匹配时返回空")
def test_file_memory_search_no_match():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
store.save("数据库配置", "MySQL在101.43.95.130", mem_type="reference")
results = store.search("xyznonexistent12345")
assert len(results) == 0, "无匹配关键词应返回空"
return "OK"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════
# B. 离线关键词搜索边界 (P5)
# ═══════════════════════════════════════════════════════════════
@test("B1. 离线搜索空查询")
def test_offline_search_empty_query():
from app.services.embedding_service import embedding_service
entries = [
{"id": "1", "content_text": "测试内容", "embedding": [], "metadata": {}},
]
results = embedding_service.keyword_search("", entries, top_k=5)
assert len(results) <= 5
return f"OK {len(results)} entries"
@test("B2. 离线搜索空条目列表")
def test_offline_search_empty_entries():
from app.services.embedding_service import embedding_service
results = embedding_service.keyword_search("Python", [], top_k=5)
assert results == []
return "OK"
@test("B3. 离线搜索特殊字符")
def test_offline_search_special_chars():
from app.services.embedding_service import embedding_service
entries = [
{"id": "1", "content_text": "URL: http://101.43.95.130:3001/api", "embedding": [], "metadata": {}},
{"id": "2", "content_text": "正常中文内容无特殊字符", "embedding": [], "metadata": {}},
{"id": "3", "content_text": "email@example.com 联系方式", "embedding": [], "metadata": {}},
]
results = embedding_service.keyword_search("http 101.43.95.130", entries, top_k=3)
assert len(results) > 0, "特殊字符搜索应有结果"
assert "http" in results[0]["content_text"].lower(), f"应找到URL条目: {results[0]['content_text'][:50]}"
results = embedding_service.keyword_search("email example", entries, top_k=3)
assert len(results) > 0, "邮箱搜索应有结果"
return f"OK URL+email search"
@test("B4. 离线搜索长文本")
def test_offline_search_long_text():
from app.services.embedding_service import embedding_service
long_content = "天工智能体平台是一个企业级AI平台" + "支持多Agent编排、工具调用、知识库管理。" * 20
entries = [
{"id": "1", "content_text": long_content, "embedding": [], "metadata": {}},
{"id": "2", "content_text": "简短的Python记忆", "embedding": [], "metadata": {}},
]
results = embedding_service.keyword_search("Python", entries, top_k=3)
assert len(results) > 0
assert results[0]["content_text"] == "简短的Python记忆"
# 长文本Jaccard得分会低union很大降低min_score阈值
results = embedding_service.keyword_search("Agent编排", entries, top_k=3, min_score=0.001)
assert len(results) > 0, "降低阈值后应该能找到长文本记忆"
return f"OK long text searched"
@test("B5. 分词器中英混合精确性")
def test_tokenizer_mixed_text():
from app.services.embedding_service import embedding_service
# 纯中文
tokens = embedding_service._tokenize("你好世界")
assert "你好" in tokens and "世界" in tokens, f"中文二元组: {tokens}"
assert "" in tokens and "" in tokens and "" in tokens and "" in tokens
# 纯英文
tokens = embedding_service._tokenize("hello world python")
assert "hello" in tokens and "world" in tokens and "python" in tokens
# 数字
tokens = embedding_service._tokenize("端口8037版本3.11")
assert "8037" in tokens, f"数字token: {tokens}"
# 空字符串
tokens = embedding_service._tokenize("")
assert len(tokens) == 0
# 纯符号
tokens = embedding_service._tokenize("!@#$%^&*()")
assert len(tokens) == 0, f"纯符号应无token: {tokens}"
return f"OK all edge cases"
# ═══════════════════════════════════════════════════════════════
# C. AgentMemory 高级测试
# ═══════════════════════════════════════════════════════════════
@test("C1. AgentMemory 不持久化时文件记忆仍可写入")
def test_memory_file_store_no_persist():
from app.agent_runtime.memory import AgentMemory
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
mem = AgentMemory(
scope_id="test_no_persist",
persist=False,
memory_dir_enabled=True,
memory_dir_path=tmpdir,
)
store = mem._get_file_store()
assert store is not None, "文件存储应可用"
# 保存文件记忆
store.save("测试", "测试内容", mem_type="user")
assert store.memory_count == 1
return f"OK count={store.memory_count}"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("C2. AgentMemory 初始化包含文件记忆搜索结果")
def test_memory_initialize_with_file_memory():
from app.agent_runtime.memory import AgentMemory
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
# 预置文件记忆
from app.services.file_memory_service import FileMemoryStore
store = FileMemoryStore(tmpdir)
store.save("用户偏好", "用户喜欢用Python写后端API", mem_type="user")
store.save("数据库信息", "MySQL地址101.43.95.130端口24936", mem_type="reference")
mem = AgentMemory(
scope_id="test_init_file",
persist=False,
memory_dir_enabled=True,
memory_dir_path=tmpdir,
)
text = asyncio.run(mem.initialize("Python后端开发"))
assert isinstance(text, str)
# 检查是否包含文件记忆
if "文件记忆" in text or "Python" in text:
return f"OK file memory in context (len={len(text)})"
else:
return f"OK init returned (len={len(text)}, no file mem match for query)"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("C3. AgentMemory 懒加载文件存储")
def test_memory_lazy_file_store_init():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test_lazy")
assert mem._file_store is None
store = mem._get_file_store()
assert store is None, "未启用时应返回None"
mem2 = AgentMemory(scope_id="test_lazy2", memory_dir_enabled=True)
store2 = mem2._get_file_store()
assert store2 is not None, "启用后应返回实例"
return "OK"
@test("C4. Memory type filter 在向量搜索中的应用")
def test_memory_type_filter_search():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(
scope_id="test_filter",
vector_memory_enabled=True,
memory_type_filter=["user"],
)
assert mem.memory_type_filter == ["user"]
mem2 = AgentMemory(
scope_id="test_filter2",
vector_memory_enabled=True,
memory_type_filter=["user", "project", "feedback", "reference"],
)
assert len(mem2.memory_type_filter) == 4
# None = 不过滤
mem3 = AgentMemory(scope_id="test_filter3")
assert mem3.memory_type_filter is None
return "OK"
@test("C5. 消息裁剪 - 多 tool_calls 配对")
def test_trim_messages_multi_tool_calls():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test", max_history=5)
msgs = [
{"role": "system", "content": "你是助手"},
{"role": "user", "content": "查天气和股票"},
{"role": "assistant", "content": "", "tool_calls": [
{"name": "get_weather", "id": "1"},
{"name": "get_stock", "id": "2"},
]},
{"role": "tool", "content": "晴天", "tool_call_id": "1"},
{"role": "tool", "content": "涨了", "tool_call_id": "2"},
{"role": "assistant", "content": "天气晴,股票涨"},
{"role": "user", "content": "谢谢"},
{"role": "assistant", "content": "不客气"},
]
trimmed = mem.trim_messages(msgs)
assert trimmed[0]["role"] == "system"
# 不应该有独立的 tool 消息开头
for i, m in enumerate(trimmed):
if m["role"] == "tool" and i > 0 and trimmed[i-1]["role"] not in ("assistant", "tool"):
assert False, f"tool消息不能跟在{trimmed[i-1]['role']}之后 (index={i})"
return f"OK trimmed to {len(trimmed)}"
@test("C6. 消息裁剪 - 仅系统消息时")
def test_trim_messages_system_only():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test", max_history=5)
msgs = [{"role": "system", "content": "你是助手"}]
trimmed = mem.trim_messages(msgs)
assert len(trimmed) == 1
assert trimmed[0]["role"] == "system"
return "OK"
@test("C7. 消息裁剪 - 大量 tool 消息连续性")
def test_trim_messages_tool_chain():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test", max_history=3)
msgs = [
{"role": "system", "content": "你是助手"},
{"role": "user", "content": "Q1"},
{"role": "assistant", "content": "", "tool_calls": [{"id": "t1", "name": "f1", "type": "function", "function": {"name": "f1", "arguments": "{}"}}]},
{"role": "tool", "content": "R1", "tool_call_id": "t1"},
{"role": "user", "content": "Q2"},
{"role": "assistant", "content": "", "tool_calls": [{"id": "t2", "name": "f2", "type": "function", "function": {"name": "f2", "arguments": "{}"}}]},
{"role": "tool", "content": "R2", "tool_call_id": "t2"},
]
trimmed = mem.trim_messages(msgs)
assert trimmed[0]["role"] == "system"
# 检查没有孤立的 tool 消息
for i, m in enumerate(trimmed):
if m["role"] == "tool" and i == 1:
# 如果 tool 排在第一位system之后说明它的 assistant 被裁剪了
assert False, f"孤立tool消息: index={i}"
return f"OK trimmed to {len(trimmed)}"
@test("C8. 保存上下文时 fire-and-forget 异步")
def test_save_context_fire_and_forget():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test_ff", persist=False)
# 模拟多轮对话触发压缩
messages = []
for i in range(5):
messages.append({"role": "user", "content": f"消息{i}"})
messages.append({"role": "assistant", "content": f"回复{i}"})
# save_context 不应阻塞
start = time.time()
asyncio.run(mem.save_context(
"新用户消息",
"新助手回复",
messages=messages,
))
elapsed = time.time() - start
# 不应超过 5 秒(异步不阻塞)
assert elapsed < 5.0, f"save_context 耗时 {elapsed:.2f}s疑似阻塞"
return f"OK elapsed={elapsed:.3f}s"
@test("C9. 全局知识去重逻辑")
def test_global_knowledge_dedup():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test_dedup")
assert hasattr(mem, "save_global_knowledge")
assert callable(mem.save_global_knowledge)
# 测试短内容不保存
# (无 DB 连接时仅验证方法签名的正确性)
return "OK"
@test("C10. 记忆压缩摘要向量化方法可用")
def test_compressed_memory_save_method():
from app.agent_runtime.memory import AgentMemory
mem = AgentMemory(scope_id="test_compress")
assert hasattr(mem, "_save_compressed_memories")
# 验证方法签名正确性(通过 inspect
import inspect
sig = inspect.signature(mem._save_compressed_memories)
params = list(sig.parameters.keys())
assert "summary" in params
assert "facts" in params
assert "topics" in params
return "OK"
# ═══════════════════════════════════════════════════════════════
# D. Embedding 服务高级测试
# ═══════════════════════════════════════════════════════════════
@test("D1. 大向量序列化往返")
def test_embedding_large_serialization():
from app.services.embedding_service import embedding_service
# 模拟 1536 维向量
large_emb = [0.1] * 1536
serialized = embedding_service.serialize_embedding(large_emb)
deserialized = embedding_service.deserialize_embedding(serialized)
assert len(deserialized) == 1536
assert deserialized[0] == 0.1
assert deserialized[-1] == 0.1
return f"OK dims={len(deserialized)}"
@test("D2. 空列表序列化")
def test_embedding_empty_serialization():
from app.services.embedding_service import embedding_service
serialized = embedding_service.serialize_embedding([])
deserialized = embedding_service.deserialize_embedding(serialized)
assert deserialized == []
return "OK"
@test("D3. 相似度搜索 min_score 边界")
def test_similarity_search_min_score_edge():
from app.services.embedding_service import embedding_service
import asyncio
entries = [
{"id": "1", "content_text": "close", "embedding": [1.0, 0.0]},
{"id": "2", "content_text": "medium", "embedding": [0.5, 0.5]},
{"id": "3", "content_text": "far", "embedding": [0.0, 1.0]},
]
query_emb = [1.0, 0.0]
# min_score=0.9 仅保留非常相似的
results = asyncio.run(embedding_service.similarity_search(
query_emb, entries, top_k=5, min_score=0.9
))
assert len(results) == 1, f"高阈值应只有1条实际{len(results)}"
assert results[0]["content_text"] == "close"
# min_score=0.0 保留所有
results = asyncio.run(embedding_service.similarity_search(
query_emb, entries, top_k=5, min_score=0.0
))
assert len(results) == 3, f"零阈值应保留全部,实际{len(results)}"
return "OK"
@test("D4. 相似度搜索 top_k 截断")
def test_similarity_search_topk_truncation():
from app.services.embedding_service import embedding_service
import asyncio
# 使用不同方向的向量确保相似度可区分
entries = [
{"id": str(i), "content_text": f"entry_{i}", "embedding": [float(i), float(20-i)]}
for i in range(20)
]
query_emb = [1.0, 0.0] # 与X轴对齐so i越大越相似
results = asyncio.run(embedding_service.similarity_search(
query_emb, entries, top_k=3, min_score=0.0
))
assert len(results) == 3
# 最相似的几条应该有最大的 id因为 [i, 20-i] 与 [1,0] 的余弦 = i/sqrt(i^2+(20-i)^2)i越大值越大
ids = [int(r["id"]) for r in results]
assert ids == sorted(ids, reverse=True), f"应按相似度降序: {ids}"
return "OK"
@test("D5. Embedding 服务离线可用性")
def test_embedding_offline_available():
from app.services.embedding_service import embedding_service
assert embedding_service.offline_available is True, "离线关键词搜索应始终可用"
return "OK"
@test("D6. 相似度搜索跳过无 embedding 的条目")
def test_similarity_search_skip_empty_emb():
from app.services.embedding_service import embedding_service
import asyncio
entries = [
{"id": "1", "content_text": "has emb", "embedding": [1.0, 0.0]},
{"id": "2", "content_text": "no emb", "embedding": []},
{"id": "3", "content_text": "also no emb", "embedding": []},
]
query_emb = [1.0, 0.0]
results = asyncio.run(embedding_service.similarity_search(
query_emb, entries, top_k=5, min_score=0.0
))
assert len(results) == 1
assert results[0]["content_text"] == "has emb"
return "OK"
# ═══════════════════════════════════════════════════════════════
# E. Schema 验证测试
# ═══════════════════════════════════════════════════════════════
@test("E1. AgentMemoryConfig 所有字段默认值")
def test_schema_memory_config_defaults():
from app.agent_runtime.schemas import AgentMemoryConfig
cfg = AgentMemoryConfig()
assert cfg.enabled is True
assert cfg.max_history_messages == 20
assert cfg.persist_to_db is True
assert cfg.vector_memory_enabled is True
assert cfg.vector_memory_top_k == 5
assert cfg.vector_memory_rerank is False
assert cfg.memory_type_filter is None
assert cfg.team_id is None
assert cfg.team_share_enabled is False
assert cfg.learning_enabled is True
assert cfg.memory_dir_enabled is False
assert cfg.memory_dir_path == ""
return "OK"
@test("E2. AgentConfig 嵌套创建")
def test_schema_agent_config_nested():
from app.agent_runtime.schemas import (
AgentConfig, AgentLLMConfig, AgentMemoryConfig,
AgentToolConfig, AgentBudgetConfig,
)
config = AgentConfig(
name="测试Agent",
system_prompt="自定义提示词",
llm=AgentLLMConfig(model="deepseek-v4-flash"),
memory=AgentMemoryConfig(
vector_memory_top_k=10,
team_id="team_test",
memory_dir_enabled=True,
),
tools=AgentToolConfig(include_tools=["read_file"]),
budget=AgentBudgetConfig(max_tool_calls=100),
)
assert config.name == "测试Agent"
assert config.llm.model == "deepseek-v4-flash"
assert config.memory.vector_memory_top_k == 10
assert config.memory.team_id == "team_test"
assert config.memory.memory_dir_enabled is True
assert config.tools.include_tools == ["read_file"]
assert config.budget.max_tool_calls == 100
return "OK"
@test("E3. AgentToolConfig None 值强制转换")
def test_schema_tool_config_none_coercion():
from app.agent_runtime.schemas import AgentToolConfig
cfg = AgentToolConfig(
include_tools=None,
exclude_tools=None,
require_approval=None,
)
assert cfg.include_tools == []
assert cfg.exclude_tools == []
assert cfg.require_approval == []
return "OK"
@test("E4. AgentResult 字段验证")
def test_schema_agent_result():
from app.agent_runtime.schemas import AgentResult, AgentStep
result = AgentResult(
success=False,
content="出错了",
error="连接超时",
iterations_used=5,
tool_calls_made=10,
truncated=True,
steps=[
AgentStep(iteration=1, type="think", content="思考中"),
AgentStep(iteration=2, type="final", content="最终结果"),
],
)
assert result.success is False
assert result.error == "连接超时"
assert result.truncated is True
assert len(result.steps) == 2
assert result.steps[0].type == "think"
assert result.steps[1].type == "final"
return "OK"
# ═══════════════════════════════════════════════════════════════
# F. 压力测试
# ═══════════════════════════════════════════════════════════════
@test("F1. 批量文件记忆写入性能")
def test_file_memory_bulk_write():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
start = time.time()
for i in range(100):
store.save(
f"记忆_{i:04d}",
f"这是第{i}条记忆的内容包含关键词Python、天工、Agent等",
mem_type="user" if i % 2 == 0 else "reference",
)
elapsed = time.time() - start
assert store.memory_count >= 90, f"应有至少90条实际{store.memory_count}"
# 100 条写入应在 5 秒内完成
assert elapsed < 5.0, f"100条写入耗时{elapsed:.2f}s太慢"
return f"OK {store.memory_count} entries in {elapsed:.2f}s"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("F2. 批量搜索性能")
def test_file_memory_bulk_search():
from app.services.file_memory_service import FileMemoryStore
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
store = FileMemoryStore(tmpdir)
# 写入50条记忆
for i in range(50):
store.save(
f"记忆_{i:04d}",
f"内容_{i}: " + f"主题{'Python' if i%3==0 else '天工' if i%3==1 else 'Agent'}相关记忆" * 5,
mem_type="reference",
)
# 连续搜索
start = time.time()
for _ in range(20):
store.search("Python")
store.search("天工")
store.search("Agent")
elapsed = time.time() - start
# 60 次搜索应在 2 秒内
assert elapsed < 2.0, f"60次搜索耗时{elapsed:.2f}s"
return f"OK 60 searches in {elapsed:.3f}s"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("F3. AgentMemory 全配置场景构造")
def test_full_config_scenario():
from app.agent_runtime.memory import AgentMemory
tmpdir = tempfile.mkdtemp(prefix="tmem_")
try:
mem = AgentMemory(
scope_kind="agent",
scope_id="prod_agent_001",
session_key="user_session_abc",
persist=False,
max_history=30,
vector_memory_enabled=True,
vector_memory_top_k=10,
vector_memory_rerank=True,
memory_type_filter=["user", "project", "reference"],
team_id="production_team",
team_share_enabled=True,
memory_dir_enabled=True,
memory_dir_path=tmpdir,
)
# 验证所有配置
assert mem.scope_kind == "agent"
assert mem.scope_id == "prod_agent_001"
assert mem.max_history == 30
assert mem.vector_memory_top_k == 10
assert mem.vector_memory_rerank is True
assert len(mem.memory_type_filter) == 3
assert mem.team_id == "production_team"
assert mem.team_share_enabled is True
assert mem.memory_dir_enabled is True
# 文件存储应可用
store = mem._get_file_store()
assert store is not None
# 验证 4 种记忆类型
assert mem.MEMORY_TYPES == ("user", "feedback", "project", "reference")
return f"OK all params verified"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
@test("F4. 记忆类型推断覆盖所有类型")
def test_memory_type_inference_all_types():
from app.agent_runtime.memory import AgentMemory
cases = [
# (user_message, assistant_reply, expected_type)
# user 类型
("我喜欢Python", "好的", "user"),
("记住我不喜欢辣", "记住了", "user"),
("我的名字是小明", "你好小明", "user"),
# feedback 类型
("这个报错了", "让我检查", "feedback"),
("不对应该是8080端口", "修正了", "feedback"),
("不要这样写代码", "好的我改", "feedback"),
("能不能换个方式", "可以", "feedback"),
# reference 类型
("数据库地址是什么", "101.43.95.130", "reference"),
("API的URL是什么", "http://api.test.com", "reference"),
("密码是多少", "123456", "reference"),
# project 类型
("这个任务进度怎么样", "完成了80%", "project"),
("需求文档写好了吗", "写好了", "project"),
("新功能测试通过了吗", "通过了", "project"),
("项目上线时间定了吗", "下周一", "project"),
]
for um, ar, expected in cases:
result = AgentMemory._infer_memory_type(um, ar)
assert result == expected, f"'{um[:30]}' 期望 {expected} 实际 {result}"
return f"OK {len(cases)} cases"
# ═══════════════════════════════════════════════════════════════
# G. Auto Dream 高级测试
# ═══════════════════════════════════════════════════════════════
@test("G1. Auto Dream 每日触发检查")
def test_auto_dream_daily_check():
from app.services.auto_dream_service import _should_dream_today
from datetime import datetime, timedelta, timezone
# 当前时间如果不是凌晨3点应该返回False
now = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8)))
result = _should_dream_today()
if now.hour == 3:
# 如果恰好是3点可能会触发但已执行过就不会
pass
else:
assert result is False, f"非凌晨3点不应触发 (当前{now.hour}点)"
return f"OK (hour={now.hour})"
@test("G2. Auto Dream 整合函数可调用")
def test_auto_dream_runnable():
from app.services.auto_dream_service import run_auto_dream, _do_consolidate
assert callable(run_auto_dream)
assert callable(_do_consolidate)
# 不应报错 (带锁保护)
return "OK"
@test("G3. Auto Dream 合并阈值边界检查")
def test_auto_dream_threshold_validation():
from app.services.auto_dream_service import MERGE_SIMILARITY_THRESHOLD
assert isinstance(MERGE_SIMILARITY_THRESHOLD, float)
assert 0.0 < MERGE_SIMILARITY_THRESHOLD <= 1.0
# 85% 是合理的合并阈值
assert 0.80 <= MERGE_SIMILARITY_THRESHOLD <= 0.95
return f"OK threshold={MERGE_SIMILARITY_THRESHOLD}"
# ═══════════════════════════════════════════════════════════════
# H. 团队共享记忆边界测试 (P6)
# ═══════════════════════════════════════════════════════════════
@test("H1. 团队共享配置完整性")
def test_team_sharing_config_full():
from app.agent_runtime.memory import AgentMemory
# 启用团队共享
mem = AgentMemory(
scope_id="agent_a",
team_id="team_1",
team_share_enabled=True,
)
assert mem.team_id == "team_1"
assert mem.team_share_enabled is True
# 有team_id但不启用共享
mem2 = AgentMemory(
scope_id="agent_b",
team_id="team_1",
team_share_enabled=False,
)
assert mem2.team_id == "team_1"
assert mem2.team_share_enabled is False
# 无团队
mem3 = AgentMemory(scope_id="agent_c")
assert mem3.team_id is None
assert mem3.team_share_enabled is False
return "OK"
@test("H2. 团队共享 schema 默认值")
def test_team_sharing_schema_defaults():
from app.agent_runtime.schemas import AgentMemoryConfig
cfg = AgentMemoryConfig()
assert cfg.team_id is None
assert cfg.team_share_enabled is False
cfg2 = AgentMemoryConfig(team_id="my_team", team_share_enabled=True)
assert cfg2.team_id == "my_team"
assert cfg2.team_share_enabled is True
return "OK"
# ═══════════════════════════════════════════════════════════════
# 运行
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
print("=" * 60)
print("天工 Agent 记忆系统 — 高级/边界测试")
print("=" * 60)
print()
total = PASS + FAIL + SKIP
print()
print("=" * 60)
print(f"测试结果: {PASS} 通过 / {FAIL} 失败 / {SKIP} 跳过 (共 {total})")
print("=" * 60)
if FAIL > 0:
sys.exit(1)
else:
print("\n全部高级测试通过!")