diff --git a/tests/comparison.py b/tests/comparison.py index 1cd52599..259a183f 100644 --- a/tests/comparison.py +++ b/tests/comparison.py @@ -33,8 +33,9 @@ from trafilatura import extract try: - from trafilatura.core import baseline + from trafilatura import baseline except ImportError: + print("Cannot import baseline, using simple version") baseline = None from evaldata import EVAL_PAGES diff --git a/tests/comparison_small.py b/tests/comparison_small.py index bb3bcfe5..b0b17ce5 100644 --- a/tests/comparison_small.py +++ b/tests/comparison_small.py @@ -23,8 +23,9 @@ from trafilatura import extract try: - from trafilatura.core import baseline, html2txt + from trafilatura import baseline, html2txt except ImportError: + print("Cannot import baseline, using simple version") baseline = None html2txt = None #from trafilatura.htmlprocessing import prune_html @@ -155,8 +156,7 @@ def run_baseline(htmlstring): if baseline is not None: _, result, _ = baseline(htmlstring) return result - result = run_baseline_2(htmlstring) - return result + return run_baseline_2(htmlstring) def run_trafilatura(htmlstring): diff --git a/tests/unit_tests.py b/tests/unit_tests.py index c65248f1..db88158c 100644 --- a/tests/unit_tests.py +++ b/tests/unit_tests.py @@ -31,13 +31,13 @@ process_record, utils, xml) from trafilatura.core import (Extractor, handle_formatting, handle_image, handle_lists, handle_paragraphs, handle_quotes, - handle_table, handle_textelem, sanitize_tree, - trim) -from trafilatura.external import try_justext + handle_table, handle_textelem) +from trafilatura.external import sanitize_tree, try_justext from trafilatura.filters import textfilter from trafilatura.meta import reset_caches from trafilatura.metadata import Document from trafilatura.settings import DEFAULT_CONFIG, TAG_CATALOG, use_config +from trafilatura.utils import trim logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) @@ -116,6 +116,12 @@ def test_trim(): def test_input(): '''test if loaded strings/trees are handled properly''' + teststring = "高山云雾出好茶".encode("utf-8") + assert utils.detect_encoding(teststring) == ["utf-8"] + teststring = "高山云雾出好茶".encode("gb18030") + assert "gb18030" in utils.detect_encoding(teststring) + assert "gb18030" in utils.detect_encoding(teststring*1000) + assert utils.is_dubious_html("This is a string.") is True htmlstring = "\n" @@ -147,7 +153,8 @@ def test_input(): # old: with pytest.raises(TypeError) as err: assert extract(None, 'url', '0000', target_language=None) is None # legacy - assert process_record(None, 'url', '0000', target_language=None) is None + with pytest.raises(SystemExit): + assert process_record(None, 'url', '0000', target_language=None) is None # GZip with open(os.path.join(RESOURCES_DIR, 'webpage.html.gz'), 'rb') as gzfile: myinput = gzfile.read() @@ -293,21 +300,29 @@ def test_formatting(): my_document = html.fromstring('
This here is in bold font.
Paragraph 1
Paragraph 2
Here is a code sample:
import trafilatura
Here is a code sample:
import trafilatura\ntrafilatura.extract("")
This here is in bold and italic font.
') my_result = extract(my_document, output_format='xml', include_formatting=True, config=ZERO_CONFIG) @@ -1255,7 +1270,17 @@ def test_lang_detection(): assert detected == sample['expected'], f"Lang detection failed for {sample['expected']}" +def test_config_loading(): + "Check if the config file is read correctly." + with pytest.raises(FileNotFoundError): + config = use_config(filename="/bogus-dir/bogus-file.txt") + + config = use_config(filename=os.path.join(RESOURCES_DIR, "newsettings.cfg")) + assert config is not None + + if __name__ == '__main__': + test_config_loading() test_trim() test_input() test_formatting() diff --git a/trafilatura/__init__.py b/trafilatura/__init__.py index c47038fe..f0fecc8b 100644 --- a/trafilatura/__init__.py +++ b/trafilatura/__init__.py @@ -14,7 +14,8 @@ import logging -from .core import bare_extraction, baseline, extract, html2txt, process_record +from .baseline import baseline, html2txt +from .core import bare_extraction, extract, process_record from .downloads import fetch_response, fetch_url from .metadata import extract_metadata from .utils import load_html diff --git a/trafilatura/baseline.py b/trafilatura/baseline.py new file mode 100644 index 00000000..4c17b478 --- /dev/null +++ b/trafilatura/baseline.py @@ -0,0 +1,101 @@ +# pylint:disable-msg=E0611 +import re + +from lxml.etree import Element, SubElement + +from .settings import BASIC_CLEAN_XPATH +from .utils import load_html, trim + + +JSON_SEARCH = re.compile(r'"articlebody": *"(.+?)(? element containing the extracted paragraphs, + the main text as string, and its length as integer. + + """ + tree = load_html(filecontent) + postbody = Element('body') + if tree is None: + return postbody, '', 0 + # scrape from json text + for elem in tree.iterfind('.//script[@type="application/ld+json"]'): + if elem.text and '"article' in elem.text: + mymatch = JSON_SEARCH.search(elem.text) + if mymatch: + elem = SubElement(postbody, 'p') + elem.text = trim(mymatch[1].replace('\\"', '"')) + return postbody, elem.text, len(elem.text) + + tree = basic_cleaning(tree) + + # scrape from article tag + article_elem = tree.find('.//article') + if article_elem is not None: + temp_text = trim(article_elem.text_content()) + if len(temp_text) > 100: + elem = SubElement(postbody, 'p') + elem.text = temp_text + return postbody, temp_text, len(temp_text) + # scrape from text paragraphs + results = set() + for element in tree.iter('blockquote', 'code', 'p', 'pre', 'q', 'quote'): + entry = element.text_content() + if entry not in results: + elem = SubElement(postbody, 'p') + elem.text = entry + results.add(entry) + temp_text = trim('\n'.join(postbody.itertext())) + if len(temp_text) > 100: + return postbody, temp_text, len(temp_text) + # default strategy: clean the tree and take everything + postbody = Element('body') + body_elem = tree.find('.//body') + if body_elem is not None: + # elem.text = trim(body_elem.text_content()) + text = '\n'.join([trim(e) for e in body_elem.itertext()]) + if len(text) > 100: + elem = SubElement(postbody, 'p') + elem.text = text + return postbody, text, len(text) + # new fallback + text = html2txt(tree) + elem = SubElement(postbody, 'p') + elem.text = text + return postbody, text, len(text) + # old: return postbody, '', 0 + + +def html2txt(content): + """Run basic html2txt on a document. + + Args: + content: HTML document as string or LXML element. + + Returns: + The extracted text in the form of a string or an empty string. 
+ + """ + tree = load_html(content) + if tree is None: + return "" + body = tree.find(".//body") + if body is None: + return "" + tree = basic_cleaning(tree) + return " ".join(body.text_content().split()).strip() diff --git a/trafilatura/cli_utils.py b/trafilatura/cli_utils.py index cc0af698..188da8c9 100644 --- a/trafilatura/cli_utils.py +++ b/trafilatura/cli_utils.py @@ -17,7 +17,8 @@ from trafilatura import spider -from .core import extract, html2txt +from .baseline import html2txt +from .core import extract from .downloads import (add_to_compressed_dict, buffered_downloads, load_download_buffer) from .feeds import find_feed_urls @@ -26,7 +27,7 @@ from .meta import reset_caches from .settings import FILENAME_LEN, MAX_FILES_PER_DIRECTORY, use_config from .sitemaps import sitemap_search -from .utils import URL_BLACKLIST_REGEX, make_chunks, uniquify_list +from .utils import URL_BLACKLIST_REGEX, make_chunks LOGGER = logging.getLogger(__name__) @@ -67,7 +68,7 @@ def load_input_urls(args): LOGGER.warning('No input provided') # uniq URLs while preserving order (important) - return uniquify_list(input_urls) + return list(dict.fromkeys(input_urls)) def load_blacklist(filename): diff --git a/trafilatura/core.py b/trafilatura/core.py index 0a302eb2..80374576 100644 --- a/trafilatura/core.py +++ b/trafilatura/core.py @@ -5,15 +5,16 @@ import logging import re # import regex as re +import sys import warnings + from copy import deepcopy from lxml.etree import Element, SubElement, XPath, strip_elements, strip_tags, tostring -from lxml.html import tostring # own -from .external import (SANITIZED_XPATH, justext_rescue, sanitize_tree, - try_readability) +from .baseline import baseline +from .external import compare_extraction from .filters import (LANGID_FLAG, check_html_lang, duplicate_test, language_filter, text_chars_test) from .hashing import content_fingerprint @@ -21,9 +22,8 @@ handle_textnode, link_density_test_tables, process_node, prune_unwanted_nodes, tree_cleaning) from .metadata import Document, extract_metadata -from .settings import BASIC_CLEAN_XPATH, DEFAULT_CONFIG, TAG_CATALOG, use_config -from .utils import (is_image_file, load_html, normalize_unicode, trim, - FORMATTING_PROTECTED) +from .settings import DEFAULT_CONFIG, TAG_CATALOG, use_config +from .utils import FORMATTING_PROTECTED, is_image_file, load_html, normalize_unicode from .xml import (build_json_output, build_tei_output, build_xml_output, control_xml_output, remove_empty_elements, strip_double_tags, xmltotxt, xmltocsv) from .xpaths import (BODY_XPATH, COMMENTS_DISCARD_XPATH, COMMENTS_XPATH, @@ -40,8 +40,6 @@ CODES_QUOTES = {'code', 'quote'} NOT_AT_THE_END = {'head', 'ref'} -JSON_SEARCH = re.compile(r'"articlebody": *"(.+?)(? 0: - # set attribute - if child.get('rend') is not None: - newchildelem.set('rend', child.get('rend')) - processed_element.append(newchildelem) - child.tag = 'done' - element.tag = 'done' + last_subchild.tail += " " + child.tail + if new_child_elem.text or len(new_child_elem) > 0: + update_elem_rendition(child, new_child_elem) + processed_element.append(new_child_elem) + child.tag = "done" + element.tag = "done" # test if it has children and text. Avoid double tags?? 
- if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True: - # set attribute - if element.get('rend') is not None: - processed_element.set('rend', element.get('rend')) + if is_text_element(processed_element): + update_elem_rendition(element, processed_element) return processed_element return None def is_code_block_element(element): + "Check if it is a code element according to common structural markers." # pip - if element.get('lang') is not None or element.tag == 'code': + if element.get("lang") or element.tag == "code": return True # GitHub parent = element.getparent() - if parent is not None and 'highlight' in parent.get('class', default=''): + if parent is not None and "highlight" in parent.get("class", ""): return True # highlightjs - code = element.find('code') - if code is not None and len(element.getchildren()) == 1: + code = element.find("code") + if code is not None and len(element) == 1: return True return False def handle_code_blocks(element): + "Turn element into a properly tagged code block." processed_element = deepcopy(element) - for child in element.iter('*'): - child.tag = 'done' - processed_element.tag = 'code' + for child in element.iter("*"): + child.tag = "done" + processed_element.tag = "code" return processed_element def handle_quotes(element, options): - '''Process quotes elements''' + "Process quotes elements." if is_code_block_element(element): return handle_code_blocks(element) processed_element = Element(element.tag) - for child in element.iter('*'): + for child in element.iter("*"): processed_child = process_node(child, options) # handle_textnode(child, comments_fix=True) - if processed_child is not None: - newsub = SubElement(processed_element, child.tag) - newsub.text, newsub.tail = processed_child.text, processed_child.tail - child.tag = 'done' - if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True: + define_newelem(processed_child, processed_element) + child.tag = "done" + if is_text_element(processed_element): # avoid double/nested tags - strip_tags(processed_element, 'quote') + strip_tags(processed_element, "quote") return processed_element return None def handle_other_elements(element, potential_tags, options): - '''Handle diverse or unknown elements in the scope of relevant tags''' + "Handle diverse or unknown elements in the scope of relevant tags." # handle w3schools code - if element.tag == 'div' and 'w3-code' in element.get('class', default=''): + if element.tag == "div" and "w3-code" in element.get("class", ""): return handle_code_blocks(element) + # delete unwanted if element.tag not in potential_tags: - if element.tag != 'done': - LOGGER.debug('discarding element: %s %s', element.tag, element.text) + if element.tag != "done": + LOGGER.debug("discarding element: %s %s", element.tag, element.text) return None - if element.tag == 'div': + + if element.tag == "div": # make a copy and prune it in case it contains sub-elements handled on their own? 
# divcopy = deepcopy(element) processed_element = handle_textnode(element, options, comments_fix=False, preserve_spaces=True) if processed_element is not None and text_chars_test(processed_element.text) is True: processed_element.attrib.clear() # small div-correction # could be moved elsewhere - if processed_element.tag == 'div': - processed_element.tag = 'p' + if processed_element.tag == "div": + processed_element.tag = "p" # insert return processed_element else: - LOGGER.debug('unexpected element seen: %s %s', element.tag, element.text) + LOGGER.debug("unexpected element seen: %s %s", element.tag, element.text) + return None def handle_paragraphs(element, potential_tags, options): - '''Process paragraphs (p) elements along with their children, - trim and clean the content''' - element.attrib.clear() + "Process paragraphs along with their children, trim and clean the content." + element.attrib.clear() # todo: test if necessary # strip_tags(element, 'p') # change in precision due to spaces? + # no children if len(element) == 0: - processed_element = process_node(element, options) - if processed_element is not None: - return processed_element - return None + return process_node(element, options) + # children processed_element = Element(element.tag) - for child in element.iter('*'): - if child.tag not in potential_tags and child.tag != 'done': - LOGGER.debug('unexpected in p: %s %s %s', child.tag, child.text, child.tail) + for child in element.iter("*"): + if child.tag not in potential_tags and child.tag != "done": + LOGGER.debug("unexpected in p: %s %s %s", child.tag, child.text, child.tail) continue # spacing = child.tag in SPACING_PROTECTED # todo: outputformat.startswith('xml')? # todo: act on spacing here? processed_child = handle_textnode(child, options, comments_fix=False, preserve_spaces=True) if processed_child is not None: # todo: needing attention! 
- if processed_child.tag == 'p': - LOGGER.debug('extra p within p: %s %s %s', processed_child.tag, processed_child.text, + if processed_child.tag == "p": + LOGGER.debug("extra p within p: %s %s %s", processed_child.tag, processed_child.text, processed_child.tail) if processed_element.text: - processed_element.text += ' ' + processed_child.text + processed_element.text += " " + processed_child.text else: processed_element.text = processed_child.text + child.tag = "done" continue # handle formatting newsub = Element(child.tag) @@ -306,14 +324,14 @@ def handle_paragraphs(element, potential_tags, options): if len(processed_child) > 0: for item in processed_child: # children are lists if text_chars_test(item.text) is True: - item.text = ' ' + item.text + item.text = " " + item.text strip_tags(processed_child, item.tag) # correct attributes - if child.tag == 'hi': - newsub.set('rend', child.get('rend')) - elif child.tag == 'ref': - if child.get('target') is not None: - newsub.set('target', child.get('target')) + if child.tag == "hi": + newsub.set("rend", child.get("rend")) + elif child.tag == "ref": + if child.get("target") is not None: + newsub.set("target", child.get("target")) # handle line breaks # elif processed_child.tag == 'lb': # try: @@ -334,61 +352,61 @@ def handle_paragraphs(element, potential_tags, options): # newsub.tail = processed_child.text newsub.text, newsub.tail = processed_child.text, processed_child.tail processed_element.append(newsub) - child.tag = 'done' + child.tag = "done" # finish if len(processed_element) > 0: + last_elem = processed_element[-1] # clean trailing lb-elements - if ( - processed_element[-1].tag == 'lb' - and processed_element[-1].tail is None - ): - processed_element[-1].getparent().remove(processed_element[-1]) + if last_elem.tag == "lb" and last_elem.tail is None: + last_elem.getparent().remove(last_elem) return processed_element if processed_element.text: return processed_element - LOGGER.debug('discarding p-child: %s', tostring(processed_element)) + LOGGER.debug("discarding p-child: %s", tostring(processed_element)) return None def define_cell_type(element): - '''Determine cell element type and mint new element''' + "Determine cell element type and mint new element." # define tag - cell_element = Element('cell') - if element.tag == 'th': - cell_element.set('role', 'head') + cell_element = Element("cell") + if element.tag == "th": + cell_element.set("role", "head") return cell_element def handle_table(table_elem, potential_tags, options): - '''Process single table element''' - newtable = Element('table') - newrow = Element('row') + "Process single table element." 
+ newtable = Element("table") + newrow = Element("row") + # strip these structural elements - strip_tags(table_elem, 'thead', 'tbody', 'tfoot') + strip_tags(table_elem, "thead", "tbody", "tfoot") + # explore sub-elements for subelement in table_elem.iterdescendants(): - if subelement.tag == 'tr': + if subelement.tag == "tr": # process existing row if len(newrow) > 0: newtable.append(newrow) - newrow = Element('row') + newrow = Element("row") elif subelement.tag in TABLE_ELEMS: - newchildelem = define_cell_type(subelement) + new_child_elem = define_cell_type(subelement) # process if len(subelement) == 0: processed_cell = process_node(subelement, options) if processed_cell is not None: - newchildelem.text, newchildelem.tail = processed_cell.text, processed_cell.tail + new_child_elem.text, new_child_elem.tail = processed_cell.text, processed_cell.tail else: # proceed with iteration, fix for nested elements - newchildelem.text, newchildelem.tail = subelement.text, subelement.tail + new_child_elem.text, new_child_elem.tail = subelement.text, subelement.tail subelement.tag = "done" for child in subelement.iterdescendants(): if child.tag in TABLE_ALL: # todo: define attributes properly if child.tag in TABLE_ELEMS: # subcell_elem = define_cell_type(subelement) - child.tag = 'cell' + child.tag = "cell" processed_subchild = handle_textnode(child, options, preserve_spaces=True, comments_fix=True) # todo: lists in table cells elif child.tag == "list" and options.recall: @@ -398,20 +416,19 @@ def handle_table(table_elem, potential_tags, options): processed_subchild = None # don't handle it anymore else: # subcell_elem = Element(child.tag) - processed_subchild = handle_textelem(child, potential_tags.union(['div']), options) + processed_subchild = handle_textelem(child, potential_tags.union(["div"]), options) # add child element to processed_element - if processed_subchild is not None: - subchildelem = SubElement(newchildelem, processed_subchild.tag) - subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail - child.tag = 'done' + define_newelem(processed_subchild, new_child_elem) + child.tag = "done" # add to tree - if newchildelem.text or len(newchildelem) > 0: - newrow.append(newchildelem) + if new_child_elem.text or len(new_child_elem) > 0: + newrow.append(new_child_elem) # beware of nested tables - elif subelement.tag == 'table': + elif subelement.tag == "table": break # cleanup - subelement.tag = 'done' + subelement.tag = "done" + # end of processing if len(newrow) > 0: newtable.append(newrow) @@ -421,30 +438,35 @@ def handle_table(table_elem, potential_tags, options): def handle_image(element): - '''Process image element''' - # image source + "Process image elements and their relevant attributes." 
processed_element = Element(element.tag) - if is_image_file(element.get('data-src')): - processed_element.set('src', element.get('data-src')) - elif is_image_file(element.get('src')): - processed_element.set('src', element.get('src')) + + for attr in ("data-src", "src"): + src = element.get(attr) + if is_image_file(src): + processed_element.set("src", src) + break else: # take the first corresponding attribute - for attr in element.attrib: - if attr.startswith('data-src') and is_image_file(element.get(attr)): - processed_element.set('src', element.get(attr)) + for attr, value in element.attrib.items(): + if attr.startswith("data-src") and is_image_file(value): + processed_element.set("src", value) break + # additional data - if element.get('alt') is not None: - processed_element.set('alt', element.get('alt')) - if element.get('title') is not None: - processed_element.set('title', element.get('title')) + if element.get("alt") is not None: + processed_element.set("alt", element.get("alt")) + if element.get("title") is not None: + processed_element.set("title", element.get("title")) + # don't return empty elements or elements without source, just None - if len(processed_element.attrib) == 0 or not processed_element.get('src'): + if not processed_element.attrib or not processed_element.get("src"): return None + # post-processing: URLs - url = processed_element.get('src') - processed_element.set('src', re.sub(r'^//', 'http://', url)) + if not processed_element.get("src").startswith("http"): + processed_element.set("src", re.sub(r"^//", "http://", processed_element.get("src"))) + return processed_element @@ -544,9 +566,8 @@ def extract_content(tree, options): # iterate for expr in BODY_XPATH: # select tree if the expression has been found - try: - subtree = expr(tree)[0] - except IndexError: + subtree = next((s for s in expr(tree) if s is not None), None) + if subtree is None: continue # prune the subtree subtree = prune_unwanted_sections(subtree, potential_tags, options) @@ -581,7 +602,7 @@ def extract_content(tree, options): if {e.tag for e in subelems} == {'lb'}: subelems = [subtree] # extract content - result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options) for e in subelems))) + result_body.extend([el for el in (handle_textelem(e, potential_tags, options) for e in subelems) if el is not None]) # remove trailing titles while len(result_body) > 0 and (result_body[-1].tag in NOT_AT_THE_END): result_body[-1].getparent().remove(result_body[-1]) @@ -617,21 +638,20 @@ def process_comments_node(elem, potential_tags, options): def extract_comments(tree, options): - '''Try and extract comments out of potential sections in the HTML''' - comments_body = Element('body') + "Try and extract comments out of potential sections in the HTML." + comments_body = Element("body") # define iteration strategy potential_tags = set(TAG_CATALOG) # 'span' # potential_tags.add('div') trouble with
' + elem.tag = "del" + elem.set("rend", "overstrike") + + +def convert_details(elem): + "Handle details and summary." + elem.tag = "div" + for subelem in elem.iter("summary"): + subelem.tag = "head" + + +CONVERSIONS = { + "dl": convert_lists, "ol": convert_lists, "ul": convert_lists, + "h1": convert_headings, "h2": convert_headings, "h3": convert_headings, + "h4": convert_headings, "h5": convert_headings, "h6": convert_headings, + "br": convert_line_breaks, "hr": convert_line_breaks, + "blockquote": convert_quotes, "pre": convert_quotes, "q": convert_quotes, + "del": convert_deletions, "s": convert_deletions, "strike": convert_deletions, + "details": convert_details, +} + + def convert_tags(tree, options, url=None): '''Simplify markup and convert relevant HTML tags to an XML standard''' # delete links for faster processing - if options.links is False: + if not options.links: xpath_expr = './/div//a|.//ul//a' # .//p//a ? - if options.tables is True: + if options.tables: xpath_expr += '|.//table//a' # necessary for further detection for elem in tree.xpath(xpath_expr): @@ -229,131 +294,86 @@ def convert_tags(tree, options, url=None): # replace href attribute and delete the rest target = elem.get('href') # defaults to None elem.attrib.clear() - if target is not None: + if target: # convert relative URLs - if base_url is not None: + if base_url: target = fix_relative_urls(base_url, target) elem.set('target', target) - # include_formatting - if options.formatting is False: - strip_tags(tree, *REND_TAG_MAPPING) - else: - for elem in tree.iter(list(REND_TAG_MAPPING)): + + if options.formatting: + for elem in tree.iter(REND_TAG_MAPPING.keys()): attribute = REND_TAG_MAPPING[elem.tag] elem.tag = 'hi' elem.set('rend', attribute) + else: + strip_tags(tree, *REND_TAG_MAPPING) + # iterate over all concerned elements - for elem in tree.iter('blockquote', 'br', 'del', 'details', 'dl', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'hr', 'ol', 'pre', 'q', 's', 'strike', 'ul'): - # ul/ol → list / li → item - if elem.tag in ('dl', 'ol', 'ul'): - elem.set('rend', elem.tag) - elem.tag = 'list' - i = 1 - for subelem in elem.iter('dd', 'dt', 'li'): - # keep track of dd/dt items - if subelem.tag in ('dd', 'dt'): - subelem.set('rend', f"{subelem.tag}-{i}") - # increment counter after in description list
- if subelem.tag == 'dd':
- i += 1
- # convert elem tag
- subelem.tag = 'item'
- # head tags + delete attributes
- elif elem.tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6'):
- elem.attrib.clear()
- elem.set('rend', elem.tag)
- elem.tag = 'head'
- # br → lb
- elif elem.tag in ('br', 'hr'):
- elem.tag = 'lb'
+ for elem in tree.iter(CONVERSIONS.keys()):
+ CONVERSIONS[elem.tag](elem)
# wbr
# pre
#elif elem.tag == 'pre':
# else:
# elem.tag = 'quote'
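For reference, a minimal self-contained sketch of the dispatch pattern introduced above, which replaces the removed if/elif cascade below. The converter bodies here are simplified stand-ins, not the actual functions from htmlprocessing.py, and the iteration call is written with explicit unpacking:

from lxml import etree

def convert_headings(elem):
    # keep the original level as a rendition hint, as the real converters do
    elem.attrib.clear()
    elem.set("rend", elem.tag)
    elem.tag = "head"

def convert_line_breaks(elem):
    elem.tag = "lb"

CONVERSIONS = {"h1": convert_headings, "h2": convert_headings,
               "br": convert_line_breaks, "hr": convert_line_breaks}

tree = etree.fromstring("<body><h2>Title</h2><br/><p>Text</p></body>")
# one pass over the tree, one dict lookup per element instead of an if/elif chain
for elem in tree.iter(*CONVERSIONS):
    CONVERSIONS[elem.tag](elem)
print(etree.tostring(tree).decode())
# <body><head rend="h2">Title</head><lb/><p>Text</p></body>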
- # blockquote, q → quote
- elif elem.tag in ('blockquote', 'pre', 'q'):
- code_flag = False
- if elem.tag == 'pre':
- # detect if there could be code inside
- children = elem.getchildren()
- # pre with a single span is more likely to be code
- if len(children) == 1 and children[0].tag == 'span':
- code_flag = True
- # find hljs elements to detect if it's code
- code_elems = elem.xpath(".//span[starts-with(@class,'hljs')]")
- if code_elems:
- code_flag = True
- for subelem in code_elems:
- subelem.attrib.clear()
- if code_flag:
- elem.tag = 'code'
- else:
- elem.tag = 'quote'
- # del | s | strike → - elif elem.tag in ('del', 's', 'strike'): - elem.tag = 'del' - elem.set('rend', 'overstrike') - # details + summary - elif elem.tag == 'details': - elem.tag = 'div' - for subelem in elem.iter('summary'): - subelem.tag = 'head' # images - if options.images is True: + if options.images: for elem in tree.iter('img'): elem.tag = 'graphic' return tree -def handle_textnode(element, options, comments_fix=True, preserve_spaces=False): - '''Convert, format, and probe potential text elements''' - if element.text is None and element.tail is None and len(element) == 0: +def handle_textnode(elem, options, comments_fix=True, preserve_spaces=False): + "Convert, format, and probe potential text elements." + if elem.tag == "done" or (len(elem) == 0 and not elem.text and not elem.tail): return None + # lb bypass - if comments_fix is False and element.tag == 'lb': - if preserve_spaces is False: - element.tail = trim(element.tail) - # if textfilter(element) is True: + if not comments_fix and elem.tag == "lb": + if not preserve_spaces: + elem.tail = trim(elem.tail) + # if textfilter(elem) is True: # return None # duplicate_test(subelement)? - return element - if element.text is None and len(element) == 0: + return elem + + if not elem.text and len(elem) == 0: # try the tail - # LOGGER.debug('using tail for element %s', element.tag) - element.text, element.tail = element.tail, '' + # LOGGER.debug('using tail for element %s', elem.tag) + elem.text, elem.tail = elem.tail, "" # handle differently for br/lb - if comments_fix and element.tag == 'lb': - element.tag = 'p' + if comments_fix and elem.tag == "lb": + elem.tag = "p" + # trim - if preserve_spaces is False: - element.text = trim(element.text) - if element.tail: - element.tail = trim(element.tail) + if not preserve_spaces: + elem.text = trim(elem.text) + if elem.tail: + elem.tail = trim(elem.tail) + # filter content # or not re.search(r'\w', element.text): # text_content()? - if not element.text and textfilter(element) is True: + if not elem.text and textfilter(elem) or \ + (options.dedup and duplicate_test(elem, options.config)): return None - if options.dedup and duplicate_test(element, options.config) is True: - return None - return element + return elem -def process_node(element, options): - '''Convert, format, and probe potential text elements (light format)''' - if element.tag == 'done': - return None - if len(element) == 0 and not element.text and not element.tail: +def process_node(elem, options): + "Convert, format, and probe potential text elements (light format)." 
+ if elem.tag == "done" or (len(elem) == 0 and not elem.text and not elem.tail): return None + # trim - element.text, element.tail = trim(element.text), trim(element.tail) + elem.text, elem.tail = trim(elem.text), trim(elem.tail) + # adapt content string - if element.tag != 'lb' and not element.text and element.tail: - element.text, element.tail = element.tail, None + if elem.tag != "lb" and not elem.text and elem.tail: + elem.text, elem.tail = elem.tail, None + # content checks - if element.text or element.tail: - if textfilter(element) is True: + if elem.text or elem.tail: + if textfilter(elem) or (options.dedup and duplicate_test(elem, options.config)): return None - if options.dedup and duplicate_test(element, options.config) is True: - return None - return element + + return elem diff --git a/trafilatura/lru.py b/trafilatura/lru.py index d0675af4..227b7fb3 100644 --- a/trafilatura/lru.py +++ b/trafilatura/lru.py @@ -27,8 +27,7 @@ def __init__(self, maxsize=128): def _move_link(self, link): # Move the link to the front of the circular queue link_prev, link_next, _key, result = link - link_prev[NEXT] = link_next - link_next[PREV] = link_prev + link_prev[NEXT], link_next[PREV] = link_next, link_prev last = self.root[PREV] last[NEXT] = self.root[PREV] = link link[PREV] = last @@ -40,7 +39,7 @@ def get(self, key): and retrieve its value from the linked list''' with self.lock: link = self.cache.get(key) - if link is not None: + if link: return self._move_link(link) return -1 @@ -49,39 +48,37 @@ def put(self, key, value): # Size limited caching that tracks accesses by recency with self.lock: link = self.cache.get(key) - if link is not None: + if link: self._move_link(link) self.cache[key][RESULT] = value - return - with self.lock: - if self.full: - # Use the old root to store the new key and result. - oldroot = self.root - oldroot[KEY] = key - oldroot[RESULT] = value - # Empty the oldest link and make it the new root. - # Keep a reference to the old key and old result to - # prevent their ref counts from going to zero during the - # update. That will prevent potentially arbitrary object - # clean-up code (i.e. __del__) from running while we're - # still adjusting the links. - self.root = oldroot[NEXT] - oldkey = self.root[KEY] - self.root[KEY] = self.root[RESULT] = None - # Now update the cache dictionary. - del self.cache[oldkey] - # Save the potentially reentrant cache[key] assignment - # for last, after the root and links have been put in - # a consistent state. - self.cache[key] = oldroot else: - # Put result in a new link at the front of the queue. - last = self.root[PREV] - link = [last, self.root, key, value] - last[NEXT] = self.root[PREV] = self.cache[key] = link - # Use the cache_len bound method instead of the len() function - # which could potentially be wrapped in an lru_cache itself. - self.full = len(self.cache) >= self.maxsize + if self.full: + # Use the old root to store the new key and result. + oldroot = self.root + oldroot[KEY], oldroot[RESULT] = key, value + # Empty the oldest link and make it the new root. + # Keep a reference to the old key and old result to + # prevent their ref counts from going to zero during the + # update. That will prevent potentially arbitrary object + # clean-up code (i.e. __del__) from running while we're + # still adjusting the links. + self.root = oldroot[NEXT] + oldkey = self.root[KEY] + self.root[KEY] = self.root[RESULT] = None + # Now update the cache dictionary. 
+ del self.cache[oldkey] + # Save the potentially reentrant cache[key] assignment + # for last, after the root and links have been put in + # a consistent state. + self.cache[key] = oldroot + else: + # Put result in a new link at the front of the queue. + last = self.root[PREV] + link = [last, self.root, key, value] + last[NEXT] = self.root[PREV] = self.cache[key] = link + # Use the cache_len bound method instead of the len() function + # which could potentially be wrapped in an lru_cache itself. + self.full = len(self.cache) >= self.maxsize def clear(self): '''Delete all cache content''' diff --git a/trafilatura/metadata.py b/trafilatura/metadata.py index d2dda0c8..dfa1aba8 100644 --- a/trafilatura/metadata.py +++ b/trafilatura/metadata.py @@ -17,7 +17,7 @@ from .metaxpaths import (author_discard_xpaths, author_xpaths, categories_xpaths, tags_xpaths, title_xpaths) from .utils import (line_processing, load_html, normalize_authors, - normalize_tags, trim, unescape, uniquify_list) + normalize_tags, trim, unescape) LOGGER = logging.getLogger(__name__) logging.getLogger('htmldate').setLevel(logging.WARNING) @@ -418,8 +418,7 @@ def extract_catstags(metatype, tree): #if not results: # for elem in tree.xpath('.//a[@href]'): # search for 'category' - results = [line_processing(x) for x in results if x is not None] - return uniquify_list([x for x in results if x is not None]) + return [r for r in dict.fromkeys(line_processing(x) for x in results if x) if r] def parse_license_element(element, strict=False): diff --git a/trafilatura/readability_lxml.py b/trafilatura/readability_lxml.py old mode 100755 new mode 100644 index 580bb7cf..e962ef70 --- a/trafilatura/readability_lxml.py +++ b/trafilatura/readability_lxml.py @@ -14,12 +14,15 @@ https://github.com/timbertson/python-readability https://github.com/buriy/python-readability -License of forked code: Apache-2.0 License +License of forked code: Apache-2.0. 
""" + import logging import re +from operator import attrgetter + from lxml.etree import tostring from lxml.html import fragment_fromstring @@ -28,33 +31,25 @@ LOGGER = logging.getLogger(__name__) -BAD_ATTRS = "|".join(["width", "height", "style", "[-a-z]*color", "background[-a-z]*", "on*"]) -QUOTES = '\'[^\']+\'|"[^"]+"' -NON_SPACE = "[^ \"'>]+" -HTMLSTRIP = re.compile( - "<" # open - "([^>]+) " # prefix - " (?:{BAD_ATTRS}) *" - + "= *(?:{NON_SPACE}|{QUOTES})" # undesirable attributes - + "([^>]*)" # value # postfix - ">", # end - re.I, -) - DOT_SPACE = re.compile(r"\.( |$)") -def clean_attributes(html): - while HTMLSTRIP.search(html): - html = HTMLSTRIP.sub("<\\1\\2>", html) - return html - - def _tostring(string): - return tostring(string, encoding=str, method='xml') - - -DIV_TO_P_ELEMS = {'a', 'blockquote', 'dl', 'div', 'img', 'ol', 'p', 'pre', 'table', 'ul'} + return tostring(string, encoding=str, method="xml") + + +DIV_TO_P_ELEMS = { + "a", + "blockquote", + "dl", + "div", + "img", + "ol", + "p", + "pre", + "table", + "ul", +} DIV_SCORES = {"div", "article"} BLOCK_SCORES = {"pre", "td", "blockquote"} @@ -74,26 +69,28 @@ def _tostring(string): re.I, ), "negativeRe": re.compile( - r"combx|comment|com-|contact|foot|footer|footnote|masthead|media|meta|outbrain|promo|related|scroll|shoutbox|sidebar|sponsor|shopping|tags|tool|widget", + r"button|combx|comment|com-|contact|figure|foot|footer|footnote|form|input|masthead|media|meta|outbrain|promo|related|scroll|shoutbox|sidebar|sponsor|shopping|tags|tool|widget", re.I, ), "divToPElementsRe": re.compile( - r"<(a|blockquote|dl|div|img|ol|p|pre|table|ul)", re.I + r"<(?:a|blockquote|dl|div|img|ol|p|pre|table|ul)", re.I ), - "videoRe": re.compile(r"https?:\/\/(www\.)?(youtube|vimeo)\.com", re.I), + "videoRe": re.compile(r"https?:\/\/(?:www\.)?(?:youtube|vimeo)\.com", re.I), } -FRAME_TAGS = {'body', 'html'} +FRAME_TAGS = {"body", "html"} LIST_TAGS = {"ol", "ul"} # DIV_TO_P_ELEMS = {'a', 'blockquote', 'dl', 'div', 'img', 'ol', 'p', 'pre', 'table', 'ul'} + def text_length(elem): + "Return the length of the element with all its contents." return len(trim(elem.text_content())) class Candidate: "Defines a class to score candidate elements." - __slots__ = ['score', 'elem'] + __slots__ = ["score", "elem"] def __init__(self, score, elem): self.score = score @@ -102,7 +99,8 @@ def __init__(self, score, elem): class Document: """Class to build a etree document out of html.""" - __slots__ = ['doc', 'min_text_length', 'retry_length'] + + __slots__ = ["doc", "min_text_length", "retry_length"] def __init__(self, doc, min_text_length=25, retry_length=250): """Generate the document @@ -121,13 +119,6 @@ def __init__(self, doc, min_text_length=25, retry_length=250): self.min_text_length = min_text_length self.retry_length = retry_length - def get_clean_html(self): - """ - An internal method, which can be overridden in subclasses, for example, - to disable or to improve DOM-to-text conversion in .summary() method - """ - return clean_attributes(_tostring(self.doc)) - def summary(self): """ Given a HTML file, extracts the text of the article. @@ -135,12 +126,11 @@ def summary(self): Warning: It mutates internal DOM representation of the HTML document, so it is better to call other API methods before this one. 
""" + for elem in self.doc.iter("script", "style"): + elem.drop_tree() + ruthless = True while True: - for i in self.tags(self.doc, "script", "style"): - i.drop_tree() - for i in self.tags(self.doc, "body"): - i.set("id", "readabilityBody") if ruthless: self.remove_unlikely_candidates() self.transform_misused_divs_into_paragraphs() @@ -148,23 +138,27 @@ def summary(self): best_candidate = self.select_best_candidate(candidates) - if best_candidate is not None: + if best_candidate: article = self.get_article(candidates, best_candidate) else: if ruthless is True: ruthless = False - LOGGER.debug("Ended up stripping too much - going for a safer parse") + LOGGER.debug( + "Ended up stripping too much - going for a safer parse" + ) # try again continue # go ahead - LOGGER.debug("Ruthless and lenient parsing did not work. Returning raw html") + LOGGER.debug( + "Ruthless and lenient parsing did not work. Returning raw html" + ) article = self.doc.find("body") if article is None: article = self.doc cleaned_article = self.sanitize(article, candidates) article_length = len(cleaned_article or "") - if ruthless is True and article_length < self.retry_length: + if ruthless and article_length < self.retry_length: ruthless = False # Loop through and try again. continue @@ -184,9 +178,7 @@ def get_article(self, candidates, best_candidate): # if isinstance(sibling, NavigableString): continue append = False # conditions - if sibling == best_candidate.elem: - append = True - elif ( + if sibling == best_candidate.elem or ( sibling in candidates and candidates[sibling].score >= sibling_score_threshold ): @@ -196,18 +188,20 @@ def get_article(self, candidates, best_candidate): node_content = sibling.text or "" node_length = len(node_content) - if node_length > 80 and link_density < 0.25: - append = True - elif ( - node_length <= 80 - and link_density == 0 - and DOT_SPACE.search(node_content) + if ( + node_length > 80 + and link_density < 0.25 + or ( + node_length <= 80 + and link_density == 0 + and DOT_SPACE.search(node_content) + ) ): append = True # append to the output div if append: output.append(sibling) - #if output is not None: + # if output is not None: # output.append(best_candidate.elem) return output @@ -215,22 +209,22 @@ def select_best_candidate(self, candidates): if not candidates: return None sorted_candidates = sorted( - candidates.values(), key=lambda x: x.score, reverse=True + candidates.values(), key=attrgetter("score"), reverse=True ) - for candidate in sorted_candidates[:5]: - LOGGER.debug("Top 5: %s %s", candidate.elem.tag, candidate.score) - # return best candidate - return sorted_candidates[0] + if LOGGER.isEnabledFor(logging.DEBUG): + for candidate in sorted_candidates[:5]: + LOGGER.debug("Top 5: %s %s", candidate.elem.tag, candidate.score) + return next(iter(sorted_candidates)) def get_link_density(self, elem): total_length = text_length(elem) or 1 - link_length = sum(text_length(elem) for elem in elem.findall(".//a")) + link_length = sum(text_length(link) for link in elem.findall(".//a")) return link_length / total_length def score_paragraphs(self): candidates = {} - ordered = [] - for elem in self.tags(self.doc, "p", "pre", "td"): + + for elem in self.doc.iter("p", "pre", "td"): parent_node = elem.getparent() if parent_node is None: continue @@ -239,20 +233,16 @@ def score_paragraphs(self): elem_text = trim(elem.text_content()) elem_text_len = len(elem_text) - # don't count too short paragraphs + # discard too short paragraphs if elem_text_len < self.min_text_length: continue - if 
parent_node not in candidates: - candidates[parent_node] = self.score_node(parent_node) - ordered.append(parent_node) - - if grand_parent_node is not None and grand_parent_node not in candidates: - candidates[grand_parent_node] = self.score_node(grand_parent_node) - ordered.append(grand_parent_node) + for node in (parent_node, grand_parent_node): + if node is not None and node not in candidates: + candidates[node] = self.score_node(node) score = 1 + len(elem_text.split(",")) + min((elem_text_len / 100), 3) - #if elem not in candidates: + # if elem not in candidates: # candidates[elem] = self.score_node(elem) candidates[parent_node].score += score @@ -262,13 +252,8 @@ def score_paragraphs(self): # Scale the final candidates score based on link density. Good content # should have a relatively small link density (5% or less) and be # mostly unaffected by this operation. - for elem in ordered: - candidate = candidates[elem] - density = self.get_link_density(elem) - # LOGGER.debug("Branch %6.3f link density %.3f -> %6.3f", - # candidate.score, density, candidate.score * (1 - density) - #) - candidate.score *= 1 - density + for elem, candidate in candidates.items(): + candidate.score *= 1 - self.get_link_density(elem) return candidates @@ -296,7 +281,7 @@ def score_node(self, elem): def remove_unlikely_candidates(self): for elem in self.doc.findall(".//*"): - attrs = ' '.join(filter(None, (elem.get("class"), elem.get("id")))) + attrs = " ".join(filter(None, (elem.get("class"), elem.get("id")))) if len(attrs) < 2: continue if ( @@ -308,55 +293,43 @@ def remove_unlikely_candidates(self): elem.drop_tree() def transform_misused_divs_into_paragraphs(self): - for elem in self.tags(self.doc, "div"): + for elem in self.doc.findall(".//div"): # transforms that do not contain other block elements into
# <p>s
# FIXME: The current implementation ignores all descendants that
# are not direct children of elem
# This results in incorrect results in case there is an <img>
# buried within an <a> for example
- #hurts precision:
- #if not any(e.tag in DIV_TO_P_ELEMS for e in list(elem)):
+ # hurts precision:
+ # if not any(e.tag in DIV_TO_P_ELEMS for e in list(elem)):
if not REGEXES["divToPElementsRe"].search(
- ''.join([_tostring(e) for e in list(elem)])
+ "".join(map(_tostring, list(elem)))
):
elem.tag = "p"
- for elem in self.tags(self.doc, "div"):
- if elem.text is not None:
- elem_text = elem.text.strip()
- if elem_text:
- p_elem = fragment_fromstring("<p/>")
- p_elem.text = elem.text
- elem.text = None
- elem.insert(0, p_elem)
+ for elem in self.doc.findall(".//div"):
+ if elem.text and elem.text.strip():
+ p_elem = fragment_fromstring("<p/>")
+ p_elem.text, elem.text = elem.text, None
+ elem.insert(0, p_elem)
for pos, child in sorted(enumerate(elem), reverse=True):
if child.tail and child.tail.strip():
p_elem = fragment_fromstring("<p/>")
- p_elem.text = child.tail
- child.tail = None
+ p_elem.text, child.tail = child.tail, None
elem.insert(pos + 1, p_elem)
if child.tag == "br":
child.drop_tree()
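In isolation, the wrapping step above moves stray text and tails into freshly minted <p> elements, which is why fragment_fromstring is called with a literal "<p/>". A quick sketch of that step alone:

from lxml.html import fragment_fromstring, fromstring, tostring

elem = fromstring("<div>stray text<br/>tail text</div>")
# move the div's leading text into its own paragraph, as the loop above does
if elem.text and elem.text.strip():
    p_elem = fragment_fromstring("<p/>")
    p_elem.text, elem.text = elem.text, None
    elem.insert(0, p_elem)
print(tostring(elem).decode())  # <div><p>stray text</p><br>tail text</div>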
- def tags(self, node, *tag_names):
- for tag_name in tag_names:
- yield from node.findall(f".//{tag_name}")
-
- def reverse_tags(self, node, *tag_names):
- for tag_name in tag_names:
- yield from reversed(node.findall(f".//{tag_name}"))
-
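The removed tags()/reverse_tags() helpers return matches grouped by tag name (one findall() per tag), while node.iter() used as the replacement yields them in document order, and reverse_tags() maps onto reversed() over a single XPath union. A quick self-contained comparison of the three:

from lxml.html import fromstring

def tags(node, *tag_names):
    # the removed helper: one findall() per tag name, results grouped by tag
    for tag_name in tag_names:
        yield from node.findall(f".//{tag_name}")

doc = fromstring("<html><body><ul><li>item</li></ul><div><p>text</p></div></body></html>")
print([e.tag for e in tags(doc, "p", "li")])               # ['p', 'li'] (grouped by tag)
print([e.tag for e in doc.iter("p", "li")])                # ['li', 'p'] (document order)
print([e.tag for e in reversed(doc.xpath("//ul|//div"))])  # ['div', 'ul']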
def sanitize(self, node, candidates):
- for header in self.tags(node, "h1", "h2", "h3", "h4", "h5", "h6"):
+ for header in node.iter("h1", "h2", "h3", "h4", "h5", "h6"):
if self.class_weight(header) < 0 or self.get_link_density(header) > 0.33:
header.drop_tree()
- for elem in self.tags(node, "form", "textarea"):
+ for elem in node.iter("form", "textarea"):
elem.drop_tree()
- for elem in self.tags(node, "iframe"):
+ for elem in node.iter("iframe"):
if "src" in elem.attrib and REGEXES["videoRe"].search(elem.attrib["src"]):
elem.text = "VIDEO" # ADD content to iframe text node to force proper output
else:
@@ -364,21 +337,24 @@ def sanitize(self, node, candidates):
allowed = set()
# Conditionally clean <table>s, <ul>s, and <div>s
- for elem in self.reverse_tags(
- node, "table", "ul", "div", "aside", "header", "footer", "section"
- ):
+ for elem in reversed(node.xpath("//table|//ul|//div|//aside|//header|//footer|//section")):
if elem in allowed:
continue
weight = self.class_weight(elem)
score = candidates[elem].score if elem in candidates else 0
if weight + score < 0:
- LOGGER.debug("Removed %s with score %6.3f and weight %-3s",
- elem.tag, score, weight
+ LOGGER.debug(
+ "Removed %s with score %6.3f and weight %-3s",
+ elem.tag,
+ score,
+ weight,
)
elem.drop_tree()
elif elem.text_content().count(",") < 10:
to_remove = False
- counts = {kind: len(elem.findall(f".//{kind}")) for kind in TEXT_CLEAN_ELEMS}
+ counts = {
+ kind: len(elem.findall(f".//{kind}")) for kind in TEXT_CLEAN_ELEMS
+ }
counts["li"] -= 100
counts["input"] -= len(elem.findall('.//input[@type="hidden"]'))
@@ -387,7 +363,11 @@ def sanitize(self, node, candidates):
link_density = self.get_link_density(elem)
parent_node = elem.getparent()
if parent_node is not None:
- score = candidates[parent_node].score if parent_node in candidates else 0
+ score = (
+ candidates[parent_node].score
+ if parent_node in candidates
+ else 0
+ )
# if elem.tag == 'div' and counts["img"] >= 1:
# continue
if counts["p"] and counts["img"] > 1 + counts["p"] * 1.3:
@@ -403,16 +383,26 @@ def sanitize(self, node, candidates):
reason = f"too short content length {content_length} without a single image"
to_remove = True
elif content_length < self.min_text_length and counts["img"] > 2:
- reason = f"too short content length {content_length} and too many images"
+ reason = (
+ f"too short content length {content_length} and too many images"
+ )
to_remove = True
elif weight < 25 and link_density > 0.2:
- reason = f"too many links {link_density:.3f} for its weight {weight}"
+ reason = (
+ f"too many links {link_density:.3f} for its weight {weight}"
+ )
to_remove = True
elif weight >= 25 and link_density > 0.5:
- reason = f"too many links {link_density:.3f} for its weight {weight}"
+ reason = (
+ f"too many links {link_density:.3f} for its weight {weight}"
+ )
to_remove = True
- elif (counts["embed"] == 1 and content_length < 75) or counts["embed"] > 1:
- reason = "