""" Agent 技能市场 API — 公共市场共享复用 支持: - Agent 一键发布到公共市场 - 市场浏览:按分类、热度、评分排序 - 一键安装到自己的工作区 - 版本管理:发布者更新后,使用者可选择升级 - 评分与评论 """ from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from sqlalchemy import func, or_ from pydantic import BaseModel from typing import List, Optional, Dict, Any from datetime import datetime import logging import copy from app.core.database import get_db from app.models.agent import Agent, AgentRating, AgentFavorite from app.api.auth import get_current_user from app.models.user import User from app.core.exceptions import NotFoundError, ValidationError logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/agent-market", tags=["agent-market"]) # ─── Pydantic Schemas ────────────────────────────────────────────── class AgentMarketItem(BaseModel): """市场列表项""" id: str name: str description: Optional[str] = None category: Optional[str] = None tags: Optional[List[str]] = None thumbnail: Optional[str] = None is_featured: bool = False rating_avg: float = 0.0 rating_count: int = 0 use_count: int = 0 view_count: int = 0 version: int = 1 user_id: str creator_username: Optional[str] = None is_favorited: Optional[bool] = None user_rating: Optional[int] = None created_at: Optional[datetime] = None updated_at: Optional[datetime] = None class Config: from_attributes = True class AgentMarketDetail(BaseModel): """市场详情(含工作流配置)""" id: str name: str description: Optional[str] = None category: Optional[str] = None tags: Optional[List[str]] = None thumbnail: Optional[str] = None is_featured: bool = False rating_avg: float = 0.0 rating_count: int = 0 use_count: int = 0 view_count: int = 0 version: int = 1 user_id: str creator_username: Optional[str] = None is_favorited: Optional[bool] = None user_rating: Optional[int] = None workflow_config: Optional[Dict[str, Any]] = None budget_config: Optional[Dict[str, Any]] = None created_at: Optional[datetime] = None updated_at: Optional[datetime] = None class 
Config: from_attributes = True class PublishRequest(BaseModel): """一键发布到市场""" category: Optional[str] = None tags: Optional[List[str]] = None thumbnail: Optional[str] = None class RatingCreate(BaseModel): """评分/评论创建""" rating: int # 1-5 comment: Optional[str] = None class RatingResponse(BaseModel): """评分响应""" id: str agent_id: str user_id: str username: Optional[str] = None rating: int comment: Optional[str] = None created_at: datetime class Config: from_attributes = True class InstallResponse(BaseModel): """安装结果""" message: str agent_id: str agent_name: str forked_from_id: str upstream_version: int # ─── 辅助函数 ────────────────────────────────────────────────────── def _build_agent_item(agent: Agent, current_user: Optional[User], db: Session) -> dict: """构建市场列表项(含用户特定状态)。""" item = { "id": agent.id, "name": agent.name, "description": agent.description, "category": agent.category, "tags": agent.tags or [], "thumbnail": agent.thumbnail, "is_featured": bool(agent.is_featured), "rating_avg": float(agent.rating_avg or 0), "rating_count": agent.rating_count or 0, "use_count": agent.use_count or 0, "view_count": agent.view_count or 0, "version": agent.version or 1, "user_id": agent.user_id, "creator_username": agent.user.username if agent.user else None, "is_favorited": None, "user_rating": None, "created_at": agent.created_at, "updated_at": agent.updated_at, } if current_user: fav = db.query(AgentFavorite).filter( AgentFavorite.agent_id == agent.id, AgentFavorite.user_id == current_user.id, ).first() item["is_favorited"] = fav is not None r = db.query(AgentRating).filter( AgentRating.agent_id == agent.id, AgentRating.user_id == current_user.id, ).first() item["user_rating"] = r.rating if r else None return item # ─── 市场浏览 API ────────────────────────────────────────────────── @router.get("", response_model=List[AgentMarketItem]) async def browse_market( skip: int = Query(0, ge=0), limit: int = Query(20, ge=1, le=100), search: Optional[str] = None, category: Optional[str] 
= None, tags: Optional[str] = None, sort_by: str = Query("created_at", pattern="^(created_at|rating_avg|use_count|view_count)$"), sort_order: str = Query("desc", pattern="^(asc|desc)$"), featured_only: bool = Query(False), db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user), ): """浏览 Agent 市场(公开已发布的 Agent)。""" query = db.query(Agent).filter( Agent.is_public == 1, Agent.status.in_(["published", "running"]), ) if search: query = query.filter( or_( Agent.name.like(f"%{search}%"), Agent.description.like(f"%{search}%"), ) ) if category: query = query.filter(Agent.category == category) if tags: tag_list = [t.strip() for t in tags.split(",")] for tag in tag_list: query = query.filter(Agent.tags.contains([tag])) if featured_only: query = query.filter(Agent.is_featured == 1) # 排序 sort_col = { "rating_avg": Agent.rating_avg, "use_count": Agent.use_count, "view_count": Agent.view_count, "created_at": Agent.created_at, }.get(sort_by, Agent.created_at) order_fn = sort_col.desc() if sort_order == "desc" else sort_col.asc() query = query.order_by(order_fn) agents = query.offset(skip).limit(limit).all() return [AgentMarketItem(**_build_agent_item(a, current_user, db)) for a in agents] @router.get("/{agent_id}", response_model=AgentMarketDetail) async def get_market_agent( agent_id: str, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user), ): """查看市场 Agent 详情(含工作流配置)。""" agent = db.query(Agent).filter(Agent.id == agent_id).first() if not agent: raise NotFoundError("Agent", agent_id) if not agent.is_public and (not current_user or agent.user_id != current_user.id): raise HTTPException(status_code=403, detail="无权访问此 Agent") agent.view_count = (agent.view_count or 0) + 1 db.commit() base = _build_agent_item(agent, current_user, db) base["workflow_config"] = agent.workflow_config base["budget_config"] = agent.budget_config return AgentMarketDetail(**base) # ─── 发布 / 下架 
# ─── Publish / unpublish ───────────────────────────────────────────


@router.post("/publish/{agent_id}")
async def publish_to_market(
    agent_id: str,
    data: Optional[PublishRequest] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Publish an Agent to the public market.

    Only the owner or a user with role "admin" may publish. The Agent is
    marked public, its status is set to "published" unless it is already
    "published" or "running", and any category/tags/thumbnail supplied in
    the request body overwrite the stored values.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise NotFoundError("Agent", agent_id)

    if agent.user_id != current_user.id and current_user.role != "admin":
        raise HTTPException(status_code=403, detail="无权发布此 Agent")

    agent.is_public = 1
    if agent.status not in ("published", "running"):
        agent.status = "published"

    if data:
        # Only fields explicitly provided (not None) are overwritten.
        if data.category is not None:
            agent.category = data.category
        if data.tags is not None:
            agent.tags = data.tags
        if data.thumbnail is not None:
            agent.thumbnail = data.thumbnail

    db.commit()
    db.refresh(agent)
    logger.info("Agent 发布到市场: %s (%s)", agent.name, agent.id)
    return {"message": "已发布到市场", "agent_id": str(agent.id)}


@router.post("/unpublish/{agent_id}")
async def unpublish_from_market(
    agent_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove an Agent from the public market.

    Only the owner or a user with role "admin" may unpublish. The Agent's
    ``is_public`` flag is cleared; its status is left untouched.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise NotFoundError("Agent", agent_id)

    if agent.user_id != current_user.id and current_user.role != "admin":
        raise HTTPException(status_code=403, detail="无权下架此 Agent")

    agent.is_public = 0
    db.commit()
    logger.info("Agent 已下架: %s (%s)", agent.name, agent.id)
    return {"message": "已下架", "agent_id": str(agent.id)}


# ─── Install / fork ────────────────────────────────────────────────


@router.post("/{agent_id}/install", response_model=InstallResponse, status_code=201)
async def install_agent(
    agent_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Install (fork) an Agent into the caller's own workspace.

    Creates a draft copy owned by the caller with deep-copied workflow,
    budget and tag data, records the source in ``forked_from_id`` and
    increments the source Agent's use count.
    """
    original = db.query(Agent).filter(Agent.id == agent_id).first()
    if not original:
        raise NotFoundError("Agent", agent_id)

    if not original.is_public and (not current_user or original.user_id != current_user.id):
        raise HTTPException(status_code=403, detail="无权安装此 Agent")

    # Pick a copy name that is not yet used by any of the caller's Agents.
    base = original.name
    name = f"{base} (市场安装)"
    counter = 1
    while db.query(Agent).filter(
        Agent.name == name,
        Agent.user_id == current_user.id,
    ).first():
        counter += 1
        name = f"{base} (市场安装 {counter})"

    new_agent = Agent(
        name=name,
        description=original.description,
        # Deep copies keep later edits of the fork from mutating the
        # objects loaded for the source row.
        workflow_config=copy.deepcopy(original.workflow_config),
        budget_config=copy.deepcopy(original.budget_config) if original.budget_config else None,
        user_id=current_user.id,
        status="draft",
        version=1,
        forked_from_id=agent_id,
        category=original.category,
        tags=copy.deepcopy(original.tags) if original.tags else None,
    )
    db.add(new_agent)

    original.use_count = (original.use_count or 0) + 1

    db.commit()
    db.refresh(new_agent)

    logger.info("用户 %s 从市场安装了 Agent: %s -> %s", current_user.id, original.name, new_agent.id)
    return InstallResponse(
        message="安装成功",
        agent_id=str(new_agent.id),
        agent_name=new_agent.name,
        forked_from_id=agent_id,
        upstream_version=original.version or 1,
    )


@router.post("/{agent_id}/check-upgrade")
async def check_upgrade(
    agent_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Check whether a forked Agent has an upstream update available.

    Returns a dict whose ``has_upgrade`` value is always a bool. When the
    Agent is not a fork, or its upstream no longer exists, ``has_upgrade``
    is False and a ``message`` explains why.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise NotFoundError("Agent", agent_id)

    if agent.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="无权操作此 Agent")

    if not agent.forked_from_id:
        return {"has_upgrade": False, "message": "此 Agent 不是从市场安装的"}

    upstream = db.query(Agent).filter(Agent.id == agent.forked_from_id).first()
    if not upstream:
        return {"has_upgrade": False, "message": "上游 Agent 已被删除"}

    newer_version = (upstream.version or 1) > (agent.version or 1)
    # The comparison is against the fork's created_at, so that operand is
    # guarded too (a missing value would raise TypeError), and the chain is
    # coerced to bool so the response never carries None or a datetime.
    # NOTE(review): the guard on agent.updated_at is kept from the original
    # even though created_at is what gets compared — confirm which timestamp
    # is the intended baseline.
    newer_timestamp = bool(
        upstream.updated_at
        and agent.updated_at
        and agent.created_at
        and upstream.updated_at > agent.created_at
    )

    return {
        "has_upgrade": newer_version or newer_timestamp,
        "upstream_version": upstream.version or 1,
        "current_version": agent.version or 1,
        "upstream_name": upstream.name,
    }
@router.post("/{agent_id}/upgrade", response_model=InstallResponse)
async def upgrade_agent(
    agent_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Upgrade a forked Agent to the latest upstream version.

    Overwrites the fork's workflow configuration with a deep copy of the
    upstream one, copies the upstream budget configuration when it is set,
    and aligns the fork's version number with upstream. Responds with 403
    for non-owners and 400 when the Agent is not a fork or the upstream
    Agent no longer exists.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise NotFoundError("Agent", agent_id)

    if agent.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="无权操作此 Agent")

    if not agent.forked_from_id:
        raise HTTPException(status_code=400, detail="此 Agent 不是从市场安装的,无法升级")

    upstream = db.query(Agent).filter(Agent.id == agent.forked_from_id).first()
    if not upstream:
        raise HTTPException(status_code=400, detail="上游 Agent 已被删除")

    # Update the workflow configuration and the budget configuration.
    agent.workflow_config = copy.deepcopy(upstream.workflow_config)
    if upstream.budget_config:
        # An empty/missing upstream budget leaves the fork's budget as is.
        agent.budget_config = copy.deepcopy(upstream.budget_config)
    agent.version = upstream.version or 1

    db.commit()
    db.refresh(agent)

    logger.info("Agent %s 已升级到上游版本 %s", agent.id, upstream.version)
    return InstallResponse(
        message="升级成功",
        agent_id=str(agent.id),
        agent_name=agent.name,
        forked_from_id=agent.forked_from_id,
        upstream_version=upstream.version or 1,
    )


# ─── Ratings & comments ────────────────────────────────────────────


@router.post("/{agent_id}/rate", status_code=201)
async def rate_agent(
    agent_id: str,
    data: RatingCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Rate and optionally comment on an Agent.

    A user has at most one rating per Agent: a repeated call updates the
    existing rating instead of adding another. The Agent's stored average
    (rounded to one decimal) and rating count are refreshed on every call.

    Raises ValidationError when the rating is outside 1-5 and NotFoundError
    when the Agent does not exist.
    """
    if data.rating < 1 or data.rating > 5:
        raise ValidationError("评分必须在 1-5 之间")

    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise NotFoundError("Agent", agent_id)

    existing = db.query(AgentRating).filter(
        AgentRating.agent_id == agent_id,
        AgentRating.user_id == current_user.id,
    ).first()

    if existing:
        existing.rating = data.rating
        existing.comment = data.comment
    else:
        db.add(
            AgentRating(
                agent_id=agent_id,
                user_id=current_user.id,
                rating=data.rating,
                comment=data.comment,
            )
        )
        agent.rating_count = (agent.rating_count or 0) + 1

    # Send the pending insert/update to the database before aggregating;
    # without this, a session created with autoflush disabled would compute
    # the average without the rating that was just submitted.
    db.flush()

    # Recompute the average rating.
    avg = db.query(func.avg(AgentRating.rating)).filter(
        AgentRating.agent_id == agent_id,
    ).scalar()
    agent.rating_avg = str(round(float(avg), 1)) if avg is not None else "0.0"

    db.commit()
    return {
        "message": "评分成功",
        "rating": data.rating,
        "rating_avg": float(agent.rating_avg),
        "rating_count": agent.rating_count,
    }


@router.get("/{agent_id}/ratings", response_model=List[RatingResponse])
async def get_agent_ratings(
    agent_id: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """List the ratings/comments of an Agent, newest first, paginated."""
    ratings = (
        db.query(AgentRating)
        .filter(AgentRating.agent_id == agent_id)
        .order_by(AgentRating.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    return [
        {
            "id": r.id,
            "agent_id": r.agent_id,
            "user_id": r.user_id,
            "username": r.user.username if r.user else None,
            "rating": r.rating,
            "comment": r.comment,
            "created_at": r.created_at,
        }
        for r in ratings
    ]


# ─── Favorites ─────────────────────────────────────────────────────


@router.post("/{agent_id}/favorite", status_code=201)
async def favorite_agent(
    agent_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Add an Agent to the caller's favorites.

    Raises NotFoundError when the Agent does not exist and responds with
    400 when the caller has already favorited it.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise NotFoundError("Agent", agent_id)

    existing = db.query(AgentFavorite).filter(
        AgentFavorite.agent_id == agent_id,
        AgentFavorite.user_id == current_user.id,
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="已收藏此 Agent")

    db.add(AgentFavorite(agent_id=agent_id, user_id=current_user.id))
    db.commit()
    return {"message": "收藏成功"}


@router.delete("/{agent_id}/favorite")
async def unfavorite_agent(
    agent_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove an Agent from the caller's favorites (404 if it was not favorited)."""
    fav = db.query(AgentFavorite).filter(
        AgentFavorite.agent_id == agent_id,
        AgentFavorite.user_id == current_user.id,
    ).first()
    if not fav:
        raise HTTPException(status_code=404, detail="未收藏此 Agent")

    db.delete(fav)
    db.commit()
    return {"message": "已取消收藏"}


@router.get("/my/favorites", response_model=List[AgentMarketItem])
async def my_favorites(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the caller's favorited Agents, most recently favorited first.

    Favorites whose Agent row no longer exists are skipped, so a page may
    contain fewer than ``limit`` items.
    """
    favs = (
        db.query(AgentFavorite)
        .filter(AgentFavorite.user_id == current_user.id)
        .order_by(AgentFavorite.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    result = []
    for fav in favs:
        agent = db.query(Agent).filter(Agent.id == fav.agent_id).first()
        if not agent:
            continue
        item = _build_agent_item(agent, current_user, db)
        item["is_favorited"] = True
        result.append(AgentMarketItem(**item))
    return result


@router.get("/my/shared", response_model=List[AgentMarketItem])
async def my_shared(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the Agents the caller has shared to the market, newest first."""
    agents = (
        db.query(Agent)
        .filter(
            Agent.user_id == current_user.id,
            Agent.is_public == 1,
        )
        .order_by(Agent.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    return [AgentMarketItem(**_build_agent_item(a, current_user, db)) for a in agents]


# ─── Category list ─────────────────────────────────────────────────


@router.get("/categories/list")
async def list_categories(db: Session = Depends(get_db)):
    """Return every category used by public Agents with its Agent count."""
    rows = (
        db.query(Agent.category, func.count(Agent.id))
        .filter(Agent.is_public == 1, Agent.category.isnot(None))
        .group_by(Agent.category)
        .all()
    )
    # Empty-string categories pass the IS NOT NULL filter; drop them here.
    return [{"category": cat, "count": cnt} for cat, cnt in rows if cat]