Files
aiagent/backend/app/services/feishu_ws_handler.py
renjianbo 9454dee976 feat: complete remaining plan items — all 4 phases fully implemented
- Task API: add execute and retry endpoints
- Agent API: add create-main-agent endpoint and execute with graph/debate/pipeline modes
- Feishu tools: add read_messages, create_sheet, upload_file (54 builtin tools total)
- Feishu events: group @mention handling, approval callback, auto daily reporting
- Feishu app service: add send_plain_text_to_group for group chat replies
- Typed Data Ports: template variable injection {{previous.output.field}} + output schema validation
- GoalDetail.vue: Gantt timeline view + real-time progress polling (10s)
- Autonomy loop: per-goal Celery Beat scheduling via sync_autonomy_schedule_for_goal

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-08 22:36:03 +08:00

664 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""飞书长连接事件监听 — 通过 lark-oapi SDK 建立 WebSocket 接收事件"""
from __future__ import annotations
import asyncio
import json
import logging
import threading
from collections import deque
from typing import List, Optional
from app.core.config import settings
logger = logging.getLogger(__name__)
# 存储通过事件捕获的 open_id与 HTTP 事件回调共用)
_pending_open_ids: List[str] = []
# 已处理消息 ID 去重(防止 WS 重连导致重复处理)
_processed_msg_ids: deque[str] = deque(maxlen=20)
_ws_thread: threading.Thread | None = None
def _get_message_id(data) -> Optional[str]:
"""从 Feishu 消息事件中提取 message_id。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
if msg:
return getattr(msg, "message_id", None)
except Exception:
return None
return None
def _get_message_text(data) -> Optional[str]:
"""从 Feishu 消息事件中提取纯文本内容。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
if not msg:
return None
content_str = getattr(msg, "content", None)
msg_type = getattr(msg, "message_type", "")
if not content_str:
return None
if msg_type == "text":
parsed = json.loads(content_str)
return parsed.get("text", "")
# 其他消息类型暂不支持
return None
except Exception as e:
logger.warning("解析飞书消息内容失败: %s", e)
return None
def _get_sender_open_id(data) -> Optional[str]:
"""从 Feishu 消息事件中提取发送者 open_id。"""
try:
ev = data.event
sender = getattr(ev, "sender", None)
if not sender:
return None
sender_id = getattr(sender, "sender_id", None)
if not sender_id:
return None
return getattr(sender_id, "open_id", None)
except Exception:
return None
def _get_sender_union_id(data) -> Optional[str]:
"""从 Feishu 消息事件中提取发送者 union_id跨应用唯一"""
try:
ev = data.event
sender = getattr(ev, "sender", None)
if not sender:
return None
sender_id = getattr(sender, "sender_id", None)
if not sender_id:
return None
return getattr(sender_id, "union_id", None)
except Exception:
return None
def _get_chat_type(data) -> str:
"""获取聊天类型。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
return getattr(msg, "chat_type", "") if msg else ""
except Exception:
return ""
def _get_chat_id(data) -> Optional[str]:
"""获取群聊 IDchat_type=group 时)。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
return getattr(msg, "chat_id", None) if msg else None
except Exception:
return None
def _get_mentions(data) -> list:
"""从消息事件中提取 @提及 列表。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
if not msg:
return []
mentions = getattr(msg, "mentions", None) or []
return [getattr(m, "id", {}).get("open_id", "") for m in mentions if hasattr(m, "id")]
except Exception:
return []
def _is_bot_mentioned(data) -> bool:
"""检查消息是否 @了当前机器人(通过 settings.BOT_OPEN_ID 或 mention 列表对比)。"""
try:
from app.core.config import settings
bot_open_id = getattr(settings, "BOT_OPEN_ID", None)
if not bot_open_id:
return False
mentions = _get_mentions(data)
return bot_open_id in mentions
except Exception:
return False
def _reply_to_group(chat_id: str, text: str):
"""向群聊发送消息。"""
try:
from app.services.feishu_app_service import send_plain_text_to_group
send_plain_text_to_group(chat_id, text)
except Exception as e:
logger.warning("飞书群聊回复失败: %s", e)
def _reply_to_feishu(open_id: str, text: str):
"""通过飞书 API 回复用户消息。"""
try:
from app.services.feishu_app_service import send_plain_text
send_plain_text(open_id, text)
except Exception as e:
logger.warning("飞书回复消息失败: %s", e)
def _reply_card(open_id: str, title: str, content: str, status: str = "info"):
"""通过飞书 API 回复卡片消息。"""
try:
from app.services.feishu_app_service import send_message_to_user
send_message_to_user(open_id, title, content, status=status)
except Exception as e:
logger.warning("飞书回复卡片失败: %s", e)
async def _handle_goal_creation(db, user_id: str, goal_title: str, open_id: str):
"""从飞书消息中创建 Goal 并异步启动执行。"""
from app.services.goal_service import create_goal
from app.tasks.goal_tasks import execute_goal_task
try:
goal = create_goal(db=db, creator_id=user_id, title=goal_title, priority=5)
_reply_to_feishu(open_id, f"✅ 目标已创建: **{goal.title}**\n正在分解任务并启动执行...")
# 异步执行目标
task = execute_goal_task.delay(str(goal.id))
logger.info("飞书触发 Goal 创建: goal_id=%s celery_task=%s", goal.id, task.id)
# 更新状态
from app.services.goal_service import update_goal
update_goal(db, str(goal.id), status="active")
except Exception as e:
logger.error("飞书 Goal 创建失败: %s", e)
_reply_to_feishu(open_id, f"创建目标失败: {e}")
async def _handle_message_async(data):
"""异步处理飞书消息。"""
open_id = _get_sender_open_id(data)
union_id = _get_sender_union_id(data)
chat_type = _get_chat_type(data)
text = _get_message_text(data)
if not open_id or chat_type != "p2p":
return
_pending_open_ids.append(open_id)
if len(_pending_open_ids) > 5:
_pending_open_ids.pop(0)
logger.info("飞书收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)")
try:
with open("/tmp/feishu_open_id.txt", "w") as f:
f.write(open_id)
except Exception:
pass
if not text:
return
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.user import User
from app.models.agent import Agent
from app.services.feishu_open_id_service import resolve_user_and_save
db: Optional[Session] = None
try:
db = SessionLocal()
# 自动保存/关联此应用的 open_id跨应用识别
resolved_uid = resolve_user_and_save(
db, app_id=settings.FEISHU_APP_ID or "",
open_id=open_id, union_id=union_id,
)
user = db.query(User).filter(User.feishu_open_id == open_id).first()
if not user and resolved_uid:
user = db.query(User).filter(User.id == resolved_uid).first()
if not user:
_reply_to_feishu(open_id, "你的账号未绑定平台用户,请先在平台绑定飞书。")
return
agent_id = user.feishu_default_agent_id
if not agent_id:
_reply_to_feishu(
open_id,
"你还没有设置飞书对话的默认 Agent。\n请先在平台设置:\n"
"POST /api/v1/feishu/default-agent {\"agent_id\": \"<你的AgentID>\"}",
)
return
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if not agent:
_reply_to_feishu(open_id, f"默认 Agent (id={agent_id}) 已不存在,请重新设置。")
return
_reply_to_feishu(open_id, f"🤔 正在思考,请稍候...")
from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig
wc = agent.workflow_config or {}
nodes = wc.get("nodes", [])
system_prompt = agent.description or ""
model = "deepseek-v4-flash"
provider = "deepseek"
temperature = 0.7
max_iterations = 10
for n in nodes:
if n.get("type") not in ("agent", "llm", "template"):
continue
cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {})
system_prompt = cfg.get("system_prompt", "") or system_prompt
model = cfg.get("model", model)
provider = cfg.get("provider", provider)
temperature = float(cfg.get("temperature", temperature))
max_iterations = int(cfg.get("max_iterations", max_iterations))
break
config = AgentConfig(
name=agent.name or "agent",
system_prompt=system_prompt + (
f"\n\n## 系统信息\n"
f"你的 Agent ID 是: {agent.id}\n"
f"在调用 schedule_list、schedule_delete 等工具时,使用此 ID 作为 agent_id 参数。"
),
llm=AgentLLMConfig(
model=model,
provider=provider,
temperature=temperature,
max_iterations=max_iterations,
),
tools=AgentToolConfig(),
memory=AgentMemoryConfig(
max_history_messages=int(cfg.get("memory_max_history", 20)),
vector_memory_top_k=int(cfg.get("memory_vector_top_k", 5)),
persist_to_db=bool(cfg.get("memory_persist", True)),
vector_memory_enabled=bool(cfg.get("memory_vector_enabled", True)),
learning_enabled=bool(cfg.get("memory_learning", True)),
),
user_id=user.id,
memory_scope_id=str(agent.id),
)
# ── 目标/任务意图检测:创建 Goal 并异步执行 ──
goal_triggers = ["创建目标:", "目标:", "创建任务:", "new goal:", "goal:"]
triggered_goal = False
goal_title = ""
for trigger in goal_triggers:
if text.lower().startswith(trigger.lower()):
trigger_text = text[len(trigger):].strip()
if trigger_text:
goal_title = trigger_text[:500]
triggered_goal = True
break
if triggered_goal and goal_title:
await _handle_goal_creation(db, user.id, goal_title, open_id)
return
on_llm_call = _make_llm_logger(db, agent_id=str(agent.id), user_id=user.id)
runtime = AgentRuntime(config=config, on_llm_call=on_llm_call)
result = await runtime.run(text)
if result.content:
_reply_card(
open_id,
f"🤖 {agent.name}",
result.content.strip(),
status="success",
)
else:
_reply_to_feishu(open_id, "Agent 未返回有效回复,请重试。")
logger.info(
"飞书 Agent 回复完成: open_id=%s agent=%s iterations=%d tools=%d",
open_id[:20], agent.name, result.iterations_used, result.tool_calls_made,
)
except Exception as e:
logger.error("飞书消息处理失败: %s", e)
try:
_reply_to_feishu(open_id, f"处理失败: {e!s}")
except Exception:
pass
finally:
if db:
db.close()
def _handle_message_internal(data):
"""同步入口 — 创建异步任务处理飞书消息(支持私信 + 群聊@提及)。"""
# 去重WS 重连后可能重投已处理的消息
msg_id = _get_message_id(data)
if msg_id:
if msg_id in _processed_msg_ids:
logger.debug("跳过已处理消息: %s", msg_id)
return
_processed_msg_ids.append(msg_id)
# 记录 pending open_id用于绑定
open_id = _get_sender_open_id(data)
chat_type = _get_chat_type(data)
chat_id = _get_chat_id(data)
text = _get_message_text(data)
if open_id:
_pending_open_ids.append(open_id)
if len(_pending_open_ids) > 5:
_pending_open_ids.pop(0)
try:
with open("/tmp/feishu_open_id.txt", "w") as f:
f.write(open_id)
except Exception:
pass
# 群聊 @提及 处理:当用户在群里 @机器人时,解析意图 → 创建 Goal
if chat_type == "group" and _is_bot_mentioned(data) and text and chat_id:
logger.info("飞书群聊@提及: chat_id=%s open_id=%s text=%s", chat_id, open_id[:20] if open_id else "", text[:80] if text else "(空)")
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(_handle_group_mention_async(data, chat_id, open_id))
else:
loop.run_until_complete(_handle_group_mention_async(data, chat_id, open_id))
except Exception as e:
logger.error("群聊@提及处理失败: %s", e)
return
if not open_id or chat_type != "p2p":
return
logger.info("飞书收到消息: open_id=%s text=%s", open_id[:20], text[:50] if text else "(空)")
if not text:
return
# 将实际处理委托给异步函数
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(_handle_message_async(data))
else:
loop.run_until_complete(_handle_message_async(data))
except Exception as e:
logger.error("创建飞书消息处理任务失败: %s", e)
try:
_reply_to_feishu(open_id, f"处理失败: {e!s}")
except Exception:
pass
def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] = None):
"""创建 LLM 调用日志回调。"""
def _log(metrics: dict):
try:
from app.models.agent_llm_log import AgentLLMLog
log = AgentLLMLog(
agent_id=agent_id,
session_id=metrics.get("session_id"),
user_id=user_id,
model=metrics.get("model", ""),
provider=metrics.get("provider"),
prompt_tokens=metrics.get("prompt_tokens", 0),
completion_tokens=metrics.get("completion_tokens", 0),
total_tokens=metrics.get("total_tokens", 0),
latency_ms=metrics.get("latency_ms", 0),
iteration_number=metrics.get("iteration_number", 0),
step_type=metrics.get("step_type"),
tool_name=metrics.get("tool_name"),
status=metrics.get("status", "success"),
error_message=metrics.get("error_message"),
)
db.add(log)
db.commit()
except Exception as e:
logger.warning("写入 AgentLLMLog 失败: %s", e)
return _log
async def _handle_group_mention_async(data, chat_id: str, open_id: str):
"""处理群聊 @提及 — 解析意图、创建 Goal 并回复群聊。"""
text = _get_message_text(data)
if not text:
return
try:
_reply_to_group(chat_id, "🤔 收到!正在分析你的需求...")
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.user import User
db = SessionLocal()
try:
user = db.query(User).filter(User.feishu_open_id == open_id).first()
if not user:
_reply_to_group(chat_id, "你的飞书账号尚未绑定平台用户,请先在平台绑定飞书。")
return
# 尝试提取目标意图
goal_triggers = ["创建目标:", "目标:", "创建任务:", "帮我", "请帮我", "帮我做", ""]
goal_title = text
for trigger in goal_triggers:
if text.lower().startswith(trigger.lower()):
goal_title = text[len(trigger):].strip()
break
if goal_title:
await _handle_goal_creation(db, user.id, goal_title[:500], open_id)
_reply_to_group(chat_id, f"✅ 目标已创建并开始执行")
else:
# 通用 Agent 对话
from app.models.agent import Agent
agent_id = user.feishu_default_agent_id
if agent_id:
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if agent:
from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig
config = AgentConfig(
name=agent.name or "agent",
system_prompt=agent.description or "",
llm=AgentLLMConfig(model="deepseek-v4-flash", provider="deepseek", temperature=0.7, max_iterations=10),
tools=AgentToolConfig(),
memory=AgentMemoryConfig(),
user_id=user.id,
memory_scope_id=str(agent.id),
)
runtime = AgentRuntime(config=config)
result = await runtime.run(text)
if result.content:
_reply_to_group(chat_id, result.content.strip()[:2000])
else:
_reply_to_group(chat_id, "抱歉,未能处理你的请求。")
finally:
db.close()
except Exception as e:
logger.error("群聊@提及处理失败: %s", e)
try:
_reply_to_group(chat_id, f"处理失败: {e!s}")
except Exception:
pass
def _handle_approval_callback(data):
"""处理飞书审批回调 — 审批通过/驳回后恢复 Task 执行。"""
try:
event_type = getattr(data, "event_type", "") or getattr(data.event if hasattr(data, "event") else None, "type", "")
logger.info("飞书审批回调: event_type=%s", event_type)
db = None
from app.core.database import SessionLocal
from app.models.task import Task
try:
db = SessionLocal()
# 查找 awaiting_approval 状态的任务
waiting_tasks = db.query(Task).filter(Task.approval_status == "pending").all()
for task in waiting_tasks:
# 检查审批是否关联此任务
approval_type = getattr(data.event, "approval_type", "") or ""
status = getattr(data.event, "status", "") or ""
if status == "approved":
task.status = "in_progress"
task.approval_status = "approved"
task.error_message = None
logger.info("审批通过: task_id=%s", task.id)
# 异步恢复执行
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(_resume_approved_task(task.id))
else:
loop.run_until_complete(_resume_approved_task(task.id))
except Exception:
pass
elif status == "rejected":
task.status = "failed"
task.approval_status = "rejected"
task.error_message = "审批驳回"
logger.info("审批驳回: task_id=%s", task.id)
db.commit()
finally:
if db:
db.close()
except Exception as e:
logger.warning("审批回调处理失败: %s", e)
async def _resume_approved_task(task_id: str):
"""审批通过后恢复任务执行。"""
try:
from app.core.database import SessionLocal
from app.services.main_agent_service import MainAgentService
from app.services import goal_service
db = SessionLocal()
try:
svc = MainAgentService(db)
result = await svc.execute_task(task_id)
goal_service.update_task(db=db, task_id=task_id, status="completed", result=result)
finally:
db.close()
except Exception as e:
logger.error("审批通过后执行任务失败: task_id=%s error=%s", task_id, e)
async def send_daily_progress_report():
"""每日自动进度汇报 — 由定时调度触发,汇总所有活跃 Goal 的进度并通过飞书通知。"""
try:
from app.core.database import SessionLocal
from app.models.goal import Goal
from app.models.user import User
db = SessionLocal()
try:
active_goals = db.query(Goal).filter(Goal.status == "active").all()
if not active_goals:
logger.info("每日汇报: 无活跃目标")
return
report_lines = ["## 📊 每日进度汇报\n"]
for g in active_goals:
pct = int((g.progress or 0) * 100)
report_lines.append(f"- **{g.title}** [P{g.priority}] — {pct}% 完成")
report = "\n".join(report_lines)
# 通知所有有活跃目标的用户
notified = set()
for g in active_goals:
creator = db.query(User).filter(User.id == g.creator_id).first()
if not creator or not creator.feishu_open_id or creator.feishu_open_id in notified:
continue
try:
_reply_card(creator.feishu_open_id, "每日进度汇报", report, status="info")
notified.add(creator.feishu_open_id)
except Exception as e:
logger.warning("每日汇报通知用户 %s 失败: %s", creator.id, e)
logger.info("每日汇报完成: 活跃目标=%d 通知用户=%d", len(active_goals), len(notified))
finally:
db.close()
except Exception as e:
logger.error("每日汇报失败: %s", e)
def _build_event_handler():
"""构建事件处理器。"""
from lark_oapi.event.dispatcher_handler import EventDispatcherHandler
def on_message_receive(data):
"""处理 im.message.receive_v1 事件。"""
_handle_message_internal(data)
builder = EventDispatcherHandler.builder(
encrypt_key="",
verification_token=settings.FEISHU_VERIFICATION_TOKEN,
)
builder.register_p2_im_message_receive_v1(on_message_receive)
# 审批事件回调
def on_approval_event(data):
_handle_approval_callback(data)
builder.register_p2_approval_instance_event_v1(on_approval_event)
return builder.build()
async def start_ws_client():
"""在 async 上下文中启动飞书长连接(在主事件循环运行)。"""
if not settings.FEISHU_APP_ID or not settings.FEISHU_APP_SECRET:
logger.warning("飞书应用未配置,跳过长连接启动")
return
from lark_oapi.ws import Client as WSClient
handler = _build_event_handler()
client = WSClient(
app_id=settings.FEISHU_APP_ID,
app_secret=settings.FEISHU_APP_SECRET,
event_handler=handler,
auto_reconnect=True,
)
logger.info("飞书长连接客户端启动中...")
while True:
try:
await client._connect()
logger.info("飞书长连接已建立")
asyncio.ensure_future(client._ping_loop())
while True:
await asyncio.sleep(3600)
except asyncio.CancelledError:
break
except Exception as e:
logger.warning("飞书长连接断开3秒后重连: %s", e)
await asyncio.sleep(3)
def get_pending_open_ids() -> List[str]:
"""获取待绑定的 open_id 列表。"""
return list(_pending_open_ids)
def clear_pending_open_ids():
"""清空待绑定的 open_id。"""
_pending_open_ids.clear()