feat: using charset_normalizer instead of chardet (#29022)

This commit is contained in:
wangxiaolei
2025-12-05 11:19:19 +08:00
committed by GitHub
parent a849411978
commit 45911ab0af
6 changed files with 61 additions and 32 deletions

View File

@@ -1,7 +1,9 @@
"""Document loader helpers."""
import concurrent.futures
from typing import NamedTuple, cast
from typing import NamedTuple
import charset_normalizer
class FileEncoding(NamedTuple):
@@ -27,14 +29,14 @@ def detect_file_encodings(file_path: str, timeout: int = 5, sample_size: int = 1
sample_size: The number of bytes to read for encoding detection. Default is 1MB.
For large files, reading only a sample is sufficient and prevents timeout.
"""
import chardet
def read_and_detect(file_path: str):
with open(file_path, "rb") as f:
# Read only a sample of the file for encoding detection
# This prevents timeout on large files while still providing accurate encoding detection
rawdata = f.read(sample_size)
return cast(list[dict], chardet.detect_all(rawdata))
def read_and_detect(filename: str):
rst = charset_normalizer.from_path(filename)
best = rst.best()
if best is None:
return []
file_encoding = FileEncoding(encoding=best.encoding, confidence=best.coherence, language=best.language)
return [file_encoding]
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(read_and_detect, file_path)

View File

@@ -5,7 +5,7 @@ from dataclasses import dataclass
from typing import Any, cast
from urllib.parse import unquote
import chardet
import charset_normalizer
import cloudscraper
from readabilipy import simple_json_from_html_string
@@ -69,9 +69,12 @@ def get_url(url: str, user_agent: str | None = None) -> str:
if response.status_code != 200:
return f"URL returned status code {response.status_code}."
# Detect encoding using chardet
detected_encoding = chardet.detect(response.content)
encoding = detected_encoding["encoding"]
# Detect encoding using charset_normalizer
detected_encoding = charset_normalizer.from_bytes(response.content).best()
if detected_encoding:
encoding = detected_encoding.encoding
else:
encoding = "utf-8"
if encoding:
try:
content = response.content.decode(encoding)

View File

@@ -7,7 +7,7 @@ import tempfile
from collections.abc import Mapping, Sequence
from typing import Any
import chardet
import charset_normalizer
import docx
import pandas as pd
import pypandoc
@@ -228,9 +228,12 @@ def _extract_text_by_file_extension(*, file_content: bytes, file_extension: str)
def _extract_text_from_plain_text(file_content: bytes) -> str:
try:
# Detect encoding using chardet
result = chardet.detect(file_content)
encoding = result["encoding"]
# Detect encoding using charset_normalizer
result = charset_normalizer.from_bytes(file_content, cp_isolation=["utf_8", "latin_1", "cp1252"]).best()
if result:
encoding = result.encoding
else:
encoding = "utf-8"
# Fallback to utf-8 if detection fails
if not encoding:
@@ -247,9 +250,12 @@ def _extract_text_from_plain_text(file_content: bytes) -> str:
def _extract_text_from_json(file_content: bytes) -> str:
try:
# Detect encoding using chardet
result = chardet.detect(file_content)
encoding = result["encoding"]
# Detect encoding using charset_normalizer
result = charset_normalizer.from_bytes(file_content).best()
if result:
encoding = result.encoding
else:
encoding = "utf-8"
# Fallback to utf-8 if detection fails
if not encoding:
@@ -269,9 +275,12 @@ def _extract_text_from_json(file_content: bytes) -> str:
def _extract_text_from_yaml(file_content: bytes) -> str:
"""Extract the content from yaml file"""
try:
# Detect encoding using chardet
result = chardet.detect(file_content)
encoding = result["encoding"]
# Detect encoding using charset_normalizer
result = charset_normalizer.from_bytes(file_content).best()
if result:
encoding = result.encoding
else:
encoding = "utf-8"
# Fallback to utf-8 if detection fails
if not encoding:
@@ -424,9 +433,12 @@ def _extract_text_from_file(file: File):
def _extract_text_from_csv(file_content: bytes) -> str:
try:
# Detect encoding using chardet
result = chardet.detect(file_content)
encoding = result["encoding"]
# Detect encoding using charset_normalizer
result = charset_normalizer.from_bytes(file_content).best()
if result:
encoding = result.encoding
else:
encoding = "utf-8"
# Fallback to utf-8 if detection fails
if not encoding: