Files
aiagent/backend/app/services/monitoring_service.py
2026-01-19 00:09:36 +08:00

277 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
系统监控服务
提供系统状态、执行统计、性能指标等监控数据
"""
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from sqlalchemy import and_, case, func
from sqlalchemy.orm import Session

from app.models.agent import Agent
from app.models.data_source import DataSource
from app.models.execution import Execution
from app.models.execution_log import ExecutionLog
from app.models.model_config import ModelConfig
from app.models.user import User
from app.models.workflow import Workflow
logger = logging.getLogger(__name__)
class MonitoringService:
    """System monitoring service.

    Supplies the monitoring dashboard with system status, execution
    statistics and performance metrics. Every public method accepts an
    optional ``user_id``: when it is given, the numbers are restricted to
    that user's data; otherwise they cover the whole system.
    """

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _count(db: Session, column: Any, *conditions: Any) -> int:
        """Return ``COUNT(column)`` restricted by ``conditions`` (0 if NULL).

        An empty ``conditions`` applies no WHERE clause at all.
        """
        return db.query(func.count(column)).filter(*conditions).scalar() or 0

    @staticmethod
    def _execution_conditions(
        db: Session,
        user_id: Optional[str] = None,
        since: Optional[datetime] = None,
    ) -> List[Any]:
        """Build the WHERE conditions shared by every ``Execution`` query.

        Conditions are returned as a list and splatted into
        ``Query.filter(*conditions)``. SQLAlchemy clause objects must never
        be used in a boolean context: ``bool(clause)`` raises ``TypeError``
        for an ``IN`` expression.

        Args:
            db: Database session, used to build the ownership sub-select.
            user_id: When truthy, keep only executions whose workflow is
                owned by this user. This is expressed as an
                ``IN (SELECT ...)`` sub-query, so no IDs are loaded into
                Python.
            since: When given, keep only executions created at or after it.

        Returns:
            A possibly empty list of SQLAlchemy conditions.
        """
        conditions: List[Any] = []
        if since is not None:
            conditions.append(Execution.created_at >= since)
        if user_id:
            owned_workflow_ids = db.query(Workflow.id).filter(
                Workflow.user_id == user_id
            )
            conditions.append(Execution.workflow_id.in_(owned_workflow_ids))
        return conditions

    @staticmethod
    def _hourly_trends(
        db: Session,
        conditions: List[Any],
        end_time: datetime,
        hours: int = 24,
    ) -> List[Dict[str, Any]]:
        """Count executions in each one-hour window of the ``hours`` ending at ``end_time``.

        Window ``i`` (oldest first) covers
        ``[end_time - (hours - i)h, end_time - (hours - i - 1)h)``. All
        windows are counted in a single round trip, using one conditional
        aggregate per window instead of one COUNT query per hour.

        NOTE(review): windows are relative to ``end_time`` and are not
        aligned to clock hours. The ``"%H:00"`` label is therefore the hour
        in which a window *starts*, not its exact start time.
        """
        if hours <= 0:
            return []
        one_hour = timedelta(hours=1)
        window_starts = [end_time - timedelta(hours=hours - i) for i in range(hours)]
        per_window_counts = [
            # A CASE without ELSE yields NULL for rows outside the window,
            # and COUNT skips NULLs, so each column counts only its own hour.
            func.count(
                case(
                    (
                        and_(
                            Execution.created_at >= start,
                            Execution.created_at < start + one_hour,
                        ),
                        1,
                    )
                )
            )
            for start in window_starts
        ]
        # An aggregate-only query without GROUP BY always yields exactly one row.
        row = db.query(*per_window_counts).filter(*conditions).one()
        return [
            {"hour": start.strftime("%H:00"), "count": count or 0}
            for start, count in zip(window_starts, row)
        ]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    def get_system_overview(db: Session, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Return headline resource counts for the dashboard.

        Args:
            db: Database session.
            user_id: When given, count only resources owned by this user.
                Otherwise, count across all users.

        Returns:
            A dict with the keys ``workflows``, ``agents``, ``executions``,
            ``data_sources``, ``model_configs`` and ``users``. ``users`` is
            only computed for the system-wide view (intended for
            administrators). It is ``None`` when ``user_id`` is given.
        """
        count = MonitoringService._count

        def owned_by(owner_column: Any) -> List[Any]:
            # No user_id means "no restriction" (system-wide view).
            return [owner_column == user_id] if user_id else []

        # Executions have no owner column of their own. Ownership is
        # resolved through the parent workflow.
        execution_conditions = MonitoringService._execution_conditions(db, user_id)

        return {
            "workflows": count(db, Workflow.id, *owned_by(Workflow.user_id)),
            "agents": count(db, Agent.id, *owned_by(Agent.user_id)),
            "executions": count(db, Execution.id, *execution_conditions),
            "data_sources": count(db, DataSource.id, *owned_by(DataSource.user_id)),
            "model_configs": count(db, ModelConfig.id, *owned_by(ModelConfig.user_id)),
            "users": None if user_id else count(db, User.id),
        }

    @staticmethod
    def get_execution_statistics(
        db: Session,
        user_id: Optional[str] = None,
        days: int = 7
    ) -> Dict[str, Any]:
        """Return execution statistics for the last ``days`` days.

        Args:
            db: Database session.
            user_id: When given, only executions of this user's workflows
                are counted.
            days: Size of the look-back window in days (default 7).

        Returns:
            A dict with:

            - ``total``: number of executions in the window.
            - ``status_counts``: mapping ``status -> count``.
            - ``success_rate``: percentage of all executions in the window
              whose status is ``'completed'``, rounded to 2 places.
            - ``avg_execution_time``: mean ``execution_time`` over rows that
              have one, rounded to 2 places (0 if none).
            - ``hourly_trends``: 24 ``{"hour", "count"}`` entries for the
              last 24 hours, oldest first. Only executions that are also
              inside the ``days`` window are counted.
        """
        # Naive UTC, matching how timestamps are compared elsewhere in this service.
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)
        conditions = MonitoringService._execution_conditions(db, user_id, since=start_time)

        status_rows = (
            db.query(Execution.status, func.count(Execution.id).label('count'))
            .filter(*conditions)
            .group_by(Execution.status)
            .all()
        )
        status_counts = {status: count for status, count in status_rows}
        # Every execution falls into exactly one status group (including a
        # NULL-status group), so the total is the sum of the groups. This
        # needs no extra COUNT query and stays consistent with status_counts.
        total_executions = sum(status_counts.values())

        completed = status_counts.get('completed', 0)
        success_rate = (completed / total_executions * 100) if total_executions > 0 else 0

        avg_execution_time = (
            db.query(func.avg(Execution.execution_time))
            .filter(*conditions, Execution.execution_time.isnot(None))
            .scalar()
        )

        return {
            "total": total_executions,
            "status_counts": status_counts,
            "success_rate": round(success_rate, 2),
            # AVG may come back as Decimal on some backends; expose a float.
            "avg_execution_time": round(float(avg_execution_time), 2) if avg_execution_time else 0,
            "hourly_trends": MonitoringService._hourly_trends(db, conditions, end_time),
        }

    @staticmethod
    def get_node_type_statistics(
        db: Session,
        user_id: Optional[str] = None,
        days: int = 7
    ) -> List[Dict[str, Any]]:
        """Return per-node-type statistics aggregated from execution logs.

        Only log rows that have both a ``node_type`` and a ``duration``,
        and that belong to executions created in the last ``days`` days,
        are considered. Each such log row counts as one node execution.
        Rows at level ``'ERROR'`` count as errors.

        Args:
            db: Database session.
            user_id: When given, only executions of this user's workflows
                are considered.
            days: Size of the look-back window in days (default 7).

        Returns:
            One dict per node type with ``node_type``, ``execution_count``,
            ``total_duration``, ``avg_duration``, ``error_count`` and
            ``success_rate`` (percentage of non-error rows). Returns an
            empty list when nothing matches.
        """
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=days)

        # Filter by a sub-select rather than a materialised ID list: a large
        # window would otherwise produce an IN list with one bind parameter
        # per execution, which can exceed driver/database parameter limits.
        matching_execution_ids = db.query(Execution.id).filter(
            *MonitoringService._execution_conditions(db, user_id, since=start_time)
        )

        node_stats = (
            db.query(
                ExecutionLog.node_type,
                func.count(ExecutionLog.id).label('execution_count'),
                func.sum(ExecutionLog.duration).label('total_duration'),
                func.avg(ExecutionLog.duration).label('avg_duration'),
                func.count(
                    case((ExecutionLog.level == 'ERROR', 1))
                ).label('error_count'),
            )
            .filter(
                ExecutionLog.execution_id.in_(matching_execution_ids),
                ExecutionLog.node_type.isnot(None),
                ExecutionLog.duration.isnot(None),
            )
            .group_by(ExecutionLog.node_type)
            .all()
        )

        result = []
        for node_type, exec_count, total_dur, avg_dur, error_count in node_stats:
            result.append({
                "node_type": node_type,
                "execution_count": exec_count,
                # SUM/AVG may come back as Decimal on some backends.
                "total_duration": round(float(total_dur or 0), 2),
                "avg_duration": round(float(avg_dur or 0), 2),
                "error_count": error_count,
                "success_rate": round((exec_count - error_count) / exec_count * 100, 2) if exec_count > 0 else 0,
            })
        return result

    @staticmethod
    def get_recent_activities(
        db: Session,
        user_id: Optional[str] = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Return the most recent executions as activity-feed entries.

        Args:
            db: Database session.
            user_id: When given, only executions of this user's workflows
                are returned.
            limit: Maximum number of entries to return.

        Returns:
            Newest-first list of dicts with ``id``, ``type`` (always
            ``"execution"``), ``workflow_name``, ``status``, ``created_at``
            (ISO-8601 string or ``None``) and ``execution_time``. When the
            workflow cannot be found, the name is a placeholder.
        """
        recent_executions = (
            db.query(Execution)
            .filter(*MonitoringService._execution_conditions(db, user_id))
            .order_by(Execution.created_at.desc())
            .limit(limit)
            .all()
        )

        # Resolve all workflow names in one query instead of one per execution.
        workflow_ids = {e.workflow_id for e in recent_executions if e.workflow_id}
        workflow_names: Dict[Any, Any] = {}
        if workflow_ids:
            rows = (
                db.query(Workflow.id, Workflow.name)
                .filter(Workflow.id.in_(list(workflow_ids)))
                .all()
            )
            workflow_names = {wf_id: name for wf_id, name in rows}

        return [
            {
                "id": execution.id,
                "type": "execution",
                "workflow_name": workflow_names.get(execution.workflow_id, "未知工作流"),
                "status": execution.status,
                "created_at": execution.created_at.isoformat() if execution.created_at else None,
                "execution_time": execution.execution_time,
            }
            for execution in recent_executions
        ]