#!/usr/bin/env python3
"""
Flask Prompt Master - simplified service monitoring script.

Monitors the application's health endpoint and its log files.
"""

import os
import sys
import time
import json
import logging
from collections import deque
from datetime import datetime
from pathlib import Path

import requests

# Add the project directory to the Python path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))


def setup_logging():
    """Configure monitor logging and return this module's logger.

    Ensures ``<project_root>/logs`` exists and, if the root logger has no
    handlers yet, installs a UTF-8 file handler (``monitor.log``) plus a
    stream handler at INFO level.
    """
    log_dir = project_root / "logs"
    log_dir.mkdir(exist_ok=True)

    # basicConfig() is a no-op once the root logger has handlers, so skip it
    # in that case; otherwise every call would open a FileHandler that is
    # immediately discarded and never closed.
    if not logging.getLogger().handlers:
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_dir / "monitor.log", encoding='utf-8'),
                logging.StreamHandler(),
            ],
        )

    return logging.getLogger(__name__)


class SimpleServiceMonitor:
    """Simplified service monitor.

    Polls ``<app_url>/health``, scans the tail of the project's log files for
    error lines, and periodically writes a JSON report with the collected
    statistics.
    """

    HEALTH_TIMEOUT = 5             # seconds allowed for one health request
    SLOW_RESPONSE_THRESHOLD = 2.0  # seconds; slower responses log a warning
    REPORT_EVERY = 10              # write a report every N health checks
    TAIL_LINES = 10                # trailing lines inspected per log file
    MAX_REPORTED_ERRORS = 3        # error lines echoed per log file

    def __init__(self, app_url="http://localhost:5000", monitor_interval=30):
        """Create a monitor.

        Args:
            app_url: Base URL of the application to monitor.
            monitor_interval: Seconds to wait between monitoring rounds.
        """
        self.logger = setup_logging()
        self.app_url = app_url
        self.monitor_interval = monitor_interval  # monitoring interval (seconds)
        self.stats = {
            'start_time': datetime.now(),
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'avg_response_time': 0,
            'last_check': None
        }

    def _record_failure(self, message):
        """Count one failed health check and log ``message`` as an error."""
        self.stats['failed_requests'] += 1
        self.logger.error(message)

    def check_health(self):
        """Check the application's health endpoint once.

        Every attempt is counted in ``total_requests`` (including attempts
        that fail before a response arrives) and ends up in exactly one of
        ``successful_requests`` / ``failed_requests``.

        Returns:
            ``(True, health_data)`` when the endpoint answers 200 with a JSON
            body, otherwise ``(False, None)``.
        """
        # Count the attempt up front so connection errors and timeouts are
        # part of the total and the success rate stays meaningful.
        self.stats['total_requests'] += 1
        self.stats['last_check'] = datetime.now()

        try:
            started = time.perf_counter()
            response = requests.get(
                f"{self.app_url}/health", timeout=self.HEALTH_TIMEOUT
            )
            response_time = time.perf_counter() - started
        except requests.exceptions.RequestException as e:
            self._record_failure(f"健康检查异常: {e}")
            return False, None

        if response.status_code != 200:
            self._record_failure(f"健康检查失败 - 状态码: {response.status_code}")
            return False, None

        # Parse the body before counting a success: an invalid JSON body makes
        # response.json() raise a ValueError (sub)class, and that check must
        # be recorded as a failure only.
        try:
            health_data = response.json()
        except ValueError as e:
            self._record_failure(f"健康检查异常: {e}")
            return False, None

        self.stats['successful_requests'] += 1

        # Tolerate a JSON body that is not an object when logging details.
        details = health_data if isinstance(health_data, dict) else {}
        self.logger.info(f"健康检查成功 - 响应时间: {response_time:.2f}s")
        self.logger.info(f"应用状态: {details.get('status', 'unknown')}")
        self.logger.info(f"运行环境: {details.get('environment', 'unknown')}")

        # Incremental arithmetic mean over all successful checks.
        samples = self.stats['successful_requests']
        previous_avg = self.stats['avg_response_time']
        self.stats['avg_response_time'] = (
            previous_avg + (response_time - previous_avg) / samples
        )

        # Response-time threshold check
        if response_time > self.SLOW_RESPONSE_THRESHOLD:
            self.logger.warning(f"响应时间过长: {response_time:.2f}s")

        return True, health_data

    @staticmethod
    def _read_tail(path, max_lines):
        """Return the last ``max_lines`` lines of the text file at ``path``.

        The file is streamed through a bounded deque so large logs are never
        held in memory; undecodable bytes are replaced rather than raising.
        """
        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            return list(deque(f, maxlen=max_lines))

    def check_log_files(self):
        """Log the number/size of ``*.log`` files and surface recent errors.

        For every log file except the monitor's own ``monitor.log``, the last
        ``TAIL_LINES`` lines are scanned for ``ERROR`` / ``CRITICAL`` and up
        to ``MAX_REPORTED_ERRORS`` of the matching lines are logged as
        warnings.
        """
        try:
            log_dir = project_root / "logs"
            if not log_dir.exists():
                self.logger.warning("⚠️ 日志目录不存在")
                return

            log_files = list(log_dir.glob("*.log"))
            total_size = sum(f.stat().st_size for f in log_files)

            self.logger.info(f"日志文件数量: {len(log_files)}, 总大小: {total_size / 1024 / 1024:.2f}MB")

            # Look for recent error entries
            for log_file in log_files:
                # Skip our own log so we do not re-report our own messages.
                if log_file.name == 'monitor.log':
                    continue

                try:
                    recent_lines = self._read_tail(log_file, self.TAIL_LINES)
                except (OSError, UnicodeError) as e:
                    self.logger.error(f"读取日志文件失败 {log_file}: {e}")
                    continue

                error_lines = [
                    line for line in recent_lines
                    if 'ERROR' in line or 'CRITICAL' in line
                ]
                if error_lines:
                    self.logger.warning(f"发现错误日志 - {log_file.name}:")
                    # Show only the most recent errors
                    for error_line in error_lines[-self.MAX_REPORTED_ERRORS:]:
                        self.logger.warning(f" {error_line.strip()}")

        except Exception as e:
            # Best effort: a failing log scan must not stop the monitor.
            self.logger.error(f"日志文件检查异常: {e}")

    def generate_report(self):
        """Build the monitoring report and save it as JSON.

        The report is written to ``<project_root>/logs/monitor_report.json``.

        Returns:
            The report dict, or ``None`` if it could not be written.
        """
        try:
            now = datetime.now()
            uptime = now - self.stats['start_time']
            # max(..., 1) avoids dividing by zero before the first check.
            success_rate = (
                self.stats['successful_requests']
                / max(self.stats['total_requests'], 1)
            ) * 100
            last_check = self.stats['last_check']

            report = {
                'timestamp': now.isoformat(),
                'uptime': str(uptime),
                'total_requests': self.stats['total_requests'],
                'successful_requests': self.stats['successful_requests'],
                'failed_requests': self.stats['failed_requests'],
                'success_rate': f"{success_rate:.2f}%",
                'avg_response_time': f"{self.stats['avg_response_time']:.2f}s",
                'last_check': last_check.isoformat() if last_check else None
            }

            # Save the report, creating the logs directory if it was removed.
            report_file = project_root / "logs" / "monitor_report.json"
            report_file.parent.mkdir(parents=True, exist_ok=True)
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)

            self.logger.info("监控报告已生成")
            return report

        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"生成监控报告失败: {e}")
            return None

    def run_monitoring(self):
        """Run the monitoring loop until interrupted with Ctrl-C.

        Each round performs a health check and a log scan, writes a report
        every ``REPORT_EVERY`` checks, then sleeps ``monitor_interval``
        seconds. Unexpected errors in a round are logged and the loop
        continues.
        """
        self.logger.info("开始服务监控...")

        # The KeyboardInterrupt guard wraps the whole loop, including the
        # sleep that follows a failed round, so Ctrl-C always stops cleanly.
        try:
            while True:
                try:
                    # Health check
                    self.check_health()

                    # Log file check
                    self.check_log_files()

                    # Generate a report every REPORT_EVERY checks
                    if self.stats['total_requests'] % self.REPORT_EVERY == 0:
                        self.generate_report()
                except Exception as e:
                    self.logger.error(f"监控循环异常: {e}")

                # Wait for the next round
                time.sleep(self.monitor_interval)
        except KeyboardInterrupt:
            self.logger.info("监控已停止")


def main():
    """Command-line entry point.

    With no argument the monitoring loop runs; ``check`` performs a single
    health/log check; ``report`` writes and prints a report; anything else
    prints usage.
    """
    args = sys.argv[1:]

    if not args:
        # Run the monitoring loop
        SimpleServiceMonitor().run_monitoring()
        return

    command = args[0]

    if command == "check":
        # Single check
        monitor = SimpleServiceMonitor()
        health_ok, _ = monitor.check_health()
        monitor.check_log_files()
        print("服务运行正常" if health_ok else "服务运行异常")
    elif command == "report":
        # Generate a report
        report = SimpleServiceMonitor().generate_report()
        if report:
            print("监控报告:")
            for key, value in report.items():
                print(f" {key}: {value}")
    else:
        print("用法:")
        print(" python simple_monitor.py # 运行监控")
        print(" python simple_monitor.py check # 单次检查")
        print(" python simple_monitor.py report # 生成报告")


if __name__ == '__main__':
    main()