Files
aiagent/backend/app/api/execution_logs.py
2026-01-23 09:49:45 +08:00

423 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
执行日志API
"""
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from datetime import datetime
from app.core.database import get_db
from app.models.execution_log import ExecutionLog
from app.models.execution import Execution
from app.models.workflow import Workflow
from app.models.agent import Agent
from app.api.auth import get_current_user
from app.models.user import User
from app.core.exceptions import NotFoundError
import json
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/execution-logs", tags=["execution-logs"])
class ExecutionLogResponse(BaseModel):
"""执行日志响应模型"""
id: str
execution_id: str
node_id: Optional[str]
node_type: Optional[str]
level: str
message: str
data: Optional[dict]
timestamp: datetime
duration: Optional[int]
class Config:
from_attributes = True
@router.get("/executions/{execution_id}", response_model=List[ExecutionLogResponse])
async def get_execution_logs(
execution_id: str,
level: Optional[str] = Query(None, description="日志级别筛选: INFO/WARN/ERROR/DEBUG"),
node_id: Optional[str] = Query(None, description="节点ID筛选"),
skip: int =Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取执行日志列表"""
from app.models.agent import Agent
# 验证执行记录是否存在
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if not execution:
raise NotFoundError("执行记录", execution_id)
# 验证权限检查workflow或agent的所有权
has_permission = False
if execution.workflow_id:
workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
if workflow and workflow.user_id == current_user.id:
has_permission = True
elif execution.agent_id:
agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
has_permission = True
if not has_permission:
raise NotFoundError("执行记录", execution_id)
# 构建查询
query = db.query(ExecutionLog).filter(
ExecutionLog.execution_id == execution_id
)
# 日志级别筛选
if level:
query = query.filter(ExecutionLog.level == level.upper())
# 节点ID筛选
if node_id:
query = query.filter(ExecutionLog.node_id == node_id)
# 排序和分页
logs = query.order_by(ExecutionLog.timestamp.asc()).offset(skip).limit(limit).all()
return logs
@router.get("/executions/{execution_id}/summary")
async def get_execution_log_summary(
execution_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取执行日志摘要(统计信息)"""
# 验证执行记录是否存在
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if not execution:
raise NotFoundError("执行记录", execution_id)
# 验证权限检查workflow或agent的所有权
has_permission = False
if execution.workflow_id:
workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
if workflow and workflow.user_id == current_user.id:
has_permission = True
elif execution.agent_id:
agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
has_permission = True
if not has_permission:
raise NotFoundError("执行记录", execution_id)
# 统计各级别日志数量
from sqlalchemy import func
level_stats = db.query(
ExecutionLog.level,
func.count(ExecutionLog.id).label('count')
).filter(
ExecutionLog.execution_id == execution_id
).group_by(ExecutionLog.level).all()
# 统计节点执行情况
node_stats = db.query(
ExecutionLog.node_id,
ExecutionLog.node_type,
func.count(ExecutionLog.id).label('log_count'),
func.sum(ExecutionLog.duration).label('total_duration')
).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.node_id.isnot(None)
).group_by(ExecutionLog.node_id, ExecutionLog.node_type).all()
# 获取错误日志
error_logs = db.query(ExecutionLog).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.level == 'ERROR'
).order_by(ExecutionLog.timestamp.desc()).limit(10).all()
return {
"level_stats": {level: count for level, count in level_stats},
"node_stats": [
{
"node_id": node_id,
"node_type": node_type,
"log_count": log_count,
"total_duration": total_duration
}
for node_id, node_type, log_count, total_duration in node_stats
],
"error_logs": [
{
"id": log.id,
"node_id": log.node_id,
"message": log.message,
"timestamp": log.timestamp,
"data": log.data
}
for log in error_logs
],
"total_logs": db.query(func.count(ExecutionLog.id)).filter(
ExecutionLog.execution_id == execution_id
).scalar()
}
@router.get("/executions/{execution_id}/performance")
async def get_execution_performance(
execution_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取执行性能分析数据"""
# 验证执行记录是否存在
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if not execution:
raise NotFoundError("执行记录", execution_id)
# 验证权限检查workflow或agent的所有权
has_permission = False
if execution.workflow_id:
workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
if workflow and workflow.user_id == current_user.id:
has_permission = True
elif execution.agent_id:
agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
has_permission = True
if not has_permission:
raise NotFoundError("执行记录", execution_id)
from sqlalchemy import func
# 获取总执行时间
total_execution_time = execution.execution_time or 0
# 统计各节点执行时间按节点ID分组
node_performance = db.query(
ExecutionLog.node_id,
ExecutionLog.node_type,
func.sum(ExecutionLog.duration).label('total_duration'),
func.avg(ExecutionLog.duration).label('avg_duration'),
func.min(ExecutionLog.duration).label('min_duration'),
func.max(ExecutionLog.duration).label('max_duration'),
func.count(ExecutionLog.id).label('execution_count')
).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.node_id.isnot(None),
ExecutionLog.duration.isnot(None)
).group_by(ExecutionLog.node_id, ExecutionLog.node_type).all()
# 按节点类型统计
type_performance = db.query(
ExecutionLog.node_type,
func.sum(ExecutionLog.duration).label('total_duration'),
func.avg(ExecutionLog.duration).label('avg_duration'),
func.count(ExecutionLog.id).label('execution_count')
).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.node_type.isnot(None),
ExecutionLog.duration.isnot(None)
).group_by(ExecutionLog.node_type).all()
# 获取执行时间线(按时间顺序)
timeline_logs = db.query(ExecutionLog).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.duration.isnot(None),
ExecutionLog.node_id.isnot(None)
).order_by(ExecutionLog.timestamp.asc()).all()
return {
"total_execution_time": total_execution_time,
"node_performance": [
{
"node_id": node_id,
"node_type": node_type,
"total_duration": int(total_duration or 0),
"avg_duration": float(avg_duration or 0),
"min_duration": int(min_duration or 0),
"max_duration": int(max_duration or 0),
"execution_count": int(execution_count or 0)
}
for node_id, node_type, total_duration, avg_duration, min_duration, max_duration, execution_count in node_performance
],
"type_performance": [
{
"node_type": node_type,
"total_duration": int(total_duration or 0),
"avg_duration": float(avg_duration or 0),
"execution_count": int(execution_count or 0)
}
for node_type, total_duration, avg_duration, execution_count in type_performance
],
"timeline": [
{
"timestamp": log.timestamp.isoformat() if log.timestamp else None,
"node_id": log.node_id,
"node_type": log.node_type,
"duration": log.duration,
"message": log.message
}
for log in timeline_logs
]
}
@router.get("/executions/{execution_id}/nodes/{node_id}/data")
async def get_node_execution_data(
execution_id: str,
node_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
获取特定节点的执行数据(输入和输出)
从执行日志中提取节点的输入和输出数据
"""
# 验证执行记录是否存在
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if not execution:
raise NotFoundError("执行记录", execution_id)
# 验证权限检查workflow或agent的所有权
has_permission = False
if execution.workflow_id:
workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
if workflow and workflow.user_id == current_user.id:
has_permission = True
elif execution.agent_id:
agent = db.query(Agent).filter(Agent.id == execution.agent_id).first()
if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]):
has_permission = True
if not has_permission:
raise NotFoundError("执行记录", execution_id)
# 查找节点的开始和完成日志
start_log = db.query(ExecutionLog).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.node_id == node_id,
ExecutionLog.message.like('%开始执行%')
).order_by(ExecutionLog.timestamp.asc()).first()
complete_log = db.query(ExecutionLog).filter(
ExecutionLog.execution_id == execution_id,
ExecutionLog.node_id == node_id,
ExecutionLog.message.like('%执行完成%')
).order_by(ExecutionLog.timestamp.desc()).first()
input_data = None
output_data = None
# 从开始日志中提取输入数据
if start_log and start_log.data:
input_data = start_log.data.get('input', start_log.data)
# 从完成日志中提取输出数据
if complete_log and complete_log.data:
output_data = complete_log.data.get('output', complete_log.data)
return {
"execution_id": execution_id,
"node_id": node_id,
"input": input_data,
"output": output_data,
"start_time": start_log.timestamp.isoformat() if start_log else None,
"complete_time": complete_log.timestamp.isoformat() if complete_log else None,
"duration": complete_log.duration if complete_log else None
}
@router.get("/cache/{key:path}")
async def get_cache_value(
key: str,
current_user: User = Depends(get_current_user)
):
"""
获取缓存值(记忆数据)
从Redis或内存缓存中获取指定key的值
"""
try:
from app.core.redis_client import get_redis_client
redis_client = get_redis_client()
value = None
if redis_client:
# 从Redis获取
try:
cached_data = redis_client.get(key)
if cached_data:
value = json.loads(cached_data)
except json.JSONDecodeError:
# 如果不是JSON直接返回字符串
value = cached_data
except Exception as e:
logger.warning(f"从Redis获取缓存失败: {str(e)}")
if value is None:
# 如果Redis中没有返回空
raise HTTPException(status_code=404, detail=f"缓存键 '{key}' 不存在")
return {
"key": key,
"value": value,
"exists": True
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取缓存值失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取缓存值失败: {str(e)}")
@router.delete("/cache/{key:path}")
async def delete_cache_value(
key: str,
current_user: User = Depends(get_current_user)
):
"""
删除缓存值(记忆数据)
从Redis或内存缓存中删除指定key的值
"""
try:
from app.core.redis_client import get_redis_client
redis_client = get_redis_client()
deleted = False
if redis_client:
# 从Redis删除
try:
result = redis_client.delete(key)
deleted = result > 0
except Exception as e:
logger.warning(f"从Redis删除缓存失败: {str(e)}")
if not deleted:
raise HTTPException(status_code=404, detail=f"缓存键 '{key}' 不存在")
return {
"key": key,
"deleted": True,
"message": "缓存已删除"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"删除缓存值失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"删除缓存值失败: {str(e)}")