Files
aitsc/monitor_service.py

264 lines
9.8 KiB
Python
Raw Permalink Normal View History

2025-08-17 22:10:51 +08:00
#!/usr/bin/env python3
"""
Flask 提示词大师 - 服务监控脚本
用于监控应用状态性能和日志
"""
import json
import logging
import os
import sys
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path

import psutil
import requests
# Put the directory that contains this script at the front of the import
# search path; the same directory is also the base for the "logs" folder.
project_root = Path(__file__).parent
sys.path.insert(0, os.fspath(project_root))
def setup_logging():
    """Configure logging for the monitor and return this module's logger.

    Ensures a ``logs`` directory exists under ``project_root``, then calls
    ``logging.basicConfig`` with INFO level and two handlers: a UTF-8 file
    handler writing to ``logs/monitor.log`` and a stream handler.

    Returns:
        The logger obtained from ``logging.getLogger(__name__)``.
    """
    logs_path = project_root / "logs"
    logs_path.mkdir(exist_ok=True)

    # Build the handlers first (file, then console) and hand them to
    # basicConfig in that order.
    file_handler = logging.FileHandler(logs_path / "monitor.log", encoding='utf-8')
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)
class ServiceMonitor:
    """Monitor the application's health endpoint, host resources and log files.

    Each monitoring pass performs an HTTP health check, samples system
    resource usage through ``psutil`` and scans the tail of the log files in
    ``project_root / "logs"``. Counters for the health checks are kept in
    ``self.stats`` and can be written out with :meth:`generate_report`.
    """

    def __init__(self, app_url="http://localhost:5000", monitor_interval=30):
        """Create a monitor.

        Args:
            app_url: Base URL of the application; ``/health`` is appended
                for the health check.
            monitor_interval: Seconds to sleep between monitoring passes.
        """
        self.logger = setup_logging()
        self.app_url = app_url
        self.monitor_interval = monitor_interval
        self.alert_thresholds = {
            'cpu_percent': 80,       # CPU usage threshold (%)
            'memory_percent': 80,    # memory usage threshold (%)
            'response_time': 2.0,    # response time threshold (seconds)
            'error_rate': 0.1,       # failed / total health checks
        }
        self.stats = {
            'start_time': datetime.now(),
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'avg_response_time': 0,
            'last_check': None,
        }

    def check_health(self):
        """Call the application's ``/health`` endpoint once and record the result.

        Every attempt is counted in ``total_requests`` (including attempts
        that fail with a connection error or timeout) so that
        ``successful_requests + failed_requests == total_requests`` always
        holds. ``last_check`` is set to the time of the attempt.

        Returns:
            ``(True, health_data)`` when the endpoint answers 200 with a JSON
            body, otherwise ``(False, None)``.
        """
        self.stats['total_requests'] += 1
        self.stats['last_check'] = datetime.now()

        try:
            # perf_counter is monotonic, so the measurement is not affected
            # by wall-clock adjustments.
            started = time.perf_counter()
            response = requests.get(f"{self.app_url}/health", timeout=5)
            response_time = time.perf_counter() - started
        except requests.exceptions.RequestException as e:
            return self._record_failure(f"健康检查异常: {e}")

        if response.status_code != 200:
            return self._record_failure(f"健康检查失败 - 状态码: {response.status_code}")

        try:
            # Decode before counting the check as successful; a 200 response
            # with an unparsable body is treated as a failed check.
            health_data = response.json()
        except ValueError as e:
            return self._record_failure(f"健康检查异常: {e}")

        self._record_success(response_time)
        self.logger.info(f"健康检查成功 - 响应时间: {response_time:.2f}s")
        self.logger.info(f"应用状态: {health_data.get('status', 'unknown')}")
        self.logger.info(f"运行环境: {health_data.get('environment', 'unknown')}")

        if response_time > self.alert_thresholds['response_time']:
            self.logger.warning(f"响应时间过长: {response_time:.2f}s")
        return True, health_data

    def _record_success(self, response_time):
        """Count one successful check and fold its time into the running mean."""
        self.stats['successful_requests'] += 1
        count = self.stats['successful_requests']
        average = self.stats['avg_response_time']
        # Incremental mean over all successful checks: every sample carries
        # the same weight, unlike repeatedly halving (avg + new).
        self.stats['avg_response_time'] = average + (response_time - average) / count

    def _record_failure(self, message):
        """Count one failed check, log it, and warn if the error rate is too high.

        Returns:
            ``(False, None)`` so callers can return the result directly.
        """
        self.stats['failed_requests'] += 1
        self.logger.error(message)

        error_rate = self.stats['failed_requests'] / max(self.stats['total_requests'], 1)
        if error_rate > self.alert_thresholds['error_rate']:
            self.logger.warning(f"错误率过高: {error_rate:.1%}")
        return False, None

    def check_system_resources(self):
        """Sample CPU, memory, disk and network-connection usage.

        Logs the values and warns when CPU or memory usage exceeds its
        threshold.

        Returns:
            A dict with ``cpu_percent``, ``memory_percent``, ``disk_percent``
            and ``connections`` (``None`` when the connection list cannot be
            read for lack of privileges), or ``None`` if sampling failed.
        """
        try:
            # interval=1 blocks for one second to measure CPU usage over
            # that window.
            cpu_percent = psutil.cpu_percent(interval=1)
            memory_percent = psutil.virtual_memory().percent
            disk_percent = psutil.disk_usage('/').percent

            try:
                connections = len(psutil.net_connections())
            except psutil.AccessDenied:
                # Listing system-wide connections can require elevated
                # privileges; keep the other metrics instead of failing.
                connections = None

            self.logger.info(f"系统资源 - CPU: {cpu_percent}%, 内存: {memory_percent}%, 磁盘: {disk_percent}%")
            if connections is None:
                self.logger.info("网络连接数: 无权限获取")
            else:
                self.logger.info(f"网络连接数: {connections}")

            if cpu_percent > self.alert_thresholds['cpu_percent']:
                self.logger.warning(f"CPU 使用率过高: {cpu_percent}%")
            if memory_percent > self.alert_thresholds['memory_percent']:
                self.logger.warning(f"内存使用率过高: {memory_percent}%")

            return {
                'cpu_percent': cpu_percent,
                'memory_percent': memory_percent,
                'disk_percent': disk_percent,
                'connections': connections,
            }
        except Exception as e:
            self.logger.error(f"系统资源检查异常: {e}")
            return None

    def check_log_files(self):
        """Report log-file count and size, and surface recent error lines.

        Looks at every ``*.log`` file in ``project_root / "logs"`` except
        ``monitor.log`` and logs up to three of the most recent lines that
        contain ``ERROR`` or ``CRITICAL`` among each file's last ten lines.
        """
        try:
            log_dir = project_root / "logs"
            if not log_dir.exists():
                self.logger.warning("日志目录不存在")
                return

            log_files = list(log_dir.glob("*.log"))
            total_size = sum(f.stat().st_size for f in log_files)
            self.logger.info(f"日志文件数量: {len(log_files)}, 总大小: {total_size / 1024 / 1024:.2f}MB")

            for log_file in log_files:
                # Skip monitor.log so warnings written by this scan are not
                # picked up again on the next pass.
                if log_file.name == 'monitor.log':
                    continue
                self._report_recent_errors(log_file)
        except Exception as e:
            self.logger.error(f"日志文件检查异常: {e}")

    def _report_recent_errors(self, log_file):
        """Log the latest ERROR/CRITICAL lines found in the tail of one file."""
        try:
            # A bounded deque keeps only the last ten lines while streaming
            # the file, so large logs are never held in memory whole.
            # errors='replace' keeps a stray undecodable byte from aborting
            # the scan of this file.
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                recent_lines = deque(f, maxlen=10)
        except OSError as e:
            self.logger.error(f"读取日志文件失败 {log_file}: {e}")
            return

        error_lines = [line for line in recent_lines if 'ERROR' in line or 'CRITICAL' in line]
        if error_lines:
            self.logger.warning(f"发现错误日志 - {log_file.name}:")
            for error_line in error_lines[-3:]:  # show the three most recent
                self.logger.warning(f" {error_line.strip()}")

    def generate_report(self):
        """Write a JSON summary of the health-check statistics.

        The report is saved to ``project_root / "logs" / "monitor_report.json"``.

        Returns:
            The report dict, or ``None`` if building or writing it failed.
        """
        try:
            uptime = datetime.now() - self.stats['start_time']
            # max(..., 1) avoids division by zero before the first check.
            success_rate = (self.stats['successful_requests'] / max(self.stats['total_requests'], 1)) * 100
            last_check = self.stats['last_check']

            report = {
                'timestamp': datetime.now().isoformat(),
                'uptime': str(uptime),
                'total_requests': self.stats['total_requests'],
                'successful_requests': self.stats['successful_requests'],
                'failed_requests': self.stats['failed_requests'],
                'success_rate': f"{success_rate:.2f}%",
                'avg_response_time': f"{self.stats['avg_response_time']:.2f}s",
                'last_check': last_check.isoformat() if last_check else None,
            }

            report_file = project_root / "logs" / "monitor_report.json"
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)

            self.logger.info("监控报告已生成")
            return report
        except Exception as e:
            self.logger.error(f"生成监控报告失败: {e}")
            return None

    def run_monitoring(self):
        """Run monitoring passes forever, sleeping ``monitor_interval`` between them.

        A report is generated on every tenth health check. Unexpected errors
        in a pass are logged and the loop continues; Ctrl+C stops the loop.
        """
        self.logger.info("开始服务监控...")
        # The KeyboardInterrupt handler wraps the whole loop so that Ctrl+C
        # is handled cleanly even while sleeping after a failed pass.
        try:
            while True:
                try:
                    self.check_health()
                    self.check_system_resources()
                    self.check_log_files()
                    if self.stats['total_requests'] % 10 == 0:
                        self.generate_report()
                except Exception as e:
                    self.logger.error(f"监控循环异常: {e}")
                time.sleep(self.monitor_interval)
        except KeyboardInterrupt:
            self.logger.info("监控已停止")
def main():
    """Command-line entry point.

    With no argument the monitoring loop is started. ``check`` runs a single
    pass of all checks and prints whether the health check succeeded.
    ``report`` generates the monitoring report and prints its fields. Any
    other argument prints the usage text.
    """
    argv = sys.argv
    if len(argv) == 1:
        # No sub-command: run the continuous monitoring loop.
        ServiceMonitor().run_monitoring()
        return

    command = argv[1]

    if command == "check":
        # One-off check of health, system resources and log files.
        monitor = ServiceMonitor()
        health_ok, _ = monitor.check_health()
        monitor.check_system_resources()
        monitor.check_log_files()
        print("✅ 服务运行正常" if health_ok else "❌ 服务运行异常")
        return

    if command == "report":
        # Generate the report and echo it field by field.
        report = ServiceMonitor().generate_report()
        if report:
            print("📊 监控报告:")
            for key, value in report.items():
                print(f" {key}: {value}")
        return

    # Unknown sub-command: show usage.
    usage_lines = (
        "用法:",
        " python monitor_service.py # 运行监控",
        " python monitor_service.py check # 单次检查",
        " python monitor_service.py report # 生成报告",
    )
    for usage_line in usage_lines:
        print(usage_line)


if __name__ == '__main__':
    main()