Files
aiagent/backend/app/services/renshenguo2_ws_handler.py
renjianbo 66d52ad020 feat: 人参果/人参果1号 飞书图片解析支持
在飞书 WS handler 中新增图片消息识别与下载:
- _get_image_key: 检测飞书 image 类型消息,提取 image_key
- download_image_from_feishu: 调用飞书 API 下载图片二进制
- 图片保存到 agent_workspaces/{agent_id}/images/ 下
- Agent 自动调用 image_ocr + image_vision 分析后回复用户

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 08:02:14 +08:00

368 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""人参果1号飞书长连接 — 固定路由到 AI学习助手 Agent行为约束版禁止主动消息"""
from __future__ import annotations
import asyncio
import json
import logging
from collections import deque
from typing import Optional
from app.core.config import settings
logger = logging.getLogger(__name__)

# How many recent message_ids are remembered for duplicate suppression.
_DEDUP_WINDOW = 20
# Bounded FIFO of recently seen Feishu message_ids; the synchronous message
# callback skips any event whose id is already in here.
_processed_msg_ids: deque[str] = deque(maxlen=_DEDUP_WINDOW)
def _get_message_id(data) -> Optional[str]:
    """Return ``data.event.message.message_id``.

    Returns None when the event, the message, or the id is missing, or when
    any attribute access raises.
    """
    try:
        message = getattr(data.event, "message", None)
        return getattr(message, "message_id", None) if message else None
    except Exception:
        return None
def _get_message_text(data) -> Optional[str]:
    """Return the text of a Feishu ``text`` message, otherwise None.

    The message ``content`` is a JSON string; its ``"text"`` field is returned
    (empty string if the field is absent). Non-text messages and empty content
    yield None. Any parsing/access error is logged as a warning and yields None.
    """
    try:
        message = getattr(data.event, "message", None)
        if not message:
            return None
        raw_content = getattr(message, "content", None)
        kind = getattr(message, "message_type", "")
        if raw_content and kind == "text":
            return json.loads(raw_content).get("text", "")
        return None
    except Exception as exc:
        logger.warning("解析人参果1号消息内容失败: %s", exc)
        return None
def _get_image_key(data) -> Optional[str]:
    """Return the ``image_key`` of a Feishu ``image`` message.

    None means the event is not an image message, has no content, or its
    content could not be parsed.
    """
    try:
        message = getattr(data.event, "message", None)
        if not message or getattr(message, "message_type", "") != "image":
            return None
        raw_content = getattr(message, "content", None)
        return json.loads(raw_content).get("image_key", None) if raw_content else None
    except Exception:
        return None
def _get_sender_id_field(data, field: str) -> Optional[str]:
    """Return ``data.event.sender.sender_id.<field>``.

    Returns None if the sender or sender_id is missing or falsy, if the field
    is absent, or if any attribute access raises.
    """
    try:
        sender = getattr(data.event, "sender", None)
        if not sender:
            return None
        sender_id = getattr(sender, "sender_id", None)
        if not sender_id:
            return None
        return getattr(sender_id, field, None)
    except Exception:
        return None


def _get_sender_open_id(data) -> Optional[str]:
    """Return the sender's ``open_id`` (per-app user id), or None if unavailable."""
    return _get_sender_id_field(data, "open_id")


def _get_sender_union_id(data) -> Optional[str]:
    """Return the sender's ``union_id``, or None if unavailable."""
    return _get_sender_id_field(data, "union_id")
def _get_chat_type(data) -> str:
    """Return the message's ``chat_type`` (e.g. ``"p2p"``), or ``""`` if unavailable."""
    try:
        message = getattr(data.event, "message", None)
        if not message:
            return ""
        return getattr(message, "chat_type", "")
    except Exception:
        return ""
def _reply_to_feishu(open_id: str, text: str):
    """Best-effort: send *text* as a plain-text message to *open_id*.

    Any failure (including import errors) is logged as a warning, never raised.
    """
    try:
        from app.services.renshenguo2_app_service import send_plain_text as _send_text
        _send_text(open_id, text)
    except Exception as exc:
        logger.warning("人参果1号回复消息失败: %s", exc)
def _reply_card(open_id: str, title: str, content: str, status: str = "info"):
    """Best-effort: send a titled message with a *status* to *open_id*.

    Delegates to ``send_message_to_user``; any failure is logged as a warning,
    never raised.
    """
    try:
        from app.services.renshenguo2_app_service import send_message_to_user as _send_card
        _send_card(open_id, title, content, status=status)
    except Exception as exc:
        logger.warning("人参果1号回复卡片失败: %s", exc)
def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] = None):
    """Build an ``on_llm_call`` callback that stores each call's metrics as an AgentLLMLog row.

    Args:
        db: SQLAlchemy session used for every write; the same session is
            reused across all invocations of the returned callback.
        agent_id: Value written to ``AgentLLMLog.agent_id``.
        user_id: Value written to ``AgentLLMLog.user_id``.

    Returns:
        ``_log(metrics)``, which never raises: failures are logged as warnings.
    """
    def _log(metrics: dict):
        try:
            from app.models.agent_llm_log import AgentLLMLog
            log = AgentLLMLog(
                agent_id=agent_id, session_id=metrics.get("session_id"),
                user_id=user_id, model=metrics.get("model", ""),
                provider=metrics.get("provider"),
                prompt_tokens=metrics.get("prompt_tokens", 0),
                completion_tokens=metrics.get("completion_tokens", 0),
                total_tokens=metrics.get("total_tokens", 0),
                latency_ms=metrics.get("latency_ms", 0),
                iteration_number=metrics.get("iteration_number", 0),
                step_type=metrics.get("step_type"),
                tool_name=metrics.get("tool_name"),
                status=metrics.get("status", "success"),
                error_message=metrics.get("error_message"),
            )
            db.add(log)
            db.commit()
        except Exception as e:
            logger.warning("写入 AgentLLMLog 失败: %s", e)
            # After a failed flush/commit the session refuses further work until
            # it is rolled back. Because this session is shared by every later
            # call, reset it so one failure does not poison all remaining logs.
            try:
                db.rollback()
            except Exception as rollback_err:
                logger.warning("AgentLLMLog 写入失败后回滚会话失败: %s", rollback_err)
    return _log
# Workflow node types whose ``data`` carries the LLM / prompt configuration.
_MODEL_NODE_TYPES = ("agent", "llm", "template")


def _select_model_node_config(agent) -> dict:
    """Return the ``data`` mapping of the first agent/llm/template workflow node.

    Returns an empty dict when the Agent has no workflow config, no matching
    node, or the matching node has no dict ``data``. Nodes may be dicts or
    attribute-style objects.
    """
    workflow_config = agent.workflow_config or {}
    for node in workflow_config.get("nodes") or []:
        if isinstance(node, dict):
            node_type, node_data = node.get("type"), node.get("data")
        else:
            node_type = getattr(node, "type", None)
            node_data = getattr(node, "data", None)
        if node_type not in _MODEL_NODE_TYPES:
            continue
        return node_data if isinstance(node_data, dict) else {}
    return {}


def _build_behavior_rules(agent_id) -> str:
    """Return the fixed rules appended to the system prompt.

    They are appended regardless of how the system_prompt stored in the DB is
    configured, and tell the model its own Agent ID for scheduling tools.
    """
    return (
        "\n\n## 系统信息\n"
        f"你的 Agent ID 是: {agent_id}\n"
        "在调用 schedule_list、schedule_delete 等工具时,使用此 ID 作为 agent_id 参数。"
        "\n\n## 行为约束(必须严格遵守,违反将导致严重问题)\n"
        "1. **被动响应原则**:仅在用户主动发送消息时回复。不得根据时间流逝、用户在线状态或任何隐式信号主动发起对话。用户不说话,你就别说话。\n"
        "2. **单轮回复限制**:对于用户的每条消息,只回复一次。回复完毕后必须停止,不得继续追问、猜测用户状态或发送额外消息。\n"
        "3. **禁止重复关心**:如果上一轮已经表达过关心、提醒或催促(如催睡觉、催休息),不得在用户没有新输入的情况下重复类似内容。\n"
        "4. **禁止猜测用户状态**:不得主动猜测用户的行为、情绪或状态(如\"你是不是睡不着\"\"你是不是又回来了\"\"让我猜猜你现在\")。只回应用户明确提出的问题或话题。\n"
        "5. **简洁直接**回答应简洁、直接、切题。避免过度的寒暄、emoji堆砌、表格罗列猜测和冗余内容。\n"
        "6. **上下文中立**:记忆系统提供的历史信息仅供理解上下文使用,不得用于主动发起话题、回顾\"上次你如何如何\"或过度个性化回应。\n"
        "7. **禁止自言自语**:一次回复结束后,除非用户再次发送消息,否则绝对不要产生任何新输出。"
    )


def _guess_image_suffix(image_bytes: bytes) -> str:
    """Return a file suffix matching the image's magic bytes, defaulting to ``.jpg``."""
    if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
        return ".png"
    if image_bytes.startswith((b"GIF87a", b"GIF89a")):
        return ".gif"
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return ".webp"
    return ".jpg"


def _save_feishu_image(agent_id: str, message_id: str, image_key: str) -> Optional[str]:
    """Download a Feishu image and save it under ``agent_workspaces/<agent_id>/images``.

    Returns:
        The saved file's workspace-relative path (forward slashes), or None if
        the download returned no bytes. Other errors propagate to the caller.
    """
    import time
    import uuid

    from app.services.renshenguo2_app_service import download_image_from_feishu
    from app.services.builtin_tools import _local_file_workspace_root

    image_bytes = download_image_from_feishu(message_id, image_key)
    if not image_bytes:
        return None
    ws_root = _local_file_workspace_root()
    images_dir = ws_root / "agent_workspaces" / agent_id / "images"
    images_dir.mkdir(parents=True, exist_ok=True)
    # A per-second timestamp alone collides when two images arrive within the
    # same second (the later silently overwrites the earlier), so add a random
    # suffix. The extension follows the actual image format.
    filename = (
        f"feishu_img_{time.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
        f"{_guess_image_suffix(image_bytes)}"
    )
    image_path = images_dir / filename
    image_path.write_bytes(image_bytes)
    relative_path = str(image_path.relative_to(ws_root)).replace("\\", "/")
    logger.info("人参果1号图片已保存: %s (%d bytes)", relative_path, len(image_bytes))
    return relative_path


async def _handle_message_async(data):
    """Handle one p2p Feishu message (text or image) by running the configured Agent.

    Flow: resolve/link the sender's ids, load the Agent from
    ``settings.RENSHENGUO2_AGENT_ID``, send a "thinking" notice, save any
    image into the Agent workspace and turn it into a text instruction, run
    the Agent, and reply with its answer. Every failure is logged and reported
    to the user as a text message; this coroutine does not raise.
    """
    open_id = _get_sender_open_id(data)
    union_id = _get_sender_union_id(data)
    chat_type = _get_chat_type(data)
    text = _get_message_text(data)
    image_key = _get_image_key(data)
    message_id = _get_message_id(data)
    if not open_id or chat_type != "p2p":
        return
    if text:
        logger.info("人参果1号收到文本消息: open_id=%s text=%s", open_id[:20], text[:80])
    elif image_key and message_id:
        logger.info("人参果1号收到图片消息: open_id=%s image_key=%s", open_id[:20], image_key[:30])
    else:
        return

    from sqlalchemy.orm import Session
    from app.core.database import SessionLocal
    from app.models.agent import Agent
    from app.services.feishu_open_id_service import resolve_user_and_save

    db: Optional[Session] = None
    try:
        db = SessionLocal()
        # Save/link this app's open_id so the user can be recognised across apps.
        resolved_uid = resolve_user_and_save(
            db, app_id=settings.RENSHENGUO2_APP_ID or "",
            open_id=open_id, union_id=union_id,
        )
        agent_id = settings.RENSHENGUO2_AGENT_ID
        if not agent_id:
            _reply_to_feishu(open_id, "人参果1号尚未配置请联系管理员。")
            return
        agent = db.query(Agent).filter(Agent.id == agent_id).first()
        if not agent:
            _reply_to_feishu(open_id, "人参果1号 Agent 已不存在,请联系管理员。")
            return
        _reply_to_feishu(open_id, "正在思考,请稍候...")

        # Image message: store the picture in the workspace and ask the Agent to analyse it.
        if image_key and message_id and not text:
            image_path = _save_feishu_image(str(agent.id), message_id, image_key)
            if image_path is None:
                _reply_to_feishu(open_id, "图片下载失败,请稍后重试。")
                return
            text = (
                "用户通过飞书发来了一张图片。"
                f"图片已保存到工作区路径: {image_path}\n"
                "请先用 image_ocr 提取图片中的文字,"
                "如果文字不足以理解内容,再用 image_vision 分析图片,"
                "然后根据分析结果回复用户。"
            )

        from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig

        # Always bound (possibly empty), so the memory settings below never
        # hit an unbound name when the workflow has no model node.
        cfg = _select_model_node_config(agent)
        system_prompt = cfg.get("system_prompt", "") or agent.description or ""
        model = cfg.get("model", "deepseek-v4-flash")
        provider = cfg.get("provider", "deepseek")
        temperature = float(cfg.get("temperature", 0.7))
        max_iterations = int(cfg.get("max_iterations", 15))
        tools_whitelist = cfg.get("tools", [])

        config = AgentConfig(
            name=agent.name or "人参果1号",
            system_prompt=system_prompt + _build_behavior_rules(agent.id),
            llm=AgentLLMConfig(
                model=model, provider=provider,
                temperature=temperature, max_iterations=max_iterations,
            ),
            tools=AgentToolConfig(include_tools=tools_whitelist),
            memory=AgentMemoryConfig(
                max_history_messages=int(cfg.get("memory_max_history", 40)),
                vector_memory_top_k=int(cfg.get("memory_vector_top_k", 10)),
                persist_to_db=bool(cfg.get("memory_persist", True)),
                vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)),
                learning_enabled=bool(cfg.get("memory_learning", True)),
            ),
            user_id=resolved_uid,
            memory_scope_id=str(agent.id),
        )
        on_llm_call = _make_llm_logger(db, agent_id=str(agent.id))
        runtime = AgentRuntime(config=config, on_llm_call=on_llm_call)
        result = await runtime.run(text)
        if result.content:
            _reply_card(open_id, f"{agent.name}", result.content.strip(), status="success")
        else:
            _reply_to_feishu(open_id, "Agent 未返回有效回复,请重试。")
        logger.info(
            "人参果1号 Agent 回复完成: open_id=%s agent=%s iterations=%d tools=%d",
            open_id[:20], agent.name, result.iterations_used, result.tool_calls_made,
        )
    except Exception as e:
        # logger.exception keeps the traceback, which the plain error log dropped.
        logger.exception("人参果1号消息处理失败: %s", e)
        try:
            _reply_to_feishu(open_id, f"处理失败: {e!s}")
        except Exception:
            pass
    finally:
        if db:
            db.close()
# Strong references to in-flight handler tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes; each task removes itself from this set when done.
_background_tasks: set[asyncio.Task] = set()


def _handle_message_internal(data):
    """Synchronous lark_oapi callback: dedupe, filter, then schedule async handling.

    Events whose message_id was already seen are dropped. Only p2p messages
    from a sender with an open_id that carry either text or an image
    (image_key plus message_id) are passed to ``_handle_message_async``.
    Scheduling failures are logged and reported to the user.
    """
    msg_id = _get_message_id(data)
    if msg_id:
        if msg_id in _processed_msg_ids:
            return
        _processed_msg_ids.append(msg_id)
    open_id = _get_sender_open_id(data)
    chat_type = _get_chat_type(data)
    text = _get_message_text(data)
    # Image messages carry no text. They must pass this filter too, otherwise
    # the image branch in _handle_message_async is unreachable.
    has_image = bool(msg_id and _get_image_key(data))
    if not open_id or chat_type != "p2p" or not (text or has_image):
        return
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            task = asyncio.ensure_future(_handle_message_async(data))
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
        else:
            loop.run_until_complete(_handle_message_async(data))
    except Exception as e:
        logger.exception("人参果1号创建消息处理任务失败: %s", e)
        try:
            _reply_to_feishu(open_id, f"处理失败: {e!s}")
        except Exception:
            pass
def _build_event_handler():
    """Create the lark_oapi event dispatcher that forwards p2p IM messages here.

    Encryption and token verification are configured as empty strings.
    """
    from lark_oapi.event.dispatcher_handler import EventDispatcherHandler

    def _dispatch_p2_message(event_data):
        # Resolve the module-level handler at call time rather than build time.
        _handle_message_internal(event_data)

    dispatcher_builder = EventDispatcherHandler.builder(
        encrypt_key="",
        verification_token="",
    )
    dispatcher_builder.register_p2_im_message_receive_v1(_dispatch_p2_message)
    handler = dispatcher_builder.build()
    return handler
async def start_ws_client():
    """Run the 人参果1号 Feishu long-connection client until cancelled.

    Returns immediately (with a warning) if the app id/secret are not
    configured. Otherwise it retries the initial connection every 3 seconds
    until it succeeds, starts the client's ping loop once, and then idles.
    Cancellation ends the coroutine normally and also cancels the ping task.
    Note: this relies on the private ``_connect``/``_ping_loop`` methods of
    ``lark_oapi.ws.Client``.
    """
    if not settings.RENSHENGUO2_APP_ID or not settings.RENSHENGUO2_APP_SECRET:
        logger.warning("人参果1号应用未配置跳过人参果1号长连接启动")
        return
    from lark_oapi.ws import Client as WSClient
    handler = _build_event_handler()
    client = WSClient(
        app_id=settings.RENSHENGUO2_APP_ID,
        app_secret=settings.RENSHENGUO2_APP_SECRET,
        event_handler=handler,
        auto_reconnect=True,
    )
    logger.info("人参果1号长连接客户端启动中...")
    # Keep a strong reference to the ping task: the event loop holds tasks
    # only weakly, and the task must be cancelled when this coroutine ends.
    ping_task: Optional[asyncio.Task] = None
    try:
        while True:
            try:
                await client._connect()
                logger.info("人参果1号长连接已建立")
                if ping_task is None or ping_task.done():
                    ping_task = asyncio.ensure_future(client._ping_loop())
                while True:
                    await asyncio.sleep(3600)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning("人参果1号长连接断开3秒后重连: %s", e)
                await asyncio.sleep(3)
    finally:
        if ping_task is not None and not ping_task.done():
            ping_task.cancel()