Files
aitsc/deploy_history_to_tencent.py
2025-10-10 23:39:54 +08:00

517 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
优化历史功能腾讯云数据库部署脚本
自动创建表结构、验证连接、执行数据迁移
"""
import pymysql
import sys
import os
from datetime import datetime
import json
# 腾讯云数据库配置
TENCENT_DB_CONFIG = {
'host': 'gz-cynosdbmysql-grp-d26pzce5.sql.tencentcdb.com',
'port': 24936,
'user': 'root',
'password': '!Rjb12191',
'database': 'pro_db',
'charset': 'utf8mb4'
}
def connect_to_tencent_db():
"""连接到腾讯云数据库"""
try:
connection = pymysql.connect(**TENCENT_DB_CONFIG)
print("✅ 成功连接到腾讯云数据库")
return connection
except Exception as e:
print(f"❌ 连接腾讯云数据库失败: {str(e)}")
return None
def check_table_exists(cursor, table_name):
"""检查表是否存在"""
try:
cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
result = cursor.fetchone()
return result is not None
except Exception as e:
print(f"❌ 检查表 {table_name} 失败: {str(e)}")
return False
def create_history_tables(cursor):
"""创建历史记录相关表"""
print("\n" + "="*50)
print("开始创建历史记录表结构")
print("="*50)
# 1. 创建历史记录主表
print("\n1. 创建 prompt_history 表...")
create_prompt_history_sql = """
CREATE TABLE IF NOT EXISTS prompt_history (
id INT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
user_id INT NOT NULL COMMENT '用户ID',
wx_user_id INT COMMENT '微信用户ID',
original_input TEXT NOT NULL COMMENT '原始输入',
generated_prompt TEXT NOT NULL COMMENT '生成的提示词',
template_id INT COMMENT '使用的模板ID',
template_name VARCHAR(100) COMMENT '模板名称',
generation_time INT COMMENT '生成耗时(毫秒)',
satisfaction_rating TINYINT COMMENT '满意度评分(1-5)',
tags JSON COMMENT '标签列表',
is_favorite BOOLEAN DEFAULT FALSE COMMENT '是否收藏',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_user_id (user_id),
INDEX idx_wx_user_id (wx_user_id),
INDEX idx_created_at (created_at),
INDEX idx_template_id (template_id),
INDEX idx_is_favorite (is_favorite),
INDEX idx_satisfaction_rating (satisfaction_rating)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='提示词历史记录表'
"""
try:
cursor.execute(create_prompt_history_sql)
print(" ✅ prompt_history 表创建成功")
except Exception as e:
print(f" ❌ 创建 prompt_history 表失败: {str(e)}")
return False
# 2. 创建历史标签表
print("\n2. 创建 history_tags 表...")
create_history_tags_sql = """
CREATE TABLE IF NOT EXISTS history_tags (
id INT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
history_id INT NOT NULL COMMENT '历史记录ID',
tag_name VARCHAR(50) NOT NULL COMMENT '标签名称',
tag_type VARCHAR(20) DEFAULT 'custom' COMMENT '标签类型',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_history_id (history_id),
INDEX idx_tag_name (tag_name),
INDEX idx_tag_type (tag_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='历史记录标签表'
"""
try:
cursor.execute(create_history_tags_sql)
print(" ✅ history_tags 表创建成功")
except Exception as e:
print(f" ❌ 创建 history_tags 表失败: {str(e)}")
return False
# 3. 创建用户统计表
print("\n3. 创建 user_statistics 表...")
create_user_statistics_sql = """
CREATE TABLE IF NOT EXISTS user_statistics (
id INT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
user_id INT UNIQUE NOT NULL COMMENT '用户ID',
total_generations INT DEFAULT 0 COMMENT '总生成数',
favorite_count INT DEFAULT 0 COMMENT '收藏数',
avg_rating DECIMAL(3,2) DEFAULT 0.00 COMMENT '平均评分',
last_generation_at TIMESTAMP NULL COMMENT '最后生成时间',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_user_id (user_id),
INDEX idx_total_generations (total_generations),
INDEX idx_last_generation_at (last_generation_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户统计信息表'
"""
try:
cursor.execute(create_user_statistics_sql)
print(" ✅ user_statistics 表创建成功")
except Exception as e:
print(f" ❌ 创建 user_statistics 表失败: {str(e)}")
return False
return True
def create_foreign_keys(cursor):
"""创建外键约束"""
print("\n4. 创建外键约束...")
# 检查相关表是否存在
tables_to_check = ['user', 'wx_user', 'prompt_template']
existing_tables = []
for table in tables_to_check:
if check_table_exists(cursor, table):
existing_tables.append(table)
print(f" ✅ 找到表: {table}")
else:
print(f" ⚠️ 表不存在: {table}")
# 创建外键约束
foreign_keys = [
{
'name': 'fk_prompt_history_user_id',
'table': 'prompt_history',
'column': 'user_id',
'ref_table': 'user',
'ref_column': 'uid',
'on_delete': 'CASCADE'
},
{
'name': 'fk_prompt_history_wx_user_id',
'table': 'prompt_history',
'column': 'wx_user_id',
'ref_table': 'wx_user',
'ref_column': 'id',
'on_delete': 'SET NULL'
},
{
'name': 'fk_prompt_history_template_id',
'table': 'prompt_history',
'column': 'template_id',
'ref_table': 'prompt_template',
'ref_column': 'id',
'on_delete': 'SET NULL'
}
]
for fk in foreign_keys:
if fk['ref_table'] in existing_tables:
try:
sql = f"""
ALTER TABLE {fk['table']}
ADD CONSTRAINT {fk['name']}
FOREIGN KEY ({fk['column']})
REFERENCES {fk['ref_table']}({fk['ref_column']})
ON DELETE {fk['on_delete']}
"""
cursor.execute(sql)
print(f" ✅ 外键约束 {fk['name']} 创建成功")
except Exception as e:
print(f" ⚠️ 外键约束 {fk['name']} 创建失败: {str(e)}")
else:
print(f" ⚠️ 跳过外键约束 {fk['name']} (引用表 {fk['ref_table']} 不存在)")
def create_views(cursor):
"""创建统计视图"""
print("\n5. 创建统计视图...")
create_statistics_view_sql = """
CREATE OR REPLACE VIEW history_statistics AS
SELECT
h.user_id,
COUNT(*) as total_count,
COUNT(CASE WHEN h.is_favorite = TRUE THEN 1 END) as favorite_count,
AVG(h.satisfaction_rating) as avg_rating,
MAX(h.created_at) as last_generation_at,
COUNT(CASE WHEN h.created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 END) as week_count,
COUNT(CASE WHEN h.created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 END) as month_count
FROM prompt_history h
GROUP BY h.user_id
"""
try:
cursor.execute(create_statistics_view_sql)
print(" ✅ 统计视图 history_statistics 创建成功")
except Exception as e:
print(f" ❌ 创建统计视图失败: {str(e)}")
def create_stored_procedures(cursor):
"""创建存储过程"""
print("\n6. 创建存储过程...")
# 清理旧数据的存储过程
cleanup_procedure_sql = """
CREATE PROCEDURE CleanOldHistory(IN days_to_keep INT)
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
ROLLBACK;
RESIGNAL;
END;
START TRANSACTION;
-- 删除指定天数前的历史记录
DELETE FROM prompt_history
WHERE created_at < DATE_SUB(NOW(), INTERVAL days_to_keep DAY);
-- 更新用户统计
UPDATE user_statistics us
SET
total_generations = (
SELECT COUNT(*) FROM prompt_history h
WHERE h.user_id = us.user_id
),
favorite_count = (
SELECT COUNT(*) FROM prompt_history h
WHERE h.user_id = us.user_id AND h.is_favorite = TRUE
),
avg_rating = (
SELECT AVG(h.satisfaction_rating) FROM prompt_history h
WHERE h.user_id = us.user_id AND h.satisfaction_rating IS NOT NULL
),
last_generation_at = (
SELECT MAX(h.created_at) FROM prompt_history h
WHERE h.user_id = us.user_id
),
updated_at = NOW()
WHERE us.user_id IN (
SELECT DISTINCT user_id FROM prompt_history
);
COMMIT;
END
"""
try:
cursor.execute(cleanup_procedure_sql)
print(" ✅ 存储过程 CleanOldHistory 创建成功")
except Exception as e:
print(f" ❌ 创建存储过程失败: {str(e)}")
def create_triggers(cursor):
"""创建触发器"""
print("\n7. 创建触发器...")
# 插入触发器
insert_trigger_sql = """
CREATE TRIGGER update_user_stats_after_insert
AFTER INSERT ON prompt_history
FOR EACH ROW
BEGIN
INSERT INTO user_statistics (user_id, total_generations, favorite_count, avg_rating, last_generation_at)
VALUES (NEW.user_id, 1, IF(NEW.is_favorite, 1, 0), NEW.satisfaction_rating, NEW.created_at)
ON DUPLICATE KEY UPDATE
total_generations = total_generations + 1,
favorite_count = favorite_count + IF(NEW.is_favorite, 1, 0),
avg_rating = (
SELECT AVG(satisfaction_rating)
FROM prompt_history
WHERE user_id = NEW.user_id AND satisfaction_rating IS NOT NULL
),
last_generation_at = NEW.created_at,
updated_at = NOW();
END
"""
# 更新触发器
update_trigger_sql = """
CREATE TRIGGER update_user_stats_after_update
AFTER UPDATE ON prompt_history
FOR EACH ROW
BEGIN
UPDATE user_statistics
SET
total_generations = (
SELECT COUNT(*) FROM prompt_history WHERE user_id = NEW.user_id
),
favorite_count = (
SELECT COUNT(*) FROM prompt_history
WHERE user_id = NEW.user_id AND is_favorite = TRUE
),
avg_rating = (
SELECT AVG(satisfaction_rating) FROM prompt_history
WHERE user_id = NEW.user_id AND satisfaction_rating IS NOT NULL
),
updated_at = NOW()
WHERE user_id = NEW.user_id;
END
"""
# 删除触发器
delete_trigger_sql = """
CREATE TRIGGER update_user_stats_after_delete
AFTER DELETE ON prompt_history
FOR EACH ROW
BEGIN
UPDATE user_statistics
SET
total_generations = (
SELECT COUNT(*) FROM prompt_history WHERE user_id = OLD.user_id
),
favorite_count = (
SELECT COUNT(*) FROM prompt_history
WHERE user_id = OLD.user_id AND is_favorite = TRUE
),
avg_rating = (
SELECT AVG(satisfaction_rating) FROM prompt_history
WHERE user_id = OLD.user_id AND satisfaction_rating IS NOT NULL
),
updated_at = NOW()
WHERE user_id = OLD.user_id;
END
"""
triggers = [
("update_user_stats_after_insert", insert_trigger_sql),
("update_user_stats_after_update", update_trigger_sql),
("update_user_stats_after_delete", delete_trigger_sql)
]
for trigger_name, trigger_sql in triggers:
try:
cursor.execute(trigger_sql)
print(f" ✅ 触发器 {trigger_name} 创建成功")
except Exception as e:
print(f" ❌ 创建触发器 {trigger_name} 失败: {str(e)}")
def verify_deployment(cursor):
"""验证部署结果"""
print("\n" + "="*50)
print("验证部署结果")
print("="*50)
# 检查表是否存在
tables_to_check = ['prompt_history', 'history_tags', 'user_statistics']
for table in tables_to_check:
if check_table_exists(cursor, table):
print(f"✅ 表 {table} 存在")
# 获取表结构信息
try:
cursor.execute(f"DESCRIBE {table}")
columns = cursor.fetchall()
print(f" 列数: {len(columns)}")
except Exception as e:
print(f" ⚠️ 无法获取表结构: {str(e)}")
else:
print(f"❌ 表 {table} 不存在")
# 检查视图
try:
cursor.execute("SHOW TABLES LIKE 'history_statistics'")
if cursor.fetchone():
print("✅ 视图 history_statistics 存在")
else:
print("❌ 视图 history_statistics 不存在")
except Exception as e:
print(f"⚠️ 检查视图失败: {str(e)}")
# 检查存储过程
try:
cursor.execute("SHOW PROCEDURE STATUS WHERE Name = 'CleanOldHistory'")
if cursor.fetchone():
print("✅ 存储过程 CleanOldHistory 存在")
else:
print("❌ 存储过程 CleanOldHistory 不存在")
except Exception as e:
print(f"⚠️ 检查存储过程失败: {str(e)}")
def create_sample_data(cursor):
"""创建示例数据"""
print("\n8. 创建示例数据...")
# 检查是否已有数据
cursor.execute("SELECT COUNT(*) FROM prompt_history")
count = cursor.fetchone()[0]
if count > 0:
print(f" ⚠️ 表中已有 {count} 条记录,跳过示例数据创建")
return
# 创建示例历史记录
sample_data = [
{
'user_id': 1,
'original_input': '请帮我写一个关于人工智能的提示词',
'generated_prompt': '你是一位专业的人工智能专家,请详细分析人工智能的发展趋势、应用领域和未来前景。',
'template_id': 1,
'template_name': 'AI专家助手',
'generation_time': 1500,
'satisfaction_rating': 4,
'tags': '["AI", "技术", "分析"]',
'is_favorite': False
},
{
'user_id': 1,
'original_input': '帮我写一个产品经理的工作提示词',
'generated_prompt': '你是一位资深的产品经理,请从用户需求、市场分析、产品设计等角度提供专业建议。',
'template_id': 2,
'template_name': '产品经理助手',
'generation_time': 1200,
'satisfaction_rating': 5,
'tags': '["产品", "管理", "分析"]',
'is_favorite': True
}
]
for data in sample_data:
try:
sql = """
INSERT INTO prompt_history
(user_id, original_input, generated_prompt, template_id, template_name,
generation_time, satisfaction_rating, tags, is_favorite)
VALUES (%(user_id)s, %(original_input)s, %(generated_prompt)s, %(template_id)s,
%(template_name)s, %(generation_time)s, %(satisfaction_rating)s, %(tags)s, %(is_favorite)s)
"""
cursor.execute(sql, data)
print(f" ✅ 插入示例数据: {data['original_input'][:30]}...")
except Exception as e:
print(f" ❌ 插入示例数据失败: {str(e)}")
def main():
"""主函数"""
print("🚀 开始部署优化历史功能到腾讯云数据库")
print("="*60)
print(f"部署时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"目标数据库: {TENCENT_DB_CONFIG['host']}:{TENCENT_DB_CONFIG['port']}")
print(f"数据库名: {TENCENT_DB_CONFIG['database']}")
print("="*60)
# 连接数据库
connection = connect_to_tencent_db()
if not connection:
print("❌ 无法连接到数据库,部署终止")
sys.exit(1)
try:
cursor = connection.cursor()
# 执行部署步骤
if not create_history_tables(cursor):
print("❌ 创建表失败,部署终止")
return
create_foreign_keys(cursor)
create_views(cursor)
create_stored_procedures(cursor)
create_triggers(cursor)
create_sample_data(cursor)
# 提交所有更改
connection.commit()
print("\n✅ 所有数据库更改已提交")
# 验证部署
verify_deployment(cursor)
print("\n" + "="*60)
print("🎉 优化历史功能部署完成!")
print("="*60)
print("📋 部署摘要:")
print(" ✅ 历史记录表 (prompt_history)")
print(" ✅ 标签表 (history_tags)")
print(" ✅ 统计表 (user_statistics)")
print(" ✅ 统计视图 (history_statistics)")
print(" ✅ 存储过程 (CleanOldHistory)")
print(" ✅ 自动触发器")
print(" ✅ 示例数据")
print("\n🔗 访问地址:")
print(" 历史页面: http://localhost:5002/history")
print(" API接口: http://localhost:5002/api/history")
except Exception as e:
print(f"❌ 部署过程中发生错误: {str(e)}")
connection.rollback()
sys.exit(1)
finally:
cursor.close()
connection.close()
print("\n🔌 数据库连接已关闭")
if __name__ == "__main__":
main()