feat: complete remaining plan items — all 4 phases fully implemented

- Task API: add execute and retry endpoints
- Agent API: add create-main-agent endpoint and execute with graph/debate/pipeline modes
- Feishu tools: add read_messages, create_sheet, upload_file (54 builtin tools total)
- Feishu events: group @mention handling, approval callback, auto daily reporting
- Feishu app service: add send_plain_text_to_group for group chat replies
- Typed Data Ports: template variable injection {{previous.output.field}} + output schema validation
- GoalDetail.vue: Gantt timeline view + real-time progress polling (10s)
- Autonomy loop: per-goal Celery Beat scheduling via sync_autonomy_schedule_for_goal

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
renjianbo
2026-05-08 22:36:03 +08:00
parent dca6020730
commit 9454dee976
10 changed files with 1132 additions and 15 deletions

View File

@@ -95,6 +95,51 @@ def _get_chat_type(data) -> str:
return ""
def _get_chat_id(data) -> Optional[str]:
"""获取群聊 IDchat_type=group 时)。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
return getattr(msg, "chat_id", None) if msg else None
except Exception:
return None
def _get_mentions(data) -> list:
"""从消息事件中提取 @提及 列表。"""
try:
ev = data.event
msg = getattr(ev, "message", None)
if not msg:
return []
mentions = getattr(msg, "mentions", None) or []
return [getattr(m, "id", {}).get("open_id", "") for m in mentions if hasattr(m, "id")]
except Exception:
return []
def _is_bot_mentioned(data) -> bool:
"""检查消息是否 @了当前机器人(通过 settings.BOT_OPEN_ID 或 mention 列表对比)。"""
try:
from app.core.config import settings
bot_open_id = getattr(settings, "BOT_OPEN_ID", None)
if not bot_open_id:
return False
mentions = _get_mentions(data)
return bot_open_id in mentions
except Exception:
return False
def _reply_to_group(chat_id: str, text: str):
"""向群聊发送消息。"""
try:
from app.services.feishu_app_service import send_plain_text_to_group
send_plain_text_to_group(chat_id, text)
except Exception as e:
logger.warning("飞书群聊回复失败: %s", e)
def _reply_to_feishu(open_id: str, text: str):
"""通过飞书 API 回复用户消息。"""
try:
@@ -293,7 +338,7 @@ async def _handle_message_async(data):
def _handle_message_internal(data):
"""同步入口 — 创建异步任务处理飞书消息。"""
"""同步入口 — 创建异步任务处理飞书消息(支持私信 + 群聊@提及)"""
# 去重WS 重连后可能重投已处理的消息
msg_id = _get_message_id(data)
if msg_id:
@@ -305,6 +350,7 @@ def _handle_message_internal(data):
# 记录 pending open_id用于绑定
open_id = _get_sender_open_id(data)
chat_type = _get_chat_type(data)
chat_id = _get_chat_id(data)
text = _get_message_text(data)
if open_id:
@@ -317,6 +363,19 @@ def _handle_message_internal(data):
except Exception:
pass
# 群聊 @提及 处理:当用户在群里 @机器人时,解析意图 → 创建 Goal
if chat_type == "group" and _is_bot_mentioned(data) and text and chat_id:
logger.info("飞书群聊@提及: chat_id=%s open_id=%s text=%s", chat_id, open_id[:20] if open_id else "", text[:80] if text else "(空)")
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(_handle_group_mention_async(data, chat_id, open_id))
else:
loop.run_until_complete(_handle_group_mention_async(data, chat_id, open_id))
except Exception as e:
logger.error("群聊@提及处理失败: %s", e)
return
if not open_id or chat_type != "p2p":
return
@@ -368,6 +427,177 @@ def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str]
return _log
async def _handle_group_mention_async(data, chat_id: str, open_id: str):
"""处理群聊 @提及 — 解析意图、创建 Goal 并回复群聊。"""
text = _get_message_text(data)
if not text:
return
try:
_reply_to_group(chat_id, "🤔 收到!正在分析你的需求...")
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.user import User
db = SessionLocal()
try:
user = db.query(User).filter(User.feishu_open_id == open_id).first()
if not user:
_reply_to_group(chat_id, "你的飞书账号尚未绑定平台用户,请先在平台绑定飞书。")
return
# 尝试提取目标意图
goal_triggers = ["创建目标:", "目标:", "创建任务:", "帮我", "请帮我", "帮我做", ""]
goal_title = text
for trigger in goal_triggers:
if text.lower().startswith(trigger.lower()):
goal_title = text[len(trigger):].strip()
break
if goal_title:
await _handle_goal_creation(db, user.id, goal_title[:500], open_id)
_reply_to_group(chat_id, f"✅ 目标已创建并开始执行")
else:
# 通用 Agent 对话
from app.models.agent import Agent
agent_id = user.feishu_default_agent_id
if agent_id:
agent = db.query(Agent).filter(Agent.id == agent_id).first()
if agent:
from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig
config = AgentConfig(
name=agent.name or "agent",
system_prompt=agent.description or "",
llm=AgentLLMConfig(model="deepseek-v4-flash", provider="deepseek", temperature=0.7, max_iterations=10),
tools=AgentToolConfig(),
memory=AgentMemoryConfig(),
user_id=user.id,
memory_scope_id=str(agent.id),
)
runtime = AgentRuntime(config=config)
result = await runtime.run(text)
if result.content:
_reply_to_group(chat_id, result.content.strip()[:2000])
else:
_reply_to_group(chat_id, "抱歉,未能处理你的请求。")
finally:
db.close()
except Exception as e:
logger.error("群聊@提及处理失败: %s", e)
try:
_reply_to_group(chat_id, f"处理失败: {e!s}")
except Exception:
pass
def _handle_approval_callback(data):
"""处理飞书审批回调 — 审批通过/驳回后恢复 Task 执行。"""
try:
event_type = getattr(data, "event_type", "") or getattr(data.event if hasattr(data, "event") else None, "type", "")
logger.info("飞书审批回调: event_type=%s", event_type)
db = None
from app.core.database import SessionLocal
from app.models.task import Task
try:
db = SessionLocal()
# 查找 awaiting_approval 状态的任务
waiting_tasks = db.query(Task).filter(Task.approval_status == "pending").all()
for task in waiting_tasks:
# 检查审批是否关联此任务
approval_type = getattr(data.event, "approval_type", "") or ""
status = getattr(data.event, "status", "") or ""
if status == "approved":
task.status = "in_progress"
task.approval_status = "approved"
task.error_message = None
logger.info("审批通过: task_id=%s", task.id)
# 异步恢复执行
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(_resume_approved_task(task.id))
else:
loop.run_until_complete(_resume_approved_task(task.id))
except Exception:
pass
elif status == "rejected":
task.status = "failed"
task.approval_status = "rejected"
task.error_message = "审批驳回"
logger.info("审批驳回: task_id=%s", task.id)
db.commit()
finally:
if db:
db.close()
except Exception as e:
logger.warning("审批回调处理失败: %s", e)
async def _resume_approved_task(task_id: str):
"""审批通过后恢复任务执行。"""
try:
from app.core.database import SessionLocal
from app.services.main_agent_service import MainAgentService
from app.services import goal_service
db = SessionLocal()
try:
svc = MainAgentService(db)
result = await svc.execute_task(task_id)
goal_service.update_task(db=db, task_id=task_id, status="completed", result=result)
finally:
db.close()
except Exception as e:
logger.error("审批通过后执行任务失败: task_id=%s error=%s", task_id, e)
async def send_daily_progress_report():
"""每日自动进度汇报 — 由定时调度触发,汇总所有活跃 Goal 的进度并通过飞书通知。"""
try:
from app.core.database import SessionLocal
from app.models.goal import Goal
from app.models.user import User
db = SessionLocal()
try:
active_goals = db.query(Goal).filter(Goal.status == "active").all()
if not active_goals:
logger.info("每日汇报: 无活跃目标")
return
report_lines = ["## 📊 每日进度汇报\n"]
for g in active_goals:
pct = int((g.progress or 0) * 100)
report_lines.append(f"- **{g.title}** [P{g.priority}] — {pct}% 完成")
report = "\n".join(report_lines)
# 通知所有有活跃目标的用户
notified = set()
for g in active_goals:
creator = db.query(User).filter(User.id == g.creator_id).first()
if not creator or not creator.feishu_open_id or creator.feishu_open_id in notified:
continue
try:
_reply_card(creator.feishu_open_id, "每日进度汇报", report, status="info")
notified.add(creator.feishu_open_id)
except Exception as e:
logger.warning("每日汇报通知用户 %s 失败: %s", creator.id, e)
logger.info("每日汇报完成: 活跃目标=%d 通知用户=%d", len(active_goals), len(notified))
finally:
db.close()
except Exception as e:
logger.error("每日汇报失败: %s", e)
def _build_event_handler():
"""构建事件处理器。"""
from lark_oapi.event.dispatcher_handler import EventDispatcherHandler
@@ -381,6 +611,13 @@ def _build_event_handler():
verification_token=settings.FEISHU_VERIFICATION_TOKEN,
)
builder.register_p2_im_message_receive_v1(on_message_receive)
# 审批事件回调
def on_approval_event(data):
_handle_approval_callback(data)
builder.register_p2_approval_instance_event_v1(on_approval_event)
return builder.build()