#!/usr/bin/env python3
"""
Flask Prompt Master - service monitoring script.

Monitors the application's health endpoint, host resource usage and log files.
"""

import os
import sys
import time
import json
import logging
import threading
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import requests
import psutil

# Add the project directory to the Python path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))


def setup_logging():
    """Configure monitor logging and return this module's logger.

    Creates ``logs/`` under the project root if needed and sends records both
    to ``logs/monitor.log`` (UTF-8) and to the console.
    """
    log_dir = project_root / "logs"
    log_dir.mkdir(exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_dir / "monitor.log", encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)


class ServiceMonitor:
    """Periodically checks the service's health, system resources and logs."""

    def __init__(self):
        self.logger = setup_logging()
        self.app_url = "http://localhost:5000"
        self.monitor_interval = 30  # seconds between monitoring rounds
        self.alert_thresholds = {
            'cpu_percent': 80,      # CPU usage threshold (%)
            'memory_percent': 80,   # memory usage threshold (%)
            'response_time': 2.0,   # response time threshold (seconds)
            'error_rate': 0.1       # failed / total health checks threshold
        }
        self.stats = {
            'start_time': datetime.now(),
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'avg_response_time': 0,
            'last_check': None
        }

    def _record_failure(self) -> Tuple[bool, None]:
        """Count one failed health check and warn if the error rate is too high."""
        self.stats['failed_requests'] += 1
        error_rate = self.stats['failed_requests'] / max(self.stats['total_requests'], 1)
        if error_rate > self.alert_thresholds['error_rate']:
            self.logger.warning(f"错误率过高: {error_rate:.2%}")
        return False, None

    def check_health(self) -> Tuple[bool, Optional[Any]]:
        """Probe ``<app_url>/health`` once and update the request statistics.

        Every attempt is counted in ``total_requests``, including attempts
        that never receive a response, so the success rate and the
        "every 10 checks" report cadence stay meaningful while the service
        is down.

        Returns:
            ``(True, health_data)`` when the endpoint answers 200 with a JSON
            body, otherwise ``(False, None)``.
        """
        self.stats['total_requests'] += 1
        self.stats['last_check'] = datetime.now()

        try:
            started = time.perf_counter()
            response = requests.get(f"{self.app_url}/health", timeout=5)
            response_time = time.perf_counter() - started
        except requests.exceptions.RequestException as e:
            self.logger.error(f"健康检查异常: {e}")
            return self._record_failure()

        if response.status_code != 200:
            self.logger.error(f"健康检查失败 - 状态码: {response.status_code}")
            return self._record_failure()

        # Parse before counting a success: a 200 with a non-JSON body is a failure.
        try:
            health_data = response.json()
        except ValueError as e:
            self.logger.error(f"健康检查失败 - 响应不是有效的 JSON: {e}")
            return self._record_failure()

        self.stats['successful_requests'] += 1
        # Tolerate a JSON body that is not an object when reading fields.
        details = health_data if isinstance(health_data, dict) else {}
        self.logger.info(f"健康检查成功 - 响应时间: {response_time:.2f}s")
        self.logger.info(f"应用状态: {details.get('status', 'unknown')}")
        self.logger.info(f"运行环境: {details.get('environment', 'unknown')}")

        # Incremental arithmetic mean over all successful checks.
        previous_avg = self.stats['avg_response_time']
        self.stats['avg_response_time'] = previous_avg + (
            (response_time - previous_avg) / self.stats['successful_requests']
        )

        if response_time > self.alert_thresholds['response_time']:
            self.logger.warning(f"响应时间过长: {response_time:.2f}s")

        return True, health_data

    def check_system_resources(self) -> Optional[Dict[str, Any]]:
        """Log CPU, memory, disk and connection usage; warn above thresholds.

        Returns:
            A dict with ``cpu_percent``, ``memory_percent``, ``disk_percent``
            and ``connections`` (``None`` when the connection table cannot be
            read), or ``None`` if the check itself failed.
        """
        try:
            # cpu_percent(interval=1) blocks for one second while sampling.
            cpu_percent = psutil.cpu_percent(interval=1)
            memory_percent = psutil.virtual_memory().percent
            disk_percent = psutil.disk_usage('/').percent

            # Listing connections can need elevated privileges on some
            # platforms; don't discard the other readings when it is denied.
            try:
                connections = len(psutil.net_connections())
            except psutil.AccessDenied:
                connections = None

            self.logger.info(f"系统资源 - CPU: {cpu_percent}%, 内存: {memory_percent}%, 磁盘: {disk_percent}%")
            if connections is None:
                self.logger.warning("无权限读取网络连接数")
            else:
                self.logger.info(f"网络连接数: {connections}")

            if cpu_percent > self.alert_thresholds['cpu_percent']:
                self.logger.warning(f"CPU 使用率过高: {cpu_percent}%")

            if memory_percent > self.alert_thresholds['memory_percent']:
                self.logger.warning(f"内存使用率过高: {memory_percent}%")

            return {
                'cpu_percent': cpu_percent,
                'memory_percent': memory_percent,
                'disk_percent': disk_percent,
                'connections': connections
            }

        except Exception as e:
            self.logger.error(f"系统资源检查异常: {e}")
            return None

    def check_log_files(self) -> None:
        """Summarise ``logs/*.log`` and surface recent ERROR/CRITICAL lines.

        For every log file except the monitor's own, the last 10 lines are
        inspected and up to 3 of the matching lines are re-logged as warnings.
        """
        try:
            log_dir = project_root / "logs"
            if not log_dir.exists():
                self.logger.warning("日志目录不存在")
                return

            log_files = list(log_dir.glob("*.log"))
            total_size = sum(f.stat().st_size for f in log_files)

            self.logger.info(f"日志文件数量: {len(log_files)}, 总大小: {total_size / 1024 / 1024:.2f}MB")

            for log_file in log_files:
                # Skip the monitor's own log file.
                if log_file.name == 'monitor.log':
                    continue

                try:
                    # A bounded deque keeps only the tail, so large logs are
                    # streamed instead of loaded whole; undecodable bytes are
                    # replaced rather than aborting the scan of this file.
                    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                        recent_lines = deque(f, maxlen=10)

                    error_lines = [
                        line for line in recent_lines
                        if 'ERROR' in line or 'CRITICAL' in line
                    ]

                    if error_lines:
                        self.logger.warning(f"发现错误日志 - {log_file.name}:")
                        for error_line in error_lines[-3:]:  # 3 most recent
                            self.logger.warning(f" {error_line.strip()}")

                except Exception as e:
                    self.logger.error(f"读取日志文件失败 {log_file}: {e}")

        except Exception as e:
            self.logger.error(f"日志文件检查异常: {e}")

    def generate_report(self) -> Optional[Dict[str, Any]]:
        """Write the current statistics to ``logs/monitor_report.json``.

        Returns:
            The report dict that was written, or ``None`` if building or
            saving it failed.
        """
        try:
            uptime = datetime.now() - self.stats['start_time']
            # max(..., 1) avoids division by zero before the first check.
            success_rate = (self.stats['successful_requests'] / max(self.stats['total_requests'], 1)) * 100

            report = {
                'timestamp': datetime.now().isoformat(),
                'uptime': str(uptime),
                'total_requests': self.stats['total_requests'],
                'successful_requests': self.stats['successful_requests'],
                'failed_requests': self.stats['failed_requests'],
                'success_rate': f"{success_rate:.2f}%",
                'avg_response_time': f"{self.stats['avg_response_time']:.2f}s",
                'last_check': self.stats['last_check'].isoformat() if self.stats['last_check'] else None
            }

            report_file = project_root / "logs" / "monitor_report.json"
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)

            self.logger.info("监控报告已生成")
            return report

        except Exception as e:
            self.logger.error(f"生成监控报告失败: {e}")
            return None

    def run_monitoring(self) -> None:
        """Run all checks every ``monitor_interval`` seconds until Ctrl+C.

        A report is generated on every 10th health check. An unexpected error
        in one round is logged and the loop continues.
        """
        self.logger.info("开始服务监控...")

        # The KeyboardInterrupt handler wraps the whole loop so that Ctrl+C is
        # handled cleanly wherever it lands, including during the sleep.
        try:
            while True:
                try:
                    self.check_health()
                    self.check_system_resources()
                    self.check_log_files()

                    if self.stats['total_requests'] % 10 == 0:
                        self.generate_report()
                except Exception as e:
                    self.logger.error(f"监控循环异常: {e}")

                time.sleep(self.monitor_interval)
        except KeyboardInterrupt:
            self.logger.info("监控已停止")


def main():
    """Command-line entry point.

    No argument runs the monitoring loop, ``check`` performs a single round
    of checks, ``report`` generates and prints a report, and anything else
    prints usage.
    """
    if len(sys.argv) == 1:
        monitor = ServiceMonitor()
        monitor.run_monitoring()
    elif sys.argv[1] == "check":
        monitor = ServiceMonitor()
        health_ok, _ = monitor.check_health()
        monitor.check_system_resources()
        monitor.check_log_files()

        if health_ok:
            print("✅ 服务运行正常")
        else:
            print("❌ 服务运行异常")
    elif sys.argv[1] == "report":
        monitor = ServiceMonitor()
        report = monitor.generate_report()
        if report:
            print("📊 监控报告:")
            for key, value in report.items():
                print(f" {key}: {value}")
    else:
        print("用法:")
        print(" python monitor_service.py # 运行监控")
        print(" python monitor_service.py check # 单次检查")
        print(" python monitor_service.py report # 生成报告")


if __name__ == '__main__':
    main()