Files
aitsc/scripts/port_monitor.py

400 lines
14 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/home/renjianbo/miniconda3/envs/myenv/bin/python
# -*- coding: utf-8 -*-
"""
端口监控脚本
用于监控Flask应用的端口占用和服务状态
"""
import os
import sys
import time
import json
import subprocess
import logging
from datetime import datetime
from pathlib import Path
# Configuration
PORT = 5002
APP_NAME = "flask_prompt_master"
# PID_FILE, CONFIG_FILE and ALERT_FILE are joined onto the project directory
# by PortMonitor.__init__; LOG_FILE is opened as written, i.e. relative to the
# current working directory of the process.
PID_FILE = "logs/gunicorn.pid"
LOG_FILE = "logs/port_monitor.log"
CONFIG_FILE = "logs/monitor_config.json"
ALERT_FILE = "logs/port_alerts.log"

# Logging setup
# FileHandler opens LOG_FILE as soon as it is constructed, so the directory
# must exist before this point or the import fails with FileNotFoundError.
Path(LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the log messages are Chinese and must not depend on
        # the locale's default encoding.
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class PortMonitor:
    """Monitor the Gunicorn-served Flask application listening on ``PORT``.

    The monitor shells out to standard tools (``ss``, ``ps``, ``curl``,
    ``top``, ``free``, ``df``) to build a status report and can restart the
    service through ``scripts/port_manager.sh`` in the project directory.
    """

    # Minimum number of seconds between two automatic restart attempts.
    RESTART_COOLDOWN_SECONDS = 300

    def __init__(self):
        """Resolve project paths, ensure the log directory exists, load config."""
        # resolve() first: with a relative __file__ (e.g. "port_monitor.py")
        # the un-resolved ".parent.parent" collapses to "." instead of the
        # directory above scripts/.
        self.project_dir = Path(__file__).resolve().parent.parent
        self.pid_file = self.project_dir / PID_FILE
        self.config_file = self.project_dir / CONFIG_FILE
        self.alert_file = self.project_dir / ALERT_FILE
        # Make sure the log directory exists
        (self.project_dir / "logs").mkdir(parents=True, exist_ok=True)
        # Load configuration
        self.config = self.load_config()

    def load_config(self):
        """Return the monitor configuration.

        Built-in defaults are overlaid with the values found in the JSON
        config file, when that file exists and contains a JSON object.
        Read or parse failures are logged and the defaults are returned.
        """
        config = {
            "check_interval": 30,       # seconds between checks
            "max_restart_attempts": 3,  # maximum number of restart attempts
            "alert_threshold": 2,       # alert threshold
            "port_timeout": 5,          # timeout (seconds) for the HTTP check
            "enable_alerts": True,      # write alerts to the alert file
            "auto_restart": True,       # restart automatically on failure
        }
        if not self.config_file.exists():
            return config
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                overrides = json.load(f)
            if not isinstance(overrides, dict):
                raise ValueError("top-level JSON value must be an object")
            config.update(overrides)
        except (OSError, ValueError) as e:
            logger.error("加载配置文件失败: {}".format(e))
        return config

    def save_config(self):
        """Write the current configuration to the JSON config file."""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, indent=2, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as e:
            logger.error("保存配置文件失败: {}".format(e))

    def run_command(self, cmd, timeout=10, cwd=None):
        """Run ``cmd`` through the shell.

        Args:
            cmd: Shell command line (pipes are allowed; callers rely on them).
            timeout: Seconds to wait before giving up.
            cwd: Optional working directory for the command.

        Returns:
            ``(success, stdout, stderr)`` where ``success`` is True when the
            exit status is 0. A timeout or a failure to launch yields
            ``(False, "", <message>)`` instead of raising.
        """
        try:
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=cwd,
            )
        except subprocess.TimeoutExpired:
            return False, "", "命令执行超时"
        except Exception as e:
            return False, "", str(e)
        return result.returncode == 0, result.stdout, result.stderr

    @staticmethod
    def _parse_listener_pids(ss_output):
        """Extract every ``pid=<n>`` value from ``ss -p`` output, in order, deduplicated."""
        pids = []
        for line in ss_output.splitlines():
            # A single socket line can list several processes (e.g. the
            # master and its workers), so look at every "pid=" occurrence
            # rather than a fixed column.
            for chunk in line.split('pid=')[1:]:
                pid = chunk.split(',', 1)[0].rstrip(')').strip()
                if pid.isdigit() and pid not in pids:
                    pids.append(pid)
        return pids

    def check_port_usage(self):
        """Check whether something is listening on ``PORT``.

        Returns:
            ``(occupied, pids)``; ``pids`` is a list of PID strings and may be
            empty when ``ss`` does not expose process information.
        """
        cmd = f"ss -tlnp | grep ':{PORT} '"
        success, stdout, _ = self.run_command(cmd)
        if success and stdout.strip():
            return True, self._parse_listener_pids(stdout)
        return False, []

    def check_gunicorn_process(self):
        """Check the Gunicorn master process recorded in the PID file.

        Returns:
            ``(running, description)``.
        """
        if not self.pid_file.exists():
            return False, "PID文件不存在"
        try:
            with open(self.pid_file, 'r') as f:
                pid = f.read().strip()
            if not pid:
                return False, "PID文件为空"
            # The value is interpolated into a shell command below, so only
            # accept a plain decimal number.
            if not (pid.isascii() and pid.isdigit()):
                return False, "PID文件内容无效: {!r}".format(pid)

            # Does the process exist?
            success, stdout, _ = self.run_command(f"ps -p {pid}")
            if not (success and stdout.strip()):
                return False, f"进程 {pid} 不存在"

            # Count processes matching the gunicorn command line. A failure
            # here must not turn a live master into "not running".
            cmd = "ps aux | grep 'gunicorn.*run_dev:app' | grep -v grep | wc -l"
            success, stdout, _ = self.run_command(cmd)
            worker_count = "未知"
            if success and stdout.strip().isdigit():
                worker_count = int(stdout.strip())
            return True, f"主进程 {pid}, 工作进程 {worker_count}"
        except Exception as e:
            return False, "检查进程失败: {}".format(e)

    def check_service_response(self):
        """Request ``/`` on localhost and report whether it answers HTTP 200.

        Returns:
            ``(ok, description)``.
        """
        cmd = f"curl -s -o /dev/null -w '%{{http_code}}' http://localhost:{PORT}/"
        success, stdout, _ = self.run_command(cmd, timeout=self.config['port_timeout'])
        if success and stdout.strip() == '200':
            return True, "服务正常响应"
        return False, "服务无响应 (HTTP: {})".format(stdout.strip())

    @staticmethod
    def _to_float(text):
        """Parse ``text`` as a float; return None when it is empty or malformed."""
        try:
            return float(text.strip())
        except ValueError:
            return None

    def get_system_info(self):
        """Collect CPU, memory and disk usage percentages.

        Returns:
            Dict with any of ``cpu_usage``, ``memory_usage``, ``disk_usage``.
            A metric whose command fails or prints something non-numeric is
            omitted instead of raising.
        """
        commands = {
            # CPU usage
            'cpu_usage': "top -bn1 | grep 'Cpu(s)' | awk '{print $2}' | cut -d'%' -f1",
            # Memory usage
            'memory_usage': "free | grep Mem | awk '{printf \"%.2f\", $3/$2 * 100.0}'",
            # Disk usage of /
            'disk_usage': "df / | tail -1 | awk '{print $5}' | sed 's/%//'",
        }
        info = {}
        for key, cmd in commands.items():
            success, stdout, _ = self.run_command(cmd)
            # The pipeline's exit status is that of its last stage, so it
            # can be 0 even when nothing useful was printed.
            value = self._to_float(stdout) if success else None
            if value is not None:
                info[key] = value
        return info

    def log_alert(self, message, level="WARNING"):
        """Log an alert and, when alerts are enabled, append it to the alert file.

        Args:
            message: Alert text.
            level: Logging level name (e.g. ``"WARNING"``, ``"ERROR"``);
                unknown names are logged at WARNING.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        alert_msg = f"[{timestamp}] [{level}] {message}"
        log_level = getattr(logging, str(level).upper(), None)
        if not isinstance(log_level, int):
            log_level = logging.WARNING
        logger.log(log_level, alert_msg)
        if self.config['enable_alerts']:
            try:
                with open(self.alert_file, 'a', encoding='utf-8') as f:
                    f.write(alert_msg + '\n')
            except OSError as e:
                logger.error("写入告警文件失败: {}".format(e))

    def restart_service(self):
        """Stop and start the service via ``scripts/port_manager.sh``.

        Returns:
            True when both the stop and the start command exit with 0.
        """
        logger.info("尝试重启服务...")
        # Run inside the project directory via cwd= rather than "cd <path> &&"
        # so that paths containing spaces or shell metacharacters work.
        workdir = str(self.project_dir)

        # Stop the service
        # NOTE(review): a failing "stop" aborts the restart; confirm that
        # port_manager.sh exits 0 when the service is already down.
        success, _, stderr = self.run_command("./scripts/port_manager.sh stop", cwd=workdir)
        if not success:
            logger.error("停止服务失败: {}".format(stderr))
            return False
        time.sleep(2)

        # Start the service
        success, _, stderr = self.run_command("./scripts/port_manager.sh start", cwd=workdir)
        if not success:
            logger.error("启动服务失败: {}".format(stderr))
            return False
        logger.info("服务重启成功")
        return True

    def generate_report(self):
        """Run every check and return the results as a JSON-serialisable dict.

        Keys: ``timestamp``, ``port_usage``, ``gunicorn_status``,
        ``service_response``, ``system_info``, ``recommendations``.
        """
        timestamp = datetime.now().isoformat()
        port_occupied, processes = self.check_port_usage()
        gunicorn_running, status = self.check_gunicorn_process()
        service_ok, response = self.check_service_response()
        sys_info = self.get_system_info()

        recommendations = []
        if not port_occupied:
            recommendations.append("端口未被占用,可能需要启动服务")
        if not gunicorn_running:
            recommendations.append("Gunicorn进程未运行需要重启服务")
        if not service_ok:
            recommendations.append("服务无响应,需要检查服务状态")
        # System resource thresholds
        if sys_info.get('cpu_usage', 0) > 80:
            recommendations.append("CPU使用率过高建议优化或扩容")
        if sys_info.get('memory_usage', 0) > 90:
            recommendations.append("内存使用率过高,建议增加内存或优化应用")
        if sys_info.get('disk_usage', 0) > 85:
            recommendations.append("磁盘使用率过高,建议清理日志或扩容")

        return {
            "timestamp": timestamp,
            "port_usage": {"occupied": port_occupied, "processes": processes},
            "gunicorn_status": {"running": gunicorn_running, "status": status},
            "service_response": {"ok": service_ok, "response": response},
            "system_info": sys_info,
            "recommendations": recommendations,
        }

    def _run_check_cycle(self, restart_count, last_restart_time):
        """Run one monitoring pass; return the updated ``(restart_count, last_restart_time)``."""
        report = self.generate_report()
        gunicorn_ok = report["gunicorn_status"]["running"]
        service_ok = report["service_response"]["ok"]
        if not gunicorn_ok:
            logger.warning("Gunicorn进程未运行")
        if not service_ok:
            logger.warning("服务无响应")

        if gunicorn_ok and service_ok:
            # Healthy again: earlier attempts must not count against a
            # future, unrelated outage.
            restart_count = 0
        elif self.config['auto_restart']:
            now = time.time()
            cooled_down = now - last_restart_time > self.RESTART_COOLDOWN_SECONDS
            if cooled_down and restart_count < self.config['max_restart_attempts']:
                self.log_alert("检测到服务异常,尝试自动重启")
                # Count the attempt whether or not it succeeds, so a restart
                # that keeps failing is throttled and eventually given up on.
                restart_count += 1
                last_restart_time = now
                if not self.restart_service():
                    self.log_alert("自动重启失败", "ERROR")
            else:
                self.log_alert("达到重启限制,跳过自动重启", "ERROR")

        # Record the current status
        status_msg = f"端口占用: {report['port_usage']['occupied']}, " \
                     f"Gunicorn: {gunicorn_ok}, " \
                     f"服务响应: {service_ok}"
        logger.info(status_msg)
        return restart_count, last_restart_time

    def monitor_loop(self):
        """Check the service every ``check_interval`` seconds until interrupted.

        When ``auto_restart`` is enabled, an unhealthy service is restarted at
        most ``max_restart_attempts`` times per outage, with at least
        ``RESTART_COOLDOWN_SECONDS`` between attempts.
        """
        logger.info("开始端口监控...")
        restart_count = 0
        last_restart_time = 0
        try:
            while True:
                try:
                    restart_count, last_restart_time = self._run_check_cycle(
                        restart_count, last_restart_time
                    )
                except Exception as e:
                    logger.exception(f"监控过程中发生错误: {e}")
                # Wait for the next check
                time.sleep(self.config['check_interval'])
        except KeyboardInterrupt:
            # Covers Ctrl-C during a check as well as during the sleep.
            logger.info("监控被用户中断")

    def show_status(self):
        """Print a human-readable status report to stdout."""
        report = self.generate_report()
        print("=== Flask应用监控状态 ===")
        print(f"检查时间: {report['timestamp']}")
        print()

        print("1. 端口占用检查:")
        if report['port_usage']['occupied']:
            print(f" ✅ 端口 {PORT} 被占用")
            print(f" 进程: {', '.join(report['port_usage']['processes'])}")
        else:
            print(f" ❌ 端口 {PORT} 未被占用")
        print()

        # Sections 2 and 3 use the same pass/fail markers as section 1 so the
        # outcome is visible without reading the description.
        print("2. Gunicorn进程检查:")
        marker = "✅" if report['gunicorn_status']['running'] else "❌"
        print(f" {marker} {report['gunicorn_status']['status']}")
        print()

        print("3. 服务响应检查:")
        marker = "✅" if report['service_response']['ok'] else "❌"
        print(f" {marker} {report['service_response']['response']}")
        print()

        print("4. 系统资源:")
        sys_info = report['system_info']
        print(f" CPU使用率: {sys_info.get('cpu_usage', 'N/A')}%")
        print(f" 内存使用率: {sys_info.get('memory_usage', 'N/A')}%")
        print(f" 磁盘使用率: {sys_info.get('disk_usage', 'N/A')}%")

        if report['recommendations']:
            print()
            print("5. 建议:")
            for i, rec in enumerate(report['recommendations'], 1):
                print(f" {i}. {rec}")
def main():
    """Command-line entry point.

    Usage: ``port_monitor.py [status|report|restart|monitor|config]``; with
    no argument the monitoring loop is started.

    Returns:
        Process exit status: 0 on success, 1 when ``restart`` fails,
        2 for an unknown command.
    """
    command = sys.argv[1] if len(sys.argv) > 1 else "monitor"
    if command not in ("status", "report", "restart", "monitor", "config"):
        # Reject bad input before PortMonitor() touches the filesystem, and
        # report it through the exit status so calling scripts can tell.
        print("用法: python port_monitor.py [status|report|restart|monitor|config]")
        return 2

    monitor = PortMonitor()
    if command == "status":
        monitor.show_status()
    elif command == "report":
        report = monitor.generate_report()
        print(json.dumps(report, indent=2, ensure_ascii=False))
    elif command == "restart":
        if not monitor.restart_service():
            return 1
    elif command == "config":
        print(json.dumps(monitor.config, indent=2, ensure_ascii=False))
    else:
        monitor.monitor_loop()
    return 0


if __name__ == "__main__":
    sys.exit(main())