102 lines
2.7 KiB
Python
102 lines
2.7 KiB
Python
|
|
"""
|
|||
|
|
文档解析器 — 支持 txt / pdf / docx / md / csv。
|
|||
|
|
|
|||
|
|
解析结果统一返回纯文本字符串,由调用方做分块处理。
|
|||
|
|
"""
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import csv
|
|||
|
|
import io
|
|||
|
|
import logging
|
|||
|
|
import os
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_document(file_path: str, file_type: str) -> Optional[str]:
    """Parse a document into plain text.

    Args:
        file_path: Path of the file to parse.
        file_type: File type (txt / pdf / docx / md / csv). Matching is
            case-insensitive and tolerates surrounding whitespace and a
            leading dot, so ``"PDF"`` and ``".pdf"`` are both accepted.

    Returns:
        The extracted text with leading/trailing whitespace stripped (an
        empty string when the parser found no text), or ``None`` when the
        file does not exist, the type is unsupported, or the parser raised.
    """
    if not os.path.isfile(file_path):
        logger.warning("文件不存在: %s", file_path)
        return None

    parsers = {
        "txt": _parse_text,
        "md": _parse_text,
        "pdf": _parse_pdf,
        "docx": _parse_docx,
        "csv": _parse_csv,
    }

    # Normalise the lookup key so callers can pass a raw extension such as
    # ".PDF"; non-string values fall through to the "unsupported" branch.
    type_key = file_type
    if isinstance(file_type, str):
        type_key = file_type.strip().lower().lstrip(".")

    parser = parsers.get(type_key)
    if not parser:
        logger.warning("不支持的文件类型: %s", file_type)
        return None

    try:
        text = parser(file_path)
        if text:
            text = text.strip()
        logger.info("文档解析完成: %s (%s, %d 字符)", file_path, file_type, len(text or ""))
        return text
    except Exception as e:
        # Best-effort boundary: any parser failure is logged with its
        # traceback and reported to the caller as None.
        logger.error("文档解析失败: %s (%s)", file_path, e, exc_info=True)
        return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parse_text(file_path: str) -> str:
|
|||
|
|
"""解析纯文本 / Markdown 文件。"""
|
|||
|
|
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
|
|||
|
|
return f.read()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parse_pdf(file_path: str) -> str:
    """Extract the text of a PDF file.

    Each page is run through ``extract_text()`` in order; pages that yield
    no text are skipped and the remaining page texts are joined with a
    blank line between them.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The concatenated page texts (empty string if no page had text).
    """
    from pypdf import PdfReader

    page_texts = (page.extract_text() for page in PdfReader(file_path).pages)
    return "\n\n".join(chunk for chunk in page_texts if chunk)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parse_docx(file_path: str) -> str:
    """Extract the text of a Word (.docx) document.

    All non-blank body paragraphs are collected first, followed by the
    table rows: every row becomes one line with its non-empty cell texts
    joined by ``" | "``. The collected pieces are separated by blank lines.

    Args:
        file_path: Path of the .docx file.

    Returns:
        The concatenated paragraph and table text (empty string if the
        document has neither).
    """
    from docx import Document as DocxDocument

    document = DocxDocument(file_path)
    blocks = []

    for para in document.paragraphs:
        stripped = (para.text or "").strip()
        if stripped:
            blocks.append(stripped)

    # Table content is appended after the paragraphs, one line per row.
    for table in document.tables:
        for row in table.rows:
            cell_texts = (cell.text.strip() for cell in row.cells)
            row_line = " | ".join(value for value in cell_texts if value)
            if row_line:
                blocks.append(row_line)

    return "\n\n".join(blocks)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parse_csv(file_path: str) -> str:
|
|||
|
|
"""解析 CSV 文件为文本表格。"""
|
|||
|
|
rows = []
|
|||
|
|
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
|
|||
|
|
reader = csv.reader(f)
|
|||
|
|
for row in reader:
|
|||
|
|
rows.append(" | ".join(cell.strip() for cell in row))
|
|||
|
|
return "\n".join(rows)
|