""" 执行日志API """ from fastapi import APIRouter, Depends, Query, HTTPException from sqlalchemy.orm import Session from pydantic import BaseModel from typing import List, Optional, Dict, Any from datetime import datetime from app.core.database import get_db from app.models.execution_log import ExecutionLog from app.models.execution import Execution from app.models.workflow import Workflow from app.models.agent import Agent from app.api.auth import get_current_user from app.models.user import User from app.core.exceptions import NotFoundError import json import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/execution-logs", tags=["execution-logs"]) class ExecutionLogResponse(BaseModel): """执行日志响应模型""" id: str execution_id: str node_id: Optional[str] node_type: Optional[str] level: str message: str data: Optional[dict] timestamp: datetime duration: Optional[int] class Config: from_attributes = True @router.get("/executions/{execution_id}", response_model=List[ExecutionLogResponse]) async def get_execution_logs( execution_id: str, level: Optional[str] = Query(None, description="日志级别筛选: INFO/WARN/ERROR/DEBUG"), node_id: Optional[str] = Query(None, description="节点ID筛选"), skip: int =Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取执行日志列表""" from app.models.agent import Agent # 验证执行记录是否存在 execution = db.query(Execution).filter(Execution.id == execution_id).first() if not execution: raise NotFoundError("执行记录", execution_id) # 验证权限:检查workflow或agent的所有权 has_permission = False if execution.workflow_id: workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first() if workflow and workflow.user_id == current_user.id: has_permission = True elif execution.agent_id: agent = db.query(Agent).filter(Agent.id == execution.agent_id).first() if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]): has_permission = True if not has_permission: raise NotFoundError("执行记录", execution_id) # 构建查询 query = db.query(ExecutionLog).filter( ExecutionLog.execution_id == execution_id ) # 日志级别筛选 if level: query = query.filter(ExecutionLog.level == level.upper()) # 节点ID筛选 if node_id: query = query.filter(ExecutionLog.node_id == node_id) # 排序和分页 logs = query.order_by(ExecutionLog.timestamp.asc()).offset(skip).limit(limit).all() return logs @router.get("/executions/{execution_id}/summary") async def get_execution_log_summary( execution_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取执行日志摘要(统计信息)""" # 验证执行记录是否存在 execution = db.query(Execution).filter(Execution.id == execution_id).first() if not execution: raise NotFoundError("执行记录", execution_id) # 验证权限:检查workflow或agent的所有权 has_permission = False if execution.workflow_id: workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first() if workflow and workflow.user_id == current_user.id: has_permission = True elif execution.agent_id: agent = db.query(Agent).filter(Agent.id == execution.agent_id).first() if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]): has_permission = True if not has_permission: raise NotFoundError("执行记录", execution_id) # 统计各级别日志数量 from sqlalchemy import func level_stats = db.query( ExecutionLog.level, func.count(ExecutionLog.id).label('count') ).filter( ExecutionLog.execution_id == execution_id ).group_by(ExecutionLog.level).all() # 统计节点执行情况 node_stats = db.query( ExecutionLog.node_id, ExecutionLog.node_type, func.count(ExecutionLog.id).label('log_count'), func.sum(ExecutionLog.duration).label('total_duration') ).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.node_id.isnot(None) ).group_by(ExecutionLog.node_id, ExecutionLog.node_type).all() # 获取错误日志 error_logs = db.query(ExecutionLog).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.level == 'ERROR' ).order_by(ExecutionLog.timestamp.desc()).limit(10).all() return { "level_stats": {level: count for level, count in level_stats}, "node_stats": [ { "node_id": node_id, "node_type": node_type, "log_count": log_count, "total_duration": total_duration } for node_id, node_type, log_count, total_duration in node_stats ], "error_logs": [ { "id": log.id, "node_id": log.node_id, "message": log.message, "timestamp": log.timestamp, "data": log.data } for log in error_logs ], "total_logs": db.query(func.count(ExecutionLog.id)).filter( ExecutionLog.execution_id == execution_id ).scalar() } @router.get("/executions/{execution_id}/performance") async def get_execution_performance( execution_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取执行性能分析数据""" # 验证执行记录是否存在 execution = db.query(Execution).filter(Execution.id == execution_id).first() if not execution: raise NotFoundError("执行记录", execution_id) # 验证权限:检查workflow或agent的所有权 has_permission = False if execution.workflow_id: workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first() if workflow and workflow.user_id == current_user.id: has_permission = True elif execution.agent_id: agent = db.query(Agent).filter(Agent.id == execution.agent_id).first() if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]): has_permission = True if not has_permission: raise NotFoundError("执行记录", execution_id) from sqlalchemy import func # 获取总执行时间 total_execution_time = execution.execution_time or 0 # 统计各节点执行时间(按节点ID分组) node_performance = db.query( ExecutionLog.node_id, ExecutionLog.node_type, func.sum(ExecutionLog.duration).label('total_duration'), func.avg(ExecutionLog.duration).label('avg_duration'), func.min(ExecutionLog.duration).label('min_duration'), func.max(ExecutionLog.duration).label('max_duration'), func.count(ExecutionLog.id).label('execution_count') ).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.node_id.isnot(None), ExecutionLog.duration.isnot(None) ).group_by(ExecutionLog.node_id, ExecutionLog.node_type).all() # 按节点类型统计 type_performance = db.query( ExecutionLog.node_type, func.sum(ExecutionLog.duration).label('total_duration'), func.avg(ExecutionLog.duration).label('avg_duration'), func.count(ExecutionLog.id).label('execution_count') ).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.node_type.isnot(None), ExecutionLog.duration.isnot(None) ).group_by(ExecutionLog.node_type).all() # 获取执行时间线(按时间顺序) timeline_logs = db.query(ExecutionLog).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.duration.isnot(None), ExecutionLog.node_id.isnot(None) ).order_by(ExecutionLog.timestamp.asc()).all() return { "total_execution_time": total_execution_time, "node_performance": [ { "node_id": node_id, "node_type": node_type, "total_duration": int(total_duration or 0), "avg_duration": float(avg_duration or 0), "min_duration": int(min_duration or 0), "max_duration": int(max_duration or 0), "execution_count": int(execution_count or 0) } for node_id, node_type, total_duration, avg_duration, min_duration, max_duration, execution_count in node_performance ], "type_performance": [ { "node_type": node_type, "total_duration": int(total_duration or 0), "avg_duration": float(avg_duration or 0), "execution_count": int(execution_count or 0) } for node_type, total_duration, avg_duration, execution_count in type_performance ], "timeline": [ { "timestamp": log.timestamp.isoformat() if log.timestamp else None, "node_id": log.node_id, "node_type": log.node_type, "duration": log.duration, "message": log.message } for log in timeline_logs ] } @router.get("/executions/{execution_id}/nodes/{node_id}/data") async def get_node_execution_data( execution_id: str, node_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 获取特定节点的执行数据(输入和输出) 从执行日志中提取节点的输入和输出数据 """ # 验证执行记录是否存在 execution = db.query(Execution).filter(Execution.id == execution_id).first() if not execution: raise NotFoundError("执行记录", execution_id) # 验证权限:检查workflow或agent的所有权 has_permission = False if execution.workflow_id: workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first() if workflow and workflow.user_id == current_user.id: has_permission = True elif execution.agent_id: agent = db.query(Agent).filter(Agent.id == execution.agent_id).first() if agent and (agent.user_id == current_user.id or agent.status in ["published", "running"]): has_permission = True if not has_permission: raise NotFoundError("执行记录", execution_id) # 查找节点的开始和完成日志 start_log = db.query(ExecutionLog).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.node_id == node_id, ExecutionLog.message.like('%开始执行%') ).order_by(ExecutionLog.timestamp.asc()).first() complete_log = db.query(ExecutionLog).filter( ExecutionLog.execution_id == execution_id, ExecutionLog.node_id == node_id, ExecutionLog.message.like('%执行完成%') ).order_by(ExecutionLog.timestamp.desc()).first() input_data = None output_data = None # 从开始日志中提取输入数据 if start_log and start_log.data: input_data = start_log.data.get('input', start_log.data) # 从完成日志中提取输出数据 if complete_log and complete_log.data: output_data = complete_log.data.get('output', complete_log.data) return { "execution_id": execution_id, "node_id": node_id, "input": input_data, "output": output_data, "start_time": start_log.timestamp.isoformat() if start_log else None, "complete_time": complete_log.timestamp.isoformat() if complete_log else None, "duration": complete_log.duration if complete_log else None } @router.get("/cache/{key:path}") async def get_cache_value( key: str, current_user: User = Depends(get_current_user) ): """ 获取缓存值(记忆数据) 从Redis或内存缓存中获取指定key的值 """ try: from app.core.redis_client import get_redis_client redis_client = get_redis_client() value = None if redis_client: # 从Redis获取 try: cached_data = redis_client.get(key) if cached_data: value = json.loads(cached_data) except json.JSONDecodeError: # 如果不是JSON,直接返回字符串 value = cached_data except Exception as e: logger.warning(f"从Redis获取缓存失败: {str(e)}") if value is None: # 如果Redis中没有,返回空 raise HTTPException(status_code=404, detail=f"缓存键 '{key}' 不存在") return { "key": key, "value": value, "exists": True } except HTTPException: raise except Exception as e: logger.error(f"获取缓存值失败: {str(e)}") raise HTTPException(status_code=500, detail=f"获取缓存值失败: {str(e)}") @router.delete("/cache/{key:path}") async def delete_cache_value( key: str, current_user: User = Depends(get_current_user) ): """ 删除缓存值(记忆数据) 从Redis或内存缓存中删除指定key的值 """ try: from app.core.redis_client import get_redis_client redis_client = get_redis_client() deleted = False if redis_client: # 从Redis删除 try: result = redis_client.delete(key) deleted = result > 0 except Exception as e: logger.warning(f"从Redis删除缓存失败: {str(e)}") if not deleted: raise HTTPException(status_code=404, detail=f"缓存键 '{key}' 不存在") return { "key": key, "deleted": True, "message": "缓存已删除" } except HTTPException: raise except Exception as e: logger.error(f"删除缓存值失败: {str(e)}") raise HTTPException(status_code=500, detail=f"删除缓存值失败: {str(e)}")