From a924486f26f529d7a74095f1a2aec16792b15a44 Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Sat, 2 May 2026 21:44:47 +0800 Subject: [PATCH] feat: add Suyao Feishu bot and per-agent memory config support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create suyao_app_service.py and suyao_ws_handler.py for 苏瑶 Feishu bot - Add SUYAO_APP_ID/SUYAO_APP_SECRET/SUYAO_AGENT_ID config fields - Fix node config extraction bug (n.get("config") → n.get("data")) in all WS handlers - Add _build_memory_config_from_node() to support per-agent memory settings (max_history_messages, vector_memory_top_k, persist_to_db, etc.) - Create 苏瑶1号 (Plan A: long context), 苏瑶2号 (Plan B: emotion tracking), 苏瑶3号 (Plan C: knowledge graph + RAG) agents with different memory strategies Co-Authored-By: Claude Opus 4.6 --- backend/app/api/agent_chat.py | 16 ++ backend/app/core/config.py | 5 + backend/app/main.py | 7 + backend/app/services/feishu_ws_handler.py | 26 +- backend/app/services/orange_ws_handler.py | 26 +- backend/app/services/suyao_app_service.py | 137 ++++++++++ backend/app/services/suyao_ws_handler.py | 304 ++++++++++++++++++++++ 7 files changed, 503 insertions(+), 18 deletions(-) create mode 100644 backend/app/services/suyao_app_service.py create mode 100644 backend/app/services/suyao_ws_handler.py diff --git a/backend/app/api/agent_chat.py b/backend/app/api/agent_chat.py index 23c281c..2edeb0f 100644 --- a/backend/app/api/agent_chat.py +++ b/backend/app/api/agent_chat.py @@ -26,6 +26,7 @@ from app.agent_runtime import ( AgentLLMConfig, AgentToolConfig, AgentBudgetConfig, + AgentMemoryConfig, AgentStep, AgentOrchestrator, OrchestratorAgentConfig, @@ -293,6 +294,7 @@ async def chat_with_agent( uid = current_user.id mem_scope = f"{uid}:{agent_id}" if uid else str(agent_id) + memory_cfg = _build_memory_config_from_node(agent_node_cfg) config = AgentConfig( name=agent.name, system_prompt=system_prompt, @@ -306,6 +308,7 @@ async def chat_with_agent( include_tools=agent_node_cfg.get("tools", []), exclude_tools=agent_node_cfg.get("exclude_tools", []), ), + memory=memory_cfg, budget=budget, user_id=uid, memory_scope_id=mem_scope, @@ -360,6 +363,7 @@ async def chat_with_agent_stream( uid = current_user.id mem_scope = f"{uid}:{agent_id}" if uid else str(agent_id) + memory_cfg = _build_memory_config_from_node(agent_node_cfg) config = AgentConfig( name=agent.name, system_prompt=system_prompt, @@ -373,6 +377,7 @@ async def chat_with_agent_stream( include_tools=agent_node_cfg.get("tools", []), exclude_tools=agent_node_cfg.get("exclude_tools", []), ), + memory=memory_cfg, budget=budget, user_id=uid, memory_scope_id=mem_scope, @@ -400,3 +405,14 @@ def _find_agent_node_config(nodes: list) -> Dict[str, Any]: if typ in ("agent", "llm", "template"): return node.get("data") or {} return {} + + +def _build_memory_config_from_node(agent_node_cfg: dict) -> AgentMemoryConfig: + """从 Agent 工作流节点配置中提取记忆配置。""" + return AgentMemoryConfig( + max_history_messages=int(agent_node_cfg.get("memory_max_history", 20)), + vector_memory_top_k=int(agent_node_cfg.get("memory_vector_top_k", 5)), + persist_to_db=bool(agent_node_cfg.get("memory_persist", True)), + vector_memory_enabled=bool(agent_node_cfg.get("memory_vector_enabled", True)), + learning_enabled=bool(agent_node_cfg.get("memory_learning", True)), + ) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 43eb016..c6e8874 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -95,6 +95,11 @@ class Settings(BaseSettings): ORANGE_APP_SECRET: str = "" ORANGE_AGENT_ID: str = "" # 创建橙子助手后写入 + # 苏瑶飞书应用配置(独立 WS 连接,直接路由到苏瑶 Agent) + SUYAO_APP_ID: str = "" + SUYAO_APP_SECRET: str = "" + SUYAO_AGENT_ID: str = "" # 创建苏瑶后写入 + class Config: env_file = str(_ENV_PATH) case_sensitive = True diff --git a/backend/app/main.py b/backend/app/main.py index f673d51..6f11b42 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -227,6 +227,13 @@ async def startup_event(): except Exception as e: logger.error(f"橙子长连接启动失败: {e}") + # 启动苏瑶飞书长连接 + try: + from app.services.suyao_ws_handler import start_ws_client as start_suyao_ws + asyncio.ensure_future(start_suyao_ws()) + except Exception as e: + logger.error(f"苏瑶长连接启动失败: {e}") + # 注册路由 from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind diff --git a/backend/app/services/feishu_ws_handler.py b/backend/app/services/feishu_ws_handler.py index 449b5a9..c2f443e 100644 --- a/backend/app/services/feishu_ws_handler.py +++ b/backend/app/services/feishu_ws_handler.py @@ -153,7 +153,7 @@ async def _handle_message_async(data): _reply_to_feishu(open_id, f"🤔 正在思考,请稍候...") - from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig wc = agent.workflow_config or {} nodes = wc.get("nodes", []) @@ -164,14 +164,15 @@ async def _handle_message_async(data): max_iterations = 10 for n in nodes: - cfg = n.get("config", {}) if isinstance(n, dict) else getattr(n, "config", {}) - if cfg.get("type") in ("agent", "llm"): - system_prompt = cfg.get("system_prompt", "") or system_prompt - model = cfg.get("model", model) - provider = cfg.get("provider", provider) - temperature = float(cfg.get("temperature", temperature)) - max_iterations = int(cfg.get("max_iterations", max_iterations)) - break + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + model = cfg.get("model", model) + provider = cfg.get("provider", provider) + temperature = float(cfg.get("temperature", temperature)) + max_iterations = int(cfg.get("max_iterations", max_iterations)) + break config = AgentConfig( name=agent.name or "agent", @@ -183,6 +184,13 @@ async def _handle_message_async(data): max_iterations=max_iterations, ), tools=AgentToolConfig(), + memory=AgentMemoryConfig( + max_history_messages=int(cfg.get("memory_max_history", 20)), + vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)), + persist_to_db=bool(cfg.get("memory_persist", True)), + vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)), + learning_enabled=bool(cfg.get("memory_learning", True)), + ), user_id=user.id, memory_scope_id=str(agent.id), ) diff --git a/backend/app/services/orange_ws_handler.py b/backend/app/services/orange_ws_handler.py index 954c4e1..2752248 100644 --- a/backend/app/services/orange_ws_handler.py +++ b/backend/app/services/orange_ws_handler.py @@ -157,7 +157,7 @@ async def _handle_message_async(data): _reply_to_feishu(open_id, "🤔 正在思考,请稍候...") - from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig wc = agent.workflow_config or {} nodes = wc.get("nodes", []) @@ -168,14 +168,15 @@ async def _handle_message_async(data): max_iterations = 10 for n in nodes: - cfg = n.get("config", {}) if isinstance(n, dict) else getattr(n, "config", {}) - if cfg.get("type") in ("agent", "llm"): - system_prompt = cfg.get("system_prompt", "") or system_prompt - model = cfg.get("model", model) - provider = cfg.get("provider", provider) - temperature = float(cfg.get("temperature", temperature)) - max_iterations = int(cfg.get("max_iterations", max_iterations)) - break + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + model = cfg.get("model", model) + provider = cfg.get("provider", provider) + temperature = float(cfg.get("temperature", temperature)) + max_iterations = int(cfg.get("max_iterations", max_iterations)) + break config = AgentConfig( name=agent.name or "橙子助手", @@ -187,6 +188,13 @@ async def _handle_message_async(data): max_iterations=max_iterations, ), tools=AgentToolConfig(), + memory=AgentMemoryConfig( + max_history_messages=int(cfg.get("memory_max_history", 20)), + vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)), + persist_to_db=bool(cfg.get("memory_persist", True)), + vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)), + learning_enabled=bool(cfg.get("memory_learning", True)), + ), user_id=None, memory_scope_id=str(agent.id), ) diff --git a/backend/app/services/suyao_app_service.py b/backend/app/services/suyao_app_service.py new file mode 100644 index 0000000..02c0fab --- /dev/null +++ b/backend/app/services/suyao_app_service.py @@ -0,0 +1,137 @@ +"""苏瑶飞书应用 API 服务 — 通过苏瑶应用发送消息到用户""" +from __future__ import annotations + +import json +import logging +import time +from typing import Optional + +import httpx + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# Token 缓存(tenant_access_token 有效期 2 小时,提前 5 分钟刷新) +_token_cache: dict = {"token": None, "expires_at": 0} + + +def _get_tenant_access_token() -> Optional[str]: + """获取苏瑶应用的 tenant_access_token(带缓存)。""" + now = time.time() + if _token_cache["token"] and now < _token_cache["expires_at"] - 300: + return _token_cache["token"] + + app_id = settings.SUYAO_APP_ID + app_secret = settings.SUYAO_APP_SECRET + if not app_id or not app_secret: + logger.warning("苏瑶应用未配置(SUYAO_APP_ID / SUYAO_APP_SECRET)") + return None + + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", + json={"app_id": app_id, "app_secret": app_secret}, + ) + result = resp.json() + if resp.is_success and result.get("code") == 0: + token = result["tenant_access_token"] + expire = result.get("expire", 7200) + _token_cache["token"] = token + _token_cache["expires_at"] = now + expire + logger.info("苏瑶 tenant_access_token 获取成功") + return token + else: + logger.warning("苏瑶 token 获取失败: %s", result) + return None + except Exception as e: + logger.warning("苏瑶 token 获取异常: %s", e) + return None + + +def send_message_to_user( + open_id: str, + title: str, + content: str, + status: str = "info", + detail_link: Optional[str] = None, +) -> bool: + """通过苏瑶应用向用户发送消息卡片。""" + token = _get_tenant_access_token() + if not token: + return False + + color_map = {"success": "green", "failed": "red", "info": "blue"} + color = color_map.get(status, "blue") + + elements = [ + {"tag": "markdown", "content": content}, + ] + if detail_link: + elements.append({ + "tag": "action", + "actions": [ + { + "tag": "button", + "text": {"tag": "plain_text", "content": "查看详情"}, + "url": detail_link, + "type": "default", + } + ], + }) + + card = { + "config": {"wide_screen_mode": True}, + "header": { + "title": {"tag": "plain_text", "content": title}, + "template": color, + }, + "elements": elements, + } + + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id", + headers={"Authorization": f"Bearer {token}"}, + json={ + "receive_id": open_id, + "msg_type": "interactive", + "content": json.dumps(card, ensure_ascii=False), + }, + ) + result = resp.json() + if resp.is_success and result.get("code") == 0: + logger.info("苏瑶消息发送成功: open_id=%s title=%s", open_id[:20], title) + return True + else: + logger.warning("苏瑶消息发送失败: code=%s msg=%s", result.get("code"), result.get("msg")) + return False + except Exception as e: + logger.warning("苏瑶消息发送异常: %s", e) + return False + + +def send_plain_text(open_id: str, text: str) -> bool: + """通过苏瑶应用向用户发送纯文本消息。""" + token = _get_tenant_access_token() + if not token: + return False + + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id", + headers={"Authorization": f"Bearer {token}"}, + json={ + "receive_id": open_id, + "msg_type": "text", + "content": json.dumps({"text": text}, ensure_ascii=False), + }, + ) + result = resp.json() + return resp.is_success and result.get("code") == 0 + except Exception as e: + logger.warning("苏瑶文本消息发送异常: %s", e) + return False diff --git a/backend/app/services/suyao_ws_handler.py b/backend/app/services/suyao_ws_handler.py new file mode 100644 index 0000000..06de64f --- /dev/null +++ b/backend/app/services/suyao_ws_handler.py @@ -0,0 +1,304 @@ +"""苏瑶飞书长连接 — 固定路由到苏瑶 Agent""" +from __future__ import annotations + +import asyncio +import json +import logging +import threading +from collections import deque +from typing import Optional + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# 已处理消息 ID 去重(防止 WS 重连导致重复处理) +_processed_msg_ids: deque[str] = deque(maxlen=20) + +_ws_thread: threading.Thread | None = None + + +def _get_message_id(data) -> Optional[str]: + """从消息事件中提取 message_id。""" + try: + ev = data.event + msg = getattr(ev, "message", None) + if msg: + return getattr(msg, "message_id", None) + except Exception: + return None + return None + + +def _get_message_text(data) -> Optional[str]: + """从消息事件中提取纯文本内容。""" + try: + ev = data.event + msg = getattr(ev, "message", None) + if not msg: + return None + content_str = getattr(msg, "content", None) + msg_type = getattr(msg, "message_type", "") + if not content_str: + return None + + if msg_type == "text": + parsed = json.loads(content_str) + return parsed.get("text", "") + return None + except Exception as e: + logger.warning("解析苏瑶消息内容失败: %s", e) + return None + + +def _get_sender_open_id(data) -> Optional[str]: + """从消息事件中提取发送者 open_id。""" + try: + ev = data.event + sender = getattr(ev, "sender", None) + if not sender: + return None + sender_id = getattr(sender, "sender_id", None) + if not sender_id: + return None + return getattr(sender_id, "open_id", None) + except Exception: + return None + + +def _get_chat_type(data) -> str: + """获取聊天类型。""" + try: + ev = data.event + msg = getattr(ev, "message", None) + return getattr(msg, "chat_type", "") if msg else "" + except Exception: + return "" + + +def _reply_to_feishu(open_id: str, text: str): + """通过苏瑶应用回复用户消息。""" + try: + from app.services.suyao_app_service import send_plain_text + send_plain_text(open_id, text) + except Exception as e: + logger.warning("苏瑶回复消息失败: %s", e) + + +def _reply_card(open_id: str, title: str, content: str, status: str = "info"): + """通过苏瑶应用回复卡片消息。""" + try: + from app.services.suyao_app_service import send_message_to_user + send_message_to_user(open_id, title, content, status=status) + except Exception as e: + logger.warning("苏瑶回复卡片失败: %s", e) + + +def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] = None): + """创建 LLM 调用日志回调。""" + def _log(metrics: dict): + try: + from app.models.agent_llm_log import AgentLLMLog + log = AgentLLMLog( + agent_id=agent_id, + session_id=metrics.get("session_id"), + user_id=user_id, + model=metrics.get("model", ""), + provider=metrics.get("provider"), + prompt_tokens=metrics.get("prompt_tokens", 0), + completion_tokens=metrics.get("completion_tokens", 0), + total_tokens=metrics.get("total_tokens", 0), + latency_ms=metrics.get("latency_ms", 0), + iteration_number=metrics.get("iteration_number", 0), + step_type=metrics.get("step_type"), + tool_name=metrics.get("tool_name"), + status=metrics.get("status", "success"), + error_message=metrics.get("error_message"), + ) + db.add(log) + db.commit() + except Exception as e: + logger.warning("写入 AgentLLMLog 失败: %s", e) + return _log + + +async def _handle_message_async(data): + """异步处理苏瑶消息 — 固定使用苏瑶 Agent。""" + open_id = _get_sender_open_id(data) + chat_type = _get_chat_type(data) + text = _get_message_text(data) + + if not open_id or chat_type != "p2p": + return + + logger.info("苏瑶收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)") + + if not text: + return + + from sqlalchemy.orm import Session + from app.core.database import SessionLocal + from app.models.agent import Agent + + db: Optional[Session] = None + try: + db = SessionLocal() + + # 固定使用苏瑶 Agent + agent_id = settings.SUYAO_AGENT_ID + if not agent_id: + _reply_to_feishu(open_id, "苏瑶尚未配置,请联系管理员。") + return + + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + _reply_to_feishu(open_id, "苏瑶 Agent 已不存在,请联系管理员。") + return + + _reply_to_feishu(open_id, "🤔 正在思考,请稍候...") + + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig + + wc = agent.workflow_config or {} + nodes = wc.get("nodes", []) + system_prompt = agent.description or "" + model = "deepseek-v4-flash" + provider = "deepseek" + temperature = 0.85 + max_iterations = 15 + + for n in nodes: + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + model = cfg.get("model", model) + provider = cfg.get("provider", provider) + temperature = float(cfg.get("temperature", temperature)) + max_iterations = int(cfg.get("max_iterations", max_iterations)) + break + + config = AgentConfig( + name=agent.name or "苏瑶", + system_prompt=system_prompt, + llm=AgentLLMConfig( + model=model, + provider=provider, + temperature=temperature, + max_iterations=max_iterations, + ), + tools=AgentToolConfig(), + memory=AgentMemoryConfig( + max_history_messages=int(cfg.get("memory_max_history", 20)), + vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)), + persist_to_db=bool(cfg.get("memory_persist", True)), + vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)), + learning_enabled=bool(cfg.get("memory_learning", True)), + ), + user_id=None, + memory_scope_id=str(agent.id), + ) + + on_llm_call = _make_llm_logger(db, agent_id=str(agent.id)) + runtime = AgentRuntime(config=config, on_llm_call=on_llm_call) + result = await runtime.run(text) + + if result.content: + _reply_card(open_id, f"{agent.name}", result.content.strip(), status="success") + else: + _reply_to_feishu(open_id, "Agent 未返回有效回复,请重试。") + + logger.info( + "苏瑶 Agent 回复完成: open_id=%s agent=%s iterations=%d tools=%d", + open_id[:20], agent.name, result.iterations_used, result.tool_calls_made, + ) + + except Exception as e: + logger.error("苏瑶消息处理失败: %s", e) + try: + _reply_to_feishu(open_id, f"处理失败: {e!s}") + except Exception: + pass + finally: + if db: + db.close() + + +def _handle_message_internal(data): + """同步入口 — 创建异步任务处理苏瑶消息。""" + # 去重 + msg_id = _get_message_id(data) + if msg_id: + if msg_id in _processed_msg_ids: + logger.debug("苏瑶跳过已处理消息: %s", msg_id) + return + _processed_msg_ids.append(msg_id) + + open_id = _get_sender_open_id(data) + chat_type = _get_chat_type(data) + text = _get_message_text(data) + + if not open_id or chat_type != "p2p" or not text: + return + + logger.info("苏瑶收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)") + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(_handle_message_async(data)) + else: + loop.run_until_complete(_handle_message_async(data)) + except Exception as e: + logger.error("苏瑶创建消息处理任务失败: %s", e) + try: + _reply_to_feishu(open_id, f"处理失败: {e!s}") + except Exception: + pass + + +def _build_event_handler(): + """构建苏瑶事件处理器。""" + from lark_oapi.event.dispatcher_handler import EventDispatcherHandler + + def on_message_receive(data): + _handle_message_internal(data) + + builder = EventDispatcherHandler.builder( + encrypt_key="", + verification_token="", + ) + builder.register_p2_im_message_receive_v1(on_message_receive) + return builder.build() + + +async def start_ws_client(): + """在 async 上下文中启动苏瑶飞书长连接(在主事件循环运行)。""" + if not settings.SUYAO_APP_ID or not settings.SUYAO_APP_SECRET: + logger.warning("苏瑶应用未配置,跳过苏瑶长连接启动") + return + + from lark_oapi.ws import Client as WSClient + + handler = _build_event_handler() + client = WSClient( + app_id=settings.SUYAO_APP_ID, + app_secret=settings.SUYAO_APP_SECRET, + event_handler=handler, + auto_reconnect=True, + ) + + logger.info("苏瑶长连接客户端启动中...") + + while True: + try: + await client._connect() + logger.info("苏瑶长连接已建立") + ping_task = asyncio.ensure_future(client._ping_loop()) + while True: + await asyncio.sleep(3600) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning("苏瑶长连接断开,3秒后重连: %s", e) + await asyncio.sleep(3)