Files
aiagent/backend/scripts/generate_test_workflow.py

360 lines
13 KiB
Python
Raw Permalink Normal View History

2026-01-19 00:09:36 +08:00
"""
生成测试工作流 - 包含多个节点的完整工作流
用于验证工作流功能
"""
import sys
import os
from pathlib import Path
import json
from datetime import datetime
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
from app.models.workflow import Workflow
from app.models.user import User
def generate_test_workflow():
"""生成测试工作流"""
# 创建数据库连接
engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
try:
# 获取第一个用户
user = session.query(User).first()
if not user:
print("❌ 错误:数据库中没有用户,请先创建用户")
return
print(f"✅ 使用用户: {user.username} (ID: {user.id})")
# 定义测试工作流的节点
nodes = [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {
"label": "开始",
"description": "工作流开始节点"
}
},
{
"id": "llm-1",
"type": "llm",
"position": {"x": 300, "y": 100},
"data": {
"label": "LLM处理",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "请分析用户输入的内容:{input}\n\n要求:\n1. 提取关键信息\n2. 判断内容类型\n3. 返回JSON格式{\"type\": \"类型\", \"keywords\": [\"关键词1\", \"关键词2\"], \"sentiment\": \"情感\"}",
"temperature": 0.7,
"max_tokens": 1500
}
},
{
"id": "condition-1",
"type": "condition",
"position": {"x": 500, "y": 100},
"data": {
"label": "判断类型",
"condition": "JSON.parse(input).type === 'question'"
}
},
{
"id": "llm-2",
"type": "llm",
"position": {"x": 700, "y": 50},
"data": {
"label": "回答问题",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "用户提出了一个问题,请提供详细、准确的回答。\n\n问题:{input}\n\n要求:\n1. 回答要准确、专业\n2. 如果涉及技术问题,提供代码示例\n3. 回答要友好、易懂",
"temperature": 0.7,
"max_tokens": 2000
}
},
{
"id": "llm-3",
"type": "llm",
"position": {"x": 700, "y": 150},
"data": {
"label": "处理其他",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "用户输入了其他类型的内容,请进行适当的处理。\n\n内容:{input}\n\n要求:\n1. 理解用户意图\n2. 提供合适的响应\n3. 保持友好和专业",
"temperature": 0.7,
"max_tokens": 1500
}
},
{
"id": "transform-1",
"type": "transform",
"position": {"x": 900, "y": 100},
"data": {
"label": "格式化输出",
"transform": "const data = typeof input === 'string' ? JSON.parse(input) : input;\nreturn JSON.stringify({\n success: true,\n result: data,\n timestamp: new Date().toISOString()\n}, null, 2);"
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 1100, "y": 100},
"data": {
"label": "结束",
"description": "工作流结束节点"
}
}
]
# 定义边(连接)
edges = [
{
"id": "edge-start-llm1",
"source": "start-1",
"target": "llm-1",
"sourceHandle": "bottom",
"targetHandle": "top",
"type": "smoothstep"
},
{
"id": "edge-llm1-condition",
"source": "llm-1",
"target": "condition-1",
"sourceHandle": "bottom",
"targetHandle": "top",
"type": "smoothstep"
},
{
"id": "edge-condition-llm2",
"source": "condition-1",
"target": "llm-2",
"sourceHandle": "top",
"targetHandle": "left",
"type": "smoothstep",
"label": "是问题"
},
{
"id": "edge-condition-llm3",
"source": "condition-1",
"target": "llm-3",
"sourceHandle": "bottom",
"targetHandle": "left",
"type": "smoothstep",
"label": "其他"
},
{
"id": "edge-llm2-transform",
"source": "llm-2",
"target": "transform-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
},
{
"id": "edge-llm3-transform",
"source": "llm-3",
"target": "transform-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
},
{
"id": "edge-transform-end",
"source": "transform-1",
"target": "end-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
}
]
# 检查是否已存在同名工作流
existing = session.query(Workflow).filter(
Workflow.name == "智能问答工作流(测试)",
Workflow.user_id == user.id
).first()
if existing:
print(f"⏭️ 工作流已存在,更新现有工作流...")
existing.nodes = nodes
existing.edges = edges
existing.description = "一个包含多个节点的测试工作流用于验证工作流功能。包含开始节点、LLM节点、条件判断、数据转换、结束节点"
existing.status = "draft"
existing.updated_at = datetime.now()
workflow = existing
else:
# 创建新工作流
workflow = Workflow(
name="智能问答工作流(测试)",
description="一个包含多个节点的测试工作流用于验证工作流功能。包含开始节点、LLM节点、条件判断、数据转换、结束节点",
nodes=nodes,
edges=edges,
status="draft",
user_id=user.id
)
session.add(workflow)
session.commit()
session.refresh(workflow)
print(f"\n✅ 测试工作流创建成功!")
print(f" - 工作流ID: {workflow.id}")
print(f" - 工作流名称: {workflow.name}")
print(f" - 节点数量: {len(nodes)}")
print(f" - 连接数量: {len(edges)}")
print(f"\n📋 节点列表:")
for i, node in enumerate(nodes, 1):
print(f" {i}. {node['data']['label']} ({node['type']})")
print(f"\n🔗 连接关系:")
for i, edge in enumerate(edges, 1):
source_node = next(n for n in nodes if n['id'] == edge['source'])
target_node = next(n for n in nodes if n['id'] == edge['target'])
label = edge.get('label', '')
print(f" {i}. {source_node['data']['label']}{target_node['data']['label']} {label}")
print(f"\n💡 使用说明:")
print(f" 1. 在工作流列表中点击'编辑'按钮")
print(f" 2. 可以查看和修改工作流配置")
print(f" 3. 点击'运行'按钮测试工作流")
print(f" 4. 输入测试数据,例如:'什么是Python'")
except Exception as e:
session.rollback()
print(f"❌ 创建失败: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
def generate_simple_test_workflow():
"""生成简单的测试工作流(仅包含基本节点)"""
# 创建数据库连接
engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
try:
# 获取第一个用户
user = session.query(User).first()
if not user:
print("❌ 错误:数据库中没有用户,请先创建用户")
return
print(f"✅ 使用用户: {user.username} (ID: {user.id})")
# 定义简单工作流的节点
nodes = [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {
"label": "开始",
"description": "工作流开始"
}
},
{
"id": "llm-1",
"type": "llm",
"position": {"x": 300, "y": 100},
"data": {
"label": "LLM处理",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "请回答用户的问题:{input}\n\n要求:\n1. 回答要准确、专业\n2. 语言要友好、易懂\n3. 如果问题不清楚,可以询问更多信息",
"temperature": 0.7,
"max_tokens": 1500
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 500, "y": 100},
"data": {
"label": "结束",
"description": "工作流结束"
}
}
]
# 定义边(连接)
edges = [
{
"id": "edge-start-llm",
"source": "start-1",
"target": "llm-1",
"sourceHandle": "bottom",
"targetHandle": "top",
"type": "smoothstep"
},
{
"id": "edge-llm-end",
"source": "llm-1",
"target": "end-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
}
]
# 检查是否已存在同名工作流
existing = session.query(Workflow).filter(
Workflow.name == "简单问答工作流(测试)",
Workflow.user_id == user.id
).first()
if existing:
print(f"⏭️ 工作流已存在,更新现有工作流...")
existing.nodes = nodes
existing.edges = edges
existing.description = "一个简单的测试工作流包含开始、LLM处理、结束三个节点"
existing.status = "draft"
existing.updated_at = datetime.now()
workflow = existing
else:
# 创建新工作流
workflow = Workflow(
name="简单问答工作流(测试)",
description="一个简单的测试工作流包含开始、LLM处理、结束三个节点",
nodes=nodes,
edges=edges,
status="draft",
user_id=user.id
)
session.add(workflow)
session.commit()
session.refresh(workflow)
print(f"\n✅ 简单测试工作流创建成功!")
print(f" - 工作流ID: {workflow.id}")
print(f" - 工作流名称: {workflow.name}")
print(f" - 节点数量: {len(nodes)}")
print(f" - 连接数量: {len(edges)}")
except Exception as e:
session.rollback()
print(f"❌ 创建失败: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
print("=" * 60)
print("生成复杂测试工作流(包含条件判断)")
print("=" * 60)
generate_test_workflow()
print("\n" + "=" * 60)
print("生成简单测试工作流(仅基本节点)")
print("=" * 60)
generate_simple_test_workflow()