Files
aiagent/backend/app/api/executions.py
renjianbo df4fab1e6e feat: Agent 批量测试、作业助手与上传预览;Windows 启动脚本与文档- 新增 run_agent_test_cases 与示例 JSON、(红头)agent测试用例文档
- 扩展 test_agent_execution(--homework、UTF-8 控制台)
- 后端:uploads 预览、file_read、工作流与对话落盘等
- 前端:AgentChatPreview 与设计器相关调整
- 忽略 redis二进制、agent_workspaces、uploads、tessdata 等本机产物

Made-with: Cursor
2026-04-13 20:17:18 +08:00

490 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
执行管理API
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import or_
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional, Dict, Any, Literal
from datetime import datetime
from app.core.database import get_db
from app.models.execution import Execution
from app.models.workflow import Workflow
from app.models.agent import Agent
from app.api.auth import get_current_user
from app.models.user import User
from app.tasks.workflow_tasks import execute_workflow_task, resume_workflow_task
from app.services.agent_workspace_chat_log import ensure_agent_dialogue_logged_from_db_execution
import uuid
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/executions", tags=["executions"])
def _enqueue_workflow_task_safe(
    db: Session,
    execution: Execution,
    workflow_label: str,
    workflow_data: dict,
    input_data: dict,
) -> Execution:
    """Submit a workflow execution task to Celery and record its task id.

    On success the Celery task id is persisted on the execution row and the
    refreshed row is returned.  If the broker/worker is unreachable (or the
    follow-up commit fails), the execution is marked ``failed`` best-effort
    and a 503 HTTPException is raised with the original error chained.
    """
    try:
        async_result = execute_workflow_task.delay(
            str(execution.id),
            workflow_label,
            workflow_data,
            input_data,
        )
        execution.task_id = async_result.id
        db.commit()
        db.refresh(execution)
        return execution
    except Exception as enqueue_err:
        logger.exception(
            "Celery 任务入队失败 execution_id=%s workflow_label=%s",
            execution.id,
            workflow_label,
        )
        msg = (
            "异步任务入队失败:无法连接消息队列或 Celery Worker。"
            "请确认 Redis 已启动、.env 中 REDIS_URL 正确,并已运行 Celery Worker如 celery -A app.core.celery_app worker"
            f" 原始错误: {enqueue_err!s}"
        )
        # Best effort: write the failure onto the execution row; a second
        # failure here only rolls back, it never masks the 503 below.
        try:
            execution.status = "failed"
            execution.error_message = msg[:2000]
            db.commit()
            db.refresh(execution)
        except Exception as writeback_err:
            logger.exception("回写执行失败状态时出错: %s", writeback_err)
            db.rollback()
        raise HTTPException(status_code=503, detail=msg[:2000]) from enqueue_err
class ExecutionCreate(BaseModel):
    """Request body for creating an execution.

    Exactly one of ``workflow_id`` / ``agent_id`` is expected; the create
    endpoint rejects requests providing neither.
    """
    # Target workflow (mutually exclusive with agent_id at the endpoint level).
    workflow_id: Optional[str] = None
    # Target agent (mutually exclusive with workflow_id at the endpoint level).
    agent_id: Optional[str] = None
    # Arbitrary JSON payload handed to the workflow engine as-is.
    input_data: Dict[str, Any]
    # Parent execution for nested / sub-workflow runs.
    parent_execution_id: Optional[str] = None
    # Nesting depth; the endpoint clamps this to >= 0 (None counts as 0).
    depth: Optional[int] = 0
class ExecutionResponse(BaseModel):
    """API representation of an Execution row.

    UUID-typed columns are serialized as plain strings (see
    ``_execution_to_response``).
    """
    id: str
    workflow_id: Optional[str]
    agent_id: Optional[str]
    input_data: Optional[Dict[str, Any]]
    output_data: Optional[Dict[str, Any]]
    # e.g. pending / running / completed / failed / awaiting_approval.
    status: str
    error_message: Optional[str]
    # Wall-clock duration; unit not established here — TODO confirm (ms vs s).
    execution_time: Optional[int]
    # Celery task id once the job has been enqueued.
    task_id: Optional[str]
    parent_execution_id: Optional[str]
    depth: int
    # Suspension snapshot consumed by the resume endpoint.
    pause_state: Optional[Dict[str, Any]] = None
    created_at: datetime
    class Config:
        # Allow construction from ORM objects (pydantic v2 naming).
        from_attributes = True
def _execution_to_response(ex: Execution) -> ExecutionResponse:
    """Convert an Execution ORM row into an ExecutionResponse DTO.

    UUID columns become plain strings (None passes through); a missing
    depth is reported as 0.
    """
    def _opt_str(value):
        # Stringify UUID-like columns, preserving None.
        return None if value is None else str(value)

    return ExecutionResponse(
        id=str(ex.id),
        workflow_id=_opt_str(ex.workflow_id),
        agent_id=_opt_str(ex.agent_id),
        input_data=ex.input_data,
        output_data=ex.output_data,
        status=ex.status,
        error_message=ex.error_message,
        execution_time=ex.execution_time,
        task_id=_opt_str(ex.task_id),
        parent_execution_id=_opt_str(ex.parent_execution_id),
        depth=0 if ex.depth is None else int(ex.depth),
        pause_state=ex.pause_state,
        created_at=ex.created_at,
    )
class ResumeExecutionBody(BaseModel):
    """Approval decision used to resume an execution suspended in
    ``awaiting_approval`` state."""
    # Outcome of the approval step.
    decision: Literal["approved", "rejected"]
    # Optional free-text note from the approver.
    comment: Optional[str] = None
def _can_view_execution(
    db: Session, current_user: User, execution: Execution
) -> bool:
    """Return True when ``current_user`` may read this execution record.

    Access is granted to admins, to the owner of the workflow the execution
    ran on, and to the owner of the agent the execution ran on.
    """
    # Admins see everything.
    if getattr(current_user, "role", None) == "admin":
        return True
    # Owner of the associated workflow.
    if execution.workflow_id:
        owned_wf = (
            db.query(Workflow)
            .filter(Workflow.id == execution.workflow_id)
            .first()
        )
        if owned_wf is not None and owned_wf.user_id == current_user.id:
            return True
    # Owner of the associated agent.
    if execution.agent_id:
        owned_agent = (
            db.query(Agent)
            .filter(Agent.id == execution.agent_id)
            .first()
        )
        if owned_agent is not None and owned_agent.user_id == current_user.id:
            return True
    return False
def _persist_pending_execution(
    db: Session,
    execution_data: ExecutionCreate,
    workflow_id: Optional[str] = None,
    agent_id: Optional[str] = None,
) -> Execution:
    """Insert a pending Execution row and return the refreshed instance."""
    execution = Execution(
        workflow_id=workflow_id,
        agent_id=agent_id,
        input_data=execution_data.input_data,
        status="pending",
        parent_execution_id=execution_data.parent_execution_id,
        # Clamp depth to non-negative; None counts as 0.
        depth=max(0, int(execution_data.depth or 0)),
    )
    db.add(execution)
    db.commit()
    db.refresh(execution)
    return execution


def _create_workflow_execution(
    db: Session, current_user: User, execution_data: ExecutionCreate
) -> ExecutionResponse:
    """Create and enqueue an execution for a workflow owned by the caller.

    Raises:
        HTTPException 404: workflow missing or not owned by current_user.
        HTTPException 503: Celery enqueue failed (via _enqueue_workflow_task_safe).
    """
    workflow = db.query(Workflow).filter(
        Workflow.id == execution_data.workflow_id,
        Workflow.user_id == current_user.id
    ).first()
    if not workflow:
        raise HTTPException(status_code=404, detail="工作流不存在")
    execution = _persist_pending_execution(
        db, execution_data, workflow_id=execution_data.workflow_id
    )
    workflow_data = {
        'nodes': workflow.nodes,
        'edges': workflow.edges
    }
    ex = _enqueue_workflow_task_safe(
        db,
        execution,
        execution_data.workflow_id,
        workflow_data,
        execution_data.input_data,
    )
    return _execution_to_response(ex)


def _create_agent_execution(
    db: Session, current_user: User, execution_data: ExecutionCreate
) -> ExecutionResponse:
    """Create and enqueue an execution for an agent.

    Published/running agents may be executed by anyone; otherwise only the
    owner may run them (testing).

    Raises:
        HTTPException 404: agent does not exist.
        HTTPException 403: agent not published and caller is not the owner.
        HTTPException 400: agent workflow config lacks nodes/edges.
        HTTPException 503: Celery enqueue failed (via _enqueue_workflow_task_safe).
    """
    agent = db.query(Agent).filter(Agent.id == execution_data.agent_id).first()
    if not agent:
        raise HTTPException(status_code=404, detail="智能体不存在")
    # Only published agents are publicly executable; owners may always test.
    if agent.status not in ["published", "running"] and agent.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Agent未发布或无权执行")
    # The agent must carry a complete workflow graph.
    if not agent.workflow_config or "nodes" not in agent.workflow_config or "edges" not in agent.workflow_config:
        raise HTTPException(status_code=400, detail="Agent工作流配置无效")
    execution = _persist_pending_execution(
        db, execution_data, agent_id=execution_data.agent_id
    )
    workflow_data = {
        'nodes': agent.workflow_config.get('nodes', []),
        'edges': agent.workflow_config.get('edges', [])
    }
    # Debug: check whether LLM nodes carry an api_key.  Lazy %-style args
    # avoid formatting work when DEBUG logging is disabled.
    logger.debug("[rjb] Agent工作流数据: nodes数量=%s", len(workflow_data['nodes']))
    for node in workflow_data['nodes']:
        if node.get('type') == 'llm':
            node_data = node.get('data', {})
            logger.debug(
                "[rjb] LLM节点: node_id=%s, data keys=%s, api_key=%s",
                node.get('id'),
                list(node_data.keys()),
                '已配置' if node_data.get('api_key') else '未配置',
            )
    ex = _enqueue_workflow_task_safe(
        db,
        execution,
        f"agent_{agent.id}",
        workflow_data,
        execution_data.input_data,
    )
    return _execution_to_response(ex)


@router.post("", response_model=ExecutionResponse, status_code=status.HTTP_201_CREATED)
async def create_execution(
    execution_data: ExecutionCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create an execution for either a workflow or an agent.

    Exactly one of ``workflow_id`` / ``agent_id`` must be provided; the two
    flows are delegated to private helpers.
    """
    if execution_data.workflow_id:
        return _create_workflow_execution(db, current_user, execution_data)
    if execution_data.agent_id:
        return _create_agent_execution(db, current_user, execution_data)
    raise HTTPException(status_code=400, detail="必须提供workflow_id或agent_id")
@router.get("", response_model=List[ExecutionResponse])
async def get_executions(
    skip: int = 0,
    limit: int = 100,
    workflow_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    status: Optional[str] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List execution records with pagination, filtering and search.

    Args:
        skip: number of records to skip (pagination offset).
        limit: page size, capped at 100.
        workflow_id: exact-match workflow filter.
        agent_id: exact-match agent filter.
        status: exact-match status filter (pending, running, completed,
            failed, awaiting_approval).
        search: substring match against execution / workflow / agent /
            task ids.

    Admins see all executions; other users only see executions on
    workflows or agents they own.
    """
    # Hard cap on page size.
    limit = min(limit, 100)
    if getattr(current_user, "role", None) == "admin":
        query = db.query(Execution)
    else:
        # Executions on any workflow or agent owned by the caller
        # (covers pure-agent executions via the outer joins).
        query = (
            db.query(Execution)
            .outerjoin(Workflow, Execution.workflow_id == Workflow.id)
            .outerjoin(Agent, Execution.agent_id == Agent.id)
            .filter(
                or_(
                    Workflow.user_id == current_user.id,
                    Agent.user_id == current_user.id,
                )
            )
        )
    # Exact-match filters, applied only when a value was supplied.
    for column, value in (
        (Execution.workflow_id, workflow_id),
        (Execution.agent_id, agent_id),
        (Execution.status, status),
    ):
        if value:
            query = query.filter(column == value)
    # Substring search across the id-like columns.
    if search:
        pattern = f"%{search}%"
        query = query.filter(
            or_(
                Execution.id.like(pattern),
                Execution.workflow_id.like(pattern),
                Execution.agent_id.like(pattern),
                Execution.task_id.like(pattern),
            )
        )
    rows = (
        query.order_by(Execution.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    return [_execution_to_response(row) for row in rows]
@router.get("/{execution_id}/status", response_model=Dict[str, Any])
async def get_execution_status(
    execution_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return the execution status plus per-node progress for live display.

    Per-node state is reconstructed from the most recent execution logs:
    a node with a "start" log but no "complete"/"fail" log is reported as
    the currently running node.

    Raises:
        HTTPException 404: execution does not exist.
        HTTPException 403: caller may not view this execution.
    """
    from app.models.execution_log import ExecutionLog
    from sqlalchemy import desc
    execution = db.query(Execution).filter(Execution.id == execution_id).first()
    if not execution:
        raise HTTPException(status_code=404, detail="执行记录不存在")
    # Fix: enforce the same ownership check as get_execution/resume_execution.
    # Previously any authenticated user could poll any execution's status.
    if not _can_view_execution(db, current_user, execution):
        raise HTTPException(status_code=403, detail="无权访问")
    # Most recent node-scoped logs, regardless of execution state, so that
    # completed/failed runs still report their node history.
    recent_logs = db.query(ExecutionLog).filter(
        ExecutionLog.execution_id == execution_id,
        ExecutionLog.node_id.isnot(None)
    ).order_by(desc(ExecutionLog.timestamp)).limit(50).all()
    logger.info(
        "[rjb] 获取执行 %s 的日志,状态: %s, 日志数量: %s",
        execution_id, execution.status, len(recent_logs),
    )
    # Aggregate per-node state.  Logs arrive newest-first; the first log
    # seen for a node fixes its node_type and timestamp.
    node_status: Dict[str, Dict[str, Any]] = {}
    for log in recent_logs:
        node_id = log.node_id
        logger.debug(
            "[rjb] 日志: node_id=%s, node_type=%s, message=%s, level=%s, data=%s",
            node_id, log.node_type, log.message, log.level, log.data,
        )
        if node_id not in node_status:
            node_status[node_id] = {
                'node_id': node_id,
                'node_type': log.node_type,
                'started': False,
                'completed': False,
                'failed': False,
                'duration': None,
                'error_message': None,
                'error_type': None,
                'timestamp': log.timestamp.isoformat() if log.timestamp else None
            }
        entry = node_status[node_id]
        # Classify the log line by message content (several formats exist).
        message = log.message or ''
        if '开始执行' in message or '开始' in message:
            entry['started'] = True
            logger.debug("[rjb] 节点 %s 标记为已开始", node_id)
        elif '执行完成' in message or '完成' in message:
            entry['completed'] = True
            entry['duration'] = log.duration
            logger.debug("[rjb] 节点 %s 标记为已完成", node_id)
        elif '执行失败' in message or '失败' in message or log.level == 'ERROR':
            entry['failed'] = True
            # Prefer structured error details from the log's data payload.
            if log.data and isinstance(log.data, dict):
                if 'error' in log.data:
                    entry['error_message'] = log.data.get('error')
                if 'error_type' in log.data:
                    entry['error_type'] = log.data.get('error_type')
            # Fall back to parsing the message text
            # (format: 节点 xxx 执行失败: <error>).
            if not entry['error_message']:
                if '执行失败:' in message:
                    entry['error_message'] = message.split('执行失败:')[-1].strip()
                elif '失败:' in message:
                    entry['error_message'] = message.split('失败:')[-1].strip()
            logger.debug(
                "[rjb] 节点 %s 标记为失败, 错误信息: %s",
                node_id, entry['error_message'],
            )
    logger.info("[rjb] 节点状态统计: %s 个节点", len(node_status))
    for node_id, info in node_status.items():
        logger.debug(
            "[rjb] 节点 %s: started=%s, completed=%s, failed=%s",
            node_id, info['started'], info['completed'], info['failed'],
        )
    # Currently running node: started but neither completed nor failed.
    # (Loop variable renamed: the original shadowed the imported `status`.)
    current_node = None
    for node_id, info in node_status.items():
        if info['started'] and not info['completed'] and not info['failed']:
            current_node = {
                'node_id': node_id,
                'node_type': info['node_type'],
                'status': 'running'
            }
            logger.info("[rjb] 当前执行节点: %s (%s)", node_id, info['node_type'])
            break
    executed_nodes = [
        {
            'node_id': node_id,
            'node_type': info['node_type'],
            'status': 'completed',
            'duration': info['duration']
        }
        for node_id, info in node_status.items()
        if info['completed']
    ]
    failed_nodes = [
        {
            'node_id': node_id,
            'node_type': info['node_type'],
            'status': 'failed',
            'error_message': info.get('error_message'),
            'error_type': info.get('error_type')
        }
        for node_id, info in node_status.items()
        if info['failed']
    ]
    logger.info(
        "[rjb] 执行状态汇总: current_node=%s, executed=%s, failed=%s",
        current_node, len(executed_nodes), len(failed_nodes),
    )
    return {
        'execution_id': execution.id,
        'status': execution.status,
        'current_node': current_node,
        'executed_nodes': executed_nodes,
        'failed_nodes': failed_nodes,
        'execution_time': execution.execution_time
    }
@router.post("/{execution_id}/resume", response_model=ExecutionResponse)
async def resume_execution(
    execution_id: str,
    body: ResumeExecutionBody,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Resume an execution suspended in ``awaiting_approval``.

    Enqueues a resume task carrying the approval decision/comment, stores
    the new Celery task id on the row and returns the updated execution.

    Raises:
        HTTPException 404: execution does not exist.
        HTTPException 403: caller may not view this execution.
        HTTPException 400: execution is not awaiting approval, or the pause
            snapshot required to resume is missing.
    """
    execution = db.query(Execution).filter(Execution.id == execution_id).first()
    if not execution:
        raise HTTPException(status_code=404, detail="执行记录不存在")
    if not _can_view_execution(db, current_user, execution):
        raise HTTPException(status_code=403, detail="无权访问")
    if execution.status != "awaiting_approval":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"当前状态不可恢复: {execution.status}",
        )
    if not execution.pause_state:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="缺少挂起快照,无法恢复",
        )
    task = resume_workflow_task.delay(
        str(execution.id),
        body.decision,
        body.comment,
    )
    execution.task_id = task.id
    db.commit()
    db.refresh(execution)
    # Fix: serialize via the shared converter so UUID columns become plain
    # strings, matching every other endpoint returning ExecutionResponse
    # (returning the raw ORM row risked response_model validation errors).
    return _execution_to_response(execution)
@router.get("/{execution_id}", response_model=ExecutionResponse)
async def get_execution(
    execution_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Fetch a single execution, enforcing owner/admin visibility.

    For completed executions, best-effort backfill of the agent dialogue
    markdown is attempted in case the Celery worker did not write it
    (the write inside the task is idempotent, so this is safe).
    """
    record = db.query(Execution).filter(Execution.id == execution_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="执行记录不存在")
    if not _can_view_execution(db, current_user, record):
        raise HTTPException(status_code=403, detail="无权访问")
    if record.status == "completed":
        try:
            ensure_agent_dialogue_logged_from_db_execution(db, record)
        except Exception:
            # Deliberate best-effort: backfill failure must not break reads.
            logger.debug("补写智能体对话 MD 跳过", exc_info=True)
    return _execution_to_response(record)