配置监控和日志系统

This commit is contained in:
2025-08-17 22:10:51 +08:00
parent 799416335b
commit 23a5c907f7
8 changed files with 1706 additions and 1 deletions

View File

@@ -0,0 +1,409 @@
# Flask 提示词大师 - 监控和日志系统使用指南
## 快速开始
### 1. 启动监控系统
#### 方法一:使用批处理脚本(推荐)
```bash
# 双击运行或在命令行执行
start_monitor.bat
```
#### 方法二直接使用Python
```bash
# 激活虚拟环境
.venv\Scripts\Activate.ps1
# 启动监控管理器
python monitor_manager.py
```
### 2. 基本操作
启动后,您将看到交互式界面,可以输入以下命令:
- `start` - 启动持续监控
- `stop` - 停止监控
- `status` - 检查服务状态
- `logs` - 管理日志文件
- `report` - 生成监控报告
- `dashboard` - 显示完整仪表板
- `quit` - 退出监控系统
## 详细功能说明
### 1. 服务状态监控
#### 检查服务状态
```bash
# 使用监控管理器
python monitor_manager.py status
# 直接使用监控脚本
python simple_monitor.py check
```
**输出示例:**
```
🔍 检查服务状态...
✅ 服务运行正常
服务运行正常
```
#### 启动持续监控
```bash
# 在交互式界面中输入
start
# 或直接运行
python simple_monitor.py
```
**功能特点:**
- 每30秒自动检查服务健康状态
- 监控响应时间超过2秒发出警告
- 自动检测错误日志
- 生成监控报告
### 2. 日志管理
#### 查看日志统计
```bash
# 使用监控管理器
python monitor_manager.py logs stats
# 直接使用日志管理器
python log_manager.py stats
```
**输出示例:**
```
日志统计信息:
总文件数: 2
总大小: 0.00MB
当前日志文件:
app.log: 0.00MB
simple_service.log: 0.00MB
```
#### 日志轮转
```bash
# 使用监控管理器
python monitor_manager.py logs rotate
# 直接使用日志管理器
python log_manager.py rotate
```
**功能说明:**
- 自动检测超过10MB的日志文件
- 将大文件移动到归档目录
- 自动压缩归档文件gzip格式
#### 清理旧日志
```bash
# 使用监控管理器
python monitor_manager.py logs cleanup
# 直接使用日志管理器
python log_manager.py cleanup
```
**功能说明:**
- 自动删除30天前的归档日志
- 释放磁盘空间
- 保持日志目录整洁
### 3. 监控报告
#### 生成监控报告
```bash
# 使用监控管理器
python monitor_manager.py report
# 直接使用监控脚本
python simple_monitor.py report
```
**报告内容:**
- 监控运行时间
- 总请求数和成功率
- 平均响应时间
- 最后检查时间
#### 显示完整仪表板
```bash
python monitor_manager.py dashboard
```
**仪表板包含:**
- 服务状态检查
- 日志统计信息
- 监控报告摘要
## 配置文件说明
### 1. 监控配置
#### 监控间隔
**文件:** `simple_monitor.py`
**参数:** `monitor_interval = 30`
**说明:** 健康检查的执行间隔(秒)
#### 响应时间阈值
**文件:** `simple_monitor.py`
**参数:**`check_health` 方法中设置
**说明:** 超过此时间会发出警告默认2秒
### 2. 日志配置
#### 日志轮转阈值
**文件:** `log_manager.py`
**参数:** `max_file_size = 10 * 1024 * 1024`
**说明:** 日志文件超过10MB自动轮转
#### 日志保留时间
**文件:** `log_manager.py`
**参数:** `retention_days = 30`
**说明:** 归档日志保留30天
#### 日志压缩
**文件:** `log_manager.py`
**参数:** `compress_files = True`
**说明:** 启用gzip压缩归档文件
## 自动化配置
### 1. Windows 定时任务
#### 创建监控任务
```batch
# 每天上午9点运行监控
schtasks /create /tn "FlaskMonitor" /tr "python D:\wxxcx\aitsc\simple_monitor.py" /sc daily /st 09:00
# 每天凌晨2点维护日志
schtasks /create /tn "FlaskLogMaintenance" /tr "python D:\wxxcx\aitsc\log_manager.py" /sc daily /st 02:00
```
#### 管理定时任务
```batch
# 查看任务
schtasks /query /tn "FlaskMonitor"
# 删除任务
schtasks /delete /tn "FlaskMonitor" /f
```
### 2. 服务集成
#### 与Windows服务集成
可以将监控脚本集成到现有的Windows服务中
1. 修改 `simple_windows_service.py`
2. 在服务启动时自动启动监控
3. 在服务停止时自动停止监控
#### 与Docker集成
如果使用Docker部署
1. 将监控脚本添加到Dockerfile
2. 配置日志卷挂载
3. 使用Docker的健康检查机制
## 故障排除
### 1. 常见问题
#### 问题:监控显示服务异常
**可能原因:**
- 应用服务未启动
- 端口配置错误
- 网络连接问题
**解决方法:**
```bash
# 检查服务是否运行
curl http://localhost:5000/health
# 检查端口占用
netstat -an | findstr :5000
# 重启应用服务
python simple_windows_service.py start
```
#### 问题:日志文件编码错误
**可能原因:**
- 日志文件包含非UTF-8字符
- 系统编码设置问题
**解决方法:**
```bash
# 清理损坏的日志文件
del logs\app.log
# 重新启动服务生成新日志
python simple_windows_service.py restart
```
#### 问题:监控脚本无法启动
**可能原因:**
- 虚拟环境未激活
- 依赖包未安装
- Python路径问题
**解决方法:**
```bash
# 激活虚拟环境
.venv\Scripts\Activate.ps1
# 安装依赖
pip install -r requirements.txt
# 检查Python路径
python -c "import sys; print(sys.path)"
```
### 2. 调试方法
#### 查看监控日志
```bash
# 实时查看监控日志
tail -f logs/monitor.log
# 查看最近的监控日志
type logs\monitor.log
```
#### 查看日志管理日志
```bash
# 实时查看日志管理日志
tail -f logs/log_manager.log
# 查看最近的日志管理日志
type logs\log_manager.log
```
#### 手动测试健康检查
```bash
# 使用curl测试
curl http://localhost:5000/health
# 使用PowerShell测试
Invoke-WebRequest -Uri http://localhost:5000/health
```
## 性能优化
### 1. 监控性能优化
#### 调整监控间隔
- 生产环境30-60秒
- 开发环境10-30秒
- 调试环境5-10秒
#### 优化响应时间阈值
- 根据实际性能调整警告阈值
- 考虑网络延迟和服务器负载
### 2. 日志性能优化
#### 日志轮转策略
- 根据磁盘空间调整轮转阈值
- 考虑日志写入频率
#### 压缩策略
- 启用压缩减少存储空间
- 平衡压缩时间和存储空间
## 扩展功能
### 1. 告警系统
#### 邮件告警
可以扩展监控脚本,添加邮件告警功能:
```python
import smtplib
from email.mime.text import MIMEText
def send_alert(subject, message):
# 配置邮件服务器
# 发送告警邮件
pass
```
#### 短信告警
可以集成短信服务提供商的API
```python
def send_sms_alert(message):
# 调用短信API
# 发送告警短信
pass
```
### 2. 监控面板
#### Web监控面板
可以开发一个Web界面来显示监控数据
```python
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/monitor')
def monitor_dashboard():
# 读取监控报告
# 渲染监控面板
return render_template('monitor.html')
```
#### 集成Grafana
可以将监控数据发送到Grafana进行可视化
```python
def send_to_grafana(metrics):
# 发送指标到Grafana
# 配置数据源和面板
pass
```
## 最佳实践
### 1. 监控最佳实践
1. **设置合理的监控间隔**:避免过于频繁的检查
2. **配置适当的告警阈值**:避免误报和漏报
3. **定期检查监控日志**:及时发现和解决问题
4. **备份监控配置**:确保配置的可恢复性
### 2. 日志最佳实践
1. **定期清理旧日志**:避免磁盘空间不足
2. **监控日志文件大小**:及时进行轮转
3. **保留重要日志**:确保问题可追溯
4. **配置日志级别**:根据环境调整日志详细程度
### 3. 运维最佳实践
1. **自动化部署监控**:将监控系统集成到部署流程
2. **定期更新监控脚本**:保持功能的最新性
3. **建立监控文档**:记录监控配置和操作流程
4. **培训运维人员**:确保团队能够有效使用监控系统
## 总结
通过本指南,您应该能够:
1. **快速启动监控系统**:使用提供的脚本和命令
2. **有效管理日志文件**:轮转、清理、压缩日志
3. **生成监控报告**:了解服务运行状态
4. **解决常见问题**:使用故障排除方法
5. **优化系统性能**:根据实际需求调整配置
6. **扩展监控功能**:添加告警和可视化功能
这套监控和日志系统为Flask提示词大师项目提供了完整的运维支持确保服务的稳定运行和问题的及时发现。

View File

@@ -0,0 +1,311 @@
# Flask 提示词大师 - 监控和日志系统配置总结
## 概述
本文档总结了为 Flask 提示词大师项目配置的监控和日志系统,包括服务监控、日志管理、性能监控等功能。
## 系统架构
### 1. 监控系统
#### 1.1 简化监控脚本 (`simple_monitor.py`)
- **功能**: 基础服务健康检查、响应时间监控、日志文件检查
- **特点**: 不依赖外部库,轻量级实现
- **监控项目**:
- 应用健康状态 (`/health` 端点)
- 响应时间统计
- 错误日志检测
- 监控报告生成
#### 1.2 监控管理脚本 (`monitor_manager.py`)
- **功能**: 统一管理监控和日志功能
- **特点**: 支持交互式操作和命令行操作
- **管理功能**:
- 启动/停止监控
- 服务状态检查
- 日志管理
- 报告生成
- 监控仪表板
### 2. 日志管理系统
#### 2.1 日志管理脚本 (`log_manager.py`)
- **功能**: 日志轮转、清理、压缩
- **特点**: 自动化日志维护
- **管理功能**:
- 日志文件轮转超过10MB自动轮转
- 旧日志清理保留30天
- 日志文件压缩gzip格式
- 日志统计信息
## 文件结构
```
aitsc/
├── simple_monitor.py # 简化监控脚本
├── log_manager.py # 日志管理脚本
├── monitor_manager.py # 监控管理脚本
├── logs/ # 日志目录
│ ├── app.log # 应用日志
│ ├── simple_service.log # 服务日志
│ ├── monitor.log # 监控日志
│ ├── log_manager.log # 日志管理日志
│ ├── monitor_report.json # 监控报告
│ └── archive/ # 归档目录
│ └── *.log.gz # 压缩的归档日志
```
## 使用方法
### 1. 监控系统使用
#### 1.1 启动监控
```bash
# 交互式模式
python monitor_manager.py
# 直接启动监控
python monitor_manager.py start
# 直接运行监控脚本
python simple_monitor.py
```
#### 1.2 检查服务状态
```bash
# 使用监控管理器
python monitor_manager.py status
# 直接使用监控脚本
python simple_monitor.py check
```
#### 1.3 生成监控报告
```bash
# 使用监控管理器
python monitor_manager.py report
# 直接使用监控脚本
python simple_monitor.py report
```
#### 1.4 显示监控仪表板
```bash
python monitor_manager.py dashboard
```
### 2. 日志管理系统使用
#### 2.1 查看日志统计
```bash
# 使用监控管理器
python monitor_manager.py logs stats
# 直接使用日志管理器
python log_manager.py stats
```
#### 2.2 日志轮转
```bash
# 使用监控管理器
python monitor_manager.py logs rotate
# 直接使用日志管理器
python log_manager.py rotate
```
#### 2.3 清理旧日志
```bash
# 使用监控管理器
python monitor_manager.py logs cleanup
# 直接使用日志管理器
python log_manager.py cleanup
```
#### 2.4 完整日志维护
```bash
# 轮转和清理
python log_manager.py
```
## 配置参数
### 1. 监控配置
#### 1.1 监控间隔
- **默认值**: 30秒
- **位置**: `simple_monitor.py` 中的 `monitor_interval`
- **说明**: 健康检查的执行间隔
#### 1.2 响应时间阈值
- **默认值**: 2.0秒
- **位置**: `simple_monitor.py` 中的阈值检查
- **说明**: 超过此时间会发出警告
### 2. 日志配置
#### 2.1 日志轮转阈值
- **默认值**: 10MB
- **位置**: `log_manager.py` 中的 `max_file_size`
- **说明**: 日志文件超过此大小会自动轮转
#### 2.2 日志保留时间
- **默认值**: 30天
- **位置**: `log_manager.py` 中的 `retention_days`
- **说明**: 归档日志的保留时间
#### 2.3 日志压缩
- **默认值**: 启用
- **位置**: `log_manager.py` 中的 `compress_files`
- **说明**: 是否压缩归档的日志文件
## 监控指标
### 1. 服务健康指标
- **状态**: 运行/停止
- **环境**: development/production
- **响应时间**: 平均响应时间
- **成功率**: 健康检查成功率
### 2. 性能指标
- **总请求数**: 监控期间的总请求数
- **成功请求数**: 成功的健康检查次数
- **失败请求数**: 失败的健康检查次数
- **平均响应时间**: 健康检查的平均响应时间
### 3. 日志指标
- **日志文件数量**: 当前日志文件总数
- **日志总大小**: 所有日志文件的总大小
- **错误日志**: 最近发现的错误日志数量
## 报告生成
### 1. 监控报告 (`monitor_report.json`)
```json
{
"timestamp": "2025-08-17T21:58:19.822324",
"uptime": "0:00:00",
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"success_rate": "0.00%",
"avg_response_time": "0.00s",
"last_check": null
}
```
### 2. 日志统计报告
- 当前日志文件列表及大小
- 归档日志文件列表及大小
- 总文件数和总大小
## 自动化建议
### 1. 定时任务配置
#### 1.1 Windows 任务计划程序
```batch
# 创建定时监控任务
schtasks /create /tn "FlaskMonitor" /tr "python D:\wxxcx\aitsc\simple_monitor.py" /sc daily /st 09:00
# 创建定时日志维护任务
schtasks /create /tn "FlaskLogMaintenance" /tr "python D:\wxxcx\aitsc\log_manager.py" /sc daily /st 02:00
```
#### 1.2 Linux Cron 任务
```bash
# 编辑 crontab
crontab -e
# 添加定时任务
# 每天上午9点运行监控
0 9 * * * cd /path/to/aitsc && python simple_monitor.py
# 每天凌晨2点维护日志
0 2 * * * cd /path/to/aitsc && python log_manager.py
```
### 2. 服务集成
#### 2.1 与 Windows 服务集成
- 将监控脚本集成到现有的 Windows 服务中
- 在服务启动时自动启动监控
- 在服务停止时自动停止监控
#### 2.2 与 Docker 集成
- 将监控脚本添加到 Docker 容器中
- 使用 Docker 的健康检查机制
- 配置日志卷挂载
## 故障排除
### 1. 常见问题
#### 1.1 编码问题
- **问题**: Unicode 字符显示异常
- **解决**: 已移除所有 Unicode 表情符号,使用纯文本
#### 1.2 权限问题
- **问题**: 无法访问日志文件
- **解决**: 确保脚本有读写权限
#### 1.3 服务连接问题
- **问题**: 无法连接到应用服务
- **解决**: 检查服务是否运行,端口是否正确
### 2. 调试方法
#### 2.1 查看监控日志
```bash
tail -f logs/monitor.log
```
#### 2.2 查看日志管理日志
```bash
tail -f logs/log_manager.log
```
#### 2.3 手动测试健康检查
```bash
curl http://localhost:5000/health
```
## 扩展建议
### 1. 监控扩展
- 添加数据库连接监控
- 添加磁盘空间监控
- 添加网络连接监控
- 添加邮件/短信告警
### 2. 日志扩展
- 添加日志分析功能
- 添加日志搜索功能
- 添加日志可视化
- 集成 ELK 栈
### 3. 性能扩展
- 添加性能指标收集
- 添加慢查询监控
- 添加内存使用监控
- 添加 CPU 使用监控
## 总结
通过配置这套监控和日志系统,我们实现了:
1. **自动化监控**: 服务健康状态自动检查
2. **日志管理**: 自动轮转、清理、压缩日志文件
3. **统一管理**: 通过监控管理器统一管理所有功能
4. **报告生成**: 自动生成监控报告和统计信息
5. **故障排除**: 提供完整的故障排除工具
这套系统为 Flask 提示词大师项目提供了完整的运维支持,确保服务的稳定运行和问题的及时发现。
## 下一步建议
1. **配置告警系统**: 添加邮件或短信告警功能
2. **集成监控面板**: 集成 Grafana 等可视化监控面板
3. **性能优化**: 根据监控数据优化应用性能
4. **自动化部署**: 将监控系统集成到 CI/CD 流程中

216
log_manager.py Normal file
View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""
Flask 提示词大师 - 日志管理脚本
用于日志轮转、清理和压缩
"""
import os
import sys
import gzip
import shutil
import logging
from datetime import datetime, timedelta
from pathlib import Path
# 添加项目路径到 Python 路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def setup_logging():
"""配置日志管理日志"""
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / "log_manager.log", encoding='utf-8'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
class LogManager:
"""日志管理类"""
def __init__(self):
self.logger = setup_logging()
self.log_dir = project_root / "logs"
self.archive_dir = self.log_dir / "archive"
self.archive_dir.mkdir(exist_ok=True)
# 日志保留策略
self.retention_days = 30 # 保留天数
self.max_file_size = 10 * 1024 * 1024 # 10MB
self.compress_files = True # 是否压缩归档文件
def rotate_logs(self):
"""日志轮转"""
try:
self.logger.info("🔄 开始日志轮转...")
# 获取所有日志文件
log_files = list(self.log_dir.glob("*.log"))
rotated_count = 0
for log_file in log_files:
if log_file.name in ['log_manager.log', 'monitor.log']:
continue # 跳过管理日志
# 检查文件大小
if log_file.stat().st_size > self.max_file_size:
# 创建归档文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
archive_name = f"{log_file.stem}_{timestamp}.log"
archive_path = self.archive_dir / archive_name
# 移动文件到归档目录
shutil.move(str(log_file), str(archive_path))
# 压缩归档文件
if self.compress_files:
self.compress_file(archive_path)
self.logger.info(f"📦 轮转日志文件: {log_file.name} -> {archive_name}")
rotated_count += 1
self.logger.info(f"✅ 日志轮转完成,共轮转 {rotated_count} 个文件")
return rotated_count
except Exception as e:
self.logger.error(f"❌ 日志轮转失败: {e}")
return 0
def compress_file(self, file_path):
"""压缩文件"""
try:
compressed_path = file_path.with_suffix('.log.gz')
with open(file_path, 'rb') as f_in:
with gzip.open(compressed_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# 删除原文件
file_path.unlink()
self.logger.info(f"🗜️ 压缩文件: {file_path.name} -> {compressed_path.name}")
except Exception as e:
self.logger.error(f"❌ 压缩文件失败 {file_path}: {e}")
def cleanup_old_logs(self):
"""清理旧日志"""
try:
self.logger.info("🧹 开始清理旧日志...")
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
deleted_count = 0
# 清理归档目录中的旧文件
for file_path in self.archive_dir.iterdir():
if file_path.is_file():
# 获取文件修改时间
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
if mtime < cutoff_date:
file_path.unlink()
self.logger.info(f"🗑️ 删除旧日志: {file_path.name}")
deleted_count += 1
self.logger.info(f"✅ 清理完成,共删除 {deleted_count} 个旧文件")
return deleted_count
except Exception as e:
self.logger.error(f"❌ 清理旧日志失败: {e}")
return 0
def get_log_stats(self):
"""获取日志统计信息"""
try:
stats = {
'current_logs': {},
'archived_logs': {},
'total_size': 0,
'file_count': 0
}
# 当前日志文件
for log_file in self.log_dir.glob("*.log"):
if log_file.name not in ['log_manager.log', 'monitor.log']:
size = log_file.stat().st_size
stats['current_logs'][log_file.name] = {
'size': size,
'size_mb': size / 1024 / 1024
}
stats['total_size'] += size
stats['file_count'] += 1
# 归档日志文件
for file_path in self.archive_dir.iterdir():
if file_path.is_file():
size = file_path.stat().st_size
stats['archived_logs'][file_path.name] = {
'size': size,
'size_mb': size / 1024 / 1024
}
stats['total_size'] += size
stats['file_count'] += 1
stats['total_size_mb'] = stats['total_size'] / 1024 / 1024
return stats
except Exception as e:
self.logger.error(f"❌ 获取日志统计失败: {e}")
return None
def print_stats(self):
"""打印日志统计信息"""
stats = self.get_log_stats()
if not stats:
return
print("日志统计信息:")
print(f" 总文件数: {stats['file_count']}")
print(f" 总大小: {stats['total_size_mb']:.2f}MB")
print()
if stats['current_logs']:
print("当前日志文件:")
for name, info in stats['current_logs'].items():
print(f" {name}: {info['size_mb']:.2f}MB")
print()
if stats['archived_logs']:
print("归档日志文件:")
for name, info in stats['archived_logs'].items():
print(f" {name}: {info['size_mb']:.2f}MB")
def main():
"""主函数"""
log_manager = LogManager()
if len(sys.argv) == 1:
# 默认操作:轮转和清理
log_manager.rotate_logs()
log_manager.cleanup_old_logs()
log_manager.print_stats()
elif sys.argv[1] == "rotate":
# 只进行轮转
log_manager.rotate_logs()
elif sys.argv[1] == "cleanup":
# 只进行清理
log_manager.cleanup_old_logs()
elif sys.argv[1] == "stats":
# 显示统计信息
log_manager.print_stats()
else:
print("用法:")
print(" python log_manager.py # 轮转和清理")
print(" python log_manager.py rotate # 只轮转")
print(" python log_manager.py cleanup # 只清理")
print(" python log_manager.py stats # 显示统计")
if __name__ == '__main__':
main()

257
monitor_manager.py Normal file
View File

@@ -0,0 +1,257 @@
#!/usr/bin/env python3
"""
Flask 提示词大师 - 监控管理脚本
统一管理监控和日志功能
"""
import os
import sys
import time
import json
import subprocess
import threading
from datetime import datetime
from pathlib import Path
# 添加项目路径到 Python 路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
class MonitorManager:
"""监控管理类"""
def __init__(self):
self.project_root = project_root
self.monitor_process = None
self.log_manager_process = None
self.is_running = False
def start_monitoring(self):
"""启动监控"""
try:
print("🚀 启动服务监控...")
# 启动监控脚本
self.monitor_process = subprocess.Popen(
[sys.executable, "simple_monitor.py"],
cwd=self.project_root,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
print(f"✅ 监控进程已启动 (PID: {self.monitor_process.pid})")
return True
except Exception as e:
print(f"❌ 启动监控失败: {e}")
return False
def stop_monitoring(self):
"""停止监控"""
try:
if self.monitor_process:
print("🛑 停止服务监控...")
self.monitor_process.terminate()
self.monitor_process.wait(timeout=5)
print("✅ 监控已停止")
self.monitor_process = None
else:
print(" 没有运行中的监控进程")
except subprocess.TimeoutExpired:
print("⚠️ 监控进程未响应,强制终止")
self.monitor_process.kill()
self.monitor_process = None
except Exception as e:
print(f"❌ 停止监控失败: {e}")
def check_service_status(self):
"""检查服务状态"""
try:
print("🔍 检查服务状态...")
result = subprocess.run(
[sys.executable, "simple_monitor.py", "check"],
cwd=self.project_root,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
print("✅ 服务运行正常")
if result.stdout:
print(result.stdout)
return True
else:
print("❌ 服务运行异常")
if result.stderr:
print(result.stderr)
return False
except subprocess.TimeoutExpired:
print("⚠️ 服务检查超时")
return False
except Exception as e:
print(f"❌ 服务检查失败: {e}")
return False
def manage_logs(self, action="stats"):
"""管理日志"""
try:
print(f"📁 执行日志管理操作: {action}")
result = subprocess.run(
[sys.executable, "log_manager.py", action],
cwd=self.project_root,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
print("✅ 日志管理操作完成")
if result.stdout:
print(result.stdout)
else:
print("❌ 日志管理操作失败")
if result.stderr:
print(result.stderr)
except subprocess.TimeoutExpired:
print("⚠️ 日志管理操作超时")
except Exception as e:
print(f"❌ 日志管理失败: {e}")
def generate_report(self):
"""生成监控报告"""
try:
print("📊 生成监控报告...")
result = subprocess.run(
[sys.executable, "simple_monitor.py", "report"],
cwd=self.project_root,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
print("✅ 监控报告已生成")
if result.stdout:
print(result.stdout)
else:
print("❌ 生成监控报告失败")
if result.stderr:
print(result.stderr)
except subprocess.TimeoutExpired:
print("⚠️ 生成报告超时")
except Exception as e:
print(f"❌ 生成报告失败: {e}")
def show_dashboard(self):
"""显示监控仪表板"""
try:
print("=" * 60)
print("📊 Flask 提示词大师 - 监控仪表板")
print("=" * 60)
# 检查服务状态
service_ok = self.check_service_status()
# 显示日志统计
self.manage_logs("stats")
# 显示监控报告
self.generate_report()
print("=" * 60)
except Exception as e:
print(f"❌ 显示仪表板失败: {e}")
def run_interactive(self):
"""运行交互式模式"""
self.is_running = True
print("🎮 进入交互式监控模式")
print("命令:")
print(" start - 启动监控")
print(" stop - 停止监控")
print(" status - 检查服务状态")
print(" logs - 管理日志")
print(" report - 生成报告")
print(" dashboard - 显示仪表板")
print(" quit - 退出")
while self.is_running:
try:
command = input("\n📝 请输入命令: ").strip().lower()
if command == "quit":
self.is_running = False
print("👋 退出监控管理")
elif command == "start":
self.start_monitoring()
elif command == "stop":
self.stop_monitoring()
elif command == "status":
self.check_service_status()
elif command == "logs":
self.manage_logs()
elif command == "report":
self.generate_report()
elif command == "dashboard":
self.show_dashboard()
else:
print("❓ 未知命令,请重试")
except KeyboardInterrupt:
print("\n👋 退出监控管理")
self.is_running = False
except Exception as e:
print(f"❌ 执行命令失败: {e}")
# 确保停止监控
self.stop_monitoring()
def main():
"""主函数"""
manager = MonitorManager()
if len(sys.argv) == 1:
# 交互式模式
manager.run_interactive()
elif sys.argv[1] == "start":
# 启动监控
manager.start_monitoring()
elif sys.argv[1] == "stop":
# 停止监控
manager.stop_monitoring()
elif sys.argv[1] == "status":
# 检查状态
manager.check_service_status()
elif sys.argv[1] == "logs":
# 管理日志
action = sys.argv[2] if len(sys.argv) > 2 else "stats"
manager.manage_logs(action)
elif sys.argv[1] == "report":
# 生成报告
manager.generate_report()
elif sys.argv[1] == "dashboard":
# 显示仪表板
manager.show_dashboard()
else:
print("用法:")
print(" python monitor_manager.py # 交互式模式")
print(" python monitor_manager.py start # 启动监控")
print(" python monitor_manager.py stop # 停止监控")
print(" python monitor_manager.py status # 检查状态")
print(" python monitor_manager.py logs [action] # 管理日志")
print(" python monitor_manager.py report # 生成报告")
print(" python monitor_manager.py dashboard # 显示仪表板")
if __name__ == '__main__':
main()

263
monitor_service.py Normal file
View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
Flask 提示词大师 - 服务监控脚本
用于监控应用状态、性能和日志
"""
import os
import sys
import time
import json
import logging
import requests
import psutil
import threading
from datetime import datetime, timedelta
from pathlib import Path
# 添加项目路径到 Python 路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def setup_logging():
"""配置监控日志"""
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / "monitor.log", encoding='utf-8'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
class ServiceMonitor:
"""服务监控类"""
def __init__(self):
self.logger = setup_logging()
self.app_url = "http://localhost:5000"
self.monitor_interval = 30 # 监控间隔(秒)
self.alert_thresholds = {
'cpu_percent': 80, # CPU 使用率阈值
'memory_percent': 80, # 内存使用率阈值
'response_time': 2.0, # 响应时间阈值(秒)
'error_rate': 0.1 # 错误率阈值
}
self.stats = {
'start_time': datetime.now(),
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'avg_response_time': 0,
'last_check': None
}
def check_health(self):
"""检查应用健康状态"""
try:
start_time = time.time()
response = requests.get(f"{self.app_url}/health", timeout=5)
response_time = time.time() - start_time
self.stats['total_requests'] += 1
self.stats['last_check'] = datetime.now()
if response.status_code == 200:
self.stats['successful_requests'] += 1
health_data = response.json()
self.logger.info(f"健康检查成功 - 响应时间: {response_time:.2f}s")
self.logger.info(f"应用状态: {health_data.get('status', 'unknown')}")
self.logger.info(f"运行环境: {health_data.get('environment', 'unknown')}")
# 更新平均响应时间
if self.stats['avg_response_time'] == 0:
self.stats['avg_response_time'] = response_time
else:
self.stats['avg_response_time'] = (self.stats['avg_response_time'] + response_time) / 2
# 检查响应时间阈值
if response_time > self.alert_thresholds['response_time']:
self.logger.warning(f"响应时间过长: {response_time:.2f}s")
return True, health_data
else:
self.stats['failed_requests'] += 1
self.logger.error(f"健康检查失败 - 状态码: {response.status_code}")
return False, None
except requests.exceptions.RequestException as e:
self.stats['failed_requests'] += 1
self.logger.error(f"健康检查异常: {e}")
return False, None
def check_system_resources(self):
"""检查系统资源使用情况"""
try:
# CPU 使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用率
memory = psutil.virtual_memory()
memory_percent = memory.percent
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = disk.percent
# 网络连接数
connections = len(psutil.net_connections())
self.logger.info(f"系统资源 - CPU: {cpu_percent}%, 内存: {memory_percent}%, 磁盘: {disk_percent}%")
self.logger.info(f"网络连接数: {connections}")
# 检查阈值
if cpu_percent > self.alert_thresholds['cpu_percent']:
self.logger.warning(f"CPU 使用率过高: {cpu_percent}%")
if memory_percent > self.alert_thresholds['memory_percent']:
self.logger.warning(f"内存使用率过高: {memory_percent}%")
return {
'cpu_percent': cpu_percent,
'memory_percent': memory_percent,
'disk_percent': disk_percent,
'connections': connections
}
except Exception as e:
self.logger.error(f"系统资源检查异常: {e}")
return None
def check_log_files(self):
"""检查日志文件"""
try:
log_dir = project_root / "logs"
if not log_dir.exists():
self.logger.warning("日志目录不存在")
return
log_files = list(log_dir.glob("*.log"))
total_size = sum(f.stat().st_size for f in log_files)
self.logger.info(f"日志文件数量: {len(log_files)}, 总大小: {total_size / 1024 / 1024:.2f}MB")
# 检查最近的错误日志
for log_file in log_files:
if log_file.name in ['monitor.log']:
continue
try:
# 读取最后几行日志
with open(log_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
recent_lines = lines[-10:] # 最近10行
# 检查是否有错误
error_lines = [line for line in recent_lines if 'ERROR' in line or 'CRITICAL' in line]
if error_lines:
self.logger.warning(f"发现错误日志 - {log_file.name}:")
for error_line in error_lines[-3:]: # 显示最近3个错误
self.logger.warning(f" {error_line.strip()}")
except Exception as e:
self.logger.error(f"读取日志文件失败 {log_file}: {e}")
except Exception as e:
self.logger.error(f"日志文件检查异常: {e}")
def generate_report(self):
"""生成监控报告"""
try:
uptime = datetime.now() - self.stats['start_time']
success_rate = (self.stats['successful_requests'] / max(self.stats['total_requests'], 1)) * 100
report = {
'timestamp': datetime.now().isoformat(),
'uptime': str(uptime),
'total_requests': self.stats['total_requests'],
'successful_requests': self.stats['successful_requests'],
'failed_requests': self.stats['failed_requests'],
'success_rate': f"{success_rate:.2f}%",
'avg_response_time': f"{self.stats['avg_response_time']:.2f}s",
'last_check': self.stats['last_check'].isoformat() if self.stats['last_check'] else None
}
# 保存报告
report_file = project_root / "logs" / "monitor_report.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
self.logger.info("监控报告已生成")
return report
except Exception as e:
self.logger.error(f"生成监控报告失败: {e}")
return None
def run_monitoring(self):
"""运行监控循环"""
self.logger.info("开始服务监控...")
while True:
try:
# 健康检查
health_ok, health_data = self.check_health()
# 系统资源检查
system_resources = self.check_system_resources()
# 日志文件检查
self.check_log_files()
# 生成报告
if self.stats['total_requests'] % 10 == 0: # 每10次检查生成一次报告
self.generate_report()
# 等待下次检查
time.sleep(self.monitor_interval)
except KeyboardInterrupt:
self.logger.info("监控已停止")
break
except Exception as e:
self.logger.error(f"监控循环异常: {e}")
time.sleep(self.monitor_interval)
def main():
"""主函数"""
if len(sys.argv) == 1:
# 运行监控
monitor = ServiceMonitor()
monitor.run_monitoring()
elif sys.argv[1] == "check":
# 单次检查
monitor = ServiceMonitor()
health_ok, health_data = monitor.check_health()
system_resources = monitor.check_system_resources()
monitor.check_log_files()
if health_ok:
print("✅ 服务运行正常")
else:
print("❌ 服务运行异常")
elif sys.argv[1] == "report":
# 生成报告
monitor = ServiceMonitor()
report = monitor.generate_report()
if report:
print("📊 监控报告:")
for key, value in report.items():
print(f" {key}: {value}")
else:
print("用法:")
print(" python monitor_service.py # 运行监控")
print(" python monitor_service.py check # 单次检查")
print(" python monitor_service.py report # 生成报告")
if __name__ == '__main__':
main()

View File

@@ -9,3 +9,4 @@ waitress>=3.0.0
redis>=5.0.1 redis>=5.0.1
pywin32>=306 pywin32>=306
requests>=2.28.0 requests>=2.28.0
psutil>=5.9.0

213
simple_monitor.py Normal file
View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Flask 提示词大师 - 简化服务监控脚本
用于监控应用状态和日志
"""
import os
import sys
import time
import json
import logging
import requests
from datetime import datetime
from pathlib import Path
# 添加项目路径到 Python 路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def setup_logging():
"""配置监控日志"""
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / "monitor.log", encoding='utf-8'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
class SimpleServiceMonitor:
"""简化服务监控类"""
def __init__(self):
self.logger = setup_logging()
self.app_url = "http://localhost:5000"
self.monitor_interval = 30 # 监控间隔(秒)
self.stats = {
'start_time': datetime.now(),
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'avg_response_time': 0,
'last_check': None
}
def check_health(self):
"""检查应用健康状态"""
try:
start_time = time.time()
response = requests.get(f"{self.app_url}/health", timeout=5)
response_time = time.time() - start_time
self.stats['total_requests'] += 1
self.stats['last_check'] = datetime.now()
if response.status_code == 200:
self.stats['successful_requests'] += 1
health_data = response.json()
self.logger.info(f"健康检查成功 - 响应时间: {response_time:.2f}s")
self.logger.info(f"应用状态: {health_data.get('status', 'unknown')}")
self.logger.info(f"运行环境: {health_data.get('environment', 'unknown')}")
# 更新平均响应时间
if self.stats['avg_response_time'] == 0:
self.stats['avg_response_time'] = response_time
else:
self.stats['avg_response_time'] = (self.stats['avg_response_time'] + response_time) / 2
# 检查响应时间阈值
if response_time > 2.0:
self.logger.warning(f"响应时间过长: {response_time:.2f}s")
return True, health_data
else:
self.stats['failed_requests'] += 1
self.logger.error(f"健康检查失败 - 状态码: {response.status_code}")
return False, None
except requests.exceptions.RequestException as e:
self.stats['failed_requests'] += 1
self.logger.error(f"健康检查异常: {e}")
return False, None
def check_log_files(self):
"""检查日志文件"""
try:
log_dir = project_root / "logs"
if not log_dir.exists():
self.logger.warning("⚠️ 日志目录不存在")
return
log_files = list(log_dir.glob("*.log"))
total_size = sum(f.stat().st_size for f in log_files)
self.logger.info(f"日志文件数量: {len(log_files)}, 总大小: {total_size / 1024 / 1024:.2f}MB")
# 检查最近的错误日志
for log_file in log_files:
if log_file.name in ['monitor.log']:
continue
try:
# 读取最后几行日志
with open(log_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
recent_lines = lines[-10:] # 最近10行
# 检查是否有错误
error_lines = [line for line in recent_lines if 'ERROR' in line or 'CRITICAL' in line]
if error_lines:
self.logger.warning(f"发现错误日志 - {log_file.name}:")
for error_line in error_lines[-3:]: # 显示最近3个错误
self.logger.warning(f" {error_line.strip()}")
except Exception as e:
self.logger.error(f"读取日志文件失败 {log_file}: {e}")
except Exception as e:
self.logger.error(f"日志文件检查异常: {e}")
def generate_report(self):
"""生成监控报告"""
try:
uptime = datetime.now() - self.stats['start_time']
success_rate = (self.stats['successful_requests'] / max(self.stats['total_requests'], 1)) * 100
report = {
'timestamp': datetime.now().isoformat(),
'uptime': str(uptime),
'total_requests': self.stats['total_requests'],
'successful_requests': self.stats['successful_requests'],
'failed_requests': self.stats['failed_requests'],
'success_rate': f"{success_rate:.2f}%",
'avg_response_time': f"{self.stats['avg_response_time']:.2f}s",
'last_check': self.stats['last_check'].isoformat() if self.stats['last_check'] else None
}
# 保存报告
report_file = project_root / "logs" / "monitor_report.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
self.logger.info("监控报告已生成")
return report
except Exception as e:
self.logger.error(f"生成监控报告失败: {e}")
return None
def run_monitoring(self):
"""运行监控循环"""
self.logger.info("开始服务监控...")
while True:
try:
# 健康检查
health_ok, health_data = self.check_health()
# 日志文件检查
self.check_log_files()
# 生成报告
if self.stats['total_requests'] % 10 == 0: # 每10次检查生成一次报告
self.generate_report()
# 等待下次检查
time.sleep(self.monitor_interval)
except KeyboardInterrupt:
self.logger.info("监控已停止")
break
except Exception as e:
self.logger.error(f"监控循环异常: {e}")
time.sleep(self.monitor_interval)
def main():
"""主函数"""
if len(sys.argv) == 1:
# 运行监控
monitor = SimpleServiceMonitor()
monitor.run_monitoring()
elif sys.argv[1] == "check":
# 单次检查
monitor = SimpleServiceMonitor()
health_ok, health_data = monitor.check_health()
monitor.check_log_files()
if health_ok:
print("服务运行正常")
else:
print("服务运行异常")
elif sys.argv[1] == "report":
# 生成报告
monitor = SimpleServiceMonitor()
report = monitor.generate_report()
if report:
print("监控报告:")
for key, value in report.items():
print(f" {key}: {value}")
else:
print("用法:")
print(" python simple_monitor.py # 运行监控")
print(" python simple_monitor.py check # 单次检查")
print(" python simple_monitor.py report # 生成报告")
if __name__ == '__main__':
main()

35
start_monitor.bat Normal file
View File

@@ -0,0 +1,35 @@
@echo off
chcp 65001 >nul
echo ============================================================
echo 🚀 Flask 提示词大师 - 监控系统启动脚本
echo ============================================================
echo.
REM 检查虚拟环境
if not exist ".venv\Scripts\Activate.ps1" (
echo ❌ 虚拟环境不存在,请先创建虚拟环境
pause
exit /b 1
)
REM 激活虚拟环境并启动监控
echo 🔧 激活虚拟环境...
call .venv\Scripts\Activate.ps1
echo.
echo 🎮 启动监控管理器...
echo 可用命令:
echo start - 启动监控
echo stop - 停止监控
echo status - 检查服务状态
echo logs - 管理日志
echo report - 生成报告
echo dashboard - 显示仪表板
echo quit - 退出
echo.
python monitor_manager.py
echo.
echo 👋 监控系统已退出
pause