diff --git a/tests/metadata_tests.py b/tests/metadata_tests.py index f7de33c4..632d00fc 100644 --- a/tests/metadata_tests.py +++ b/tests/metadata_tests.py @@ -271,7 +271,8 @@ def test_meta(): assert metadata.title == 'Title' # catch errors - assert extract_metadata('') is None + metadata = extract_metadata('') + assert all(getattr(metadata, a) is None for a in metadata.__slots__) metadata = extract_metadata('') assert metadata.sitename is None metadata = extract_metadata('' + 'AAA'*10000 + '') diff --git a/tests/sitemaps_tests.py b/tests/sitemaps_tests.py index f3788585..f6cf1e57 100644 --- a/tests/sitemaps_tests.py +++ b/tests/sitemaps_tests.py @@ -82,6 +82,7 @@ def test_extraction(): #sitemap.handle_link(url) # (url, '0') # safety belts + assert sitemaps.is_plausible_sitemap('http://example.org/sitemap.xml.gz', None) is False assert sitemaps.is_plausible_sitemap('http://example.org/sitemap.xml.gz', b'\x1f\x8bABC') is False assert sitemaps.is_plausible_sitemap('http://example.org/sitemap.xml', 'ABC') is False assert sitemaps.is_plausible_sitemap('http://test.org/sitemap.xml', '') is False diff --git a/tests/unit_tests.py b/tests/unit_tests.py index ca3f7130..9e034cf4 100644 --- a/tests/unit_tests.py +++ b/tests/unit_tests.py @@ -741,6 +741,8 @@ def test_tei(): def test_htmlprocessing(): '''test html-related functions''' + assert xml.xmltotxt(None, include_formatting=False) == "" + options = DEFAULT_OPTIONS options.tables = True assert trafilatura.htmlprocessing.tree_cleaning(etree.Element('html'), options) is not None @@ -819,6 +821,7 @@ def test_extraction_options(): assert extract(my_html, only_with_metadata=False, output_format='xml', config=ZERO_CONFIG) is not None assert extract(my_html, only_with_metadata=True, output_format='xml', config=ZERO_CONFIG) is None assert extract(my_html, target_language='de', config=ZERO_CONFIG) is None + assert extract(my_html, target_language='de', no_fallback=True, config=ZERO_CONFIG) is None assert etree.tostring(try_justext(html.fromstring(my_html), None, 'de')) == b'' # assert extract(my_html) is None @@ -1383,6 +1386,8 @@ def test_is_probably_readerable(): """ Test is_probably_readerable function. """ + assert not is_probably_readerable("ABC") + very_small_str = "hello there" small_str = "hello there " * 11 large_str = "hello there " * 12 diff --git a/trafilatura/baseline.py b/trafilatura/baseline.py index 998c5097..cb2b7235 100644 --- a/trafilatura/baseline.py +++ b/trafilatura/baseline.py @@ -8,13 +8,14 @@ from typing import Any, Tuple from lxml.etree import _Element, Element, SubElement +from lxml.html import HtmlElement from .settings import BASIC_CLEAN_XPATH from .utils import load_html, trim from .xml import delete_element -def basic_cleaning(tree: _Element) -> _Element: +def basic_cleaning(tree: HtmlElement) -> HtmlElement: "Remove a few section types from the document." 
for elem in BASIC_CLEAN_XPATH(tree): delete_element(elem) @@ -62,7 +63,7 @@ def baseline(filecontent: Any) -> Tuple[_Element, str, int]: # scrape from article tag temp_text = "" for article_elem in tree.iterfind('.//article'): - text = trim(article_elem.text_content()) + text = trim(article_elem.text_content()) or "" if len(text) > 100: SubElement(postbody, 'p').text = text temp_text += " " + text if temp_text else text @@ -75,7 +76,7 @@ def baseline(filecontent: Any) -> Tuple[_Element, str, int]: temp_text = "" # postbody = Element('body') for element in tree.iter('blockquote', 'code', 'p', 'pre', 'q', 'quote'): - entry = trim(element.text_content()) + entry = trim(element.text_content()) or "" if entry not in results: SubElement(postbody, 'p').text = entry temp_text += " " + entry if temp_text else entry @@ -88,10 +89,11 @@ def baseline(filecontent: Any) -> Tuple[_Element, str, int]: postbody = Element('body') body_elem = tree.find('.//body') if body_elem is not None: - elem = SubElement(postbody, 'p') + p_elem = SubElement(postbody, 'p') # todo: sanitize? - elem.text = '\n'.join([trim(e) for e in body_elem.itertext()]) - return postbody, elem.text, len(elem.text) + text_elems = [trim(e) for e in body_elem.itertext()] + p_elem.text = '\n'.join([e for e in text_elems if e]) + return postbody, p_elem.text, len(p_elem.text) # new fallback text = html2txt(tree, clean=False) diff --git a/trafilatura/cli.py b/trafilatura/cli.py index eecdbbca..4fed7c4f 100644 --- a/trafilatura/cli.py +++ b/trafilatura/cli.py @@ -16,10 +16,11 @@ url_processing_pipeline, write_result) from .settings import PARALLEL_CORES, SUPPORTED_FMT_CLI + # fix output encoding on some systems -if sys.stdout.encoding != 'UTF-8': +if sys.stdout.encoding != 'UTF-8' and hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8') -if sys.stderr.encoding != 'UTF-8': +if sys.stderr.encoding != 'UTF-8' and hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(encoding='utf-8') diff --git a/trafilatura/cli_utils.py b/trafilatura/cli_utils.py index 340fe108..2b2dc2f8 100644 --- a/trafilatura/cli_utils.py +++ b/trafilatura/cli_utils.py @@ -29,7 +29,7 @@ from .baseline import html2txt from .core import extract from .deduplication import generate_bow_hash -from .downloads import add_to_compressed_dict, buffered_downloads, load_download_buffer +from .downloads import Response, add_to_compressed_dict, buffered_downloads, load_download_buffer from .feeds import find_feed_urls from .meta import reset_caches from .settings import ( @@ -272,7 +272,7 @@ def download_queue_processing( bufferlist, args.parallel, options=options ): # handle result - if result: + if result and isinstance(result, str): options.url = url counter = process_result(result, args, counter, options) else: @@ -380,7 +380,7 @@ def cli_crawler( for url, result in buffered_downloads( bufferlist, args.parallel, decode=False, options=options ): - if result is not None: + if result and isinstance(result, Response): spider.process_response(result, param_dict[get_base_url(url)]) # early exit if maximum count is reached if any(c >= n for c in spider.URL_STORE.get_all_counts()): diff --git a/trafilatura/core.py b/trafilatura/core.py index 075501d3..77328af9 100644 --- a/trafilatura/core.py +++ b/trafilatura/core.py @@ -6,18 +6,31 @@ import logging from copy import copy, deepcopy +from typing import Any, Dict, Optional, Set, Tuple, Union -from lxml.etree import XPath, strip_tags +from lxml.etree import _Element, XPath, strip_tags +from lxml.html import HtmlElement 
# own from .baseline import baseline from .deduplication import content_fingerprint, duplicate_test from .external import compare_extraction -from .htmlprocessing import build_html_output, convert_tags, prune_unwanted_nodes, tree_cleaning +from .htmlprocessing import ( + build_html_output, + convert_tags, + prune_unwanted_nodes, + tree_cleaning, +) from .main_extractor import extract_comments, extract_content from .metadata import Document, extract_metadata from .settings import DEFAULT_CONFIG, Extractor, use_config -from .utils import LANGID_FLAG, check_html_lang, language_filter, load_html, normalize_unicode +from .utils import ( + LANGID_FLAG, + check_html_lang, + language_filter, + load_html, + normalize_unicode, +) from .xml import build_json_output, control_xml_output, xmltotxt, xmltocsv from .xpaths import REMOVE_COMMENTS_XPATH @@ -27,36 +40,51 @@ TXT_FORMATS = {"markdown", "txt"} -def determine_returnstring(document, options): - '''Convert XML tree to chosen format, clean the result and output it as a string''' +def determine_returnstring(document: Document, options: Extractor) -> str: + """Convert XML tree to chosen format, clean the result and output it as a string""" # XML (TEI) steps - if 'xml' in options.format: + if "xml" in options.format: # last cleaning - for element in document.body.iter('*'): - if element.tag != 'graphic' and len(element) == 0 and not element.text and not element.tail: + for element in document.body.iter("*"): + if ( + element.tag != "graphic" + and len(element) == 0 + and not element.text + and not element.tail + ): parent = element.getparent() # do not remove elements inside to preserve formatting - if parent is not None and parent.tag != 'code': + if parent is not None and parent.tag != "code": parent.remove(element) # build output tree returnstring = control_xml_output(document, options) # CSV - elif options.format == 'csv': + elif options.format == "csv": returnstring = xmltocsv(document, options.formatting) # JSON - elif options.format == 'json': + elif options.format == "json": returnstring = build_json_output(document, options.with_metadata) # HTML - elif options.format == 'html': + elif options.format == "html": returnstring = build_html_output(document, options.with_metadata) # Markdown and TXT else: if options.with_metadata: header = "---\n" for attr in ( - 'title', 'author', 'url', 'hostname', 'description', 'sitename', - 'date', 'categories', 'tags', 'fingerprint', 'id', 'license' - ): + "title", + "author", + "url", + "hostname", + "description", + "sitename", + "date", + "categories", + "tags", + "fingerprint", + "id", + "license", + ): if getattr(document, attr): header += f"{attr}: {str(getattr(document, attr))}\n" header += "---\n" @@ -69,33 +97,60 @@ def determine_returnstring(document, options): return normalize_unicode(returnstring) -def trafilatura_sequence(cleaned_tree, cleaned_tree_backup, tree_backup, options): +def trafilatura_sequence( + cleaned_tree: HtmlElement, + cleaned_tree_backup: HtmlElement, + tree_backup: HtmlElement, + options: Extractor, +) -> Tuple[_Element, str, int]: "Execute the standard cascade of extractors used by Trafilatura." 
# Trafilatura's main extractor postbody, temp_text, len_text = extract_content(cleaned_tree, options) # comparison with external extractors if not options.fast: - postbody, temp_text, len_text = compare_extraction(cleaned_tree_backup, deepcopy(tree_backup), postbody, temp_text, len_text, options) + postbody, temp_text, len_text = compare_extraction( + cleaned_tree_backup, + deepcopy(tree_backup), + postbody, + temp_text, + len_text, + options, + ) # rescue: baseline extraction on original/dirty tree if len_text < options.min_extracted_size and not options.focus == "precision": postbody, temp_text, len_text = baseline(deepcopy(tree_backup)) - LOGGER.debug('non-clean extracted length: %s (extraction)', len_text) + LOGGER.debug("non-clean extracted length: %s (extraction)", len_text) return postbody, temp_text, len_text -def bare_extraction(filecontent, url=None, no_fallback=False, # fast=False, - favor_precision=False, favor_recall=False, - include_comments=True, output_format="python", target_language=None, - include_tables=True, include_images=False, include_formatting=False, - include_links=False, deduplicate=False, - date_extraction_params=None, - with_metadata=False, only_with_metadata=False, - max_tree_size=None, url_blacklist=None, author_blacklist=None, - as_dict=True, prune_xpath=None, - config=DEFAULT_CONFIG, options=None): +def bare_extraction( + filecontent: Any, + url: Optional[str] = None, + no_fallback: bool = False, # fast=False, + favor_precision: bool = False, + favor_recall: bool = False, + include_comments: bool = True, + output_format: str = "python", + target_language: Optional[str] = None, + include_tables: bool = True, + include_images: bool = False, + include_formatting: bool = False, + include_links: bool = False, + deduplicate: bool = False, + date_extraction_params: Optional[Dict[str, Any]] = None, + with_metadata: bool = False, + only_with_metadata: bool = False, + max_tree_size: Optional[int] = None, + url_blacklist: Optional[Set[str]] = None, + author_blacklist: Optional[Set[str]] = None, + as_dict: bool = True, + prune_xpath: Optional[Any] = None, + config: Any = DEFAULT_CONFIG, + options: Optional[Extractor] = None, +) -> Optional[Union[Document, Dict[str, Any]]]: """Internal function for text extraction returning bare Python variables. 
Args: @@ -136,53 +191,72 @@ def bare_extraction(filecontent, url=None, no_fallback=False, # fast=False, """ # deprecations - #if no_fallback is True: + # if no_fallback is True: # fast = no_fallback - #warnings.warn( - # '"no_fallback" will be deprecated in a future version, use "fast" instead', - # PendingDeprecationWarning - #) + # warnings.warn( + # '"no_fallback" will be deprecated in a future version, use "fast" instead', + # PendingDeprecationWarning + # ) # load data try: # regroup extraction options if not options or not isinstance(options, Extractor): options = Extractor( - config=config, output_format=output_format, - fast=no_fallback, precision=favor_precision, recall=favor_recall, - comments=include_comments, formatting=include_formatting, links=include_links, - images=include_images, tables=include_tables, - dedup=deduplicate, lang=target_language, max_tree_size=max_tree_size, - url=url, with_metadata=with_metadata, only_with_metadata=only_with_metadata, - author_blacklist=author_blacklist, url_blacklist=url_blacklist, - date_params=date_extraction_params - ) + config=config, + output_format=output_format, + fast=no_fallback, + precision=favor_precision, + recall=favor_recall, + comments=include_comments, + formatting=include_formatting, + links=include_links, + images=include_images, + tables=include_tables, + dedup=deduplicate, + lang=target_language, + max_tree_size=max_tree_size, + url=url, + with_metadata=with_metadata, + only_with_metadata=only_with_metadata, + author_blacklist=author_blacklist, + url_blacklist=url_blacklist, + date_params=date_extraction_params, + ) # load the HTML tree tree = load_html(filecontent) if tree is None: - LOGGER.error('empty HTML tree: %s', url) + LOGGER.error("empty HTML tree: %s", url) raise ValueError # quick and dirty HTML lang check if options.lang and (options.fast or not LANGID_FLAG): if check_html_lang(tree, options.lang) is False: - LOGGER.error('wrong HTML meta language: %s', options.source) + LOGGER.error("wrong HTML meta language: %s", options.source) raise ValueError # extract metadata if necessary if options.with_metadata: - document = extract_metadata(tree, options.url, options.date_params, options.fast, options.author_blacklist) + document = extract_metadata( + tree, + options.url, + options.date_params, + options.fast, + options.author_blacklist, + ) # cut short if extracted URL in blacklist if document.url in options.url_blacklist: - LOGGER.warning('blacklisted URL: %s', document.url) + LOGGER.warning("blacklisted URL: %s", document.url) raise ValueError # cut short if core elements are missing - if options.only_with_metadata and not (document.date and document.title and document.url): - LOGGER.error('no metadata: %s', options.source) + if options.only_with_metadata and not ( + document.date and document.title and document.url + ): + LOGGER.error("no metadata: %s", options.source) raise ValueError else: @@ -204,50 +278,67 @@ def bare_extraction(filecontent, url=None, no_fallback=False, # fast=False, # comments first, then remove if options.comments: - commentsbody, temp_comments, len_comments, cleaned_tree = extract_comments(cleaned_tree, options) + commentsbody, temp_comments, len_comments, cleaned_tree = extract_comments( + cleaned_tree, options + ) else: - commentsbody, temp_comments, len_comments = None, '', 0 + commentsbody, temp_comments, len_comments = None, "", 0 if options.focus == "precision": cleaned_tree = prune_unwanted_nodes(cleaned_tree, REMOVE_COMMENTS_XPATH) - postbody, temp_text, len_text = 
trafilatura_sequence(cleaned_tree, cleaned_tree_backup, tree, options) + postbody, temp_text, len_text = trafilatura_sequence( + cleaned_tree, cleaned_tree_backup, tree, options + ) # tree size sanity check if options.max_tree_size: # strip tags if len(postbody) > options.max_tree_size: - LOGGER.debug('output tree too long: %s', len(postbody)) - strip_tags(postbody, 'hi') + LOGGER.debug("output tree too long: %s", len(postbody)) + strip_tags(postbody, "hi") # still too long, raise an error if len(postbody) > options.max_tree_size: - LOGGER.debug('output tree too long: %s, discarding %s', len(postbody), options.source) + LOGGER.debug( + "output tree too long: %s, discarding %s", + len(postbody), + options.source, + ) raise ValueError # size checks if options.comments and len_comments < options.min_extracted_comm_size: - LOGGER.debug('not enough comments: %s', options.source) - if len_text < options.min_output_size and \ - len_comments < options.min_output_comm_size: - LOGGER.debug('text and comments not long enough: %s %s %s', len_text, len_comments, options.source) + LOGGER.debug("not enough comments: %s", options.source) + if ( + len_text < options.min_output_size + and len_comments < options.min_output_comm_size + ): + LOGGER.debug( + "text and comments not long enough: %s %s %s", + len_text, + len_comments, + options.source, + ) raise ValueError # check duplicates at body level if options.dedup and duplicate_test(postbody, options) is True: - LOGGER.debug('discarding duplicate document: %s', options.source) + LOGGER.debug("discarding duplicate document: %s", options.source) raise ValueError # sanity check on language if options.lang: - is_not_target_lang, document = language_filter(temp_text, temp_comments, options.lang, document) + is_not_target_lang, document = language_filter( + temp_text, temp_comments, options.lang, document + ) if is_not_target_lang is True: - LOGGER.debug('wrong language: %s', options.source) + LOGGER.debug("wrong language: %s", options.source) raise ValueError except (TypeError, ValueError): - LOGGER.warning('discarding data: %s', options.source) + LOGGER.warning("discarding data: %s", options.source) return None # special case: python variables - if options.format == 'python': + if options.format == "python": document.text = xmltotxt(postbody, options.formatting) if options.comments: document.comments = xmltotxt(commentsbody, options.formatting) @@ -260,17 +351,33 @@ def bare_extraction(filecontent, url=None, no_fallback=False, # fast=False, return document if not as_dict else document.as_dict() -def extract(filecontent, url=None, record_id=None, no_fallback=False, - favor_precision=False, favor_recall=False, - include_comments=True, output_format="txt", - tei_validation=False, target_language=None, - include_tables=True, include_images=False, include_formatting=False, - include_links=False, deduplicate=False, - date_extraction_params=None, - with_metadata=False, only_with_metadata=False, - max_tree_size=None, url_blacklist=None, author_blacklist=None, - settingsfile=None, prune_xpath=None, - config=DEFAULT_CONFIG, options=None): +def extract( + filecontent: Any, + url: Optional[str] = None, + record_id: Optional[str] = None, + no_fallback: bool = False, + favor_precision: bool = False, + favor_recall: bool = False, + include_comments: bool = True, + output_format: str = "txt", + tei_validation: bool = False, + target_language: Optional[str] = None, + include_tables: bool = True, + include_images: bool = False, + include_formatting: bool = False, + 
include_links: bool = False, + deduplicate: bool = False, + date_extraction_params: Optional[Dict[str, Any]] = None, + with_metadata: bool = False, + only_with_metadata: bool = False, + max_tree_size: Optional[int] = None, + url_blacklist: Optional[Set[str]] = None, + author_blacklist: Optional[Set[str]] = None, + settingsfile: Optional[str] = None, + prune_xpath: Optional[Any] = None, + config: Any = DEFAULT_CONFIG, + options: Optional[Extractor] = None, +) -> Optional[str]: """Main function exposed by the package: Wrapper for text extraction and conversion to chosen output format. @@ -312,21 +419,34 @@ def extract(filecontent, url=None, record_id=None, no_fallback=False, # regroup extraction options if not options or not isinstance(options, Extractor): options = Extractor( - config=use_config(settingsfile, config), output_format=output_format, - fast=no_fallback, precision=favor_precision, recall=favor_recall, - comments=include_comments, formatting=include_formatting, links=include_links, - images=include_images, tables=include_tables, - dedup=deduplicate, lang=target_language, max_tree_size=max_tree_size, - url=url, with_metadata=with_metadata, only_with_metadata=only_with_metadata, - tei_validation=tei_validation, - author_blacklist=author_blacklist, url_blacklist=url_blacklist, - date_params=date_extraction_params - ) + config=use_config(settingsfile, config), + output_format=output_format, + fast=no_fallback, + precision=favor_precision, + recall=favor_recall, + comments=include_comments, + formatting=include_formatting, + links=include_links, + images=include_images, + tables=include_tables, + dedup=deduplicate, + lang=target_language, + max_tree_size=max_tree_size, + url=url, + with_metadata=with_metadata, + only_with_metadata=only_with_metadata, + tei_validation=tei_validation, + author_blacklist=author_blacklist, + url_blacklist=url_blacklist, + date_params=date_extraction_params, + ) # extraction document = bare_extraction( - filecontent, options=options, - as_dict=False, prune_xpath=prune_xpath, + filecontent, + options=options, + as_dict=False, + prune_xpath=prune_xpath, ) # post-processing @@ -336,12 +456,16 @@ def extract(filecontent, url=None, record_id=None, no_fallback=False, if options.format not in TXT_FORMATS: # control output if options.format == "python": - raise ValueError("'python' format only usable in bare_extraction() function") + raise ValueError( + "'python' format only usable in bare_extraction() function" + ) # add record ID to metadata document.id = record_id # calculate fingerprint if document.raw_text is not None: - document.fingerprint = content_fingerprint(str(document.title) + " " + str(document.raw_text)) + document.fingerprint = content_fingerprint( + str(document.title) + " " + str(document.raw_text) + ) # return return determine_returnstring(document, options) diff --git a/trafilatura/deduplication.py b/trafilatura/deduplication.py index f73a8527..fda5188d 100644 --- a/trafilatura/deduplication.py +++ b/trafilatura/deduplication.py @@ -242,7 +242,7 @@ def put_in_cache(teststring: str) -> None: def duplicate_test(element: _Element, options: Any) -> bool: "Check for duplicate text with LRU cache." 
- teststring = trim(" ".join(element.itertext())) + teststring = trim(" ".join(element.itertext())) or "" # teststring = element.text if len(teststring) > options.min_duplcheck_size: # retrieve value from cache diff --git a/trafilatura/downloads.py b/trafilatura/downloads.py index ca94cd6b..6df19e78 100644 --- a/trafilatura/downloads.py +++ b/trafilatura/downloads.py @@ -33,7 +33,7 @@ PROXY_URL = None try: - import pycurl + import pycurl # type: ignore CURL_SHARE = pycurl.CurlShare() # available options: # https://curl.se/libcurl/c/curl_share_setopt.html @@ -54,12 +54,12 @@ RETRY_STRATEGY = None -def create_pool(**args): +def create_pool(**args: Any) -> Any: "Configure urllib3 download pool according to user-defined settings." manager_class = SOCKSProxyManager if PROXY_URL else urllib3.PoolManager manager_args = {"proxy_url": PROXY_URL} if PROXY_URL else {} - manager_args["num_pools"] = 50 - return manager_class(**manager_args, **args) + manager_args["num_pools"] = 50 # type: ignore[assignment] + return manager_class(**manager_args, **args) # type: ignore[arg-type] DEFAULT_HEADERS = urllib3.util.make_headers(accept_encoding=True) @@ -125,16 +125,17 @@ def as_dict(self) -> Dict[str, str]: # caching throws an error # @lru_cache(maxsize=2) -def _parse_config(config: ConfigParser) -> Tuple[Optional[str], Optional[str]]: +def _parse_config(config: ConfigParser) -> Tuple[Optional[List[str]], Optional[str]]: "Read and extract HTTP header strings from the configuration file." + agent_list = None # load a series of user-agents myagents = config.get("DEFAULT", "USER_AGENTS").strip() or None if myagents is not None and myagents != "": - myagents = myagents.split("\n") + agent_list = myagents.split("\n") # https://developer.mozilla.org/en-US/docs/Web/HTTP/Cookies # todo: support for several cookies? mycookie = config.get("DEFAULT", "COOKIE") or None - return myagents, mycookie + return agent_list, mycookie def _determine_headers( @@ -209,7 +210,7 @@ def _send_urllib_request( def _handle_response( url: str, response: Response, decode: bool, options: Extractor -) -> Optional[Union[Response, str]]: +) -> Optional[Union[Response, str]]: # todo: only return str "Internal function to run safety checks on response result." 
lentest = len(response.html or response.data or "") if response.status != 200: @@ -349,8 +350,8 @@ def add_to_compressed_dict( def load_download_buffer( - url_store: UrlStore, sleep_time: int = 5 -) -> Tuple[List[str], UrlStore]: + url_store: UrlStore, sleep_time: float = 5.0 +) -> Tuple[Optional[List[str]], UrlStore]: """Determine threading strategy and draw URLs respecting domain-based back-off rules.""" while True: bufferlist = url_store.get_download_urls(time_limit=sleep_time, max_urls=10**5) diff --git a/trafilatura/external.py b/trafilatura/external.py index d5b8762a..3c663461 100644 --- a/trafilatura/external.py +++ b/trafilatura/external.py @@ -5,11 +5,13 @@ import logging +from typing import Any, Tuple + # third-party -from justext.core import (ParagraphMaker, classify_paragraphs, - revise_paragraph_classification) -from justext.utils import get_stoplist, get_stoplists -from lxml.etree import Element, strip_tags, tostring +from justext.core import ParagraphMaker, classify_paragraphs, revise_paragraph_classification # type: ignore +from justext.utils import get_stoplist, get_stoplists # type: ignore +from lxml.etree import _Element, Element, strip_tags, tostring +from lxml.html import HtmlElement # own from .baseline import basic_cleaning @@ -27,7 +29,7 @@ SANITIZED_XPATH = './/aside|.//audio|.//button|.//fieldset|.//figure|.//footer|.//iframe|.//input|.//label|.//link|.//nav|.//noindex|.//noscript|.//object|.//option|.//select|.//source|.//svg|.//time' -def try_readability(htmlinput): +def try_readability(htmlinput: HtmlElement) -> HtmlElement: '''Safety net: try with the generic algorithm readability''' # defaults: min_text_length=25, retry_length=250 try: @@ -36,10 +38,10 @@ def try_readability(htmlinput): return fromstring_bytes(doc.summary()) except Exception as err: LOGGER.warning('readability_lxml failed: %s', err) - return Element('div') + return HtmlElement('div') -def compare_extraction(tree, backup_tree, body, text, len_text, options): +def compare_extraction(tree: HtmlElement, backup_tree: HtmlElement, body: _Element, text: str, len_text: int, options: Any) -> Tuple[_Element, str, int]: '''Decide whether to choose own or external extraction based on a series of heuristics''' # bypass for recall @@ -54,7 +56,7 @@ def compare_extraction(tree, backup_tree, body, text, len_text, options): # try with readability temppost_algo = try_readability(backup_tree) # unicode fix necessary on certain systems (#331) - algo_text = trim(tostring(temppost_algo, method='text', encoding='utf-8').decode('utf-8')) + algo_text = trim(tostring(temppost_algo, method='text', encoding='utf-8').decode('utf-8')) or "" len_algo = len(algo_text) # compare @@ -105,7 +107,7 @@ def compare_extraction(tree, backup_tree, body, text, len_text, options): return body, text, len_text -def jt_stoplist_init(): +def jt_stoplist_init() -> Tuple[str]: 'Retrieve and return the content of all JusText stoplists' global JT_STOPLIST stoplist = set() @@ -115,7 +117,7 @@ def jt_stoplist_init(): return JT_STOPLIST -def custom_justext(tree, stoplist): +def custom_justext(tree: HtmlElement, stoplist: Tuple[str]) -> Any: 'Customized version of JusText processing' paragraphs = ParagraphMaker.make_paragraphs(tree) classify_paragraphs(paragraphs, stoplist, 50, 150, 0.1, 0.2, 0.25, True) @@ -123,7 +125,7 @@ def custom_justext(tree, stoplist): return paragraphs -def try_justext(tree, url, target_language): +def try_justext(tree: HtmlElement, url: str, target_language: str) -> _Element: '''Second safety net: try with the generic 
algorithm justext''' # init result_body = Element('body') @@ -147,22 +149,20 @@ def try_justext(tree, url, target_language): return result_body -def justext_rescue(tree, options): +def justext_rescue(tree: HtmlElement, options: Any) -> Tuple[_Element, str, int]: '''Try to use justext algorithm as a second fallback''' # additional cleaning tree = basic_cleaning(tree) # proceed temppost_algo = try_justext(tree, options.url, options.lang) - temp_text = trim(' '.join(temppost_algo.itertext())) + temp_text = trim(' '.join(temppost_algo.itertext())) or "" return temppost_algo, temp_text, len(temp_text) -def sanitize_tree(tree, options): +def sanitize_tree(tree: HtmlElement, options: Any) -> Tuple[HtmlElement, str, int]: '''Convert and sanitize the output from the generic algorithm (post-processing)''' # 1. clean cleaned_tree = tree_cleaning(tree, options) - for elem in tree.findall(SANITIZED_XPATH): - elem.getparent().remove(elem) if options.links is False: strip_tags(cleaned_tree, 'a') strip_tags(cleaned_tree, 'span') @@ -185,5 +185,5 @@ def sanitize_tree(tree, options): ] strip_tags(cleaned_tree, *sanitization_list) # 4. return - text = trim(' '.join(cleaned_tree.itertext())) + text = trim(' '.join(cleaned_tree.itertext())) or "" return cleaned_tree, text, len(text) diff --git a/trafilatura/feeds.py b/trafilatura/feeds.py index 72c2b8ba..2e6f9617 100644 --- a/trafilatura/feeds.py +++ b/trafilatura/feeds.py @@ -145,7 +145,7 @@ def find_links(feed_string: str, params: FeedParameters) -> List[str]: # Atom if " List[str]: """Try to find feed URLs. @@ -278,7 +278,6 @@ def find_feed_urls( urlfilter = None downloaded = fetch_url(url) - downloaded = fetch_url(url) if downloaded is not None: # assume it's a feed feed_links = extract_links(downloaded, params) @@ -286,7 +285,8 @@ def find_feed_urls( # assume it's a web page for feed in determine_feed(downloaded, params): feed_string = fetch_url(feed) - feed_links.extend(extract_links(feed_string, params)) + if feed_string: + feed_links.extend(extract_links(feed_string, params)) # filter triggered, prepare it if len(url) > len(baseurl) + 2: urlfilter = url diff --git a/trafilatura/json_metadata.py b/trafilatura/json_metadata.py index af60f9a2..adca561b 100644 --- a/trafilatura/json_metadata.py +++ b/trafilatura/json_metadata.py @@ -220,7 +220,7 @@ def normalize_json(string: str) -> str: string = JSON_UNICODE_REPLACE.sub(lambda match: chr(int(match[1], 16)), string) string = ''.join(c for c in string if ord(c) < 0xD800 or ord(c) > 0xDFFF) string = unescape(string) - return trim(JSON_REMOVE_HTML.sub('', string)) # type: ignore[no-any-return] + return trim(JSON_REMOVE_HTML.sub('', string)) or "" def normalize_authors(current_authors: Optional[str], author_string: str) -> Optional[str]: @@ -240,13 +240,13 @@ def normalize_authors(current_authors: Optional[str], author_string: str) -> Opt author_string = HTML_STRIP_TAGS.sub('', author_string) # examine names for author in AUTHOR_SPLIT.split(author_string): - author = trim(author) + author = trim(author) or "" # remove emoji author = AUTHOR_EMOJI_REMOVE.sub('', author) # remove @username author = AUTHOR_TWITTER.sub('', author) # replace special characters with space - author = trim(AUTHOR_REPLACE_JOIN.sub(' ', author)) + author = trim(AUTHOR_REPLACE_JOIN.sub(' ', author)) or "" author = AUTHOR_REMOVE_NICKNAME.sub('', author) # remove special characters author = AUTHOR_REMOVE_SPECIAL.sub('', author) diff --git a/trafilatura/main_extractor.py b/trafilatura/main_extractor.py index 6a5c26c7..d82c63e1 100644 --- 
a/trafilatura/main_extractor.py +++ b/trafilatura/main_extractor.py @@ -7,8 +7,10 @@ import re # import regex as re from copy import deepcopy +from typing import Any, Optional, Tuple, Union -from lxml.etree import Element, SubElement, strip_elements, strip_tags, tostring +from lxml.etree import _Element, Element, SubElement, strip_elements, strip_tags, tostring +from lxml.html import HtmlElement # own from .htmlprocessing import (delete_by_link_density, handle_textnode, @@ -33,12 +35,12 @@ NOT_AT_THE_END = {'head', 'ref'} -def _log_event(msg, tag, text): +def _log_event(msg: str, tag: str, text: Optional[Union[bytes, str]]) -> None: "Format extraction event for debugging purposes." LOGGER.debug("%s: %s %s", msg, tag, trim(text or "") or "None") -def handle_titles(element, options): +def handle_titles(element: _Element, options: Any) -> Optional[_Element]: '''Process head elements (titles)''' if len(element) == 0: # maybe needs attention? @@ -64,12 +66,13 @@ def handle_titles(element, options): return None -def handle_formatting(element, options): +def handle_formatting(element: _Element, options: Any) -> Optional[_Element]: '''Process formatting elements (b, i, etc. converted to hi) found outside of paragraphs''' formatting = process_node(element, options) - if len(element) == 0 and formatting is None: + if formatting is None: # and len(element) == 0 return None + # repair orphan elements # if formatting is None: # formatting = Element(element.tag) @@ -101,6 +104,7 @@ def handle_formatting(element, options): # repair orphan elements # shorter code but triggers warning: # parent = element.getparent() or element.getprevious() + parent = element.getparent() if parent is None: parent = element.getprevious() @@ -112,15 +116,15 @@ def handle_formatting(element, options): return processed_element -def add_sub_element(new_child_elem, subelem, processed_subchild): +def add_sub_element(new_child_elem: _Element, subelem: _Element, processed_subchild: _Element) -> None: "Add a sub-element to an existing child element." sub_child_elem = SubElement(new_child_elem, processed_subchild.tag) sub_child_elem.text, sub_child_elem.tail = processed_subchild.text, processed_subchild.tail for attr in subelem.attrib: - sub_child_elem.set(attr, subelem.get(attr)) + sub_child_elem.set(attr, subelem.attrib[attr]) -def process_nested_elements(child, new_child_elem, options): +def process_nested_elements(child: _Element, new_child_elem: _Element, options: Any) -> None: "Iterate through an element child and rewire its descendants." new_child_elem.text = child.text for subelem in child.iterdescendants("*"): @@ -136,25 +140,25 @@ def process_nested_elements(child, new_child_elem, options): #subelem.getparent().remove(subelem) -def update_elem_rendition(elem, new_elem): +def update_elem_rendition(elem: _Element, new_elem: _Element) -> None: "Copy the rend attribute from an existing element to a new one." - if elem.get("rend") is not None: - new_elem.set("rend", elem.get("rend")) + if rend_attr := elem.get("rend"): + new_elem.set("rend", rend_attr) -def is_text_element(elem): +def is_text_element(elem: _Element) -> bool: "Find if the element contains text." return elem is not None and text_chars_test(''.join(elem.itertext())) is True -def define_newelem(processed_elem, orig_elem): +def define_newelem(processed_elem: _Element, orig_elem: _Element) -> None: "Create a new sub-element if necessary." 
if processed_elem is not None: childelem = SubElement(orig_elem, processed_elem.tag) childelem.text, childelem.tail = processed_elem.text, processed_elem.tail -def handle_lists(element, options): +def handle_lists(element: _Element, options: Any) -> Optional[_Element]: "Process lists elements including their descendants." processed_element = Element(element.tag) @@ -169,8 +173,8 @@ def handle_lists(element, options): if len(child) == 0: processed_child = process_node(child, options) if processed_child is not None: - new_child_elem.text = processed_child.text - if processed_child.tail is not None and processed_child.tail.strip(): + new_child_elem.text = processed_child.text or "" + if processed_child.tail and processed_child.tail.strip(): new_child_elem.text += " " + processed_child.tail processed_element.append(new_child_elem) else: @@ -195,7 +199,7 @@ def handle_lists(element, options): return None -def is_code_block_element(element): +def is_code_block_element(element: _Element) -> bool: "Check if it is a code element according to common structural markers." # pip if element.get("lang") or element.tag == "code": @@ -211,7 +215,7 @@ def is_code_block_element(element): return False -def handle_code_blocks(element): +def handle_code_blocks(element: _Element) -> _Element: "Turn element into a properly tagged code block." processed_element = deepcopy(element) for child in element.iter("*"): @@ -220,7 +224,7 @@ def handle_code_blocks(element): return processed_element -def handle_quotes(element, options): +def handle_quotes(element: _Element, options: Any) -> Optional[_Element]: "Process quotes elements." if is_code_block_element(element): return handle_code_blocks(element) @@ -228,7 +232,8 @@ def handle_quotes(element, options): processed_element = Element(element.tag) for child in element.iter("*"): processed_child = process_node(child, options) # handle_textnode(child, comments_fix=True) - define_newelem(processed_child, processed_element) + if processed_child is not None: + define_newelem(processed_child, processed_element) child.tag = "done" if is_text_element(processed_element): # avoid double/nested tags @@ -237,7 +242,7 @@ def handle_quotes(element, options): return None -def handle_other_elements(element, potential_tags, options): +def handle_other_elements(element: _Element, potential_tags: Any, options: Any) -> Optional[_Element]: "Handle diverse or unknown elements in the scope of relevant tags." # handle w3schools code if element.tag == "div" and "w3-code" in element.get("class", ""): @@ -264,7 +269,7 @@ def handle_other_elements(element, potential_tags, options): return None -def handle_paragraphs(element, potential_tags, options): +def handle_paragraphs(element: _Element, potential_tags: Any, options: Any) -> Optional[_Element]: "Process paragraphs along with their children, trim and clean the content." element.attrib.clear() # todo: test if necessary # strip_tags(element, 'p') # change in precision due to spaces? 
@@ -287,7 +292,7 @@ def handle_paragraphs(element, potential_tags, options): if processed_child.tag == "p": _log_event("extra in p", "p", processed_child.text) if processed_element.text: - processed_element.text += " " + processed_child.text + processed_element.text += " " + (processed_child.text or "") else: processed_element.text = processed_child.text child.tag = "done" @@ -299,14 +304,14 @@ def handle_paragraphs(element, potential_tags, options): if len(processed_child) > 0: for item in processed_child: # children are lists if text_chars_test(item.text) is True: - item.text = " " + item.text + item.text = " " + item.text # type: ignore[operator] strip_tags(processed_child, item.tag) # correct attributes if child.tag == "hi": - newsub.set("rend", child.get("rend")) + newsub.set("rend", child.get("rend", "")) elif child.tag == "ref": if child.get("target") is not None: - newsub.set("target", child.get("target")) + newsub.set("target", child.get("target", "")) # handle line breaks # elif processed_child.tag == 'lb': # try: @@ -341,7 +346,7 @@ def handle_paragraphs(element, potential_tags, options): return None -def define_cell_type(is_header): +def define_cell_type(is_header: bool) -> _Element: "Determine cell element type and mint new element." # define tag cell_element = Element("cell") @@ -350,7 +355,7 @@ def define_cell_type(is_header): return cell_element -def handle_table(table_elem, potential_tags, options): +def handle_table(table_elem: _Element, potential_tags: Any, options: Any) -> Optional[_Element]: "Process single table element." newtable = Element("table") @@ -360,19 +365,24 @@ def handle_table(table_elem, potential_tags, options): # calculate maximum number of columns per row, includin colspan max_cols = 0 for tr in table_elem.iter('tr'): - max_cols = max(max_cols, sum(int(td.get("colspan", 1)) for td in tr.iter(TABLE_ELEMS))) + max_cols = max(max_cols, sum(int(td.get("colspan", 1)) for td in tr.iter(TABLE_ELEMS))) # type: ignore # explore sub-elements seen_header_row = False seen_header = False - row_attrs = {"span": str(max_cols)} if max_cols > 1 else {} - newrow = Element("row", **row_attrs) + span_attr = str(max_cols) if max_cols > 1 else "" + newrow = Element("row") + if span_attr: + newrow.set("span", span_attr) + for subelement in table_elem.iterdescendants(): if subelement.tag == "tr": # process existing row if len(newrow) > 0: newtable.append(newrow) - newrow = Element("row", **row_attrs) + newrow = Element("row") + if span_attr: + newrow.set("span", span_attr) seen_header_row = seen_header_row or seen_header elif subelement.tag in TABLE_ELEMS: is_header = subelement.tag == "th" and not seen_header_row @@ -404,7 +414,8 @@ def handle_table(table_elem, potential_tags, options): # subcell_elem = Element(child.tag) processed_subchild = handle_textelem(child, potential_tags.union(["div"]), options) # add child element to processed_element - define_newelem(processed_subchild, new_child_elem) + if processed_subchild is not None: + define_newelem(processed_subchild, new_child_elem) child.tag = "done" # add to tree if new_child_elem.text or len(new_child_elem) > 0: @@ -426,12 +437,12 @@ def handle_table(table_elem, potential_tags, options): return None -def handle_image(element): +def handle_image(element: _Element) -> Optional[_Element]: "Process image elements and their relevant attributes." 
processed_element = Element(element.tag) for attr in ("data-src", "src"): - src = element.get(attr) + src = element.get(attr, "") if is_image_file(src): processed_element.set("src", src) break @@ -443,23 +454,24 @@ def handle_image(element): break # additional data - if element.get("alt") is not None: - processed_element.set("alt", element.get("alt")) - if element.get("title") is not None: - processed_element.set("title", element.get("title")) + if alt_attr := element.get("alt"): + processed_element.set("alt", alt_attr) + if title_attr := element.get("title"): + processed_element.set("title", title_attr) # don't return empty elements or elements without source, just None if not processed_element.attrib or not processed_element.get("src"): return None # post-processing: URLs - if not processed_element.get("src").startswith("http"): - processed_element.set("src", re.sub(r"^//", "http://", processed_element.get("src"))) + src_attr = processed_element.get("src", "") + if not src_attr.startswith("http"): + processed_element.set("src", re.sub(r"^//", "http://", src_attr)) return processed_element -def handle_textelem(element, potential_tags, options): +def handle_textelem(element: _Element, potential_tags: Any, options: Any) -> Optional[_Element]: '''Process text element and determine how to deal with its content''' new_element = None # bypass: nested elements @@ -473,10 +485,10 @@ def handle_textelem(element, potential_tags, options): new_element = handle_paragraphs(element, potential_tags, options) elif element.tag == 'lb': if text_chars_test(element.tail) is True: - element = process_node(element, options) - if element is not None: + this_element = process_node(element, options) + if this_element is not None: new_element = Element('p') - new_element.text = element.tail + new_element.text = this_element.tail elif element.tag in FORMATTING: new_element = handle_formatting(element, options) # process_node(element, options) elif element.tag == 'table' and 'table' in potential_tags: @@ -489,7 +501,7 @@ def handle_textelem(element, potential_tags, options): return new_element -def recover_wild_text(tree, result_body, options, potential_tags=TAG_CATALOG): +def recover_wild_text(tree: HtmlElement, result_body: _Element, options: Any, potential_tags: Any = TAG_CATALOG) -> _Element: '''Look for all previously unconsidered wild elements, including outside of the determined frame and throughout the document to recover potentially missing text parts''' LOGGER.debug('Recovering wild text elements') @@ -510,7 +522,7 @@ def recover_wild_text(tree, result_body, options, potential_tags=TAG_CATALOG): return result_body -def prune_unwanted_sections(tree, potential_tags, options): +def prune_unwanted_sections(tree: HtmlElement, potential_tags: Any, options: Any) -> HtmlElement: 'Rule-based deletion of targeted document sections' favor_precision = options.focus == "precision" # prune the rest @@ -544,7 +556,7 @@ def prune_unwanted_sections(tree, potential_tags, options): return tree -def _extract(tree, options): +def _extract(tree: HtmlElement, options: Any) -> Tuple[_Element, str, Any]: # init potential_tags = set(TAG_CATALOG) if options.tables is True: @@ -597,7 +609,7 @@ def _extract(tree, options): return result_body, temp_text, potential_tags -def extract_content(cleaned_tree, options): +def extract_content(cleaned_tree: HtmlElement, options: Any) -> Tuple[_Element, str, int]: '''Find the main content of a page using a set of XPath expressions, then extract relevant elements, strip them of unwanted subparts 
and convert them''' @@ -620,7 +632,7 @@ def extract_content(cleaned_tree, options): return result_body, temp_text, len(temp_text) -def process_comments_node(elem, potential_tags, options): +def process_comments_node(elem: _Element, potential_tags: Any, options: Any) -> Optional[_Element]: '''Process comment node and determine how to deal with its content''' if elem.tag in potential_tags: # print(elem.tag, elem.text_content()) @@ -634,7 +646,7 @@ def process_comments_node(elem, potential_tags, options): return None -def extract_comments(tree, options): +def extract_comments(tree: HtmlElement, options: Any) -> Tuple[_Element, str, int, HtmlElement]: "Try to extract comments out of potential sections in the HTML." comments_body = Element("body") # define iteration strategy diff --git a/trafilatura/meta.py b/trafilatura/meta.py index eaa2e806..f6fc20cd 100644 --- a/trafilatura/meta.py +++ b/trafilatura/meta.py @@ -6,7 +6,7 @@ from courlan.meta import clear_caches as reset_caches_courlan from htmldate.meta import reset_caches as reset_caches_htmldate -from justext.core import define_stoplist +from justext.core import define_stoplist # type: ignore from .deduplication import LRU_TEST, Simhash, is_similar_domain from .utils import line_processing, return_printables_and_spaces, trim diff --git a/trafilatura/metadata.py b/trafilatura/metadata.py index 510c8995..c501b017 100644 --- a/trafilatura/metadata.py +++ b/trafilatura/metadata.py @@ -8,7 +8,7 @@ from copy import deepcopy from html import unescape -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Union from courlan import ( extract_domain, @@ -480,23 +480,23 @@ def extract_license(tree: HtmlElement) -> Optional[str]: def extract_metadata( - filecontent: str, + filecontent: Union[HtmlElement, str], default_url: Optional[str] = None, date_config: Optional[Any] = None, extensive: bool = True, author_blacklist: Optional[Set[str]] = None, -) -> Optional[Document]: +) -> Document: """Main process for metadata extraction. Args: - filecontent: HTML code as string. + filecontent: HTML code as string or parsed tree. default_url: Previously known URL of the downloaded document. date_config: Provide extraction parameters to htmldate as dict(). author_blacklist: Provide a blacklist of Author Names as set() to filter out authors. Returns: - A trafilatura.metadata.Document containing the extracted metadata information or None. - trafilatura.metadata.Document has .as_dict() method that will return a copy as a dict. + A trafilatura.settings.Document containing the extracted metadata information or None. + The Document class has .as_dict() method that will return a copy as a dict. 
""" # init author_blacklist = author_blacklist or set() @@ -505,7 +505,7 @@ def extract_metadata( # load contents tree = load_html(filecontent) if tree is None: - return None + return Document() # initialize dict and try to strip meta tags metadata = examine_meta(tree) diff --git a/trafilatura/readability_lxml.py b/trafilatura/readability_lxml.py index d4bbf853..fc563221 100644 --- a/trafilatura/readability_lxml.py +++ b/trafilatura/readability_lxml.py @@ -22,9 +22,10 @@ from math import sqrt from operator import attrgetter +from typing import Any, Dict, Optional, Set from lxml.etree import tostring -from lxml.html import fragment_fromstring +from lxml.html import HtmlElement, fragment_fromstring from .utils import load_html, trim @@ -34,7 +35,7 @@ DOT_SPACE = re.compile(r"\.( |$)") -def _tostring(string): +def _tostring(string: HtmlElement) -> str: return tostring(string, encoding=str, method="xml") @@ -83,9 +84,9 @@ def _tostring(string): # DIV_TO_P_ELEMS = {'a', 'blockquote', 'dl', 'div', 'img', 'ol', 'p', 'pre', 'table', 'ul'} -def text_length(elem): +def text_length(elem: HtmlElement) -> int: "Return the length of the element with all its contents." - return len(trim(elem.text_content())) + return len(trim(elem.text_content()) or "") class Candidate: @@ -93,9 +94,9 @@ class Candidate: __slots__ = ["score", "elem"] - def __init__(self, score, elem): - self.score = score - self.elem = elem + def __init__(self, score: float, elem: HtmlElement) -> None: + self.score: float = score + self.elem: HtmlElement = elem class Document: @@ -103,7 +104,7 @@ class Document: __slots__ = ["doc", "min_text_length", "retry_length"] - def __init__(self, doc, min_text_length=25, retry_length=250): + def __init__(self, doc: HtmlElement, min_text_length: int = 25, retry_length: int = 250) -> None: """Generate the document :param doc: string of the html content. @@ -120,7 +121,7 @@ def __init__(self, doc, min_text_length=25, retry_length=250): self.min_text_length = min_text_length self.retry_length = retry_length - def summary(self): + def summary(self) -> str: """ Given a HTML file, extracts the text of the article. @@ -165,7 +166,7 @@ def summary(self): continue return cleaned_article - def get_article(self, candidates, best_candidate): + def get_article(self, candidates: Dict[HtmlElement, Candidate], best_candidate: Candidate) -> HtmlElement: # Now that we have the top candidate, look through its siblings for # content that might also be related. # Things like preambles, content split by ads that we removed, etc. 
@@ -206,7 +207,7 @@ def get_article(self, candidates, best_candidate): # output.append(best_candidate.elem) return output - def select_best_candidate(self, candidates): + def select_best_candidate(self, candidates: Dict[HtmlElement, Candidate]) -> Optional[Candidate]: if not candidates: return None sorted_candidates = sorted( @@ -217,12 +218,12 @@ def select_best_candidate(self, candidates): LOGGER.debug("Top 5: %s %s", candidate.elem.tag, candidate.score) return next(iter(sorted_candidates)) - def get_link_density(self, elem): + def get_link_density(self, elem: HtmlElement) -> float: total_length = text_length(elem) or 1 link_length = sum(text_length(link) for link in elem.findall(".//a")) return link_length / total_length - def score_paragraphs(self): + def score_paragraphs(self) -> Dict[HtmlElement, Candidate]: candidates = {} for elem in self.doc.iter("p", "pre", "td"): @@ -231,7 +232,7 @@ def score_paragraphs(self): continue grand_parent_node = parent_node.getparent() - elem_text = trim(elem.text_content()) + elem_text = trim(elem.text_content()) or "" elem_text_len = len(elem_text) # discard too short paragraphs @@ -258,7 +259,7 @@ def score_paragraphs(self): return candidates - def class_weight(self, elem): + def class_weight(self, elem: HtmlElement) -> float: weight = 0 for attribute in filter(None, (elem.get("class"), elem.get("id"))): if REGEXES["negativeRe"].search(attribute): @@ -267,7 +268,7 @@ def class_weight(self, elem): weight += 25 return weight - def score_node(self, elem): + def score_node(self, elem: HtmlElement) -> Candidate: score = self.class_weight(elem) name = elem.tag.lower() if name in DIV_SCORES: @@ -280,7 +281,7 @@ def score_node(self, elem): score -= 5 return Candidate(score, elem) - def remove_unlikely_candidates(self): + def remove_unlikely_candidates(self) -> None: for elem in self.doc.findall(".//*"): attrs = " ".join(filter(None, (elem.get("class"), elem.get("id")))) if len(attrs) < 2: @@ -293,7 +294,7 @@ def remove_unlikely_candidates(self): # LOGGER.debug("Removing unlikely candidate: %s", elem.tag) elem.drop_tree() - def transform_misused_divs_into_paragraphs(self): + def transform_misused_divs_into_paragraphs(self) -> None: for elem in self.doc.findall(".//div"): # transform
<div>s that do not contain other block elements into # <p>
s @@ -322,7 +323,7 @@ def transform_misused_divs_into_paragraphs(self): if child.tag == "br": child.drop_tree() - def sanitize(self, node, candidates): + def sanitize(self, node: HtmlElement, candidates: Dict[HtmlElement, Candidate]) -> str: for header in node.iter("h1", "h2", "h3", "h4", "h5", "h6"): if self.class_weight(header) < 0 or self.get_link_density(header) > 0.33: header.drop_tree() @@ -336,7 +337,7 @@ def sanitize(self, node, candidates): else: elem.drop_tree() - allowed = set() + allowed: Set[HtmlElement] = set() # Conditionally clean <table>s, <ul>s, and <div>
    s for elem in reversed( node.xpath("//table|//ul|//div|//aside|//header|//footer|//section") @@ -445,13 +446,10 @@ def sanitize(self, node, candidates): return _tostring(self.doc) -""" -Port of isProbablyReaderable from mozilla/readability.js to Python. -https://github.com/mozilla/readability - -License of forked code: Apache-2.0. -""" +# Port of isProbablyReaderable from mozilla/readability.js to Python. +# https://github.com/mozilla/readability +# License of forked code: Apache-2.0. REGEXPS = { "unlikelyCandidates": re.compile( @@ -466,12 +464,12 @@ def sanitize(self, node, candidates): DISPLAY_NONE = re.compile(r"display:\s*none", re.I) -def is_node_visible(node): +def is_node_visible(node: HtmlElement) -> bool: """ Checks if the node is visible by considering style, attributes, and class. """ - if "style" in node.attrib and DISPLAY_NONE.search(node.get("style")): + if "style" in node.attrib and DISPLAY_NONE.search(node.get("style", "")): return False if "hidden" in node.attrib: return False @@ -482,11 +480,13 @@ def is_node_visible(node): return True -def is_probably_readerable(html, options={}): +def is_probably_readerable(html: HtmlElement, options: Any={}) -> bool: """ Decides whether or not the document is reader-able without parsing the whole thing. """ doc = load_html(html) + if doc is None: + return False min_content_length = options.get("min_content_length", 140) min_score = options.get("min_score", 20) @@ -495,7 +495,7 @@ def is_probably_readerable(html, options={}): nodes = set(doc.xpath(".//p | .//pre | .//article")) nodes.update(node.getparent() for node in doc.xpath(".//div/br")) - score = 0 + score = 0.0 for node in nodes: if not visibility_checker(node): continue diff --git a/trafilatura/settings.cfg b/trafilatura/settings.cfg index 160f5909..11a07895 100644 --- a/trafilatura/settings.cfg +++ b/trafilatura/settings.cfg @@ -7,7 +7,7 @@ DOWNLOAD_TIMEOUT = 30 MAX_FILE_SIZE = 20000000 MIN_FILE_SIZE = 10 # sleep between requests -SLEEP_TIME = 5 +SLEEP_TIME = 5.0 # user-agents here: agent1,agent2,... USER_AGENTS = # cookie for HTTP requests diff --git a/trafilatura/settings.py b/trafilatura/settings.py index 8ff24e58..e3add72e 100644 --- a/trafilatura/settings.py +++ b/trafilatura/settings.py @@ -6,14 +6,14 @@ from configparser import ConfigParser from datetime import datetime from html import unescape -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Set try: from os import sched_getaffinity - HAS_SCHED = True + CPU_COUNT = len(sched_getaffinity(0)) except ImportError: from os import cpu_count - HAS_SCHED = False + CPU_COUNT = cpu_count() from pathlib import Path @@ -26,7 +26,9 @@ SUPPORTED_FORMATS = set(SUPPORTED_FMT_CLI) | {"python"} # for bare_extraction() only -def use_config(filename=None, config=None): +def use_config( + filename: Optional[str] = None, config: Optional[ConfigParser] = None +) -> ConfigParser: """ Use configuration object or read and parse a settings file. 
""" @@ -46,64 +48,109 @@ def use_config(filename=None, config=None): DEFAULT_CONFIG = use_config() CONFIG_MAPPING = { - 'min_extracted_size': 'MIN_EXTRACTED_SIZE', - 'min_output_size': 'MIN_OUTPUT_SIZE', - 'min_output_comm_size': 'MIN_OUTPUT_COMM_SIZE', - 'min_extracted_comm_size': 'MIN_EXTRACTED_COMM_SIZE', - 'min_duplcheck_size': 'MIN_DUPLCHECK_SIZE', - 'max_repetitions': 'MAX_REPETITIONS', - 'max_file_size': 'MAX_FILE_SIZE', - 'min_file_size': 'MIN_FILE_SIZE' + "min_extracted_size": "MIN_EXTRACTED_SIZE", + "min_output_size": "MIN_OUTPUT_SIZE", + "min_output_comm_size": "MIN_OUTPUT_COMM_SIZE", + "min_extracted_comm_size": "MIN_EXTRACTED_COMM_SIZE", + "min_duplcheck_size": "MIN_DUPLCHECK_SIZE", + "max_repetitions": "MAX_REPETITIONS", + "max_file_size": "MAX_FILE_SIZE", + "min_file_size": "MIN_FILE_SIZE", } class Extractor: "Defines a class to store all extraction options." __slots__ = [ - 'config', - # general - 'format', 'fast', 'focus', 'comments', - 'formatting', 'links', 'images', 'tables', 'dedup', 'lang', - # extraction size - 'min_extracted_size', 'min_output_size', - 'min_output_comm_size', 'min_extracted_comm_size', - # deduplication - 'min_duplcheck_size', 'max_repetitions', - # rest - 'max_file_size', 'min_file_size', 'max_tree_size', - # meta - 'source', 'url', 'with_metadata', 'only_with_metadata', 'tei_validation', - 'date_params', - 'author_blacklist', 'url_blacklist' + "config", + # general + "format", + "fast", + "focus", + "comments", + "formatting", + "links", + "images", + "tables", + "dedup", + "lang", + # extraction size + "min_extracted_size", + "min_output_size", + "min_output_comm_size", + "min_extracted_comm_size", + # deduplication + "min_duplcheck_size", + "max_repetitions", + # rest + "max_file_size", + "min_file_size", + "max_tree_size", + # meta + "source", + "url", + "with_metadata", + "only_with_metadata", + "tei_validation", + "date_params", + "author_blacklist", + "url_blacklist", ] - def __init__(self, *, config=DEFAULT_CONFIG, output_format="txt", - fast=False, precision=False, recall=False, - comments=True, formatting=False, links=False, images=False, - tables=True, dedup=False, lang=None, max_tree_size=None, - url=None, source=None, with_metadata=False, only_with_metadata=False, tei_validation=False, - author_blacklist=None, url_blacklist=None, date_params=None): + + def __init__( + self, + *, + config: ConfigParser = DEFAULT_CONFIG, + output_format: str = "txt", + fast: bool = False, + precision: bool = False, + recall: bool = False, + comments: bool = True, + formatting: bool = False, + links: bool = False, + images: bool = False, + tables: bool = True, + dedup: bool = False, + lang: Optional[str] = None, + max_tree_size: Optional[int] = None, + url: Optional[str] = None, + source: Optional[str] = None, + with_metadata: bool = False, + only_with_metadata: bool = False, + tei_validation: bool = False, + author_blacklist: Optional[Set[str]] = None, + url_blacklist: Optional[Set[str]] = None, + date_params: Optional[Dict[str, str]] = None, + ): self._set_source(url, source) self._set_format(output_format) self._add_config(config) - self.fast = fast - self.focus = "recall" if recall else "precision" if precision else "balanced" - self.comments = comments - self.formatting = formatting or self.format == "markdown" - self.links = links - self.images = images - self.tables = tables - self.dedup = dedup - self.lang = lang - self.max_tree_size = max_tree_size - self.url = url - self.only_with_metadata = only_with_metadata - self.tei_validation = 
-        self.author_blacklist = author_blacklist or set()
-        self.url_blacklist = url_blacklist or set()
-        self.with_metadata = (with_metadata or only_with_metadata or
-                              url_blacklist or output_format == "xmltei")
-        self.date_params = (date_params or
-                            set_date_params(self.config.getboolean('DEFAULT', 'EXTENSIVE_DATE_SEARCH')))
+        self.fast: bool = fast
+        self.focus: str = (
+            "recall" if recall else "precision" if precision else "balanced"
+        )
+        self.comments: bool = comments
+        self.formatting: bool = formatting or self.format == "markdown"
+        self.links: bool = links
+        self.images: bool = images
+        self.tables: bool = tables
+        self.dedup: bool = dedup
+        self.lang: Optional[str] = lang
+        self.max_tree_size: Optional[int] = max_tree_size
+        self.url: Optional[str] = url
+        self.only_with_metadata: bool = only_with_metadata
+        self.tei_validation: bool = tei_validation
+        self.author_blacklist: Set[str] = author_blacklist or set()
+        self.url_blacklist: Set[str] = url_blacklist or set()
+        self.with_metadata: bool = (
+            with_metadata
+            or only_with_metadata
+            or bool(url_blacklist)
+            or output_format == "xmltei"
+        )
+        self.date_params: Dict[str, Any] = date_params or set_date_params(
+            self.config.getboolean("DEFAULT", "EXTENSIVE_DATE_SEARCH")
+        )

    def _set_source(self, url: Optional[str], source: Optional[str]) -> None:
        "Set the source attribute in a robust way."
@@ -113,69 +160,96 @@ def _set_source(self, url: Optional[str], source: Optional[str]) -> None:

    def _set_format(self, chosen_format: str) -> None:
        "Store the format if supported and raise an error otherwise."
        if chosen_format not in SUPPORTED_FORMATS:
-            raise AttributeError(f"Cannot set format, must be one of: {', '.join(sorted(SUPPORTED_FORMATS))}")
+            raise AttributeError(
+                f"Cannot set format, must be one of: {', '.join(sorted(SUPPORTED_FORMATS))}"
+            )
        self.format = chosen_format

-    def _add_config(self, config):
+    def _add_config(self, config: ConfigParser) -> None:
        "Store options loaded from config file."
        for key, value in CONFIG_MAPPING.items():
-            setattr(self, key, config.getint('DEFAULT', value))
+            setattr(self, key, config.getint("DEFAULT", value))
        self.config = config


def args_to_extractor(args: Any, url: Optional[str] = None) -> Extractor:
    "Derive extractor configuration from CLI args."
    options = Extractor(
-        config=use_config(filename=args.config_file), output_format=args.output_format,
-        formatting=args.formatting,
-        precision=args.precision, recall=args.recall,
-        comments=args.no_comments, tables=args.no_tables,
-        dedup=args.deduplicate, lang=args.target_language, url=url,
-        with_metadata=args.with_metadata, only_with_metadata=args.only_with_metadata,
-        tei_validation=args.validate_tei
-    )
+        config=use_config(filename=args.config_file),
+        output_format=args.output_format,
+        formatting=args.formatting,
+        precision=args.precision,
+        recall=args.recall,
+        comments=args.no_comments,
+        tables=args.no_tables,
+        dedup=args.deduplicate,
+        lang=args.target_language,
+        url=url,
+        with_metadata=args.with_metadata,
+        only_with_metadata=args.only_with_metadata,
+        tei_validation=args.validate_tei,
+    )
    for attr in ("fast", "images", "links"):
        setattr(options, attr, getattr(args, attr))
    return options


-def set_date_params(extensive: bool = True):
+def set_date_params(extensive: bool = True) -> Dict[str, Any]:
    "Provide default parameters for date extraction."
    return {
-        "original_date": True,
-        "extensive_search": extensive,
-        "max_date": datetime.now().strftime("%Y-%m-%d")
-    }
+        "original_date": True,
+        "extensive_search": extensive,
+        "max_date": datetime.now().strftime("%Y-%m-%d"),
+    }


class Document:
    "Defines a class to store all necessary data and metadata fields for extracted information."
    __slots__ = [
-        'title', 'author', 'url', 'hostname', 'description', 'sitename',
-        'date', 'categories', 'tags', 'fingerprint', 'id', 'license',
-        'body', 'comments', 'commentsbody', 'raw_text', 'text',
-        'language', 'image', 'pagetype', 'filedate'  # 'locale'?
+        "title",
+        "author",
+        "url",
+        "hostname",
+        "description",
+        "sitename",
+        "date",
+        "categories",
+        "tags",
+        "fingerprint",
+        "id",
+        "license",
+        "body",
+        "comments",
+        "commentsbody",
+        "raw_text",
+        "text",
+        "language",
+        "image",
+        "pagetype",
+        "filedate",
+        # 'locale'?
    ]
+
    def __init__(self) -> None:
        for slot in self.__slots__:
            setattr(self, slot, None)

    def __getattr__(self, name: str) -> None:
-        raise AttributeError("% attribute not present in Document", name)
+        raise AttributeError(f"{name} attribute not present in Document")

-    def __setattr__(self, name: str, value) -> None:
+    def __setattr__(self, name: str, value: Any) -> None:
        if name in self.__slots__:
            object.__setattr__(self, name, value)

    @classmethod
-    def from_dict(cls, data: dict):
+    def from_dict(cls: Any, data: Dict[str, Any]) -> Any:
        "Set a series of attributes using a dictionary."
        doc = cls()
        for key, value in data.items():
            setattr(doc, key, value)
        return doc

-    def set_attributes(self, **kwargs) -> None:
+    def set_attributes(self, **kwargs: Optional[Dict[str, Any]]) -> None:
        "Helper function to (re-)set a series of attributes."
        for key, value in kwargs.items():
            if value:
@@ -188,21 +262,18 @@ def clean_and_trim(self) -> None:
            if isinstance(value, str):
                # length
                if len(value) > 10000:
-                    value = value[:9999] + '…'
+                    value = value[:9999] + "…"
                # HTML entities, remove spaces and control characters
                value = line_processing(unescape(value))
            setattr(self, slot, value)

    def as_dict(self) -> Dict[str, Optional[str]]:
        "Convert the document to a dictionary."
-        return {
-            attr: getattr(self, attr, None)
-            for attr in self.__slots__
-        }
+        return {attr: getattr(self, attr, None) for attr in self.__slots__}


# Safety checks
-PARALLEL_CORES = min(len(sched_getaffinity(0)) if HAS_SCHED else cpu_count(), 16)  # 16 processes at most
+PARALLEL_CORES = min(CPU_COUNT, 16)  # 16 processes at most
LRU_SIZE = 4096

# Files
@@ -215,72 +286,159 @@ def as_dict(self) -> Dict[str, Optional[str]]:

# filters
-CUT_EMPTY_ELEMS = {'article', 'b', 'blockquote', 'dd', 'div', 'dt', 'em',
-                   'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'i', 'li', 'main',
-                   'p', 'pre', 'q', 'section', 'span', 'strong'}
-                   # 'meta', 'td', 'a', 'caption', 'dl', 'header',
-                   # 'colgroup', 'col',
-#CUT_EMPTY_ELEMS = {'div', 'span'}
+CUT_EMPTY_ELEMS = {
+    "article",
+    "b",
+    "blockquote",
+    "dd",
+    "div",
+    "dt",
+    "em",
+    "h1",
+    "h2",
+    "h3",
+    "h4",
+    "h5",
+    "h6",
+    "i",
+    "li",
+    "main",
+    "p",
+    "pre",
+    "q",
+    "section",
+    "span",
+    "strong",
+}
+# 'meta', 'td', 'a', 'caption', 'dl', 'header',
+# 'colgroup', 'col',
+# CUT_EMPTY_ELEMS = {'div', 'span'}

# order could matter, using lists to keep extraction deterministic
MANUALLY_CLEANED = [
    # important
-    'aside', 'embed', 'footer', 'form', 'head', 'iframe', 'menu', 'object', 'script',
+    "aside",
+    "embed",
+    "footer",
+    "form",
+    "head",
+    "iframe",
+    "menu",
+    "object",
+    "script",
    # other content
-    'applet', 'audio', 'canvas', 'figure', 'map', 'picture', 'svg', 'video',
+    "applet",
+    "audio",
+    "canvas",
+    "figure",
+    "map",
+    "picture",
+    "svg",
+    "video",
    # secondary
-    'area', 'blink', 'button', 'datalist', 'dialog',
-    'frame', 'frameset', 'fieldset', 'link', 'input', 'ins', 'label', 'legend',
-    'marquee', 'math', 'menuitem', 'nav', 'noscript', 'optgroup', 'option',
-    'output', 'param', 'progress', 'rp', 'rt', 'rtc', 'select', 'source',
-    'style', 'track', 'textarea', 'time', 'use',
+    "area",
+    "blink",
+    "button",
+    "datalist",
+    "dialog",
+    "frame",
+    "frameset",
+    "fieldset",
+    "link",
+    "input",
+    "ins",
+    "label",
+    "legend",
+    "marquee",
+    "math",
+    "menuitem",
+    "nav",
+    "noindex",
+    "noscript",
+    "optgroup",
+    "option",
+    "output",
+    "param",
+    "progress",
+    "rp",
+    "rt",
+    "rtc",
+    "select",
+    "source",
+    "style",
+    "track",
+    "textarea",
+    "time",
+    "use",
]
# 'meta', 'hr', 'img', 'data', 'details', 'summary'

MANUALLY_STRIPPED = [
-    'abbr', 'acronym', 'address', 'bdi', 'bdo', 'big', 'cite', 'data', 'dfn',
-    'font', 'hgroup', 'img', 'ins', 'mark', 'meta', 'ruby', 'small', 'tbody',
-    'template', 'tfoot', 'thead',
+    "abbr",
+    "acronym",
+    "address",
+    "bdi",
+    "bdo",
+    "big",
+    "cite",
+    "data",
+    "dfn",
+    "font",
+    "hgroup",
+    "img",
+    "ins",
+    "mark",
+    "meta",
+    "ruby",
+    "small",
+    "tbody",
+    "template",
+    "tfoot",
+    "thead",
]
# 'center', 'rb', 'wbr'

-BASIC_CLEAN_XPATH = XPath(".//aside|.//div[contains(@class|@id, 'footer')]|.//footer|.//script|.//style")
+BASIC_CLEAN_XPATH = XPath(
+    ".//aside|.//div[contains(@class|@id, 'footer')]|.//footer|.//script|.//style"
+)

-TAG_CATALOG = frozenset(['blockquote', 'code', 'del', 'head', 'hi', 'lb', 'list', 'p', 'pre', 'quote'])
+TAG_CATALOG = frozenset(
+    ["blockquote", "code", "del", "head", "hi", "lb", "list", "p", "pre", "quote"]
+)
# + list(CUT_EMPTY_ELEMS)

JUSTEXT_LANGUAGES = {
-    'ar': 'Arabic',
-    'bg': 'Bulgarian',
-    'cz': 'Czech',
-    'da': 'Danish',
-    'de': 'German',
-    'en': 'English',
-    'el': 'Greek',
-    'es': 'Spanish',
-    'fa': 'Persian',
-    'fi': 'Finnish',
-    'fr': 'French',
-    'hr': 'Croatian',
-    'hu': 'Hungarian',
+    "ar": "Arabic",
+    "bg": "Bulgarian",
+    "cz": "Czech",
+    "da": "Danish",
"de": "German", + "en": "English", + "el": "Greek", + "es": "Spanish", + "fa": "Persian", + "fi": "Finnish", + "fr": "French", + "hr": "Croatian", + "hu": "Hungarian", # 'ja': '', - 'ko': 'Korean', - 'id': 'Indonesian', - 'it': 'Italian', - 'no': 'Norwegian_Nynorsk', - 'nl': 'Dutch', - 'pl': 'Polish', - 'pt': 'Portuguese', - 'ro': 'Romanian', - 'ru': 'Russian', - 'sk': 'Slovak', - 'sl': 'Slovenian', - 'sr': 'Serbian', - 'sv': 'Swedish', - 'tr': 'Turkish', - 'uk': 'Ukrainian', - 'ur': 'Urdu', - 'vi': 'Vietnamese', + "ko": "Korean", + "id": "Indonesian", + "it": "Italian", + "no": "Norwegian_Nynorsk", + "nl": "Dutch", + "pl": "Polish", + "pt": "Portuguese", + "ro": "Romanian", + "ru": "Russian", + "sk": "Slovak", + "sl": "Slovenian", + "sr": "Serbian", + "sv": "Swedish", + "tr": "Turkish", + "uk": "Ukrainian", + "ur": "Urdu", + "vi": "Vietnamese", # 'zh': '', } diff --git a/trafilatura/sitemaps.py b/trafilatura/sitemaps.py index d6bb137c..0eb52a08 100644 --- a/trafilatura/sitemaps.py +++ b/trafilatura/sitemaps.py @@ -82,7 +82,7 @@ def __init__( def fetch(self) -> None: "Fetch a sitemap over the network." LOGGER.debug("fetching sitemap: %s", self.current_url) - self.content = fetch_url(self.current_url) + self.content = fetch_url(self.current_url) or "" self.seen.add(self.current_url) def handle_link(self, link: str) -> None: @@ -92,9 +92,9 @@ def handle_link(self, link: str) -> None: return # fix, check, clean and normalize link = fix_relative_urls(self.base_url, link) - link = clean_url(link, self.target_lang) + link = clean_url(link, self.target_lang) or "" - if link is None or not lang_filter(link, self.target_lang): + if not link or not lang_filter(link, self.target_lang): return newdomain = extract_domain(link, fast=True) @@ -180,7 +180,7 @@ def sitemap_search( url: str, target_lang: Optional[str] = None, external: bool = False, - sleep_time: int = 2, + sleep_time: float = 2.0, max_sitemaps: int = MAX_SITEMAPS_SEEN, ) -> List[str]: """Look for sitemaps for the given URL and gather links. 
@@ -290,12 +290,12 @@ def extract_robots_sitemaps(robotstxt: Optional[str], baseurl: str) -> List[str]
        line = line.strip()
        if not line:
            continue
-        line = line.split(":", 1)
-        if len(line) == 2:
-            line[0] = line[0].strip().lower()
-            if line[0] == "sitemap":
+        line_parts = line.split(":", 1)
+        if len(line_parts) == 2:
+            line_parts[0] = line_parts[0].strip().lower()
+            if line_parts[0] == "sitemap":
                # urllib.parse.unquote(line[1].strip())
-                candidates.append(line[1].strip())
+                candidates.append(line_parts[1].strip())

    candidates = list(dict.fromkeys(candidates))
    sitemapurls = [fix_relative_urls(baseurl, u) for u in candidates if u]
diff --git a/trafilatura/spider.py b/trafilatura/spider.py
index a41d441d..1abeda04 100644
--- a/trafilatura/spider.py
+++ b/trafilatura/spider.py
@@ -20,7 +20,7 @@
)

try:
-    import py3langid
+    import py3langid  # type: ignore
except ImportError:
    pass

@@ -206,10 +206,11 @@ def process_links(
    if htmlstring and params.prune_xpath is not None:
        if isinstance(params.prune_xpath, str):
-            params.prune_xpath = [params.prune_xpath]
+            params.prune_xpath = [params.prune_xpath]  # type: ignore[assignment]
        tree = load_html(htmlstring)
-        tree = prune_unwanted_nodes(tree, [XPath(x) for x in params.prune_xpath])
-        htmlstring = tostring(tree).decode()
+        if tree is not None:
+            tree = prune_unwanted_nodes(tree, [XPath(x) for x in params.prune_xpath])
+            htmlstring = tostring(tree).decode()

    links, links_priority = [], []
    for link in extract_links(
diff --git a/trafilatura/utils.py b/trafilatura/utils.py
index bc37b834..ec6c5cb3 100644
--- a/trafilatura/utils.py
+++ b/trafilatura/utils.py
@@ -21,32 +21,32 @@
from functools import lru_cache
from itertools import islice
-from typing import Any, Optional
+from typing import Any, List, Literal, Optional, Tuple, Union
from unicodedata import normalize

# response compression
try:
-    import brotli
+    import brotli  # type: ignore
    HAS_BROTLI = True
except ImportError:
    HAS_BROTLI = False
try:
-    import zstandard
+    import zstandard  # type: ignore
    HAS_ZSTD = True
except ImportError:
    HAS_ZSTD = False

# language detection
try:
-    import py3langid
+    import py3langid  # type: ignore
    LANGID_FLAG = True
except ImportError:
    LANGID_FLAG = False

# CChardet is faster and can be more accurate
try:
-    from cchardet import detect as cchardet_detect
+    from cchardet import detect as cchardet_detect  # type: ignore
except ImportError:
    cchardet_detect = None

@@ -91,7 +91,7 @@
# COMMENTS_BLACKLIST = ('( Abmelden / Ändern )') # Fill in your details below|Trage deine Daten unten|Kommentar verfassen|Bitte logge dich|Hinterlasse einen Kommentar| to %s| mit %s)


-def handle_compressed_file(filecontent):
+def handle_compressed_file(filecontent: bytes) -> Union[bytes, str]:
    """
    Don't trust response headers and try to decompress a binary string
    with a cascade of installed packages. Use magic numbers when available.
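Note on the utils.py import block and handle_compressed_file() above: each optional dependency is imported inside try/except, availability is recorded in a module-level flag, and the "# type: ignore" comments silence mypy for packages shipping without type stubs. A rough, self-contained sketch of how such a flag can gate a single decompression attempt follows, assuming only the brotli package; maybe_decompress is a hypothetical helper, not trafilatura's actual cascade.

try:
    import brotli  # type: ignore
    HAS_BROTLI = True
except ImportError:
    HAS_BROTLI = False


def maybe_decompress(blob: bytes) -> bytes:
    # Attempt Brotli decompression only when the package is installed,
    # and fall back to the raw payload if decompression fails.
    if HAS_BROTLI:
        try:
            return brotli.decompress(blob)
        except brotli.error:
            pass
    return blob
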
@@ -128,7 +128,7 @@ def handle_compressed_file(filecontent):
    return filecontent


-def isutf8(data):
+def isutf8(data: bytes) -> bool:
    """Simple heuristic to determine if a bytestring uses standard unicode encoding"""
    try:
        data.decode('UTF-8')
@@ -137,7 +137,7 @@ def isutf8(data):
    return True


-def detect_encoding(bytesobject):
+def detect_encoding(bytesobject: bytes) -> List[str]:
    """"Read all input or first chunk and return a list of encodings"""
    # alternatives: https://github.com/scrapy/w3lib/blob/master/w3lib/encoding.py
    # unicode-test
@@ -162,14 +162,15 @@ def detect_encoding(bytesobject):
    return [g for g in guesses if g not in UNICODE_ALIASES]


-def decode_file(filecontent) -> str:
+def decode_file(filecontent: Union[bytes, str]) -> str:
    """Check if the bytestring could be GZip and eventually decompress it,
       guess bytestring encoding and try to decode to Unicode string.
       Resort to destructive conversion otherwise."""
-    # init
    if isinstance(filecontent, str):
        return filecontent
+    htmltext = None
+
    # GZip and Brotli test
    filecontent = handle_compressed_file(filecontent)
    # encoding
@@ -181,6 +182,7 @@ def decode_file(filecontent) -> str:
                htmltext = None
            else:
                break
+
    # return original content if nothing else succeeded
    return htmltext or str(filecontent, encoding='utf-8', errors='replace')


@@ -206,7 +208,7 @@ def repair_faulty_html(htmlstring: str, beginning: str) -> str:
    return htmlstring


-def fromstring_bytes(htmlobject):
+def fromstring_bytes(htmlobject: str) -> Optional[HtmlElement]:
    "Try to pass bytes to LXML parser."
    tree = None
    try:
@@ -262,23 +264,23 @@ def load_html(htmlobject: Any) -> Optional[HtmlElement]:


@lru_cache(maxsize=2**14)  # sys.maxunicode = 1114111
-def return_printables_and_spaces(char):
+def return_printables_and_spaces(char: str) -> str:
    'Return a character if it belongs to certain classes'
    return char if char.isprintable() or char.isspace() else ''


-def remove_control_characters(string):
+def remove_control_characters(string: str) -> str:
    '''Prevent non-printable and XML invalid character errors'''
    return ''.join(map(return_printables_and_spaces, string))


-def normalize_unicode(string, unicodeform='NFC'):
+def normalize_unicode(string: str, unicodeform: Literal['NFC', 'NFD', 'NFKC', 'NFKD'] = 'NFC') -> str:
    'Normalize the given string to the specified unicode format.'
    return normalize(unicodeform, string)


@lru_cache(maxsize=1024)
-def line_processing(line, preserve_space=False, trailing_space=False):
+def line_processing(line: str, preserve_space: bool = False, trailing_space: bool = False) -> Optional[str]:
    '''Remove HTML space entities, then discard incompatible unicode
       and invalid XML characters on line level'''
    # spacing HTML entities: https://www.w3.org/MarkUp/html-spec/html-spec_13.html
@@ -287,10 +289,10 @@ def line_processing(line, preserve_space=False, trailing_space=False):
    if not preserve_space:
        # remove newlines that are not related to punctuation or markup
        # remove non-printable chars and normalize space characters (including Unicode spaces)
-        new_line = trim(LINES_TRIMMING.sub(r" ", new_line))
+        new_line = trim(LINES_TRIMMING.sub(r" ", new_line))  # type: ignore[assignment]
        # prune empty lines
        if all(map(str.isspace, new_line)):
-            new_line = None
+            new_line = None  # type: ignore[assignment]
    elif trailing_space:
        space_before = " " if line[0].isspace() else ""
        space_after = " " if line[-1].isspace() else ""
@@ -346,7 +348,7 @@ def trim(string: str) -> Optional[str]:
    return None


-def is_image_file(imagesrc):
+def is_image_file(imagesrc: Optional[str]) -> bool:
    '''Check if the observed string corresponds to a valid image extension.
       Use a length threshold and apply a regex on the content.'''
    if imagesrc is None or len(imagesrc) > 8192:
@@ -354,7 +356,7 @@ def is_image_file(imagesrc):
    return bool(IMAGE_EXTENSION.search(imagesrc))


-def make_chunks(iterable, n):
+def make_chunks(iterable: Any, n: int) -> Any:
    "Chunk data into smaller pieces."
    # 3.12+: https://docs.python.org/3/library/itertools.html#itertools.batched
    iterator = iter(iterable)
@@ -362,7 +364,7 @@ def make_chunks(iterable, n):
        yield batch


-def is_acceptable_length(my_len, options) -> bool:
+def is_acceptable_length(my_len: int, options: Any) -> bool:
    "Check if the document length is within acceptable boundaries."
    if my_len < options.min_file_size:
        LOGGER.error("too small/incorrect for URL %s", options.url)
@@ -373,7 +375,7 @@ def is_acceptable_length(my_len, options) -> bool:
    return True


-def check_html_lang(tree, target_language, strict=False):
+def check_html_lang(tree: HtmlElement, target_language: str, strict: bool = False) -> bool:
    """Check HTML meta-elements for language information and split
       the result in case there are several languages."""
    for attr in TARGET_LANG_ATTRS:
@@ -397,7 +399,7 @@ def check_html_lang(tree, target_language, strict=False):
    return True


-def language_classifier(temp_text, temp_comments):
+def language_classifier(temp_text: str, temp_comments: str) -> Optional[str]:
    '''Run external component (if installed) for language identification'''
    if LANGID_FLAG is True:
        result, _ = (
@@ -411,7 +413,7 @@ def language_classifier(temp_text, temp_comments):
    return result


-def language_filter(temp_text, temp_comments, target_language, docmeta):
+def language_filter(temp_text: str, temp_comments: str, target_language: str, docmeta: Any) -> Tuple[bool, Any]:
    '''Filter text based on language detection and store relevant information'''
    # todo: run and pass info along anyway?
    if target_language is not None:
@@ -432,11 +434,11 @@ def textfilter(element: _Element) -> bool:
    '''Filter out unwanted text'''
    testtext = element.tail if element.text is None else element.text
    # to check: line len → continue if len(line) <= 5
-    return not text_chars_test(testtext) or any(map(RE_FILTER.match, testtext.splitlines()))
+    return not testtext or testtext.isspace() or any(map(RE_FILTER.match, testtext.splitlines()))


def text_chars_test(string: Optional[str]) -> bool:
    '''Determine if a string is only composed of spaces and/or control characters'''
    # or not re.search(r'\w', string)
    # return string is not None and len(string) != 0 and not string.isspace()
-    return bool(string) and not string.isspace()
+    return bool(string) and not string.isspace()  # type: ignore[union-attr]
diff --git a/trafilatura/xml.py b/trafilatura/xml.py
index ec9efe2f..e90a9fc3 100644
--- a/trafilatura/xml.py
+++ b/trafilatura/xml.py
@@ -120,8 +120,8 @@ def build_json_output(docmeta: Document, with_metadata: bool = True) -> str:
        'source': outputdict.pop('url'),
        'source-hostname': outputdict.pop('sitename'),
        'excerpt': outputdict.pop('description'),
-        'categories': ';'.join(outputdict.pop('categories')),
-        'tags': ';'.join(outputdict.pop('tags')),
+        'categories': ';'.join(outputdict.pop('categories') or []),
+        'tags': ';'.join(outputdict.pop('tags') or []),
        'text': xmltotxt(outputdict.pop('body'), include_formatting=False),
    })
    commentsbody = outputdict.pop('commentsbody')
@@ -350,8 +350,11 @@ def process_element(element: _Element, returnlist: List[str], include_formatting
        returnlist.append(element.tail)


-def xmltotxt(xmloutput: _Element, include_formatting: bool) -> str:
+def xmltotxt(xmloutput: Optional[_Element], include_formatting: bool) -> str:
    "Convert to plain text format and optionally preserve formatting as markdown."
+    if xmloutput is None:
+        return ""
+
    returnlist: List[str] = []
    process_element(xmloutput, returnlist, include_formatting)

@@ -586,7 +589,7 @@ def _move_element_one_level_up(element: _Element) -> None:
        return

    new_elem = Element("p")
-    new_elem.extend(sibling for sibling in element.itersiblings())
+    new_elem.extend(list(element.itersiblings()))

    grand_parent.insert(grand_parent.index(parent) + 1, element)
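
Note on the last xml.py hunk: wrapping element.itersiblings() in list() snapshots the following siblings before extend() starts reparenting them, which presumably avoids mutating the tree while the sibling iterator is still running. The self-contained example below only illustrates the resulting call pattern with lxml; it is not code taken from trafilatura.

from lxml.etree import Element, SubElement, tostring

parent = Element("div")
target = SubElement(parent, "p")
for i in range(3):
    SubElement(parent, "span").text = str(i)

new_elem = Element("p")
# Snapshot the following siblings, then move them under the new element.
new_elem.extend(list(target.itersiblings()))
print(tostring(new_elem))  # b'<p><span>0</span><span>1</span><span>2</span></p>'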