118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
"""
|
||
执行日志服务
|
||
"""
|
||
from typing import Dict, Any, Optional
|
||
from datetime import datetime
|
||
from sqlalchemy.orm import Session
|
||
from app.models.execution_log import ExecutionLog
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ExecutionLogger:
|
||
"""执行日志记录器"""
|
||
|
||
def __init__(self, execution_id: str, db: Session):
|
||
"""
|
||
初始化日志记录器
|
||
|
||
Args:
|
||
execution_id: 执行ID
|
||
db: 数据库会话
|
||
"""
|
||
self.execution_id = execution_id
|
||
self.db = db
|
||
|
||
def log(
|
||
self,
|
||
level: str,
|
||
message: str,
|
||
node_id: Optional[str] = None,
|
||
node_type: Optional[str] = None,
|
||
data: Optional[Dict[str, Any]] = None,
|
||
duration: Optional[int] = None
|
||
):
|
||
"""
|
||
记录日志
|
||
|
||
Args:
|
||
level: 日志级别 (INFO/WARN/ERROR/DEBUG)
|
||
message: 日志消息
|
||
node_id: 节点ID(可选)
|
||
node_type: 节点类型(可选)
|
||
data: 附加数据(可选)
|
||
duration: 执行耗时(毫秒,可选)
|
||
"""
|
||
try:
|
||
log_entry = ExecutionLog(
|
||
execution_id=self.execution_id,
|
||
node_id=node_id,
|
||
node_type=node_type,
|
||
level=level.upper(),
|
||
message=message,
|
||
data=data,
|
||
duration=duration,
|
||
timestamp=datetime.utcnow()
|
||
)
|
||
self.db.add(log_entry)
|
||
self.db.commit()
|
||
|
||
# 同时输出到标准日志
|
||
log_method = getattr(logger, level.lower(), logger.info)
|
||
log_msg = f"[执行 {self.execution_id}]"
|
||
if node_id:
|
||
log_msg += f" [节点 {node_id}]"
|
||
log_msg += f" {message}"
|
||
log_method(log_msg)
|
||
|
||
except Exception as e:
|
||
# 如果数据库记录失败,至少输出到标准日志
|
||
logger.error(f"记录执行日志失败: {str(e)}")
|
||
logger.error(f"[执行 {self.execution_id}] {message}")
|
||
|
||
def info(self, message: str, **kwargs):
|
||
"""记录INFO级别日志"""
|
||
self.log("INFO", message, **kwargs)
|
||
|
||
def warn(self, message: str, **kwargs):
|
||
"""记录WARN级别日志"""
|
||
self.log("WARN", message, **kwargs)
|
||
|
||
def error(self, message: str, **kwargs):
|
||
"""记录ERROR级别日志"""
|
||
self.log("ERROR", message, **kwargs)
|
||
|
||
def debug(self, message: str, **kwargs):
|
||
"""记录DEBUG级别日志"""
|
||
self.log("DEBUG", message, **kwargs)
|
||
|
||
def log_node_start(self, node_id: str, node_type: str, input_data: Optional[Dict[str, Any]] = None):
|
||
"""记录节点开始执行"""
|
||
self.info(
|
||
f"节点 {node_id} ({node_type}) 开始执行",
|
||
node_id=node_id,
|
||
node_type=node_type,
|
||
data={"input": input_data} if input_data else None
|
||
)
|
||
|
||
def log_node_complete(self, node_id: str, node_type: str, output_data: Optional[Dict[str, Any]] = None, duration: Optional[int] = None):
|
||
"""记录节点执行完成"""
|
||
self.info(
|
||
f"节点 {node_id} ({node_type}) 执行完成",
|
||
node_id=node_id,
|
||
node_type=node_type,
|
||
data={"output": output_data} if output_data else None,
|
||
duration=duration
|
||
)
|
||
|
||
def log_node_error(self, node_id: str, node_type: str, error: Exception, duration: Optional[int] = None):
|
||
"""记录节点执行错误"""
|
||
self.error(
|
||
f"节点 {node_id} ({node_type}) 执行失败: {str(error)}",
|
||
node_id=node_id,
|
||
node_type=node_type,
|
||
data={"error": str(error), "error_type": type(error).__name__},
|
||
duration=duration
|
||
)
|