""" 知识图谱服务 — 实体抽取、关系构建、图谱查询、向量融合检索 为智能学习助手提供 KG+RAG 核心能力: - 从对话/文本中提取知识点实体 - 构建实体间的语义关系(前置/扩展/包含/示例/关联) - 图谱查询:邻近节点、路径查找、子图展开 - 向量+图谱融合检索:同时命中语义相似和结构关联 """ from __future__ import annotations import json import logging from typing import Any, Dict, List, Optional, Tuple, Set from sqlalchemy.orm import Session from sqlalchemy import or_, and_ from app.core.database import SessionLocal from app.models.agent import KnowledgeEntity, KnowledgeRelation from app.services.embedding_service import embedding_service, VectorEntry logger = logging.getLogger(__name__) # 实体类型定义 ENTITY_TYPES = ["concept", "formula", "fact", "term", "task", "skill"] # 关系类型定义 RELATION_TYPES = { "prerequisite": "前置知识(学习 B 前需要先掌握 A)", "extends": "扩展(B 是 A 的深入/延伸)", "contains": "包含(A 包含子知识点 B)", "related_to": "相关(A 与 B 存在关联)", "example_of": "示例(B 是 A 的实例/例题)", "applies_to": "应用(A 可应用于 B)", } def _build_entity_embedding_text(name: str, entity_type: str, description: str) -> str: """构建用于 embedding 的统一文本。""" parts = [f"[{entity_type}] {name}"] if description: parts.append(f": {description[:500]}") return "".join(parts) def _serialize_embedding(emb: Optional[List[float]]) -> Optional[str]: if not emb: return None return embedding_service.serialize_embedding(emb) # ═══════════════════════════════════════════════════════════════ # 实体管理 # ═══════════════════════════════════════════════════════════════ async def add_entity( db: Session, name: str, entity_type: str = "concept", description: str = "", metadata: Optional[Dict[str, Any]] = None, source: str = "extracted", confidence: str = "medium", scope_kind: str = "agent", scope_id: str = "", user_id: Optional[str] = None, ) -> Optional[KnowledgeEntity]: """添加或更新知识实体(按 name+scope 去重,更新描述和 embedding)。""" name = name.strip()[:200] if not name: return None if entity_type not in ENTITY_TYPES: entity_type = "concept" # 查找已有实体(同名+同scope) existing = ( db.query(KnowledgeEntity) .filter( KnowledgeEntity.name == name, KnowledgeEntity.scope_kind == scope_kind, 
KnowledgeEntity.scope_id == scope_id, ) .first() ) # 生成 embedding emb_text = _build_entity_embedding_text(name, entity_type, description) embedding_json = None try: emb = await embedding_service.generate_embedding(emb_text) if emb: embedding_json = _serialize_embedding(emb) except Exception as e: logger.warning("生成实体 embedding 失败: %s", e) if existing: # 更新已有实体 if description and len(description) > len(existing.description or ""): existing.description = description if embedding_json: existing.embedding = embedding_json if metadata: merged = {**(existing.metadata_ or {}), **metadata} existing.metadata_ = merged existing.confidence = confidence db.commit() db.refresh(existing) logger.debug("更新知识实体: %s (%s)", name, entity_type) return existing entity = KnowledgeEntity( name=name, entity_type=entity_type, description=description, embedding=embedding_json, metadata_=metadata or {}, source=source, confidence=confidence, scope_kind=scope_kind, scope_id=scope_id, user_id=user_id, ) db.add(entity) db.commit() db.refresh(entity) logger.info("新增知识实体: %s (%s) id=%s", name, entity_type, entity.id) return entity async def add_entities_batch( db: Session, entities: List[Dict[str, Any]], scope_kind: str = "agent", scope_id: str = "", user_id: Optional[str] = None, ) -> List[KnowledgeEntity]: """批量添加实体,返回新增/更新后的实体列表。""" results: List[KnowledgeEntity] = [] for ent in entities: try: e = await add_entity( db, name=ent.get("name", ""), entity_type=ent.get("entity_type", "concept"), description=ent.get("description", ""), metadata=ent.get("metadata"), source=ent.get("source", "extracted"), confidence=ent.get("confidence", "medium"), scope_kind=scope_kind, scope_id=scope_id, user_id=user_id, ) if e: results.append(e) except Exception as ex: logger.warning("批量添加实体失败: name=%s err=%s", ent.get("name"), ex) return results # ═══════════════════════════════════════════════════════════════ # 关系管理 # ═══════════════════════════════════════════════════════════════ def add_relation( db: Session, 
source_entity_id: str, target_entity_id: str, relation_type: str = "related_to", description: str = "", weight: float = 1.0, scope_kind: str = "agent", scope_id: str = "", ) -> Optional[KnowledgeRelation]: """添加两个实体间的关系(去重)。""" if source_entity_id == target_entity_id: return None if relation_type not in RELATION_TYPES: relation_type = "related_to" existing = ( db.query(KnowledgeRelation) .filter( KnowledgeRelation.source_entity_id == source_entity_id, KnowledgeRelation.target_entity_id == target_entity_id, KnowledgeRelation.relation_type == relation_type, KnowledgeRelation.scope_kind == scope_kind, KnowledgeRelation.scope_id == scope_id, ) .first() ) if existing: logger.debug("关系已存在: %s -[%s]-> %s", source_entity_id[:8], relation_type, target_entity_id[:8]) return existing rel = KnowledgeRelation( source_entity_id=source_entity_id, target_entity_id=target_entity_id, relation_type=relation_type, description=description, weight=str(weight), scope_kind=scope_kind, scope_id=scope_id, ) db.add(rel) db.commit() db.refresh(rel) logger.info("新增关系: %s -[%s]-> %s", source_entity_id[:8], relation_type, target_entity_id[:8]) return rel def add_relations_from_map( db: Session, entity_map: Dict[str, str], # name -> entity_id relations: List[Dict[str, Any]], scope_kind: str = "agent", scope_id: str = "", ) -> int: """从关系映射批量添加关系。relations 中 source/target 使用实体名称,自动映射为 ID。""" count = 0 for rel in relations: src_name = rel.get("source", "") tgt_name = rel.get("target", "") src_id = entity_map.get(src_name) tgt_id = entity_map.get(tgt_name) if not src_id or not tgt_id: continue r = add_relation( db, source_entity_id=src_id, target_entity_id=tgt_id, relation_type=rel.get("relation_type", "related_to"), description=rel.get("description", ""), weight=float(rel.get("weight", 1.0)), scope_kind=scope_kind, scope_id=scope_id, ) if r: count += 1 return count # ═══════════════════════════════════════════════════════════════ # 图谱查询 # 
# ═══════════════════════════════════════════════════════════════

# Upper bound on ids per ``IN (...)`` clause when batch-loading entities, to
# stay well below the bound-parameter limits of common databases.
_ID_BATCH_SIZE = 500

# Maximum number of embedded entities scored per hybrid search.
_MAX_VECTOR_CANDIDATES = 200

# Stored entity names are stripped and capped at this many characters.
_STORED_NAME_LENGTH = 200


def get_entity_by_id(db: Session, entity_id: str) -> Optional[KnowledgeEntity]:
    """Return the entity with the given id, or ``None`` when it does not exist."""
    return db.query(KnowledgeEntity).filter(KnowledgeEntity.id == entity_id).first()


def _load_entities(db: Session, entity_ids: List[str]) -> Dict[str, KnowledgeEntity]:
    """Load many entities at once and return them keyed by id.

    Ids that do not resolve to a row are simply absent from the result.
    """
    unique_ids = list(dict.fromkeys(eid for eid in entity_ids if eid))
    found: Dict[str, KnowledgeEntity] = {}
    for start in range(0, len(unique_ids), _ID_BATCH_SIZE):
        chunk = unique_ids[start:start + _ID_BATCH_SIZE]
        rows = db.query(KnowledgeEntity).filter(KnowledgeEntity.id.in_(chunk)).all()
        for row in rows:
            found[row.id] = row
    return found


def search_entities(
    db: Session,
    keyword: str = "",
    entity_type: Optional[str] = None,
    scope_kind: str = "agent",
    scope_id: str = "",
    limit: int = 20,
) -> List[KnowledgeEntity]:
    """Keyword search over entity names and descriptions within one scope.

    Results are ordered by confidence (high, medium, low, anything else) and
    then by creation time, newest first.
    """
    # Function-scope import so this block does not depend on the file header.
    from sqlalchemy import case

    q = db.query(KnowledgeEntity).filter(
        KnowledgeEntity.scope_kind == scope_kind,
        KnowledgeEntity.scope_id == scope_id,
    )
    if entity_type:
        q = q.filter(KnowledgeEntity.entity_type == entity_type)
    if keyword:
        # autoescape makes "%" and "_" in the user's keyword match literally
        # instead of acting as LIKE wildcards.
        q = q.filter(
            or_(
                KnowledgeEntity.name.contains(keyword, autoescape=True),
                KnowledgeEntity.description.contains(keyword, autoescape=True),
            )
        )

    # Confidence is stored as text; sorting the raw strings descending gives
    # medium > low > high, so rank the levels explicitly.
    confidence_rank = case(
        {"high": 3, "medium": 2, "low": 1},
        value=KnowledgeEntity.confidence,
        else_=0,
    )
    return (
        q.order_by(confidence_rank.desc(), KnowledgeEntity.created_at.desc())
        .limit(limit)
        .all()
    )


def get_neighbors(
    db: Session,
    entity_id: str,
    relation_types: Optional[List[str]] = None,
    direction: str = "both",
    max_depth: int = 1,
    limit: int = 30,
) -> Dict[str, Any]:
    """Return the direct graph neighbours of an entity.

    Args:
        relation_types: when given, only relations of these types are followed.
        direction: "out" (entity is the source), "in" (entity is the target)
            or "both"; any other value yields no neighbours.
        max_depth: accepted for interface compatibility but currently unused;
            only direct (depth-1) neighbours are returned.
        limit: cap applied to each direction's query and to the returned list.

    Returns:
        {
            "entity": {...} or None when the entity does not exist,
            "neighbors": [{"entity": {...}, "relation": {...},
                           "direction": "out" | "in"}],
            "subgraph_size": number of neighbours found before truncation,
        }
    """
    entity = get_entity_by_id(db, entity_id)
    if not entity:
        return {"entity": None, "neighbors": [], "subgraph_size": 0}

    # (relation, id of the entity on the other end, direction)
    hops: List[Tuple[KnowledgeRelation, str, str]] = []

    if direction in ("out", "both"):
        q = db.query(KnowledgeRelation).filter(
            KnowledgeRelation.source_entity_id == entity_id,
        )
        if relation_types:
            q = q.filter(KnowledgeRelation.relation_type.in_(relation_types))
        hops.extend((rel, rel.target_entity_id, "out") for rel in q.limit(limit).all())

    if direction in ("in", "both"):
        q = db.query(KnowledgeRelation).filter(
            KnowledgeRelation.target_entity_id == entity_id,
        )
        if relation_types:
            q = q.filter(KnowledgeRelation.relation_type.in_(relation_types))
        hops.extend((rel, rel.source_entity_id, "in") for rel in q.limit(limit).all())

    # One batched lookup instead of one query per neighbour.
    loaded = _load_entities(db, [other_id for _, other_id, _ in hops])
    neighbors: List[Dict[str, Any]] = [
        {
            "entity": _entity_to_dict(loaded[other_id]),
            "relation": _relation_to_dict(rel),
            "direction": way,
        }
        for rel, other_id, way in hops
        if other_id in loaded  # skip relations pointing at missing entities
    ]

    return {
        "entity": _entity_to_dict(entity),
        "neighbors": neighbors[:limit],
        "subgraph_size": len(neighbors),
    }


def get_entity_graph(
    db: Session,
    entity_id: str,
    max_depth: int = 2,
    limit: int = 50,
) -> Dict[str, Any]:
    """Return the sub-graph around an entity, expanded breadth-first.

    At most *limit* nodes are collected over *max_depth* expansion rounds.
    Every returned edge appears once and connects two returned nodes.

    Returns:
        {"nodes": [entity dicts], "edges": [relation dicts], "center_id": str}
    """
    center = get_entity_by_id(db, entity_id)
    if not center:
        return {"nodes": [], "edges": [], "center_id": entity_id}

    nodes: Dict[str, Dict[str, Any]] = {entity_id: _entity_to_dict(center)}
    # Keyed by relation id so an edge seen from both endpoints (or again at a
    # deeper level) is emitted only once; insertion order is preserved.
    edges_by_id: Dict[Any, Dict[str, Any]] = {}
    frontier: List[str] = [entity_id]

    for _ in range(max_depth):
        if len(nodes) >= limit or not frontier:
            break
        next_frontier: List[str] = []
        for nid in frontier:
            out_rels = (
                db.query(KnowledgeRelation)
                .filter(KnowledgeRelation.source_entity_id == nid)
                .limit(limit // 2)
                .all()
            )
            in_rels = (
                db.query(KnowledgeRelation)
                .filter(KnowledgeRelation.target_entity_id == nid)
                .limit(limit // 2)
                .all()
            )
            hops = [(rel, rel.target_entity_id) for rel in out_rels]
            hops += [(rel, rel.source_entity_id) for rel in in_rels]

            unknown = [other_id for _, other_id in hops if other_id not in nodes]
            loaded = _load_entities(db, unknown) if unknown and len(nodes) < limit else {}

            for rel, other_id in hops:
                if other_id not in nodes and len(nodes) < limit:
                    neighbour = loaded.get(other_id)
                    if neighbour is not None:
                        nodes[other_id] = _entity_to_dict(neighbour)
                        next_frontier.append(other_id)
                # Keep only edges whose far end made it into the node set;
                # otherwise consumers receive edges to unknown nodes.
                if other_id in nodes:
                    edges_by_id.setdefault(rel.id, _relation_to_dict(rel))
        frontier = next_frontier

    return {
        "nodes": list(nodes.values()),
        "edges": list(edges_by_id.values()),
        "center_id": entity_id,
    }


def find_path(
    db: Session,
    source_id: str,
    target_id: str,
    max_depth: int = 4,
) -> Optional[List[Dict[str, Any]]]:
    """Find a shortest path between two entities, ignoring edge direction.

    Returns:
        - ``[{"entity": {...}, "relation": None}]`` when source == target,
        - a list of ``{"from": {...}, "relation": {...}, "to": {...}}`` steps
          ordered from source to target, at most *max_depth* steps long,
        - ``None`` when no such path exists or the source entity is missing.
    """
    from collections import deque

    source = get_entity_by_id(db, source_id)
    if source is None:
        return None
    if source_id == target_id:
        return [{"entity": _entity_to_dict(source), "relation": None}]

    entity_dicts: Dict[str, Dict[str, Any]] = {source_id: _entity_to_dict(source)}
    depth_of: Dict[str, int] = {source_id: 0}
    # child id -> (parent id, relation dict); lets the path be rebuilt once
    # instead of copying a path list for every explored edge.
    came_from: Dict[str, Tuple[str, Dict[str, Any]]] = {}
    queue: deque = deque([source_id])

    while queue:
        current_id = queue.popleft()
        depth = depth_of[current_id]
        if depth >= max_depth:
            continue

        out_rels = (
            db.query(KnowledgeRelation)
            .filter(KnowledgeRelation.source_entity_id == current_id)
            .all()
        )
        in_rels = (
            db.query(KnowledgeRelation)
            .filter(KnowledgeRelation.target_entity_id == current_id)
            .all()
        )
        hops = [(rel.target_entity_id, rel) for rel in out_rels]
        hops += [(rel.source_entity_id, rel) for rel in in_rels]
        fresh = [(other_id, rel) for other_id, rel in hops if other_id not in depth_of]
        if not fresh:
            continue

        loaded = _load_entities(db, [other_id for other_id, _ in fresh])
        for other_id, rel in fresh:
            # Skip already-reached nodes and relations whose endpoint no
            # longer exists (dangling relation).
            if other_id in depth_of or other_id not in loaded:
                continue
            depth_of[other_id] = depth + 1
            came_from[other_id] = (current_id, _relation_to_dict(rel))
            entity_dicts[other_id] = _entity_to_dict(loaded[other_id])

            if other_id == target_id:
                steps: List[Dict[str, Any]] = []
                node = target_id
                while node != source_id:
                    parent, rel_dict = came_from[node]
                    steps.append({
                        "from": dict(entity_dicts[parent]),
                        "relation": rel_dict,
                        "to": dict(entity_dicts[node]),
                    })
                    node = parent
                steps.reverse()
                return steps

            queue.append(other_id)

    return None


# ═══════════════════════════════════════════════════════════════
# LLM-driven entity and relation extraction
# ═══════════════════════════════════════════════════════════════

EXTRACTION_PROMPT = """你是一个知识图谱构建助手。请分析以下文本,提取其中的知识点实体和它们之间的关系。

请返回 JSON 格式(不要 markdown 包裹),包含:
1. entities: 提取的知识实体列表,每个实体包含:
   - name: 实体名称(简洁准确)
   - entity_type: 类型(concept=概念, formula=公式, fact=事实, term=术语, task=任务, skill=技能)
   - description: 简要描述(1-2句)
   - confidence: 置信度(low/medium/high)
2. relations: 实体间的关系列表,每个关系包含:
   - source: 源实体名称
   - target: 目标实体名称
   - relation_type: 关系类型(prerequisite=前置知识, extends=扩展, contains=包含, related_to=相关, example_of=示例, applies_to=应用)
   - description: 关系说明(1句话)

注意:
- 只提取文本中明确出现的知识点,不编造
- 关系应只在同段文本中有关联的实体间建立
- 如果文本中知识点不足,返回空数组即可

文本内容:
{text}"""


def _empty_extraction_result() -> Dict[str, Any]:
    """Return a fresh "nothing extracted" result."""
    return {"entities": [], "relations": [], "entity_count": 0, "relation_count": 0}


def _strip_code_fence(raw: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) if present."""
    text = raw.strip()
    if text.startswith("```"):
        text = text[3:]
        if text[:4].lower() == "json":
            text = text[4:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _clean_entity_name(value: Any) -> str:
    """Normalize a name the way stored entity names are (strip + truncate)."""
    if not isinstance(value, str):
        return ""
    return value.strip()[:_STORED_NAME_LENGTH]


async def extract_from_text(
    text: str,
    scope_kind: str = "agent",
    scope_id: str = "",
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Extract entities and relations from text with an LLM and persist them.

    Text shorter than 20 non-blank characters is ignored. Only the first 4000
    characters are sent to the model. Every failure (missing API key, request
    error, unparsable answer, database error) is logged and reported as an
    empty result rather than raised.

    Returns:
        {"entities": [entity dicts], "relations": [relations that were linked],
         "entity_count": int, "relation_count": int}
    """
    if not text or len(text.strip()) < 20:
        return _empty_extraction_result()

    # Imported lazily so the module loads without the LLM client installed.
    from openai import AsyncOpenAI
    from app.core.config import settings

    # Ask the LLM for entities and relations.
    try:
        api_key = settings.DEEPSEEK_API_KEY or settings.OPENAI_API_KEY or ""
        base_url = settings.DEEPSEEK_BASE_URL or settings.OPENAI_BASE_URL or "https://api.deepseek.com"
        if api_key == "your-openai-api-key":
            # Placeholder value: fall back to the DeepSeek settings only.
            api_key = settings.DEEPSEEK_API_KEY or ""
            base_url = settings.DEEPSEEK_BASE_URL or "https://api.deepseek.com"

        if not api_key:
            logger.warning("知识图谱提取:未配置 API Key,跳过")
            return _empty_extraction_result()

        client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        resp = await client.chat.completions.create(
            model="deepseek-v4-flash",
            messages=[{"role": "user", "content": EXTRACTION_PROMPT.format(text=text[:4000])}],
            temperature=0.2,
            max_tokens=2048,
            timeout=30,
        )
        raw = resp.choices[0].message.content or ""
        result = json.loads(_strip_code_fence(raw))
    except json.JSONDecodeError:
        logger.warning("知识图谱提取:LLM 返回非 JSON,跳过")
        return _empty_extraction_result()
    except Exception as e:
        logger.warning("知识图谱提取失败: %s", e)
        return _empty_extraction_result()

    # Valid JSON is not necessarily the expected object (it may be a list,
    # a string, ...); treat anything else as an unusable answer.
    if not isinstance(result, dict):
        logger.warning("知识图谱提取:LLM 返回非 JSON,跳过")
        return _empty_extraction_result()

    raw_entities = result.get("entities")
    raw_relations = result.get("relations")
    extracted_entities = (
        [item for item in raw_entities if isinstance(item, dict)]
        if isinstance(raw_entities, list) else []
    )
    extracted_relations = (
        [item for item in raw_relations if isinstance(item, dict)]
        if isinstance(raw_relations, list) else []
    )

    if not extracted_entities:
        return _empty_extraction_result()

    # Persist to the database.
    db: Optional[Session] = None
    try:
        db = SessionLocal()

        created = await add_entities_batch(
            db, extracted_entities,
            scope_kind=scope_kind, scope_id=scope_id, user_id=user_id,
        )

        # name -> id mapping used to resolve relation endpoints.
        entity_map: Dict[str, str] = {e.name: e.id for e in created}

        # Keep only relations whose two endpoints resolve to distinct stored
        # entities, so the relations reported back are exactly the ones that
        # get linked (not merely the first N of the raw list).
        linkable: List[Dict[str, Any]] = []
        for rel in extracted_relations:
            src_name = _clean_entity_name(rel.get("source"))
            tgt_name = _clean_entity_name(rel.get("target"))
            src_id = entity_map.get(src_name)
            tgt_id = entity_map.get(tgt_name)
            if src_id and tgt_id and src_id != tgt_id:
                linkable.append({**rel, "source": src_name, "target": tgt_name})

        rel_count = add_relations_from_map(
            db, entity_map, linkable,
            scope_kind=scope_kind, scope_id=scope_id,
        )

        return {
            "entities": [_entity_to_dict(e) for e in created],
            "relations": linkable,
            "entity_count": len(created),
            "relation_count": rel_count,
        }
    except Exception as e:
        logger.error("写入知识图谱失败: %s", e)
        if db:
            db.rollback()
        return _empty_extraction_result()
    finally:
        if db:
            db.close()


# ═══════════════════════════════════════════════════════════════
# Vector + graph hybrid retrieval (core RAG capability)
# ═══════════════════════════════════════════════════════════════

async def hybrid_search(
    query: str,
    scope_kind: str = "agent",
    scope_id: str = "",
    top_k: int = 5,
    include_neighbors: bool = True,
) -> Dict[str, Any]:
    """Vector + graph hybrid retrieval.

    1. Embed *query* and run a similarity search over the scope's embedded
       entities (falls back to keyword search when no embedding is produced).
    2. Expand the graph neighbours of the top 3 matches.
    3. Return the fused, structured knowledge context.

    Returns:
        {
            "query": str,
            "vector_matches": [...],   # entities hit by the vector search
            "graph_expansion": {...},  # {"entities": [...], "relations": [...]};
                                       # {} for a blank query or on failure
            "formatted_context": str,  # text block ready to inject into an LLM
        }
    """
    if not query or not query.strip():
        return {"query": query, "vector_matches": [], "graph_expansion": {}, "formatted_context": ""}

    db: Optional[Session] = None
    try:
        db = SessionLocal()

        # Step 1: vector retrieval.
        query_emb = await embedding_service.generate_embedding(query)
        vector_matches: List[Dict[str, Any]] = []
        if not query_emb:
            # Degrade to keyword search.
            entities = search_entities(
                db, keyword=query, scope_kind=scope_kind, scope_id=scope_id, limit=top_k * 3,
            )
            vector_matches = [_entity_to_dict(e) for e in entities[:top_k]]
        else:
            # NOTE: the candidate set is capped without an explicit ordering,
            # so which rows are scored is up to the database.
            candidates = (
                db.query(KnowledgeEntity)
                .filter(
                    KnowledgeEntity.scope_kind == scope_kind,
                    KnowledgeEntity.scope_id == scope_id,
                    KnowledgeEntity.embedding.isnot(None),
                )
                .limit(_MAX_VECTOR_CANDIDATES)
                .all()
            )

            candidates_by_id: Dict[str, KnowledgeEntity] = {}
            entries: List[VectorEntry] = []
            for e in candidates:
                emb = embedding_service.deserialize_embedding(e.embedding) if e.embedding else []
                if not emb:
                    continue
                candidates_by_id[e.id] = e
                entries.append({
                    "id": e.id,
                    "scope_kind": scope_kind,
                    "scope_id": scope_id,
                    "content_text": _build_entity_embedding_text(e.name, e.entity_type, e.description or ""),
                    "embedding": emb,
                    "metadata": {"entity_type": e.entity_type, "name": e.name, "entity_id": e.id},
                })

            matched = await embedding_service.similarity_search(query_emb, entries, top_k=top_k)

            # The matched entities are already in memory; no need to query
            # them again one by one.
            for m in matched:
                matched_id = (m.get("metadata") or {}).get("entity_id", "")
                e = candidates_by_id.get(matched_id)
                if e is None:
                    continue
                d = _entity_to_dict(e)
                d["score"] = m.get("score", 0)
                vector_matches.append(d)

        # Step 2: graph expansion (neighbours of the top 3 matches).
        graph_expansion: Dict[str, Any] = {"entities": [], "relations": []}
        expanded_ids: Set[str] = set()
        listed_entity_ids: Set[str] = set()
        listed_relation_ids: Set[Any] = set()
        if include_neighbors and vector_matches:
            for vm in vector_matches[:3]:
                eid = vm["id"]
                if eid in expanded_ids:
                    continue
                expanded_ids.add(eid)
                sub = get_neighbors(db, eid, direction="both", limit=5)
                center = sub["entity"]
                if center and center["id"] not in listed_entity_ids:
                    listed_entity_ids.add(center["id"])
                    graph_expansion["entities"].append(center)
                for nb in sub["neighbors"]:
                    if nb["entity"]["id"] not in listed_entity_ids:
                        listed_entity_ids.add(nb["entity"]["id"])
                        graph_expansion["entities"].append(nb["entity"])
                    rel_dict = nb["relation"]
                    if rel_dict["id"] not in listed_relation_ids:
                        listed_relation_ids.add(rel_dict["id"])
                        graph_expansion["relations"].append(rel_dict)

        # Step 3: format the context.
        formatted = _format_hybrid_context(query, vector_matches, graph_expansion)

        return {
            "query": query,
            "vector_matches": vector_matches,
            "graph_expansion": graph_expansion,
            "formatted_context": formatted,
        }
    except Exception as e:
        logger.error("融合检索失败: %s", e)
        return {"query": query, "vector_matches": [], "graph_expansion": {}, "formatted_context": ""}
    finally:
        if db:
            db.close()


def _format_hybrid_context(
    query: str,
    vector_matches: List[Dict[str, Any]],
    graph_expansion: Dict[str, Any],
) -> str:
    """Render hybrid-search results as an LLM-readable Markdown text block.

    Returns "" when there is nothing to show. *query* is accepted for
    interface symmetry and is not part of the output.
    """
    parts: List[str] = []

    if vector_matches:
        parts.append("## 相关知识实体(向量检索)")
        for i, vm in enumerate(vector_matches, 1):
            score = vm.get("score", 1.0)
            # Keyword-fallback matches carry no score; a None or non-numeric
            # score must not break formatting.
            show_score = (
                isinstance(score, (int, float))
                and not isinstance(score, bool)
                and score < 1.0
            )
            score_str = f" (匹配度: {score:.2f})" if show_score else ""
            parts.append(f"{i}. [{vm.get('entity_type', 'concept')}] **{vm.get('name', '')}**{score_str}")
            if vm.get("description"):
                parts.append(f"   {vm['description'][:300]}")

    if graph_expansion.get("entities"):
        parts.append("\n## 关联知识点(图谱展开)")
        for ent in graph_expansion["entities"]:
            parts.append(f"- [{ent.get('entity_type', '')}] **{ent.get('name', '')}**")
            if ent.get("description"):
                parts.append(f"  {ent['description'][:200]}")

    if graph_expansion.get("relations"):
        parts.append("\n## 知识关系")
        for rel in graph_expansion["relations"]:
            parts.append(f"- `{rel.get('relation_type', '')}`: {rel.get('description', '')}")

    return "\n".join(parts)


# ═══════════════════════════════════════════════════════════════
# Learning-path recommendation
# ═══════════════════════════════════════════════════════════════

def _order_by_prerequisites(
    entity_ids: List[str],
    edges: List[Tuple[str, str]],
) -> List[str]:
    """Order ids so every prerequisite precedes the entities that need it.

    *edges* are ``(prerequisite_id, dependent_id)`` pairs. The sort is stable
    with respect to *entity_ids*. Entities caught in a dependency cycle cannot
    be ordered and are appended at the end in their original order.
    """
    from collections import deque

    dependents: Dict[str, List[str]] = {eid: [] for eid in entity_ids}
    unmet: Dict[str, int] = {eid: 0 for eid in entity_ids}
    for prerequisite_id, dependent_id in dict.fromkeys(edges):
        if (
            prerequisite_id in unmet
            and dependent_id in unmet
            and prerequisite_id != dependent_id
        ):
            dependents[prerequisite_id].append(dependent_id)
            unmet[dependent_id] += 1

    ready = deque(eid for eid in entity_ids if unmet[eid] == 0)
    ordered: List[str] = []
    while ready:
        eid = ready.popleft()
        ordered.append(eid)
        for dependent_id in dependents[eid]:
            unmet[dependent_id] -= 1
            if unmet[dependent_id] == 0:
                ready.append(dependent_id)

    if len(ordered) < len(entity_ids):
        placed = set(ordered)
        ordered.extend(eid for eid in entity_ids if eid not in placed)
    return ordered


def recommend_learning_path(
    db: Session,
    target_entity_ids: List[str],
    scope_kind: str = "agent",
    scope_id: str = "",
) -> Dict[str, Any]:
    """Recommend a learning order for the target entities.

    Collects each target's direct "prerequisite" relations within the scope
    and orders targets plus prerequisites so that a prerequisite always comes
    before the entity that requires it.

    Returns:
        {"entities": [entity dicts in suggested order],
         "prerequisites": [{"prerequisite": {...}, "target": {...}, "relation": {...}}],
         "suggested_order": [entity names],
         "summary": str}
    """
    all_entities: Dict[str, Dict[str, Any]] = {}
    all_prereqs: List[Dict[str, Any]] = []
    edges: List[Tuple[str, str]] = []

    # De-duplicate the targets so a repeated id does not repeat its relations.
    for eid in dict.fromkeys(target_entity_ids or []):
        entity = get_entity_by_id(db, eid)
        if not entity:
            continue
        all_entities[eid] = _entity_to_dict(entity)

        # Direct prerequisites of this target.
        prereqs = (
            db.query(KnowledgeRelation)
            .filter(
                KnowledgeRelation.target_entity_id == eid,
                KnowledgeRelation.relation_type == "prerequisite",
                KnowledgeRelation.scope_kind == scope_kind,
                KnowledgeRelation.scope_id == scope_id,
            )
            .all()
        )
        sources = _load_entities(db, [pr.source_entity_id for pr in prereqs])
        for pr in prereqs:
            src = sources.get(pr.source_entity_id)
            if src is None:
                continue
            all_entities[pr.source_entity_id] = _entity_to_dict(src)
            all_prereqs.append({
                "prerequisite": _entity_to_dict(src),
                "target": _entity_to_dict(entity),
                "relation": _relation_to_dict(pr),
            })
            edges.append((pr.source_entity_id, eid))

    # Topological order: merely counting prerequisites can place an entity
    # before one of its own prerequisites when that prerequisite itself has
    # more prerequisites.
    order = _order_by_prerequisites(list(all_entities), edges)
    sorted_entities = [all_entities[eid] for eid in order]

    return {
        "entities": sorted_entities,
        "prerequisites": all_prereqs,
        "suggested_order": [e["name"] for e in sorted_entities],
        "summary": (
            f"建议按以下顺序学习: {' → '.join(e['name'] for e in sorted_entities)}"
            if sorted_entities else "暂无学习路径"
        ),
    }


# ═══════════════════════════════════════════════════════════════
# Utility functions
# ═══════════════════════════════════════════════════════════════

def _entity_to_dict(entity: KnowledgeEntity) -> Dict[str, Any]:
    """Serialize an entity to a plain dict (the embedding is not included)."""
    return {
        "id": entity.id,
        "name": entity.name,
        "entity_type": entity.entity_type,
        "description": entity.description,
        "confidence": entity.confidence,
        "source": entity.source,
        "metadata": entity.metadata_ or {},
        "created_at": entity.created_at.isoformat() if entity.created_at else None,
    }


def _relation_to_dict(rel: KnowledgeRelation) -> Dict[str, Any]:
    """Serialize a relation to a plain dict.

    The weight is parsed to ``float``; a missing or unparsable stored value
    falls back to 1.0.
    """
    try:
        weight = float(rel.weight) if rel.weight else 1.0
    except (TypeError, ValueError):
        weight = 1.0
    return {
        "id": rel.id,
        "source_entity_id": rel.source_entity_id,
        "target_entity_id": rel.target_entity_id,
        "relation_type": rel.relation_type,
        "description": rel.description,
        "weight": weight,
    }