""" 执行日志服务 """ from typing import Dict, Any, Optional from datetime import datetime from sqlalchemy.orm import Session from app.models.execution_log import ExecutionLog import logging logger = logging.getLogger(__name__) class ExecutionLogger: """执行日志记录器""" def __init__(self, execution_id: str, db: Session): """ 初始化日志记录器 Args: execution_id: 执行ID db: 数据库会话 """ self.execution_id = execution_id self.db = db def log( self, level: str, message: str, node_id: Optional[str] = None, node_type: Optional[str] = None, data: Optional[Dict[str, Any]] = None, duration: Optional[int] = None ): """ 记录日志 Args: level: 日志级别 (INFO/WARN/ERROR/DEBUG) message: 日志消息 node_id: 节点ID(可选) node_type: 节点类型(可选) data: 附加数据(可选) duration: 执行耗时(毫秒,可选) """ try: log_entry = ExecutionLog( execution_id=self.execution_id, node_id=node_id, node_type=node_type, level=level.upper(), message=message, data=data, duration=duration, timestamp=datetime.utcnow() ) self.db.add(log_entry) self.db.commit() # 同时输出到标准日志 log_method = getattr(logger, level.lower(), logger.info) log_msg = f"[执行 {self.execution_id}]" if node_id: log_msg += f" [节点 {node_id}]" log_msg += f" {message}" log_method(log_msg) except Exception as e: # 如果数据库记录失败,至少输出到标准日志 logger.error(f"记录执行日志失败: {str(e)}") logger.error(f"[执行 {self.execution_id}] {message}") def info(self, message: str, **kwargs): """记录INFO级别日志""" self.log("INFO", message, **kwargs) def warn(self, message: str, **kwargs): """记录WARN级别日志""" self.log("WARN", message, **kwargs) def error(self, message: str, **kwargs): """记录ERROR级别日志""" self.log("ERROR", message, **kwargs) def debug(self, message: str, **kwargs): """记录DEBUG级别日志""" self.log("DEBUG", message, **kwargs) def log_node_start(self, node_id: str, node_type: str, input_data: Optional[Dict[str, Any]] = None): """记录节点开始执行""" self.info( f"节点 {node_id} ({node_type}) 开始执行", node_id=node_id, node_type=node_type, data={"input": input_data} if input_data else None ) def log_node_complete(self, node_id: str, node_type: str, output_data: Optional[Dict[str, Any]] = None, duration: Optional[int] = None): """记录节点执行完成""" self.info( f"节点 {node_id} ({node_type}) 执行完成", node_id=node_id, node_type=node_type, data={"output": output_data} if output_data else None, duration=duration ) def log_node_error(self, node_id: str, node_type: str, error: Exception, duration: Optional[int] = None): """记录节点执行错误""" self.error( f"节点 {node_id} ({node_type}) 执行失败: {str(error)}", node_id=node_id, node_type=node_type, data={"error": str(error), "error_type": type(error).__name__}, duration=duration )