Files
aiagent/backend/app/api/execution_logs.py
2026-01-20 09:40:16 +08:00

269 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
执行日志API
"""
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.api.auth import get_current_user
from app.core.database import get_db
from app.core.exceptions import NotFoundError
from app.models.agent import Agent
from app.models.execution import Execution
from app.models.execution_log import ExecutionLog
from app.models.user import User
from app.models.workflow import Workflow
router = APIRouter(prefix="/api/v1/execution-logs", tags=["execution-logs"])
class ExecutionLogResponse(BaseModel):
    """Response schema for a single execution-log entry."""
    id: str
    execution_id: str
    node_id: Optional[str]    # None for execution-level (non-node) log lines
    node_type: Optional[str]  # None when node_id is None
    level: str                # log level, e.g. INFO/WARN/ERROR/DEBUG
    message: str
    data: Optional[dict]      # arbitrary structured payload attached to the entry
    timestamp: datetime
    duration: Optional[int]   # elapsed time of the step — presumably ms; TODO confirm unit

    class Config:
        # Allow constructing this model directly from ORM row objects.
        from_attributes = True
@router.get("/executions/{execution_id}", response_model=List[ExecutionLogResponse])
async def get_execution_logs(
    execution_id: str,
    level: Optional[str] = Query(None, description="日志级别筛选: INFO/WARN/ERROR/DEBUG"),
    node_id: Optional[str] = Query(None, description="节点ID筛选"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return the log entries of an execution, oldest first.

    Supports optional filtering by log level and node id, plus
    offset/limit pagination.

    Raises:
        NotFoundError: if the execution does not exist, or the caller is
            not allowed to see it (deliberately indistinguishable so that
            valid execution ids are not leaked).
    """
    # Verify the execution record exists before doing any permission work.
    execution = db.query(Execution).filter(Execution.id == execution_id).first()
    if not execution:
        raise NotFoundError("执行记录", execution_id)

    # Ownership check: the caller must own the parent workflow, or own the
    # agent (agents in "published"/"running" state are viewable by anyone).
    has_permission = False
    if execution.workflow_id:
        workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
        if workflow and workflow.user_id == current_user.id:
            has_permission = True
    elif execution.agent_id:
        agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
        if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
            has_permission = True
    if not has_permission:
        # Report "not found" rather than "forbidden" on purpose.
        raise NotFoundError("执行记录", execution_id)

    # Base query scoped to this execution, then optional filters.
    query = db.query(ExecutionLog).filter(
        ExecutionLog.execution_id == execution_id
    )
    if level:
        # Levels are stored upper-case; normalize the caller's input.
        query = query.filter(ExecutionLog.level == level.upper())
    if node_id:
        query = query.filter(ExecutionLog.node_id == node_id)

    # Chronological order with offset/limit pagination.
    return query.order_by(ExecutionLog.timestamp.asc()).offset(skip).limit(limit).all()
@router.get("/executions/{execution_id}/summary")
async def get_execution_log_summary(
    execution_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return aggregate statistics about an execution's logs.

    The response contains per-level counts, per-node log counts with
    summed durations, the 10 most recent ERROR entries, and the total
    number of log rows.

    Raises:
        NotFoundError: if the execution does not exist, or the caller is
            not allowed to see it (deliberately indistinguishable so that
            valid execution ids are not leaked).
    """
    # Verify the execution record exists before doing any permission work.
    execution = db.query(Execution).filter(Execution.id == execution_id).first()
    if not execution:
        raise NotFoundError("执行记录", execution_id)

    # Ownership check: the caller must own the parent workflow, or own the
    # agent (agents in "published"/"running" state are viewable by anyone).
    has_permission = False
    if execution.workflow_id:
        workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
        if workflow and workflow.user_id == current_user.id:
            has_permission = True
    elif execution.agent_id:
        agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
        if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
            has_permission = True
    if not has_permission:
        # Report "not found" rather than "forbidden" on purpose.
        raise NotFoundError("执行记录", execution_id)

    # Count log rows grouped by level (INFO/WARN/ERROR/DEBUG ...).
    level_stats = db.query(
        ExecutionLog.level,
        func.count(ExecutionLog.id).label('count')
    ).filter(
        ExecutionLog.execution_id == execution_id
    ).group_by(ExecutionLog.level).all()

    # Per-node statistics: number of log rows and summed duration.
    node_stats = db.query(
        ExecutionLog.node_id,
        ExecutionLog.node_type,
        func.count(ExecutionLog.id).label('log_count'),
        func.sum(ExecutionLog.duration).label('total_duration')
    ).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.node_id.isnot(None)
    ).group_by(ExecutionLog.node_id, ExecutionLog.node_type).all()

    # The 10 most recent ERROR entries for quick diagnosis.
    error_logs = db.query(ExecutionLog).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.level == 'ERROR'
    ).order_by(ExecutionLog.timestamp.desc()).limit(10).all()

    return {
        "level_stats": {level: count for level, count in level_stats},
        "node_stats": [
            {
                "node_id": node_id,
                "node_type": node_type,
                "log_count": log_count,
                "total_duration": total_duration
            }
            for node_id, node_type, log_count, total_duration in node_stats
        ],
        "error_logs": [
            {
                "id": log.id,
                "node_id": log.node_id,
                "message": log.message,
                "timestamp": log.timestamp,
                "data": log.data
            }
            for log in error_logs
        ],
        "total_logs": db.query(func.count(ExecutionLog.id)).filter(
            ExecutionLog.execution_id == execution_id
        ).scalar()
    }
@router.get("/executions/{execution_id}/performance")
async def get_execution_performance(
    execution_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return performance-analysis data for an execution.

    The response contains the total execution time, per-node duration
    aggregates (sum/avg/min/max/count), per-node-type aggregates, and a
    chronological timeline of all timed node log entries.

    Raises:
        NotFoundError: if the execution does not exist, or the caller is
            not allowed to see it (deliberately indistinguishable so that
            valid execution ids are not leaked).
    """
    # Verify the execution record exists before doing any permission work.
    execution = db.query(Execution).filter(Execution.id == execution_id).first()
    if not execution:
        raise NotFoundError("执行记录", execution_id)

    # Ownership check: the caller must own the parent workflow, or own the
    # agent (agents in "published"/"running" state are viewable by anyone).
    has_permission = False
    if execution.workflow_id:
        workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
        if workflow and workflow.user_id == current_user.id:
            has_permission = True
    elif execution.agent_id:
        agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
        if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
            has_permission = True
    if not has_permission:
        # Report "not found" rather than "forbidden" on purpose.
        raise NotFoundError("执行记录", execution_id)

    # Overall wall time of the execution (0 when not recorded).
    total_execution_time = execution.execution_time or 0

    # Duration aggregates per node; rows without node_id or duration are
    # excluded so the averages are not skewed by untimed entries.
    node_performance = db.query(
        ExecutionLog.node_id,
        ExecutionLog.node_type,
        func.sum(ExecutionLog.duration).label('total_duration'),
        func.avg(ExecutionLog.duration).label('avg_duration'),
        func.min(ExecutionLog.duration).label('min_duration'),
        func.max(ExecutionLog.duration).label('max_duration'),
        func.count(ExecutionLog.id).label('execution_count')
    ).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.node_id.isnot(None),
        ExecutionLog.duration.isnot(None)
    ).group_by(ExecutionLog.node_id, ExecutionLog.node_type).all()

    # Same aggregates collapsed by node type.
    type_performance = db.query(
        ExecutionLog.node_type,
        func.sum(ExecutionLog.duration).label('total_duration'),
        func.avg(ExecutionLog.duration).label('avg_duration'),
        func.count(ExecutionLog.id).label('execution_count')
    ).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.node_type.isnot(None),
        ExecutionLog.duration.isnot(None)
    ).group_by(ExecutionLog.node_type).all()

    # Chronological list of every timed node entry, for timeline charts.
    timeline_logs = db.query(ExecutionLog).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.duration.isnot(None),
        ExecutionLog.node_id.isnot(None)
    ).order_by(ExecutionLog.timestamp.asc()).all()

    return {
        "total_execution_time": total_execution_time,
        "node_performance": [
            {
                "node_id": node_id,
                "node_type": node_type,
                # SQL aggregates may come back as None/Decimal; coerce to
                # plain ints/floats for a stable JSON shape.
                "total_duration": int(total_duration or 0),
                "avg_duration": float(avg_duration or 0),
                "min_duration": int(min_duration or 0),
                "max_duration": int(max_duration or 0),
                "execution_count": int(execution_count or 0)
            }
            for node_id, node_type, total_duration, avg_duration, min_duration, max_duration, execution_count in node_performance
        ],
        "type_performance": [
            {
                "node_type": node_type,
                "total_duration": int(total_duration or 0),
                "avg_duration": float(avg_duration or 0),
                "execution_count": int(execution_count or 0)
            }
            for node_type, total_duration, avg_duration, execution_count in type_performance
        ],
        "timeline": [
            {
                "timestamp": log.timestamp.isoformat() if log.timestamp else None,
                "node_id": log.node_id,
                "node_type": log.node_type,
                "duration": log.duration,
                "message": log.message
            }
            for log in timeline_logs
        ]
    }