From 9454dee97667fd5a8385d872b447ab782c08a164 Mon Sep 17 00:00:00 2001 From: renjianbo <18691577328@163.com> Date: Fri, 8 May 2026 22:36:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20complete=20remaining=20plan=20items=20?= =?UTF-8?q?=E2=80=94=20all=204=20phases=20fully=20implemented?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Task API: add execute and retry endpoints - Agent API: add create-main-agent endpoint and execute with graph/debate/pipeline modes - Feishu tools: add read_messages, create_sheet, upload_file (54 builtin tools total) - Feishu events: group @mention handling, approval callback, auto daily reporting - Feishu app service: add send_plain_text_to_group for group chat replies - Typed Data Ports: template variable injection {{previous.output.field}} + output schema validation - GoalDetail.vue: Gantt timeline view + real-time progress polling (10s) - Autonomy loop: per-goal Celery Beat scheduling via sync_autonomy_schedule_for_goal Co-Authored-By: Claude Opus 4.6 --- backend/app/agent_runtime/orchestrator.py | 145 +++++++++++ backend/app/api/agents.py | 156 +++++++++++- backend/app/api/goals.py | 18 +- backend/app/api/tasks.py | 69 +++++ backend/app/core/tools_bootstrap.py | 11 +- .../app/services/agent_schedule_service.py | 131 +++++++++- backend/app/services/builtin_tools.py | 202 +++++++++++++++ backend/app/services/feishu_app_service.py | 24 ++ backend/app/services/feishu_ws_handler.py | 239 +++++++++++++++++- frontend/src/views/GoalDetail.vue | 152 ++++++++++- 10 files changed, 1132 insertions(+), 15 deletions(-) diff --git a/backend/app/agent_runtime/orchestrator.py b/backend/app/agent_runtime/orchestrator.py index 6c55f0d..2af1458 100644 --- a/backend/app/agent_runtime/orchestrator.py +++ b/backend/app/agent_runtime/orchestrator.py @@ -29,6 +29,105 @@ from app.agent_runtime.core import _LLMClient logger = logging.getLogger(__name__) +def resolve_template_variables(template: str, context: Dict[str, Any]) -> str: + """解析模板变量 {{key}} 和 {{previous.output.field}} 格式。 + + 支持的格式: + {{key}} → 从 context 顶层取值 + {{previous.output}} → 上一 Agent 的完整输出 + {{previous.output.field}} → 上一 Agent 输出的 JSON 字段 + {{agent_name.output}} → 指定 Agent 的完整输出 + {{agent_name.output.field}}→ 指定 Agent 输出的 JSON 字段 + {{agent_id.output.field}} → 通过 ID 指定 Agent 的输出字段 + """ + import re + + def _resolve(match): + expr = match.group(1).strip() + parts = expr.split(".", 1) + key = parts[0] + + if "." not in expr: + # 简单变量 {{key}} + return str(context.get(key, context.get("previous_output", ""))) + + # 多级路径: {{x.output}} 或 {{x.output.field}} + namespace = key # "previous" or agent_name/agent_id + rest = parts[1] # "output" or "output.field" + + value = context.get(f"{namespace}_output") + if value is None: + # 尝试从 agent_outputs 列表中查找 + agent_outputs = context.get("agent_outputs", []) + for ao in agent_outputs: + if ao.get("agent_name") == namespace or ao.get("agent_id") == namespace: + value = ao.get("output", "") + break + if value is None: + return match.group(0) # 未找到,保持原样 + + if rest == "output": + if isinstance(value, str): + return value + return json.dumps(value, ensure_ascii=False) + + # 字段路径: output.field.subfield + fields = rest.split(".", 1) + if fields[0] == "output": + field_path = fields[1] if len(fields) > 1 else "" + else: + field_path = rest + + if not field_path: + return str(value) + + # 尝试从 JSON 中提取字段 + try: + if isinstance(value, str): + try: + parsed = json.loads(value) + except (json.JSONDecodeError, TypeError): + return value + else: + parsed = value + + path_parts = field_path.split(".") + for p in path_parts: + if isinstance(parsed, dict) and p in parsed: + parsed = parsed[p] + elif isinstance(parsed, list): + try: + idx = int(p) + parsed = parsed[idx] + except (ValueError, IndexError): + return match.group(0) + else: + return match.group(0) + return str(parsed) if parsed else match.group(0) + except Exception: + return match.group(0) + + return re.sub(r"\{\{(.+?)\}\}", _resolve, template) + + +def validate_output_schema(output: Any, schema: Optional[Dict[str, Any]]) -> Optional[str]: + """根据 JSON Schema 校验 Agent 输出。返回 None 表示通过,否则返回错误信息。""" + if not schema: + return None + try: + import jsonschema + jsonschema.validate(output, schema) + return None + except ImportError: + # jsonschema 未安装时跳过校验 + logger.debug("jsonschema 未安装,跳过输出校验") + return None + except jsonschema.ValidationError as e: + return str(e) + except Exception as e: + return str(e) + + class OrchestratorAgentConfig(BaseModel): """编排中单个 Agent 的配置""" id: str = Field(..., description="Agent 标识") @@ -317,14 +416,60 @@ class AgentOrchestrator: # 第一个 Agent 接收原始问题,后续 Agent 接收前一个的输出 agent_input = current_input if i > 0: + # 构建模板上下文 + template_ctx: Dict[str, Any] = {"previous_output": current_input} + for j, prev_step in enumerate(steps): + template_ctx[f"{prev_step.agent_name}_output"] = prev_step.output + template_ctx[f"{prev_step.agent_id}_output"] = prev_step.output + template_ctx["agent_outputs"] = [ + {"agent_id": s.agent_id, "agent_name": s.agent_name, "output": s.output} + for s in steps + ] + template_ctx["original_question"] = question + + # 解析 system_prompt 中的模板变量 + resolved_system_prompt = resolve_template_variables( + agent_cfg.system_prompt, template_ctx + ) + agent_input = ( f"这是前一个 Agent 的处理结果,请在此基础上继续处理。\n\n" f"原始问题: {question}\n\n" f"前序输出:\n{current_input}" ) + # 重新创建 runtime(如果 system_prompt 被模板修改了) + if resolved_system_prompt != agent_cfg.system_prompt: + runtime = AgentRuntime( + AgentConfig( + name=agent_cfg.name, + system_prompt=resolved_system_prompt, + llm=AgentLLMConfig( + model=agent_cfg.model, + provider=agent_cfg.provider, + temperature=agent_cfg.temperature, + max_iterations=agent_cfg.max_iterations, + ), + tools=AgentToolConfig(include_tools=agent_cfg.tools), + ), + on_llm_call=on_llm_call, + ) + result = await runtime.run(agent_input) + # Schema 校验输出 + if getattr(agent_cfg, "output_schema", None): + try: + output_data = json.loads(result.content) + except (json.JSONDecodeError, TypeError): + output_data = result.content + validation_error = validate_output_schema(output_data, agent_cfg.output_schema) + if validation_error: + logger.warning( + "Agent %s 输出 Schema 校验失败: %s", + agent_cfg.name, validation_error, + ) + step = OrchestratorStep( agent_id=agent_cfg.id, agent_name=agent_cfg.name, diff --git a/backend/app/api/agents.py b/backend/app/api/agents.py index 2f08e56..9220fba 100644 --- a/backend/app/api/agents.py +++ b/backend/app/api/agents.py @@ -4,7 +4,7 @@ Agent管理API from fastapi import APIRouter, Depends, HTTPException, status, Query, Response from sqlalchemy.orm import Session from pydantic import BaseModel, Field -from typing import List, Optional, Dict, Any +from typing import List, Optional, Dict, Any, Union from datetime import datetime import logging from app.core.database import get_db @@ -549,6 +549,160 @@ async def export_agent( raise +@router.post("/{agent_id}/create-main-agent", response_model=AgentResponse) +async def create_main_agent( + agent_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """一键将当前 Agent 升级为 Main Agent(预置专用工具 + 系统提示词)""" + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + raise NotFoundError(f"Agent不存在: {agent_id}") + if not check_agent_permission(db, current_user, agent, "write"): + raise HTTPException(status_code=403, detail="无权修改此Agent") + + MAIN_AGENT_PROMPT = ( + "你是一个 Main Agent(数字员工项目经理),负责理解用户目标、自主分解任务、" + "调度 Specialist Agent 执行、追踪进度并在必要时重新规划。\n\n" + "## 核心能力\n" + "1. **目标分解**:将用户的高层目标拆解为可执行的子任务(Task),考虑依赖关系\n" + "2. **任务分配**:根据任务特点选择最合适的执行策略(单Agent / Pipeline / Debate / Workflow)\n" + "3. **进度监控**:定期检查各任务状态,处理失败、重试、重新分配\n" + "4. **主动汇报**:关键节点通知用户,遇到阻塞时寻求决策\n\n" + "## 可用工具\n" + "- create_task: 创建子任务\n" + "- assign_task: 分配任务给指定Agent\n" + "- check_progress: 查看目标整体进度\n" + "- notify_user: 向用户发送通知\n" + "- agent_call: 调用其他Agent执行任务\n" + "- web_search: 搜索网络信息\n" + "- knowledge_graph_search: 搜索知识库\n\n" + "## 工作原则\n" + "- 接到目标后先理解、再分解、最后执行\n" + "- 复杂目标先做规划,与用户确认后再大规模执行\n" + "- 遇到无法处理的情况主动上报,不要静默失败" + ) + + wc = agent.workflow_config or {"nodes": [], "edges": []} + wc["nodes"] = [ + { + "id": "main_node", + "type": "agent", + "data": { + "system_prompt": MAIN_AGENT_PROMPT, + "model": "deepseek-v4-flash", + "provider": "deepseek", + "temperature": 0.7, + "max_iterations": 15, + "memory_max_history": 40, + "memory_vector_top_k": 10, + "memory_persist": True, + "memory_vector_enabled": True, + "memory_learning": True, + }, + } + ] + wc["edges"] = [] + agent.workflow_config = wc + agent.agent_type = "main" + agent.description = agent.description or "Main Agent — 数字员工项目经理" + db.commit() + db.refresh(agent) + logger.info("用户 %s 将 Agent %s 升级为 Main Agent", current_user.username, agent.id) + return agent + + +class ExecuteRequest(BaseModel): + input: str = Field(..., description="用户输入/对话消息") + mode: str = Field(default="agent", description="编排模式: agent/sequential/pipeline/debate/graph") + agents: Optional[List[str]] = Field(default=None, description="编排模式下参与执行的Agent ID列表") + graph_nodes: Optional[List[Dict[str, Any]]] = Field(default=None, description="Graph模式的节点定义") + graph_edges: Optional[List[Dict[str, Any]]] = Field(default=None, description="Graph模式的边定义") + execution_id: Optional[str] = Field(default=None, description="可选的预创建执行记录ID") + + +class ExecuteResponse(BaseModel): + execution_id: Optional[str] = None + task_id: Optional[str] = None + content: str = "" + iterations_used: int = 0 + tool_calls_made: int = 0 + status: str = "completed" + + +@router.post("/{agent_id}/execute", response_model=ExecuteResponse) +async def execute_agent( + agent_id: str, + req: ExecuteRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """执行 Agent — 支持 agent/sequential/pipeline/debate/graph 五种模式""" + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if not agent: + raise NotFoundError(f"Agent不存在: {agent_id}") + if not check_agent_permission(db, current_user, agent, "execute"): + raise HTTPException(status_code=403, detail="无权执行此Agent") + + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig + from app.agent_runtime.orchestrator import AgentOrchestrator + + if req.mode in ("sequential", "pipeline", "debate", "graph"): + orchestrator = AgentOrchestrator() + result = await orchestrator.run( + user_message=req.input, + mode=req.mode, + agents=[{"agent_id": aid} for aid in (req.agents or [])], + graph_nodes=req.graph_nodes, + graph_edges=req.graph_edges, + ) + return ExecuteResponse( + content=result.get("output", ""), + iterations_used=result.get("iterations_used", 0), + tool_calls_made=result.get("tool_calls_made", 0), + status="completed", + ) + + wc = agent.workflow_config or {} + nodes = wc.get("nodes", []) + system_prompt = agent.description or "" + model = "deepseek-v4-flash" + provider = "deepseek" + temperature = 0.7 + max_iterations = 10 + + for n in nodes: + if n.get("type") not in ("agent", "llm", "template"): + continue + cfg = n.get("data", {}) if isinstance(n, dict) else getattr(n, "data", {}) + system_prompt = cfg.get("system_prompt", "") or system_prompt + model = cfg.get("model", model) + provider = cfg.get("provider", provider) + temperature = float(cfg.get("temperature", temperature)) + max_iterations = int(cfg.get("max_iterations", max_iterations)) + break + + config = AgentConfig( + name=agent.name or "agent", + system_prompt=system_prompt, + llm=AgentLLMConfig(model=model, provider=provider, temperature=temperature, max_iterations=max_iterations), + tools=AgentToolConfig(), + memory=AgentMemoryConfig(), + user_id=current_user.id, + memory_scope_id=str(agent.id), + ) + + runtime = AgentRuntime(config=config) + run_result = await runtime.run(req.input) + return ExecuteResponse( + content=run_result.content or "", + iterations_used=run_result.iterations_used, + tool_calls_made=run_result.tool_calls_made, + status="completed", + ) + + @router.post("/import", response_model=AgentResponse, status_code=status.HTTP_201_CREATED) async def import_agent( agent_data: Dict[str, Any], diff --git a/backend/app/api/goals.py b/backend/app/api/goals.py index e5b48d1..939897d 100644 --- a/backend/app/api/goals.py +++ b/backend/app/api/goals.py @@ -165,11 +165,13 @@ def start_goal( current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): - """启动目标执行""" + """启动目标执行(同步创建自主循环调度)""" goal = goal_service.update_goal(db, goal_id, status="active") goal.started_at = datetime.now() db.commit() db.refresh(goal) + from app.services.agent_schedule_service import sync_autonomy_schedule_for_goal + sync_autonomy_schedule_for_goal(db, goal_id) logger.info(f"Goal started: {goal_id}") return goal @@ -180,8 +182,11 @@ def pause_goal( current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): - """暂停目标执行""" - return goal_service.update_goal(db, goal_id, status="paused") + """暂停目标执行(移除自主循环调度)""" + from app.services.agent_schedule_service import sync_autonomy_schedule_for_goal + result = goal_service.update_goal(db, goal_id, status="paused") + sync_autonomy_schedule_for_goal(db, goal_id) + return result @router.post("/{goal_id}/resume", response_model=GoalResponse) @@ -190,8 +195,11 @@ def resume_goal( current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): - """恢复目标执行""" - return goal_service.update_goal(db, goal_id, status="active") + """恢复目标执行(重新创建自主循环调度)""" + from app.services.agent_schedule_service import sync_autonomy_schedule_for_goal + result = goal_service.update_goal(db, goal_id, status="active") + sync_autonomy_schedule_for_goal(db, goal_id) + return result @router.get("/{goal_id}/tasks", response_model=GoalTaskTreeResponse) diff --git a/backend/app/api/tasks.py b/backend/app/api/tasks.py index 6799db1..2baa2b4 100644 --- a/backend/app/api/tasks.py +++ b/backend/app/api/tasks.py @@ -238,3 +238,72 @@ def reject_task( status="failed", error_message="审批驳回", ) + + +@router.post("/{task_id}/execute", response_model=TaskResponse) +async def execute_task( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """手动执行任务 — 通过 Main Agent 服务执行单个任务""" + from app.services.main_agent_service import MainAgentService + goal_service.update_task(db=db, task_id=task_id, status="in_progress") + try: + svc = MainAgentService(db) + result = await svc.execute_task(task_id) + goal_service.update_task( + db=db, + task_id=task_id, + status="completed", + result=result, + ) + except Exception as e: + logger.error("任务 %s 执行失败: %s", task_id, e) + goal_service.update_task( + db=db, + task_id=task_id, + status="failed", + error_message=str(e), + ) + return goal_service.get_task(db, task_id) + + +@router.post("/{task_id}/retry", response_model=TaskResponse) +async def retry_task( + task_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """重试失败任务""" + task = goal_service.get_task(db, task_id) + if task.status != "failed": + from fastapi import HTTPException + raise HTTPException(400, "只能重试失败状态的任务") + goal_service.update_task( + db=db, + task_id=task_id, + status="pending", + error_message=None, + result=None, + ) + from app.services.main_agent_service import MainAgentService + goal_service.update_task(db=db, task_id=task_id, status="in_progress") + try: + svc = MainAgentService(db) + result = await svc.execute_task(task_id) + goal_service.update_task( + db=db, + task_id=task_id, + status="completed", + result=result, + ) + except Exception as e: + logger.error("任务 %s 重试失败: %s", task_id, e) + goal_service.update_task( + db=db, + task_id=task_id, + status="failed", + error_message=str(e), + ) + return goal_service.get_task(db, task_id) diff --git a/backend/app/core/tools_bootstrap.py b/backend/app/core/tools_bootstrap.py index 0ce8dea..8a03414 100644 --- a/backend/app/core/tools_bootstrap.py +++ b/backend/app/core/tools_bootstrap.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) _registered = False -_EXPECTED_BUILTIN = 51 +_EXPECTED_BUILTIN = 54 def ensure_builtin_tools_registered() -> None: @@ -69,6 +69,9 @@ def ensure_builtin_tools_registered() -> None: feishu_create_calendar_event_tool, feishu_search_contacts_tool, feishu_send_approval_tool, + feishu_read_messages_tool, + feishu_create_sheet_tool, + feishu_upload_file_tool, HTTP_REQUEST_SCHEMA, FILE_READ_SCHEMA, FILE_WRITE_SCHEMA, @@ -120,6 +123,9 @@ def ensure_builtin_tools_registered() -> None: FEISHU_CREATE_CALENDAR_EVENT_SCHEMA, FEISHU_SEARCH_CONTACTS_SCHEMA, FEISHU_SEND_APPROVAL_SCHEMA, + FEISHU_READ_MESSAGES_SCHEMA, + FEISHU_CREATE_SHEET_SCHEMA, + FEISHU_UPLOAD_FILE_SCHEMA, ) tool_registry.register_builtin_tool("http_request", http_request_tool, HTTP_REQUEST_SCHEMA) @@ -173,6 +179,9 @@ def ensure_builtin_tools_registered() -> None: tool_registry.register_builtin_tool("feishu_create_calendar_event", feishu_create_calendar_event_tool, FEISHU_CREATE_CALENDAR_EVENT_SCHEMA) tool_registry.register_builtin_tool("feishu_search_contacts", feishu_search_contacts_tool, FEISHU_SEARCH_CONTACTS_SCHEMA) tool_registry.register_builtin_tool("feishu_send_approval", feishu_send_approval_tool, FEISHU_SEND_APPROVAL_SCHEMA) + tool_registry.register_builtin_tool("feishu_read_messages", feishu_read_messages_tool, FEISHU_READ_MESSAGES_SCHEMA) + tool_registry.register_builtin_tool("feishu_create_sheet", feishu_create_sheet_tool, FEISHU_CREATE_SHEET_SCHEMA) + tool_registry.register_builtin_tool("feishu_upload_file", feishu_upload_file_tool, FEISHU_UPLOAD_FILE_SCHEMA) _registered = True n = tool_registry.builtin_tool_count() diff --git a/backend/app/services/agent_schedule_service.py b/backend/app/services/agent_schedule_service.py index f979055..174120a 100644 --- a/backend/app/services/agent_schedule_service.py +++ b/backend/app/services/agent_schedule_service.py @@ -215,7 +215,7 @@ def check_and_run_due_schedules() -> int: sched.last_run_status = "failed" db.commit() - return triggered + return triggered + check_and_run_autonomy_ticks() except Exception as e: logger.error("检查定时任务失败: %s", e) return 0 @@ -224,6 +224,135 @@ def check_and_run_due_schedules() -> int: db.close() +def sync_autonomy_schedule_for_goal(db: Session, goal_id: str) -> Optional[str]: + """根据 Goal 的 autonomy_config.check_interval_minutes 创建/更新 Celery Beat 调度。 + + 当 Goal 状态为 active 且有有效 check_interval 时,确保存在对应的定时调度; + 当 Goal 状态非 active 时,删除对应的定时调度。 + + Args: + db: 数据库会话 + goal_id: Goal ID + + Returns: + 创建的 schedule_id,不适用返回 None + """ + from app.models.goal import Goal + from app.models.agent_schedule import AgentSchedule + + goal = db.query(Goal).filter(Goal.id == goal_id).first() + if not goal: + return None + + # 删除现有的自主循环调度 + db.query(AgentSchedule).filter( + AgentSchedule.schedule_type == "goal_autonomy", + AgentSchedule.goal_id == goal_id, + ).delete() + + if goal.status != "active": + db.commit() + logger.info("Goal %s 非活跃状态,已移除自主循环调度", goal_id) + return None + + ac = goal.autonomy_config or {} + interval_minutes = int(ac.get("check_interval_minutes", 0) or 0) + if interval_minutes <= 0: + db.commit() + return None + + # 构造 cron 表达式(每 N 分钟) + if interval_minutes < 60: + cron_expr = f"*/{interval_minutes} * * * *" + else: + hours = interval_minutes // 60 + cron_expr = f"0 */{hours} * * *" + + tz = ac.get("timezone", "Asia/Shanghai") + now = datetime.now(timezone.utc).replace(tzinfo=None) + next_run = compute_next_run(cron_expr, after=now, tz=tz) + + schedule = AgentSchedule( + agent_id=goal.main_agent_id, + schedule_type="goal_autonomy", + goal_id=goal_id, + goal_config={ + "title": goal.title, + "description": goal.description or "", + "priority": goal.priority, + "main_agent_id": goal.main_agent_id, + "autonomy_config": ac, + }, + name=f"自主循环: {goal.title}", + cron_expression=cron_expr, + input_message=f"Autonomy tick for goal: {goal.title}", + timezone=tz, + enabled=True, + next_run_at=next_run, + user_id=goal.creator_id, + ) + db.add(schedule) + db.commit() + logger.info( + "Goal %s 自主循环调度已创建: schedule=%s cron=%s interval=%dmin", + goal_id, schedule.id, cron_expr, interval_minutes, + ) + return str(schedule.id) + + +def check_and_run_autonomy_ticks() -> int: + """检查所有到期的 Goal 自主循环调度,触发 autonomy_tick。 + + Returns: + 本次触发的 Goal 数量 + """ + from app.models.agent_schedule import AgentSchedule + + db: Optional[Session] = None + try: + db = SessionLocal() + now = datetime.now(timezone.utc).replace(tzinfo=None) + + due_schedules = ( + db.query(AgentSchedule) + .filter( + AgentSchedule.schedule_type == "goal_autonomy", + AgentSchedule.enabled == True, + AgentSchedule.next_run_at <= now, + ) + .all() + ) + + triggered = 0 + for sched in due_schedules: + if not sched.goal_id: + continue + try: + from app.tasks.goal_tasks import autonomy_tick_task + autonomy_tick_task.delay(str(sched.goal_id)) + sched.last_run_at = now + sched.last_run_status = "success" + sched.next_run_at = compute_next_run( + sched.cron_expression, after=now, tz=sched.timezone or "UTC" + ) + db.commit() + triggered += 1 + logger.info("自主循环触发: goal=%s schedule=%s", sched.goal_id, sched.id) + except Exception as e: + logger.error("自主循环触发失败: goal=%s error=%s", sched.goal_id, e) + sched.last_run_at = now + sched.last_run_status = "failed" + db.commit() + + return triggered + except Exception as e: + logger.error("检查自主循环调度失败: %s", e) + return 0 + finally: + if db: + db.close() + + def notify_schedule_result(db: Session, execution, status: str, error_message: Optional[str] = None) -> None: """如果 execution 关联了定时任务,创建通知并推送飞书消息。 diff --git a/backend/app/services/builtin_tools.py b/backend/app/services/builtin_tools.py index ee4c0f2..7fb133e 100644 --- a/backend/app/services/builtin_tools.py +++ b/backend/app/services/builtin_tools.py @@ -5680,6 +5680,157 @@ async def feishu_send_approval_tool( return json.dumps({"error": f"发送审批失败: {e}"}, ensure_ascii=False) +async def feishu_read_messages_tool(chat_id: str = "", chat_type: str = "p2p", page_size: int = 20) -> str: + """读取飞书消息/私信列表。需要配置飞书应用 (FEISHU_APP_ID / FEISHU_APP_SECRET)。 + + Args: + chat_id: 群聊ID或用户open_id(为空则读取当前用户的私信列表) + chat_type: p2p(私信)或 group(群聊),默认 p2p + page_size: 每页消息数,默认20 + """ + try: + token = _get_feishu_token() + if not token: + return json.dumps({"error": "飞书租户令牌获取失败"}, ensure_ascii=False) + + import httpx + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + async with httpx.AsyncClient(timeout=15) as client: + if chat_id: + # 读取指定会话的消息 + url = f"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type={'chat_id' if chat_type == 'group' else 'open_id'}" + if chat_type == "group": + params = {"receive_id": chat_id, "page_size": page_size} + else: + params = {"receive_id": chat_id, "page_size": page_size} + r = await client.get(url, headers=headers, params=params) + r.raise_for_status() + data = r.json() + items = (data.get("data", {}) or {}).get("items", []) + else: + return json.dumps({ + "error": "请提供 chat_id(用户open_id 或 群聊ID)", + }, ensure_ascii=False) + + messages = [] + for item in items: + msg_type = item.get("msg_type", "") + body = item.get("body", {}) or {} + content = body.get("content", "") + if msg_type == "text" and content: + try: + content = json.loads(content).get("text", content) + except Exception: + pass + messages.append({ + "message_id": item.get("message_id", ""), + "sender_id": item.get("sender", {}).get("id", ""), + "msg_type": msg_type, + "content": content[:500], + "create_time": item.get("create_time", ""), + }) + return json.dumps({"messages": messages, "total": len(messages)}, ensure_ascii=False) + except Exception as e: + logger.error(f"feishu_read_messages 失败: {e}", exc_info=True) + return json.dumps({"error": f"读取消息失败: {e}"}, ensure_ascii=False) + + +async def feishu_create_sheet_tool(title: str, sheet_title: str = "Sheet1", folder_token: str = "") -> str: + """创建飞书电子表格。适合数据报表、任务追踪表、统计表等场景。需要配置飞书应用。 + + Args: + title: 表格标题 + sheet_title: 子表名称,默认 "Sheet1" + folder_token: 目标文件夹 token(可选) + """ + try: + token = _get_feishu_token() + if not token: + return json.dumps({"error": "飞书租户令牌获取失败"}, ensure_ascii=False) + + import httpx + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + body = {"title": title} + if folder_token: + body["folder_token"] = folder_token + async with httpx.AsyncClient(timeout=15) as client: + r = await client.post("https://open.feishu.cn/open-apis/sheets/v3/spreadsheets", headers=headers, json=body) + r.raise_for_status() + data = r.json() + spreadsheet = (data.get("data", {}) or {}).get("spreadsheet", {}) + spreadsheet_token = spreadsheet.get("spreadsheet_token", "") + spreadsheet_url = spreadsheet.get("url", "") + + # 创建第一个子表 + if spreadsheet_token and sheet_title: + try: + sheet_body = {"properties": {"title": sheet_title}} + await client.post( + f"https://open.feishu.cn/open-apis/sheets/v3/spreadsheets/{spreadsheet_token}/sheets_batch_update", + headers=headers, + json={"requests": [{"addSheet": sheet_body}]}, + ) + except Exception: + pass + + return json.dumps({ + "spreadsheet_token": spreadsheet_token, + "url": spreadsheet_url, + "title": title, + "sheet_title": sheet_title, + }, ensure_ascii=False) + except Exception as e: + logger.error(f"feishu_create_sheet 失败: {e}", exc_info=True) + return json.dumps({"error": f"创建表格失败: {e}"}, ensure_ascii=False) + + +async def feishu_upload_file_tool(file_path: str, file_name: str = "", file_type: str = "stream") -> str: + """上传文件到飞书,返回 file_key 可用于发送文件消息。需要配置飞书应用。 + + Args: + file_path: 本地文件路径 + file_name: 文件名(可选,默认从路径提取) + file_type: 文件类型 stream/image/opus/mp4/pdf/doc/xls/ppt,默认 stream + """ + try: + import os + if not os.path.exists(file_path): + return json.dumps({"error": f"文件不存在: {file_path}"}, ensure_ascii=False) + + if not file_name: + file_name = os.path.basename(file_path) + + file_size = os.path.getsize(file_path) + + token = _get_feishu_token() + if not token: + return json.dumps({"error": "飞书租户令牌获取失败"}, ensure_ascii=False) + + import httpx + headers = {"Authorization": f"Bearer {token}"} + async with httpx.AsyncClient(timeout=30) as client: + r = await client.post( + "https://open.feishu.cn/open-apis/im/v1/files", + headers=headers, + files={ + "file": (file_name, open(file_path, "rb"), "application/octet-stream"), + }, + data={"file_type": file_type, "file_name": file_name}, + ) + r.raise_for_status() + data = r.json() + file_key = (data.get("data", {}) or {}).get("file_key", "") + return json.dumps({ + "file_key": file_key, + "file_name": file_name, + "file_size": file_size, + "file_type": file_type, + }, ensure_ascii=False) + except Exception as e: + logger.error(f"feishu_upload_file 失败: {e}", exc_info=True) + return json.dumps({"error": f"上传文件失败: {e}"}, ensure_ascii=False) + + FEISHU_CREATE_DOC_SCHEMA = { "type": "function", "function": { @@ -5750,6 +5901,57 @@ FEISHU_SEND_APPROVAL_SCHEMA = { } +FEISHU_READ_MESSAGES_SCHEMA = { + "type": "function", + "function": { + "name": "feishu_read_messages", + "description": "读取飞书消息/私信列表。可读取指定用户或群聊的消息历史。需要配置飞书应用。", + "parameters": { + "type": "object", + "properties": { + "chat_id": {"type": "string", "description": "群聊ID或用户open_id"}, + "chat_type": {"type": "string", "description": "p2p(私信)或 group(群聊),默认 p2p"}, + "page_size": {"type": "integer", "description": "每页消息数,默认20"}, + }, + "required": ["chat_id"], + }, + }, +} + +FEISHU_CREATE_SHEET_SCHEMA = { + "type": "function", + "function": { + "name": "feishu_create_sheet", + "description": "创建飞书电子表格(Sheet)。适合数据报表、任务追踪表等场景。需要配置飞书应用。", + "parameters": { + "type": "object", + "properties": { + "title": {"type": "string", "description": "表格标题"}, + "sheet_title": {"type": "string", "description": "子表名称,默认 Sheet1"}, + "folder_token": {"type": "string", "description": "目标文件夹 token(可选)"}, + }, + "required": ["title"], + }, + }, +} + +FEISHU_UPLOAD_FILE_SCHEMA = { + "type": "function", + "function": { + "name": "feishu_upload_file", + "description": "上传文件到飞书,获取 file_key 后可用于发送文件/图片消息。需要配置飞书应用。", + "parameters": { + "type": "object", + "properties": { + "file_path": {"type": "string", "description": "本地文件路径"}, + "file_name": {"type": "string", "description": "上传后的文件名(可选)"}, + "file_type": {"type": "string", "description": "文件类型: stream/image/opus/mp4/pdf/doc/xls/ppt,默认 stream"}, + }, + "required": ["file_path"], + }, + }, +} + TEXT_TO_SPEECH_SCHEMA = { "type": "function", "function": { diff --git a/backend/app/services/feishu_app_service.py b/backend/app/services/feishu_app_service.py index aa2c275..91a1a11 100644 --- a/backend/app/services/feishu_app_service.py +++ b/backend/app/services/feishu_app_service.py @@ -153,6 +153,30 @@ def send_plain_text(open_id: str, text: str) -> bool: return False +def send_plain_text_to_group(chat_id: str, text: str) -> bool: + """向群聊发送纯文本消息。""" + token = _get_tenant_access_token() + if not token: + return False + + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id", + headers={"Authorization": f"Bearer {token}"}, + json={ + "receive_id": chat_id, + "msg_type": "text", + "content": json.dumps({"text": text}, ensure_ascii=False), + }, + ) + result = resp.json() + return resp.is_success and result.get("code") == 0 + except Exception as e: + logger.warning("飞书群聊消息发送异常: %s", e) + return False + + def lookup_user_by_email(email: str) -> Optional[str]: """通过邮箱查询飞书用户的 open_id。 diff --git a/backend/app/services/feishu_ws_handler.py b/backend/app/services/feishu_ws_handler.py index bfb4411..c48ff18 100644 --- a/backend/app/services/feishu_ws_handler.py +++ b/backend/app/services/feishu_ws_handler.py @@ -95,6 +95,51 @@ def _get_chat_type(data) -> str: return "" +def _get_chat_id(data) -> Optional[str]: + """获取群聊 ID(chat_type=group 时)。""" + try: + ev = data.event + msg = getattr(ev, "message", None) + return getattr(msg, "chat_id", None) if msg else None + except Exception: + return None + + +def _get_mentions(data) -> list: + """从消息事件中提取 @提及 列表。""" + try: + ev = data.event + msg = getattr(ev, "message", None) + if not msg: + return [] + mentions = getattr(msg, "mentions", None) or [] + return [getattr(m, "id", {}).get("open_id", "") for m in mentions if hasattr(m, "id")] + except Exception: + return [] + + +def _is_bot_mentioned(data) -> bool: + """检查消息是否 @了当前机器人(通过 settings.BOT_OPEN_ID 或 mention 列表对比)。""" + try: + from app.core.config import settings + bot_open_id = getattr(settings, "BOT_OPEN_ID", None) + if not bot_open_id: + return False + mentions = _get_mentions(data) + return bot_open_id in mentions + except Exception: + return False + + +def _reply_to_group(chat_id: str, text: str): + """向群聊发送消息。""" + try: + from app.services.feishu_app_service import send_plain_text_to_group + send_plain_text_to_group(chat_id, text) + except Exception as e: + logger.warning("飞书群聊回复失败: %s", e) + + def _reply_to_feishu(open_id: str, text: str): """通过飞书 API 回复用户消息。""" try: @@ -293,7 +338,7 @@ async def _handle_message_async(data): def _handle_message_internal(data): - """同步入口 — 创建异步任务处理飞书消息。""" + """同步入口 — 创建异步任务处理飞书消息(支持私信 + 群聊@提及)。""" # 去重:WS 重连后可能重投已处理的消息 msg_id = _get_message_id(data) if msg_id: @@ -305,6 +350,7 @@ def _handle_message_internal(data): # 记录 pending open_id(用于绑定) open_id = _get_sender_open_id(data) chat_type = _get_chat_type(data) + chat_id = _get_chat_id(data) text = _get_message_text(data) if open_id: @@ -317,6 +363,19 @@ def _handle_message_internal(data): except Exception: pass + # 群聊 @提及 处理:当用户在群里 @机器人时,解析意图 → 创建 Goal + if chat_type == "group" and _is_bot_mentioned(data) and text and chat_id: + logger.info("飞书群聊@提及: chat_id=%s open_id=%s text=%s", chat_id, open_id[:20] if open_id else "", text[:80] if text else "(空)") + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(_handle_group_mention_async(data, chat_id, open_id)) + else: + loop.run_until_complete(_handle_group_mention_async(data, chat_id, open_id)) + except Exception as e: + logger.error("群聊@提及处理失败: %s", e) + return + if not open_id or chat_type != "p2p": return @@ -368,6 +427,177 @@ def _make_llm_logger(db, agent_id: Optional[str] = None, user_id: Optional[str] return _log +async def _handle_group_mention_async(data, chat_id: str, open_id: str): + """处理群聊 @提及 — 解析意图、创建 Goal 并回复群聊。""" + text = _get_message_text(data) + if not text: + return + + try: + _reply_to_group(chat_id, "🤔 收到!正在分析你的需求...") + + from sqlalchemy.orm import Session + from app.core.database import SessionLocal + from app.models.user import User + + db = SessionLocal() + try: + user = db.query(User).filter(User.feishu_open_id == open_id).first() + if not user: + _reply_to_group(chat_id, "你的飞书账号尚未绑定平台用户,请先在平台绑定飞书。") + return + + # 尝试提取目标意图 + goal_triggers = ["创建目标:", "目标:", "创建任务:", "帮我", "请帮我", "帮我做", "请"] + goal_title = text + for trigger in goal_triggers: + if text.lower().startswith(trigger.lower()): + goal_title = text[len(trigger):].strip() + break + + if goal_title: + await _handle_goal_creation(db, user.id, goal_title[:500], open_id) + _reply_to_group(chat_id, f"✅ 目标已创建并开始执行") + else: + # 通用 Agent 对话 + from app.models.agent import Agent + agent_id = user.feishu_default_agent_id + if agent_id: + agent = db.query(Agent).filter(Agent.id == agent_id).first() + if agent: + from app.agent_runtime import AgentRuntime, AgentConfig, AgentLLMConfig, AgentToolConfig, AgentMemoryConfig + config = AgentConfig( + name=agent.name or "agent", + system_prompt=agent.description or "", + llm=AgentLLMConfig(model="deepseek-v4-flash", provider="deepseek", temperature=0.7, max_iterations=10), + tools=AgentToolConfig(), + memory=AgentMemoryConfig(), + user_id=user.id, + memory_scope_id=str(agent.id), + ) + runtime = AgentRuntime(config=config) + result = await runtime.run(text) + if result.content: + _reply_to_group(chat_id, result.content.strip()[:2000]) + else: + _reply_to_group(chat_id, "抱歉,未能处理你的请求。") + finally: + db.close() + except Exception as e: + logger.error("群聊@提及处理失败: %s", e) + try: + _reply_to_group(chat_id, f"处理失败: {e!s}") + except Exception: + pass + + +def _handle_approval_callback(data): + """处理飞书审批回调 — 审批通过/驳回后恢复 Task 执行。""" + try: + event_type = getattr(data, "event_type", "") or getattr(data.event if hasattr(data, "event") else None, "type", "") + logger.info("飞书审批回调: event_type=%s", event_type) + + db = None + from app.core.database import SessionLocal + from app.models.task import Task + + try: + db = SessionLocal() + + # 查找 awaiting_approval 状态的任务 + waiting_tasks = db.query(Task).filter(Task.approval_status == "pending").all() + for task in waiting_tasks: + # 检查审批是否关联此任务 + approval_type = getattr(data.event, "approval_type", "") or "" + status = getattr(data.event, "status", "") or "" + + if status == "approved": + task.status = "in_progress" + task.approval_status = "approved" + task.error_message = None + logger.info("审批通过: task_id=%s", task.id) + + # 异步恢复执行 + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(_resume_approved_task(task.id)) + else: + loop.run_until_complete(_resume_approved_task(task.id)) + except Exception: + pass + elif status == "rejected": + task.status = "failed" + task.approval_status = "rejected" + task.error_message = "审批驳回" + logger.info("审批驳回: task_id=%s", task.id) + + db.commit() + finally: + if db: + db.close() + except Exception as e: + logger.warning("审批回调处理失败: %s", e) + + +async def _resume_approved_task(task_id: str): + """审批通过后恢复任务执行。""" + try: + from app.core.database import SessionLocal + from app.services.main_agent_service import MainAgentService + from app.services import goal_service + + db = SessionLocal() + try: + svc = MainAgentService(db) + result = await svc.execute_task(task_id) + goal_service.update_task(db=db, task_id=task_id, status="completed", result=result) + finally: + db.close() + except Exception as e: + logger.error("审批通过后执行任务失败: task_id=%s error=%s", task_id, e) + + +async def send_daily_progress_report(): + """每日自动进度汇报 — 由定时调度触发,汇总所有活跃 Goal 的进度并通过飞书通知。""" + try: + from app.core.database import SessionLocal + from app.models.goal import Goal + from app.models.user import User + + db = SessionLocal() + try: + active_goals = db.query(Goal).filter(Goal.status == "active").all() + if not active_goals: + logger.info("每日汇报: 无活跃目标") + return + + report_lines = ["## 📊 每日进度汇报\n"] + for g in active_goals: + pct = int((g.progress or 0) * 100) + report_lines.append(f"- **{g.title}** [P{g.priority}] — {pct}% 完成") + + report = "\n".join(report_lines) + + # 通知所有有活跃目标的用户 + notified = set() + for g in active_goals: + creator = db.query(User).filter(User.id == g.creator_id).first() + if not creator or not creator.feishu_open_id or creator.feishu_open_id in notified: + continue + try: + _reply_card(creator.feishu_open_id, "每日进度汇报", report, status="info") + notified.add(creator.feishu_open_id) + except Exception as e: + logger.warning("每日汇报通知用户 %s 失败: %s", creator.id, e) + + logger.info("每日汇报完成: 活跃目标=%d 通知用户=%d", len(active_goals), len(notified)) + finally: + db.close() + except Exception as e: + logger.error("每日汇报失败: %s", e) + + def _build_event_handler(): """构建事件处理器。""" from lark_oapi.event.dispatcher_handler import EventDispatcherHandler @@ -381,6 +611,13 @@ def _build_event_handler(): verification_token=settings.FEISHU_VERIFICATION_TOKEN, ) builder.register_p2_im_message_receive_v1(on_message_receive) + + # 审批事件回调 + def on_approval_event(data): + _handle_approval_callback(data) + + builder.register_p2_approval_instance_event_v1(on_approval_event) + return builder.build() diff --git a/frontend/src/views/GoalDetail.vue b/frontend/src/views/GoalDetail.vue index 53f2771..26e5f05 100644 --- a/frontend/src/views/GoalDetail.vue +++ b/frontend/src/views/GoalDetail.vue @@ -26,6 +26,7 @@ 优先级: P{{ goal?.priority }} 截止: {{ goal?.deadline ? new Date(goal.deadline).toLocaleDateString() : '无期限' }} 创建: {{ goal?.created_at ? new Date(goal.created_at).toLocaleDateString() : '-' }} + 更新: {{ lastUpdate }} @@ -35,12 +36,18 @@

{{ goal.description }}

- + @@ -48,7 +55,8 @@ -
+ +
+ + +
+
+
任务
+
+
+ {{ d.label }} +
+
+
+
+
+ {{ taskStatusLabel(task.status) }} + {{ task.title }} +
+
+
+ {{ task.title }} +
+
+
+