diff --git a/backend/app/core/config.py b/backend/app/core/config.py index c6e8874..e9ad7f0 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -100,6 +100,11 @@ class Settings(BaseSettings): SUYAO_APP_SECRET: str = "" SUYAO_AGENT_ID: str = "" # 创建苏瑶后写入 + # 甜甜飞书应用配置(独立 WS 连接,路由到苏瑶3号知识图谱 Agent) + TIANTIAN_APP_ID: str = "" + TIANTIAN_APP_SECRET: str = "" + TIANTIAN_AGENT_ID: str = "" # 创建苏瑶3号后写入 + class Config: env_file = str(_ENV_PATH) case_sensitive = True diff --git a/backend/app/main.py b/backend/app/main.py index 6f11b42..f0c8d50 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -234,6 +234,13 @@ async def startup_event(): except Exception as e: logger.error(f"苏瑶长连接启动失败: {e}") + # 启动甜甜飞书长连接(苏瑶3号) + try: + from app.services.tiantian_ws_handler import start_ws_client as start_tiantian_ws + asyncio.ensure_future(start_tiantian_ws()) + except Exception as e: + logger.error(f"甜甜长连接启动失败: {e}") + # 注册路由 from app.api import auth, uploads, workflows, executions, websocket, execution_logs, data_sources, agents, platform_templates, model_configs, webhooks, template_market, batch_operations, collaboration, permissions, monitoring, alert_rules, node_test, node_templates, tools, agent_chat, agent_monitoring, knowledge_base, agent_schedules, notifications, feishu_bind diff --git a/backend/app/services/tiantian_app_service.py b/backend/app/services/tiantian_app_service.py new file mode 100644 index 0000000..a05df3e --- /dev/null +++ b/backend/app/services/tiantian_app_service.py @@ -0,0 +1,105 @@ +"""甜甜飞书应用 API 服务 — 通过甜甜应用发送消息到用户""" +from __future__ import annotations + +import json +import logging +import time +from typing import Optional + +import httpx + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +_token_cache: dict = {"token": None, "expires_at": 0} + + +def _get_tenant_access_token() -> Optional[str]: + now = time.time() + if _token_cache["token"] and now < _token_cache["expires_at"] - 300: + return _token_cache["token"] + + app_id = settings.TIANTIAN_APP_ID + app_secret = settings.TIANTIAN_APP_SECRET + if not app_id or not app_secret: + logger.warning("甜甜应用未配置(TIANTIAN_APP_ID / TIANTIAN_APP_SECRET)") + return None + + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", + json={"app_id": app_id, "app_secret": app_secret}, + ) + result = resp.json() + if resp.is_success and result.get("code") == 0: + token = result["tenant_access_token"] + expire = result.get("expire", 7200) + _token_cache["token"] = token + _token_cache["expires_at"] = now + expire + logger.info("甜甜 tenant_access_token 获取成功") + return token + else: + logger.warning("甜甜 token 获取失败: %s", result) + return None + except Exception as e: + logger.warning("甜甜 token 获取异常: %s", e) + return None + + +def send_message_to_user( + open_id: str, title: str, content: str, + status: str = "info", detail_link: Optional[str] = None, +) -> bool: + token = _get_tenant_access_token() + if not token: + return False + color_map = {"success": "green", "failed": "red", "info": "blue"} + color = color_map.get(status, "blue") + elements = [{"tag": "markdown", "content": content}] + if detail_link: + elements.append({ + "tag": "action", + "actions": [{"tag": "button", "text": {"tag": "plain_text", "content": "查看详情"}, "url": detail_link, "type": "default"}], + }) + card = { + "config": {"wide_screen_mode": True}, + "header": {"title": {"tag": "plain_text", "content": title}, "template": color}, + "elements": elements, + } + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id", + headers={"Authorization": f"Bearer {token}"}, + json={"receive_id": open_id, "msg_type": "interactive", "content": json.dumps(card, ensure_ascii=False)}, + ) + result = resp.json() + if resp.is_success and result.get("code") == 0: + logger.info("甜甜消息发送成功: open_id=%s title=%s", open_id[:20], title) + return True + else: + logger.warning("甜甜消息发送失败: code=%s msg=%s", result.get("code"), result.get("msg")) + return False + except Exception as e: + logger.warning("甜甜消息发送异常: %s", e) + return False + + +def send_plain_text(open_id: str, text: str) -> bool: + token = _get_tenant_access_token() + if not token: + return False + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id", + headers={"Authorization": f"Bearer {token}"}, + json={"receive_id": open_id, "msg_type": "text", "content": json.dumps({"text": text}, ensure_ascii=False)}, + ) + result = resp.json() + return resp.is_success and result.get("code") == 0 + except Exception as e: + logger.warning("甜甜文本消息发送异常: %s", e) + return False diff --git a/backend/app/services/tiantian_ws_handler.py b/backend/app/services/tiantian_ws_handler.py new file mode 100644 index 0000000..3089103 --- /dev/null +++ b/backend/app/services/tiantian_ws_handler.py @@ -0,0 +1,276 @@ +"""甜甜飞书长连接 — 固定路由到苏瑶3号 Agent(知识图谱+RAG 版)""" +from __future__ import annotations + +import asyncio +import json +import logging +from collections import deque +from typing import Optional + +from app.core.config import settings + +logger = logging.getLogger(__name__) + +_processed_msg_ids: deque[str] = deque(maxlen=20) + + +def _get_message_id(data) -> Optional[str]: + try: + ev = data.event + msg = getattr(ev, "message", None) + if msg: + return getattr(msg, "message_id", None) + except Exception: + return None + return None + + +def _get_message_text(data) -> Optional[str]: + try: + ev = data.event + msg = getattr(ev, "message", None) + if not msg: + return None + content_str = getattr(msg, "content", None) + msg_type = getattr(msg, "message_type", "") + if not content_str: + return None + if msg_type == "text": + parsed = json.loads(content_str) + return parsed.get("text", "") + return None + except Exception as e: + logger.warning("解析甜甜消息内容失败: %s", e) + return None + + +def _get_sender_open_id(data) -> Optional[str]: + try: + ev = data.event + sender = getattr(ev, "sender", None) + if not sender: + return None + sender_id = getattr(sender, "sender_id", None) + if not sender_id: + return None + return getattr(sender_id, "open_id", None) + except Exception: + return None + + +def _get_chat_type(data) -> str: + try: + ev = data.event + msg = getattr(ev, "message", None) + return getattr(msg, "chat_type", "") if msg else "" + except Exception: + return "" + + +def _reply_to_feishu(open_id: str, text: str): + try: + from app.services.tiantian_app_service import send_plain_text + send_plain_text(open_id, text) + except Exception as e: + logger.warning("甜甜回复消息失败: %s", e) + + +def _reply_card(open_id: str, title: str, content: str, status: str = "info"): + try: + from app.services.tiantian_app_service import send_message_to_user + send_message_to_user(open_id, title, content, status=status) + except Exception as e: + logger.warning("甜甜回复卡片失败: %s", e) + + +def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] = None): + def _log(metrics: dict): + try: + from app.models.agent_llm_log import AgentLLMLog + log = AgentLLMLog( + agent_id=agent_id, session_id=metrics.get("session_id"), + user_id=user_id, model=metrics.get("model", ""), + provider=metrics.get("provider"), + prompt_tokens=metrics.get("prompt_tokens", 0), + completion_tokens=metrics.get("completion_tokens", 0), + total_tokens=metrics.get("total_tokens", 0), + latency_ms=metrics.get("latency_ms", 0), + iteration_number=metrics.get("iteration_number", 0), + step_type=metrics.get("step_type"), + tool_name=metrics.get("tool_name"), + status=metrics.get("status", "success"), + error_message=metrics.get("error_message"), + ) + db.add(log) + db.commit() + except Exception as e: + logger.warning("写入 AgentLLMLog 失败: %s", e) + return _log + + +async def _handle_message_async(data): + open_id = _get_sender_open_id(data) + chat_type = _get_chat_type(data) + text = _get_message_text(data) + + if not open_id or chat_type != "p2p": + return + + logger.info("甜甜收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)") + + if not text: + return + + from sqlalchemy.orm import Session + from app.core.database import SessionLocal + from app.models.agent import Agent + + db: Optional[Session] = None + try: + db = SessionLocal() + + agent_id = settings.TIANTIAN_AGENT_ID + if not agent_id: + _reply_to_feishu(open_id, "甜甜尚未配置,请联系管理员。") + return + + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + _reply_to_feishu(open_id, "甜甜 Agent 已不存在,请联系管理员。") + return + + _reply_to_feishu(open_id, "正在思考,请稍候...") + + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig + + wc = agent.workflow_config or {} + nodes = wc.get("nodes", []) + system_prompt = agent.description or "" + model = "deepseek-v4-flash" + provider = "deepseek" + temperature = 0.85 + max_iterations = 15 + + for n in nodes: + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + model = cfg.get("model", model) + provider = cfg.get("provider", provider) + temperature = float(cfg.get("temperature", temperature)) + max_iterations = int(cfg.get("max_iterations", max_iterations)) + break + + config = AgentConfig( + name=agent.name or "甜甜", + system_prompt=system_prompt, + llm=AgentLLMConfig( + model=model, provider=provider, + temperature=temperature, max_iterations=max_iterations, + ), + tools=AgentToolConfig(), + memory=AgentMemoryConfig( + max_history_messages=int(cfg.get("memory_max_history", 20)), + vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)), + persist_to_db=bool(cfg.get("memory_persist", True)), + vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)), + learning_enabled=bool(cfg.get("memory_learning", True)), + ), + user_id=None, + memory_scope_id=str(agent.id), + ) + + on_llm_call = _make_llm_logger(db, agent_id=str(agent.id)) + runtime = AgentRuntime(config=config, on_llm_call=on_llm_call) + result = await runtime.run(text) + + if result.content: + _reply_card(open_id, f"{agent.name}", result.content.strip(), status="success") + else: + _reply_to_feishu(open_id, "Agent 未返回有效回复,请重试。") + + logger.info( + "甜甜 Agent 回复完成: open_id=%s agent=%s iterations=%d tools=%d", + open_id[:20], agent.name, result.iterations_used, result.tool_calls_made, + ) + + except Exception as e: + logger.error("甜甜消息处理失败: %s", e) + try: + _reply_to_feishu(open_id, f"处理失败: {e!s}") + except Exception: + pass + finally: + if db: + db.close() + + +def _handle_message_internal(data): + msg_id = _get_message_id(data) + if msg_id: + if msg_id in _processed_msg_ids: + return + _processed_msg_ids.append(msg_id) + + open_id = _get_sender_open_id(data) + chat_type = _get_chat_type(data) + text = _get_message_text(data) + + if not open_id or chat_type != "p2p" or not text: + return + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(_handle_message_async(data)) + else: + loop.run_until_complete(_handle_message_async(data)) + except Exception as e: + logger.error("甜甜创建消息处理任务失败: %s", e) + try: + _reply_to_feishu(open_id, f"处理失败: {e!s}") + except Exception: + pass + + +def _build_event_handler(): + from lark_oapi.event.dispatcher_handler import EventDispatcherHandler + + def on_message_receive(data): + _handle_message_internal(data) + + builder = EventDispatcherHandler.builder(encrypt_key="", verification_token="") + builder.register_p2_im_message_receive_v1(on_message_receive) + return builder.build() + + +async def start_ws_client(): + if not settings.TIANTIAN_APP_ID or not settings.TIANTIAN_APP_SECRET: + logger.warning("甜甜应用未配置,跳过甜甜长连接启动") + return + + from lark_oapi.ws import Client as WSClient + + handler = _build_event_handler() + client = WSClient( + app_id=settings.TIANTIAN_APP_ID, + app_secret=settings.TIANTIAN_APP_SECRET, + event_handler=handler, + auto_reconnect=True, + ) + + logger.info("甜甜长连接客户端启动中...") + + while True: + try: + await client._connect() + logger.info("甜甜长连接已建立") + asyncio.ensure_future(client._ping_loop()) + while True: + await asyncio.sleep(3600) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning("甜甜长连接断开,3秒后重连: %s", e) + await asyncio.sleep(3)