""" 文本分块器 — 将长文本切分为适合 Embedding + 检索的块。 策略: 1. 按段落分割(连续换行) 2. 超长段落按句子切分(句号、问号、感叹号、换行) 3. 短段落合并,直到接近 chunk_size 4. chunk 之间保留 overlap 字符的重叠 """ from __future__ import annotations import re from typing import List def chunk_text( text: str, chunk_size: int = 500, chunk_overlap: int = 50, ) -> List[str]: """ 将文本分割为语义完整的块。 Args: text: 输入文本 chunk_size: 每块目标字符数 chunk_overlap: 相邻块重叠字符数 Returns: 文本块列表 """ if not text or not text.strip(): return [] # 1. 按段落分割 paragraphs = _split_paragraphs(text) if not paragraphs: return [] # 2. 处理每个段落:超长段落进一步拆分 segments: List[str] = [] for para in paragraphs: if len(para) <= chunk_size: segments.append(para) else: segments.extend(_split_long_paragraph(para, chunk_size)) # 3. 合并短段落为块 chunks = _merge_segments(segments, chunk_size, chunk_overlap) return chunks def _split_paragraphs(text: str) -> List[str]: """按连续换行分割段落,过滤空白段落。""" # 按两个以上换行分割 raw = re.split(r"\n\s*\n", text) paras = [] for p in raw: p = p.strip() if p: paras.append(p) return paras def _split_long_paragraph(para: str, chunk_size: int) -> List[str]: """将超长段落按句子切分。""" # 先按句子结束符分割(去掉 look-behind 长度限制) sentences = re.split(r"(?<=[。!?])|(?<=[.!?])\s*", para) sentences = [s.strip() for s in sentences if s.strip()] if not sentences: # 无法按句子分割,按字符硬切 return [para[i : i + chunk_size] for i in range(0, len(para), chunk_size)] chunks: List[str] = [] current = "" for sent in sentences: if len(current) + len(sent) <= chunk_size: current += sent else: if current: chunks.append(current.strip()) current = sent if current: chunks.append(current.strip()) # 如果某个块还是太长(单句超长),按字符硬切 result: List[str] = [] for c in chunks: if len(c) <= chunk_size: result.append(c) else: for i in range(0, len(c), chunk_size): result.append(c[i : i + chunk_size]) return result def _merge_segments(segments: List[str], chunk_size: int, overlap: int) -> List[str]: """合并短段落为块,块间带重叠。""" if not segments: return [] chunks: List[str] = [] current = "" for seg in segments: # 如果当前块 + 新段不超过上限,追加 if not current: current = seg elif len(current) + 1 + len(seg) <= chunk_size: current += "\n\n" + seg else: # 当前块已满 chunks.append(current.strip()) # 重叠:取前一块末尾的 overlap 字符 if overlap > 0 and len(current) > overlap: # 从上一个块末尾取 overlap 字符作为新块的起始 tail = current[-overlap:] # 从上一个换行处开始,避免截断句子 newline_pos = tail.find("\n") if newline_pos > 0 and newline_pos < overlap // 2: tail = tail[newline_pos + 1 :] current = tail + "\n\n" + seg else: current = seg if current.strip(): chunks.append(current.strip()) return chunks