From 1c83b6284f9e43e859a3cba29ff69918f8be547c Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Sun, 3 May 2026 00:20:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20add=20=E7=81=B5=E7=8A=80=20Feishu=20bot?= =?UTF-8?q?=20+=20fix=20agent=20schedule=20system=20+=20default=20all=20to?= =?UTF-8?q?ols?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 灵犀学习助手 Feishu bot (lingxi_app_service + lingxi_ws_handler) - Fix agent_schedule_service missing AgentSchedule import (Celery Beat) - Fix scene_templates default enable_tools=False → True - Fix workflow_engine LLM node: empty tools list now = all tools (consistent with agent node) - Add 创建agent.md guide document Co-Authored-By: Claude Opus 4.6 --- backend/app/core/config.py | 5 + backend/app/main.py | 7 + .../app/services/agent_schedule_service.py | 2 + backend/app/services/lingxi_app_service.py | 105 +++++++ backend/app/services/lingxi_ws_handler.py | 278 ++++++++++++++++++ backend/app/services/scene_templates.py | 4 +- backend/app/services/workflow_engine.py | 17 +- 创建agent.md | 158 ++++++++++ 8 files changed, 564 insertions(+), 12 deletions(-) create mode 100644 backend/app/services/lingxi_app_service.py create mode 100644 backend/app/services/lingxi_ws_handler.py create mode 100644 创建agent.md diff --git a/backend/app/core/config.py b/backend/app/core/config.py index e9ad7f0..a5df8f5 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -105,6 +105,11 @@ class Settings(BaseSettings): TIANTIAN_APP_SECRET: str = "" TIANTIAN_AGENT_ID: str = "" # 创建苏瑶3号后写入 + # 灵犀飞书应用配置(独立 WS 连接,路由到灵犀学习助手 Agent) + LINGXI_APP_ID: str = "" + LINGXI_APP_SECRET: str = "" + LINGXI_AGENT_ID: str = "" # 创建灵犀后写入 + class Config: env_file = str(_ENV_PATH) case_sensitive = True diff --git a/backend/app/main.py b/backend/app/main.py index f0c8d50..81154f1 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -241,6 +241,13 @@ async def startup_event(): except Exception as e: logger.error(f"甜甜长连接启动失败: {e}") + # 启动灵犀飞书长连接(学习助手) + try: + from app.services.lingxi_ws_handler import start_ws_client as start_lingxi_ws + asyncio.ensure_future(start_lingxi_ws()) + except Exception as e: + logger.error(f"灵犀长连接启动失败: {e}") + # 注册路由 from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind diff --git a/backend/app/services/agent_schedule_service.py b/backend/app/services/agent_schedule_service.py index 3c2433c..1a3eb04 100644 --- a/backend/app/services/agent_schedule_service.py +++ b/backend/app/services/agent_schedule_service.py @@ -95,6 +95,8 @@ def check_and_run_due_schedules() -> int: Returns: 本次触发的任务数 """ + from app.models.agent_schedule import AgentSchedule + db: Optional[Session] = None try: db = SessionLocal() diff --git a/backend/app/services/lingxi_app_service.py b/backend/app/services/lingxi_app_service.py new file mode 100644 index 0000000..3ff2750 --- /dev/null +++ b/backend/app/services/lingxi_app_service.py @@ -0,0 +1,105 @@ +"""灵犀飞书应用 API 服务 — 通过灵犀应用发送消息到用户""" +from __future__ import annotations + +import json +import logging +import time +from typing import Optional + +import httpx + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +_token_cache: dict = {"token": None, "expires_at": 0} + + +def _get_tenant_access_token() -> Optional[str]: + now = time.time() + if _token_cache["token"] and now < _token_cache["expires_at"] - 300: + return _token_cache["token"] + + app_id = settings.LINGXI_APP_ID + app_secret = settings.LINGXI_APP_SECRET + if not app_id or not app_secret: + logger.warning("灵犀应用未配置(LINGXI_APP_ID / LINGXI_APP_SECRET)") + return None + + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", + json={"app_id": app_id, "app_secret": app_secret}, + ) + result = resp.json() + if resp.is_success and result.get("code") == 0: + token = result["tenant_access_token"] + expire = result.get("expire", 7200) + _token_cache["token"] = token + _token_cache["expires_at"] = now + expire + logger.info("灵犀 tenant_access_token 获取成功") + return token + else: + logger.warning("灵犀 token 获取失败: %s", result) + return None + except Exception as e: + logger.warning("灵犀 token 获取异常: %s", e) + return None + + +def send_message_to_user( + open_id: str, title: str, content: str, + status: str = "info", detail_link: Optional[str] = None, +) -> bool: + token = _get_tenant_access_token() + if not token: + return False + color_map = {"success": "green", "failed": "red", "info": "blue"} + color = color_map.get(status, "blue") + elements = [{"tag": "markdown", "content": content}] + if detail_link: + elements.append({ + "tag": "action", + "actions": [{"tag": "button", "text": {"tag": "plain_text", "content": "查看详情"}, "url": detail_link, "type": "default"}], + }) + card = { + "config": {"wide_screen_mode": True}, + "header": {"title": {"tag": "plain_text", "content": title}, "template": color}, + "elements": elements, + } + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id", + headers={"Authorization": f"Bearer {token}"}, + json={"receive_id": open_id, "msg_type": "interactive", "content": json.dumps(card, ensure_ascii=False)}, + ) + result = resp.json() + if resp.is_success and result.get("code") == 0: + logger.info("灵犀消息发送成功: open_id=%s title=%s", open_id[:20], title) + return True + else: + logger.warning("灵犀消息发送失败: code=%s msg=%s", result.get("code"), result.get("msg")) + return False + except Exception as e: + logger.warning("灵犀消息发送异常: %s", e) + return False + + +def send_plain_text(open_id: str, text: str) -> bool: + token = _get_tenant_access_token() + if not token: + return False + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id", + headers={"Authorization": f"Bearer {token}"}, + json={"receive_id": open_id, "msg_type": "text", "content": json.dumps({"text": text}, ensure_ascii=False)}, + ) + result = resp.json() + return resp.is_success and result.get("code") == 0 + except Exception as e: + logger.warning("灵犀文本消息发送异常: %s", e) + return False diff --git a/backend/app/services/lingxi_ws_handler.py b/backend/app/services/lingxi_ws_handler.py new file mode 100644 index 0000000..2ef83bd --- /dev/null +++ b/backend/app/services/lingxi_ws_handler.py @@ -0,0 +1,278 @@ +"""灵犀飞书长连接 — 固定路由到灵犀学习助手 Agent(方案C:知识图谱+RAG)""" +from __future__ import annotations + +import asyncio +import json +import logging +from collections import deque +from typing import Optional + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +_processed_msg_ids: deque[str] = deque(maxlen=20) + + +def _get_message_id(data) -> Optional[str]: + try: + ev = data.event + msg = getattr(ev, "message", None) + if msg: + return getattr(msg, "message_id", None) + except Exception: + return None + return None + + +def _get_message_text(data) -> Optional[str]: + try: + ev = data.event + msg = getattr(ev, "message", None) + if not msg: + return None + content_str = getattr(msg, "content", None) + msg_type = getattr(msg, "message_type", "") + if not content_str: + return None + if msg_type == "text": + parsed = json.loads(content_str) + return parsed.get("text", "") + return None + except Exception as e: + logger.warning("解析灵犀消息内容失败: %s", e) + return None + + +def _get_sender_open_id(data) -> Optional[str]: + try: + ev = data.event + sender = getattr(ev, "sender", None) + if not sender: + return None + sender_id = getattr(sender, "sender_id", None) + if not sender_id: + return None + return getattr(sender_id, "open_id", None) + except Exception: + return None + + +def _get_chat_type(data) -> str: + try: + ev = data.event + msg = getattr(ev, "message", None) + return getattr(msg, "chat_type", "") if msg else "" + except Exception: + return "" + + +def _reply_to_feishu(open_id: str, text: str): + try: + from app.services.lingxi_app_service import send_plain_text + send_plain_text(open_id, text) + except Exception as e: + logger.warning("灵犀回复消息失败: %s", e) + + +def _reply_card(open_id: str, title: str, content: str, status: str = "info"): + try: + from app.services.lingxi_app_service import send_message_to_user + send_message_to_user(open_id, title, content, status=status) + except Exception as e: + logger.warning("灵犀回复卡片失败: %s", e) + + +def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] = None): + def _log(metrics: dict): + try: + from app.models.agent_llm_log import AgentLLMLog + log = AgentLLMLog( + agent_id=agent_id, session_id=metrics.get("session_id"), + user_id=user_id, model=metrics.get("model", ""), + provider=metrics.get("provider"), + prompt_tokens=metrics.get("prompt_tokens", 0), + completion_tokens=metrics.get("completion_tokens", 0), + total_tokens=metrics.get("total_tokens", 0), + latency_ms=metrics.get("latency_ms", 0), + iteration_number=metrics.get("iteration_number", 0), + step_type=metrics.get("step_type"), + tool_name=metrics.get("tool_name"), + status=metrics.get("status", "success"), + error_message=metrics.get("error_message"), + ) + db.add(log) + db.commit() + except Exception as e: + logger.warning("写入 AgentLLMLog 失败: %s", e) + return _log + + +async def _handle_message_async(data): + open_id = _get_sender_open_id(data) + chat_type = _get_chat_type(data) + text = _get_message_text(data) + + if not open_id or chat_type != "p2p": + return + + logger.info("灵犀收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)") + + if not text: + return + + from sqlalchemy.orm import Session + from app.core.database import SessionLocal + from app.models.agent import Agent + + db: Optional[Session] = None + try: + db = SessionLocal() + + agent_id = settings.LINGXI_AGENT_ID + if not agent_id: + _reply_to_feishu(open_id, "灵犀尚未配置,请联系管理员。") + return + + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + _reply_to_feishu(open_id, "灵犀 Agent 已不存在,请联系管理员。") + return + + _reply_to_feishu(open_id, "正在思考,请稍候...") + + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig + + wc = agent.workflow_config or {} + nodes = wc.get("nodes", []) + system_prompt = agent.description or "" + model = "deepseek-v4-flash" + provider = "deepseek" + temperature = 0.85 + max_iterations = 30 + tools_whitelist = [] + + for n in nodes: + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + model = cfg.get("model", model) + provider = cfg.get("provider", provider) + temperature = float(cfg.get("temperature", temperature)) + max_iterations = int(cfg.get("max_iterations", max_iterations)) + tools_whitelist = cfg.get("tools", tools_whitelist) + break + + config = AgentConfig( + name=agent.name or "灵犀", + system_prompt=system_prompt, + llm=AgentLLMConfig( + model=model, provider=provider, + temperature=temperature, max_iterations=max_iterations, + ), + tools=AgentToolConfig(include_tools=tools_whitelist), + memory=AgentMemoryConfig( + max_history_messages=int(cfg.get("memory_max_history", 20)), + vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)), + persist_to_db=bool(cfg.get("memory_persist", True)), + vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)), + learning_enabled=bool(cfg.get("memory_learning", True)), + ), + user_id=None, + memory_scope_id=str(agent.id), + ) + + on_llm_call = _make_llm_logger(db, agent_id=str(agent.id)) + runtime = AgentRuntime(config=config, on_llm_call=on_llm_call) + result = await runtime.run(text) + + if result.content: + _reply_card(open_id, f"{agent.name}", result.content.strip(), status="success") + else: + _reply_to_feishu(open_id, "Agent 未返回有效回复,请重试。") + + logger.info( + "灵犀 Agent 回复完成: open_id=%s agent=%s iterations=%d tools=%d", + open_id[:20], agent.name, result.iterations_used, result.tool_calls_made, + ) + + except Exception as e: + logger.error("灵犀消息处理失败: %s", e) + try: + _reply_to_feishu(open_id, f"处理失败: {e!s}") + except Exception: + pass + finally: + if db: + db.close() + + +def _handle_message_internal(data): + msg_id = _get_message_id(data) + if msg_id: + if msg_id in _processed_msg_ids: + return + _processed_msg_ids.append(msg_id) + + open_id = _get_sender_open_id(data) + chat_type = _get_chat_type(data) + text = _get_message_text(data) + + if not open_id or chat_type != "p2p" or not text: + return + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(_handle_message_async(data)) + else: + loop.run_until_complete(_handle_message_async(data)) + except Exception as e: + logger.error("灵犀创建消息处理任务失败: %s", e) + try: + _reply_to_feishu(open_id, f"处理失败: {e!s}") + except Exception: + pass + + +def _build_event_handler(): + from lark_oapi.event.dispatcher_handler import EventDispatcherHandler + + def on_message_receive(data): + _handle_message_internal(data) + + builder = EventDispatcherHandler.builder(encrypt_key="", verification_token="") + builder.register_p2_im_message_receive_v1(on_message_receive) + return builder.build() + + +async def start_ws_client(): + if not settings.LINGXI_APP_ID or not settings.LINGXI_APP_SECRET: + logger.warning("灵犀应用未配置,跳过灵犀长连接启动") + return + + from lark_oapi.ws import Client as WSClient + + handler = _build_event_handler() + client = WSClient( + app_id=settings.LINGXI_APP_ID, + app_secret=settings.LINGXI_APP_SECRET, + event_handler=handler, + auto_reconnect=True, + ) + + logger.info("灵犀长连接客户端启动中...") + + while True: + try: + await client._connect() + logger.info("灵犀长连接已建立") + asyncio.ensure_future(client._ping_loop()) + while True: + await asyncio.sleep(3600) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning("灵犀长连接断开,3秒后重连: %s", e) + await asyncio.sleep(3) diff --git a/backend/app/services/scene_templates.py b/backend/app/services/scene_templates.py index 7dbb462..82df402 100644 --- a/backend/app/services/scene_templates.py +++ b/backend/app/services/scene_templates.py @@ -103,7 +103,7 @@ def build_workflow_for_template(template_id: str, parameters: Optional[Dict[str, raise ValueError(f"未知模板: {template_id}") temperature = float(parameters.get("temperature", meta.get("default_temperature", 0.3))) - enable_tools = bool(parameters.get("enable_tools", False)) + enable_tools = bool(parameters.get("enable_tools", True)) tools = parameters.get("tools") if tools is not None and not isinstance(tools, list): tools = [] @@ -116,7 +116,7 @@ def build_workflow_for_template(template_id: str, parameters: Optional[Dict[str, prompt, temperature=temperature, enable_tools=enable_tools, - tools=tools if enable_tools else [], + tools=tools, ) diff --git a/backend/app/services/workflow_engine.py b/backend/app/services/workflow_engine.py index 13e91d8..1885765 100644 --- a/backend/app/services/workflow_engine.py +++ b/backend/app/services/workflow_engine.py @@ -1825,23 +1825,20 @@ class WorkflowEngine: # 如果启用了工具,加载工具定义 tools = [] - if enable_tools and tools_config: + if enable_tools: from app.services.tool_registry import tool_registry - # 从注册表加载工具定义 - tools = tool_registry.get_tools_by_names(tools_config) - logger.info(f"[rjb] LLM节点启用工具调用: {len(tools)} 个工具, 工具列表: {tools_config}") + if tools_config: + tools = tool_registry.get_tools_by_names(tools_config) + else: + # 空列表 = 全部工具(与 Agent 节点行为一致) + tools = tool_registry.get_all_tool_schemas() + logger.info(f"[rjb] LLM节点启用工具调用: {len(tools)} 个工具, 工具列表: {tools_config or '全部'}") if not tools: logger.warning( "[rjb] LLM 已 enable_tools 但当前进程 tool_registry 中 0 个匹配 schema," "将无法发起 function calling(常见于 Celery Worker 未加载 tools_bootstrap)。配置=%s", tools_config, ) - elif len(tools) < len(tools_config): - missing = [n for n in tools_config if not tool_registry.get_tool_schema(n)] - logger.warning( - "[rjb] LLM 工具部分缺失 schema,缺失=%s(可动手能力不完整)", - missing, - ) # 调用LLM服务 try: diff --git a/创建agent.md b/创建agent.md new file mode 100644 index 0000000..2ec95f0 --- /dev/null +++ b/创建agent.md @@ -0,0 +1,158 @@ +# Agent 创建指南 + +## 概述 + +本系统支持多种方式创建 Agent。**所有创建方式均默认赋予 Agent 全部 18 个内置工具能力**,除非明确限制。 + +## 内置工具清单(18个) + +| 类别 | 工具 | 用途 | +|------|------|------| +| 文件 | `file_read` | 读文件:文本/PDF/docx/xlsx/图片OCR(作业拍照识别) | +| 文件 | `file_write` | 写文件:笔记、报告、数据导出 | +| 网络 | `http_request` | HTTP 请求:上网查资料、调 API | +| 网络 | `url_parse` | URL 解析 | +| 数据 | `json_process` | JSON 结构化数据处理 | +| 数据 | `database_query` | 数据库查询 | +| 计算 | `math_calculate` | 数学计算 | +| 文本 | `text_analyze` | 文本分析 | +| 时间 | `datetime` | 日期时间计算、倒计时 | +| 系统 | `system_info` | 系统环境信息 | +| 调度 | `schedule_create` | 创建定时任务(cron 表达式) | +| 调度 | `schedule_list` | 查看定时任务列表 | +| 调度 | `schedule_delete` | 删除定时任务 | +| 通信 | `send_email` | 发送邮件 | +| 工具 | `regex_test` | 正则表达式测试 | +| 工具 | `crypto_util` | 加密/解密工具 | +| 工具 | `random_generate` | 随机数据生成 | +| 调试 | `adb_log` | Android 设备日志(ADB) | + +--- + +## 创建方式 + +### 方式一:前端界面创建(http://localhost:3001/agents) + +#### 1.1 手动创建 +1. 打开 http://localhost:3001/agents +2. 点击「创建 Agent」 +3. 填写名称、描述 +4. 进入工作流设计器拖拽节点(至少需要一个 LLM 或 Agent 节点) +5. **不设置工具白名单 = 全部工具可用** +6. 保存 + +#### 1.2 场景模板创建 +1. 在 Agents 页面点击「从场景模板创建」 +2. 选择模板(客服/研发/运维) +3. 填写名称 +4. 系统默认启用全部工具(`enable_tools=true`,`tools` 为空) + +#### 1.3 导入 JSON +1. 导出已有 Agent 的 JSON +2. 修改后导入 +3. 若 JSON 中未指定 `tools` 字段 = 全部工具 + +--- + +### 方式二:后端 API 创建 + +```bash +# 直接创建 +curl -X POST http://localhost:8037/api/v1/agents \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{ + "name": "My Agent", + "description": "Agent description", + "workflow_config": { + "nodes": [ + {"id": "start-1", "type": "start", "position": {"x": 80, "y": 120}, "data": {}}, + {"id": "llm-1", "type": "llm", "position": {"x": 320, "y": 120}, + "data": { + "prompt": "你是一个有用的AI助手", + "enable_tools": true + // 不指定 tools = 全部工具 + }} + ], + "edges": [...] + } + }' +``` + +### 方式三:Python 脚本创建 + +```python +from app.core.database import SessionLocal +from app.models.agent import Agent +import uuid + +agent = Agent( + id=str(uuid.uuid4()), + name="My Agent", + description="Description", + workflow_config={ + "nodes": [ + {"id": "node_1", "type": "llm", "label": "LLM", + "data": { + "system_prompt": "你是一个有用的AI助手", + "model": "deepseek-v4-flash", + "provider": "deepseek", + "temperature": 0.7, + "max_iterations": 10, + # 不指定 tools = 全部工具 + }} + ], + "edges": [] + }, + status="active", +) +db = SessionLocal() +db.add(agent) +db.commit() +db.close() +``` + +--- + +## 工具配置规则 + +### Agent 节点(AgentRuntime 执行) +- `tools` 字段不存在 → 全部工具 +- `tools: []` → 全部工具 +- `tools: ["file_read", "http_request"]` → 仅这两个工具 + +### LLM 节点(工作流引擎执行) +- `enable_tools: false` → 无工具 +- `enable_tools: true` + `tools` 未设置/为空 → 全部工具 +- `enable_tools: true` + `tools: ["file_read"]` → 仅 file_read + +### 飞书长连接路由(WS Handler) +- 读取 Agent workflow_config 节点 data 中的 `tools` 字段 +- 未设置 = `include_tools=[]` = 全部工具 + +--- + +## 快速检查 Agent 能力 + +``` +GET /api/v1/health +``` + +返回 `builtin_tools.count` 和 `builtin_tools.names`,确认工具已注册。 + +--- + +## 配置飞书连接 + +1. 在飞书开放平台创建应用,开启「机器人」能力 +2. 在 `.env` 中添加: +``` +XXXXX_APP_ID=cli_xxx +XXXXX_APP_SECRET=xxx +XXXXX_AGENT_ID= +``` +3. 创建 `app/services/xxxxx_app_service.py`(token 管理 + 消息发送) +4. 创建 `app/services/xxxxx_ws_handler.py`(WebSocket 长连接 + 消息路由) +5. 在 `app/core/config.py` 添加配置字段 +6. 在 `app/main.py` startup 事件中启动 WS 客户端 +7. 重启后端