Files
aiagent/backend/scripts/generate_test_workflow.py
2026-01-19 00:09:36 +08:00

360 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
生成测试工作流 - 包含多个节点的完整工作流
用于验证工作流功能
"""
import sys
import os
from pathlib import Path
import json
from datetime import datetime
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
from app.models.workflow import Workflow
from app.models.user import User
def generate_test_workflow():
"""生成测试工作流"""
# 创建数据库连接
engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
try:
# 获取第一个用户
user = session.query(User).first()
if not user:
print("❌ 错误:数据库中没有用户,请先创建用户")
return
print(f"✅ 使用用户: {user.username} (ID: {user.id})")
# 定义测试工作流的节点
nodes = [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {
"label": "开始",
"description": "工作流开始节点"
}
},
{
"id": "llm-1",
"type": "llm",
"position": {"x": 300, "y": 100},
"data": {
"label": "LLM处理",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "请分析用户输入的内容:{input}\n\n要求:\n1. 提取关键信息\n2. 判断内容类型\n3. 返回JSON格式{\"type\": \"类型\", \"keywords\": [\"关键词1\", \"关键词2\"], \"sentiment\": \"情感\"}",
"temperature": 0.7,
"max_tokens": 1500
}
},
{
"id": "condition-1",
"type": "condition",
"position": {"x": 500, "y": 100},
"data": {
"label": "判断类型",
"condition": "JSON.parse(input).type === 'question'"
}
},
{
"id": "llm-2",
"type": "llm",
"position": {"x": 700, "y": 50},
"data": {
"label": "回答问题",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "用户提出了一个问题,请提供详细、准确的回答。\n\n问题:{input}\n\n要求:\n1. 回答要准确、专业\n2. 如果涉及技术问题,提供代码示例\n3. 回答要友好、易懂",
"temperature": 0.7,
"max_tokens": 2000
}
},
{
"id": "llm-3",
"type": "llm",
"position": {"x": 700, "y": 150},
"data": {
"label": "处理其他",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "用户输入了其他类型的内容,请进行适当的处理。\n\n内容:{input}\n\n要求:\n1. 理解用户意图\n2. 提供合适的响应\n3. 保持友好和专业",
"temperature": 0.7,
"max_tokens": 1500
}
},
{
"id": "transform-1",
"type": "transform",
"position": {"x": 900, "y": 100},
"data": {
"label": "格式化输出",
"transform": "const data = typeof input === 'string' ? JSON.parse(input) : input;\nreturn JSON.stringify({\n success: true,\n result: data,\n timestamp: new Date().toISOString()\n}, null, 2);"
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 1100, "y": 100},
"data": {
"label": "结束",
"description": "工作流结束节点"
}
}
]
# 定义边(连接)
edges = [
{
"id": "edge-start-llm1",
"source": "start-1",
"target": "llm-1",
"sourceHandle": "bottom",
"targetHandle": "top",
"type": "smoothstep"
},
{
"id": "edge-llm1-condition",
"source": "llm-1",
"target": "condition-1",
"sourceHandle": "bottom",
"targetHandle": "top",
"type": "smoothstep"
},
{
"id": "edge-condition-llm2",
"source": "condition-1",
"target": "llm-2",
"sourceHandle": "top",
"targetHandle": "left",
"type": "smoothstep",
"label": "是问题"
},
{
"id": "edge-condition-llm3",
"source": "condition-1",
"target": "llm-3",
"sourceHandle": "bottom",
"targetHandle": "left",
"type": "smoothstep",
"label": "其他"
},
{
"id": "edge-llm2-transform",
"source": "llm-2",
"target": "transform-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
},
{
"id": "edge-llm3-transform",
"source": "llm-3",
"target": "transform-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
},
{
"id": "edge-transform-end",
"source": "transform-1",
"target": "end-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
}
]
# 检查是否已存在同名工作流
existing = session.query(Workflow).filter(
Workflow.name == "智能问答工作流(测试)",
Workflow.user_id == user.id
).first()
if existing:
print(f"⏭️ 工作流已存在,更新现有工作流...")
existing.nodes = nodes
existing.edges = edges
existing.description = "一个包含多个节点的测试工作流用于验证工作流功能。包含开始节点、LLM节点、条件判断、数据转换、结束节点"
existing.status = "draft"
existing.updated_at = datetime.now()
workflow = existing
else:
# 创建新工作流
workflow = Workflow(
name="智能问答工作流(测试)",
description="一个包含多个节点的测试工作流用于验证工作流功能。包含开始节点、LLM节点、条件判断、数据转换、结束节点",
nodes=nodes,
edges=edges,
status="draft",
user_id=user.id
)
session.add(workflow)
session.commit()
session.refresh(workflow)
print(f"\n✅ 测试工作流创建成功!")
print(f" - 工作流ID: {workflow.id}")
print(f" - 工作流名称: {workflow.name}")
print(f" - 节点数量: {len(nodes)}")
print(f" - 连接数量: {len(edges)}")
print(f"\n📋 节点列表:")
for i, node in enumerate(nodes, 1):
print(f" {i}. {node['data']['label']} ({node['type']})")
print(f"\n🔗 连接关系:")
for i, edge in enumerate(edges, 1):
source_node = next(n for n in nodes if n['id'] == edge['source'])
target_node = next(n for n in nodes if n['id'] == edge['target'])
label = edge.get('label', '')
print(f" {i}. {source_node['data']['label']}{target_node['data']['label']} {label}")
print(f"\n💡 使用说明:")
print(f" 1. 在工作流列表中点击'编辑'按钮")
print(f" 2. 可以查看和修改工作流配置")
print(f" 3. 点击'运行'按钮测试工作流")
print(f" 4. 输入测试数据,例如:'什么是Python'")
except Exception as e:
session.rollback()
print(f"❌ 创建失败: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
def generate_simple_test_workflow():
"""生成简单的测试工作流(仅包含基本节点)"""
# 创建数据库连接
engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
try:
# 获取第一个用户
user = session.query(User).first()
if not user:
print("❌ 错误:数据库中没有用户,请先创建用户")
return
print(f"✅ 使用用户: {user.username} (ID: {user.id})")
# 定义简单工作流的节点
nodes = [
{
"id": "start-1",
"type": "start",
"position": {"x": 100, "y": 100},
"data": {
"label": "开始",
"description": "工作流开始"
}
},
{
"id": "llm-1",
"type": "llm",
"position": {"x": 300, "y": 100},
"data": {
"label": "LLM处理",
"provider": "deepseek",
"model": "deepseek-chat",
"prompt": "请回答用户的问题:{input}\n\n要求:\n1. 回答要准确、专业\n2. 语言要友好、易懂\n3. 如果问题不清楚,可以询问更多信息",
"temperature": 0.7,
"max_tokens": 1500
}
},
{
"id": "end-1",
"type": "end",
"position": {"x": 500, "y": 100},
"data": {
"label": "结束",
"description": "工作流结束"
}
}
]
# 定义边(连接)
edges = [
{
"id": "edge-start-llm",
"source": "start-1",
"target": "llm-1",
"sourceHandle": "bottom",
"targetHandle": "top",
"type": "smoothstep"
},
{
"id": "edge-llm-end",
"source": "llm-1",
"target": "end-1",
"sourceHandle": "right",
"targetHandle": "left",
"type": "smoothstep"
}
]
# 检查是否已存在同名工作流
existing = session.query(Workflow).filter(
Workflow.name == "简单问答工作流(测试)",
Workflow.user_id == user.id
).first()
if existing:
print(f"⏭️ 工作流已存在,更新现有工作流...")
existing.nodes = nodes
existing.edges = edges
existing.description = "一个简单的测试工作流包含开始、LLM处理、结束三个节点"
existing.status = "draft"
existing.updated_at = datetime.now()
workflow = existing
else:
# 创建新工作流
workflow = Workflow(
name="简单问答工作流(测试)",
description="一个简单的测试工作流包含开始、LLM处理、结束三个节点",
nodes=nodes,
edges=edges,
status="draft",
user_id=user.id
)
session.add(workflow)
session.commit()
session.refresh(workflow)
print(f"\n✅ 简单测试工作流创建成功!")
print(f" - 工作流ID: {workflow.id}")
print(f" - 工作流名称: {workflow.name}")
print(f" - 节点数量: {len(nodes)}")
print(f" - 连接数量: {len(edges)}")
except Exception as e:
session.rollback()
print(f"❌ 创建失败: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
print("=" * 60)
print("生成复杂测试工作流(包含条件判断)")
print("=" * 60)
generate_test_workflow()
print("\n" + "=" * 60)
print("生成简单测试工作流(仅基本节点)")
print("=" * 60)
generate_simple_test_workflow()