""" 将localStorage中的优化历史数据迁移到腾讯云数据库 """ import json import os import sys from datetime import datetime # 添加项目路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager from src.flask_prompt_master.models.optimization_history import ( OptimizationHistory, OptimizationTag, UserUsageStats ) def create_app(): """创建Flask应用""" app = Flask(__name__) # 配置数据库 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://username:password@host:port/database' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 初始化扩展 db = SQLAlchemy(app) login_manager = LoginManager() login_manager.init_app(app) return app, db def migrate_localStorage_data(): """迁移localStorage数据到数据库""" app, db = create_app() with app.app_context(): print("开始迁移localStorage数据到数据库...") # 模拟localStorage数据(实际使用时需要从浏览器获取) sample_localStorage_data = [ { "id": 1704672000000, "timestamp": "2024-01-08T12:00:00.000Z", "original": "请帮我写一个营销文案", "optimized": "以下是针对您需求的专业营销文案:\n\n【产品定位】\n明确产品核心价值主张...", "type": "提示词优化" }, { "id": 1704675600000, "timestamp": "2024-01-08T13:00:00.000Z", "original": "如何做金融投资", "optimized": "【专业金融投资提示框架】\n一、投资主体定位\n1. 请说明您的风险承受能力等级...", "type": "提示词优化" } ] migrated_count = 0 error_count = 0 for data in sample_localStorage_data: try: # 解析时间戳 timestamp = datetime.fromisoformat(data['timestamp'].replace('Z', '+00:00')) # 创建历史记录(假设用户ID为1,实际使用时需要根据登录用户确定) history = OptimizationHistory( user_id=1, # 实际使用时需要从登录用户获取 original_text=data['original'], optimized_text=data['optimized'], optimization_type=data.get('type', '提示词优化'), created_at=timestamp ) db.session.add(history) db.session.flush() # 获取ID # 添加默认标签 default_tags = ['逻辑强化', '场景适配', '结构优化'] for tag_name in default_tags: tag = OptimizationTag( history_id=history.id, tag_name=tag_name, tag_type='optimization' ) db.session.add(tag) migrated_count += 1 print(f"✓ 迁移记录 {migrated_count}: {data['original'][:30]}...") except Exception as e: error_count += 1 print(f"✗ 迁移失败: {str(e)}") # 提交所有更改 try: db.session.commit() print(f"\n迁移完成!") print(f"成功迁移: {migrated_count} 条记录") print(f"失败记录: {error_count} 条") except Exception as e: db.session.rollback() print(f"提交失败: {str(e)}") def create_sample_data(): """创建示例数据""" app, db = create_app() with app.app_context(): print("创建示例数据...") # 创建示例历史记录 sample_histories = [ { 'user_id': 1, 'original_text': '请帮我写一个产品介绍', 'optimized_text': '以下是专业的产品介绍模板:\n\n【产品概述】\n产品名称:[产品名称]\n核心功能:[主要功能]\n目标用户:[用户群体]\n\n【产品特色】\n1. 独特卖点1\n2. 独特卖点2\n3. 独特卖点3\n\n【使用场景】\n场景1:具体应用情况\n场景2:具体应用情况\n\n【技术优势】\n- 技术特点1\n- 技术特点2\n- 技术特点3', 'optimization_type': '产品文案', 'industry': '科技', 'profession': '产品经理', 'satisfaction_rating': 5 }, { 'user_id': 1, 'original_text': '如何写邮件', 'optimized_text': '【专业商务邮件写作指南】\n\n一、邮件结构\n1. 主题行:简洁明确,突出要点\n2. 称呼:根据关系选择正式或非正式\n3. 正文:逻辑清晰,分段明确\n4. 结尾:礼貌用语+签名\n\n二、写作要点\n- 语言简洁:避免冗长句子\n- 重点突出:使用加粗或列表\n- 语气恰当:根据收件人调整\n- 检查无误:发送前仔细校对\n\n三、常见类型\n1. 工作汇报邮件\n2. 会议邀请邮件\n3. 项目协调邮件\n4. 客户沟通邮件', 'optimization_type': '商务邮件', 'industry': '商务', 'profession': '行政助理', 'satisfaction_rating': 4 }, { 'user_id': 1, 'original_text': '数据库设计', 'optimized_text': '【数据库设计最佳实践】\n\n一、需求分析\n1. 业务需求梳理\n2. 数据关系分析\n3. 性能要求评估\n\n二、表结构设计\n- 主键设计:自增ID或UUID\n- 字段类型:选择合适的数据类型\n- 索引设计:提高查询性能\n- 约束设置:保证数据完整性\n\n三、优化策略\n1. 分库分表:应对大数据量\n2. 读写分离:提高并发性能\n3. 缓存策略:减少数据库压力\n4. 监控告警:及时发现问题', 'optimization_type': '技术文档', 'industry': 'IT', 'profession': '后端开发', 'satisfaction_rating': 5 } ] for i, history_data in enumerate(sample_histories): try: # 创建历史记录 history = OptimizationHistory(**history_data) db.session.add(history) db.session.flush() # 添加标签 tags = ['逻辑强化', '场景适配', '结构优化'] for tag_name in tags: tag = OptimizationTag( history_id=history.id, tag_name=tag_name, tag_type='optimization' ) db.session.add(tag) print(f"✓ 创建示例记录 {i+1}: {history_data['original_text'][:30]}...") except Exception as e: print(f"✗ 创建失败: {str(e)}") # 创建用户统计 try: from datetime import date stats = UserUsageStats( user_id=1, date=date.today(), generation_count=3, total_time_saved=15, avg_rating=4.7, total_ratings=3 ) db.session.add(stats) print("✓ 创建用户统计") except Exception as e: print(f"✗ 创建统计失败: {str(e)}") # 提交所有更改 try: db.session.commit() print("\n示例数据创建完成!") except Exception as e: db.session.rollback() print(f"提交失败: {str(e)}") def backup_localStorage_data(): """备份localStorage数据""" print("备份localStorage数据...") # 这里应该从浏览器获取localStorage数据 # 实际实现需要JavaScript代码从浏览器获取数据 backup_script = """ // 在浏览器控制台运行此代码来备份localStorage数据 function backupOptimizationHistory() { const history = JSON.parse(localStorage.getItem('optimization_history') || '[]'); const stats = JSON.parse(localStorage.getItem('usage_stats') || '{}'); const backup = { timestamp: new Date().toISOString(), history: history, stats: stats, count: history.length }; // 下载备份文件 const blob = new Blob([JSON.stringify(backup, null, 2)], {type: 'application/json'}); const url = URL.createObjectURL(blob); const a = document.createElement('a'); a.href = url; a.download = `optimization_history_backup_${new Date().toISOString().split('T')[0]}.json`; document.body.appendChild(a); a.click(); document.body.removeChild(a); URL.revokeObjectURL(url); console.log(`备份完成!共 ${history.length} 条记录`); return backup; } // 运行备份 backupOptimizationHistory(); """ print("请在浏览器控制台运行以下代码来备份数据:") print(backup_script) def main(): """主函数""" print("=== 优化历史数据迁移工具 ===") print("1. 备份localStorage数据") print("2. 创建示例数据") print("3. 迁移localStorage数据") print("4. 退出") while True: choice = input("\n请选择操作 (1-4): ").strip() if choice == '1': backup_localStorage_data() elif choice == '2': create_sample_data() elif choice == '3': migrate_localStorage_data() elif choice == '4': print("退出程序") break else: print("无效选择,请重新输入") if __name__ == '__main__': main()