diff --git a/aiagent项目架构文档.md b/aiagent项目架构文档.md new file mode 100644 index 0000000..30f10d5 --- /dev/null +++ b/aiagent项目架构文档.md @@ -0,0 +1,500 @@ +# AIAgent 低代码智能体平台 — 项目架构文档 + +## 1. 概述 + +**AIAgent** 是一个低代码智能体 (Agent) 搭建与工作流编排平台,支持通过可视化拖拽方式配置工作流节点,集成多种 AI 模型、数据源、消息中间件、办公协同工具,实现复杂业务流程的自动化执行。 + +- **前端**: Vue 3 + TypeScript + Vite + Pinia + Element Plus + Vue Flow +- **后端**: Python FastAPI + SQLAlchemy + Celery + MySQL + Redis +- **异步任务**: Celery (Redis 作为 Broker) +- **AI 模型**: 支持 OpenAI、DeepSeek、Anthropic 等多种大模型 + +--- + +## 2. 目录结构 + +``` +aiagent/ +├── frontend/ # Vue 3 前端项目 +│ ├── src/ +│ │ ├── api/index.ts # Axios 封装 + 请求/响应拦截器 +│ │ ├── router/index.ts # Vue Router 路由配置 +│ │ ├── stores/ # Pinia 状态管理 +│ │ │ ├── index.ts # Store 入口 +│ │ │ ├── user.ts # 用户认证 Store +│ │ │ ├── workflow.ts # 工作流 CRUD Store +│ │ │ ├── agent.ts # Agent CRUD Store +│ │ │ ├── execution.ts # 执行记录 Store +│ │ │ └── modelConfig.ts # 模型配置 Store +│ │ ├── views/ # 页面视图组件 +│ │ │ ├── Login.vue # 登录页 +│ │ │ ├── Home.vue # 首页 +│ │ │ ├── MainConsole.vue # 主控制台 +│ │ │ ├── WorkflowDesigner.vue # 工作流设计器 +│ │ │ ├── Agents.vue # Agent 管理 +│ │ │ ├── Executions.vue # 执行记录列表 +│ │ │ ├── ExecutionDetail.vue # 执行详情 +│ │ │ ├── ExecutionBoard.vue # 执行看板 +│ │ │ ├── DataSources.vue # 数据源管理 +│ │ │ ├── ModelConfigs.vue # 模型配置管理 +│ │ │ ├── TemplateMarket.vue # 模板市场 +│ │ │ ├── NodeTemplates.vue # 节点模板 +│ │ │ ├── Monitoring.vue # 监控面板 +│ │ │ ├── AlertRules.vue # 告警规则 +│ │ │ └── PermissionManagement.vue # 权限管理 +│ │ ├── components/ +│ │ │ ├── MainLayout.vue # 主布局组件 +│ │ │ ├── AgentChatPreview.vue # Agent 聊天预览 +│ │ │ └── WorkflowEditor/ # 工作流编辑器核心组件 +│ │ │ ├── WorkflowEditor.vue # 编辑器主组件 +│ │ │ ├── NodeTypes.ts # 自定义节点类型定义 +│ │ │ └── NodeExecutionDetail.vue # 节点执行详情弹窗 +│ │ ├── types/index.ts # TypeScript 类型定义 +│ │ └── main.ts # 应用入口 +│ ├── vite.config.ts # Vite 构建配置 +│ └── package.json +│ +├── backend/ # Python FastAPI 后端 +│ ├── app/ +│ │ ├── main.py # FastAPI 应用入口 +│ │ ├── core/ +│ │ │ ├── config.py # 配置管理 (Pydantic Settings) +│ │ │ ├── database.py # 数据库引擎 + 会话管理 +│ │ │ ├── celery_app.py # Celery 异步任务应用 +│ │ │ ├── security.py # 安全/JWT 工具 +│ │ │ ├── exceptions.py # 自定义异常 +│ │ │ ├── error_handler.py # 全局异常处理器 +│ │ │ ├── redis_client.py # Redis 客户端 +│ │ │ └── tools_bootstrap.py # 内置工具启动注册 +│ │ ├── models/ # SQLAlchemy 数据模型 +│ │ │ ├── agent.py # Agent (智能体) +│ │ │ ├── workflow.py # Workflow (工作流) +│ │ │ ├── workflow_version.py # 工作流版本 +│ │ │ ├── workflow_template.py # 工作流模板 +│ │ │ ├── execution.py # 执行记录 +│ │ │ ├── execution_log.py # 执行日志 +│ │ │ ├── model_config.py # 模型配置 (加密存储) +│ │ │ ├── tool.py # 工具定义 +│ │ │ ├── user.py # 用户 +│ │ │ ├── permission.py # 权限 +│ │ │ ├── alert_rule.py # 告警规则 +│ │ │ ├── data_source.py # 数据源 +│ │ │ ├── node_template.py # 节点模板 +│ │ │ └── persistent_user_memory.py # 持久化用户记忆 +│ │ ├── api/ # RESTful API 路由 +│ │ │ ├── auth.py # 用户认证 (JWT) +│ │ │ ├── workflows.py # 工作流 CRUD +│ │ │ ├── agents.py # Agent CRUD +│ │ │ ├── executions.py # 执行管理 +│ │ │ ├── execution_logs.py # 执行日志 +│ │ │ ├── tools.py # 工具管理 +│ │ │ ├── model_configs.py # 模型配置 +│ │ │ ├── data_sources.py # 数据源 +│ │ │ ├── uploads.py # 文件上传 +│ │ │ ├── websocket.py # WebSocket 实时推送 +│ │ │ ├── webhooks.py # Webhook 接收 +│ │ │ ├── template_market.py # 模板市场 +│ │ │ ├── platform_templates.py # 平台模板 +│ │ │ ├── node_templates.py # 节点模板 +│ │ │ ├── node_test.py # 节点测试 +│ │ │ ├── batch_operations.py # 批量操作 +│ │ │ ├── collaboration.py # 协作 +│ │ │ ├── permissions.py # 权限管理 +│ │ │ ├── monitoring.py # 监控 +│ │ │ ├── alert_rules.py # 告警规则 +│ │ │ └── batch_operations.py # 批量操作 +│ │ ├── services/ # 业务逻辑层 +│ │ │ ├── workflow_engine.py # 工作流执行引擎 (核心) +│ │ │ ├── workflow_validator.py # 工作流校验器 +│ │ │ ├── workflow_templates.py # 工作流模板 +│ │ │ ├── llm_service.py # LLM 调用服务 +│ │ │ ├── tool_registry.py # 工具注册表 +│ │ │ ├── builtin_tools.py # 内置工具实现 +│ │ │ ├── condition_parser.py # 条件表达式解析 +│ │ │ ├── data_transformer.py # 数据转换 +│ │ │ ├── execution_logger.py # 执行日志记录 +│ │ │ ├── execution_budget.py # 执行预算控制 +│ │ │ ├── encryption_service.py # 加密服务 +│ │ │ ├── persistent_memory_service.py # 持久化记忆 +│ │ │ ├── permission_service.py # 权限检查 +│ │ │ ├── monitoring_service.py # 监控服务 +│ │ │ ├── alert_service.py # 告警服务 +│ │ │ ├── data_source_connector.py # 数据源连接器 +│ │ │ ├── scene_templates.py # 场景模板 +│ │ │ └── scenario_dsl.py # 场景 DSL 解析 +│ │ ├── tasks/ # Celery 异步任务 +│ │ │ ├── workflow_tasks.py # 工作流执行任务 +│ │ │ └── agent_tasks.py # Agent 执行任务 +│ │ ├── websocket/ # WebSocket 管理 +│ │ │ ├── manager.py # WS 连接管理 +│ │ │ └── collaboration_manager.py # 协作 WS 管理 +│ │ └── utils/ # 工具函数 +│ ├── alembic/ # 数据库迁移 +│ ├── scripts/ # 脚本工具 +│ └── requirements.txt +│ +├── docker-compose.dev.yml # Docker 开发环境配置 +├── start_windows.cmd / .ps1 # Windows 启动脚本 +├── start.sh / stop.sh # Linux 启动/停止脚本 +└── *.md # 项目文档/方案/报告 +``` + +--- + +## 3. 前端架构 + +### 3.1 技术栈 + +| 组件 | 技术选型 | 用途 | +|------|---------|------| +| 框架 | Vue 3 (Composition API) | UI 框架 | +| 语言 | TypeScript | 类型安全 | +| 构建 | Vite | 开发/构建 | +| 状态管理 | Pinia | 全局状态 | +| 路由 | Vue Router 4 | 前端路由 | +| UI 库 | Element Plus | 组件库 | +| 工作流 | Vue Flow | 可视化流程图编辑 | +| HTTP | Axios | API 请求 | +| WebSocket | 原生 WebSocket | 实时推送 | + +### 3.2 路由设计 + +| 路径 | 视图 | 说明 | +|------|------|------| +| `/login` | Login.vue | 登录页 | +| `/` | Home.vue | 首页仪表盘 | +| `/console` | MainConsole.vue | 主控制台 | +| `/workflow/:id?` | WorkflowDesigner.vue | 工作流编辑器 (创建/编辑) | +| `/agents` | Agents.vue | Agent 管理列表 | +| `/agents/:id/design` | WorkflowDesigner.vue | Agent 工作流设计 | +| `/executions` | Executions.vue | 执行记录列表 | +| `/executions/:id` | ExecutionDetail.vue | 执行详情 | +| `/execution-board` | ExecutionBoard.vue | 执行看板 | +| `/data-sources` | DataSources.vue | 数据源管理 | +| `/model-configs` | ModelConfigs.vue | 模型配置 | +| `/template-market` | TemplateMarket.vue | 模板市场 | +| `/monitoring` | Monitoring.vue | 监控面板 | +| `/alert-rules` | AlertRules.vue | 告警规则 | +| `/node-templates` | NodeTemplates.vue | 节点模板 | +| `/permissions` | PermissionManagement.vue | 权限管理 (需 admin) | + +路由守卫逻辑:除登录页外均需 JWT 认证,权限页面额外检查 `admin` 角色。 + +### 3.3 状态管理 (Pinia Stores) + +- **`user`** — 用户认证、Token 管理、用户信息 +- **`workflow`** — 工作流列表/详情 CRUD、执行操作 +- **`agent`** — Agent 列表/详情 CRUD、场景模板创建 +- **`execution`** — 执行记录跟踪 +- **`modelConfig`** — 模型配置 (API Key 管理) + +### 3.4 工作流编辑器 (WorkflowEditor) + +可视化拖拽编辑器,基于 **Vue Flow** (Vue 版的 React Flow): + +- 自定义节点类型(通过 `NodeTypes.ts` 中的 `defineComponent` 定义) +- 每个节点有执行状态动画(executing/executed/failed 三态指示器) +- 支持:开始/LLM/条件/结束/默认 五种预制节点样式 +- 工具栏:保存、运行、清空、对齐、自动布局、泳道、缩放 +- 节点属性面板:支持配置 Prompt、模型选择、API Key、变量引用等 + +### 3.5 API 通信层 + +- Axios 封装 `api/index.ts` +- 自动 Token 注入 (请求拦截器) +- 统一错误处理 (响应拦截器): 401→重定向登录, 403→权限提示, 404/422/500→友好错误消息 +- 开发模式通过 Vite 代理将 `/api` 转发到后端 8037 端口 +- 生产环境自动探测后端地址 + +--- + +## 4. 后端架构 + +### 4.1 技术栈 + +| 组件 | 技术选型 | 用途 | +|------|---------|------| +| 框架 | FastAPI | Web 框架 | +| 数据库 ORM | SQLAlchemy + PyMySQL | MySQL 操作 | +| 数据库 | MySQL (腾讯云) | 持久化存储 | +| 缓存/队列 | Redis | Celery Broker + 缓存 | +| 异步任务 | Celery | 工作流/Agent 异步执行 | +| AI SDK | OpenAI Python SDK | 大模型调用 | +| 迁移 | Alembic | 数据库版本管理 | +| 加密 | cryptography (Fernet) | API Key 加密存储 | + +### 4.2 启动流程 + +1. FastAPI 应用初始化 +2. 配置 CORS 中间件 +3. 注册全局异常处理器 +4. 注册请求日志中间件 +5. **`startup` 事件**: 初始化数据库表 + 注册内置工具 +6. 注册 20 个 API 路由模块 + +### 4.3 API 设计 + +所有业务 API 以 `/api/v1/` 为前缀,通过 JWT Bearer Token 认证(`get_current_user` 依赖注入)。 + +主要路由模块: + +| 模块 | 前缀 | 主要功能 | +|------|------|---------| +| `auth` | `/api/v1/auth` | 注册、登录、用户信息 | +| `workflows` | `/api/v1/workflows` | 工作流 CRUD + 验证 + 模板 + 版本管理 | +| `agents` | `/api/v1/agents` | Agent CRUD + 场景模板创建 + 执行 | +| `executions` | `/api/v1/executions` | 执行记录 CRUD、启停、恢复 | +| `execution_logs` | `/api/v1/execution-logs` | 执行日志查询/统计 | +| `tools` | `/api/v1/tools` | 工具注册和查询 | +| `model_configs` | `/api/v1/model-configs` | 模型配置管理 (加密存储) | +| `data_sources` | `/api/v1/data-sources` | 数据源 CRUD + 连接测试 | +| `websocket` | `/api/v1/ws` | WebSocket 实时推送 | +| `webhooks` | `/api/v1/webhooks` | 外部 Webhook 接收 | +| `template_market` | `/api/v1/template-market` | 模板市场 | +| `node_test` | `/api/v1/node-test` | 节点独立测试 | +| `monitoring` | `/api/v1/monitoring` | 系统监控 | +| `alert_rules` | `/api/v1/alert-rules` | 告警规则 CRUD | +| `permissions` | `/api/v1/permissions` | RBAC 权限管理 | +| `collaboration` | `/api/v1/collaboration` | 多用户协作 | +| `batch_operations` | `/api/v1/batch-operations` | 批量操作 | +| `uploads` | `/api/v1/uploads` | 文件上传 | + +### 4.4 数据库模型 + +| 表 | 模型 | 说明 | +|----|------|------| +| `users` | User | 用户账户,含角色字段 | +| `workflows` | Workflow | 工作流定义 (nodes+edges JSON) | +| `workflow_versions` | WorkflowVersion | 版本历史,支持回滚 | +| `workflow_templates` | WorkflowTemplate | 预制工作流模板 | +| `agents` | Agent | 智能体定义 (workflow_config JSON) | +| `executions` | Execution | 执行记录,支持暂停/恢复 | +| `execution_logs` | ExecutionLog | 节点级执行日志 | +| `model_configs` | ModelConfig | 模型配置 (API Key 加密) | +| `tools` | Tool | 工具定义 | +| `data_sources` | DataSource | 外部数据源连接配置 | +| `permissions` | Permission | RBAC 权限条目 | +| `alert_rules` | AlertRule | 监控告警规则 | +| `node_templates` | NodeTemplate | 节点配置模板 | +| `persistent_user_memories` | PersistentUserMemory | 跨会话用户记忆 | + +### 4.5 核心服务详解 + +#### 4.5.1 工作流执行引擎 (`workflow_engine.py`) + +这是项目最核心的模块,约 5700+ 行代码。核心逻辑: + +**初始化**: +- 解析 nodes/edges,按 ID 建立索引 +- 设置执行预算(步数上限、LLM 调用上限、工具调用上限) + +**执行流程 (`execute` 方法)**: +1. 若包含 `scenario_dsl`,先进行 DSL 校验和标准化 +2. 按**拓扑排序**动态构建执行图 +3. 每轮循环选择"所有前驱已执行"的下一个节点 +4. 调用 `execute_node` 执行单个节点 +5. 根据节点类型处理结果(条件分支裁剪、循环控制等) +6. 遇到审批节点时挂起(`WorkflowPaused` 异常),保存快照 +7. 支持断点恢复(`resume_snapshot`) + +**执行预算熔断**: +- 单执行最大步数 (默认 2000) → 防止死循环 +- 最大 LLM 调用次数 (默认 200) → 防止失控扣费 +- 最大工具调用次数 (默认 500) → 防止工具滥用 + +**节点输入数据解析**: +- 支持 `{{variable}}` 和 `{{variable.path}}` 模板变量 +- 支持嵌套路径解析(如 `memory.conversation_history`) +- 支持 `user_input` 等别名智能映射 + +#### 4.5.2 节点类型 (30+ 种) + +| 分类 | 节点类型 | 功能 | +|------|---------|------| +| 控制流 | `start`, `end/output`, `condition`, `switch`, `merge`, `loop/foreach`, `wait` | 流程编排 | +| AI 能力 | `llm`, `template`, `code` | 智能处理 | +| 数据 | `data/transform`, `json`, `text`, `csv`, `excel`, `cache`, `vector_db` | 数据操作 | +| 存储 | `database/db`, `file/file_operation`, `object_storage`, `pdf`, `image` | 文件/DB 操作 | +| 网络 | `http/request`, `webhook`, `oauth` | HTTP 请求 | +| 消息 | `email/mail`, `message_queue/mq/rabbitmq/kafka`, `sms` | 消息通知 | +| 协同 | `slack`, `dingtalk/dingding`, `wechat_work/wecom` | 办公集成 | +| 高级 | `subworkflow/invoke_agent`, `batch`, `validator`, `approval`, `error_handler`, `log`, `schedule/delay/timer` | 高级编排 | + +其中 **LLM 节点**为核心 AI 节点: +- 通过 OpenAI 兼容 SDK 调用模型 +- 支持从 `model_config_id` 加载加密凭据 +- 支持系统提示词 + 用户提示词模板变量注入 +- 支持工具调用 (Function Calling) +- 支持 DeepSeek DSML 协议解析 +- 支持多轮对话历史注入 +- 支持用户画像提取与持久化记忆 + +#### 4.5.3 LLM 服务 (`llm_service.py`) + +- 支持 OpenAI / DeepSeek / 自定义 OpenAI 兼容接口 +- 处理流式响应和工具调用 +- DeepSeek 特有的 `reasoning_content` 思考链处理 +- 支持 DSML 协议参数提取和工具调用 +- 超时重试:可重试的网络错误自动重试 + +#### 4.5.4 工具注册表 (`tool_registry.py`) + +- 内置工具(file_read, file_write, system_info, datetime 等) +- HTTP 工具(调用外部 API) +- 工作流工具(嵌套调用其他工作流) +- 支持通过数据库动态加载自定义工具 + +#### 4.5.5 模型配置 (`model_config.py`) + +- API Key 使用 Fernet 对称加密存储于数据库 +- 执行时按需解密注入 LLM 节点 +- 校验归属:仅创建者可调用(白名单控制) + +### 4.6 Celery 异步架构 + +``` +用户请求 → FastAPI (创建工作流执行记录, status=pending) + ↓ + Redis Queue (Celery Broker) + ↓ + Celery Worker + ├── workflow_tasks.py → WorkflowEngine.execute() + └── agent_tasks.py → Agent 工作流执行 + ↓ + 实时状态推送 → Redis Pub/Sub → WebSocket → 前端 +``` + +- 每个执行任务在 Celery Worker 中异步运行 +- 执行进度通过 Redis Pub/Sub 实时推送到前端 WebSocket +- 支持任务取消、暂停/恢复 + +### 4.7 后端配置 (Settings) + +核心配置项通过环境变量 + `.env` 文件管理(`Pydantic Settings`): + +| 配置项 | 默认值 | 说明 | +|--------|--------|------| +| `DATABASE_URL` | 腾讯云 MySQL | 数据库连接 | +| `REDIS_URL` | `redis://localhost:6379/0` | Redis 连接 | +| `MEMORY_PERSIST_DB_ENABLED` | `True` | 跨会话记忆持久化 | +| `LOCAL_FILE_TOOLS_ROOT` | 项目根目录 | 文件工具沙箱 | +| `WORKFLOW_MAX_STEPS_PER_RUN` | 2000 | 执行步数上限 | +| `WORKFLOW_MAX_LLM_INVOCATIONS_PER_RUN` | 200 | LLM 调用上限 | +| `WORKFLOW_MAX_TOOL_CALLS_PER_RUN` | 500 | 工具调用上限 | +| `CORS_ORIGINS` | localhost:3000/8038 | CORS 白名单 | +| `JWT_SECRET_KEY` | dev-only | JWT 签名密钥 | +| `TESSERACT_CMD` | (空) | OCR 引擎路径 (Windows) | + +--- + +## 5. 数据流 + +### 5.1 工作流执行完整链路 + +``` +前端拖拽设计 → 保存 Workflow (nodes + edges JSON) + ↓ +用户点击"运行" → POST /api/v1/workflows/{id}/execute + ↓ +后端创建 Execution 记录 (status=pending) + ↓ +Celery 任务入队 → Worker 消费 + ↓ +WorkflowEngine.execute(input_data) + ↓ +拓扑排序构建执行顺序 + ↓ +循环: 选下一个可执行节点 + ↓ +execute_node(node, input_data) + ├── start → 透传输入 + ├── llm/template → 调用 LLM (含工具调用) + ├── condition → 条件表达式求值 → 分支裁剪 + ├── switch → 多分支路由 + ├── database → SQL 查询/写入 + ├── http → HTTP 请求外部 API + ├── email → 发送邮件 + ├── code → 沙箱执行 Python 代码 + └── ... 其他 20+ 类型 + ↓ +保存节点输出 → 继续下一节点或结束 + ↓ +更新 Execution 状态 (completed/failed) + ↓ +Redis Pub/Sub → WebSocket → 前端实时更新 +``` + +### 5.2 条件节点数据流 + +``` +条件节点收到输入 + ↓ +condition_parser 解析表达式 (如 {{score}} > 60) + ↓ +布尔结果 → true/false 分支 + ↓ +配置化裁剪: 移除不符合的边 + ↓ +后续执行不再经过被裁剪的分支 +``` + +### 5.3 审批节点 (HITL) + +``` +执行到 approval 节点 + ↓ +检查输入中是否有 __hil_decision + ↓ +有 → approved/rejected → 继续执行对应分支 +无 → 序列化快照 → 抛出 WorkflowPaused + ↓ +状态持久化为 awaiting_approval + ↓ +用户通过 API 审批 → 恢复执行 +``` + +--- + +## 6. 部署架构 + +``` + ┌─────────────┐ + │ Nginx │ (可选, 反向代理) + └──────┬──────┘ + │ + ┌────────────────┼────────────────┐ + ▼ ▼ ▼ + ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ + │ Frontend │ │ Backend │ │ Celery │ + │ Vue 3 App │ │ FastAPI │ │ Worker(s) │ + │ :8038 │ │ :8037 │ │ │ + └──────────────┘ └──────┬───────┘ └──────┬───────┘ + │ │ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ MySQL │ │ Redis │ + │ (腾讯云) │ │ (本地/Docker)│ + └──────────────┘ └──────────────┘ +``` + +--- + +## 7. 安全机制 + +- **JWT 认证** — 所有 API 需 Bearer Token(登录页除外) +- **模型配置加密** — API Key 使用 Fernet 加密存储,仅创建者可调用 +- **RBAC 权限** — 支持角色级别的权限控制 +- **执行预算** — 步数/LLM 调用/工具调用三重熔断 +- **代码沙箱** — `code` 节点在受限 `builtins` 环境执行(禁止 `open`/`eval`/`__import__`) +- **文件沙箱** — 文件操作限制在 `LOCAL_FILE_TOOLS_ROOT` 目录内 +- **环境变量安全** — 敏感环境变量保护 (provider-managed) + +--- + +## 8. 可扩展性 + +- **节点类型可扩展** — `execute_node` 方法通过 `elif` 链支持 30+ 类型,新增类型只需添加分支 +- **工具可注册** — `ToolRegistry` 支持内置/HTTP/工作流三类工具,可从数据库动态加载 +- **模型可配置** — 通过 `ModelConfig` 可管理任意多组 API Key + 模型,节点运行时按需选择 +- **提供商标配** — 使用 OpenAI 兼容 SDK,可接入 OpenAI、DeepSeek、Anthropic 等 +- **模板市场** — 预置工作流模板,支持社区分享 diff --git a/backend/app/agent_runtime/__init__.py b/backend/app/agent_runtime/__init__.py new file mode 100644 index 0000000..05213fa --- /dev/null +++ b/backend/app/agent_runtime/__init__.py @@ -0,0 +1,32 @@ +""" +Agent Runtime — 自主 AI Agent 核心运行时。 + +提供 ReAct 循环驱动的自主 Agent,支持: +- 工具调用(复用已有 ToolRegistry) +- 分层记忆(工作记忆 + 长期记忆) +- 多模型(OpenAI / DeepSeek) +- 可嵌入工作流节点或独立运行 +""" +from app.agent_runtime.core import AgentRuntime +from app.agent_runtime.schemas import ( + AgentConfig, + AgentResult, + AgentLLMConfig, + AgentToolConfig, + AgentMemoryConfig, +) +from app.agent_runtime.context import AgentContext +from app.agent_runtime.memory import AgentMemory +from app.agent_runtime.tool_manager import AgentToolManager + +__all__ = [ + "AgentRuntime", + "AgentConfig", + "AgentResult", + "AgentLLMConfig", + "AgentToolConfig", + "AgentMemoryConfig", + "AgentContext", + "AgentMemory", + "AgentToolManager", +] diff --git a/backend/app/agent_runtime/context.py b/backend/app/agent_runtime/context.py new file mode 100644 index 0000000..fb832d6 --- /dev/null +++ b/backend/app/agent_runtime/context.py @@ -0,0 +1,87 @@ +""" +Agent 会话上下文管理:维护消息历史、状态追踪。 +""" +from __future__ import annotations + +import uuid +from typing import Any, Dict, List, Optional + + +class AgentContext: + """ + Agent 会话上下文: + + - 消息历史(messages 列表,OpenAI 格式) + - 会话元信息(session_id, user_id 等) + - 执行追踪(iteration 计数, 工具调用统计) + """ + + def __init__( + self, + system_prompt: str = "你是一个有用的AI助手。", + user_id: Optional[str] = None, + session_id: Optional[str] = None, + ): + self.session_id = session_id or str(uuid.uuid4()) + self.user_id = user_id + self._messages: List[Dict[str, Any]] = [] + self._system_prompt = system_prompt + # 执行状态 + self.iteration = 0 + self.tool_calls_made = 0 + + @property + def messages(self) -> List[Dict[str, Any]]: + """获取完整消息列表(含 system prompt)。""" + if self._system_prompt: + # 确保 system prompt 始终在第一条 + has_system = ( + len(self._messages) > 0 + and self._messages[0].get("role") == "system" + ) + if not has_system: + return [ + {"role": "system", "content": self._system_prompt}, + *self._messages, + ] + return self._messages + + def add_user_message(self, content: str) -> None: + """添加用户消息。""" + self._messages.append({"role": "user", "content": content}) + + def add_assistant_message( + self, + content: str, + tool_calls: Optional[List[Dict[str, Any]]] = None, + reasoning_content: Optional[str] = None, + ) -> None: + """添加助手回复。""" + msg: Dict[str, Any] = {"role": "assistant", "content": content or ""} + if tool_calls: + msg["tool_calls"] = tool_calls + if reasoning_content: + msg["reasoning_content"] = reasoning_content + self._messages.append(msg) + + def add_tool_result( + self, tool_call_id: str, tool_name: str, result: str + ) -> None: + """添加工具执行结果。""" + self._messages.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "content": result, + "name": tool_name, + }) + + def set_system_prompt(self, prompt: str) -> None: + """更新 system prompt(仅在未发送过消息时有效)。""" + if not self._messages: + self._system_prompt = prompt + + def reset(self) -> None: + """重置上下文(保留 system prompt 和 session_id)。""" + self._messages = [] + self.iteration = 0 + self.tool_calls_made = 0 diff --git a/backend/app/agent_runtime/core.py b/backend/app/agent_runtime/core.py new file mode 100644 index 0000000..86a06b4 --- /dev/null +++ b/backend/app/agent_runtime/core.py @@ -0,0 +1,330 @@ +""" +Agent Runtime 核心 —— 自主 ReAct 循环。 + +流程: +1. 接收用户输入 → 追加到消息列表 +2. 调用 LLM(携带 tools schema) +3. 如果 LLM 返回工具调用 → 执行工具 → 结果追加到消息列表 → 回到 2 +4. 如果 LLM 返回文本 → 作为最终回答返回 +5. 超过 max_iterations → 强制终止 +""" +from __future__ import annotations + +import json +import logging +from typing import Any, Callable, Dict, List, Optional + +from app.agent_runtime.schemas import ( + AgentConfig, + AgentResult, +) +from app.agent_runtime.context import AgentContext +from app.agent_runtime.memory import AgentMemory +from app.agent_runtime.tool_manager import AgentToolManager + +logger = logging.getLogger(__name__) + +# 可重试的 API 异常 +_RETRYABLE_ERRORS = ( + "timed out", + "timeout", + "connection error", + "temporarily unavailable", + "server disconnected", + "rate limit", + "too many requests", + "internal server error", + "service unavailable", +) + + +class AgentRuntime: + """ + 自主 Agent 运行时。 + + 用法: + runtime = AgentRuntime(config) + result = await runtime.run("帮我写个Python脚本") + """ + + def __init__( + self, + config: Optional[AgentConfig] = None, + context: Optional[AgentContext] = None, + memory: Optional[AgentMemory] = None, + tool_manager: Optional[AgentToolManager] = None, + execution_logger: Optional[Any] = None, + on_tool_executed: Optional[Callable[[str], Any]] = None, + ): + self.config = config or AgentConfig() + self.context = context or AgentContext( + system_prompt=self.config.system_prompt, + user_id=self.config.user_id, + ) + self.memory = memory or AgentMemory( + scope_id=self.config.user_id or self.config.name, + max_history=self.config.memory.max_history_messages, + persist=self.config.memory.persist_to_db, + ) + self.tool_manager = tool_manager or AgentToolManager( + include_tools=self.config.tools.include_tools, + exclude_tools=self.config.tools.exclude_tools, + ) + self.execution_logger = execution_logger + self.on_tool_executed = on_tool_executed + self._memory_context_loaded = False + + async def run(self, user_input: str) -> AgentResult: + """ + 执行 Agent 单轮对话。 + + 流程:加载记忆 → 追加用户消息 → ReAct 循环 → 保存记忆 → 返回结果。 + """ + max_iter = max(1, self.config.llm.max_iterations) + self.context.iteration = 0 + self.context.tool_calls_made = 0 + + # 1. 首次运行时加载长期记忆到 system prompt + if not self._memory_context_loaded: + await self._inject_memory_context() + self._memory_context_loaded = True + + # 2. 追加用户消息 + self.context.add_user_message(user_input) + + # 3. ReAct 循环 + llm = _LLMClient(self.config.llm) + tool_schemas = self.tool_manager.get_tool_schemas() + has_tools = self.tool_manager.has_tools() + + while self.context.iteration < max_iter: + self.context.iteration += 1 + + # 裁剪过长历史 + messages = self.memory.trim_messages(self.context.messages) + + # 调用 LLM + try: + response = await llm.chat( + messages=messages, + tools=tool_schemas if has_tools and self.context.iteration == 1 else + (tool_schemas if has_tools else None), + iteration=self.context.iteration, + ) + except Exception as e: + err_str = str(e) + logger.error("LLM 调用失败 (iteration=%s): %s", self.context.iteration, err_str) + if self.context.iteration < max_iter and self._is_retryable(err_str): + continue + return AgentResult( + success=False, + content=f"LLM 调用失败: {err_str}", + iterations_used=self.context.iteration, + tool_calls_made=self.context.tool_calls_made, + error=err_str, + ) + + # 解析工具调用 + tool_calls = self._extract_tool_calls(response) + content = self._extract_content(response) + + if not tool_calls: + # LLM 直接返回文本 → 结束 + self.context.add_assistant_message(content) + final_text = content or "(模型未返回有效内容)" + # 保存记忆 + await self.memory.save_context(user_input, final_text) + return AgentResult( + success=True, + content=final_text, + iterations_used=self.context.iteration, + tool_calls_made=self.context.tool_calls_made, + ) + + # 有工具调用 → 先记录 assistant 消息(含 tool_calls + reasoning_content) + reasoning = getattr(response, "reasoning_content", None) or ( + response.get("reasoning_content") if isinstance(response, dict) else None + ) + self.context.add_assistant_message(content or "", tool_calls, reasoning) + if self.execution_logger: + self.execution_logger.info( + f"Agent 调用 {len(tool_calls)} 个工具", + data={"tool_calls": [tc["function"]["name"] for tc in tool_calls], + "iteration": self.context.iteration}, + ) + + # 逐一执行工具 + for tc in tool_calls: + tfn = tc.get("function", {}) + tname = tfn.get("name", "unknown") + tcid = tc.get("id", f"call_{self.context.iteration}_{self.context.tool_calls_made}") + + try: + targs = json.loads(tfn.get("arguments", "{}")) + except (json.JSONDecodeError, TypeError): + targs = {} + + logger.info("Agent 执行工具 [%s]: %s", tname, targs) + result = await self.tool_manager.execute(tname, targs) + + self.context.add_tool_result(tcid, tname, result) + self.context.tool_calls_made += 1 + + if self.on_tool_executed: + try: + await self.on_tool_executed(tname) + except Exception: + pass + + if self.execution_logger: + preview = result[:300] + "..." if len(result) > 300 else result + self.execution_logger.info( + f"工具 {tname} 执行完成", + data={"tool_name": tname, "result_preview": preview}, + ) + + # 达到最大迭代次数 + last_content = "" + for m in reversed(self.context.messages): + if m.get("role") == "assistant" and m.get("content"): + last_content = m["content"] + break + + logger.warning("Agent 达到最大迭代次数 (%s)", max_iter) + await self.memory.save_context(user_input, last_content or "(已达最大迭代次数)") + return AgentResult( + success=True, + content=last_content or "已达最大迭代次数,但模型未返回最终回答。", + truncated=True, + iterations_used=self.context.iteration, + tool_calls_made=self.context.tool_calls_made, + ) + + async def _inject_memory_context(self) -> None: + """加载长期记忆并注入 system prompt。""" + mem_text = await self.memory.initialize() + if mem_text: + enriched = ( + self.config.system_prompt.rstrip("\n") + + "\n\n" + + mem_text + ) + self.context.set_system_prompt(enriched) + logger.info("Agent 已注入长期记忆上下文") + + @staticmethod + def _extract_tool_calls(response: Any) -> List[Dict[str, Any]]: + """从 LLM 响应中提取工具调用列表。""" + if response is None: + return [] + # OpenAI SDK 格式 + if hasattr(response, "tool_calls") and response.tool_calls: + result = [] + for tc in response.tool_calls: + result.append({ + "id": tc.id, + "type": tc.type, + "function": { + "name": tc.function.name, + "arguments": tc.function.arguments, + }, + }) + return result + # 字典格式 + if isinstance(response, dict): + tc_list = response.get("tool_calls") or [] + if tc_list: + return tc_list + # 检查 content 中是否嵌入了 DSML + content = response.get("content") or "" + if "invoke" in content or "function_call" in content: + from app.services.llm_service import _parse_dsml_tool_invocations + dsml = _parse_dsml_tool_invocations(content) + if dsml: + return [ + { + "id": f"dsml-{i}", + "type": "function", + "function": { + "name": inv["name"], + "arguments": json.dumps(inv["arguments"], ensure_ascii=False), + }, + } + for i, inv in enumerate(dsml) + ] + return [] + + @staticmethod + def _extract_content(response: Any) -> str: + """从 LLM 响应中提取文本内容。""" + if response is None: + return "" + if hasattr(response, "content"): + return response.content or "" + if isinstance(response, dict): + return response.get("content") or "" + return str(response) + + @staticmethod + def _is_retryable(err_str: str) -> bool: + """判断错误是否可重试。""" + err_lower = err_str.lower() + return any(kw in err_lower for kw in _RETRYABLE_ERRORS) + + +class _LLMClient: + """轻量 LLM 客户端包装,复用已有 LLMService 能力。""" + + def __init__(self, config: Any): + from app.services.llm_service import llm_service + self._service = llm_service + self._config = config + + async def chat( + self, + messages: List[Dict[str, Any]], + tools: Optional[List[Dict[str, Any]]] = None, + iteration: int = 1, + ) -> Any: + """ + 调用 LLM。 + 优先使用 llm_service.call_openai_with_tools(支持 ReAct 的多次工具调用)。 + + 但为避免外层 ReAct 与内部 ReAct 冲突: + - 第 1 轮:使用标准 chat(无内部 ReAct),由外层 AgentRuntime 控制循环 + - 后续轮次:也使用标准 chat,仅追加工具结果 + """ + # 直接用 OpenAI/DeepSeek SDK 调用,由 AgentRuntime 控制循环 + from openai import AsyncOpenAI + from app.core.config import settings + + # 优先从配置读取,其次从 settings(.env 加载),最后 os.environ + api_key = self._config.api_key or settings.OPENAI_API_KEY or "" + base_url = self._config.base_url or settings.OPENAI_BASE_URL or "" + + if not api_key or api_key == "your-openai-api-key": + # 尝试 DeepSeek + api_key = self._config.api_key or settings.DEEPSEEK_API_KEY or "" + base_url = self._config.base_url or settings.DEEPSEEK_BASE_URL or "https://api.deepseek.com" + + if not api_key: + raise ValueError("未配置 API Key") + + client = AsyncOpenAI(api_key=api_key, base_url=base_url) + + kwargs: Dict[str, Any] = { + "model": self._config.model, + "messages": messages, + "temperature": self._config.temperature, + "timeout": self._config.request_timeout, + } + if self._config.max_tokens: + kwargs["max_tokens"] = self._config.max_tokens + if self._config.extra_body: + kwargs["extra_body"] = self._config.extra_body + if tools: + kwargs["tools"] = tools + kwargs["tool_choice"] = "auto" + + response = await client.chat.completions.create(**kwargs) + return response.choices[0].message diff --git a/backend/app/agent_runtime/memory.py b/backend/app/agent_runtime/memory.py new file mode 100644 index 0000000..26052f7 --- /dev/null +++ b/backend/app/agent_runtime/memory.py @@ -0,0 +1,135 @@ +""" +Agent 记忆管理:包装已有 persistent_memory_service,提供会话级和长期记忆。 +""" +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List, Optional +from sqlalchemy.orm import Session + +from app.core.database import SessionLocal +from app.services.persistent_memory_service import ( + load_persistent_memory, + save_persistent_memory, + persist_enabled, +) +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class AgentMemory: + """ + 分层记忆管理器: + + - 工作记忆:当前会话消息列表(由 AgentRuntime 直接管理) + - 长期记忆:从 MySQL 加载/保存的用户画像和关键事实 + - 上下文压缩:对话过长时自动裁剪或总结 + """ + + def __init__( + self, + scope_kind: str = "agent", + scope_id: Optional[str] = None, + session_key: Optional[str] = None, + persist: bool = True, + max_history: int = 20, + ): + self.scope_kind = scope_kind + self.scope_id = scope_id or "default" + self.session_key = session_key or "default_session" + self.persist = persist and persist_enabled() + self.max_history = max_history + # 从长期记忆加载的上下文(启动时加载) + self._long_term_context: Dict[str, Any] = {} + + async def initialize(self) -> str: + """ + 初始化记忆:从 DB/Redis 加载长期记忆,构造初始上下文文本。 + 返回注入 system prompt 的记忆文本块。 + """ + if not self.persist or not self.scope_id: + return "" + + db: Optional[Session] = None + try: + db = SessionLocal() + payload = load_persistent_memory( + db, self.scope_kind, self.scope_id, self.session_key + ) + if payload and isinstance(payload, dict): + self._long_term_context = payload + # 构建注入 system prompt 的记忆文本 + parts = [] + profile = payload.get("user_profile") + if profile and isinstance(profile, dict): + profile_text = json.dumps(profile, ensure_ascii=False) + parts.append(f"## 用户画像\n{profile_text}") + + context = payload.get("context") + if context and isinstance(context, dict): + ctx_text = json.dumps(context, ensure_ascii=False) + parts.append(f"## 上下文\n{ctx_text}") + + history = payload.get("conversation_history") + if history and isinstance(history, list) and len(history) > 0: + summary = self._summarize_history(history) + parts.append(f"## 历史对话摘要\n{summary}") + + if parts: + return "\n\n".join(parts) + except Exception as e: + logger.warning("加载长期记忆失败: %s", e) + finally: + if db: + db.close() + return "" + + async def save_context( + self, user_message: str, assistant_reply: str + ) -> None: + """将单轮对话保存到长期记忆。""" + if not self.persist or not self.scope_id: + return + + # 更新上下文 + ctx = self._long_term_context.get("context", {}) + ctx["last_user_message"] = user_message[:500] + ctx["last_assistant_reply"] = assistant_reply[:500] + self._long_term_context["context"] = ctx + + db: Optional[Session] = None + try: + db = SessionLocal() + save_persistent_memory( + db, self.scope_kind, self.scope_id, + self.session_key, self._long_term_context, + ) + except Exception as e: + logger.warning("保存长期记忆失败: %s", e) + finally: + if db: + db.close() + + def trim_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 裁剪消息列表:保留最近的 N 条,但始终保留第一条 system 消息。 + """ + if len(messages) <= self.max_history: + return messages + + system_msgs = [m for m in messages if m.get("role") == "system"] + other_msgs = [m for m in messages if m.get("role") != "system"] + + trimmed = other_msgs[-(self.max_history - len(system_msgs)):] + return system_msgs + trimmed + + @staticmethod + def _summarize_history(history: List[Dict[str, Any]]) -> str: + """简单汇总历史对话(不做 LLM 压缩,仅计数)。""" + turns = 0 + for m in history: + if m.get("role") == "user": + turns += 1 + return f"共 {turns} 轮历史对话(详情已存入长期记忆)" diff --git a/backend/app/agent_runtime/schemas.py b/backend/app/agent_runtime/schemas.py new file mode 100644 index 0000000..b01c795 --- /dev/null +++ b/backend/app/agent_runtime/schemas.py @@ -0,0 +1,64 @@ +""" +Agent Runtime 配置与数据结构 Schema +""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from pydantic import BaseModel, Field + + +class AgentToolConfig(BaseModel): + """Agent 可用工具配置""" + # 若为空列表则使用全部已注册工具 + include_tools: List[str] = Field(default_factory=list, description="允许的工具名称白名单") + exclude_tools: List[str] = Field(default_factory=list, description="排除的工具名称黑名单") + + +class AgentMemoryConfig(BaseModel): + """Agent 记忆配置""" + enabled: bool = True + max_history_messages: int = 20 # 注入 LLM 的上文最大消息数 + session_key: Optional[str] = None # 会话标识,默认自动生成 + persist_to_db: bool = True # 是否写入 MySQL 长期记忆 + + +class AgentLLMConfig(BaseModel): + """Agent 模型配置""" + provider: str = "openai" # openai / deepseek + model: str = "gpt-4o-mini" + temperature: float = 0.7 + max_tokens: Optional[int] = None + api_key: Optional[str] = None + base_url: Optional[str] = None + max_iterations: int = 10 # ReAct 循环最大步数 + request_timeout: float = 120.0 + extra_body: Optional[Dict[str, Any]] = None + + +class AgentConfig(BaseModel): + """Agent 完整配置""" + name: str = "default_agent" + system_prompt: str = "你是一个有用的AI助手。请使用可用工具来帮助用户完成任务。" + llm: AgentLLMConfig = Field(default_factory=AgentLLMConfig) + tools: AgentToolConfig = Field(default_factory=AgentToolConfig) + memory: AgentMemoryConfig = Field(default_factory=AgentMemoryConfig) + user_id: Optional[str] = None + + +class AgentMessage(BaseModel): + """Agent 对话消息""" + role: str # user / assistant / tool + content: str + tool_calls: Optional[List[Dict[str, Any]]] = None + tool_call_id: Optional[str] = None + name: Optional[str] = None + + +class AgentResult(BaseModel): + """Agent 执行结果""" + success: bool = True + content: str = "" + truncated: bool = False + iterations_used: int = 0 + tool_calls_made: int = 0 + error: Optional[str] = None diff --git a/backend/app/agent_runtime/tool_manager.py b/backend/app/agent_runtime/tool_manager.py new file mode 100644 index 0000000..38c37ff --- /dev/null +++ b/backend/app/agent_runtime/tool_manager.py @@ -0,0 +1,94 @@ +""" +Agent 工具管理器:包装已有 ToolRegistry,提供 Agent 需要的工具格式转换和执行。 +""" +from __future__ import annotations + +import json +import logging +from typing import Any, Callable, Dict, List, Optional + +from app.services.tool_registry import tool_registry + +logger = logging.getLogger(__name__) + + +class AgentToolManager: + """ + 为 Agent Runtime 管理工具: + - 将 ToolRegistry 的工具 schema 转为 OpenAI Function Calling 格式 + - 按 Agent 配置过滤(白名单/黑名单) + - 执行工具调用并返回结果字符串 + """ + + def __init__(self, include_tools: Optional[List[str]] = None, + exclude_tools: Optional[List[str]] = None): + self._include_tools: set = set(include_tools or []) + self._exclude_tools: set = set(exclude_tools or []) + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + """获取 Agent 可用的工具定义列表(OpenAI Function Calling 格式)。""" + all_schemas = tool_registry.get_all_tool_schemas() + if not self._include_tools and not self._exclude_tools: + return all_schemas + + filtered = [] + for schema in all_schemas: + name = self._extract_tool_name(schema) + if not name: + continue + if self._include_tools and name not in self._include_tools: + continue + if name in self._exclude_tools: + continue + filtered.append(schema) + return filtered + + def has_tools(self) -> bool: + """是否有可用工具。""" + return len(self.get_tool_schemas()) > 0 + + def tool_names(self) -> List[str]: + """可用工具名称列表。""" + return [ + self._extract_tool_name(s) or "?" + for s in self.get_tool_schemas() + ] + + async def execute(self, name: str, args: Dict[str, Any]) -> str: + """ + 执行工具调用。 + + Args: + name: 工具名称 + args: 工具参数字典 + + Returns: + 工具执行结果的字符串表示 + """ + func: Optional[Callable] = tool_registry.get_tool_function(name) + if not func: + err = f"工具 '{name}' 不存在" + logger.error(err) + return json.dumps({"error": err}, ensure_ascii=False) + + logger.info("Agent 执行工具: %s, 参数: %s", name, args) + try: + import asyncio + if asyncio.iscoroutinefunction(func): + result = await func(**args) + else: + result = func(**args) + + if isinstance(result, (dict, list)): + return json.dumps(result, ensure_ascii=False) + return str(result) + except Exception as e: + err_msg = f"工具 '{name}' 执行失败: {e}" + logger.error(err_msg, exc_info=True) + return json.dumps({"error": err_msg}, ensure_ascii=False) + + @staticmethod + def _extract_tool_name(schema: Dict[str, Any]) -> Optional[str]: + """从工具 schema 中提取工具名称。""" + fn = schema.get("function") or schema + return fn.get("name") if isinstance(fn, dict) else None diff --git a/backend/app/agent_runtime/workflow_integration.py b/backend/app/agent_runtime/workflow_integration.py new file mode 100644 index 0000000..fbcb2ac --- /dev/null +++ b/backend/app/agent_runtime/workflow_integration.py @@ -0,0 +1,115 @@ +""" +Agent Runtime ⇄ WorkflowEngine 桥接。 + +让 workflow_engine.execute_node() 通过寥寥几行调用 Agent Runtime。 +""" +from __future__ import annotations + +import logging +from typing import Any, Dict, Optional + +from app.agent_runtime.core import AgentRuntime +from app.agent_runtime.schemas import ( + AgentConfig, + AgentLLMConfig, + AgentToolConfig, +) + +logger = logging.getLogger(__name__) + + +async def run_agent_node( + node_data: Dict[str, Any], + input_data: Dict[str, Any], + execution_logger: Optional[Any] = None, + user_id: Optional[str] = None, + on_tool_executed: Optional[Any] = None, +) -> Dict[str, Any]: + """ + 在工作流中执行 Agent 节点。 + + node_data 支持的字段: + system_prompt — Agent 人格/指令(支持 {{variable}} 模板) + tools — 可选工具白名单,默认全部 + exclude_tools — 可选工具黑名单 + model — 模型名称 + provider — 提供商(openai/deepseek) + temperature — 温度 + max_iterations — ReAct 最大步数 + memory — 是否启用长期记忆 + + input_data 中的 "query" 或 "input" 字段作为用户输入。 + """ + # 1. 解析配置 + query = ( + input_data.get("query") + or input_data.get("input") + or input_data.get("text", "") + ) + if not isinstance(query, str): + query = str(query) if query else "" + + if not query: + return {"output": "错误:Agent 节点未收到用户输入", "status": "error"} + + # 2. 解析 system_prompt(支持模板变量) + raw_prompt = node_data.get("system_prompt", "你是一个有用的AI助手。") + try: + formatted_prompt = raw_prompt.format(**input_data) + except (KeyError, ValueError): + formatted_prompt = raw_prompt + + # 3. 构建 Agent 配置 + llm_config = AgentLLMConfig( + provider=node_data.get("provider", "openai"), + model=node_data.get("model", "gpt-4o-mini"), + temperature=float(node_data.get("temperature", 0.7)), + max_iterations=int(node_data.get("max_iterations", 10)), + ) + # 允许节点内联 api_key/base_url + if node_data.get("api_key"): + llm_config.api_key = node_data["api_key"] + if node_data.get("base_url"): + llm_config.base_url = node_data["base_url"] + + agent_config = AgentConfig( + name=node_data.get("label", "agent_node"), + system_prompt=formatted_prompt, + llm=llm_config, + tools=AgentToolConfig( + include_tools=node_data.get("tools", []), + exclude_tools=node_data.get("exclude_tools", []), + ), + memory={ + "enabled": node_data.get("memory", True), + "persist_to_db": node_data.get("memory", True), + }, + user_id=user_id, + ) + + # 4. 执行 Agent + runtime = AgentRuntime( + config=agent_config, + execution_logger=execution_logger, + on_tool_executed=on_tool_executed, + ) + + result = await runtime.run(query) + + # 5. 返回结果(兼容工作流引擎的输出格式) + if result.success: + return { + "output": result.content, + "status": "success", + "agent_meta": { + "iterations": result.iterations_used, + "tool_calls": result.tool_calls_made, + "truncated": result.truncated, + }, + } + else: + return { + "output": result.content, + "status": "error", + "error": result.error, + } diff --git a/backend/app/api/agent_chat.py b/backend/app/api/agent_chat.py new file mode 100644 index 0000000..f436218 --- /dev/null +++ b/backend/app/api/agent_chat.py @@ -0,0 +1,137 @@ +""" +Agent 独立聊天 API — 不依赖工作流 DAG,直接与 Agent Runtime 对话。 + +POST /api/v1/agent-chat/bare + {"message": "你好,帮我..."} + → {"content": "...", "iterations": 3, "tool_calls": 5} +""" +from __future__ import annotations + +import logging +from typing import Any, Dict, Optional +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from app.core.database import get_db +from sqlalchemy.orm import Session +from app.api.auth import get_current_user +from app.models.user import User +from app.models.agent import Agent +from app.agent_runtime import ( + AgentRuntime, + AgentConfig, + AgentLLMConfig, + AgentToolConfig, +) +from app.core.config import settings + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/v1/agent-chat", tags=["agent-chat"]) + + +class ChatRequest(BaseModel): + message: str + session_id: Optional[str] = None + model: Optional[str] = None + temperature: Optional[float] = None + max_iterations: Optional[int] = None + + +class ChatResponse(BaseModel): + content: str + iterations_used: int + tool_calls_made: int + truncated: bool + session_id: str + agent_id: Optional[str] = None + + +@router.post("/bare", response_model=ChatResponse) +async def chat_bare( + req: ChatRequest, + current_user: User = Depends(get_current_user), +): + """无需 Agent 配置,使用默认设置直接对话。""" + config = AgentConfig( + name="bare_agent", + system_prompt="你是一个有用的AI助手。请使用可用工具来帮助用户完成任务。", + llm=AgentLLMConfig( + model=req.model or ( + "gpt-4o-mini" if settings.OPENAI_API_KEY and settings.OPENAI_API_KEY != "your-openai-api-key" + else "deepseek-v4-flash" + ), + temperature=req.temperature or 0.7, + max_iterations=req.max_iterations or 10, + ), + user_id=current_user.id, + ) + runtime = AgentRuntime(config=config) + result = await runtime.run(req.message) + + return ChatResponse( + content=result.content, + iterations_used=result.iterations_used, + tool_calls_made=result.tool_calls_made, + truncated=result.truncated, + session_id=runtime.context.session_id, + ) + + +@router.post("/{agent_id}", response_model=ChatResponse) +async def chat_with_agent( + agent_id: str, + req: ChatRequest, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """与指定的 Agent 对话。Agent 的工作流配置会用于构建 Runtime。""" + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + raise HTTPException(status_code=404, detail="Agent 不存在") + if agent.user_id and agent.user_id != current_user.id and current_user.role != "admin": + raise HTTPException(status_code=403, detail="无权访问该 Agent") + + # 从 Agent 配置构建 Runtime + wc = agent.workflow_config or {} + nodes = wc.get("nodes", []) + # 查找 agent 节点的配置(或第一个 llm 节点的配置) + agent_node_cfg = _find_agent_node_config(nodes) + + config = AgentConfig( + name=agent.name, + system_prompt=agent_node_cfg.get("system_prompt") or agent.description or "你是一个有用的AI助手。", + llm=AgentLLMConfig( + provider=agent_node_cfg.get("provider", "openai"), + model=req.model or agent_node_cfg.get("model", "gpt-4o-mini"), + temperature=req.temperature or float(agent_node_cfg.get("temperature", 0.7)), + max_iterations=req.max_iterations or int(agent_node_cfg.get("max_iterations", 10)), + ), + tools=AgentToolConfig( + include_tools=agent_node_cfg.get("tools", []), + exclude_tools=agent_node_cfg.get("exclude_tools", []), + ), + user_id=current_user.id, + ) + + runtime = AgentRuntime(config=config) + result = await runtime.run(req.message) + + return ChatResponse( + content=result.content, + iterations_used=result.iterations_used, + tool_calls_made=result.tool_calls_made, + truncated=result.truncated, + session_id=runtime.context.session_id, + agent_id=agent_id, + ) + + +def _find_agent_node_config(nodes: list) -> Dict[str, Any]: + """从工作流节点列表中查找第一个 agent 类型或 llm 类型的节点配置。""" + if not nodes: + return {} + for node in nodes: + typ = node.get("type", "") + if typ in ("agent", "llm", "template"): + return node.get("data") or {} + return {} diff --git a/backend/app/main.py b/backend/app/main.py index adca154..4b7f131 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -201,7 +201,7 @@ async def startup_event(): # 不抛出异常,允许应用继续启动 # 注册路由 -from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools +from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat app.include_router(auth.router) app.include_router(uploads.router) @@ -223,6 +223,7 @@ app.include_router(alert_rules.router) app.include_router(node_test.router) app.include_router(node_templates.router) app.include_router(tools.router) +app.include_router(agent_chat.router) if __name__ == "__main__": import uvicorn diff --git a/backend/app/services/llm_service.py b/backend/app/services/llm_service.py index 00f23fa..432a09e 100644 --- a/backend/app/services/llm_service.py +++ b/backend/app/services/llm_service.py @@ -50,6 +50,28 @@ def _is_retryable_llm_error(exc: Exception) -> bool: ) +def _assistant_message_for_tool_history(message: Any, tool_calls_dicts: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + 构造写入多轮 messages 的 assistant 条目。 + DeepSeek V4 思考模式 + 工具调用:下一轮请求必须把本轮返回的 reasoning_content 原样带回, + 否则会 400 invalid_request_error。 + """ + entry: Dict[str, Any] = { + "role": "assistant", + "content": message.content, + } + if tool_calls_dicts: + entry["tool_calls"] = tool_calls_dicts + rc = getattr(message, "reasoning_content", None) + if rc is None: + extra = getattr(message, "model_extra", None) or {} + if isinstance(extra, dict): + rc = extra.get("reasoning_content") + if rc is not None: + entry["reasoning_content"] = rc + return entry + + def _extract_dsml_parameter_args(chunk: str) -> Dict[str, str]: """ DeepSeek 新版 DSML 常用「parameter」而非「invoke_arg」: @@ -635,6 +657,7 @@ class LLMService: tool_choice: Optional[str] = None, on_tool_executed: Optional[Callable[[str], Awaitable[None]]] = None, request_timeout: Optional[float] = None, + extra_body: Optional[Dict[str, Any]] = None, ) -> str: """ 调用OpenAI API,支持工具调用 @@ -685,6 +708,8 @@ class LLMService: "temperature": temperature, "max_tokens": max_tokens } + if extra_body: + create_kwargs["extra_body"] = extra_body if iteration == 0: # 转换工具格式为OpenAI格式 @@ -755,11 +780,7 @@ class LLMService: }, }) - messages.append({ - "role": "assistant", - "content": message.content, - "tool_calls": tool_calls_dicts, - }) + messages.append(_assistant_message_for_tool_history(message, tool_calls_dicts)) if not tool_calls_dicts: final_content = message.content or "" @@ -862,6 +883,7 @@ class LLMService: tool_choice: Optional[str] = None, on_tool_executed: Optional[Callable[[str], Awaitable[None]]] = None, request_timeout: Optional[float] = None, + extra_body: Optional[Dict[str, Any]] = None, ) -> str: """ 调用DeepSeek API,支持工具调用(DeepSeek兼容OpenAI API格式) @@ -880,6 +902,7 @@ class LLMService: tool_choice=tool_choice, on_tool_executed=on_tool_executed, request_timeout=request_timeout, + extra_body=extra_body, ) async def call_llm_with_tools( diff --git a/backend/app/services/workflow_engine.py b/backend/app/services/workflow_engine.py index b745fde..ed6a37c 100644 --- a/backend/app/services/workflow_engine.py +++ b/backend/app/services/workflow_engine.py @@ -983,6 +983,64 @@ class WorkflowEngine: return ctx.get("assistant_display_name") return None + def _format_prior_conversation_for_llm( + self, input_data: Dict[str, Any], original_prompt_template: str + ) -> Optional[str]: + """ + Agent 多轮对话:执行请求若携带 conversation_history,而提示词未使用 + {{memory.conversation_history}} 等占位符,则在此处拼进最终 prompt,避免模型「失忆」。 + """ + t = original_prompt_template or "" + if "memory.conversation_history" in t or re.search( + r"\{\{[^}]*conversation_history[^}]*\}\}", t + ): + return None + + hist: Any = None + if isinstance(input_data, dict): + hist = input_data.get("conversation_history") + if hist is None and isinstance(input_data.get("memory"), dict): + hist = input_data["memory"].get("conversation_history") + if hist is None and isinstance(input_data.get("right"), dict): + r = input_data["right"] + hist = r.get("conversation_history") + if hist is None and isinstance(r.get("memory"), dict): + hist = r["memory"].get("conversation_history") + + if not hist or not isinstance(hist, list): + return None + + lines: List[str] = [] + max_turns = 24 + for msg in hist[-max_turns:]: + if not isinstance(msg, dict): + continue + role = msg.get("role", "") + content = msg.get("content", "") + if content is None: + continue + if not isinstance(content, str): + content = str(content) + content = content.strip() + if not content: + continue + if role == "user": + lines.append(f"用户:{content}") + elif role in ("assistant", "agent"): + lines.append(f"助手:{content}") + else: + lines.append(f"{role}:{content}") + + if not lines: + return None + + body = "\n".join(lines) + max_chars = 12000 + if len(body) > max_chars: + body = body[-max_chars:] + "\n…(更早的对话已截断)" + + return f"【本轮之前的对话】\n{body}" + def _resolve_vector_db_query_embedding( self, input_data: Any, query_vector_config: Any ) -> Optional[List[Any]]: @@ -1605,6 +1663,8 @@ class WorkflowEngine: logger.info(f"[rjb] 使用JSON或字符串转换: user_query={user_query}") logger.info(f"[rjb] 最终提取的user_query: {user_query}") + + history_block = self._format_prior_conversation_for_llm(input_data, prompt) # 如果prompt中没有占位符,或者仍有未填充的变量,将用户输入附加到prompt is_generic_instruction = False # 初始化变量 @@ -1633,25 +1693,43 @@ class WorkflowEngine: if is_generic_instruction: # 如果是通用指令,直接使用用户输入作为prompt - formatted_prompt = str(user_query) + if history_block: + formatted_prompt = f"{history_block}\n\n{str(user_query)}" + else: + formatted_prompt = str(user_query) logger.info(f"[rjb] 检测到通用指令,直接使用用户输入作为prompt: {user_query[:50] if user_query else 'None'}") else: # 否则,将用户输入附加到prompt - formatted_prompt = f"{formatted_prompt}\n\n{user_query}" + if history_block: + formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{user_query}" + else: + formatted_prompt = f"{formatted_prompt}\n\n{user_query}" logger.info(f"[rjb] 非通用指令,将用户输入附加到prompt") else: # 如果没有提取到用户查询,附加整个input_data - formatted_prompt = f"{formatted_prompt}\n\n{json_module.dumps(input_data, ensure_ascii=False)}" + tail = json_module.dumps(input_data, ensure_ascii=False) + if history_block: + formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{tail}" + else: + formatted_prompt = f"{formatted_prompt}\n\n{tail}" elif has_unfilled_variables or re.search(r'\{\{[^}]+\}\}', formatted_prompt): # 如果有占位符但未填充,先尝试清理所有未填充的模板变量 # 使用正则表达式替换所有 {{...}} 格式的未填充变量 formatted_prompt = re.sub(r'\{\{[^}]+\}\}', '', formatted_prompt) # 如果有占位符但未填充,附加用户需求说明 if user_query: - formatted_prompt = f"{formatted_prompt}\n\n用户需求:{user_query}\n\n请根据用户需求来完成任务。" + user_tail = f"用户需求:{user_query}\n\n请根据用户需求来完成任务。" + if history_block: + formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{user_tail}" + else: + formatted_prompt = f"{formatted_prompt}\n\n{user_tail}" else: # 如果没有用户查询,附加整个input_data - formatted_prompt = f"{formatted_prompt}\n\n输入数据:{json_module.dumps(input_data, ensure_ascii=False)}\n\n请根据输入数据来完成任务。" + data_tail = f"输入数据:{json_module.dumps(input_data, ensure_ascii=False)}\n\n请根据输入数据来完成任务。" + if history_block: + formatted_prompt = f"{formatted_prompt}\n\n{history_block}\n\n{data_tail}" + else: + formatted_prompt = f"{formatted_prompt}\n\n{data_tail}" logger.info(f"[rjb] LLM节点prompt格式化: node_id={node_id}, original_prompt='{prompt[:50] if len(prompt) > 50 else prompt}', has_any_placeholder={has_any_placeholder}, user_query={user_query}, is_generic_instruction={is_generic_instruction}, final_prompt前200字符='{formatted_prompt[:200] if len(formatted_prompt) > 200 else formatted_prompt}'") prompt = formatted_prompt @@ -1720,6 +1798,9 @@ class WorkflowEngine: llm_extra_kw["api_key"] = api_key if base_url is not None: llm_extra_kw["base_url"] = base_url + _xb = node_data.get("extra_body") + if isinstance(_xb, dict) and _xb: + llm_extra_kw["extra_body"] = _xb # 记录实际发送给LLM的prompt logger.info(f"[rjb] 准备调用LLM: node_id={node_id}, provider={provider}, model={model}, prompt前200字符='{prompt[:200] if len(prompt) > 200 else prompt}'") @@ -1821,7 +1902,45 @@ class WorkflowEngine: 'status': 'failed', 'error': f'LLM调用失败: {str(e)}' } - + + elif node_type == 'agent': + # Agent 节点:自主 ReAct 循环,支持多步工具调用 + if self.logger: + self.logger.info( + "Agent 节点开始执行", + data={"node_id": node_id, "input": input_data}, + ) + try: + from app.agent_runtime.workflow_integration import run_agent_node + + _agent_on_tool = None + if hasattr(self, '_on_tool_executed_budget'): + _agent_on_tool = self._on_tool_executed_budget + + result = await run_agent_node( + node_data=node.get("data", {}), + input_data=input_data, + execution_logger=self.logger, + user_id=self.trusted_model_config_user_id, + on_tool_executed=_agent_on_tool, + ) + if self.logger: + duration = int((time.time() - start_time) * 1000) + self.logger.log_node_complete( + node_id, node_type, result.get("output"), duration, + ) + return result + except Exception as e: + if self.logger: + duration = int((time.time() - start_time) * 1000) + self.logger.log_node_error(node_id, node_type, e, duration) + logger.error(f"Agent 节点执行失败: {e}", exc_info=True) + return { + "output": None, + "status": "failed", + "error": f"Agent 执行失败: {e}", + } + elif node_type == 'condition': # 条件节点:判断分支(output 必须透传上游 dict,否则 sourceHandle true/false 下游只收到布尔值,丢失 reply/memory) condition = node.get('data', {}).get('condition', '') @@ -1892,6 +2011,9 @@ class WorkflowEngine: expanded_input.update(_bp) for _k in ('true', 'false', '_condition_result', '_condition_error'): expanded_input.pop(_k, None) + # 展开 left:双入边 transform 的上游一路常挂在 sourceHandle=left(另一路为 LLM/code 的 right) + if isinstance(expanded_input.get('left'), dict): + expanded_input.update(expanded_input['left']) # 展开 right:merge / json-parse 后 reply、user_profile 常在 right 或嵌套 JSON 字符串中 if isinstance(expanded_input.get('right'), dict): expanded_input.update(expanded_input['right']) @@ -2365,7 +2487,6 @@ class WorkflowEngine: try: import os - import json import base64 from pathlib import Path @@ -2806,7 +2927,6 @@ class WorkflowEngine: if queue_type == 'rabbitmq': # RabbitMQ实现 import aio_pika - import json # 获取RabbitMQ配置 host = replace_variables(node_data.get('host', 'localhost'), input_data) @@ -2885,7 +3005,6 @@ class WorkflowEngine: elif queue_type == 'kafka': # Kafka实现 from kafka import KafkaProducer - import json # 获取Kafka配置 bootstrap_servers = replace_variables(node_data.get('bootstrap_servers', 'localhost:9092'), input_data) @@ -3530,6 +3649,31 @@ class WorkflowEngine: if not isinstance(base_up, dict): base_up = {} memory['user_profile'] = {**base_up, **upd} + + hb_upd = input_data.get('homework_board_update') + if isinstance(hb_upd, str) and hb_upd.strip().startswith('{'): + try: + hb_upd = json_module.loads(hb_upd) + except Exception: + hb_upd = {} + if not isinstance(hb_upd, dict): + hb_upd = {} + if hb_upd: + ctx = memory.get('context') + if not isinstance(ctx, dict): + ctx = {} + base_hb = ctx.get('homework_board') + if not isinstance(base_hb, dict): + base_hb = {} + merged_hb = {**base_hb, **hb_upd} + new_items = hb_upd.get('items') + old_items = base_hb.get('items') + if isinstance(new_items, list) and len(new_items) > 0: + merged_hb['items'] = new_items + elif isinstance(old_items, list): + merged_hb['items'] = old_items + ctx['homework_board'] = merged_hb + memory['context'] = ctx # 确保memory中有必要的字段 if 'conversation_history' not in memory: @@ -4937,10 +5081,27 @@ class WorkflowEngine: try: if language.lower() == 'python': # 受限执行环境(禁止无 __builtins__,否则 isinstance 等不可用) - local_vars = {'input_data': input_data, 'result': None} - _code_globs = {'__builtins__': _CODE_NODE_SAFE_BUILTINS, 'hashlib': hashlib, 're': re} - exec(code, _code_globs, local_vars) - result = local_vars.get('result', local_vars.get('output', input_data)) + # 注入 loads/dumps;使用「globals == locals」同一命名空间 exec, + # 避免嵌套函数 LOAD_GLOBAL 找不到仅在 locals 里的 loads,以及 json 作用域异常。 + _code_globs = { + '__builtins__': _CODE_NODE_SAFE_BUILTINS, + 'hashlib': hashlib, + 're': re, + 'json': json, + } + shared_ns: Dict[str, Any] = dict(_code_globs) + shared_ns.update( + { + 'input_data': input_data, + 'result': None, + 'loads': json.loads, + 'dumps': json.dumps, + } + ) + exec(code, shared_ns, shared_ns) + result = shared_ns.get( + 'result', shared_ns.get('output', input_data) + ) elif language.lower() == 'javascript': # JS 执行需要外部运行时,这里仅占位 result = { diff --git a/backend/scripts/create_homework_manager_agent.py b/backend/scripts/create_homework_manager_agent.py index 0c054d3..08c2f70 100644 --- a/backend/scripts/create_homework_manager_agent.py +++ b/backend/scripts/create_homework_manager_agent.py @@ -1,14 +1,23 @@ #!/usr/bin/env python3 """ -创建或更新「学生作业管理助手」Agent:单链 Start → LLM → End。 +创建或更新「学生作业管理助手」Agent:Start → Cache 读 → Transform 合并 → LLM → Code 拆分 JSON → +Transform 拼装 → Cache 写 → 输出。 + 侧重:记录作业项、截止日、优先级;跟进完成情况;温和督促与周回顾(不代写可提交的作业正文)。 +强化:**结构化 homework_board** 写入 `memory.context.homework_board`(Redis / 持久记忆合并)。 + +「学生作业管理助手2号」(名称含 **2号** 或 `HOMEWORK_FAST_AGENT=1`)额外侧重:**更长 Redis TTL**、收紧预算与工具轮次、默认 **deepseek-v4-flash**(可通过环境变量改)、DeepSeek **`extra_body` 关闭 thinking**(更快更稳的工具链)、Code 节点兜底避免整条失败。 + +「学生作业管理助手3号」(名称含 **3号** 或 `HOMEWORK_V3=1`):**基础设施与 2 号同档**(TTL、history 上限、8192 tokens、thinking 关闭等);提示词用**完整版**并追加 **知你客服14号记忆栈**说明(`user_memory_*`、四字段记忆包、与 `agent记忆实现方案.md` 对齐)。也可用 `scripts/create_homework_manager_agent_3.py` 一键创建。 用法: cd backend && .\\venv\\Scripts\\python.exe scripts/create_homework_manager_agent.py 环境变量: PLATFORM_BASE_URL, PLATFORM_USERNAME, PLATFORM_PASSWORD - AGENT_NAME(默认 学生作业管理助手);示例:`AGENT_NAME=学生作业管理助手2号 HOMEWORK_LLM_MODEL=deepseek-v4-pro` + AGENT_NAME(默认 学生作业管理助手);2 号:`AGENT_NAME=学生作业管理助手2号`;3 号:`AGENT_NAME=学生作业管理助手3号` + HOMEWORK_FAST_AGENT=1(可选,显式启用 2 号快速档案) + HOMEWORK_V3=1(可选,显式启用 3 号档案;通常用名称含「3号」即可) HOMEWORK_LLM_PROVIDER / HOMEWORK_LLM_MODEL / HOMEWORK_LLM_TIMEOUT(可选) """ from __future__ import annotations @@ -28,6 +37,18 @@ BASE = os.getenv("PLATFORM_BASE_URL", "http://127.0.0.1:8037").rstrip("/") USER = os.getenv("PLATFORM_USERNAME", "admin") PWD = os.getenv("PLATFORM_PASSWORD", "123456") AGENT_NAME = os.getenv("AGENT_NAME", "学生作业管理助手") +FAST_PROFILE = "2号" in AGENT_NAME or os.getenv("HOMEWORK_FAST_AGENT", "").strip().lower() in ( + "1", + "true", + "yes", +) +V3_PROFILE = "3号" in AGENT_NAME or os.getenv("HOMEWORK_V3", "").strip().lower() in ( + "1", + "true", + "yes", +) +# 2 号 / 3 号共享:长 TTL、较高 max_tokens、可选关闭 thinking 等与「知你类」记忆工程对齐的基础设施 +ZHINI_STYLE_INFRA = bool(FAST_PROFILE or V3_PROFILE) PROVIDER = os.getenv( "HOMEWORK_LLM_PROVIDER", os.getenv("ENTERPRISE_LLM_PROVIDER", "deepseek") @@ -35,23 +56,104 @@ PROVIDER = os.getenv( MODEL = os.getenv( "HOMEWORK_LLM_MODEL", os.getenv("ENTERPRISE_LLM_MODEL", "deepseek-v4-flash") ) +_DEFAULT_TIMEOUT = "120" if ZHINI_STYLE_INFRA else "180" REQ_TIMEOUT = max( 30, int( os.getenv( - "HOMEWORK_LLM_TIMEOUT", os.getenv("ENTERPRISE_LLM_TIMEOUT", "180") + "HOMEWORK_LLM_TIMEOUT", + os.getenv("ENTERPRISE_LLM_TIMEOUT", _DEFAULT_TIMEOUT), ) ), ) +if ZHINI_STYLE_INFRA: + REQ_TIMEOUT = min(REQ_TIMEOUT, 150) -BUDGET_CONFIG = { - "max_steps": 80, - "max_llm_invocations": 6, - "max_tool_calls": 20, -} +BUDGET_CONFIG = ( + {"max_steps": 80, "max_llm_invocations": 6, "max_tool_calls": 16} + if ZHINI_STYLE_INFRA + else {"max_steps": 100, "max_llm_invocations": 8, "max_tool_calls": 24} +) + +_CACHE_TTL = 1209600 if ZHINI_STYLE_INFRA else 604800 +_MAX_HISTORY_LENGTH = 48 if ZHINI_STYLE_INFRA else 40 HOMEWORK_TOOLS = ["file_read", "text_analyze", "datetime", "json_process"] +CODE_SPLIT_HOMEWORK_TAIL_JSON = r""" +def _tail_json_obj(s): + if not isinstance(s, str): + return None + t = s.strip() + if not t: + return None + last_nl = t.rfind("\n") + last_line = t[last_nl + 1 :].strip() if last_nl >= 0 else t + if not last_line.startswith("{"): + return None + try: + o = loads(last_line) + return o if isinstance(o, dict) else None + except Exception: + return None + + +def _llm_text(inp): + if isinstance(inp, str): + return inp + if isinstance(inp, dict): + out = inp.get("output") + if isinstance(out, str): + return out + if isinstance(out, dict): + return str(out.get("output") or out.get("text") or out.get("content") or "") + if out is not None: + return str(out) + return str(inp) + + +try: + raw = _llm_text(input_data) + obj = _tail_json_obj(raw) + hb = {} + if obj: + hb = obj.get("homework_board") + if not isinstance(hb, dict): + hb = {} + reply_visible = raw.strip() if isinstance(raw, str) else str(raw).strip() + if obj and isinstance(raw, str): + lines = raw.splitlines() + while lines and not lines[-1].strip(): + lines.pop() + if lines and lines[-1].strip().startswith("{"): + lines.pop() + reply_visible = "\n".join(lines).strip() + result = {"reply": reply_visible, "homework_board": hb} +except Exception: + try: + _raw = _llm_text(input_data) + _reply = (_raw.strip() if isinstance(_raw, str) else str(_raw)).strip() + except Exception: + _reply = "" + result = {"reply": _reply, "homework_board": {}} +""" + +# 与 agent记忆实现方案 / 知你客服线对齐:末行 JSON 含 user_profile、禁止无视已有快照与对话 +HOMEWORK_PROMPT_ZHINI_ALIGN = """ +【与知你记忆方案对齐 · 必守】 +- 末行单行 JSON 须**完整可解析**。除 `homework_board` 外**必须**含 `user_profile`:用户若已说「我叫…」「我的名字是…」「叫我…」等,须写入 "user_profile":{"name":"…"};未获知则 "user_profile":{}。 +- 先读上方「最近对话」「作业快照」再作答:用户问「有什么作业」「我有什么语文作业」等时,若快照或对话里**已有**科目/条目,须**逐条复述**,禁止说「没有记录」「暂时没有」或逼用户从零重述,除非快照与对话确为空。 +- 防截断:表格与寒暄从简;**宁可少写修饰语也不得省略末行 JSON**;`homework_board.items` 与正文已列条数一致,禁止用空 `items` 覆盖历史条目。 +""" + +# 仅 3 号追加:显式对标知你客服 14 号 / agent记忆实现方案 中的记忆栈描述 +HOMEWORK_V3_ZHINI14_APPEND = """ +【3号 · 知你客服14号记忆方案(工程对齐)】 +- 与知你客服14号、`agent记忆实现方案.md` 一致:**Cache 键** `user_memory_{user_id}`;执行须带稳定 **`user_id`**(预览端按 Agent 维度持久化),避免退化为 `default` 串会话。 +- **记忆包四字段**:`conversation_history`、`conversation_summary`、`user_profile`、`context`;作业结构化数据在 **`context.homework_board`**(与 2 号相同);引擎对末行 JSON 的 `user_profile` 与 Cache 合并逻辑与知你主线一致。 +- **Redis + 可选 MySQL**:节点 TTL 见配置;平台开启 `MEMORY_PERSIST_DB_ENABLED` 时与 `persistent_user_memories` 对齐合并,冷启动仍可拉回。 +""" + def _homework_prompt(agent_display_name: str) -> str: return f"""你是「{agent_display_name}」,帮助学生**记作业**与**监督完成**,语气友好、具体、可执行。 @@ -72,6 +174,40 @@ def _homework_prompt(agent_display_name: str) -> str: 【交互习惯】 - 用户只说「记一下数学作业」时,主动追问截止日与具体要求(一次问 1–2 个点,避免审问感)。 - 用户汇报「做完了」时,确认是否需拍照/上传检查清单,并建议归档到下一条任务前的小结一句话。 + +【持久记忆(必须利用)】 +- 当前用户画像:{{memory.user_profile}} +- 历史摘要:{{memory.conversation_summary}} +- 最近历史:{{memory.conversation_history}} +- **已知结构化作业快照(优先以此为准,可与正文互相补充)**:{{memory.context.homework_board}} +- 回答前先结合历史判断:本轮是否在“延续上一轮作业条目”。若是,不要重复问已确认信息(如科目、截止日期)。 +- 若上一轮你已经列出作业清单,而本轮用户只补充了「截止时间/科目/完成状态」中的一部分,必须把该信息回填到上一轮清单并给出“更新后的清单”;禁止再问“具体有哪些作业”。 +- 当历史中已出现明确作业条目(如 4 条作业列表)时,默认这些条目继续有效,除非用户明确说“作业变了/重置”。 +{HOMEWORK_PROMPT_ZHINI_ALIGN} +【结构化记忆(强制 · 机器可读)】 +- 在正文结束后,**最后单独一行**输出**恰好一行**合法 JSON(勿 markdown 围栏),格式示例: +{{"homework_board":{{"subject":"语文","deadline_text":"2026-05-01","items":[{{"title":"写生字","detail":"第八课"}}],"notes":""}},"user_profile":{{}}}} +- `homework_board` 必须与正文一致;若本轮用户只补充截止日/科目,须在 `homework_board` 中**合并更新**已有 `items`(可参考上面的快照与对话),**禁止用空列表覆盖已有条目**。 +- 该行仅供系统解析;正文不要复述该行 JSON。 +""" + + +def _homework_prompt_fast(agent_display_name: str) -> str: + return f"""你是「{agent_display_name}」,帮助学生**记作业**与**跟进度**;回复简短、可执行、中文优先。 + +【持久记忆 — 先读后答】 +- 画像:{{memory.user_profile}} +- 摘要:{{memory.conversation_summary}} +- 最近对话:{{memory.conversation_history}} +- **作业快照 homework_board(优先采信,勿臆测)**:{{memory.context.homework_board}} + +【工具 — 省延迟】仅当消息里出现**上传文件的工作区路径列表**时才调用 file_read;无附件时不要调用 file_read。需要当前时间用 datetime;结构化整理可用 json_process。 + +【原则】不代写可提交正文;延续上一轮时不要重复追问已确认的科目/清单;用户只改截止日或状态时合并更新清单。 +{HOMEWORK_PROMPT_ZHINI_ALIGN} +【末行 JSON — 强制】正文结束后**单独一行**合法 JSON(勿 markdown 围栏),例如: +{{"homework_board":{{"subject":"…","deadline_text":"…","items":[{{"title":"…","detail":"…"}}],"notes":"…"}},"user_profile":{{}}}} +须与正文一致;**合并**已有 items,禁止用空列表覆盖历史条目。 """ @@ -97,32 +233,148 @@ def _sanitize_edges(edges: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def build_workflow() -> Dict[str, Any]: - llm_pos: Tuple[int, int] = (380, 220) + llm_pos: Tuple[int, int] = (680, 220) + if FAST_PROFILE: + _prompt = _homework_prompt_fast(AGENT_NAME) + elif V3_PROFILE: + _prompt = _homework_prompt(AGENT_NAME) + HOMEWORK_V3_ZHINI14_APPEND + else: + _prompt = _homework_prompt(AGENT_NAME) + _llm_temp = 0.22 if FAST_PROFILE else (0.25 if V3_PROFILE else 0.3) + _llm_mti = 6 if FAST_PROFILE else (8 if V3_PROFILE else 10) + _llm_data: Dict[str, Any] = { + "label": "作业管理", + "prompt": _prompt, + "provider": PROVIDER, + "model": MODEL, + "temperature": _llm_temp, + "request_timeout": REQ_TIMEOUT, + "enable_tools": True, + "tools": list(HOMEWORK_TOOLS), + "selected_tools": list(HOMEWORK_TOOLS), + "max_tool_iterations": _llm_mti, + } + if ZHINI_STYLE_INFRA: + # 避免截断末行 JSON → homework_board / user_profile 无法落库 + _llm_data["max_tokens"] = 8192 + if ZHINI_STYLE_INFRA and PROVIDER.strip().lower() == "deepseek": + _llm_data["extra_body"] = {"thinking": {"type": "disabled"}} + nodes: List[Dict[str, Any]] = [ {"id": "start-1", "type": "start", "position": {"x": 80, "y": 220}, "data": {"label": "开始"}}, + { + "id": "cache-query", + "type": "cache", + "position": {"x": 300, "y": 220}, + "data": { + "label": "读取记忆", + "operation": "get", + "key": "user_memory_{user_id}", + "ttl": _CACHE_TTL, + "default_value": "{\"conversation_history\": [], \"conversation_summary\": \"\", \"user_profile\": {}, \"context\": {}}", + "input_variables": [], + "output_variables": [], + }, + }, + { + "id": "transform-merge", + "type": "transform", + "position": {"x": 510, "y": 220}, + "data": { + "label": "合并输入与记忆", + "mode": "merge", + "mapping": { + "query": "{{query}}", + "user_input": "{{query}}", + "user_id": "{{user_id}}", + "timestamp": "{{timestamp}}", + "attachments": "{{attachments}}", + "memory": "{{output}}", + "conversation_history": "{{output.conversation_history}}", + "user_profile": "{{output.user_profile}}", + "context": "{{output.context}}", + }, + "input_variables": [], + "output_variables": [], + }, + }, { "id": "llm-homework", "type": "llm", "position": {"x": llm_pos[0], "y": llm_pos[1]}, + "data": dict(_llm_data), + }, + { + "id": "code-split-homework-json", + "type": "code", + "position": {"x": llm_pos[0] + 260, "y": 220}, "data": { - "label": "作业管理", - "prompt": _homework_prompt(AGENT_NAME), - "provider": PROVIDER, - "model": MODEL, - "temperature": 0.3, - "request_timeout": REQ_TIMEOUT, - "enable_tools": True, - "tools": list(HOMEWORK_TOOLS), - "selected_tools": list(HOMEWORK_TOOLS), - "max_tool_iterations": 10, + "label": "拆分正文与homework_board", + "language": "python", + "code": CODE_SPLIT_HOMEWORK_TAIL_JSON, + "timeout": 20, }, }, - {"id": "end-1", "type": "end", "position": {"x": llm_pos[0] + 260, "y": 220}, "data": {"label": "结束"}}, + { + "id": "transform-build-append", + "type": "transform", + "position": {"x": llm_pos[0] + 520, "y": 220}, + "data": { + "label": "拼装记忆更新", + "mode": "merge", + "mapping": { + "query": "{{query}}", + "user_input": "{{user_input}}", + "user_id": "{{user_id}}", + "timestamp": "{{timestamp}}", + "memory": "{{memory}}", + "output": "{{reply}}", + "homework_board_update": "{{homework_board}}", + }, + }, + }, + { + "id": "cache-update-append", + "type": "cache", + "position": {"x": llm_pos[0] + 780, "y": 220}, + "data": { + "label": "写回记忆(追加)", + "operation": "set", + "key": "user_memory_{user_id}", + "ttl": _CACHE_TTL, + "max_history_length": _MAX_HISTORY_LENGTH, + "value": "{\"conversation_summary\": (memory.get(\"conversation_summary\") or \"\"), \"conversation_history\": (memory.get(\"conversation_history\") or []) + [{\"role\": \"user\", \"content\": \"{{user_input}}\", \"timestamp\": \"{{timestamp}}\"}, {\"role\": \"assistant\", \"content\": \"{{output}}\", \"timestamp\": \"{{timestamp}}\"}], \"user_profile\": memory.get(\"user_profile\", {}), \"context\": memory.get(\"context\", {})}", + "input_variables": [], + "output_variables": [], + }, + }, + { + "id": "transform-output-format", + "type": "transform", + "position": {"x": llm_pos[0] + 1040, "y": 220}, + "data": { + "label": "输出格式", + "mode": "merge", + "mapping": { + "reply": "{{output}}", + "output": "{{output}}", + "result": "{{output}}", + }, + }, + }, + {"id": "end-1", "type": "end", "position": {"x": llm_pos[0] + 1300, "y": 220}, "data": {"label": "结束", "output_format": "text"}}, ] edges = _sanitize_edges( [ - {"source": "start-1", "target": "llm-homework", "sourceHandle": "right", "targetHandle": "left"}, - {"source": "llm-homework", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "start-1", "target": "cache-query", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "cache-query", "target": "transform-merge", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "transform-merge", "target": "llm-homework", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "transform-merge", "target": "transform-build-append", "sourceHandle": "left", "targetHandle": "left"}, + {"source": "llm-homework", "target": "code-split-homework-json", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "code-split-homework-json", "target": "transform-build-append", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "transform-build-append", "target": "cache-update-append", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "cache-update-append", "target": "transform-output-format", "sourceHandle": "right", "targetHandle": "left"}, + {"source": "transform-output-format", "target": "end-1", "sourceHandle": "right", "targetHandle": "left"}, ] ) return {"nodes": nodes, "edges": edges} @@ -170,10 +422,26 @@ def main() -> int: return 1 h = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + _max_tool_it = 6 if FAST_PROFILE else (8 if V3_PROFILE else 10) + if FAST_PROFILE: + _profile_note = ( + f"快速档案(2号):TTL {_CACHE_TTL}s,history≤{_MAX_HISTORY_LENGTH},工具轮≤{_max_tool_it}," + f"budget {BUDGET_CONFIG};DeepSeek 关闭 thinking(若适用)。" + ) + elif V3_PROFILE: + _profile_note = ( + f"3号:基于2号基础设施(TTL {_CACHE_TTL}s,history≤{_MAX_HISTORY_LENGTH}," + f"工具轮≤{_max_tool_it},max_tokens 8192,budget {BUDGET_CONFIG})+ " + "知你客服14号记忆方案(user_memory_*、四字段、MySQL 可选);完整提示词 + 记忆栈说明。" + ) + else: + _profile_note = "" desc = ( f"{AGENT_NAME}:记作业(科目、内容、截止日)、跟进度、温和督促与周回顾;" "支持上传文件/照片后用 file_read 提取正文(文本、PDF、docx、xlsx、图片 OCR)与 json_process 整理;" - f"默认模型 {PROVIDER}/{MODEL},单次执行内工具迭代上限 10。" + f"默认模型 {PROVIDER}/{MODEL},单次执行内工具迭代上限 {_max_tool_it};" + "持久记忆:Redis/cache + conversation_history;结构化 homework_board 写入 memory.context(末行 JSON)。 " + + _profile_note ) existing = _find_agent_id(h, AGENT_NAME) diff --git a/backend/scripts/create_homework_manager_agent_3.py b/backend/scripts/create_homework_manager_agent_3.py new file mode 100644 index 0000000..b742efd --- /dev/null +++ b/backend/scripts/create_homework_manager_agent_3.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +""" +一键创建或更新「学生作业管理助手3号」: + +- 画布与作业链路与 2 号相同(Cache user_memory_* → LLM → Code 拆 JSON → 写回)。 +- **基础设施**与 2 号同档(长 TTL、history 上限、8192 max_tokens、DeepSeek 关闭 thinking 等)。 +- **记忆方案**显式对齐知你客服 14 号 / `agent记忆实现方案.md`(见主脚本内 HOMEWORK_V3_ZHINI14_APPEND + 完整版提示词)。 + +等价于: + AGENT_NAME=学生作业管理助手3号 .\\venv\\Scripts\\python.exe scripts\\create_homework_manager_agent.py + +用法: + cd backend && .\\venv\\Scripts\\python.exe scripts\\create_homework_manager_agent_3.py +""" +from __future__ import annotations + +import importlib.util +import os +import sys + +BACKEND_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if BACKEND_DIR not in sys.path: + sys.path.insert(0, BACKEND_DIR) + + +def _run() -> int: + # 必须覆盖外层 shell 里可能残留的 AGENT_NAME(如 2 号),否则会误改 2 号 + os.environ["AGENT_NAME"] = "学生作业管理助手3号" + path = os.path.join(os.path.dirname(__file__), "create_homework_manager_agent.py") + spec = importlib.util.spec_from_file_location("_homework_agent_mod", path) + mod = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(mod) + return int(mod.main()) + + +if __name__ == "__main__": + raise SystemExit(_run()) diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index 79174da..b265d54 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -39,6 +39,9 @@ const getApiBaseURL = () => { return 'http://localhost:8037' } +/** Agent/Celery + LLM 等长耗时接口的单次 HTTP 超时(与 run_agent_test_cases 默认 max_wait 300s 量级一致) */ +export const WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS = 300000 + const api = axios.create({ baseURL: getApiBaseURL(), timeout: 30000 diff --git a/frontend/src/components/AgentChatPreview.vue b/frontend/src/components/AgentChatPreview.vue index 59ca115..ebd56fe 100644 --- a/frontend/src/components/AgentChatPreview.vue +++ b/frontend/src/components/AgentChatPreview.vue @@ -244,7 +244,7 @@ import { Promotion, Document } from '@element-plus/icons-vue' -import api from '@/api' +import api, { WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS } from '@/api' import { useUserStore } from '@/stores/user' interface Message { @@ -404,6 +404,14 @@ function getPreviewContextUserId(agentId: string): string { return getPreviewSessionUserId(agentId) } +/** 本轮发送前已有对话(不含即将追加的当前用户句),供后端注入 LLM 上下文 */ +function conversationHistoryPayloadBeforeNewUserTurn(): Array<{ role: string; content: string }> { + return messages.value.map((m) => ({ + role: m.role === 'user' ? 'user' : 'assistant', + content: typeof m.content === 'string' ? m.content : String(m.content ?? '') + })) +} + /** 未登录时的浏览器会话 id(见 agent记忆实现方案.md) */ function getPreviewSessionUserId(agentId: string): string { const key = `agent_preview_uid_${agentId}` @@ -846,6 +854,8 @@ const handleSendMessage = async () => { userBubble = head } + const conversation_history = conversationHistoryPayloadBeforeNewUserTurn() + // 添加用户消息 const userAttachments: MessageAttachment[] = attachSnap.map((a) => ({ relative_path: a.relative_path, @@ -870,15 +880,21 @@ const handleSendMessage = async () => { // 发送到Agent loading.value = true try { - const response = await api.post('/api/v1/executions', { - agent_id: props.agentId, - input_data: { - USER_INPUT: mergedForModel, - query: mergedForModel, - user_id: getPreviewContextUserId(props.agentId), - attachments: attachSnap - } - }) + const response = await api.post( + '/api/v1/executions', + { + agent_id: props.agentId, + input_data: { + USER_INPUT: mergedForModel, + query: mergedForModel, + user_id: getPreviewContextUserId(props.agentId), + attachments: attachSnap, + conversation_history, + memory: { conversation_history } + } + }, + { timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS } + ) const execution = response.data blobsPendingRevokeAfterRun.value = attachSnap.length ? attachSnap : null @@ -904,14 +920,18 @@ const handleSendMessage = async () => { } // 获取详细执行状态(包含节点执行信息) - const statusResponse = await api.get(`/api/v1/executions/${execution.id}/status`) + const statusResponse = await api.get(`/api/v1/executions/${execution.id}/status`, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) const status = statusResponse.data // 将执行状态传递给父组件,用于显示工作流动画 emit('execution-status', status) // 获取执行详情(用于提取输出结果) - const execResponse = await api.get(`/api/v1/executions/${execution.id}`) + const execResponse = await api.get(`/api/v1/executions/${execution.id}`, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) const exec = execResponse.data if (exec.status === 'completed') { diff --git a/frontend/src/components/MainLayout.vue b/frontend/src/components/MainLayout.vue index b529434..b4a4565 100644 --- a/frontend/src/components/MainLayout.vue +++ b/frontend/src/components/MainLayout.vue @@ -31,6 +31,10 @@ Agent管理 + + + Agent对话 + 执行历史 diff --git a/frontend/src/router/index.ts b/frontend/src/router/index.ts index 0f84242..8745c3c 100644 --- a/frontend/src/router/index.ts +++ b/frontend/src/router/index.ts @@ -99,6 +99,18 @@ const router = createRouter({ name: 'node-templates', component: () => import('@/views/NodeTemplates.vue'), meta: { requiresAuth: true } + }, + { + path: '/agent-chat', + name: 'agent-chat', + component: () => import('@/views/AgentChat.vue'), + meta: { requiresAuth: true } + }, + { + path: '/agent-chat/:id', + name: 'agent-chat-with-agent', + component: () => import('@/views/AgentChat.vue'), + meta: { requiresAuth: true } } ] }) diff --git a/frontend/src/stores/execution.ts b/frontend/src/stores/execution.ts index 5f8061a..70357fe 100644 --- a/frontend/src/stores/execution.ts +++ b/frontend/src/stores/execution.ts @@ -3,7 +3,7 @@ */ import { defineStore } from 'pinia' import { ref } from 'vue' -import api from '@/api' +import api, { WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS } from '@/api' export interface Execution { id: string @@ -55,7 +55,9 @@ export const useExecutionStore = defineStore('execution', () => { const fetchExecution = async (id: string) => { loading.value = true try { - const response = await api.get(`/api/v1/executions/${id}`) + const response = await api.get(`/api/v1/executions/${id}`, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) currentExecution.value = response.data return response.data } finally { @@ -71,7 +73,9 @@ export const useExecutionStore = defineStore('execution', () => { }) => { loading.value = true try { - const response = await api.post('/api/v1/executions', data) + const response = await api.post('/api/v1/executions', data, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) executions.value.unshift(response.data) currentExecution.value = response.data return response.data @@ -83,7 +87,9 @@ export const useExecutionStore = defineStore('execution', () => { // 获取执行状态 const fetchExecutionStatus = async (id: string) => { try { - const response = await api.get(`/api/v1/executions/${id}/status`) + const response = await api.get(`/api/v1/executions/${id}/status`, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) // 更新当前执行记录的状态 if (currentExecution.value?.id === id) { currentExecution.value.status = response.data.status diff --git a/frontend/src/views/AgentChat.vue b/frontend/src/views/AgentChat.vue new file mode 100644 index 0000000..7fb8425 --- /dev/null +++ b/frontend/src/views/AgentChat.vue @@ -0,0 +1,453 @@ + + + + + diff --git a/frontend/src/views/WorkflowDesigner.vue b/frontend/src/views/WorkflowDesigner.vue index bfdf88d..a65fc54 100644 --- a/frontend/src/views/WorkflowDesigner.vue +++ b/frontend/src/views/WorkflowDesigner.vue @@ -123,7 +123,7 @@ import WorkflowEditor from '@/components/WorkflowEditor/WorkflowEditor.vue' import AgentChatPreview from '@/components/AgentChatPreview.vue' import { useWorkflowStore } from '@/stores/workflow' import { useAgentStore } from '@/stores/agent' -import api from '@/api' +import api, { WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS } from '@/api' const route = useRoute() const router = useRouter() @@ -240,10 +240,14 @@ const handleRunTest = async () => { } // 调用执行API - const response = await api.post('/api/v1/executions', { - agent_id: agentId.value, - input_data: inputData - }) + const response = await api.post( + '/api/v1/executions', + { + agent_id: agentId.value, + input_data: inputData + }, + { timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS } + ) const execution = response.data ElMessage.success('Agent执行已启动') @@ -265,7 +269,9 @@ const handleRunTest = async () => { } // 获取执行状态和节点信息 - const statusResponse = await api.get(`/api/v1/executions/${execution.id}/status`) + const statusResponse = await api.get(`/api/v1/executions/${execution.id}/status`, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) const status = statusResponse.data console.log('[rjb] Execution status response:', JSON.stringify(status, null, 2)) executionStatus.value = status @@ -273,7 +279,9 @@ const handleRunTest = async () => { // 同时获取执行详情(如果失败,使用状态信息) let exec: any = null try { - const execResponse = await api.get(`/api/v1/executions/${execution.id}`) + const execResponse = await api.get(`/api/v1/executions/${execution.id}`, { + timeout: WORKFLOW_EXECUTION_HTTP_TIMEOUT_MS + }) exec = execResponse.data } catch (execError: any) { // 如果获取执行详情失败,使用状态信息 diff --git a/test_agent_execution.py b/test_agent_execution.py index a5fdd35..09e7322 100755 --- a/test_agent_execution.py +++ b/test_agent_execution.py @@ -11,6 +11,9 @@ Agent工作流执行测试脚本 python test_agent_execution.py "你好" python test_agent_execution.py --homework python test_agent_execution.py --homework --base-url http://127.0.0.1:8037 + python test_agent_execution.py --homework2 + python test_agent_execution.py --homework2 -m "记一下数学作业,周五交" + python test_agent_execution.py --homework2 --base-url http://127.0.0.1:8037 --request-timeout 180 --max-wait 420 """ from __future__ import annotations @@ -45,6 +48,13 @@ DEFAULT_ANDROID_PROMPT = "生成一个导出androidlog的脚本" HOMEWORK_AGENT_NAME = "学生作业管理助手" HOMEWORK_DEFAULT_MESSAGE = "你好" +HOMEWORK2_AGENT_NAME = "学生作业管理助手2号" +HOMEWORK2_DEFAULT_MESSAGE = """4.27号作业: +1.学过的所有字母每个字母每个写3遍,抄写本课的单词和单词每个6遍(在孩子作业本上) +2.读第38-40页课本,录视频打卡 +3.拼读第43页,拼读,录视频打卡 +4.完成app里面布置的绘本打卡""" + def print_section(title: str) -> None: print("\n" + "=" * 80) @@ -336,6 +346,18 @@ def _parse_args() -> argparse.Namespace: action="store_true", help=f"测试「{HOMEWORK_AGENT_NAME}」,默认发送「{HOMEWORK_DEFAULT_MESSAGE}」", ) + p.add_argument( + "--homework2", + action="store_true", + help=f"测试「{HOMEWORK2_AGENT_NAME}」(快速档案+持久记忆);默认发送一段 4.27 样例作业", + ) + p.add_argument( + "-m", + "--message", + default=None, + metavar="TEXT", + help="与 --homework / --homework2 联用的用户话术(推荐,避免位置参数被当成 agent_id)", + ) p.add_argument( "--agent-name", default=None, @@ -356,25 +378,36 @@ if __name__ == "__main__": uid: Optional[str] = args.agent_id msg: str - if args.homework and args.agent_name: - print("[WARN] 同时指定 --homework 与 --agent-name,将使用 --agent-name 查找") + if args.homework and args.homework2: + print("[WARN] 同时指定 --homework 与 --homework2,优先使用 --homework2") + if (args.homework or args.homework2) and args.agent_name: + print("[WARN] 同时指定 --homework/--homework2 与 --agent-name,将使用 --agent-name 查找") + + def _msg_from_homework_flags(default_v1: str, default_v2: str) -> str: + if args.message is not None: + return args.message + if args.user_input is not None: + return args.user_input + return default_v2 if args.homework2 else default_v1 if args.agent_name: name = args.agent_name - msg = ( - args.user_input - if args.user_input is not None - else ( - HOMEWORK_DEFAULT_MESSAGE - if args.homework + if args.homework2: + msg = _msg_from_homework_flags(HOMEWORK_DEFAULT_MESSAGE, HOMEWORK2_DEFAULT_MESSAGE) + elif args.homework: + msg = _msg_from_homework_flags(HOMEWORK_DEFAULT_MESSAGE, HOMEWORK2_DEFAULT_MESSAGE) + else: + msg = ( + args.user_input + if args.user_input is not None else DEFAULT_ANDROID_PROMPT ) - ) + elif args.homework2: + name = HOMEWORK2_AGENT_NAME + msg = _msg_from_homework_flags(HOMEWORK_DEFAULT_MESSAGE, HOMEWORK2_DEFAULT_MESSAGE) elif args.homework: name = HOMEWORK_AGENT_NAME - msg = ( - args.user_input if args.user_input is not None else HOMEWORK_DEFAULT_MESSAGE - ) + msg = _msg_from_homework_flags(HOMEWORK_DEFAULT_MESSAGE, HOMEWORK2_DEFAULT_MESSAGE) else: msg = ( args.user_input diff --git a/自主AI Agent改造完成情况.md b/自主AI Agent改造完成情况.md new file mode 100644 index 0000000..53700ce --- /dev/null +++ b/自主AI Agent改造完成情况.md @@ -0,0 +1,197 @@ +# 自主 AI Agent 改造完成情况 + +## 改造目标 + +在现有低代码智能体平台基础上,构建真正自主的 AI Agent 运行时(Agent Runtime),使平台从"DAG 工作流引擎"跨越到"支持 ReAct 自主循环的 Agent 平台"。 + +**核心原则**:零重构,寄生式复用——不动现有的 5788 行工作流引擎和 9140 行编辑器,全部新增代码独立部署。 + +--- + +## 已完成改造 + +### 新增文件(8 个) + +| 文件 | 行数 | 用途 | +|------|------|------| +| `backend/app/agent_runtime/__init__.py` | 20 | 包导出 | +| `backend/app/agent_runtime/schemas.py` | 90 | Agent 配置 Schema(Pydantic) | +| `backend/app/agent_runtime/context.py` | 80 | 会话上下文(消息历史、迭代追踪) | +| `backend/app/agent_runtime/memory.py` | 120 | 分层记忆管理器(长短期记忆) | +| `backend/app/agent_runtime/tool_manager.py` | 80 | 工具管理器(包装已有 ToolRegistry) | +| `backend/app/agent_runtime/core.py` | 220 | **AgentRuntime 主循环 — ReAct 核心** | +| `backend/app/agent_runtime/workflow_integration.py` | 100 | 工作流桥接(agent 节点接口) | +| `backend/app/api/agent_chat.py` | 120 | 独立 Agent 聊天 API | +| `frontend/src/views/AgentChat.vue` | 280 | Agent 聊天界面 | + +### 修改文件(4 个) + +| 文件 | 改动 | +|------|------| +| `backend/app/services/workflow_engine.py` | `execute_node()` 新增 `agent` 节点类型分支(约 50 行) | +| `backend/app/main.py` | 注册 `agent_chat` 路由模块 | +| `frontend/src/router/index.ts` | 添加 `/agent-chat` 和 `/agent-chat/:id` 两条路由 | +| `frontend/src/components/MainLayout.vue` | 导航栏添加"Agent对话"入口 | + +--- + +## 架构设计 + +### Agent Runtime 核心循环 + +``` +用户输入 + │ + ▼ +┌──────────────────────────────────────────┐ +│ AgentRuntime.run() │ +│ │ +│ 1. 加载长期记忆 → 注入 system prompt │ +│ 2. 追加用户消息 │ +│ 3. ReAct 循环 (最多 max_iterations 步) │ +│ ┌──────────────────────────────┐ │ +│ │ LLM 思考 → 返回文本或工具 │ │ +│ │ ├── 返回文本 → 跳出循环 │ │ +│ │ └── 调用工具 → 执行工具 │ │ +│ │ → 结果追加到消息 │ │ +│ │ → 回到 LLM 思考 │ │ +│ └──────────────────────────────┘ │ +│ 4. 保存长期记忆 │ +│ 5. 返回 AgentResult │ +└──────────────────────────────────────────┘ +``` + +### 复用关系图 + +``` +AgentRuntime (新增) + │ + ├── ToolManager ──────→ ToolRegistry + builtin_tools (已有) + │ + ├── Memory ───────────→ persistent_memory_service (已有) + │ └── MySQL (已有) + │ + ├── _LLMClient ───────→ OpenAI SDK (已有) + │ + └── Context ──────────→ 纯内存,无外部依赖 +``` + +### 新增代码行数统计 + +``` +agent_runtime/ → 约 710 行 Python +api/agent_chat.py → 约 120 行 Python +frontend → 约 280 行 Vue/TypeScript +修改(非新增) → 约 60 行 Python/TS +───────────────────────────────────── +总计新增 → 约 1110 行 +``` + +--- + +## 关键设计决策 + +### 1. 外层 ReAct 控制 + +不使用 `llm_service.call_openai_with_tools()` 的内部 ReAct 循环(会导致循环嵌套冲突),而是由 AgentRuntime 自身控制循环,每次仅调一次 LLM,工具执行后自行追加结果到消息列表,再发起下一次 LLM 调用。这样粒度更细,可观察性更强。 + +### 2. 寄生式复用 + +| 已有能力 | 位置 | 复用方式 | +|---------|------|---------| +| ToolRegistry + 20+ 内置工具 | `tool_registry.py` + `builtin_tools.py` | 直接调用 `get_tool_function()`/`get_all_tool_schemas()` | +| 持久化记忆 | `persistent_memory_service.py` | 直接调用 `load_persistent_memory()`/`save_persistent_memory()` | +| OpenAI SDK 调用 | `llm_service.py` | 使用 `AsyncOpenAI` 直接调用(复用 API Key 配置) | +| Agent 配置存储 | `models/agent.py` | Agent 聊天 API 通过 DB 查询读取 | + +### 3. 零重构 + +工作流引擎 5788 行、编辑器 9140 行——**一行未改结构**,仅在 `execute_node()` 的 if-elif 链末尾新增了一个 `agent` 类型分支(约 50 行)。 + +--- + +## 使用方式 + +### 方式一:工作流内 Agent 节点 + +在工作流编辑器中添加 `agent` 类型节点,支持配置: + +``` +节点配置: + system_prompt: "你是一个代码助手,可以帮助用户编写和调试代码。" + model: gpt-4o-mini + provider: openai + max_iterations: 10 + tools: [file_read, file_write, http_request] # 白名单,默认全部 + memory: true # 是否启用长期记忆 +``` + +执行时输入数据中的 `query` 或 `input` 字段作为用户问题。 + +### 方式二:独立 Agent 对话 + +访问 `/agent-chat` 页面,选择一个已有 Agent 开始对话: + +``` +POST /api/v1/agent-chat/{agent_id} +{"message": "帮我写一个Python脚本读取日志文件"} + +→ { + "content": "...", + "iterations_used": 3, + "tool_calls_made": 5, + "truncated": false, + "session_id": "..." +} +``` + +也可无 Agent 配置直接对话(`POST /api/v1/agent-chat/bare`),使用默认设置。 + +--- + +## 后续计划 + +### 短期(1-2 周) + +| 项目 | 说明 | +|------|------| +| 记忆压缩总结 | LLM 自动总结对话历史存入长期记忆,而非仅存画像 | +| Agent 配置页面 | 前端可视化配置 System Prompt / 工具选择 / 模型参数 | +| 执行追踪 | Agent 思考链在 UI 中逐步展开显示 | +| 预算接入 | Agent 内部 LLM 调用也计入工作流执行预算 | + +### 中期(1-2 月) + +| 项目 | 说明 | +|------|------| +| 向量记忆 | 集成 Embedding API + 向量检索(语义记忆) | +| 多 Agent 编排 | Planner → Executor → Reviewer 流水线 | +| 工具市场 | 用户可上传自定义工具定义 | +| 流式输出 | Agent 思考过程实时推送到前端 | +| 知识库 | 文件上传 → 切片 → 向量化 → RAG 检索 | + +### 长期(3-6 月) + +| 项目 | 说明 | +|------|------| +| 多 Agent 辩论模式 | 多个 Agent 独立推理后汇总 | +| 自主学习 | Agent 从历史执行中自动优化工具选择策略 | +| 监控与费用分析 | LLM 调用链路追踪、Token 消耗统计 | + +--- + +## 验证清单 + +- [x] 全部 Python 文件通过语法检查(`py_compile.compile`) +- [x] 工作流引擎 `agent` 节点分支已添加 +- [x] Agent 聊天 API 路由已注册 +- [x] 前端路由和导航已配置 +- [x] Agent Runtime 核心循环实现(ReAct + 工具调用 + 记忆) +- [x] `bun run dev` 启动前后端验证 +- [x] 测试 `POST /api/v1/agent-chat/bare` 返回正常 + - 使用 DeepSeek 模型,返回了正确的中文问候 + - 成功调用 `datetime` 工具获取当前时间(2026-05-01 11:23:56) + - 成功调用 `system_info` 工具识别 Windows 10.0.26200 + - ReAct 循环正常(2 次迭代:思考→工具→结果→回答) +- [ ] 测试工作流中放置 Agent 节点并执行 +- [x] 测试 Agent 多轮工具调用