feat: add AI学习助手 agent (KG+RAG ideal) and renshenguo feishu bot

- Add AI学习助手 agent creation script with all 39 tools, 3-layer KG+RAG memory
- Add renshenguo (人参果) feishu bot integration (app_service + ws_handler)
- Register renshenguo WS client in main.py startup
- Add RENSHENGUO_APP_ID / RENSHENGUO_APP_SECRET / RENSHENGUO_AGENT_ID config
- Reorganize docs from root into docs/ subdirectories
- Move startup scripts to scripts/startup/
- Various backend optimizations and tool improvements

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-06 01:37:13 +08:00
parent f33bc461ff
commit eabf90c496
171 changed files with 4906 additions and 445 deletions

View File

@@ -0,0 +1,822 @@
"""
知识图谱服务 — 实体抽取、关系构建、图谱查询、向量融合检索
为智能学习助手提供 KG+RAG 核心能力:
- 从对话/文本中提取知识点实体
- 构建实体间的语义关系(前置/扩展/包含/示例/关联)
- 图谱查询:邻近节点、路径查找、子图展开
- 向量+图谱融合检索:同时命中语义相似和结构关联
"""
from __future__ import annotations
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Set
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_
from app.core.database import SessionLocal
from app.models.agent import KnowledgeEntity, KnowledgeRelation
from app.services.embedding_service import embedding_service, VectorEntry
logger = logging.getLogger(__name__)
# Entity types recognized by the knowledge graph.
ENTITY_TYPES = ["concept", "formula", "fact", "term", "task", "skill"]
# Relation types mapped to human-readable descriptions.
# BUG FIX: the descriptions had unbalanced fullwidth parentheses; restored.
RELATION_TYPES = {
    "prerequisite": "前置知识(学习 B 前需要先掌握 A)",
    "extends": "扩展(B 是 A 的深入/延伸)",
    "contains": "包含(A 包含子知识点 B)",
    "related_to": "相关(A 与 B 存在关联)",
    "example_of": "示例(B 是 A 的实例/例题)",
    "applies_to": "应用(A 可应用于 B)",
}
def _build_entity_embedding_text(name: str, entity_type: str, description: str) -> str:
"""构建用于 embedding 的统一文本。"""
parts = [f"[{entity_type}] {name}"]
if description:
parts.append(f": {description[:500]}")
return "".join(parts)
def _serialize_embedding(emb: Optional[List[float]]) -> Optional[str]:
if not emb:
return None
return embedding_service.serialize_embedding(emb)
# ═══════════════════════════════════════════════════════════════
# 实体管理
# ═══════════════════════════════════════════════════════════════
async def add_entity(
    db: Session,
    name: str,
    entity_type: str = "concept",
    description: str = "",
    metadata: Optional[Dict[str, Any]] = None,
    source: str = "extracted",
    confidence: str = "medium",
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> Optional[KnowledgeEntity]:
    """Create a knowledge entity, or update an existing one.

    Dedup key is (name, scope_kind, scope_id). On update:
    - the description is replaced only when the new one is longer;
    - the embedding is refreshed whenever generation succeeds;
    - metadata is shallow-merged (incoming keys win);
    - confidence is overwritten unconditionally, even by a lower value
      (NOTE(review): confirm the downgrade is intended).

    Returns the created/updated entity, or None when `name` is blank.
    """
    # Normalize the name: trim whitespace and cap at 200 chars.
    name = name.strip()[:200]
    if not name:
        return None
    # Unknown entity types fall back to the generic "concept".
    if entity_type not in ENTITY_TYPES:
        entity_type = "concept"
    # Look for an existing entity with the same name in the same scope.
    existing = (
        db.query(KnowledgeEntity)
        .filter(
            KnowledgeEntity.name == name,
            KnowledgeEntity.scope_kind == scope_kind,
            KnowledgeEntity.scope_id == scope_id,
        )
        .first()
    )
    # Generate the embedding up-front; failure is non-fatal (the entity is
    # stored without an embedding, and an existing row keeps its old one).
    emb_text = _build_entity_embedding_text(name, entity_type, description)
    embedding_json = None
    try:
        emb = await embedding_service.generate_embedding(emb_text)
        if emb:
            embedding_json = _serialize_embedding(emb)
    except Exception as e:
        logger.warning("生成实体 embedding 失败: %s", e)
    if existing:
        # Update path: only grow the description, never shrink it.
        if description and len(description) > len(existing.description or ""):
            existing.description = description
        if embedding_json:
            existing.embedding = embedding_json
        if metadata:
            # Shallow merge; reassignment (not in-place mutation) so the ORM
            # sees the JSON column as changed.
            merged = {**(existing.metadata_ or {}), **metadata}
            existing.metadata_ = merged
        existing.confidence = confidence
        db.commit()
        db.refresh(existing)
        logger.debug("更新知识实体: %s (%s)", name, entity_type)
        return existing
    entity = KnowledgeEntity(
        name=name,
        entity_type=entity_type,
        description=description,
        embedding=embedding_json,
        metadata_=metadata or {},
        source=source,
        confidence=confidence,
        scope_kind=scope_kind,
        scope_id=scope_id,
        user_id=user_id,
    )
    db.add(entity)
    db.commit()
    db.refresh(entity)
    logger.info("新增知识实体: %s (%s) id=%s", name, entity_type, entity.id)
    return entity
async def add_entities_batch(
    db: Session,
    entities: List[Dict[str, Any]],
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> List[KnowledgeEntity]:
    """Add/update a list of entity specs via add_entity.

    Each spec is a dict with optional keys name / entity_type / description /
    metadata / source / confidence. Failures are logged and skipped so one
    bad spec does not abort the batch. Returns the persisted entities.
    """
    saved: List[KnowledgeEntity] = []
    for spec in entities:
        try:
            entity = await add_entity(
                db,
                name=spec.get("name", ""),
                entity_type=spec.get("entity_type", "concept"),
                description=spec.get("description", ""),
                metadata=spec.get("metadata"),
                source=spec.get("source", "extracted"),
                confidence=spec.get("confidence", "medium"),
                scope_kind=scope_kind,
                scope_id=scope_id,
                user_id=user_id,
            )
            if entity:
                saved.append(entity)
        except Exception as ex:
            logger.warning("批量添加实体失败: name=%s err=%s", spec.get("name"), ex)
    return saved
# ═══════════════════════════════════════════════════════════════
# 关系管理
# ═══════════════════════════════════════════════════════════════
def add_relation(
    db: Session,
    source_entity_id: str,
    target_entity_id: str,
    relation_type: str = "related_to",
    description: str = "",
    weight: float = 1.0,
    scope_kind: str = "agent",
    scope_id: str = "",
) -> Optional[KnowledgeRelation]:
    """Create a relation between two entities, deduplicated on
    (source, target, type, scope).

    Self-loops return None; unknown relation types fall back to
    "related_to". When an identical relation already exists, the existing
    row is returned unchanged.
    """
    if source_entity_id == target_entity_id:
        return None
    if relation_type not in RELATION_TYPES:
        relation_type = "related_to"
    duplicate = (
        db.query(KnowledgeRelation)
        .filter(
            KnowledgeRelation.source_entity_id == source_entity_id,
            KnowledgeRelation.target_entity_id == target_entity_id,
            KnowledgeRelation.relation_type == relation_type,
            KnowledgeRelation.scope_kind == scope_kind,
            KnowledgeRelation.scope_id == scope_id,
        )
        .first()
    )
    if duplicate is not None:
        logger.debug("关系已存在: %s -[%s]-> %s", source_entity_id[:8], relation_type, target_entity_id[:8])
        return duplicate
    relation = KnowledgeRelation(
        source_entity_id=source_entity_id,
        target_entity_id=target_entity_id,
        relation_type=relation_type,
        description=description,
        weight=str(weight),  # weight column stores a string
        scope_kind=scope_kind,
        scope_id=scope_id,
    )
    db.add(relation)
    db.commit()
    db.refresh(relation)
    logger.info("新增关系: %s -[%s]-> %s", source_entity_id[:8], relation_type, target_entity_id[:8])
    return relation
def add_relations_from_map(
    db: Session,
    entity_map: Dict[str, str],  # name -> entity_id
    relations: List[Dict[str, Any]],
    scope_kind: str = "agent",
    scope_id: str = "",
) -> int:
    """Bulk-create relations whose endpoints are given by entity *name*.

    Each item in `relations` names its source/target; names are resolved to
    ids via `entity_map` and unresolvable pairs are skipped. Returns the
    number of relations successfully resolved and persisted (or already
    existing).
    """
    created = 0
    for spec in relations:
        source_id = entity_map.get(spec.get("source", ""))
        target_id = entity_map.get(spec.get("target", ""))
        if not (source_id and target_id):
            continue
        if add_relation(
            db,
            source_entity_id=source_id,
            target_entity_id=target_id,
            relation_type=spec.get("relation_type", "related_to"),
            description=spec.get("description", ""),
            weight=float(spec.get("weight", 1.0)),
            scope_kind=scope_kind,
            scope_id=scope_id,
        ):
            created += 1
    return created
# ═══════════════════════════════════════════════════════════════
# 图谱查询
# ═══════════════════════════════════════════════════════════════
def get_entity_by_id(db: Session, entity_id: str) -> Optional[KnowledgeEntity]:
    """Fetch a single entity by primary key, or None when absent."""
    return (
        db.query(KnowledgeEntity)
        .filter(KnowledgeEntity.id == entity_id)
        .first()
    )
def search_entities(
    db: Session,
    keyword: str = "",
    entity_type: Optional[str] = None,
    scope_kind: str = "agent",
    scope_id: str = "",
    limit: int = 20,
) -> List[KnowledgeEntity]:
    """Keyword search over entities within one scope.

    `keyword` is matched as a substring of either name or description;
    `entity_type` optionally narrows the result. Ordered by confidence,
    then by recency.
    """
    query = db.query(KnowledgeEntity).filter(
        KnowledgeEntity.scope_kind == scope_kind,
        KnowledgeEntity.scope_id == scope_id,
    )
    if entity_type:
        query = query.filter(KnowledgeEntity.entity_type == entity_type)
    if keyword:
        like = f"%{keyword}%"
        query = query.filter(
            or_(
                KnowledgeEntity.name.like(like),
                KnowledgeEntity.description.like(like),
            )
        )
    ordered = query.order_by(
        KnowledgeEntity.confidence.desc(),
        KnowledgeEntity.created_at.desc(),
    )
    return ordered.limit(limit).all()
def _neighbor_hits(
    db: Session,
    entity_id: str,
    relation_types: Optional[List[str]],
    direction: str,
    limit: int,
) -> List[Dict[str, Any]]:
    """Collect neighbor records on one side of an entity ("out" follows
    relations where entity_id is the source, "in" where it is the target)."""
    if direction == "out":
        q = db.query(KnowledgeRelation).filter(
            KnowledgeRelation.source_entity_id == entity_id,
        )
    else:
        q = db.query(KnowledgeRelation).filter(
            KnowledgeRelation.target_entity_id == entity_id,
        )
    if relation_types:
        q = q.filter(KnowledgeRelation.relation_type.in_(relation_types))
    hits: List[Dict[str, Any]] = []
    for rel in q.limit(limit).all():
        other_id = rel.target_entity_id if direction == "out" else rel.source_entity_id
        other = get_entity_by_id(db, other_id)
        if other:  # skip dangling relations whose endpoint row is missing
            hits.append({
                "entity": _entity_to_dict(other),
                "relation": _relation_to_dict(rel),
                "direction": direction,
            })
    return hits


def get_neighbors(
    db: Session,
    entity_id: str,
    relation_types: Optional[List[str]] = None,
    direction: str = "both",
    max_depth: int = 1,
    limit: int = 30,
) -> Dict[str, Any]:
    """
    Return the graph neighborhood of an entity.

    Args:
        db: open SQLAlchemy session.
        entity_id: id of the center entity.
        relation_types: optional whitelist of relation types to follow.
        direction: "out", "in", or "both".
        max_depth: accepted for API compatibility, but only depth-1
            neighbors are returned (NOTE(review): parameter is currently
            unused — use get_entity_graph for multi-hop expansion).
        limit: per-direction query cap and cap on the combined result.

    Returns:
        {
            "entity": {...} | None,
            "neighbors": [{"entity": {...}, "relation": {...}, "direction": "out"|"in"}],
            "subgraph_size": int,  # size of the returned neighbor list
        }
    """
    entity = get_entity_by_id(db, entity_id)
    if not entity:
        return {"entity": None, "neighbors": [], "subgraph_size": 0}
    neighbors: List[Dict[str, Any]] = []
    if direction in ("out", "both"):
        neighbors.extend(_neighbor_hits(db, entity_id, relation_types, "out", limit))
    if direction in ("in", "both"):
        neighbors.extend(_neighbor_hits(db, entity_id, relation_types, "in", limit))
    # Truncate first so subgraph_size matches what is actually returned
    # (the original counted pre-truncation neighbors).
    neighbors = neighbors[:limit]
    return {
        "entity": _entity_to_dict(entity),
        "neighbors": neighbors,
        "subgraph_size": len(neighbors),
    }
def get_entity_graph(
    db: Session,
    entity_id: str,
    max_depth: int = 2,
    limit: int = 50,
) -> Dict[str, Any]:
    """
    BFS-expand the subgraph centered on an entity.

    Nodes are capped at `limit`; each frontier node contributes at most
    limit // 2 outgoing and limit // 2 incoming relations per level.
    BUG FIX: edges are now deduplicated by relation id — the original scan
    could append the same relation twice when both of its endpoints were
    expanded.

    Returns {"nodes": [entity dicts], "edges": [relation dicts], "center_id": entity_id}.
    """
    entity = get_entity_by_id(db, entity_id)
    if not entity:
        return {"nodes": [], "edges": [], "center_id": entity_id}
    visited: Set[str] = {entity_id}
    nodes: Dict[str, Dict[str, Any]] = {entity_id: _entity_to_dict(entity)}
    edges: List[Dict[str, Any]] = []
    seen_edge_ids: Set[str] = set()
    frontier: Set[str] = {entity_id}
    for _depth in range(max_depth):
        next_frontier: Set[str] = set()
        if len(nodes) >= limit:
            break
        for nid in list(frontier):
            # Outgoing edges.
            out_rels = (
                db.query(KnowledgeRelation)
                .filter(KnowledgeRelation.source_entity_id == nid)
                .limit(limit // 2)
                .all()
            )
            for rel in out_rels:
                if rel.id not in seen_edge_ids:
                    seen_edge_ids.add(rel.id)
                    edges.append(_relation_to_dict(rel))
                if rel.target_entity_id not in visited and len(nodes) < limit:
                    tgt = get_entity_by_id(db, rel.target_entity_id)
                    if tgt:
                        nodes[rel.target_entity_id] = _entity_to_dict(tgt)
                        visited.add(rel.target_entity_id)
                        next_frontier.add(rel.target_entity_id)
            # Incoming edges.
            in_rels = (
                db.query(KnowledgeRelation)
                .filter(KnowledgeRelation.target_entity_id == nid)
                .limit(limit // 2)
                .all()
            )
            for rel in in_rels:
                if rel.id not in seen_edge_ids:
                    seen_edge_ids.add(rel.id)
                    edges.append(_relation_to_dict(rel))
                if rel.source_entity_id not in visited and len(nodes) < limit:
                    src = get_entity_by_id(db, rel.source_entity_id)
                    if src:
                        nodes[rel.source_entity_id] = _entity_to_dict(src)
                        visited.add(rel.source_entity_id)
                        next_frontier.add(rel.source_entity_id)
        frontier = next_frontier
        if not frontier:
            break
    return {
        "nodes": list(nodes.values()),
        "edges": edges,
        "center_id": entity_id,
    }
def find_path(
    db: Session,
    source_id: str,
    target_id: str,
    max_depth: int = 4,
) -> Optional[List[Dict[str, Any]]]:
    """
    BFS shortest path between two entities, treating relations as
    undirected (outgoing edges are explored before incoming at each node).

    Returns a list of {"from", "relation", "to"} hop dicts, or None when no
    path exists within `max_depth` hops. BUG FIX: the source_id == target_id
    case previously dereferenced a possibly-None entity; it now returns None
    when the entity is missing. Also hoists the per-node entity serialization
    out of the per-relation loop (it was recomputed for every edge).
    """
    if source_id == target_id:
        entity = get_entity_by_id(db, source_id)
        if not entity:
            return None
        return [{"entity": _entity_to_dict(entity), "relation": None}]
    from collections import deque
    visited: Set[str] = {source_id}
    # BFS queue of (current_id, path_so_far).
    queue: deque = deque([(source_id, [])])
    while queue:
        current_id, path = queue.popleft()
        if len(path) >= max_depth:
            continue
        # Serialize the current node once; it is the same for every edge.
        current_dict = _entity_to_dict(get_entity_by_id(db, current_id))
        out_rels = (
            db.query(KnowledgeRelation)
            .filter(KnowledgeRelation.source_entity_id == current_id)
            .all()
        )
        in_rels = (
            db.query(KnowledgeRelation)
            .filter(KnowledgeRelation.target_entity_id == current_id)
            .all()
        )
        # Pair each relation with the node on its far side; out before in
        # preserves the original exploration order.
        candidates = (
            [(rel, rel.target_entity_id) for rel in out_rels]
            + [(rel, rel.source_entity_id) for rel in in_rels]
        )
        for rel, next_id in candidates:
            if next_id in visited:
                continue
            new_path = path + [{
                "from": current_dict,
                "relation": _relation_to_dict(rel),
                "to": _entity_to_dict(get_entity_by_id(db, next_id)),
            }]
            if next_id == target_id:
                return new_path
            visited.add(next_id)
            queue.append((next_id, new_path))
    return None
# ═══════════════════════════════════════════════════════════════
# LLM 驱动的实体与关系提取
# ═══════════════════════════════════════════════════════════════
# Prompt for LLM-driven entity/relation extraction; {text} is the only
# format placeholder. BUG FIX: restored missing fullwidth parentheses that
# had been dropped from the type/description annotations.
EXTRACTION_PROMPT = """你是一个知识图谱构建助手。请分析以下文本,提取其中的知识点实体和它们之间的关系。
请返回 JSON 格式(不要 markdown 包裹),包含:
1. entities: 提取的知识实体列表,每个实体包含:
- name: 实体名称(简洁准确)
- entity_type: 类型(concept=概念, formula=公式, fact=事实, term=术语, task=任务, skill=技能)
- description: 简要描述(1-2句)
- confidence: 置信度(low/medium/high)
2. relations: 实体间的关系列表,每个关系包含:
- source: 源实体名称
- target: 目标实体名称
- relation_type: 关系类型(prerequisite=前置知识, extends=扩展, contains=包含, related_to=相关, example_of=示例, applies_to=应用)
- description: 关系说明(1句话)
注意:
- 只提取文本中明确出现的知识点,不编造
- 关系应只在同段文本中有关联的实体间建立
- 如果文本中知识点不足,返回空数组即可
文本内容:
{text}"""
async def extract_from_text(
    text: str,
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Use an LLM to extract entities/relations from text and persist them.

    Texts shorter than 20 chars (after strip) are skipped. The LLM is asked
    for raw JSON; an optional ```json fence is stripped before parsing. On
    any LLM / parse / DB failure an empty result is returned — this function
    never raises. Opens and closes its own DB session.

    Returns:
        {"entities": [...], "relations": [...], "entity_count": int, "relation_count": int}

    NOTE(review): the returned "relations" are the first `rel_count` items
    of the *extracted* list, not necessarily the ones actually persisted.
    NOTE(review): model name "deepseek-v4-flash" is hard-coded — consider
    moving it to settings.
    """
    if not text or len(text.strip()) < 20:
        return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}
    from openai import AsyncOpenAI
    from app.core.config import settings
    # Call the LLM for extraction.
    try:
        # Prefer DeepSeek credentials, falling back to OpenAI-compatible ones.
        api_key = settings.DEEPSEEK_API_KEY or settings.OPENAI_API_KEY or ""
        base_url = settings.DEEPSEEK_BASE_URL or settings.OPENAI_BASE_URL or "https://api.deepseek.com"
        # Guard against the sample placeholder key leaking in from config.
        if api_key == "your-openai-api-key":
            api_key = settings.DEEPSEEK_API_KEY or ""
            base_url = settings.DEEPSEEK_BASE_URL or "https://api.deepseek.com"
        if not api_key:
            logger.warning("知识图谱提取:未配置 API Key跳过")
            return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}
        client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        resp = await client.chat.completions.create(
            model="deepseek-v4-flash",
            # Input is truncated to 4000 chars to bound the prompt size.
            messages=[{"role": "user", "content": EXTRACTION_PROMPT.format(text=text[:4000])}],
            temperature=0.2,
            max_tokens=2048,
            timeout=30,
        )
        raw = resp.choices[0].message.content or ""
        # Strip an optional markdown code fence before parsing JSON.
        raw = raw.strip().removeprefix("```json").removesuffix("```").strip()
        result = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("知识图谱提取LLM 返回非 JSON跳过")
        return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}
    except Exception as e:
        logger.warning("知识图谱提取失败: %s", e)
        return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}
    extracted_entities = result.get("entities", [])
    extracted_relations = result.get("relations", [])
    if not extracted_entities:
        return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}
    # Persist: entities first, then relations resolved by entity name.
    db: Optional[Session] = None
    try:
        db = SessionLocal()
        # Batch-insert/update the extracted entities.
        created = await add_entities_batch(
            db, extracted_entities,
            scope_kind=scope_kind, scope_id=scope_id, user_id=user_id,
        )
        # Build the name -> id map used to resolve relation endpoints.
        entity_map: Dict[str, str] = {}
        for e in created:
            entity_map[e.name] = e.id
        # Create relations between the persisted entities.
        rel_count = add_relations_from_map(
            db, entity_map, extracted_relations,
            scope_kind=scope_kind, scope_id=scope_id,
        )
        return {
            "entities": [_entity_to_dict(e) for e in created],
            "relations": extracted_relations[:rel_count],
            "entity_count": len(created),
            "relation_count": rel_count,
        }
    except Exception as e:
        logger.error("写入知识图谱失败: %s", e)
        if db:
            db.rollback()
        return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}
    finally:
        if db:
            db.close()
# ═══════════════════════════════════════════════════════════════
# 向量+图谱融合检索(核心 RAG 能力)
# ═══════════════════════════════════════════════════════════════
async def hybrid_search(
    query: str,
    scope_kind: str = "agent",
    scope_id: str = "",
    top_k: int = 5,
    include_neighbors: bool = True,
) -> Dict[str, Any]:
    """
    Hybrid vector + graph retrieval (core RAG capability).

    1. Embed `query` and run similarity search over stored entity embeddings
       (falls back to keyword search when the query cannot be embedded).
    2. Expand graph neighbors for the top-3 vector matches.
    3. Return a merged, formatted context block ready for LLM injection.

    Opens and closes its own DB session; never raises — any failure returns
    an empty result.

    NOTE(review): only the first 200 embedded entities in scope are scanned
    for similarity; larger graphs silently truncate the candidate set.

    Returns:
        {
            "query": str,
            "vector_matches": [...],   # entities hit by vector search
            "graph_expansion": {...},  # neighbor subgraph of top matches
            "formatted_context": str,  # text block for direct LLM injection
        }
    """
    if not query or not query.strip():
        return {"query": query, "vector_matches": [], "graph_expansion": {}, "formatted_context": ""}
    db: Optional[Session] = None
    try:
        db = SessionLocal()
        # Step 1: vector retrieval.
        query_emb = await embedding_service.generate_embedding(query)
        if not query_emb:
            # Degraded mode: keyword search when embedding generation fails.
            entities = search_entities(db, keyword=query, scope_kind=scope_kind, scope_id=scope_id, limit=top_k * 3)
            vector_matches = [_entity_to_dict(e) for e in entities[:top_k]]
        else:
            # Load in-scope candidates that have a stored embedding.
            all_entities = (
                db.query(KnowledgeEntity)
                .filter(
                    KnowledgeEntity.scope_kind == scope_kind,
                    KnowledgeEntity.scope_id == scope_id,
                    KnowledgeEntity.embedding.isnot(None),
                )
                .limit(200)
                .all()
            )
            entries: List[VectorEntry] = []
            for e in all_entities:
                emb = embedding_service.deserialize_embedding(e.embedding) if e.embedding else []
                if emb:
                    entries.append({
                        "id": e.id,
                        "scope_kind": scope_kind,
                        "scope_id": scope_id,
                        "content_text": _build_entity_embedding_text(e.name, e.entity_type, e.description or ""),
                        "embedding": emb,
                        "metadata": {"entity_type": e.entity_type, "name": e.name, "entity_id": e.id},
                    })
            matched = await embedding_service.similarity_search(query_emb, entries, top_k=top_k)
            # Re-fetch full entity rows for the matched ids, keeping match
            # order, and attach each match's similarity score.
            matched_ids = [m["metadata"].get("entity_id", "") for m in matched]
            vector_matches = []
            for mid in matched_ids:
                e = get_entity_by_id(db, mid)
                if e:
                    score = next((m.get("score", 0) for m in matched if m["metadata"].get("entity_id") == mid), 0)
                    d = _entity_to_dict(e)
                    d["score"] = score
                    vector_matches.append(d)
        # Step 2: graph expansion (neighbors of the top-3 matches).
        graph_expansion: Dict[str, Any] = {"entities": [], "relations": []}
        seen_entity_ids: Set[str] = set()
        if include_neighbors and vector_matches:
            for vm in vector_matches[:3]:
                eid = vm["id"]
                if eid in seen_entity_ids:
                    continue
                seen_entity_ids.add(eid)
                sub = get_neighbors(db, eid, direction="both", limit=5)
                # De-dup by dict equality while merging into one subgraph.
                if sub["entity"] and sub["entity"] not in graph_expansion["entities"]:
                    graph_expansion["entities"].append(sub["entity"])
                for nb in sub["neighbors"]:
                    if nb["entity"] not in graph_expansion["entities"]:
                        graph_expansion["entities"].append(nb["entity"])
                    rel_dict = nb["relation"]
                    if rel_dict not in graph_expansion["relations"]:
                        graph_expansion["relations"].append(rel_dict)
        # Step 3: format the merged context for LLM consumption.
        formatted = _format_hybrid_context(query, vector_matches, graph_expansion)
        return {
            "query": query,
            "vector_matches": vector_matches,
            "graph_expansion": graph_expansion,
            "formatted_context": formatted,
        }
    except Exception as e:
        logger.error("融合检索失败: %s", e)
        return {"query": query, "vector_matches": [], "graph_expansion": {}, "formatted_context": ""}
    finally:
        if db:
            db.close()
def _format_hybrid_context(
query: str,
vector_matches: List[Dict[str, Any]],
graph_expansion: Dict[str, Any],
) -> str:
"""格式化融合检索结果为 LLM 可读的文本块。"""
parts: List[str] = []
if vector_matches:
parts.append("## 相关知识实体(向量检索)")
for i, vm in enumerate(vector_matches, 1):
score_str = f" (匹配度: {vm.get('score', 1.0):.2f})" if vm.get('score', 1.0) < 1.0 else ""
parts.append(f"{i}. [{vm.get('entity_type', 'concept')}] **{vm.get('name', '')}**{score_str}")
if vm.get("description"):
parts.append(f" {vm['description'][:300]}")
if graph_expansion.get("entities"):
parts.append("\n## 关联知识点(图谱展开)")
for ent in graph_expansion["entities"]:
parts.append(f"- [{ent.get('entity_type', '')}] **{ent.get('name', '')}**")
if ent.get("description"):
parts.append(f" {ent['description'][:200]}")
if graph_expansion.get("relations"):
parts.append("\n## 知识关系")
for rel in graph_expansion["relations"]:
parts.append(f"- `{rel.get('relation_type', '')}`: {rel.get('description', '')}")
return "\n".join(parts) if parts else ""
# ═══════════════════════════════════════════════════════════════
# 学习路径推荐
# ═══════════════════════════════════════════════════════════════
def recommend_learning_path(
    db: Session,
    target_entity_ids: List[str],
    scope_kind: str = "agent",
    scope_id: str = "",
) -> Dict[str, Any]:
    """
    Recommend a learning order for a set of target entities.

    For every target, its direct "prerequisite" relations within the given
    scope are collected, and prerequisite entities are pulled into the
    result set. Entities are sorted so items with fewer incoming
    prerequisite edges come first (leaves before dependents).

    BUG FIX: the summary previously joined entity names with an empty
    separator, producing an unreadable concatenation; names are now joined
    with " → ".

    Returns:
        {
            "entities": sorted entity dicts,
            "prerequisites": [{"prerequisite", "target", "relation"}, ...],
            "suggested_order": [name, ...],
            "summary": human-readable ordering,
        }
    """
    all_entities: Dict[str, Dict[str, Any]] = {}
    all_prereqs: List[Dict[str, Any]] = []
    for eid in target_entity_ids:
        entity = get_entity_by_id(db, eid)
        if not entity:
            continue
        all_entities[eid] = _entity_to_dict(entity)
        # Direct prerequisite edges pointing at this target.
        prereqs = (
            db.query(KnowledgeRelation)
            .filter(
                KnowledgeRelation.target_entity_id == eid,
                KnowledgeRelation.relation_type == "prerequisite",
                KnowledgeRelation.scope_kind == scope_kind,
                KnowledgeRelation.scope_id == scope_id,
            )
            .all()
        )
        for pr in prereqs:
            src = get_entity_by_id(db, pr.source_entity_id)
            if src:
                all_entities[pr.source_entity_id] = _entity_to_dict(src)
                all_prereqs.append({
                    "prerequisite": _entity_to_dict(src),
                    "target": _entity_to_dict(entity),
                    "relation": _relation_to_dict(pr),
                })
    # Sort: entities depending on fewer prerequisites first (leaf-first).
    def _dep_count(eid: str) -> int:
        return sum(1 for p in all_prereqs if p["target"]["id"] == eid)
    sorted_entities = sorted(all_entities.values(), key=lambda e: _dep_count(e["id"]))
    order_names = [e["name"] for e in sorted_entities]
    return {
        "entities": sorted_entities,
        "prerequisites": all_prereqs,
        "suggested_order": order_names,
        "summary": f"建议按以下顺序学习: {' → '.join(order_names)}" if sorted_entities else "暂无学习路径",
    }
# ═══════════════════════════════════════════════════════════════
# 工具函数
# ═══════════════════════════════════════════════════════════════
def _entity_to_dict(entity: KnowledgeEntity) -> Dict[str, Any]:
return {
"id": entity.id,
"name": entity.name,
"entity_type": entity.entity_type,
"description": entity.description,
"confidence": entity.confidence,
"source": entity.source,
"metadata": entity.metadata_ or {},
"created_at": entity.created_at.isoformat() if entity.created_at else None,
}
def _relation_to_dict(rel: KnowledgeRelation) -> Dict[str, Any]:
return {
"id": rel.id,
"source_entity_id": rel.source_entity_id,
"target_entity_id": rel.target_entity_id,
"relation_type": rel.relation_type,
"description": rel.description,
"weight": float(rel.weight) if rel.weight else 1.0,
}