Files
aiagent/backend/app/services/tiantian_ws_handler.py
renjianbo f33bc461ff fix: resolve Feishu cross-app notification routing bug
Implement per-app open_id storage via user_feishu_open_ids table with
union_id-based cross-app user identification. WS handlers now auto-capture
open_id+union_id and resolve/associate user accounts. Schedule notifications
route through the correct bot's open_id instead of always falling back to 苹果.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-06 00:36:40 +08:00

303 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""甜甜飞书长连接 — 固定路由到苏瑶3号 Agent知识图谱+RAG 版)"""
from __future__ import annotations
import asyncio
import json
import logging
from collections import deque
from typing import Optional
from app.core.config import settings
logger = logging.getLogger(__name__)
_processed_msg_ids: deque[str] = deque(maxlen=20)
def _get_message_id(data) -> Optional[str]:
try:
ev = data.event
msg = getattr(ev, "message", None)
if msg:
return getattr(msg, "message_id", None)
except Exception:
return None
return None
def _get_message_text(data) -> Optional[str]:
try:
ev = data.event
msg = getattr(ev, "message", None)
if not msg:
return None
content_str = getattr(msg, "content", None)
msg_type = getattr(msg, "message_type", "")
if not content_str:
return None
if msg_type == "text":
parsed = json.loads(content_str)
return parsed.get("text", "")
return None
except Exception as e:
logger.warning("解析甜甜消息内容失败: %s", e)
return None
def _get_sender_open_id(data) -> Optional[str]:
try:
ev = data.event
sender = getattr(ev, "sender", None)
if not sender:
return None
sender_id = getattr(sender, "sender_id", None)
if not sender_id:
return None
return getattr(sender_id, "open_id", None)
except Exception:
return None
def _get_sender_union_id(data) -> Optional[str]:
try:
ev = data.event
sender = getattr(ev, "sender", None)
if not sender:
return None
sender_id = getattr(sender, "sender_id", None)
if not sender_id:
return None
return getattr(sender_id, "union_id", None)
except Exception:
return None
def _get_chat_type(data) -> str:
try:
ev = data.event
msg = getattr(ev, "message", None)
return getattr(msg, "chat_type", "") if msg else ""
except Exception:
return ""
def _reply_to_feishu(open_id: str, text: str):
try:
from app.services.tiantian_app_service import send_plain_text
send_plain_text(open_id, text)
except Exception as e:
logger.warning("甜甜回复消息失败: %s", e)
def _reply_card(open_id: str, title: str, content: str, status: str = "info"):
try:
from app.services.tiantian_app_service import send_message_to_user
send_message_to_user(open_id, title, content, status=status)
except Exception as e:
logger.warning("甜甜回复卡片失败: %s", e)
def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] = None):
def _log(metrics: dict):
try:
from app.models.agent_llm_log import AgentLLMLog
log = AgentLLMLog(
agent_id=agent_id, session_id=metrics.get("session_id"),
user_id=user_id, model=metrics.get("model", ""),
provider=metrics.get("provider"),
prompt_tokens=metrics.get("prompt_tokens", 0),
completion_tokens=metrics.get("completion_tokens", 0),
total_tokens=metrics.get("total_tokens", 0),
latency_ms=metrics.get("latency_ms", 0),
iteration_number=metrics.get("iteration_number", 0),
step_type=metrics.get("step_type"),
tool_name=metrics.get("tool_name"),
status=metrics.get("status", "success"),
error_message=metrics.get("error_message"),
)
db.add(log)
db.commit()
except Exception as e:
logger.warning("写入 AgentLLMLog 失败: %s", e)
return _log
async def _handle_message_async(data):
open_id = _get_sender_open_id(data)
union_id = _get_sender_union_id(data)
chat_type = _get_chat_type(data)
text = _get_message_text(data)
if not open_id or chat_type != "p2p":
return
logger.info("甜甜收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)")
if not text:
return
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.agent import Agent
from app.services.feishu_open_id_service import resolve_user_and_save
db: Optional[Session] = None
try:
db = SessionLocal()
# 自动保存/关联此应用的 open_id跨应用识别
resolved_uid = resolve_user_and_save(
db, app_id=settings.TIANTIAN_APP_ID or "",
open_id=open_id, union_id=union_id,
)
agent_id = settings.TIANTIAN_AGENT_ID
if not agent_id:
_reply_to_feishu(open_id, "甜甜尚未配置,请联系管理员。")
return
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if not agent:
_reply_to_feishu(open_id, "甜甜 Agent 已不存在,请联系管理员。")
return
_reply_to_feishu(open_id, "正在思考,请稍候...")
from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig
wc = agent.workflow_config or {}
nodes = wc.get("nodes", [])
system_prompt = agent.description or ""
model = "deepseek-v4-flash"
provider = "deepseek"
temperature = 0.85
max_iterations = 15
for n in nodes:
if n.get("type") not in ("agent", "llm", "template"):
continue
cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {})
system_prompt = cfg.get("system_prompt", "") or system_prompt
model = cfg.get("model", model)
provider = cfg.get("provider", provider)
temperature = float(cfg.get("temperature", temperature))
max_iterations = int(cfg.get("max_iterations", max_iterations))
break
config = AgentConfig(
name=agent.name or "甜甜",
system_prompt=system_prompt + (
f"\n\n## 系统信息\n"
f"你的 Agent ID 是: {agent.id}\n"
f"在调用 schedule_list、schedule_delete 等工具时,使用此 ID 作为 agent_id 参数。"
),
llm=AgentLLMConfig(
model=model, provider=provider,
temperature=temperature, max_iterations=max_iterations,
),
tools=AgentToolConfig(),
memory=AgentMemoryConfig(
max_history_messages=int(cfg.get("memory_max_history", 20)),
vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)),
persist_to_db=bool(cfg.get("memory_persist", True)),
vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)),
learning_enabled=bool(cfg.get("memory_learning", True)),
),
user_id=resolved_uid,
memory_scope_id=str(agent.id),
)
on_llm_call = _make_llm_logger(db, agent_id=str(agent.id))
runtime = AgentRuntime(config=config, on_llm_call=on_llm_call)
result = await runtime.run(text)
if result.content:
_reply_card(open_id, f"{agent.name}", result.content.strip(), status="success")
else:
_reply_to_feishu(open_id, "Agent 未返回有效回复,请重试。")
logger.info(
"甜甜 Agent 回复完成: open_id=%s agent=%s iterations=%d tools=%d",
open_id[:20], agent.name, result.iterations_used, result.tool_calls_made,
)
except Exception as e:
logger.error("甜甜消息处理失败: %s", e)
try:
_reply_to_feishu(open_id, f"处理失败: {e!s}")
except Exception:
pass
finally:
if db:
db.close()
def _handle_message_internal(data):
msg_id = _get_message_id(data)
if msg_id:
if msg_id in _processed_msg_ids:
return
_processed_msg_ids.append(msg_id)
open_id = _get_sender_open_id(data)
chat_type = _get_chat_type(data)
text = _get_message_text(data)
if not open_id or chat_type != "p2p" or not text:
return
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(_handle_message_async(data))
else:
loop.run_until_complete(_handle_message_async(data))
except Exception as e:
logger.error("甜甜创建消息处理任务失败: %s", e)
try:
_reply_to_feishu(open_id, f"处理失败: {e!s}")
except Exception:
pass
def _build_event_handler():
from lark_oapi.event.dispatcher_handler import EventDispatcherHandler
def on_message_receive(data):
_handle_message_internal(data)
builder = EventDispatcherHandler.builder(encrypt_key="", verification_token="")
builder.register_p2_im_message_receive_v1(on_message_receive)
return builder.build()
async def start_ws_client():
if not settings.TIANTIAN_APP_ID or not settings.TIANTIAN_APP_SECRET:
logger.warning("甜甜应用未配置,跳过甜甜长连接启动")
return
from lark_oapi.ws import Client as WSClient
handler = _build_event_handler()
client = WSClient(
app_id=settings.TIANTIAN_APP_ID,
app_secret=settings.TIANTIAN_APP_SECRET,
event_handler=handler,
auto_reconnect=True,
)
logger.info("甜甜长连接客户端启动中...")
while True:
try:
await client._connect()
logger.info("甜甜长连接已建立")
asyncio.ensure_future(client._ping_loop())
while True:
await asyncio.sleep(3600)
except asyncio.CancelledError:
break
except Exception as e:
logger.warning("甜甜长连接断开3秒后重连: %s", e)
await asyncio.sleep(3)