Files
aiagent/backend/app/services/document_parser.py

102 lines
2.7 KiB
Python
Raw Permalink Normal View History

"""
文档解析器 支持 txt / pdf / docx / md / csv
解析结果统一返回纯文本字符串由调用方做分块处理
"""
from __future__ import annotations
import csv
import io
import logging
import os
from typing import Optional
logger = logging.getLogger(__name__)
def parse_document(file_path: str, file_type: str) -> Optional[str]:
"""
解析文档为纯文本
Args:
file_path: 文件绝对路径
file_type: 文件类型txt / pdf / docx / md / csv
Returns:
文本内容解析失败返回 None
"""
if not os.path.isfile(file_path):
logger.warning("文件不存在: %s", file_path)
return None
parsers = {
"txt": _parse_text,
"md": _parse_text,
"pdf": _parse_pdf,
"docx": _parse_docx,
"csv": _parse_csv,
}
parser = parsers.get(file_type)
if not parser:
logger.warning("不支持的文件类型: %s", file_type)
return None
try:
text = parser(file_path)
if text:
text = text.strip()
logger.info("文档解析完成: %s (%s, %d 字符)", file_path, file_type, len(text or ""))
return text
except Exception as e:
logger.error("文档解析失败: %s (%s)", file_path, e, exc_info=True)
return None
def _parse_text(file_path: str) -> str:
"""解析纯文本 / Markdown 文件。"""
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
return f.read()
def _parse_pdf(file_path: str) -> str:
"""解析 PDF 文件。"""
from pypdf import PdfReader
reader = PdfReader(file_path)
pages = []
for page in reader.pages:
text = page.extract_text()
if text:
pages.append(text)
return "\n\n".join(pages)
def _parse_docx(file_path: str) -> str:
"""解析 Word 文档。"""
from docx import Document as DocxDocument
doc = DocxDocument(file_path)
paragraphs = []
for para in doc.paragraphs:
if para.text and para.text.strip():
paragraphs.append(para.text.strip())
# 也提取表格内容
for table in doc.tables:
for row in table.rows:
cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
if cells:
paragraphs.append(" | ".join(cells))
return "\n\n".join(paragraphs)
def _parse_csv(file_path: str) -> str:
"""解析 CSV 文件为文本表格。"""
rows = []
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
reader = csv.reader(f)
for row in reader:
rows.append(" | ".join(cell.strip() for cell in row))
return "\n".join(rows)