Files
aiagent/backend/app/api/executions.py
2026-01-19 00:09:36 +08:00

350 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
执行管理API
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from datetime import datetime
from app.core.database import get_db
from app.models.execution import Execution
from app.models.workflow import Workflow
from app.models.agent import Agent
from app.api.auth import get_current_user
from app.models.user import User
from app.services.workflow_engine import WorkflowEngine
from app.tasks.workflow_tasks import execute_workflow_task
import uuid
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the execution endpoints; every route registered on it in this
# module is served under the /api/v1/executions prefix.
router = APIRouter(prefix="/api/v1/executions", tags=["executions"])
class ExecutionCreate(BaseModel):
    """Request body for creating an execution.

    The create endpoint in this module looks at ``workflow_id`` first and
    only falls back to ``agent_id`` when no workflow id is given; it answers
    HTTP 400 when neither is provided.
    """
    # ID of the workflow to run; optional, defaults to None.
    workflow_id: Optional[str] = None
    # ID of the agent to run; optional, defaults to None.
    agent_id: Optional[str] = None
    # Input payload for the run; required. It is stored on the execution
    # record and forwarded to the background task.
    input_data: Dict[str, Any]
class ExecutionResponse(BaseModel):
    """Response schema describing a single execution record."""
    # Identifier of the execution record.
    id: str
    # ID of the workflow that was run, if any.
    workflow_id: Optional[str]
    # ID of the agent that was run, if any.
    agent_id: Optional[str]
    # Input payload the execution was created with.
    input_data: Optional[Dict[str, Any]]
    # Output of the run, if any.
    output_data: Optional[Dict[str, Any]]
    # Execution state; the list endpoint documents pending, running,
    # completed and failed as filter values.
    status: str
    # Error description, if any.
    error_message: Optional[str]
    # Execution duration. NOTE(review): the unit is not visible in this
    # module -- confirm against the task that writes it.
    execution_time: Optional[int]
    # ID of the background task; the create endpoint stores the id returned
    # by ``execute_workflow_task.delay`` here.
    task_id: Optional[str]
    # Creation timestamp of the record.
    created_at: datetime

    class Config:
        # Allow the schema to be populated from object attributes; the
        # endpoints in this module return ``Execution`` instances directly.
        from_attributes = True
def _log_llm_nodes(nodes: List[Any]) -> None:
    """Emit debug lines describing the LLM nodes of an agent workflow.

    Only reports whether an ``api_key`` is configured, never its value.
    Skipped entirely unless DEBUG logging is enabled, and tolerant of nodes
    that are not dicts or whose ``data`` is missing/None, so that a purely
    diagnostic loop can never fail the request.
    """
    if not logger.isEnabledFor(logging.DEBUG):
        return
    logger.debug(f"[rjb] Agent工作流数据: nodes数量={len(nodes)}")
    for node in nodes:
        if not isinstance(node, dict) or node.get('type') != 'llm':
            continue
        # ``data`` may be present but None; fall back to an empty mapping.
        node_data = node.get('data') or {}
        if not isinstance(node_data, dict):
            node_data = {}
        logger.debug(f"[rjb] LLM节点: node_id={node.get('id')}, data keys={list(node_data.keys())}, api_key={'已配置' if node_data.get('api_key') else '未配置'}")


def _dispatch_execution(
    db: Session,
    execution: Execution,
    workflow_key: str,
    workflow_data: Dict[str, Any],
    input_data: Dict[str, Any],
) -> Execution:
    """Persist ``execution``, enqueue its background task and store the task id.

    Args:
        db: Active database session.
        execution: New, not yet persisted execution record.
        workflow_key: Identifier handed to the task as its workflow id.
        workflow_data: Dict with the ``nodes`` and ``edges`` to run.
        input_data: Input payload forwarded to the task.

    Returns:
        The refreshed execution record with ``task_id`` set.

    Raises:
        Exception: Whatever ``execute_workflow_task.delay`` raised; the
            record is marked as failed before the error is re-raised.
    """
    # The record must exist before the task is enqueued because the task is
    # given the execution id.
    db.add(execution)
    db.commit()
    db.refresh(execution)

    try:
        task = execute_workflow_task.delay(
            str(execution.id),
            workflow_key,
            workflow_data,
            input_data,
        )
    except Exception as exc:
        # The row is already committed as "pending"; if enqueueing fails no
        # task will ever pick it up, so record the failure instead of
        # leaving it pending forever. The original error still propagates.
        logger.exception("提交执行任务失败: execution_id=%s", execution.id)
        execution.status = "failed"
        execution.error_message = f"任务提交失败: {exc}"
        db.commit()
        raise

    # Remember which background task belongs to this execution.
    execution.task_id = task.id
    db.commit()
    db.refresh(execution)
    return execution


@router.post("", response_model=ExecutionResponse, status_code=status.HTTP_201_CREATED)
async def create_execution(
    execution_data: ExecutionCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create an execution record and enqueue it for asynchronous processing.

    ``workflow_id`` takes precedence: when it is given, ``agent_id`` is
    ignored.

    Args:
        execution_data: Request body naming the workflow or agent to run and
            the input payload.
        db: Database session (injected).
        current_user: Authenticated user (injected).

    Returns:
        The created execution record, including the id of the enqueued task.

    Raises:
        HTTPException: 404 if the workflow (owned by the current user) or the
            agent does not exist; 403 if the agent is neither
            published/running nor owned by the current user; 400 if the
            agent's workflow config lacks ``nodes``/``edges`` or if neither
            ``workflow_id`` nor ``agent_id`` is provided.
    """
    if execution_data.workflow_id:
        # Only workflows owned by the current user may be executed.
        workflow = db.query(Workflow).filter(
            Workflow.id == execution_data.workflow_id,
            Workflow.user_id == current_user.id
        ).first()
        if not workflow:
            raise HTTPException(status_code=404, detail="工作流不存在")

        workflow_data = {
            'nodes': workflow.nodes,
            'edges': workflow.edges
        }
        execution = Execution(
            workflow_id=execution_data.workflow_id,
            input_data=execution_data.input_data,
            status="pending"
        )
        return _dispatch_execution(
            db,
            execution,
            execution_data.workflow_id,
            workflow_data,
            execution_data.input_data,
        )

    if execution_data.agent_id:
        agent = db.query(Agent).filter(Agent.id == execution_data.agent_id).first()
        if not agent:
            raise HTTPException(status_code=404, detail="智能体不存在")

        # Published/running agents can be executed by anyone; any other
        # state is restricted to the owner (e.g. for testing).
        if agent.status not in ["published", "running"] and agent.user_id != current_user.id:
            raise HTTPException(status_code=403, detail="Agent未发布或无权执行")

        # The agent must carry a workflow config with both nodes and edges.
        if not agent.workflow_config or "nodes" not in agent.workflow_config or "edges" not in agent.workflow_config:
            raise HTTPException(status_code=400, detail="Agent工作流配置无效")

        workflow_data = {
            # Keys can be present but None; normalise to empty lists.
            'nodes': agent.workflow_config.get('nodes') or [],
            'edges': agent.workflow_config.get('edges') or []
        }
        _log_llm_nodes(workflow_data['nodes'])

        execution = Execution(
            agent_id=execution_data.agent_id,
            input_data=execution_data.input_data,
            status="pending"
        )
        return _dispatch_execution(
            db,
            execution,
            f"agent_{agent.id}",  # the agent id doubles as the workflow identifier
            workflow_data,
            execution_data.input_data,
        )

    raise HTTPException(status_code=400, detail="必须提供workflow_id或agent_id")
@router.get("", response_model=List[ExecutionResponse])
async def get_executions(
    skip: int = 0,
    limit: int = 100,
    workflow_id: Optional[str] = None,
    status: Optional[str] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    List execution records with pagination, filtering and search.

    Only executions of workflows or agents owned by the current user are
    returned, newest first.

    Args:
        skip: Number of records to skip (pagination); negative values are
            treated as 0.
        limit: Page size; clamped to the range 0..100.
        workflow_id: Restrict the result to this workflow ID.
        status: Restrict the result to this status (pending, running,
            completed, failed). Note that this parameter shadows the
            ``fastapi.status`` import inside this function.
        search: Keyword matched literally as a substring of the execution
            ID, workflow ID or task ID.
        db: Database session (injected).
        current_user: Authenticated user (injected).

    Returns:
        The matching execution records.
    """
    # Function-scope import, in the same style as get_execution_status.
    from sqlalchemy import or_

    # A negative LIMIT means "no limit" on some databases and is an error on
    # others; clamp both paging values so the 100-row cap cannot be bypassed.
    skip = max(skip, 0)
    limit = max(0, min(limit, 100))

    # Outer joins are required here: an inner join on Workflow would drop
    # every agent execution, whose workflow_id is NULL. Both joins target
    # primary keys, so they cannot duplicate rows.
    # NOTE(review): agent executions are attributed to the agent's owner,
    # because no executing-user column on Execution is visible from this
    # module -- confirm this is the intended visibility rule.
    query = (
        db.query(Execution)
        .outerjoin(Workflow, Execution.workflow_id == Workflow.id)
        .outerjoin(Agent, Execution.agent_id == Agent.id)
        .filter(
            or_(
                Workflow.user_id == current_user.id,
                Agent.user_id == current_user.id,
            )
        )
    )

    # Workflow ID filter
    if workflow_id:
        query = query.filter(Execution.workflow_id == workflow_id)

    # Status filter
    if status:
        query = query.filter(Execution.status == status)

    # Keyword search. autoescape makes "%" and "_" in the user's keyword
    # match literally instead of acting as LIKE wildcards.
    if search:
        query = query.filter(
            or_(
                Execution.id.contains(search, autoescape=True),
                Execution.workflow_id.contains(search, autoescape=True),
                Execution.task_id.contains(search, autoescape=True),
            )
        )

    # Sort newest first, then paginate.
    executions = query.order_by(Execution.created_at.desc()).offset(skip).limit(limit).all()
    return executions
def _collect_node_status(recent_logs) -> Dict[Any, Dict[str, Any]]:
    """Fold node-level execution logs into one status record per node.

    Each log is classified by substring-matching its message: a message
    containing "开始" marks the node as started, otherwise one containing
    "完成" marks it completed, otherwise one containing "失败" (or an
    ERROR-level log) marks it failed. For failures the error text is taken
    from the log's ``data`` dict when available, else parsed from the
    message.

    Args:
        recent_logs: Iterable of log rows exposing ``node_id``,
            ``node_type``, ``message``, ``level``, ``data``, ``duration``
            and ``timestamp``.

    Returns:
        Dict keyed by node id, in the order nodes are first seen in
        ``recent_logs``.
    """
    node_status: Dict[Any, Dict[str, Any]] = {}
    for log in recent_logs:
        node_id = log.node_id
        logger.debug(f"[rjb] 日志: node_id={node_id}, node_type={log.node_type}, message={log.message}, level={log.level}, data={log.data}")

        if node_id not in node_status:
            node_status[node_id] = {
                'node_id': node_id,
                'node_type': log.node_type,
                'started': False,
                'completed': False,
                'failed': False,
                'duration': None,
                'error_message': None,
                'error_type': None,
                'timestamp': log.timestamp.isoformat() if log.timestamp else None
            }
        entry = node_status[node_id]

        # The longer phrases ("开始执行", "执行完成", "执行失败") all contain
        # the short keyword, so testing the short keyword covers both forms.
        message = log.message or ''
        if '开始' in message:
            entry['started'] = True
            logger.debug(f"[rjb] 节点 {node_id} 标记为已开始")
        elif '完成' in message:
            entry['completed'] = True
            entry['duration'] = log.duration
            logger.debug(f"[rjb] 节点 {node_id} 标记为已完成")
        elif '失败' in message or log.level == 'ERROR':
            entry['failed'] = True
            # Prefer the structured error details stored in the log's data.
            if log.data and isinstance(log.data, dict):
                if 'error' in log.data:
                    entry['error_message'] = log.data.get('error')
                if 'error_type' in log.data:
                    entry['error_type'] = log.data.get('error_type')
            # Otherwise take the text after the "...失败:" marker in the
            # message (the longer marker is tried first).
            if not entry['error_message']:
                if '执行失败:' in message:
                    entry['error_message'] = message.split('执行失败:')[-1].strip()
                elif '失败:' in message:
                    entry['error_message'] = message.split('失败:')[-1].strip()
            logger.debug(f"[rjb] 节点 {node_id} 标记为失败, 错误信息: {entry['error_message']}")
    return node_status


@router.get("/{execution_id}/status", response_model=Dict[str, Any])
async def get_execution_status(
    execution_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return an execution's status plus per-node progress, for live display.

    Node progress is derived from the 50 most recent node-level logs of the
    execution, whatever the execution's own status is.

    Args:
        execution_id: ID of the execution to inspect.
        db: Database session (injected).
        current_user: Authenticated user (injected).

    Returns:
        Dict with ``execution_id``, ``status``, ``current_node`` (the node
        that has started but neither completed nor failed, or None),
        ``executed_nodes``, ``failed_nodes`` and ``execution_time``.

    Raises:
        HTTPException: 404 if the execution does not exist; 403 if it
            belongs to a workflow owned by another user.
    """
    from app.models.execution_log import ExecutionLog
    from sqlalchemy import desc

    execution = db.query(Execution).filter(Execution.id == execution_id).first()
    if not execution:
        raise HTTPException(status_code=404, detail="执行记录不存在")

    # Same ownership rule as the execution detail endpoint: the status of a
    # workflow execution is only visible to the workflow's owner.
    if execution.workflow_id:
        workflow = db.query(Workflow).filter(Workflow.id == execution.workflow_id).first()
        if workflow and workflow.user_id != current_user.id:
            raise HTTPException(status_code=403, detail="无权访问")

    # Most recent node-level logs first.
    recent_logs = db.query(ExecutionLog).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.node_id.isnot(None)
    ).order_by(desc(ExecutionLog.timestamp)).limit(50).all()
    logger.info(f"[rjb] 获取执行 {execution_id} 的日志,状态: {execution.status}, 日志数量: {len(recent_logs)}")

    node_status = _collect_node_status(recent_logs)

    logger.info(f"[rjb] 节点状态统计: {len(node_status)} 个节点")
    for node_id, info in node_status.items():
        logger.debug(f"[rjb] 节点 {node_id}: started={info['started']}, completed={info['completed']}, failed={info['failed']}")

    # The running node is the first one that started but has neither
    # completed nor failed.
    current_node = None
    for node_id, info in node_status.items():
        if info['started'] and not info['completed'] and not info['failed']:
            current_node = {
                'node_id': node_id,
                'node_type': info['node_type'],
                'status': 'running'
            }
            logger.info(f"[rjb] 当前执行节点: {node_id} ({info['node_type']})")
            break

    # Completed nodes
    executed_nodes = [
        {
            'node_id': node_id,
            'node_type': info['node_type'],
            'status': 'completed',
            'duration': info['duration']
        }
        for node_id, info in node_status.items()
        if info['completed']
    ]

    # Failed nodes, including their error details
    failed_nodes = [
        {
            'node_id': node_id,
            'node_type': info['node_type'],
            'status': 'failed',
            'error_message': info.get('error_message'),
            'error_type': info.get('error_type')
        }
        for node_id, info in node_status.items()
        if info['failed']
    ]

    logger.info(f"[rjb] 执行状态汇总: current_node={current_node}, executed={len(executed_nodes)}, failed={len(failed_nodes)}")
    return {
        'execution_id': execution.id,
        'status': execution.status,
        'current_node': current_node,
        'executed_nodes': executed_nodes,
        'failed_nodes': failed_nodes,
        'execution_time': execution.execution_time
    }
@router.get("/{execution_id}", response_model=ExecutionResponse)
async def get_execution(
    execution_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Fetch a single execution record by ID.

    Args:
        execution_id: ID of the execution to fetch.
        db: Database session (injected).
        current_user: Authenticated user (injected).

    Returns:
        The execution record.

    Raises:
        HTTPException: 404 if no such execution exists; 403 if the execution
            belongs to a workflow owned by a different user.
    """
    record = db.query(Execution).filter(Execution.id == execution_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="执行记录不存在")

    # Executions without a workflow (e.g. agent runs) skip the ownership check.
    if not record.workflow_id:
        return record

    owning_workflow = (
        db.query(Workflow)
        .filter(Workflow.id == record.workflow_id)
        .first()
    )
    # NOTE(review): when the workflow row cannot be found the request is
    # allowed through -- confirm that this fail-open behaviour is intended.
    if not owning_workflow:
        return record

    if owning_workflow.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="无权访问")
    return record