""" 天工 Agent 记忆系统 — 高级/边界测试用例 覆盖:文件记忆更新/裁剪/并发、向量搜索过滤/团队查询、 离线搜索边界、全局知识去重、Embedding 序列化边界、 Schema 验证、消息裁剪工具配对、记忆生命周期压力测试 运行:cd backend && python tests/test_memory_advanced.py """ import asyncio import sys import time import tempfile import shutil import os import json import threading sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # ─── 测试框架 ─── PASS = 0 FAIL = 0 SKIP = 0 def test(name: str): def decorator(fn): global PASS, FAIL, SKIP try: result = fn() if asyncio.iscoroutine(result): result = asyncio.run(result) if result is False: FAIL += 1 print(f" FAIL {name}") elif result is True: PASS += 1 print(f" PASS {name}") else: PASS += 1 print(f" PASS {name} ({result})") except Exception as e: FAIL += 1 import traceback print(f" FAIL {name}: {e}") traceback.print_exc() return fn return decorator # ═══════════════════════════════════════════════════════════════ # A. 文件式记忆高级测试 (P7) # ═══════════════════════════════════════════════════════════════ @test("A1. 文件记忆重复保存(追加更新)") def test_file_memory_append(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) # 第一次保存 store.save("测试条目", "第一版本内容", mem_type="user") assert store.memory_count == 1 # 同名再次保存(应追加而非创建新文件) store.save("测试条目", "第二版本更新内容", mem_type="user") assert store.memory_count == 1, "同名条目不应增加计数" # 搜索能找到更新的内容 results = store.search("第二版本") assert len(results) > 0, "应能找到更新的内容" return f"OK count={store.memory_count}" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("A2. 文件记忆索引裁剪(超过200条)") def test_file_memory_index_trimming(): from app.services.file_memory_service import FileMemoryStore, INDEX_MAX_ENTRIES tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) # 写入 205 条记忆 for i in range(205): store.save(f"记忆条目_{i:04d}", f"这是第{i}条记忆的内容数据", mem_type="reference") count = store.memory_count assert count <= INDEX_MAX_ENTRIES, f"索引应被裁剪到 {INDEX_MAX_ENTRIES},实际 {count}" assert count == INDEX_MAX_ENTRIES, f"裁剪后应为 {INDEX_MAX_ENTRIES},实际 {count}" # 最早的那批应该被移除 results = store.search("记忆条目_0000") # 可能被裁剪掉,但搜索功能应正常 assert store.memory_count == INDEX_MAX_ENTRIES return f"OK trimmed to {count}" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("A3. 文件记忆按类型列出") def test_file_memory_list_by_type(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) store.save("用户偏好A", "喜欢Python", mem_type="user") store.save("用户偏好B", "喜欢Vue", mem_type="user") store.save("项目信息", "天工平台开发", mem_type="project") store.save("反馈记录", "API超时问题", mem_type="feedback") store.save("参考文档", "数据库配置", mem_type="reference") assert len(store.list_by_type("user")) == 2 assert len(store.list_by_type("project")) == 1 assert len(store.list_by_type("feedback")) == 1 assert len(store.list_by_type("reference")) == 1 assert len(store.list_by_type("nonexistent")) == 0 assert len(store.list_by_type("")) == 5 return f"OK 4 types verified" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("A4. 文件记忆空查询返回最近条目") def test_file_memory_empty_query(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) store.save("条目1", "内容A", mem_type="user") store.save("条目2", "内容B", mem_type="project") results = store.search("") assert len(results) >= 1, "空查询应返回最近条目" assert all(r["source"] == "file" for r in results) return f"OK {len(results)} recent" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("A5. 文件记忆安全文件名生成") def test_file_memory_safe_filename(): from app.services.file_memory_service import FileMemoryStore # 特殊字符 safe = FileMemoryStore._safe_filename("用户: 偏好/设置?*") assert ":" not in safe and "/" not in safe and "?" not in safe and "*" not in safe # 纯英文 safe = FileMemoryStore._safe_filename("user_profile") assert safe == "user_profile" # 中英混合 safe = FileMemoryStore._safe_filename("用户user_偏好pref") assert "用户" in safe and "user" in safe # 超长截断 long_name = "A" * 100 safe = FileMemoryStore._safe_filename(long_name) assert len(safe) <= 64 # 全部特殊字符 safe = FileMemoryStore._safe_filename("!@#$%") assert safe == "memory" # 兜底名称 return "OK" @test("A6. 文件记忆前端格式解析") def test_file_memory_frontmatter_parsing(): from app.services.file_memory_service import FileMemoryStore # 标准 frontmatter text = """--- name: 测试记忆 description: 这是一条测试记忆 type: user created: 2026-06-14T10:00:00+08:00 tags: [python, vue] --- 这是正文内容,包含重要信息。""" fm = FileMemoryStore._parse_frontmatter(text) assert fm["name"] == "测试记忆" assert fm["type"] == "user" assert fm["tags"] == ["python", "vue"] assert fm["created"] == "2026-06-14T10:00:00+08:00" # 无 frontmatter fm = FileMemoryStore._parse_frontmatter("纯文本内容,没有frontmatter") assert fm == {} or "description" in fm, "无frontmatter应返回空dict或仅有description" # 格式不完整的 frontmatter fm = FileMemoryStore._parse_frontmatter("---\n只有开始没有结束") assert isinstance(fm, dict), "应返回dict而非报错" return "OK" @test("A7. 文件记忆删除不存在的条目") def test_file_memory_delete_nonexistent(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) result = store.delete("不存在的条目") assert result is True # 删除不存在条目不报错 return "OK" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("A8. 文件记忆并发安全写入") def test_file_memory_concurrent_writes(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) errors = [] def write_batch(prefix, count): try: for i in range(count): store.save(f"{prefix}_{i}", f"{prefix}内容第{i}条", mem_type="user") except Exception as e: errors.append(str(e)) threads = [] for t in range(5): t = threading.Thread(target=write_batch, args=(f"thread{t}", 20)) threads.append(t) for t in threads: t.start() for t in threads: t.join() assert len(errors) == 0, f"并发写入出错: {errors}" assert store.memory_count > 0, "应有写入成功的记忆" return f"OK {store.memory_count} entries" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("A9. 文件记忆搜索无匹配时返回空") def test_file_memory_search_no_match(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) store.save("数据库配置", "MySQL在101.43.95.130", mem_type="reference") results = store.search("xyznonexistent12345") assert len(results) == 0, "无匹配关键词应返回空" return "OK" finally: shutil.rmtree(tmpdir, ignore_errors=True) # ═══════════════════════════════════════════════════════════════ # B. 离线关键词搜索边界 (P5) # ═══════════════════════════════════════════════════════════════ @test("B1. 离线搜索空查询") def test_offline_search_empty_query(): from app.services.embedding_service import embedding_service entries = [ {"id": "1", "content_text": "测试内容", "embedding": [], "metadata": {}}, ] results = embedding_service.keyword_search("", entries, top_k=5) assert len(results) <= 5 return f"OK {len(results)} entries" @test("B2. 离线搜索空条目列表") def test_offline_search_empty_entries(): from app.services.embedding_service import embedding_service results = embedding_service.keyword_search("Python", [], top_k=5) assert results == [] return "OK" @test("B3. 离线搜索特殊字符") def test_offline_search_special_chars(): from app.services.embedding_service import embedding_service entries = [ {"id": "1", "content_text": "URL: http://101.43.95.130:3001/api", "embedding": [], "metadata": {}}, {"id": "2", "content_text": "正常中文内容无特殊字符", "embedding": [], "metadata": {}}, {"id": "3", "content_text": "email@example.com 联系方式", "embedding": [], "metadata": {}}, ] results = embedding_service.keyword_search("http 101.43.95.130", entries, top_k=3) assert len(results) > 0, "特殊字符搜索应有结果" assert "http" in results[0]["content_text"].lower(), f"应找到URL条目: {results[0]['content_text'][:50]}" results = embedding_service.keyword_search("email example", entries, top_k=3) assert len(results) > 0, "邮箱搜索应有结果" return f"OK URL+email search" @test("B4. 离线搜索长文本") def test_offline_search_long_text(): from app.services.embedding_service import embedding_service long_content = "天工智能体平台是一个企业级AI平台," + "支持多Agent编排、工具调用、知识库管理。" * 20 entries = [ {"id": "1", "content_text": long_content, "embedding": [], "metadata": {}}, {"id": "2", "content_text": "简短的Python记忆", "embedding": [], "metadata": {}}, ] results = embedding_service.keyword_search("Python", entries, top_k=3) assert len(results) > 0 assert results[0]["content_text"] == "简短的Python记忆" # 长文本Jaccard得分会低(union很大),降低min_score阈值 results = embedding_service.keyword_search("Agent编排", entries, top_k=3, min_score=0.001) assert len(results) > 0, "降低阈值后应该能找到长文本记忆" return f"OK long text searched" @test("B5. 分词器中英混合精确性") def test_tokenizer_mixed_text(): from app.services.embedding_service import embedding_service # 纯中文 tokens = embedding_service._tokenize("你好世界") assert "你好" in tokens and "世界" in tokens, f"中文二元组: {tokens}" assert "你" in tokens and "好" in tokens and "世" in tokens and "界" in tokens # 纯英文 tokens = embedding_service._tokenize("hello world python") assert "hello" in tokens and "world" in tokens and "python" in tokens # 数字 tokens = embedding_service._tokenize("端口8037版本3.11") assert "8037" in tokens, f"数字token: {tokens}" # 空字符串 tokens = embedding_service._tokenize("") assert len(tokens) == 0 # 纯符号 tokens = embedding_service._tokenize("!@#$%^&*()") assert len(tokens) == 0, f"纯符号应无token: {tokens}" return f"OK all edge cases" # ═══════════════════════════════════════════════════════════════ # C. AgentMemory 高级测试 # ═══════════════════════════════════════════════════════════════ @test("C1. AgentMemory 不持久化时文件记忆仍可写入") def test_memory_file_store_no_persist(): from app.agent_runtime.memory import AgentMemory tmpdir = tempfile.mkdtemp(prefix="tmem_") try: mem = AgentMemory( scope_id="test_no_persist", persist=False, memory_dir_enabled=True, memory_dir_path=tmpdir, ) store = mem._get_file_store() assert store is not None, "文件存储应可用" # 保存文件记忆 store.save("测试", "测试内容", mem_type="user") assert store.memory_count == 1 return f"OK count={store.memory_count}" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("C2. AgentMemory 初始化包含文件记忆搜索结果") def test_memory_initialize_with_file_memory(): from app.agent_runtime.memory import AgentMemory tmpdir = tempfile.mkdtemp(prefix="tmem_") try: # 预置文件记忆 from app.services.file_memory_service import FileMemoryStore store = FileMemoryStore(tmpdir) store.save("用户偏好", "用户喜欢用Python写后端API", mem_type="user") store.save("数据库信息", "MySQL地址101.43.95.130端口24936", mem_type="reference") mem = AgentMemory( scope_id="test_init_file", persist=False, memory_dir_enabled=True, memory_dir_path=tmpdir, ) text = asyncio.run(mem.initialize("Python后端开发")) assert isinstance(text, str) # 检查是否包含文件记忆 if "文件记忆" in text or "Python" in text: return f"OK file memory in context (len={len(text)})" else: return f"OK init returned (len={len(text)}, no file mem match for query)" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("C3. AgentMemory 懒加载文件存储") def test_memory_lazy_file_store_init(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test_lazy") assert mem._file_store is None store = mem._get_file_store() assert store is None, "未启用时应返回None" mem2 = AgentMemory(scope_id="test_lazy2", memory_dir_enabled=True) store2 = mem2._get_file_store() assert store2 is not None, "启用后应返回实例" return "OK" @test("C4. Memory type filter 在向量搜索中的应用") def test_memory_type_filter_search(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory( scope_id="test_filter", vector_memory_enabled=True, memory_type_filter=["user"], ) assert mem.memory_type_filter == ["user"] mem2 = AgentMemory( scope_id="test_filter2", vector_memory_enabled=True, memory_type_filter=["user", "project", "feedback", "reference"], ) assert len(mem2.memory_type_filter) == 4 # None = 不过滤 mem3 = AgentMemory(scope_id="test_filter3") assert mem3.memory_type_filter is None return "OK" @test("C5. 消息裁剪 - 多 tool_calls 配对") def test_trim_messages_multi_tool_calls(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test", max_history=5) msgs = [ {"role": "system", "content": "你是助手"}, {"role": "user", "content": "查天气和股票"}, {"role": "assistant", "content": "", "tool_calls": [ {"name": "get_weather", "id": "1"}, {"name": "get_stock", "id": "2"}, ]}, {"role": "tool", "content": "晴天", "tool_call_id": "1"}, {"role": "tool", "content": "涨了", "tool_call_id": "2"}, {"role": "assistant", "content": "天气晴,股票涨"}, {"role": "user", "content": "谢谢"}, {"role": "assistant", "content": "不客气"}, ] trimmed = mem.trim_messages(msgs) assert trimmed[0]["role"] == "system" # 不应该有独立的 tool 消息开头 for i, m in enumerate(trimmed): if m["role"] == "tool" and i > 0 and trimmed[i-1]["role"] not in ("assistant", "tool"): assert False, f"tool消息不能跟在{trimmed[i-1]['role']}之后 (index={i})" return f"OK trimmed to {len(trimmed)}" @test("C6. 消息裁剪 - 仅系统消息时") def test_trim_messages_system_only(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test", max_history=5) msgs = [{"role": "system", "content": "你是助手"}] trimmed = mem.trim_messages(msgs) assert len(trimmed) == 1 assert trimmed[0]["role"] == "system" return "OK" @test("C7. 消息裁剪 - 大量 tool 消息连续性") def test_trim_messages_tool_chain(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test", max_history=3) msgs = [ {"role": "system", "content": "你是助手"}, {"role": "user", "content": "Q1"}, {"role": "assistant", "content": "", "tool_calls": [{"id": "t1", "name": "f1", "type": "function", "function": {"name": "f1", "arguments": "{}"}}]}, {"role": "tool", "content": "R1", "tool_call_id": "t1"}, {"role": "user", "content": "Q2"}, {"role": "assistant", "content": "", "tool_calls": [{"id": "t2", "name": "f2", "type": "function", "function": {"name": "f2", "arguments": "{}"}}]}, {"role": "tool", "content": "R2", "tool_call_id": "t2"}, ] trimmed = mem.trim_messages(msgs) assert trimmed[0]["role"] == "system" # 检查没有孤立的 tool 消息 for i, m in enumerate(trimmed): if m["role"] == "tool" and i == 1: # 如果 tool 排在第一位(system之后),说明它的 assistant 被裁剪了 assert False, f"孤立tool消息: index={i}" return f"OK trimmed to {len(trimmed)}" @test("C8. 保存上下文时 fire-and-forget 异步") def test_save_context_fire_and_forget(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test_ff", persist=False) # 模拟多轮对话触发压缩 messages = [] for i in range(5): messages.append({"role": "user", "content": f"消息{i}"}) messages.append({"role": "assistant", "content": f"回复{i}"}) # save_context 不应阻塞 start = time.time() asyncio.run(mem.save_context( "新用户消息", "新助手回复", messages=messages, )) elapsed = time.time() - start # 不应超过 5 秒(异步不阻塞) assert elapsed < 5.0, f"save_context 耗时 {elapsed:.2f}s,疑似阻塞" return f"OK elapsed={elapsed:.3f}s" @test("C9. 全局知识去重逻辑") def test_global_knowledge_dedup(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test_dedup") assert hasattr(mem, "save_global_knowledge") assert callable(mem.save_global_knowledge) # 测试短内容不保存 # (无 DB 连接时仅验证方法签名的正确性) return "OK" @test("C10. 记忆压缩摘要向量化方法可用") def test_compressed_memory_save_method(): from app.agent_runtime.memory import AgentMemory mem = AgentMemory(scope_id="test_compress") assert hasattr(mem, "_save_compressed_memories") # 验证方法签名正确性(通过 inspect) import inspect sig = inspect.signature(mem._save_compressed_memories) params = list(sig.parameters.keys()) assert "summary" in params assert "facts" in params assert "topics" in params return "OK" # ═══════════════════════════════════════════════════════════════ # D. Embedding 服务高级测试 # ═══════════════════════════════════════════════════════════════ @test("D1. 大向量序列化往返") def test_embedding_large_serialization(): from app.services.embedding_service import embedding_service # 模拟 1536 维向量 large_emb = [0.1] * 1536 serialized = embedding_service.serialize_embedding(large_emb) deserialized = embedding_service.deserialize_embedding(serialized) assert len(deserialized) == 1536 assert deserialized[0] == 0.1 assert deserialized[-1] == 0.1 return f"OK dims={len(deserialized)}" @test("D2. 空列表序列化") def test_embedding_empty_serialization(): from app.services.embedding_service import embedding_service serialized = embedding_service.serialize_embedding([]) deserialized = embedding_service.deserialize_embedding(serialized) assert deserialized == [] return "OK" @test("D3. 相似度搜索 min_score 边界") def test_similarity_search_min_score_edge(): from app.services.embedding_service import embedding_service import asyncio entries = [ {"id": "1", "content_text": "close", "embedding": [1.0, 0.0]}, {"id": "2", "content_text": "medium", "embedding": [0.5, 0.5]}, {"id": "3", "content_text": "far", "embedding": [0.0, 1.0]}, ] query_emb = [1.0, 0.0] # min_score=0.9 仅保留非常相似的 results = asyncio.run(embedding_service.similarity_search( query_emb, entries, top_k=5, min_score=0.9 )) assert len(results) == 1, f"高阈值应只有1条,实际{len(results)}" assert results[0]["content_text"] == "close" # min_score=0.0 保留所有 results = asyncio.run(embedding_service.similarity_search( query_emb, entries, top_k=5, min_score=0.0 )) assert len(results) == 3, f"零阈值应保留全部,实际{len(results)}" return "OK" @test("D4. 相似度搜索 top_k 截断") def test_similarity_search_topk_truncation(): from app.services.embedding_service import embedding_service import asyncio # 使用不同方向的向量确保相似度可区分 entries = [ {"id": str(i), "content_text": f"entry_{i}", "embedding": [float(i), float(20-i)]} for i in range(20) ] query_emb = [1.0, 0.0] # 与X轴对齐,so i越大越相似 results = asyncio.run(embedding_service.similarity_search( query_emb, entries, top_k=3, min_score=0.0 )) assert len(results) == 3 # 最相似的几条应该有最大的 id(因为 [i, 20-i] 与 [1,0] 的余弦 = i/sqrt(i^2+(20-i)^2),i越大值越大) ids = [int(r["id"]) for r in results] assert ids == sorted(ids, reverse=True), f"应按相似度降序: {ids}" return "OK" @test("D5. Embedding 服务离线可用性") def test_embedding_offline_available(): from app.services.embedding_service import embedding_service assert embedding_service.offline_available is True, "离线关键词搜索应始终可用" return "OK" @test("D6. 相似度搜索跳过无 embedding 的条目") def test_similarity_search_skip_empty_emb(): from app.services.embedding_service import embedding_service import asyncio entries = [ {"id": "1", "content_text": "has emb", "embedding": [1.0, 0.0]}, {"id": "2", "content_text": "no emb", "embedding": []}, {"id": "3", "content_text": "also no emb", "embedding": []}, ] query_emb = [1.0, 0.0] results = asyncio.run(embedding_service.similarity_search( query_emb, entries, top_k=5, min_score=0.0 )) assert len(results) == 1 assert results[0]["content_text"] == "has emb" return "OK" # ═══════════════════════════════════════════════════════════════ # E. Schema 验证测试 # ═══════════════════════════════════════════════════════════════ @test("E1. AgentMemoryConfig 所有字段默认值") def test_schema_memory_config_defaults(): from app.agent_runtime.schemas import AgentMemoryConfig cfg = AgentMemoryConfig() assert cfg.enabled is True assert cfg.max_history_messages == 20 assert cfg.persist_to_db is True assert cfg.vector_memory_enabled is True assert cfg.vector_memory_top_k == 5 assert cfg.vector_memory_rerank is False assert cfg.memory_type_filter is None assert cfg.team_id is None assert cfg.team_share_enabled is False assert cfg.learning_enabled is True assert cfg.memory_dir_enabled is False assert cfg.memory_dir_path == "" return "OK" @test("E2. AgentConfig 嵌套创建") def test_schema_agent_config_nested(): from app.agent_runtime.schemas import ( AgentConfig, AgentLLMConfig, AgentMemoryConfig, AgentToolConfig, AgentBudgetConfig, ) config = AgentConfig( name="测试Agent", system_prompt="自定义提示词", llm=AgentLLMConfig(model="deepseek-v4-flash"), memory=AgentMemoryConfig( vector_memory_top_k=10, team_id="team_test", memory_dir_enabled=True, ), tools=AgentToolConfig(include_tools=["read_file"]), budget=AgentBudgetConfig(max_tool_calls=100), ) assert config.name == "测试Agent" assert config.llm.model == "deepseek-v4-flash" assert config.memory.vector_memory_top_k == 10 assert config.memory.team_id == "team_test" assert config.memory.memory_dir_enabled is True assert config.tools.include_tools == ["read_file"] assert config.budget.max_tool_calls == 100 return "OK" @test("E3. AgentToolConfig None 值强制转换") def test_schema_tool_config_none_coercion(): from app.agent_runtime.schemas import AgentToolConfig cfg = AgentToolConfig( include_tools=None, exclude_tools=None, require_approval=None, ) assert cfg.include_tools == [] assert cfg.exclude_tools == [] assert cfg.require_approval == [] return "OK" @test("E4. AgentResult 字段验证") def test_schema_agent_result(): from app.agent_runtime.schemas import AgentResult, AgentStep result = AgentResult( success=False, content="出错了", error="连接超时", iterations_used=5, tool_calls_made=10, truncated=True, steps=[ AgentStep(iteration=1, type="think", content="思考中"), AgentStep(iteration=2, type="final", content="最终结果"), ], ) assert result.success is False assert result.error == "连接超时" assert result.truncated is True assert len(result.steps) == 2 assert result.steps[0].type == "think" assert result.steps[1].type == "final" return "OK" # ═══════════════════════════════════════════════════════════════ # F. 压力测试 # ═══════════════════════════════════════════════════════════════ @test("F1. 批量文件记忆写入性能") def test_file_memory_bulk_write(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) start = time.time() for i in range(100): store.save( f"记忆_{i:04d}", f"这是第{i}条记忆的内容,包含关键词Python、天工、Agent等", mem_type="user" if i % 2 == 0 else "reference", ) elapsed = time.time() - start assert store.memory_count >= 90, f"应有至少90条,实际{store.memory_count}" # 100 条写入应在 5 秒内完成 assert elapsed < 5.0, f"100条写入耗时{elapsed:.2f}s,太慢" return f"OK {store.memory_count} entries in {elapsed:.2f}s" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("F2. 批量搜索性能") def test_file_memory_bulk_search(): from app.services.file_memory_service import FileMemoryStore tmpdir = tempfile.mkdtemp(prefix="tmem_") try: store = FileMemoryStore(tmpdir) # 写入50条记忆 for i in range(50): store.save( f"记忆_{i:04d}", f"内容_{i}: " + f"主题{'Python' if i%3==0 else '天工' if i%3==1 else 'Agent'}相关记忆" * 5, mem_type="reference", ) # 连续搜索 start = time.time() for _ in range(20): store.search("Python") store.search("天工") store.search("Agent") elapsed = time.time() - start # 60 次搜索应在 2 秒内 assert elapsed < 2.0, f"60次搜索耗时{elapsed:.2f}s" return f"OK 60 searches in {elapsed:.3f}s" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("F3. AgentMemory 全配置场景构造") def test_full_config_scenario(): from app.agent_runtime.memory import AgentMemory tmpdir = tempfile.mkdtemp(prefix="tmem_") try: mem = AgentMemory( scope_kind="agent", scope_id="prod_agent_001", session_key="user_session_abc", persist=False, max_history=30, vector_memory_enabled=True, vector_memory_top_k=10, vector_memory_rerank=True, memory_type_filter=["user", "project", "reference"], team_id="production_team", team_share_enabled=True, memory_dir_enabled=True, memory_dir_path=tmpdir, ) # 验证所有配置 assert mem.scope_kind == "agent" assert mem.scope_id == "prod_agent_001" assert mem.max_history == 30 assert mem.vector_memory_top_k == 10 assert mem.vector_memory_rerank is True assert len(mem.memory_type_filter) == 3 assert mem.team_id == "production_team" assert mem.team_share_enabled is True assert mem.memory_dir_enabled is True # 文件存储应可用 store = mem._get_file_store() assert store is not None # 验证 4 种记忆类型 assert mem.MEMORY_TYPES == ("user", "feedback", "project", "reference") return f"OK all params verified" finally: shutil.rmtree(tmpdir, ignore_errors=True) @test("F4. 记忆类型推断覆盖所有类型") def test_memory_type_inference_all_types(): from app.agent_runtime.memory import AgentMemory cases = [ # (user_message, assistant_reply, expected_type) # user 类型 ("我喜欢Python", "好的", "user"), ("记住我不喜欢辣", "记住了", "user"), ("我的名字是小明", "你好小明", "user"), # feedback 类型 ("这个报错了", "让我检查", "feedback"), ("不对,应该是8080端口", "修正了", "feedback"), ("不要这样写代码", "好的我改", "feedback"), ("能不能换个方式", "可以", "feedback"), # reference 类型 ("数据库地址是什么", "101.43.95.130", "reference"), ("API的URL是什么", "http://api.test.com", "reference"), ("密码是多少", "123456", "reference"), # project 类型 ("这个任务进度怎么样", "完成了80%", "project"), ("需求文档写好了吗", "写好了", "project"), ("新功能测试通过了吗", "通过了", "project"), ("项目上线时间定了吗", "下周一", "project"), ] for um, ar, expected in cases: result = AgentMemory._infer_memory_type(um, ar) assert result == expected, f"'{um[:30]}' 期望 {expected} 实际 {result}" return f"OK {len(cases)} cases" # ═══════════════════════════════════════════════════════════════ # G. Auto Dream 高级测试 # ═══════════════════════════════════════════════════════════════ @test("G1. Auto Dream 每日触发检查") def test_auto_dream_daily_check(): from app.services.auto_dream_service import _should_dream_today from datetime import datetime, timedelta, timezone # 当前时间如果不是凌晨3点,应该返回False now = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=8))) result = _should_dream_today() if now.hour == 3: # 如果恰好是3点,可能会触发(但已执行过就不会) pass else: assert result is False, f"非凌晨3点不应触发 (当前{now.hour}点)" return f"OK (hour={now.hour})" @test("G2. Auto Dream 整合函数可调用") def test_auto_dream_runnable(): from app.services.auto_dream_service import run_auto_dream, _do_consolidate assert callable(run_auto_dream) assert callable(_do_consolidate) # 不应报错 (带锁保护) return "OK" @test("G3. Auto Dream 合并阈值边界检查") def test_auto_dream_threshold_validation(): from app.services.auto_dream_service import MERGE_SIMILARITY_THRESHOLD assert isinstance(MERGE_SIMILARITY_THRESHOLD, float) assert 0.0 < MERGE_SIMILARITY_THRESHOLD <= 1.0 # 85% 是合理的合并阈值 assert 0.80 <= MERGE_SIMILARITY_THRESHOLD <= 0.95 return f"OK threshold={MERGE_SIMILARITY_THRESHOLD}" # ═══════════════════════════════════════════════════════════════ # H. 团队共享记忆边界测试 (P6) # ═══════════════════════════════════════════════════════════════ @test("H1. 团队共享配置完整性") def test_team_sharing_config_full(): from app.agent_runtime.memory import AgentMemory # 启用团队共享 mem = AgentMemory( scope_id="agent_a", team_id="team_1", team_share_enabled=True, ) assert mem.team_id == "team_1" assert mem.team_share_enabled is True # 有team_id但不启用共享 mem2 = AgentMemory( scope_id="agent_b", team_id="team_1", team_share_enabled=False, ) assert mem2.team_id == "team_1" assert mem2.team_share_enabled is False # 无团队 mem3 = AgentMemory(scope_id="agent_c") assert mem3.team_id is None assert mem3.team_share_enabled is False return "OK" @test("H2. 团队共享 schema 默认值") def test_team_sharing_schema_defaults(): from app.agent_runtime.schemas import AgentMemoryConfig cfg = AgentMemoryConfig() assert cfg.team_id is None assert cfg.team_share_enabled is False cfg2 = AgentMemoryConfig(team_id="my_team", team_share_enabled=True) assert cfg2.team_id == "my_team" assert cfg2.team_share_enabled is True return "OK" # ═══════════════════════════════════════════════════════════════ # 运行 # ═══════════════════════════════════════════════════════════════ if __name__ == "__main__": print("=" * 60) print("天工 Agent 记忆系统 — 高级/边界测试") print("=" * 60) print() total = PASS + FAIL + SKIP print() print("=" * 60) print(f"测试结果: {PASS} 通过 / {FAIL} 失败 / {SKIP} 跳过 (共 {total})") print("=" * 60) if FAIL > 0: sys.exit(1) else: print("\n全部高级测试通过!")