Files
aiagent/backend/app/tasks/workflow_tasks.py
2026-01-19 00:09:36 +08:00

114 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
工作流任务
"""
from celery import Task
from app.core.celery_app import celery_app
from app.services.workflow_engine import WorkflowEngine
from app.services.execution_logger import ExecutionLogger
from app.services.alert_service import AlertService
from app.core.database import SessionLocal
# 导入所有相关模型,确保关系可以正确解析
from app.models.execution import Execution
from app.models.agent import Agent
from app.models.workflow import Workflow
import asyncio
import time
@celery_app.task(bind=True)
def execute_workflow_task(
self,
execution_id: str,
workflow_id: str,
workflow_data: dict,
input_data: dict
):
"""
执行工作流任务
Args:
execution_id: 执行记录ID
workflow_id: 工作流ID
workflow_data: 工作流数据nodes和edges
input_data: 输入数据
"""
db = SessionLocal()
start_time = time.time()
execution_logger = None
try:
# 更新执行状态为运行中
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if execution:
execution.status = "running"
db.commit()
# 更新任务状态
self.update_state(state='PROGRESS', meta={'progress': 0, 'status': 'running'})
# 创建执行日志记录器
execution_logger = ExecutionLogger(execution_id, db)
execution_logger.info("工作流任务开始执行")
# 创建工作流引擎传入logger和db
engine = WorkflowEngine(workflow_id, workflow_data, logger=execution_logger, db=db)
# 执行工作流(异步)
result = asyncio.run(engine.execute(input_data))
# 计算执行时间
execution_time = int((time.time() - start_time) * 1000)
# 更新执行记录
if execution:
execution.status = "completed"
execution.output_data = result
execution.execution_time = execution_time
db.commit()
# 记录执行完成日志
execution_logger.info(f"工作流任务执行完成,耗时: {execution_time}ms")
# 检查告警规则(异步)
if execution:
try:
asyncio.run(AlertService.check_alerts_for_execution(db, execution))
except Exception as e:
# 告警检测失败不影响执行结果
execution_logger.warn(f"告警检测失败: {str(e)}")
return {
'status': 'completed',
'result': result,
'execution_time': execution_time
}
except Exception as e:
execution_time = int((time.time() - start_time) * 1000)
# 记录错误日志
if execution_logger:
execution_logger.error(f"工作流任务执行失败: {str(e)}", data={"error_type": type(e).__name__})
# 更新执行记录为失败
execution = db.query(Execution).filter(Execution.id == execution_id).first()
if execution:
execution.status = "failed"
execution.error_message = str(e)
execution.execution_time = execution_time
db.commit()
# 检查告警规则(异步)
if execution:
try:
asyncio.run(AlertService.check_alerts_for_execution(db, execution))
except Exception as e2:
# 告警检测失败不影响错误处理
if execution_logger:
execution_logger.warn(f"告警检测失败: {str(e2)}")
raise
finally:
db.close()