Files
aiagent/backend/app/services/monitoring_service.py

277 lines
9.4 KiB
Python
Raw Normal View History

2026-01-19 00:09:36 +08:00
"""
System monitoring service.

Provides monitoring data such as system status, execution statistics,
and performance metrics.
"""
from sqlalchemy.orm import Session
from sqlalchemy import func, and_, case
from datetime import datetime, timedelta
from typing import Dict, Any, List
from app.models.user import User
from app.models.workflow import Workflow
from app.models.agent import Agent
from app.models.execution import Execution
from app.models.execution_log import ExecutionLog
from app.models.data_source import DataSource
from app.models.model_config import ModelConfig
import logging
logger = logging.getLogger(__name__)
class MonitoringService:
    """System monitoring service.

    A stateless collection of read-only reporting queries. Every public
    method takes the SQLAlchemy session to query and an optional ``user_id``;
    when ``user_id`` is truthy the figures are restricted to that user's
    data, otherwise they cover every row.
    """

    @staticmethod
    def _owned_executions_clause(db: Session, user_id: str = None):
        """Build the criterion limiting executions to one user's workflows.

        Args:
            db: Database session used to build the workflow sub-query.
            user_id: Owner to restrict to.

        Returns:
            ``Execution.workflow_id IN (<ids of workflows owned by user_id>)``
            or ``None`` when ``user_id`` is falsy (no restriction).
        """
        if not user_id:
            return None
        return Execution.workflow_id.in_(
            db.query(Workflow.id).filter(Workflow.user_id == user_id)
        )

    @staticmethod
    def get_system_overview(db: Session, user_id: str = None) -> Dict[str, Any]:
        """Return headline entity counts for the system.

        Args:
            db: Database session.
            user_id: If provided, only that user's data is counted.

        Returns:
            Dict with the keys ``workflows``, ``agents``, ``executions``,
            ``data_sources``, ``model_configs`` and ``users``. ``users`` is
            ``None`` whenever ``user_id`` is given, because the user total is
            only computed for the unrestricted view.
        """

        def count_rows(id_column, criterion=None):
            # Only attach a WHERE clause when there is one. SQL expressions
            # must be tested with ``is not None``: evaluating an ``in_()``
            # clause for truthiness raises TypeError in SQLAlchemy.
            query = db.query(func.count(id_column))
            if criterion is not None:
                query = query.filter(criterion)
            return query.scalar() or 0

        def owned_by_user(model):
            # Ownership criterion for models carrying their own user_id column.
            return model.user_id == user_id if user_id else None

        return {
            "workflows": count_rows(Workflow.id, owned_by_user(Workflow)),
            "agents": count_rows(Agent.id, owned_by_user(Agent)),
            # Executions have no user_id of their own in these queries; they
            # are scoped through the workflow they belong to.
            "executions": count_rows(
                Execution.id,
                MonitoringService._owned_executions_clause(db, user_id),
            ),
            "data_sources": count_rows(DataSource.id, owned_by_user(DataSource)),
            "model_configs": count_rows(ModelConfig.id, owned_by_user(ModelConfig)),
            "users": None if user_id else count_rows(User.id),
        }

    @staticmethod
    def get_execution_statistics(
        db: Session,
        user_id: str = None,
        days: int = 7
    ) -> Dict[str, Any]:
        """Return execution statistics for the trailing ``days`` days.

        Args:
            db: Database session.
            user_id: If provided, only that user's executions are counted.
            days: Length of the reporting window in days (default 7).

        Returns:
            Dict with:
              * ``total``: number of executions created in the window;
              * ``status_counts``: mapping of status -> count;
              * ``success_rate``: share of ``'completed'`` executions among
                all executions in the window, as a percentage rounded to two
                decimals (0 when there are none);
              * ``avg_execution_time``: mean of the non-null
                ``execution_time`` values, rounded to two decimals (0 when
                there is nothing to average);
              * ``hourly_trends``: 24 ``{"hour", "count"}`` entries, oldest
                first, covering the last 24 hours.
        """
        # Reporting window, based on the naive UTC "now".
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)

        criteria = [Execution.created_at >= start_time]
        owner_clause = MonitoringService._owned_executions_clause(db, user_id)
        if owner_clause is not None:
            criteria.append(owner_clause)
        execution_filter = and_(*criteria)

        total_executions = db.query(func.count(Execution.id)).filter(
            execution_filter
        ).scalar() or 0

        status_rows = db.query(
            Execution.status,
            func.count(Execution.id).label('count')
        ).filter(execution_filter).group_by(Execution.status).all()
        status_counts = {status: count for status, count in status_rows}

        completed = status_counts.get('completed', 0)
        success_rate = (
            completed / total_executions * 100 if total_executions > 0 else 0
        )

        avg_execution_time = db.query(
            func.avg(Execution.execution_time)
        ).filter(
            and_(execution_filter, Execution.execution_time.isnot(None))
        ).scalar() or 0

        # One count per one-hour bucket, oldest bucket first. Buckets are
        # anchored on ``end_time`` rather than on the clock hour, so the
        # "%H:00" label names the hour in which each bucket starts. The
        # window filter is kept in place, so buckets older than ``start_time``
        # report 0.
        hourly_trends = []
        for hours_back in range(24, 0, -1):
            hour_start = end_time - timedelta(hours=hours_back)
            hour_end = hour_start + timedelta(hours=1)
            hour_count = db.query(func.count(Execution.id)).filter(
                and_(
                    execution_filter,
                    Execution.created_at >= hour_start,
                    Execution.created_at < hour_end
                )
            ).scalar() or 0
            hourly_trends.append({
                "hour": hour_start.strftime("%H:00"),
                "count": hour_count
            })

        return {
            "total": total_executions,
            "status_counts": status_counts,
            "success_rate": round(success_rate, 2),
            "avg_execution_time": round(avg_execution_time, 2) if avg_execution_time else 0,
            "hourly_trends": hourly_trends
        }

    @staticmethod
    def get_node_type_statistics(
        db: Session,
        user_id: str = None,
        days: int = 7
    ) -> List[Dict[str, Any]]:
        """Return per-node-type execution statistics for the last ``days`` days.

        Only log rows that have both a node type and a duration are counted.

        Args:
            db: Database session.
            user_id: If provided, only that user's executions are considered.
            days: Length of the reporting window in days (default 7).

        Returns:
            One dict per node type with ``node_type``, ``execution_count``,
            ``total_duration``, ``avg_duration``, ``error_count`` (log rows
            whose level is ``'ERROR'``) and ``success_rate`` (percentage of
            non-error rows). Empty list when no execution matches.
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)

        criteria = [Execution.created_at >= start_time]
        owner_clause = MonitoringService._owned_executions_clause(db, user_id)
        if owner_clause is not None:
            criteria.append(owner_clause)

        # Keep the matching execution IDs inside the database as a sub-query
        # instead of loading them into Python and sending them back as a
        # literal IN list, whose size would grow with the number of
        # executions. With no matching executions the grouped query simply
        # yields no rows, so the result is still an empty list.
        matching_execution_ids = db.query(Execution.id).filter(and_(*criteria))

        node_stats = db.query(
            ExecutionLog.node_type,
            func.count(ExecutionLog.id).label('execution_count'),
            func.sum(ExecutionLog.duration).label('total_duration'),
            func.avg(ExecutionLog.duration).label('avg_duration'),
            # CASE without ELSE yields NULL for non-error rows, and COUNT
            # ignores NULLs, so this counts the error rows only.
            func.count(
                case((ExecutionLog.level == 'ERROR', 1))
            ).label('error_count')
        ).filter(
            and_(
                ExecutionLog.execution_id.in_(matching_execution_ids),
                ExecutionLog.node_type.isnot(None),
                ExecutionLog.duration.isnot(None)
            )
        ).group_by(ExecutionLog.node_type).all()

        return [
            {
                "node_type": node_type,
                "execution_count": exec_count,
                "total_duration": round(total_dur or 0, 2),
                "avg_duration": round(avg_dur or 0, 2),
                "error_count": error_count,
                "success_rate": round((exec_count - error_count) / exec_count * 100, 2) if exec_count > 0 else 0
            }
            for node_type, exec_count, total_dur, avg_dur, error_count in node_stats
        ]

    @staticmethod
    def get_recent_activities(
        db: Session,
        user_id: str = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Return the most recent executions as activity records.

        Args:
            db: Database session.
            user_id: If provided, only that user's executions are returned.
            limit: Maximum number of records to return.

        Returns:
            List of dicts, newest first, with ``id``, ``type`` (always
            ``"execution"``), ``workflow_name``, ``status``, ``created_at``
            (ISO-8601 string or ``None``) and ``execution_time``.
        """
        query = db.query(Execution)
        owner_clause = MonitoringService._owned_executions_clause(db, user_id)
        if owner_clause is not None:
            query = query.filter(owner_clause)
        recent_executions = query.order_by(
            Execution.created_at.desc()
        ).limit(limit).all()

        # Resolve all workflow names with a single query instead of issuing
        # one lookup per execution.
        workflow_ids = list({
            execution.workflow_id
            for execution in recent_executions
            if execution.workflow_id
        })
        workflow_names = {}
        if workflow_ids:
            name_rows = db.query(Workflow.id, Workflow.name).filter(
                Workflow.id.in_(workflow_ids)
            ).all()
            workflow_names = {
                workflow_id: name for workflow_id, name in name_rows
            }

        return [
            {
                "id": execution.id,
                "type": "execution",
                # Fallback label for executions with no workflow or whose
                # workflow row no longer exists.
                "workflow_name": workflow_names.get(execution.workflow_id, "未知工作流"),
                "status": execution.status,
                "created_at": execution.created_at.isoformat() if execution.created_at else None,
                "execution_time": execution.execution_time
            }
            for execution in recent_executions
        ]