""" 代码评审代理 — 数字分身自动审查 PR diff,生成审查报告 """ from __future__ import annotations import logging import re from typing import Any, Dict, List, Optional from app.services.fingerprint_engine import fingerprint_engine from app.services.shadow_executor import shadow_executor from app.services.behavior_collector import behavior_collector logger = logging.getLogger(__name__) REVIEW_CATEGORIES = { "security": {"label": "安全漏洞", "weight": 0.30, "patterns": [ "password", "secret", "token", "api_key", "private_key", "eval\\(", "exec\\(", "os\\.system", "subprocess\\.call\\(.*shell=True", "raw.*sql|execute\\(.*\\+|string\\s+concatenation.*sql", "innerHTML|dangerouslySetInnerHTML", ]}, "data_integrity": {"label": "数据完整性", "weight": 0.20, "patterns": [ "DELETE\\s+FROM|DROP\\s+TABLE|TRUNCATE", "db\\.execute|db\\.query.*f[\"']", "without.*migration|schema.*change", ]}, "error_handling": {"label": "异常处理", "weight": 0.15, "patterns": [ "except:\\s*$|except\\s+Exception:\\s*pass", "catch\\s*\\(\\s*\\)", "\\.get\\(.*\\)(?!.*or|.*default)", ]}, "performance": {"label": "性能问题", "weight": 0.15, "patterns": [ "for.*in.*range.*\\n.*query|N\\+1", "\\.all\\(\\).*for|SELECT \\*", "sleep\\(|time\\.sleep", ]}, "readability": {"label": "可读性", "weight": 0.10, "patterns": [ "def [a-z]{1,2}\\(", "#.*TODO|#.*FIXME|#.*HACK", ]}, "style": {"label": "代码风格", "weight": 0.10, "patterns": [ "print\\(|console\\.log", "debug=True|DEBUG\\s*=\\s*True", ]}, } class CodeReviewAgent: """代码评审代理 — 数字分身审查 PR""" def review_pr(self, user_id: str, pr_url: str, title: str = "", description: str = "", diff: str = "", files_changed: Optional[List[str]] = None) -> Dict[str, Any]: """评审 PR diff,生成审查报告。 Args: user_id: 用户ID(用于加载行为指纹偏好) pr_url: PR 链接 title: PR 标题 description: PR 描述 diff: diff 内容(可从 Gitea API 获取) files_changed: 变更文件列表 """ fp = fingerprint_engine.get_fingerprint(user_id) preference = (fp.get("preference_weights", {}).get("code_review") if fp else None) or {} review_focus = self._compute_review_focus(preference) findings = [] if diff: findings = self._analyze_diff(diff, review_focus, files_changed or []) severity_counts = {"critical": 0, "major": 0, "minor": 0, "suggestion": 0} for f in findings: severity_counts[f["severity"]] = severity_counts.get(f["severity"], 0) + 1 verdict = self._determine_verdict(findings) summary = self._generate_summary(title, description, findings, verdict) suggestion = { "category": "code_review", "pr_url": pr_url, "title": title, "verdict": verdict, "summary": summary, "findings": findings, "severity_counts": severity_counts, "review_focus": review_focus, "files_changed": files_changed or [], } # 通过影子模式记录(如果启用) shadow_executor.generate_suggestion(user_id, "code_review", { "pr_url": pr_url, "diff_length": len(diff), "files_count": len(files_changed or []), }) # 记录行为 behavior_collector.log_fire_and_forget( user_id=user_id, category="code_review", action="review_pr", context={"pr_url": pr_url, "files_count": len(files_changed or [])}, result={"verdict": verdict, "findings_count": len(findings)}, ) return suggestion def _compute_review_focus(self, preference: Dict[str, float]) -> Dict[str, float]: """根据用户偏好调整审查权重。""" focus = {} for cat, meta in REVIEW_CATEGORIES.items(): pref_key = cat user_weight = preference.get(pref_key, meta["weight"]) focus[cat] = round(user_weight, 2) sorted_focus = dict(sorted(focus.items(), key=lambda x: x[1], reverse=True)) return sorted_focus def _analyze_diff(self, diff: str, review_focus: Dict[str, float], files: List[str]) -> List[Dict[str, Any]]: """分析 diff 内容,识别问题。""" findings = [] lines = diff.split("\n") for cat, meta in REVIEW_CATEGORIES.items(): if review_focus.get(cat, 0) < 0.05: continue for pattern in meta["patterns"]: for i, line in enumerate(lines): if re.search(pattern, line, re.IGNORECASE): context_start = max(0, i - 2) context_end = min(len(lines), i + 3) severity = self._classify_severity(cat, line) findings.append({ "category": cat, "label": meta["label"], "severity": severity, "line_number": i + 1, "matched_line": line.strip()[:200], "context": "\n".join(lines[context_start:context_end]), "message": self._generate_finding_message(cat, line.strip()), }) break # 限制最多 30 条 findings.sort(key=lambda x: {"critical": 0, "major": 1, "minor": 2, "suggestion": 3}[x["severity"]]) return findings[:30] def _classify_severity(self, category: str, line: str) -> str: """根据类别和行内容判定严重程度。""" critical_patterns = [ "password", "secret", "token", "api_key", "private_key", "DELETE FROM", "DROP TABLE", "eval(", "exec(", ] major_patterns = [ "raw.*sql", "innerHTML", "dangerouslySetInnerHTML", "except:", "except Exception:", "pass", ] line_lower = line.lower() if any(p.lower() in line_lower for p in critical_patterns): return "critical" if any(p.lower() in line_lower for p in major_patterns): return "major" if category in ("performance", "data_integrity"): return "major" if category in ("error_handling",): return "minor" return "suggestion" def _generate_finding_message(self, category: str, line: str) -> str: """生成问题描述。""" messages = { "security": f"潜在安全风险: {line[:80]}", "data_integrity": f"数据操作需注意: {line[:80]}", "error_handling": f"异常处理建议改进: {line[:80]}", "performance": f"可能的性能问题: {line[:80]}", "readability": f"可读性建议: {line[:80]}", "style": f"代码风格提醒: {line[:80]}", } return messages.get(category, f"代码问题: {line[:80]}") def _determine_verdict(self, findings: List[Dict[str, Any]]) -> str: """根据发现的问题判定审查结论。""" critical = sum(1 for f in findings if f["severity"] == "critical") major = sum(1 for f in findings if f["severity"] == "major") if critical > 0: return "request_changes" if major > 3: return "request_changes" if major > 0: return "comment" if findings: return "comment" return "approve" def _generate_summary(self, title: str, description: str, findings: List[Dict[str, Any]], verdict: str) -> str: """生成审查摘要。""" verdict_text = { "approve": "建议通过 — 未发现重大问题", "comment": "建议改进后通过 — 有若干建议可参考", "request_changes": "建议修改后重新提交 — 存在需要关注的问题", } critical = sum(1 for f in findings if f["severity"] == "critical") major = sum(1 for f in findings if f["severity"] == "major") minor = sum(1 for f in findings if f["severity"] == "minor") return ( f"PR: {title or 'Untitled'}\n" f"审查结论: {verdict_text.get(verdict, verdict)}\n" f"发现问题: {critical} 严重, {major} 重要, {minor} 次要, " f"共 {len(findings)} 条" ) code_review_agent = CodeReviewAgent()