Files
aiagent/backend/app/services/knowledge_graph_service.py
renjianbo eabf90c496 feat: add AI学习助手 agent (KG+RAG ideal) and renshenguo feishu bot
- Add AI学习助手 agent creation script with all 39 tools, 3-layer KG+RAG memory
- Add renshenguo (人参果) feishu bot integration (app_service + ws_handler)
- Register renshenguo WS client in main.py startup
- Add RENSHENGUO_APP_ID / RENSHENGUO_APP_SECRET / RENSHENGUO_AGENT_ID config
- Reorganize docs from root into docs/ subdirectories
- Move startup scripts to scripts/startup/
- Various backend optimizations and tool improvements

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-06 01:37:13 +08:00

823 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
知识图谱服务 — 实体抽取、关系构建、图谱查询、向量融合检索
为智能学习助手提供 KG+RAG 核心能力:
- 从对话/文本中提取知识点实体
- 构建实体间的语义关系(前置/扩展/包含/示例/关联)
- 图谱查询:邻近节点、路径查找、子图展开
- 向量+图谱融合检索:同时命中语义相似和结构关联
"""
from __future__ import annotations
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Set
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_
from app.core.database import SessionLocal
from app.models.agent import KnowledgeEntity, KnowledgeRelation
from app.services.embedding_service import embedding_service, VectorEntry
logger = logging.getLogger(__name__)
# Entity types recognized by the knowledge graph.
ENTITY_TYPES = ["concept", "formula", "fact", "term", "task", "skill"]
# Relation types with human-readable descriptions.
# Fix: the descriptions had unbalanced full-width parentheses (text garbling);
# restored so each value is well-formed "label(explanation)" text.
RELATION_TYPES = {
    "prerequisite": "前置知识(学习 B 前需要先掌握 A)",
    "extends": "扩展(B 是 A 的深入/延伸)",
    "contains": "包含(A 包含子知识点 B)",
    "related_to": "相关(A 与 B 存在关联)",
    "example_of": "示例(B 是 A 的实例/例题)",
    "applies_to": "应用(A 可应用于 B)",
}
def _build_entity_embedding_text(name: str, entity_type: str, description: str) -> str:
"""构建用于 embedding 的统一文本。"""
parts = [f"[{entity_type}] {name}"]
if description:
parts.append(f": {description[:500]}")
return "".join(parts)
def _serialize_embedding(emb: Optional[List[float]]) -> Optional[str]:
if not emb:
return None
return embedding_service.serialize_embedding(emb)
# ═══════════════════════════════════════════════════════════════
# 实体管理
# ═══════════════════════════════════════════════════════════════
async def add_entity(
    db: Session,
    name: str,
    entity_type: str = "concept",
    description: str = "",
    metadata: Optional[Dict[str, Any]] = None,
    source: str = "extracted",
    confidence: str = "medium",
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> Optional[KnowledgeEntity]:
    """Add a knowledge entity, or update the existing one with the same name+scope.

    The name is trimmed and capped at 200 chars; an unknown entity_type falls
    back to "concept". An embedding is (re)generated from name/type/description
    on every call; embedding failures are logged and tolerated (the entity is
    still persisted). For an existing entity: the description is only replaced
    by a strictly longer one, metadata dicts are shallow-merged (new keys win),
    and confidence is overwritten unconditionally.
    NOTE(review): the unconditional confidence overwrite can downgrade a "high"
    entity on re-extraction — confirm intended.

    Returns the persisted entity, or None when the name is empty after trimming.
    """
    name = name.strip()[:200]
    if not name:
        return None
    if entity_type not in ENTITY_TYPES:
        entity_type = "concept"
    # Look up an existing entity with the same name in the same scope (dedupe key).
    existing = (
        db.query(KnowledgeEntity)
        .filter(
            KnowledgeEntity.name == name,
            KnowledgeEntity.scope_kind == scope_kind,
            KnowledgeEntity.scope_id == scope_id,
        )
        .first()
    )
    # Generate the embedding (best-effort; failure leaves embedding_json = None).
    emb_text = _build_entity_embedding_text(name, entity_type, description)
    embedding_json = None
    try:
        emb = await embedding_service.generate_embedding(emb_text)
        if emb:
            embedding_json = _serialize_embedding(emb)
    except Exception as e:
        logger.warning("生成实体 embedding 失败: %s", e)
    if existing:
        # Update path: keep the longer description, refresh embedding/metadata.
        if description and len(description) > len(existing.description or ""):
            existing.description = description
        if embedding_json:
            existing.embedding = embedding_json
        if metadata:
            # Shallow merge; a new dict is assigned so SQLAlchemy sees the change.
            merged = {**(existing.metadata_ or {}), **metadata}
            existing.metadata_ = merged
        existing.confidence = confidence
        db.commit()
        db.refresh(existing)
        logger.debug("更新知识实体: %s (%s)", name, entity_type)
        return existing
    entity = KnowledgeEntity(
        name=name,
        entity_type=entity_type,
        description=description,
        embedding=embedding_json,
        metadata_=metadata or {},
        source=source,
        confidence=confidence,
        scope_kind=scope_kind,
        scope_id=scope_id,
        user_id=user_id,
    )
    db.add(entity)
    db.commit()
    db.refresh(entity)
    logger.info("新增知识实体: %s (%s) id=%s", name, entity_type, entity.id)
    return entity
async def add_entities_batch(
    db: Session,
    entities: List[Dict[str, Any]],
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> List[KnowledgeEntity]:
    """Insert/update a batch of entity dicts; returns the successfully persisted ones.

    A failure on one entity is logged and does not abort the rest of the batch.
    """
    saved: List[KnowledgeEntity] = []
    for spec in entities:
        try:
            entity = await add_entity(
                db,
                name=spec.get("name", ""),
                entity_type=spec.get("entity_type", "concept"),
                description=spec.get("description", ""),
                metadata=spec.get("metadata"),
                source=spec.get("source", "extracted"),
                confidence=spec.get("confidence", "medium"),
                scope_kind=scope_kind,
                scope_id=scope_id,
                user_id=user_id,
            )
        except Exception as ex:
            logger.warning("批量添加实体失败: name=%s err=%s", spec.get("name"), ex)
            continue
        if entity:
            saved.append(entity)
    return saved
# ═══════════════════════════════════════════════════════════════
# 关系管理
# ═══════════════════════════════════════════════════════════════
def add_relation(
    db: Session,
    source_entity_id: str,
    target_entity_id: str,
    relation_type: str = "related_to",
    description: str = "",
    weight: float = 1.0,
    scope_kind: str = "agent",
    scope_id: str = "",
) -> Optional[KnowledgeRelation]:
    """Create a relation between two entities, deduplicated on (src, tgt, type, scope).

    Self-loops return None; an unknown relation_type falls back to "related_to";
    an already-existing identical relation is returned as-is.
    """
    if source_entity_id == target_entity_id:
        return None
    if relation_type not in RELATION_TYPES:
        relation_type = "related_to"
    duplicate = (
        db.query(KnowledgeRelation)
        .filter(
            KnowledgeRelation.source_entity_id == source_entity_id,
            KnowledgeRelation.target_entity_id == target_entity_id,
            KnowledgeRelation.relation_type == relation_type,
            KnowledgeRelation.scope_kind == scope_kind,
            KnowledgeRelation.scope_id == scope_id,
        )
        .first()
    )
    if duplicate is not None:
        logger.debug("关系已存在: %s -[%s]-> %s", source_entity_id[:8], relation_type, target_entity_id[:8])
        return duplicate
    new_rel = KnowledgeRelation(
        source_entity_id=source_entity_id,
        target_entity_id=target_entity_id,
        relation_type=relation_type,
        description=description,
        weight=str(weight),  # weight column is stored as text
        scope_kind=scope_kind,
        scope_id=scope_id,
    )
    db.add(new_rel)
    db.commit()
    db.refresh(new_rel)
    logger.info("新增关系: %s -[%s]-> %s", source_entity_id[:8], relation_type, target_entity_id[:8])
    return new_rel
def add_relations_from_map(
    db: Session,
    entity_map: Dict[str, str],  # entity name -> entity id
    relations: List[Dict[str, Any]],
    scope_kind: str = "agent",
    scope_id: str = "",
) -> int:
    """Persist relations expressed by entity *names*, resolving IDs via entity_map.

    Relations whose source or target name is missing from the map are skipped.
    Returns the number of relations stored (new or already existing).
    """
    stored = 0
    for spec in relations:
        src_id = entity_map.get(spec.get("source", ""))
        tgt_id = entity_map.get(spec.get("target", ""))
        if not (src_id and tgt_id):
            continue
        created = add_relation(
            db,
            source_entity_id=src_id,
            target_entity_id=tgt_id,
            relation_type=spec.get("relation_type", "related_to"),
            description=spec.get("description", ""),
            weight=float(spec.get("weight", 1.0)),
            scope_kind=scope_kind,
            scope_id=scope_id,
        )
        if created:
            stored += 1
    return stored
# ═══════════════════════════════════════════════════════════════
# 图谱查询
# ═══════════════════════════════════════════════════════════════
def get_entity_by_id(db: Session, entity_id: str) -> Optional[KnowledgeEntity]:
    """Fetch a single KnowledgeEntity by primary key; None when not found."""
    return db.query(KnowledgeEntity).filter(KnowledgeEntity.id == entity_id).first()
def search_entities(
    db: Session,
    keyword: str = "",
    entity_type: Optional[str] = None,
    scope_kind: str = "agent",
    scope_id: str = "",
    limit: int = 20,
) -> List[KnowledgeEntity]:
    """Keyword search over entities in a scope, high-confidence/newest first.

    keyword matches name or description via SQL LIKE; entity_type optionally
    narrows the result set.
    """
    query = db.query(KnowledgeEntity).filter(
        KnowledgeEntity.scope_kind == scope_kind,
        KnowledgeEntity.scope_id == scope_id,
    )
    if entity_type:
        query = query.filter(KnowledgeEntity.entity_type == entity_type)
    if keyword:
        needle = f"%{keyword}%"
        query = query.filter(
            or_(
                KnowledgeEntity.name.like(needle),
                KnowledgeEntity.description.like(needle),
            )
        )
    ordered = query.order_by(
        KnowledgeEntity.confidence.desc(),
        KnowledgeEntity.created_at.desc(),
    )
    return ordered.limit(limit).all()
def get_neighbors(
    db: Session,
    entity_id: str,
    relation_types: Optional[List[str]] = None,
    direction: str = "both",
    max_depth: int = 1,
    limit: int = 30,
) -> Dict[str, Any]:
    """Return an entity together with its direct graph neighbours.

    direction: "out" (entity is the relation source), "in" (entity is the
    target) or "both". relation_types optionally restricts edge types.
    Out-edges are listed before in-edges when direction == "both".

    Returns:
        {
            "entity": {...} | None,
            "neighbors": [{"entity": {...}, "relation": {...}, "direction": "out"|"in"}],
            "subgraph_size": int,  # neighbour count before the limit slice
        }
    """
    center = get_entity_by_id(db, entity_id)
    if center is None:
        return {"entity": None, "neighbors": [], "subgraph_size": 0}
    found: List[Dict[str, Any]] = []
    # (direction label, FK column matching entity_id, accessor for the far end)
    legs = [
        ("out", KnowledgeRelation.source_entity_id, lambda r: r.target_entity_id),
        ("in", KnowledgeRelation.target_entity_id, lambda r: r.source_entity_id),
    ]
    for label, near_col, far_id in legs:
        if direction not in (label, "both"):
            continue
        q = db.query(KnowledgeRelation).filter(near_col == entity_id)
        if relation_types:
            q = q.filter(KnowledgeRelation.relation_type.in_(relation_types))
        for rel in q.limit(limit).all():
            other = get_entity_by_id(db, far_id(rel))
            if other:
                found.append({
                    "entity": _entity_to_dict(other),
                    "relation": _relation_to_dict(rel),
                    "direction": label,
                })
    return {
        "entity": _entity_to_dict(center),
        "neighbors": found[:limit],
        "subgraph_size": len(found),
    }
def get_entity_graph(
    db: Session,
    entity_id: str,
    max_depth: int = 2,
    limit: int = 50,
) -> Dict[str, Any]:
    """Expand a subgraph centred on an entity via breadth-first search.

    Nodes are capped at `limit`; each node's out/in edge queries are capped at
    limit // 2 per direction. Fix: edges are now deduplicated by relation id —
    previously the same relation was emitted twice (as the out-edge of one
    endpoint and again as the in-edge of the other at the next depth).
    NOTE(review): edges may still reference nodes dropped by the node cap.

    Returns {"nodes": [...], "edges": [...], "center_id": entity_id}.
    """
    entity = get_entity_by_id(db, entity_id)
    if not entity:
        return {"nodes": [], "edges": [], "center_id": entity_id}
    visited: Set[str] = {entity_id}
    nodes: Dict[str, Dict[str, Any]] = {entity_id: _entity_to_dict(entity)}
    edges: List[Dict[str, Any]] = []
    seen_edge_ids: Set[str] = set()  # relation ids already emitted (dedup fix)

    def _add_edge(rel: KnowledgeRelation) -> None:
        # Emit each relation at most once, even when both endpoints are visited.
        if rel.id not in seen_edge_ids:
            seen_edge_ids.add(rel.id)
            edges.append(_relation_to_dict(rel))

    frontier = {entity_id}
    for _depth in range(max_depth):
        next_frontier: Set[str] = set()
        if len(nodes) >= limit:
            break
        for nid in list(frontier):
            # Outgoing edges.
            out_rels = (
                db.query(KnowledgeRelation)
                .filter(KnowledgeRelation.source_entity_id == nid)
                .limit(limit // 2)
                .all()
            )
            for rel in out_rels:
                _add_edge(rel)
                if rel.target_entity_id not in visited and len(nodes) < limit:
                    tgt = get_entity_by_id(db, rel.target_entity_id)
                    if tgt:
                        nodes[rel.target_entity_id] = _entity_to_dict(tgt)
                        visited.add(rel.target_entity_id)
                        next_frontier.add(rel.target_entity_id)
            # Incoming edges.
            in_rels = (
                db.query(KnowledgeRelation)
                .filter(KnowledgeRelation.target_entity_id == nid)
                .limit(limit // 2)
                .all()
            )
            for rel in in_rels:
                _add_edge(rel)
                if rel.source_entity_id not in visited and len(nodes) < limit:
                    src = get_entity_by_id(db, rel.source_entity_id)
                    if src:
                        nodes[rel.source_entity_id] = _entity_to_dict(src)
                        visited.add(rel.source_entity_id)
                        next_frontier.add(rel.source_entity_id)
        frontier = next_frontier
        if not frontier:
            break
    return {
        "nodes": list(nodes.values()),
        "edges": edges,
        "center_id": entity_id,
    }
def find_path(
    db: Session,
    source_id: str,
    target_id: str,
    max_depth: int = 4,
) -> Optional[List[Dict[str, Any]]]:
    """BFS for a shortest undirected path between two entities.

    Edges are traversed in both directions; each step records the originating
    entity ("from"), the connecting relation, and the far entity ("to").
    Returns None when no path exists within max_depth hops.

    Fixes: the source==target case previously crashed with AttributeError
    (_entity_to_dict(None)) when the entity did not exist; the "from" entity
    dict is now fetched once per dequeued node instead of once per edge.
    """
    from collections import deque

    if source_id == target_id:
        entity = get_entity_by_id(db, source_id)
        if entity is None:
            return None
        return [{"entity": _entity_to_dict(entity), "relation": None}]
    visited: Set[str] = {source_id}
    # BFS queue of (current_id, path_so_far).
    queue: deque = deque([(source_id, [])])
    while queue:
        current_id, path = queue.popleft()
        if len(path) >= max_depth:
            continue
        current_entity = get_entity_by_id(db, current_id)
        if current_entity is None:
            continue  # dangling id (e.g. unknown source_id); nothing to expand
        current_dict = _entity_to_dict(current_entity)
        # Collect (relation, far-end id) pairs for both edge directions.
        out_rels = (
            db.query(KnowledgeRelation)
            .filter(KnowledgeRelation.source_entity_id == current_id)
            .all()
        )
        in_rels = (
            db.query(KnowledgeRelation)
            .filter(KnowledgeRelation.target_entity_id == current_id)
            .all()
        )
        steps = [(rel, rel.target_entity_id) for rel in out_rels]
        steps += [(rel, rel.source_entity_id) for rel in in_rels]
        for rel, next_id in steps:
            if next_id in visited:
                continue
            new_path = path + [{
                "from": current_dict,
                "relation": _relation_to_dict(rel),
                "to": _entity_to_dict(get_entity_by_id(db, next_id)),
            }]
            if next_id == target_id:
                return new_path
            visited.add(next_id)
            queue.append((next_id, new_path))
    return None
# ═══════════════════════════════════════════════════════════════
# LLM 驱动的实体与关系提取
# ═══════════════════════════════════════════════════════════════
# Prompt for LLM-driven entity/relation extraction. Fix: restored full-width
# parentheses that were garbled/dropped in the original text.
EXTRACTION_PROMPT = """你是一个知识图谱构建助手。请分析以下文本,提取其中的知识点实体和它们之间的关系。
请返回 JSON 格式(不要 markdown 包裹),包含:
1. entities: 提取的知识实体列表,每个实体包含:
- name: 实体名称(简洁准确)
- entity_type: 类型(concept=概念, formula=公式, fact=事实, term=术语, task=任务, skill=技能)
- description: 简要描述(1-2句)
- confidence: 置信度(low/medium/high)
2. relations: 实体间的关系列表,每个关系包含:
- source: 源实体名称
- target: 目标实体名称
- relation_type: 关系类型(prerequisite=前置知识, extends=扩展, contains=包含, related_to=相关, example_of=示例, applies_to=应用)
- description: 关系说明(1句话)
注意:
- 只提取文本中明确出现的知识点,不编造
- 关系应只在同段文本中有关联的实体间建立
- 如果文本中知识点不足,返回空数组即可
文本内容:
{text}"""
async def extract_from_text(
    text: str,
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Use an LLM to extract entities/relations from text and persist them.

    Returns {"entities", "relations", "entity_count", "relation_count"} —
    all empty/zero when the text is too short (< 20 chars), no API key is
    configured, the LLM call fails, or the response cannot be parsed.

    Fix: json.loads may yield a non-dict (list/str); previously the .get()
    calls after the try block raised an uncaught AttributeError.
    """
    def _empty_result() -> Dict[str, Any]:
        # Fresh dict each time so callers can't mutate a shared instance.
        return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}

    if not text or len(text.strip()) < 20:
        return _empty_result()
    from openai import AsyncOpenAI
    from app.core.config import settings
    # Call the LLM for extraction.
    try:
        api_key = settings.DEEPSEEK_API_KEY or settings.OPENAI_API_KEY or ""
        base_url = settings.DEEPSEEK_BASE_URL or settings.OPENAI_BASE_URL or "https://api.deepseek.com"
        # The sample-config placeholder means OpenAI is effectively unconfigured.
        if api_key == "your-openai-api-key":
            api_key = settings.DEEPSEEK_API_KEY or ""
            base_url = settings.DEEPSEEK_BASE_URL or "https://api.deepseek.com"
        if not api_key:
            logger.warning("知识图谱提取:未配置 API Key跳过")
            return _empty_result()
        client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        resp = await client.chat.completions.create(
            model="deepseek-v4-flash",
            messages=[{"role": "user", "content": EXTRACTION_PROMPT.format(text=text[:4000])}],
            temperature=0.2,
            max_tokens=2048,
            timeout=30,
        )
        raw = resp.choices[0].message.content or ""
        # Strip optional ```json fences before parsing.
        raw = raw.strip().removeprefix("```json").removesuffix("```").strip()
        result = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("知识图谱提取LLM 返回非 JSON跳过")
        return _empty_result()
    except Exception as e:
        logger.warning("知识图谱提取失败: %s", e)
        return _empty_result()
    if not isinstance(result, dict):
        # Valid JSON but not an object — treat the same as a parse failure.
        return _empty_result()
    extracted_entities = result.get("entities", [])
    extracted_relations = result.get("relations", [])
    if not extracted_entities:
        return _empty_result()
    # Persist to the database.
    db: Optional[Session] = None
    try:
        db = SessionLocal()
        created = await add_entities_batch(
            db, extracted_entities,
            scope_kind=scope_kind, scope_id=scope_id, user_id=user_id,
        )
        # Map entity name -> id for relation resolution.
        entity_map: Dict[str, str] = {e.name: e.id for e in created}
        rel_count = add_relations_from_map(
            db, entity_map, extracted_relations,
            scope_kind=scope_kind, scope_id=scope_id,
        )
        return {
            "entities": [_entity_to_dict(e) for e in created],
            # NOTE(review): this slice returns the *first* rel_count extracted
            # relations, which are not necessarily the ones actually stored —
            # confirm whether callers rely on this.
            "relations": extracted_relations[:rel_count],
            "entity_count": len(created),
            "relation_count": rel_count,
        }
    except Exception as e:
        logger.error("写入知识图谱失败: %s", e)
        if db:
            db.rollback()
        return _empty_result()
    finally:
        if db:
            db.close()
# ═══════════════════════════════════════════════════════════════
# 向量+图谱融合检索(核心 RAG 能力)
# ═══════════════════════════════════════════════════════════════
async def hybrid_search(
    query: str,
    scope_kind: str = "agent",
    scope_id: str = "",
    top_k: int = 5,
    include_neighbors: bool = True,
) -> Dict[str, Any]:
    """
    Vector + graph hybrid retrieval (core KG+RAG capability).

    1. Embed the query and run an in-memory similarity search over the
       scope's knowledge entities (falls back to keyword LIKE search when
       embedding generation fails).
    2. Expand graph neighbours for the top-3 matched entities.
    3. Return the merged, structured knowledge context.

    Opens and closes its own DB session; any exception is logged and an
    empty result is returned.

    Returns:
        {
            "query": str,
            "vector_matches": [...],   # entities hit by vector (or keyword) search
            "graph_expansion": {...},  # neighbour subgraph {"entities", "relations"}
            "formatted_context": str,  # text block ready to inject into an LLM prompt
        }
    """
    if not query or not query.strip():
        return {"query": query, "vector_matches": [], "graph_expansion": {}, "formatted_context": ""}
    db: Optional[Session] = None
    try:
        db = SessionLocal()
        # Step 1: vector retrieval.
        query_emb = await embedding_service.generate_embedding(query)
        if not query_emb:
            # Degrade gracefully to keyword search when embedding fails.
            entities = search_entities(db, keyword=query, scope_kind=scope_kind, scope_id=scope_id, limit=top_k * 3)
            vector_matches = [_entity_to_dict(e) for e in entities[:top_k]]
        else:
            # NOTE(review): in-memory scan capped at 200 embedded entities —
            # larger scopes silently ignore the rest; confirm acceptable.
            all_entities = (
                db.query(KnowledgeEntity)
                .filter(
                    KnowledgeEntity.scope_kind == scope_kind,
                    KnowledgeEntity.scope_id == scope_id,
                    KnowledgeEntity.embedding.isnot(None),
                )
                .limit(200)
                .all()
            )
            entries: List[VectorEntry] = []
            for e in all_entities:
                emb = embedding_service.deserialize_embedding(e.embedding) if e.embedding else []
                if emb:
                    entries.append({
                        "id": e.id,
                        "scope_kind": scope_kind,
                        "scope_id": scope_id,
                        "content_text": _build_entity_embedding_text(e.name, e.entity_type, e.description or ""),
                        "embedding": emb,
                        "metadata": {"entity_type": e.entity_type, "name": e.name, "entity_id": e.id},
                    })
            matched = await embedding_service.similarity_search(query_emb, entries, top_k=top_k)
            # Re-fetch the full entity rows for the matched ids, attaching scores.
            matched_ids = [m["metadata"].get("entity_id", "") for m in matched]
            vector_matches = []
            for mid in matched_ids:
                e = get_entity_by_id(db, mid)
                if e:
                    score = next((m.get("score", 0) for m in matched if m["metadata"].get("entity_id") == mid), 0)
                    d = _entity_to_dict(e)
                    d["score"] = score
                    vector_matches.append(d)
        # Step 2: graph expansion — neighbours of the top-3 matches.
        graph_expansion: Dict[str, Any] = {"entities": [], "relations": []}
        seen_entity_ids: Set[str] = set()
        if include_neighbors and vector_matches:
            for vm in vector_matches[:3]:
                eid = vm["id"]
                if eid in seen_entity_ids:
                    continue
                seen_entity_ids.add(eid)
                sub = get_neighbors(db, eid, direction="both", limit=5)
                # Deduplicate by whole-dict membership; lists stay small here.
                if sub["entity"] and sub["entity"] not in graph_expansion["entities"]:
                    graph_expansion["entities"].append(sub["entity"])
                for nb in sub["neighbors"]:
                    if nb["entity"] not in graph_expansion["entities"]:
                        graph_expansion["entities"].append(nb["entity"])
                    rel_dict = nb["relation"]
                    if rel_dict not in graph_expansion["relations"]:
                        graph_expansion["relations"].append(rel_dict)
        # Step 3: format the merged context for LLM consumption.
        formatted = _format_hybrid_context(query, vector_matches, graph_expansion)
        return {
            "query": query,
            "vector_matches": vector_matches,
            "graph_expansion": graph_expansion,
            "formatted_context": formatted,
        }
    except Exception as e:
        logger.error("融合检索失败: %s", e)
        return {"query": query, "vector_matches": [], "graph_expansion": {}, "formatted_context": ""}
    finally:
        if db:
            db.close()
def _format_hybrid_context(
query: str,
vector_matches: List[Dict[str, Any]],
graph_expansion: Dict[str, Any],
) -> str:
"""格式化融合检索结果为 LLM 可读的文本块。"""
parts: List[str] = []
if vector_matches:
parts.append("## 相关知识实体(向量检索)")
for i, vm in enumerate(vector_matches, 1):
score_str = f" (匹配度: {vm.get('score', 1.0):.2f})" if vm.get('score', 1.0) < 1.0 else ""
parts.append(f"{i}. [{vm.get('entity_type', 'concept')}] **{vm.get('name', '')}**{score_str}")
if vm.get("description"):
parts.append(f" {vm['description'][:300]}")
if graph_expansion.get("entities"):
parts.append("\n## 关联知识点(图谱展开)")
for ent in graph_expansion["entities"]:
parts.append(f"- [{ent.get('entity_type', '')}] **{ent.get('name', '')}**")
if ent.get("description"):
parts.append(f" {ent['description'][:200]}")
if graph_expansion.get("relations"):
parts.append("\n## 知识关系")
for rel in graph_expansion["relations"]:
parts.append(f"- `{rel.get('relation_type', '')}`: {rel.get('description', '')}")
return "\n".join(parts) if parts else ""
# ═══════════════════════════════════════════════════════════════
# 学习路径推荐
# ═══════════════════════════════════════════════════════════════
def recommend_learning_path(
    db: Session,
    target_entity_ids: List[str],
    scope_kind: str = "agent",
    scope_id: str = "",
) -> Dict[str, Any]:
    """Recommend a learning order for a set of target entities.

    Pulls in "prerequisite" relations pointing at each target (within the given
    scope), then sorts so entities with fewer incoming prerequisites come first
    (leaves before the concepts that depend on them). This is a heuristic
    ordering, not a full topological sort.

    Fix: the summary previously joined entity names with an empty separator,
    producing an unreadable run-together string; now joined with " → ".
    """
    all_entities: Dict[str, Dict[str, Any]] = {}
    all_prereqs: List[Dict[str, Any]] = []
    for eid in target_entity_ids:
        entity = get_entity_by_id(db, eid)
        if not entity:
            continue
        all_entities[eid] = _entity_to_dict(entity)
        # Incoming "prerequisite" edges: their source should be learned before eid.
        prereqs = (
            db.query(KnowledgeRelation)
            .filter(
                KnowledgeRelation.target_entity_id == eid,
                KnowledgeRelation.relation_type == "prerequisite",
                KnowledgeRelation.scope_kind == scope_kind,
                KnowledgeRelation.scope_id == scope_id,
            )
            .all()
        )
        for pr in prereqs:
            src = get_entity_by_id(db, pr.source_entity_id)
            if src:
                all_entities[pr.source_entity_id] = _entity_to_dict(src)
                all_prereqs.append({
                    "prerequisite": _entity_to_dict(src),
                    "target": _entity_to_dict(entity),
                    "relation": _relation_to_dict(pr),
                })
    # Fewer incoming prerequisites sort first.
    def _dep_count(eid: str) -> int:
        return sum(1 for p in all_prereqs if p["target"]["id"] == eid)
    sorted_entities = sorted(all_entities.values(), key=lambda e: _dep_count(e["id"]))
    order_text = " → ".join(e["name"] for e in sorted_entities)
    return {
        "entities": sorted_entities,
        "prerequisites": all_prereqs,
        "suggested_order": [e["name"] for e in sorted_entities],
        "summary": f"建议按以下顺序学习: {order_text}" if sorted_entities else "暂无学习路径",
    }
# ═══════════════════════════════════════════════════════════════
# 工具函数
# ═══════════════════════════════════════════════════════════════
def _entity_to_dict(entity: KnowledgeEntity) -> Dict[str, Any]:
return {
"id": entity.id,
"name": entity.name,
"entity_type": entity.entity_type,
"description": entity.description,
"confidence": entity.confidence,
"source": entity.source,
"metadata": entity.metadata_ or {},
"created_at": entity.created_at.isoformat() if entity.created_at else None,
}
def _relation_to_dict(rel: KnowledgeRelation) -> Dict[str, Any]:
return {
"id": rel.id,
"source_entity_id": rel.source_entity_id,
"target_entity_id": rel.target_entity_id,
"relation_type": rel.relation_type,
"description": rel.description,
"weight": float(rel.weight) if rel.weight else 1.0,
}