Files
aiagent/backend/scripts/generate_knowledge_base_qa_agent.py
2026-01-23 09:49:45 +08:00

335 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
生成知识库问答Agent示例
展示如何使用向量数据库和RAG技术构建知识库问答系统包含
- 文本向量化HTTP节点调用embedding API
- 向量数据库检索vector_db节点
- 基于检索结果的答案生成LLM节点
- 上下文整合和格式化
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.agent import Agent
from app.models.user import User
from datetime import datetime
import uuid
def generate_knowledge_base_qa_agent(db: "Session", user: "User"):
    """Build the workflow definition for a knowledge-base QA (RAG) agent.

    The workflow is a linear chain:
    start -> preprocess (transform) -> text embedding (HTTP call to the
    embedding API) -> extract vector (json) -> prepare search payload
    (transform) -> vector DB search -> format results (transform) ->
    answer generation (LLM) -> extract answer (json) -> end.

    Args:
        db: SQLAlchemy session. Accepted for interface compatibility;
            not used inside this function (persistence happens in main()).
        user: Owner of the agent. Also unused here; main() assigns
            ``user_id`` when creating the Agent row.

    Returns:
        dict with keys ``name``, ``description`` and ``workflow_config``
        (``{"nodes": [...], "edges": [...]}``), ready to be stored on an
        Agent record.
    """
    # SECURITY FIX: the DeepSeek API key used to be hardcoded in the HTTP
    # node's Authorization header. Read it from the environment instead,
    # matching the setup instructions this script prints. If the variable
    # is unset the header stays "Bearer " and must be configured in the
    # workflow editor (step 3 of the printed usage guide).
    api_key = os.environ.get("DEEPSEEK_API_KEY", "")

    nodes = [
        # 1. Start node
        {
            "id": "start-1",
            "type": "start",
            "position": {"x": 50, "y": 400},
            "data": {"label": "开始", "output_format": "json"},
        },
        # 2. Question preprocessing: pass query/user/timestamp through
        {
            "id": "transform-preprocess",
            "type": "transform",
            "position": {"x": 250, "y": 400},
            "data": {
                "label": "问题预处理",
                "mode": "merge",
                "mapping": {
                    "query": "{{query}}",
                    "user_id": "{{user_id}}",
                    "timestamp": "{{timestamp}}",
                },
            },
        },
        # 3. Text vectorization: HTTP call to the embedding API
        {
            "id": "http-embedding",
            "type": "http",
            "position": {"x": 450, "y": 400},
            "data": {
                "label": "文本向量化",
                "method": "POST",
                "url": "https://api.deepseek.com/v1/embeddings",
                "headers": {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {api_key}",
                },
                "body": {
                    "model": "deepseek-embedding",
                    "input": "{{query}}",
                },
                "response_format": "json",
            },
        },
        # 4. Pull the embedding vector out of the HTTP response
        {
            "id": "json-extract-embedding",
            "type": "json",
            "position": {"x": 650, "y": 400},
            "data": {
                "label": "提取向量",
                "operation": "extract",
                "path": "output.data.data[0].embedding",
            },
        },
        # 5. Bundle the vector with the original query for the search node
        {
            "id": "transform-prepare-search",
            "type": "transform",
            "position": {"x": 850, "y": 400},
            "data": {
                "label": "准备搜索数据",
                "mode": "merge",
                "mapping": {
                    "embedding": "{{output}}",
                    "query": "{{query}}",
                },
            },
        },
        # 6. Vector database similarity search (top 5 hits)
        {
            "id": "vector-search",
            "type": "vector_db",
            "position": {"x": 1050, "y": 400},
            "data": {
                "label": "知识库检索",
                "operation": "search",
                "collection": "knowledge_base",
                "query_vector": "{{embedding}}",
                "top_k": 5,
            },
        },
        # 7. Merge search hits with the query for the LLM prompt
        {
            "id": "transform-format-results",
            "type": "transform",
            "position": {"x": 1250, "y": 400},
            "data": {
                "label": "整理检索结果",
                "mode": "merge",
                "mapping": {
                    "query": "{{query}}",
                    "search_results": "{{output}}",
                },
            },
        },
        # 8. Answer generation (LLM). The prompt text is runtime data the
        # workflow engine interpolates; keep it verbatim.
        {
            "id": "llm-answer",
            "type": "llm",
            "position": {"x": 1650, "y": 400},
            "data": {
                "label": "生成答案",
                "provider": "deepseek",
                "model": "deepseek-chat",
                "temperature": "0.7",
                "max_tokens": "2000",
                "prompt": """你是一个专业的知识库问答助手。请基于提供的知识库内容回答用户的问题。
用户问题:{{query}}
相关知识库内容(从向量搜索中检索到的相关文档):
{{search_results}}
请根据以上知识库内容回答用户的问题。要求:
1. 答案要准确、完整,基于知识库内容
2. 如果知识库中没有相关信息,请明确说明"根据知识库,未找到相关信息"
3. 答案要清晰、有条理使用Markdown格式
4. 如果知识库内容与问题不完全匹配,可以结合常识进行补充说明,但要标注哪些是知识库内容,哪些是补充说明
请直接输出答案,不要包含其他格式说明。""",
            },
        },
        # 9. Extract the final answer text
        {
            "id": "json-extract-answer",
            "type": "json",
            "position": {"x": 1850, "y": 400},
            "data": {
                "label": "提取最终答案",
                "operation": "extract",
                "path": "output",
            },
        },
        # 10. End node
        {
            "id": "end-1",
            "type": "end",
            "position": {"x": 2050, "y": 400},
        "data": {"label": "结束"},
        },
    ]

    # The graph is a simple linear pipeline: connect every node's right
    # handle to the next node's left handle, ids e1..e9 in chain order.
    chain = [node["id"] for node in nodes]
    edges = [
        {
            "id": f"e{i}",
            "source": src,
            "target": dst,
            "sourceHandle": "right",
            "targetHandle": "left",
        }
        for i, (src, dst) in enumerate(zip(chain, chain[1:]), start=1)
    ]

    return {
        "name": "知识库问答助手",
        "description": "基于向量数据库和RAG技术的知识库问答系统支持语义搜索和智能回答。需要先使用向量数据库节点将知识库文档向量化并存储。",
        "workflow_config": {"nodes": nodes, "edges": edges},
    }
def _report_created_agent(agent, agent_data):
    """Print the creation summary plus the setup steps and tips for the agent."""
    print(f"✅ 成功创建Agent: {agent.name} (ID: {agent.id})")
    print(f" 节点数量: {len(agent_data['workflow_config']['nodes'])}")
    print(f" 连接数量: {len(agent_data['workflow_config']['edges'])}")
    print(f"\n📝 使用说明:")
    print(f" 1. 在Agent管理页面找到 '{agent.name}'")
    print(f" 2. 点击'设计'按钮进入工作流编辑器")
    print(f" 3. 配置HTTP节点的API密钥DeepSeek API Key")
    print(f" 4. 使用向量数据库节点将知识库文档向量化并存储到 'knowledge_base' 集合")
    print(f" 5. 点击'发布'按钮发布Agent")
    print(f" 6. 点击'使用'按钮测试问答功能")
    print(f"\n💡 提示:")
    print(f" - 知识库文档需要先通过向量数据库节点的 'upsert' 操作存储")
    print(f" - 每个文档需要包含 'text''embedding' 字段")
    print(f" - 可以使用HTTP节点调用embedding API将文档文本转为向量")


def main():
    """Create the knowledge-base QA sample agent for the first user in the DB."""
    db = SessionLocal()
    try:
        # Attach the agent to whichever user exists; bail out if none do.
        user = db.query(User).first()
        if user is None:
            print("❌ 未找到用户,请先创建用户")
            return
        print(f"📝 使用用户: {user.username} (ID: {user.id})")

        agent_data = generate_knowledge_base_qa_agent(db, user)

        # Idempotency guard: skip creation when this user already owns an
        # agent with the same name.
        duplicate = (
            db.query(Agent)
            .filter(
                Agent.name == agent_data["name"],
                Agent.user_id == user.id,
            )
            .first()
        )
        if duplicate is not None:
            print(f"Agent '{agent_data['name']}' 已存在,跳过创建")
            return

        agent = Agent(
            name=agent_data["name"],
            description=agent_data["description"],
            workflow_config=agent_data["workflow_config"],
            user_id=user.id,
            status="draft",
        )
        db.add(agent)
        db.commit()
        db.refresh(agent)
        _report_created_agent(agent, agent_data)
    except Exception as e:
        # Best-effort script: report the failure and undo any partial write.
        print(f"❌ 创建Agent失败: {str(e)}")
        import traceback
        traceback.print_exc()
        db.rollback()
    finally:
        db.close()
# Script entry point: create the sample agent when executed directly.
if __name__ == "__main__":
    main()