Files
aiagent/backend/app/services/execution_logger.py
2026-01-19 00:09:36 +08:00

118 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
执行日志服务
"""
from typing import Dict, Any, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from app.models.execution_log import ExecutionLog
import logging
logger = logging.getLogger(__name__)
class ExecutionLogger:
"""执行日志记录器"""
def __init__(self, execution_id: str, db: Session):
"""
初始化日志记录器
Args:
execution_id: 执行ID
db: 数据库会话
"""
self.execution_id = execution_id
self.db = db
def log(
self,
level: str,
message: str,
node_id: Optional[str] = None,
node_type: Optional[str] = None,
data: Optional[Dict[str, Any]] = None,
duration: Optional[int] = None
):
"""
记录日志
Args:
level: 日志级别 (INFO/WARN/ERROR/DEBUG)
message: 日志消息
node_id: 节点ID可选
node_type: 节点类型(可选)
data: 附加数据(可选)
duration: 执行耗时(毫秒,可选)
"""
try:
log_entry = ExecutionLog(
execution_id=self.execution_id,
node_id=node_id,
node_type=node_type,
level=level.upper(),
message=message,
data=data,
duration=duration,
timestamp=datetime.utcnow()
)
self.db.add(log_entry)
self.db.commit()
# 同时输出到标准日志
log_method = getattr(logger, level.lower(), logger.info)
log_msg = f"[执行 {self.execution_id}]"
if node_id:
log_msg += f" [节点 {node_id}]"
log_msg += f" {message}"
log_method(log_msg)
except Exception as e:
# 如果数据库记录失败,至少输出到标准日志
logger.error(f"记录执行日志失败: {str(e)}")
logger.error(f"[执行 {self.execution_id}] {message}")
def info(self, message: str, **kwargs):
"""记录INFO级别日志"""
self.log("INFO", message, **kwargs)
def warn(self, message: str, **kwargs):
"""记录WARN级别日志"""
self.log("WARN", message, **kwargs)
def error(self, message: str, **kwargs):
"""记录ERROR级别日志"""
self.log("ERROR", message, **kwargs)
def debug(self, message: str, **kwargs):
"""记录DEBUG级别日志"""
self.log("DEBUG", message, **kwargs)
def log_node_start(self, node_id: str, node_type: str, input_data: Optional[Dict[str, Any]] = None):
"""记录节点开始执行"""
self.info(
f"节点 {node_id} ({node_type}) 开始执行",
node_id=node_id,
node_type=node_type,
data={"input": input_data} if input_data else None
)
def log_node_complete(self, node_id: str, node_type: str, output_data: Optional[Dict[str, Any]] = None, duration: Optional[int] = None):
"""记录节点执行完成"""
self.info(
f"节点 {node_id} ({node_type}) 执行完成",
node_id=node_id,
node_type=node_type,
data={"output": output_data} if output_data else None,
duration=duration
)
def log_node_error(self, node_id: str, node_type: str, error: Exception, duration: Optional[int] = None):
"""记录节点执行错误"""
self.error(
f"节点 {node_id} ({node_type}) 执行失败: {str(error)}",
node_id=node_id,
node_type=node_type,
data={"error": str(error), "error_type": type(error).__name__},
duration=duration
)