From 2cf94c78c15cabae67fb458d97b45da8907045bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Sat, 23 Mar 2024 16:52:35 +0100 Subject: [PATCH 1/9] build new direct answer parser --- .../parsers/warc_direct_answers.py | 272 ++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 archive_query_log/parsers/warc_direct_answers.py diff --git a/archive_query_log/parsers/warc_direct_answers.py b/archive_query_log/parsers/warc_direct_answers.py new file mode 100644 index 0000000..b2aa2fa --- /dev/null +++ b/archive_query_log/parsers/warc_direct_answers.py @@ -0,0 +1,272 @@ +from functools import cache +from itertools import chain +from typing import Iterable, Iterator +from urllib.parse import urljoin +from uuid import uuid5 + +from click import echo +from elasticsearch_dsl import Search +from elasticsearch_dsl.function import RandomScore +from elasticsearch_dsl.query import FunctionScore, Term, RankFeature, Exists +# noinspection PyProtectedMember +from lxml.etree import _Element, tostring # nosec: B410 +from tqdm.auto import tqdm +from warc_s3 import WarcS3Store + +from archive_query_log.config import Config +from archive_query_log.namespaces import NAMESPACE_WARC_DIRECT_ANSWER_PARSER, \ + NAMESPACE_RESULT +from archive_query_log.orm import Serp, InnerParser, InnerProviderId, \ + WarcDirectAnswerParserType, WarcDirectAnswerParser, WarcLocation, DirectAnswer, \ + Result, InnerSerp, DirectAnswerId, InnerDownloader +from archive_query_log.parsers.warc import open_warc +from archive_query_log.parsers.xml import parse_xml_tree, safe_xpath +from archive_query_log.utils.es import safe_iter_scan, update_action +from archive_query_log.utils.time import utc_now + + +def add_warc_direct_answer_parser( + config: Config, + provider_id: str | None, + url_pattern_regex: str | None, + priority: float | None, + parser_type: WarcDirectAnswerParserType, + xpath: str | None, + big_box_xpath: str | None, + small_box_xpath: str | None, + right_box_xpath: 
str | None, +) -> None: + if priority is not None and priority <= 0: + raise ValueError("Priority must be strictly positive.") + if parser_type == "xpath": + if xpath is None: + raise ValueError("No XPath given.") + else: + raise ValueError(f"Invalid parser type: {parser_type}") + parser_id_components = ( + provider_id if provider_id is not None else "", + url_pattern_regex if url_pattern_regex is not None else "", + str(priority) if priority is not None else "", + ) + parser_id = str(uuid5( + NAMESPACE_WARC_DIRECT_ANSWER_PARSER, + ":".join(parser_id_components), + )) + parser = WarcDirectAnswerParser( + id=parser_id, + last_modified=utc_now(), + provider=InnerProviderId(id=provider_id) if provider_id else None, + url_pattern_regex=url_pattern_regex, + priority=priority, + parser_type=parser_type, + xpath=xpath, + big_box_xpath=big_box_xpath, + small_box_xpath=small_box_xpath, + right_box_xpath=right_box_xpath, + ) + parser.save(using=config.es.client) + + +def _parse_warc_direct_answer( + parser: WarcDirectAnswerParser, + serp_id: str, + capture_url: str, + warc_store: WarcS3Store, + warc_location: WarcLocation, +) -> list[DirectAnswer] | None: + # Check if URL matches pattern. + if (parser.url_pattern is not None and + not parser.url_pattern.match(capture_url)): + return None + + # Parse direct answer. 
+ if parser.parser_type == "xpath": + if parser.xpath is None: + raise ValueError("No XPath given.") + with open_warc(warc_store, warc_location) as record: + tree = parse_xml_tree(record) + if tree is None: + return None + + elements = safe_xpath(tree, parser.xpath, _Element) + if len(elements) == 0: + return None + + direct_answers = [] + element: _Element + for i, element in enumerate(elements): + big_box: str | None = None + if parser.big_box_xpath is not None: + big_boxs = safe_xpath(element, parser.big_box_xpath, str) + if len(big_boxs) > 0: + big_box = big_boxs[0].strip() + small_box: str | None = None + if parser.small_box_xpath is not None: + small_boxs = safe_xpath(element, parser.small_box_xpath, str) + if len(small_boxs) > 0: + small_box = small_boxs[0].strip() + right_box: str | None = None + if parser.right_box_xpath is not None: + right_boxs = safe_xpath(element, parser.right_box_xpath, str) + if len(right_boxs) > 0: + right_box = right_boxs[0].strip() + + content: str = tostring( + element, + encoding=str, + method="xml", + pretty_print=False, + with_tail=True, + ) + direct_answer_id_components = ( + serp_id, + parser.id, + str(hash(content)), + str(i), + ) + direct_answer_id = str(uuid5( + NAMESPACE_RESULT, + ":".join(direct_answer_id_components), + )) + direct_answers.append(DirectAnswer( + id=direct_answer_id, + rank=i, + content=content, + big_box=big_box, + small_box=small_box, + right_box=right_box, + )) + return direct_answers + else: + raise ValueError(f"Unknown parser type: {parser.parser_type}") + + +@cache +def _warc_direct_answer_parsers( + config: Config, + provider_id: str, +) -> list[WarcDirectAnswerParser]: + parsers: Iterable[WarcDirectAnswerParser] = ( + WarcDirectAnswerParser.search(using=config.es.client) + .filter( + ~Exists(field="provider.id") | + Term(provider__id=provider_id) + ) + .query(RankFeature(field="priority", saturation={})) + .scan() + ) + parsers = safe_iter_scan(parsers) + return list(parsers) + + +def 
_parse_serp_warc_direct_answer_action( + config: Config, + serp: Serp, +) -> Iterator[dict]: + # Re-check if it can be parsed. + if (serp.warc_location is None or + serp.warc_location.file is None or + serp.warc_location.offset is None or + serp.warc_location.length is None): + return + + # Re-check if parsing is necessary. + if (serp.warc_direct_answer_parser is not None and + serp.warc_direct_answer_parser.should_parse is not None and + not serp.warc_direct_answer_parser.should_parse): + return + + for parser in _warc_direct_answer_parsers(config, serp.provider.id): + # Try to parse the snippets. + warc_direct_answers = _parse_warc_direct_answer( + parser=parser, + serp_id=serp.id, + capture_url=serp.capture.url, + warc_store=config.s3.warc_store, + warc_location=serp.warc_location, + ) + if warc_direct_answers is None: + # Parsing was not successful, e.g., URL pattern did not match. + continue + for direct_answer in warc_direct_answers: + yield Result( + id=direct_answer.id, + last_modified=utc_now(), + archive=serp.archive, + provider=serp.provider, + capture=serp.capture, + serp=InnerSerp( + id=serp.id, + ).to_dict(), + direct_answer=direct_answer, + direct_answer_parser=InnerParser( + id=parser.id, + should_parse=False, + last_parsed=utc_now(), + ).to_dict(), + warc_before_serp_downloader=InnerDownloader( + should_download=True, + ).to_dict(), + warc_after_serp_downloader=InnerDownloader( + should_download=True, + ).to_dict(), + ).to_dict(include_meta=True) + yield update_action( + serp, + warc_direct_answers=[ + DirectAnswerId( + id=direct_answer.id, + rank=direct_answer.rank, + ) + for direct_answer in warc_direct_answers + ], + warc_direct_answers_parser=InnerParser( + id=parser.id, + should_parse=False, + last_parsed=utc_now(), + ), + ) + return + yield update_action( + serp, + warc_direct_answer_parser=InnerParser( + should_parse=False, + last_parsed=utc_now(), + ), + ) + return + + +def parse_serps_warc_direct_answer(config: Config) -> None: + 
Serp.index().refresh(using=config.es.client) + changed_serps_search: Search = ( + Serp.search(using=config.es.client) + .filter( + Exists(field="warc_location") & + ~Term(warc_direct_answer_parser__should_parse=False) + ) + .query( + RankFeature(field="archive.priority", saturation={}) | + RankFeature(field="provider.priority", saturation={}) | + FunctionScore(functions=[RandomScore()]) + ) + ) + num_changed_serps = changed_serps_search.count() + if num_changed_serps > 0: + changed_serps: Iterable[Serp] = ( + changed_serps_search + .params(preserve_order=True) + .scan() + ) + changed_serps = safe_iter_scan(changed_serps) + # noinspection PyTypeChecker + changed_serps = tqdm( + changed_serps, total=num_changed_serps, + desc="Parsing WARC direct answer", unit="SERP") + actions = chain.from_iterable( + _parse_serp_warc_direct_answer_action(config, serp) + for serp in changed_serps + ) + config.es.bulk(actions) + else: + echo("No new/changed SERPs.") \ No newline at end of file From 615d0f41a5ff82fcbb799186167735314e86e9dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Sat, 23 Mar 2024 16:52:47 +0100 Subject: [PATCH 2/9] added direct answer --- archive_query_log/orm.py | 43 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/archive_query_log/orm.py b/archive_query_log/orm.py index 9dacd3e..24afe24 100644 --- a/archive_query_log/orm.py +++ b/archive_query_log/orm.py @@ -190,6 +190,18 @@ class Snippet(SnippetId): text: str | None = Text() +class DirectAnswerId(InnerDocument): + id: str = Keyword() + rank: int = Integer() + + +class DirectAnswer(DirectAnswerId): + content: str = Text() + big_box: str | None = Keyword() + small_box: str | None = Text() + right_box: str | None = Text() + + class Serp(BaseDocument): archive: InnerArchive = Object(InnerArchive) provider: InnerProvider = Object(InnerProvider) @@ -208,6 +220,8 @@ class Serp(BaseDocument): warc_query_parser: InnerParser | None = Object(InnerParser) 
warc_snippets: list[SnippetId] | None = Nested(SnippetId) warc_snippets_parser: InnerParser | None = Object(InnerParser) + warc_direct_answer: list[SnippetId] | None = Nested(SnippetId) + warc_direct_answer_parser: InnerParser | None = Object(InnerParser) # rendered_warc_location: WarcLocation | None = Object(WarcLocation) # rendered_warc_downloader: InnerDownloader | None = ( @@ -437,6 +451,35 @@ class Index: } +WarcDirectAnswerParserType = Literal[ + "xpath", +] + + +class WarcDirectAnswerParser(BaseDocument): + provider: InnerProviderId | None = Object(InnerProviderId) + url_pattern_regex: str | None = Keyword() + priority: float | None = RankFeature(positive_score_impact=True) + parser_type: WarcDirectAnswerParserType = Keyword() + xpath: str | None = Keyword() + url_xpath: str | None = Keyword() + title_xpath: str | None = Keyword() + text_xpath: str | None = Keyword() + + @cached_property + def url_pattern(self) -> Pattern | None: + if self.url_pattern_regex is None: + raise ValueError("No URL pattern regex.") + return pattern(self.url_pattern_regex) + + class Index: + name = "aql_warc_direct_answer_parsers" + settings = { + "number_of_shards": 1, + "number_of_replicas": 2, + } + + WarcMainContentParserType = Literal[ "resiliparse", ] From f3f11e893604e986672e39127f3a03641ad366de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Tue, 2 Apr 2024 19:38:20 +0200 Subject: [PATCH 3/9] - removed rank - removed boxes and added url and text in DirectAnswer - changed xpath str to xpaths List[str] --- archive_query_log/orm.py | 13 +-- .../parsers/warc_direct_answers.py | 108 ++++++++---------- 2 files changed, 55 insertions(+), 66 deletions(-) diff --git a/archive_query_log/orm.py b/archive_query_log/orm.py index 24afe24..3f72887 100644 --- a/archive_query_log/orm.py +++ b/archive_query_log/orm.py @@ -1,7 +1,7 @@ from datetime import datetime from functools import cached_property from re import Pattern, compile as pattern -from typing import Literal 
+from typing import Literal, List from elasticsearch_dsl import Document, Keyword, Text, Date, RankFeature, \ InnerDoc as InnerDocument, Object, Index, Integer, Nested, Long, Boolean @@ -192,14 +192,12 @@ class Snippet(SnippetId): class DirectAnswerId(InnerDocument): id: str = Keyword() - rank: int = Integer() class DirectAnswer(DirectAnswerId): content: str = Text() - big_box: str | None = Keyword() - small_box: str | None = Text() - right_box: str | None = Text() + url: str | None = Keyword() + text: str | None = Text() class Serp(BaseDocument): @@ -220,7 +218,7 @@ class Serp(BaseDocument): warc_query_parser: InnerParser | None = Object(InnerParser) warc_snippets: list[SnippetId] | None = Nested(SnippetId) warc_snippets_parser: InnerParser | None = Object(InnerParser) - warc_direct_answer: list[SnippetId] | None = Nested(SnippetId) + warc_direct_answer: list[DirectAnswerId] | None = Nested(DirectAnswerId) warc_direct_answer_parser: InnerParser | None = Object(InnerParser) # rendered_warc_location: WarcLocation | None = Object(WarcLocation) @@ -461,9 +459,8 @@ class WarcDirectAnswerParser(BaseDocument): url_pattern_regex: str | None = Keyword() priority: float | None = RankFeature(positive_score_impact=True) parser_type: WarcDirectAnswerParserType = Keyword() - xpath: str | None = Keyword() + xpaths: List[str] | None = Keyword() url_xpath: str | None = Keyword() - title_xpath: str | None = Keyword() text_xpath: str | None = Keyword() @cached_property diff --git a/archive_query_log/parsers/warc_direct_answers.py b/archive_query_log/parsers/warc_direct_answers.py index b2aa2fa..41e899b 100644 --- a/archive_query_log/parsers/warc_direct_answers.py +++ b/archive_query_log/parsers/warc_direct_answers.py @@ -1,6 +1,6 @@ from functools import cache from itertools import chain -from typing import Iterable, Iterator +from typing import Iterable, Iterator, List from urllib.parse import urljoin from uuid import uuid5 @@ -31,15 +31,14 @@ def add_warc_direct_answer_parser( 
url_pattern_regex: str | None, priority: float | None, parser_type: WarcDirectAnswerParserType, - xpath: str | None, - big_box_xpath: str | None, - small_box_xpath: str | None, - right_box_xpath: str | None, + xpaths: List[str] | None, + url_xpath: str | None, + text_xpath: str | None, ) -> None: if priority is not None and priority <= 0: raise ValueError("Priority must be strictly positive.") if parser_type == "xpath": - if xpath is None: + if xpaths is None: raise ValueError("No XPath given.") else: raise ValueError(f"Invalid parser type: {parser_type}") @@ -59,10 +58,9 @@ def add_warc_direct_answer_parser( url_pattern_regex=url_pattern_regex, priority=priority, parser_type=parser_type, - xpath=xpath, - big_box_xpath=big_box_xpath, - small_box_xpath=small_box_xpath, - right_box_xpath=right_box_xpath, + xpaths=xpaths, + url_xpath=url_xpath, + text_xpath=text_xpath, ) parser.save(using=config.es.client) @@ -81,61 +79,56 @@ def _parse_warc_direct_answer( # Parse direct answer. if parser.parser_type == "xpath": - if parser.xpath is None: + if parser.xpaths is None: raise ValueError("No XPath given.") with open_warc(warc_store, warc_location) as record: tree = parse_xml_tree(record) if tree is None: return None - elements = safe_xpath(tree, parser.xpath, _Element) - if len(elements) == 0: - return None + for xpath in parser.xpaths: + elements = safe_xpath(tree, xpath, _Element) + if len(elements) == 0: + return None - direct_answers = [] - element: _Element - for i, element in enumerate(elements): - big_box: str | None = None - if parser.big_box_xpath is not None: - big_boxs = safe_xpath(element, parser.big_box_xpath, str) - if len(big_boxs) > 0: - big_box = big_boxs[0].strip() - small_box: str | None = None - if parser.small_box_xpath is not None: - small_boxs = safe_xpath(element, parser.small_box_xpath, str) - if len(small_boxs) > 0: - small_box = small_boxs[0].strip() - right_box: str | None = None - if parser.right_box_xpath is not None: - right_boxs = 
safe_xpath(element, parser.right_box_xpath, str) - if len(right_boxs) > 0: - right_box = right_boxs[0].strip() + direct_answers = [] + element: _Element + for i, element in enumerate(elements): + url: str | None = None + if parser.url_xpath is not None: + urls = safe_xpath(element, parser.url_xpath, str) + if len(urls) > 0: + url = urls[0].strip() + url = urljoin(capture_url, url) + text: str | None = None + if parser.text_xpath is not None: + texts = safe_xpath(element, parser.text_xpath, str) + if len(texts) > 0: + text = texts[0].strip() - content: str = tostring( - element, - encoding=str, - method="xml", - pretty_print=False, - with_tail=True, - ) - direct_answer_id_components = ( - serp_id, - parser.id, - str(hash(content)), - str(i), - ) - direct_answer_id = str(uuid5( - NAMESPACE_RESULT, - ":".join(direct_answer_id_components), - )) - direct_answers.append(DirectAnswer( - id=direct_answer_id, - rank=i, - content=content, - big_box=big_box, - small_box=small_box, - right_box=right_box, - )) + content: str = tostring( + element, + encoding=str, + method="xml", + pretty_print=False, + with_tail=True, + ) + direct_answer_id_components = ( + serp_id, + parser.id, + str(hash(content)), + str(i), + ) + direct_answer_id = str(uuid5( + NAMESPACE_RESULT, + ":".join(direct_answer_id_components), + )) + direct_answers.append(DirectAnswer( + id=direct_answer_id, + content=content, + url=url, + text=text, + )) return direct_answers else: raise ValueError(f"Unknown parser type: {parser.parser_type}") @@ -216,7 +209,6 @@ def _parse_serp_warc_direct_answer_action( warc_direct_answers=[ DirectAnswerId( id=direct_answer.id, - rank=direct_answer.rank, ) for direct_answer in warc_direct_answers ], From 45ee69d464da597c3ea2152e87dc553ebf70ac57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Thu, 4 Apr 2024 10:02:23 +0200 Subject: [PATCH 4/9] changed xpaths List[str] to xpath str --- archive_query_log/orm.py | 4 +- .../parsers/warc_direct_answers.py | 91 
+++++++++---------- 2 files changed, 47 insertions(+), 48 deletions(-) diff --git a/archive_query_log/orm.py b/archive_query_log/orm.py index 3f72887..a93f45f 100644 --- a/archive_query_log/orm.py +++ b/archive_query_log/orm.py @@ -1,7 +1,7 @@ from datetime import datetime from functools import cached_property from re import Pattern, compile as pattern -from typing import Literal, List +from typing import Literal from elasticsearch_dsl import Document, Keyword, Text, Date, RankFeature, \ InnerDoc as InnerDocument, Object, Index, Integer, Nested, Long, Boolean @@ -459,7 +459,7 @@ class WarcDirectAnswerParser(BaseDocument): url_pattern_regex: str | None = Keyword() priority: float | None = RankFeature(positive_score_impact=True) parser_type: WarcDirectAnswerParserType = Keyword() - xpaths: List[str] | None = Keyword() + xpath: str | None = Keyword() url_xpath: str | None = Keyword() text_xpath: str | None = Keyword() diff --git a/archive_query_log/parsers/warc_direct_answers.py b/archive_query_log/parsers/warc_direct_answers.py index 41e899b..06088a8 100644 --- a/archive_query_log/parsers/warc_direct_answers.py +++ b/archive_query_log/parsers/warc_direct_answers.py @@ -1,6 +1,6 @@ from functools import cache from itertools import chain -from typing import Iterable, Iterator, List +from typing import Iterable, Iterator from urllib.parse import urljoin from uuid import uuid5 @@ -31,14 +31,14 @@ def add_warc_direct_answer_parser( url_pattern_regex: str | None, priority: float | None, parser_type: WarcDirectAnswerParserType, - xpaths: List[str] | None, + xpath: str | None, url_xpath: str | None, text_xpath: str | None, ) -> None: if priority is not None and priority <= 0: raise ValueError("Priority must be strictly positive.") if parser_type == "xpath": - if xpaths is None: + if xpath is None: raise ValueError("No XPath given.") else: raise ValueError(f"Invalid parser type: {parser_type}") @@ -58,7 +58,7 @@ def add_warc_direct_answer_parser( 
url_pattern_regex=url_pattern_regex, priority=priority, parser_type=parser_type, - xpaths=xpaths, + xpath=xpath, url_xpath=url_xpath, text_xpath=text_xpath, ) @@ -79,56 +79,55 @@ def _parse_warc_direct_answer( # Parse direct answer. if parser.parser_type == "xpath": - if parser.xpaths is None: + if parser.xpath is None: raise ValueError("No XPath given.") with open_warc(warc_store, warc_location) as record: tree = parse_xml_tree(record) if tree is None: return None - for xpath in parser.xpaths: - elements = safe_xpath(tree, xpath, _Element) - if len(elements) == 0: - return None + elements = safe_xpath(tree, parser.xpath, _Element) + if len(elements) == 0: + return None - direct_answers = [] - element: _Element - for i, element in enumerate(elements): - url: str | None = None - if parser.url_xpath is not None: - urls = safe_xpath(element, parser.url_xpath, str) - if len(urls) > 0: - url = urls[0].strip() - url = urljoin(capture_url, url) - text: str | None = None - if parser.text_xpath is not None: - texts = safe_xpath(element, parser.text_xpath, str) - if len(texts) > 0: - text = texts[0].strip() + direct_answers = [] + element: _Element + for i, element in enumerate(elements): + url: str | None = None + if parser.url_xpath is not None: + urls = safe_xpath(element, parser.url_xpath, str) + if len(urls) > 0: + url = urls[0].strip() + url = urljoin(capture_url, url) + text: str | None = None + if parser.text_xpath is not None: + texts = safe_xpath(element, parser.text_xpath, str) + if len(texts) > 0: + text = texts[0].strip() - content: str = tostring( - element, - encoding=str, - method="xml", - pretty_print=False, - with_tail=True, - ) - direct_answer_id_components = ( - serp_id, - parser.id, - str(hash(content)), - str(i), - ) - direct_answer_id = str(uuid5( - NAMESPACE_RESULT, - ":".join(direct_answer_id_components), - )) - direct_answers.append(DirectAnswer( - id=direct_answer_id, - content=content, - url=url, - text=text, - )) + content: str = tostring( + 
element, + encoding=str, + method="xml", + pretty_print=False, + with_tail=True, + ) + direct_answer_id_components = ( + serp_id, + parser.id, + str(hash(content)), + str(i), + ) + direct_answer_id = str(uuid5( + NAMESPACE_RESULT, + ":".join(direct_answer_id_components), + )) + direct_answers.append(DirectAnswer( + id=direct_answer_id, + content=content, + url=url, + text=text, + )) return direct_answers else: raise ValueError(f"Unknown parser type: {parser.parser_type}") From dabe86e9e89714addb77e0148291df0efc7badca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Wed, 10 Apr 2024 15:42:31 +0200 Subject: [PATCH 5/9] changed direct_answer to plural --- archive_query_log/namespaces.py | 4 +- archive_query_log/orm.py | 12 ++--- .../parsers/warc_direct_answers.py | 50 +++++++++---------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/archive_query_log/namespaces.py b/archive_query_log/namespaces.py index 0614b41..b94d23f 100644 --- a/archive_query_log/namespaces.py +++ b/archive_query_log/namespaces.py @@ -15,6 +15,6 @@ NAMESPACE_AQL, "warc_snippets_parser") NAMESPACE_WARC_MAIN_CONTENT_PARSER = uuid5( NAMESPACE_AQL, "warc_main_content_parser") -NAMESPACE_WARC_DIRECT_ANSWER_PARSER = uuid5( - NAMESPACE_AQL, "warc_direct_answer_parser") +NAMESPACE_WARC_DIRECT_ANSWERS_PARSER = uuid5( + NAMESPACE_AQL, "warc_direct_answers_parser") NAMESPACE_WARC_DOWNLOADER = uuid5(NAMESPACE_AQL, "warc_downloader") diff --git a/archive_query_log/orm.py b/archive_query_log/orm.py index a93f45f..2c7b23a 100644 --- a/archive_query_log/orm.py +++ b/archive_query_log/orm.py @@ -218,8 +218,8 @@ class Serp(BaseDocument): warc_query_parser: InnerParser | None = Object(InnerParser) warc_snippets: list[SnippetId] | None = Nested(SnippetId) warc_snippets_parser: InnerParser | None = Object(InnerParser) - warc_direct_answer: list[DirectAnswerId] | None = Nested(DirectAnswerId) - warc_direct_answer_parser: InnerParser | None = Object(InnerParser) + 
warc_direct_answers: list[DirectAnswerId] | None = Nested(DirectAnswerId) + warc_direct_answers_parser: InnerParser | None = Object(InnerParser) # rendered_warc_location: WarcLocation | None = Object(WarcLocation) # rendered_warc_downloader: InnerDownloader | None = ( @@ -449,16 +449,16 @@ class Index: } -WarcDirectAnswerParserType = Literal[ +WarcDirectAnswersParserType = Literal[ "xpath", ] -class WarcDirectAnswerParser(BaseDocument): +class WarcDirectAnswersParser(BaseDocument): provider: InnerProviderId | None = Object(InnerProviderId) url_pattern_regex: str | None = Keyword() priority: float | None = RankFeature(positive_score_impact=True) - parser_type: WarcDirectAnswerParserType = Keyword() + parser_type: WarcDirectAnswersParserType = Keyword() xpath: str | None = Keyword() url_xpath: str | None = Keyword() text_xpath: str | None = Keyword() @@ -470,7 +470,7 @@ def url_pattern(self) -> Pattern | None: return pattern(self.url_pattern_regex) class Index: - name = "aql_warc_direct_answer_parsers" + name = "aql_warc_direct_answers_parsers" settings = { "number_of_shards": 1, "number_of_replicas": 2, diff --git a/archive_query_log/parsers/warc_direct_answers.py b/archive_query_log/parsers/warc_direct_answers.py index 06088a8..b50d995 100644 --- a/archive_query_log/parsers/warc_direct_answers.py +++ b/archive_query_log/parsers/warc_direct_answers.py @@ -14,10 +14,10 @@ from warc_s3 import WarcS3Store from archive_query_log.config import Config -from archive_query_log.namespaces import NAMESPACE_WARC_DIRECT_ANSWER_PARSER, \ +from archive_query_log.namespaces import NAMESPACE_WARC_DIRECT_ANSWERS_PARSER, \ NAMESPACE_RESULT from archive_query_log.orm import Serp, InnerParser, InnerProviderId, \ - WarcDirectAnswerParserType, WarcDirectAnswerParser, WarcLocation, DirectAnswer, \ + WarcDirectAnswersParserType, WarcDirectAnswersParser, WarcLocation, DirectAnswer, \ Result, InnerSerp, DirectAnswerId, InnerDownloader from archive_query_log.parsers.warc import open_warc from 
archive_query_log.parsers.xml import parse_xml_tree, safe_xpath @@ -25,12 +25,12 @@ from archive_query_log.utils.time import utc_now -def add_warc_direct_answer_parser( +def add_warc_direct_answers_parser( config: Config, provider_id: str | None, url_pattern_regex: str | None, priority: float | None, - parser_type: WarcDirectAnswerParserType, + parser_type: WarcDirectAnswersParserType, xpath: str | None, url_xpath: str | None, text_xpath: str | None, @@ -48,10 +48,10 @@ def add_warc_direct_answer_parser( str(priority) if priority is not None else "", ) parser_id = str(uuid5( - NAMESPACE_WARC_DIRECT_ANSWER_PARSER, + NAMESPACE_WARC_DIRECT_ANSWERS_PARSER, ":".join(parser_id_components), )) - parser = WarcDirectAnswerParser( + parser = WarcDirectAnswersParser( id=parser_id, last_modified=utc_now(), provider=InnerProviderId(id=provider_id) if provider_id else None, @@ -65,8 +65,8 @@ def add_warc_direct_answer_parser( parser.save(using=config.es.client) -def _parse_warc_direct_answer( - parser: WarcDirectAnswerParser, +def _parse_warc_direct_answers( + parser: WarcDirectAnswersParser, serp_id: str, capture_url: str, warc_store: WarcS3Store, @@ -77,7 +77,7 @@ def _parse_warc_direct_answer( not parser.url_pattern.match(capture_url)): return None - # Parse direct answer. + # Parse direct answers. 
if parser.parser_type == "xpath": if parser.xpath is None: raise ValueError("No XPath given.") @@ -134,12 +134,12 @@ def _parse_warc_direct_answer( @cache -def _warc_direct_answer_parsers( +def _warc_direct_answers_parsers( config: Config, provider_id: str, -) -> list[WarcDirectAnswerParser]: - parsers: Iterable[WarcDirectAnswerParser] = ( - WarcDirectAnswerParser.search(using=config.es.client) +) -> list[WarcDirectAnswersParser]: + parsers: Iterable[WarcDirectAnswersParser] = ( + WarcDirectAnswersParser.search(using=config.es.client) .filter( ~Exists(field="provider.id") | Term(provider__id=provider_id) @@ -151,7 +151,7 @@ def _warc_direct_answer_parsers( return list(parsers) -def _parse_serp_warc_direct_answer_action( +def _parse_serp_warc_direct_answers_action( config: Config, serp: Serp, ) -> Iterator[dict]: @@ -163,14 +163,14 @@ def _parse_serp_warc_direct_answer_action( return # Re-check if parsing is necessary. - if (serp.warc_direct_answer_parser is not None and - serp.warc_direct_answer_parser.should_parse is not None and - not serp.warc_direct_answer_parser.should_parse): + if (serp.warc_direct_answers_parser is not None and + serp.warc_direct_answers_parser.should_parse is not None and + not serp.warc_direct_answers_parser.should_parse): return - for parser in _warc_direct_answer_parsers(config, serp.provider.id): - # Try to parse the snippets. - warc_direct_answers = _parse_warc_direct_answer( + for parser in _warc_direct_answers_parsers(config, serp.provider.id): + # Try to parse the direct answers. 
+ warc_direct_answers = _parse_warc_direct_answers( parser=parser, serp_id=serp.id, capture_url=serp.capture.url, @@ -220,7 +220,7 @@ def _parse_serp_warc_direct_answer_action( return yield update_action( serp, - warc_direct_answer_parser=InnerParser( + warc_direct_answers_parser=InnerParser( should_parse=False, last_parsed=utc_now(), ), @@ -228,13 +228,13 @@ def _parse_serp_warc_direct_answer_action( return -def parse_serps_warc_direct_answer(config: Config) -> None: +def parse_serps_warc_direct_answers(config: Config) -> None: Serp.index().refresh(using=config.es.client) changed_serps_search: Search = ( Serp.search(using=config.es.client) .filter( Exists(field="warc_location") & - ~Term(warc_direct_answer_parser__should_parse=False) + ~Term(warc_direct_answers_parser__should_parse=False) ) .query( RankFeature(field="archive.priority", saturation={}) | @@ -253,9 +253,9 @@ def parse_serps_warc_direct_answer(config: Config) -> None: # noinspection PyTypeChecker changed_serps = tqdm( changed_serps, total=num_changed_serps, - desc="Parsing WARC direct answer", unit="SERP") + desc="Parsing WARC direct answers", unit="SERP") actions = chain.from_iterable( - _parse_serp_warc_direct_answer_action(config, serp) + _parse_serp_warc_direct_answers_action(config, serp) for serp in changed_serps ) config.es.bulk(actions) From 2da7caaf5c75dd5b18289194a46f4e77ddb43a7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Wed, 10 Apr 2024 15:42:55 +0200 Subject: [PATCH 6/9] added direct answers --- archive_query_log/cli/parsers.py | 65 ++++++++++++++++++++++++++++ archive_query_log/imports/yaml.py | 72 +++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/archive_query_log/cli/parsers.py b/archive_query_log/cli/parsers.py index c6e371e..8637eaf 100644 --- a/archive_query_log/cli/parsers.py +++ b/archive_query_log/cli/parsers.py @@ -9,6 +9,7 @@ UrlQueryParser, UrlPageParserType, UrlPageParser, \ UrlOffsetParser, UrlOffsetParserType, 
WarcQueryParserType, \ WarcQueryParser, WarcSnippetsParserType, WarcSnippetsParser, \ + WarcDirectAnswersParserType, WarcDirectAnswersParser, \ WarcMainContentParserType, WarcMainContentParser @@ -380,6 +381,70 @@ def warc_snippets_import(config: Config, services_path: Path) -> None: import_warc_snippets_parsers(config, services_path) +@parsers.group() +def warc_direct_answers() -> None: + pass + + +CHOICES_WARC_DIRECT_ANSWERS_PARSER_TYPE = [ + "xpath", +] + + +@warc_direct_answers.command("add") +@option("--provider-id", type=str) +@option("--url-pattern-regex", type=str) +@option("--priority", type=FloatRange(min=0, min_open=False)) +@option("--parser-type", + type=Choice(CHOICES_WARC_DIRECT_ANSWERS_PARSER_TYPE), required=True) +@option("--xpath", type=str) +@option("--url-xpath", type=str) +@option("--title-xpath", type=str) +@option("--text-xpath", type=str) +@pass_config +def warc_direct_answers_add( + config: Config, + provider_id: str | None, + url_pattern_regex: str | None, + parser_type: str, + xpath: str | None, + url_xpath: str | None, + text_xpath: str | None, +) -> None: + from archive_query_log.parsers.warc_snippets import \ + add_warc_direct_answers_parser + parser_type_strict: WarcDirectAnswersParserType + if parser_type == "xpath": + parser_type_strict = "xpath" + if xpath is None: + raise UsageError("No XPath given.") + else: + raise ValueError(f"Invalid parser type: {parser_type}") + WarcDirectAnswersParser.init(using=config.es.client) + add_warc_direct_answers_parser( + config=config, + provider_id=provider_id, + url_pattern_regex=url_pattern_regex, + parser_type=parser_type_strict, + xpath=xpath, + url_xpath=url_xpath, + text_xpath=text_xpath, + ) + + +@warc_direct_answers.command("import") +@option("-s", "--services-file", "services_path", + type=PathType(path_type=Path, exists=True, file_okay=True, + dir_okay=False, readable=True, resolve_path=True, + allow_dash=False), + default=Path("data") / "selected-services.yaml") +@pass_config +def 
warc_direct_answers_import(config: Config, services_path: Path) -> None: + from archive_query_log.imports.yaml import import_warc_direct_answers_parsers + WarcDirectAnswersParser.init(using=config.es.client) + import_warc_direct_answers_parsers(config, services_path) + + @parsers.group() def warc_main_content() -> None: pass diff --git a/archive_query_log/imports/yaml.py b/archive_query_log/imports/yaml.py index a3ffd0b..caceacc 100644 --- a/archive_query_log/imports/yaml.py +++ b/archive_query_log/imports/yaml.py @@ -19,6 +19,7 @@ from archive_query_log.parsers.url_query import add_url_query_parser from archive_query_log.parsers.warc_query import add_warc_query_parser from archive_query_log.parsers.warc_snippets import add_warc_snippets_parser +from archive_query_log.parsers.warc_direct_answers import add_warc_direct_answers_parser from archive_query_log.parsers.xml import xpaths_from_css_selector, \ text_xpath, merge_xpaths from archive_query_log.providers import add_provider @@ -479,3 +480,74 @@ def import_warc_snippets_parsers(config: Config, services_path: Path) -> None: title_xpath=title_xpath, text_xpath=snippet_xpath, ) + + +def import_warc_direct_answers_parsers(config: Config, services_path: Path) -> None: + echo("Load providers from services file.") + with services_path.open("r") as file: + services_list: Sequence[dict] = safe_load(file) + echo(f"Found {len(services_list)} service definitions.") + + services: Iterable[dict] = services_list + # noinspection PyTypeChecker + services = tqdm( + services, + desc="Import parsers for providers", + unit="provider", + ) + for service in services: + if ("domains" not in service or "results_parsers" not in service): + continue + + results_parsers = service["results_parsers"] + + providers = ( + Provider.search(using=config.es.client) + .query(Terms(domains=service["domains"])) + .scan() + ) + providers = safe_iter_scan(providers) + for provider in providers: + for results_parser in enumerate(results_parsers): + if 
results_parser["type"] != "html_selector": + continue + results_selector = results_parser["results_selector"] + url_selector = results_parser.get("url_selector") + direct_answer_selector = results_parser.get("direct_answer_selector") + + results_xpaths = xpaths_from_css_selector(results_selector) + results_xpaths = [ + "//" + result_xpath + for result_xpath in results_xpaths + ] + results_xpath = merge_xpaths(results_xpaths) + + if url_selector is not None: + url_xpaths = xpaths_from_css_selector(url_selector) + url_xpaths = [ + text_xpath(xpath, attribute="href") + for xpath in url_xpaths + ] + url_xpath = merge_xpaths(url_xpaths) + else: + url_xpath = None + + if direct_answer_selector is not None: + direct_answer_xpaths = xpaths_from_css_selector(direct_answer_selector) + direct_answer_xpaths = [ + text_xpath(xpath, text=True) + for xpath in direct_answer_xpaths + ] + direct_answer_xpath = merge_xpaths(direct_answer_xpaths) + else: + direct_answer_xpath = None + + add_warc_direct_answers_parser( + config=config, + provider_id=provider.meta.id, + url_pattern_regex=results_parser.get("url_pattern"), + parser_type="xpath", + xpath=results_xpath, + url_xpath=url_xpath, + text_xpath=direct_answer_xpath, + ) From a7af589af3ebe28fdfe6f7561be06b2f05e3dfa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Wed, 10 Apr 2024 15:58:24 +0200 Subject: [PATCH 7/9] debugged import --- archive_query_log/cli/parsers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archive_query_log/cli/parsers.py b/archive_query_log/cli/parsers.py index 8637eaf..ca60342 100644 --- a/archive_query_log/cli/parsers.py +++ b/archive_query_log/cli/parsers.py @@ -411,7 +411,7 @@ def warc_direct_answers_add( url_xpath: str | None, text_xpath: str | None, ) -> None: - from archive_query_log.parsers.warc_snippets import \ + from archive_query_log.parsers.warc_direct_answers import \ add_warc_direct_answers_parser parser_type_strict: WarcDirectAnswersParserType if
parser_type == "xpath": From 9e628373c4b2f11d502a8210a6a11e95026d9118 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Thu, 11 Apr 2024 13:11:54 +0200 Subject: [PATCH 8/9] added priority --- archive_query_log/cli/parsers.py | 3 ++- archive_query_log/imports/yaml.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/archive_query_log/cli/parsers.py b/archive_query_log/cli/parsers.py index ca60342..9b6d8fb 100644 --- a/archive_query_log/cli/parsers.py +++ b/archive_query_log/cli/parsers.py @@ -399,13 +399,13 @@ def warc_direct_answers() -> None: type=Choice(CHOICES_WARC_DIRECT_ANSWERS_PARSER_TYPE), required=True) @option("--xpath", type=str) @option("--url-xpath", type=str) -@option("--title-xpath", type=str) @option("--text-xpath", type=str) @pass_config def warc_direct_answers_add( config: Config, provider_id: str | None, url_pattern_regex: str | None, + priority: float | None, parser_type: str, xpath: str | None, url_xpath: str | None, @@ -425,6 +425,7 @@ def warc_direct_answers_add( config=config, provider_id=provider_id, url_pattern_regex=url_pattern_regex, + priority=priority, parser_type=parser_type_strict, xpath=xpath, url_xpath=url_xpath, diff --git a/archive_query_log/imports/yaml.py b/archive_query_log/imports/yaml.py index caceacc..da4d9c6 100644 --- a/archive_query_log/imports/yaml.py +++ b/archive_query_log/imports/yaml.py @@ -500,6 +500,7 @@ def import_warc_direct_answers_parsers(config: Config, services_path: Path) -> N continue results_parsers = service["results_parsers"] + num_results_parsers = len(results_parsers) providers = ( Provider.search(using=config.es.client) @@ -508,7 +509,7 @@ def import_warc_direct_answers_parsers(config: Config, services_path: Path) -> N ) providers = safe_iter_scan(providers) for provider in providers: - for results_parser in enumerate(results_parsers): + for k, results_parser in enumerate(results_parsers): if results_parser["type"] != "html_selector": continue results_selector 
= results_parser["results_selector"] @@ -546,6 +547,7 @@ def import_warc_direct_answers_parsers(config: Config, services_path: Path) -> N config=config, provider_id=provider.meta.id, url_pattern_regex=results_parser.get("url_pattern"), + priority=num_results_parsers - k, parser_type="xpath", xpath=results_xpath, url_xpath=url_xpath, From 14ddc9a517fd47059cfc1f75eda664fa4eec16de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Kl=C3=BCber?= Date: Mon, 15 Apr 2024 10:33:36 +0200 Subject: [PATCH 9/9] removed "import_warc_direct_answers_parsers" from CLI --- archive_query_log/cli/parsers.py | 5 --- archive_query_log/imports/yaml.py | 74 ------------------------------- 2 files changed, 79 deletions(-) diff --git a/archive_query_log/cli/parsers.py b/archive_query_log/cli/parsers.py index 9b6d8fb..19923a0 100644 --- a/archive_query_log/cli/parsers.py +++ b/archive_query_log/cli/parsers.py @@ -439,11 +439,6 @@ def warc_direct_answers_add( dir_okay=False, readable=True, resolve_path=True, allow_dash=False), default=Path("data") / "selected-services.yaml") -@pass_config -def warc_direct_answers_import(config: Config, services_path: Path) -> None: - from archive_query_log.imports.yaml import import_warc_direct_answers_parsers - WarcDirectAnswersParser.init(using=config.es.client) - import_warc_direct_answers_parsers(config, services_path) @parsers.group() diff --git a/archive_query_log/imports/yaml.py b/archive_query_log/imports/yaml.py index da4d9c6..a3ffd0b 100644 --- a/archive_query_log/imports/yaml.py +++ b/archive_query_log/imports/yaml.py @@ -19,7 +19,6 @@ from archive_query_log.parsers.url_query import add_url_query_parser from archive_query_log.parsers.warc_query import add_warc_query_parser from archive_query_log.parsers.warc_snippets import add_warc_snippets_parser -from archive_query_log.parsers.warc_direct_answers import add_warc_direct_answers_parser from archive_query_log.parsers.xml import xpaths_from_css_selector, \ text_xpath, merge_xpaths from 
archive_query_log.providers import add_provider @@ -480,76 +479,3 @@ def import_warc_snippets_parsers(config: Config, services_path: Path) -> None: title_xpath=title_xpath, text_xpath=snippet_xpath, ) - - -def import_warc_direct_answers_parsers(config: Config, services_path: Path) -> None: - echo("Load providers from services file.") - with services_path.open("r") as file: - services_list: Sequence[dict] = safe_load(file) - echo(f"Found {len(services_list)} service definitions.") - - services: Iterable[dict] = services_list - # noinspection PyTypeChecker - services = tqdm( - services, - desc="Import parsers for providers", - unit="provider", - ) - for service in services: - if ("domains" not in service or "results_parsers" not in service): - continue - - results_parsers = service["results_parsers"] - num_results_parsers = len(results_parsers) - - providers = ( - Provider.search(using=config.es.client) - .query(Terms(domains=service["domains"])) - .scan() - ) - providers = safe_iter_scan(providers) - for provider in providers: - for k, results_parser in enumerate(results_parsers): - if results_parser["type"] != "html_selector": - continue - results_selector = results_parser["results_selector"] - url_selector = results_parser.get("url_selector") - direct_answer_selector = results_parser.get("direct_answer_selector") - - results_xpaths = xpaths_from_css_selector(results_selector) - results_xpaths = [ - "//" + result_xpath - for result_xpath in results_xpaths - ] - results_xpath = merge_xpaths(results_xpaths) - - if url_selector is not None: - url_xpaths = xpaths_from_css_selector(url_selector) - url_xpaths = [ - text_xpath(xpath, attribute="href") - for xpath in url_xpaths - ] - url_xpath = merge_xpaths(url_xpaths) - else: - url_xpath = None - - if direct_answer_selector is not None: - direct_answer_xpaths = xpaths_from_css_selector(direct_answer_selector) - direct_answer_xpaths = [ - text_xpath(xpath, text=True) - for xpath in direct_answer_xpaths - ] - 
direct_answer_xpath = merge_xpaths(direct_answer_xpaths) - else: - direct_answer_xpath = None - - add_warc_direct_answers_parser( - config=config, - provider_id=provider.meta.id, - url_pattern_regex=results_parser.get("url_pattern"), - priority=num_results_parsers - k, - parser_type="xpath", - xpath=results_xpath, - url_xpath=url_xpath, - text_xpath=direct_answer_xpath, - )