From f480304bacd3abb504746d0790ae0e28836f6186 Mon Sep 17 00:00:00 2001
From: wild
Date: Wed, 18 Sep 2024 16:26:53 +0300
Subject: [PATCH 1/2] adds opensearch-py requirement

---
 requirements.txt | 1 +
 setup.py         | 1 +
 2 files changed, 2 insertions(+)

diff --git a/requirements.txt b/requirements.txt
index 5fad2218..59909902 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@ PyMySQL>=0.7.9
 psycopg2-binary>=2.8.4
 hjson>=1.5.8
 elasticsearch>=2.4
+opensearch-py>=2.7
 beautifulsoup4>=4.3.2
 readability-lxml>=0.6.2
 langdetect>=1.0.7
diff --git a/setup.py b/setup.py
index be794f8d..aeb48730 100644
--- a/setup.py
+++ b/setup.py
@@ -38,6 +38,7 @@
         "psycopg2-binary>=2.8.4",
         "hjson>=1.5.8",
         "elasticsearch>=2.4",
+        "opensearch-py>=2.7",
         "beautifulsoup4>=4.3.2",
         "readability-lxml>=0.6.2",
         "langdetect>=1.0.7",

From 7ca63d39a6acd982dc4c7c80915abeb22bc23a99 Mon Sep 17 00:00:00 2001
From: wild
Date: Fri, 20 Sep 2024 14:32:21 +0300
Subject: [PATCH 2/2] adds support for export to OpenSearch

---
 README.md                        | 28 ++++++++++
 newsplease/__main__.py           | 54 ++++++++++++++++++-
 newsplease/config/config.cfg     | 40 ++++++++++++++
 newsplease/config/config_lib.cfg | 40 ++++++++++++++
 newsplease/pipeline/pipelines.py | 89 ++++++++++++++++++++++++++++++++
 5 files changed, 249 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 3d12ef49..1d91d944 100644
--- a/README.md
+++ b/README.md
@@ -139,6 +139,34 @@ That's it! Except, if your Elasticsearch database is not located at `http://loca
     username = 'root'
     secret = 'password'
 
+### OpenSearch
+
+news-please supports export to OpenSearch. OpenSearch is largely compatible with Elasticsearch and differs only in minor details of the client implementation, so the OpenSearch export offers the same features as the Elasticsearch export.
+
+Example pipeline settings:
+
+    [Scrapy]
+
+    ITEM_PIPELINES = {
+        'newsplease.pipeline.pipelines.ArticleMasterExtractor':100,
+        'newsplease.pipeline.pipelines.OpensearchStorage':350
+    }
+
+If your OpenSearch database is not located at `http://localhost:9200`, or uses a different username/password or CA-certificate authentication, you will also need to change the following:
+
+    [Opensearch]
+
+    host = localhost
+    port = 9200
+    # send requests via HTTPS
+    use_ssl = True
+
+    ...
+
+    # Credentials used for authentication
+    username = 'root'
+    secret = 'password'
+
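+To verify that articles are being exported, you can query the index directly with `opensearch-py`. A minimal sketch (host, port, credentials, and index name are the example values from above; adjust them to your setup):
+
+    from opensearchpy import OpenSearch
+
+    conn = OpenSearch(
+        hosts=[{'host': 'localhost', 'port': 9200}],
+        http_auth=('root', 'password'),
+        use_ssl=True
+    )
+    # count the stored articles; the index name defaults to 'news-please'
+    result = conn.search(index='news-please', body={'query': {'match_all': {}}})
+    print('%d articles stored' % result['hits']['total']['value'])
+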
 ### PostgreSQL
 
 news-please allows for storing of articles to a PostgreSQL database, including the versioning feature. To export to PostgreSQL, open the corresponding config file (`config_lib.cfg` for library mode and `config.cfg` for CLI mode) and add the PostgresqlStorage module to the pipeline and adjust the database credentials:
diff --git a/newsplease/__main__.py b/newsplease/__main__.py
index 9723d4d3..82f28d59 100644
--- a/newsplease/__main__.py
+++ b/newsplease/__main__.py
@@ -12,6 +12,7 @@
 import psycopg2
 import pymysql
 from elasticsearch import Elasticsearch
+from opensearchpy import OpenSearch
 from scrapy.utils.log import configure_logging
 
 from newsplease.pipeline.pipelines import RedisStorageClient
@@ -55,6 +56,7 @@ class NewsPleaseLauncher(object):
     mysql = None
     postgresql = None
     elasticsearch = None
+    opensearch = None
     redis = None
     number_of_active_crawlers = 0
     config_directory_default_path = "~/news-please-repo/config/"
@@ -63,13 +65,14 @@ class NewsPleaseLauncher(object):
 
     __single_crawler = False
 
-    def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch,
+    def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch, is_reset_opensearch,
                  is_reset_json, is_reset_mysql, is_reset_postgresql, is_reset_redis, is_no_confirm,
                  library_mode=False):
         """
         The constructor of the main class, thus the real entry point to the tool.
         :param cfg_file_path:
         :param is_resume:
         :param is_reset_elasticsearch:
+        :param is_reset_opensearch:
         :param is_reset_json:
         :param is_reset_mysql:
         :param is_reset_postgresql:
@@ -113,6 +116,7 @@ def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch,
         self.mysql = self.cfg.section("MySQL")
         self.postgresql = self.cfg.section("Postgresql")
         self.elasticsearch = self.cfg.section("Elasticsearch")
+        self.opensearch = self.cfg.section("Opensearch")
         self.redis = self.cfg.section("Redis")
 
         # perform reset if given as parameter
@@ -124,10 +128,13 @@ def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch,
             self.reset_files()
         if is_reset_elasticsearch:
             self.reset_elasticsearch()
+        if is_reset_opensearch:
+            self.reset_opensearch()
         if is_reset_redis:
             self.reset_redis()
+
         # close the process
-        if is_reset_elasticsearch or is_reset_json or is_reset_mysql or is_reset_postgresql or is_reset_redis:
+        if is_reset_elasticsearch or is_reset_opensearch or is_reset_json or is_reset_mysql or is_reset_postgresql or is_reset_redis:
             sys.exit(0)
 
         self.json_file_path = self.cfg_directory_path + self.cfg.section('Files')['url_input_file_name']
@@ -497,6 +504,45 @@ def reset_elasticsearch(self):
             self.log.error("Failed to connect to Elasticsearch. "
                            "Please check if the database is running and the config is correct: %s" % error)
 
+    def reset_opensearch(self):
+        """
+        Resets the OpenSearch database.
+        """
+
+        print("""
+Cleanup OpenSearch database:
+    This will delete the news-please indices and thus reset the whole OpenSearch database.
+    """)
+
+        confirm = self.no_confirm
+
+        if not confirm:
+            confirm = 'yes' in builtins.input(
+                """
+Do you really want to do this? Write 'yes' to confirm: {yes}"""
+                .format(yes='yes' if confirm else ''))
+
+        if not confirm:
+            print("Did not type yes. Thus aborting.")
+            return
+
+        try:
+            # initialize the DB connection
+            conn = OpenSearch(
+                hosts=[{"host": self.opensearch["host"], "port": self.opensearch["port"]}],
+                http_compress=True,
+                http_auth=(str(self.opensearch["username"]), str(self.opensearch["secret"])),
+                use_ssl=self.opensearch["use_ssl"]
+            )
+
+            print("Resetting OpenSearch database...")
+            conn.indices.delete(index=self.opensearch["index_current"], ignore=[400, 404])
+            conn.indices.delete(index=self.opensearch["index_archive"], ignore=[400, 404])
+        except ConnectionError as error:
+            self.log.error("Failed to connect to OpenSearch. "
+                           "Please check if the database is running and the config is correct: %s" % error)
+
     def reset_files(self):
         """
         Resets the local data directory.
         """
@@ -709,6 +755,7 @@ def stop(self):
     cfg_file_path=plac.Annotation('path to the config file', 'option', 'c'),
     resume=plac.Annotation('resume crawling from last process', 'flag'),
     reset_elasticsearch=plac.Annotation('reset Elasticsearch indexes', 'flag'),
+    reset_opensearch=plac.Annotation('reset OpenSearch indexes', 'flag'),
     reset_json=plac.Annotation('reset JSON files', 'flag'),
     reset_mysql=plac.Annotation('reset MySQL database', 'flag'),
     reset_postgresql=plac.Annotation('reset Postgresql database', 'flag'),
@@ -720,6 +767,7 @@ def cli(
     cfg_file_path,
     resume,
     reset_elasticsearch,
+    reset_opensearch,
     reset_json,
     reset_mysql,
     reset_postgresql,
@@ -731,6 +779,7 @@ def cli(
 
     if reset_all:
         reset_elasticsearch = True
+        reset_opensearch = True
         reset_json = True
         reset_mysql = True
         reset_postgresql = True
@@ -743,6 +792,7 @@ def cli(
         cfg_file_path,
         resume,
         reset_elasticsearch,
+        reset_opensearch,
         reset_json,
         reset_mysql,
         reset_postgresql,
diff --git a/newsplease/config/config.cfg b/newsplease/config/config.cfg
index 95d30134..d8aec370 100644
--- a/newsplease/config/config.cfg
+++ b/newsplease/config/config.cfg
@@ -264,6 +264,46 @@ mapping = {"properties": {
   }}
 
 
+[Opensearch]
+
+# OpenSearch connection required for saving detailed meta-information
+host = 'localhost'
+port = 9200
+use_ssl = True
+index_current = 'news-please'
+index_archive = 'news-please-archive'
+
+# OpenSearch supports authentication via CA certificates, but this is not implemented yet
+# use_ca_certificates = False
+# ca_cert_path = /path/to/cacert.pem
+# client_cert_path = /path/to/client_cert.pem
+# client_key_path = /path/to/client_key.pem
+username = 'root'
+secret = 'password'
+
+# Properties of the document type used for storage.
+mapping = {"properties": {
+        "url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "source_domain": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "title_page": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "title_rss": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "localpath": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "filename": {"type": "keyword"},
+        "ancestor": {"type": "keyword"},
+        "descendant": {"type": "keyword"},
+        "version": {"type": "long"},
+        "date_download": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
+        "date_modify": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
+        "date_publish": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
+        "title": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "description": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "text": {"type": "text"},
+        "authors": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "image_url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "language": {"type": "keyword"}
+    }}
+
+
 [Redis]
 
 # Redis required for saving meta-information
diff --git a/newsplease/config/config_lib.cfg b/newsplease/config/config_lib.cfg
index 6260fb23..022b48b8 100644
--- a/newsplease/config/config_lib.cfg
+++ b/newsplease/config/config_lib.cfg
@@ -251,6 +251,46 @@ mapping = {
 }
 
 
+[Opensearch]
+
+# OpenSearch connection required for saving detailed meta-information
+host = 'localhost'
+port = 9200
+use_ssl = True
+index_current = 'news-please'
+index_archive = 'news-please-archive'
+
+# OpenSearch supports authentication via CA certificates, but this is not implemented yet
+# use_ca_certificates = False
+# ca_cert_path = /path/to/cacert.pem
+# client_cert_path = /path/to/client_cert.pem
+# client_key_path = /path/to/client_key.pem
+username = 'root'
+secret = 'password'
+
+# Properties of the document type used for storage.
+mapping = {"properties": {
+        "url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "source_domain": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "title_page": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "title_rss": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "localpath": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "filename": {"type": "keyword"},
+        "ancestor": {"type": "keyword"},
+        "descendant": {"type": "keyword"},
+        "version": {"type": "long"},
+        "date_download": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
+        "date_modify": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
+        "date_publish": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
+        "title": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "description": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "text": {"type": "text"},
+        "authors": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "image_url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
+        "language": {"type": "keyword"}
+    }}
+
+
 [Redis]
 
 # Redis required for saving meta-information
diff --git a/newsplease/pipeline/pipelines.py b/newsplease/pipeline/pipelines.py
index db06428b..94b45bfe 100644
--- a/newsplease/pipeline/pipelines.py
+++ b/newsplease/pipeline/pipelines.py
@@ -22,6 +22,7 @@
 import psycopg2
 from dateutil import parser as dateparser
 from elasticsearch import Elasticsearch
+from opensearchpy import OpenSearch
 from scrapy.exceptions import DropItem
 from redis import StrictRedis
 
@@ -664,6 +665,94 @@ def process_item(self, item, spider):
                 self.log.error("Lost connection to Elasticsearch, this module will be deactivated: %s" % error)
         return item
 
+class OpensearchStorage(ExtractedInformationStorage):
+    """
+    Handles remote storage of the meta data in OpenSearch
+    """
+
+    log = None
+    cfg = None
+    conn = None
+    index_current = None
+    index_archive = None
+    mapping = None
+    running = False
+
+    def __init__(self):
+        self.log = logging.getLogger('opensearch.trace')
+        self.log.addHandler(logging.NullHandler())
+        self.cfg = CrawlerConfig.get_instance()
+        self.database = self.cfg.section("Opensearch")
+
+        self.conn = OpenSearch(
+            hosts=[{"host": self.database["host"], "port": self.database["port"]}],
+            http_compress=True,
+            http_auth=(str(self.database["username"]), str(self.database["secret"])),
+            use_ssl=self.database["use_ssl"]
+        )
+        self.index_current = self.database["index_current"]
+        self.index_archive = self.database["index_archive"]
+        self.mapping = self.database["mapping"]
+
+        # check the connection to the database and set the configuration
+        try:
+            # check if the server is available
+            self.conn.ping()
+
+            # raise the logging level due to indices.exists()'s habit of logging a warning if an index doesn't exist
+            os_log = logging.getLogger('opensearch')
+            os_level = os_log.getEffectiveLevel()
+            os_log.setLevel('ERROR')
+
+            # check if the necessary indices exist and create them if needed
+            if not self.conn.indices.exists(index=self.index_current):
+                self.conn.indices.create(index=self.index_current, ignore=[400, 404])
+                self.conn.indices.put_mapping(index=self.index_current, body=self.mapping)
+            if not self.conn.indices.exists(index=self.index_archive):
+                self.conn.indices.create(index=self.index_archive, ignore=[400, 404])
+                self.conn.indices.put_mapping(index=self.index_archive, body=self.mapping)
+            self.running = True
+
+            # restore the previous logging level
+            os_log.setLevel(os_level)
+
+        except ConnectionError as error:
+            self.running = False
+            self.log.error("Failed to connect to OpenSearch, this module will be deactivated. "
+                           "Please check if the database is running and the config is correct: %s" % error)
+
+    def process_item(self, item, spider):
+
+        if self.running:
+            try:
+                version = 1
+                ancestor = None
+
+                # search for a previous version of the article with the same URL
+                request = self.conn.search(index=self.index_current, body={'query': {'match': {'url.keyword': item['url']}}})
+                if request['hits']['total']['value'] > 0:
+                    # save the old version into index_archive
+                    old_version = request['hits']['hits'][0]
+                    old_version['_source']['descendant'] = True
+                    self.conn.index(index=self.index_archive, body=old_version['_source'])
+                    version += 1
+                    ancestor = old_version['_id']
+
+                # save the new version under the old version's id in index_current
+                self.log.info("Saving to OpenSearch: %s" % item['url'])
+                extracted_info = ExtractedInformationStorage.extract_relevant_info(item)
+                extracted_info['ancestor'] = ancestor
+                extracted_info['version'] = version
+                self.conn.index(index=self.index_current, id=ancestor,
+                                body=extracted_info)
+
+            except ConnectionError as error:
+                self.running = False
+                self.log.error("Lost connection to OpenSearch, this module will be deactivated: %s" % error)
+        return item
+
 class DateFilter(object):
     """
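
Note on the versioning logic in `OpensearchStorage.process_item` above: it mirrors the Elasticsearch pipeline. When a URL is crawled again, the previous document is copied into the archive index and the new version is indexed under the old document's id. As a minimal sketch, the resulting history can be inspected like this (the URL is a placeholder; credentials and index names are the example defaults from the config above):

    from opensearchpy import OpenSearch

    conn = OpenSearch(
        hosts=[{'host': 'localhost', 'port': 9200}],
        http_auth=('root', 'password'),
        use_ssl=True
    )
    # fetch the current and archived versions of one article by URL
    query = {'query': {'match': {'url.keyword': 'https://example.com/article'}}}
    current = conn.search(index='news-please', body=query)
    archived = conn.search(index='news-please-archive', body=query)
    print('current hits: %d' % current['hits']['total']['value'])
    print('archived versions: %d' % archived['hits']['total']['value'])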