Optimize webscraper (#4392)

Co-authored-by: luowei <glpat-EjySCyNjWiLqAED-YmwM>
Co-authored-by: crazywoola <427733928@qq.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
This commit is contained in:
Charlie.Wei
2024-05-15 15:23:16 +08:00
committed by GitHub
parent c0fe414e0a
commit 97b65f9b4b
4 changed files with 61 additions and 11 deletions

View File

@@ -1,5 +1,6 @@
import hashlib
import json
import mimetypes
import os
import re
import site
@@ -7,6 +8,7 @@ import subprocess
import tempfile
import unicodedata
from contextlib import contextmanager
from urllib.parse import unquote
import requests
from bs4 import BeautifulSoup, CData, Comment, NavigableString
@@ -39,22 +41,34 @@ def get_url(url: str, user_agent: str = None) -> str:
}
if user_agent:
headers["User-Agent"] = user_agent
supported_content_types = extract_processor.SUPPORT_URL_CONTENT_TYPES + ["text/html"]
response = requests.get(url, headers=headers, allow_redirects=True, timeout=(5, 10))
main_content_type = None
supported_content_types = extract_processor.SUPPORT_URL_CONTENT_TYPES + ["text/html"]
response = requests.head(url, headers=headers, allow_redirects=True, timeout=(5, 10))
if response.status_code != 200:
return "URL returned status code {}.".format(response.status_code)
# check content-type
main_content_type = response.headers.get('Content-Type').split(';')[0].strip()
content_type = response.headers.get('Content-Type')
if content_type:
main_content_type = response.headers.get('Content-Type').split(';')[0].strip()
else:
content_disposition = response.headers.get('Content-Disposition')
filename_match = re.search(r'filename="([^"]+)"', content_disposition)
if filename_match:
filename = unquote(filename_match.group(1))
extension = re.search(r'\.(\w+)$', filename)
if extension:
main_content_type = mimetypes.guess_type(filename)[0]
if main_content_type not in supported_content_types:
return "Unsupported content-type [{}] of URL.".format(main_content_type)
if main_content_type in extract_processor.SUPPORT_URL_CONTENT_TYPES:
return ExtractProcessor.load_from_url(url, return_text=True)
response = requests.get(url, headers=headers, allow_redirects=True, timeout=(120, 300))
a = extract_using_readabilipy(response.text)
if not a['plain_text'] or not a['plain_text'].strip():