diff --git a/doajtest/unit/test_task_datalog_journal_added_update.py b/doajtest/unit/test_task_datalog_journal_added_update.py
index c887e8232..7cada8cc5 100644
--- a/doajtest/unit/test_task_datalog_journal_added_update.py
+++ b/doajtest/unit/test_task_datalog_journal_added_update.py
@@ -1,5 +1,6 @@
 import time
 import unittest
+from datetime import datetime
 from typing import List
 from unittest.mock import MagicMock
 from unittest.mock import patch
@@ -7,7 +8,7 @@
 from doajtest.fixtures import JournalFixtureFactory
 from doajtest.helpers import DoajTestCase
 from portality.lib import dates
-from portality.models import Journal
+from portality.models import Journal, datalog_journal_added
 from portality.models.datalog_journal_added import DatalogJournalAdded
 from portality.tasks import datalog_journal_added_update
 from portality.tasks.datalog_journal_added_update import DatalogJournalAddedUpdate, to_display_data, \
@@ -39,6 +40,16 @@ class TestDatalogJournalAddedUpdate(DoajTestCase):

+    def test_sync_datalog_journal_added(self):
+        journals = create_test_journals(4)
+        save_and_block_last(journals[-1:])
+        datalog_journal_added_update.sync_datalog_journal_added()
+        assert DatalogJournalAdded.count() == 1
+
+        save_and_block_last(journals[:-1])
+        datalog_journal_added_update.sync_datalog_journal_added()
+        assert DatalogJournalAdded.count() == 4
+
     def test_execute__normal(self):
         """
         test background job execute
@@ -50,7 +61,7 @@ def test_execute__normal(self):

         save_test_datalog()
-        journals = save_test_journals(3)
+        journals = save_test_journals(4)

         worksheet = MagicMock()
         worksheet.get_all_values.return_value = [
@@ -64,10 +75,10 @@ def test_execute__normal(self):
                 .open.return_value
                 .worksheet.return_value) = worksheet

-        background_task = background_helper.execute_by_bg_task_type(DatalogJournalAddedUpdate,
-                                                                     filename=input_filename,
-                                                                     worksheet_name=input_worksheet_name,
-                                                                     google_key_path=input_google_key_path)
+        background_helper.execute_by_bg_task_type(DatalogJournalAddedUpdate,
+                                                  filename=input_filename,
+                                                  worksheet_name=input_worksheet_name,
+                                                  google_key_path=input_google_key_path)

         worksheet.get_all_values.assert_called()
         new_rows_added_to_excels, row_idx, *_ = worksheet.insert_rows.call_args.args
@@ -128,17 +139,26 @@ def test_latest_row_index(self):

     def test_find_new_datalog_journals(self):
         save_test_journals(3)

-        def _find_new_datalog_journals(latest_date_str):
+        def _count_new_datalog_journals(latest_date_str):
             datalog_list = datalog_journal_added_update.find_new_datalog_journals(
                 dates.parse(latest_date_str)
             )
             datalog_list = list(datalog_list)
             return len(datalog_list)

-        assert _find_new_datalog_journals('2101-01-01') == 2
-        assert _find_new_datalog_journals('2102-01-01') == 1
-        assert _find_new_datalog_journals('2103-01-01') == 0
-        assert _find_new_datalog_journals('2104-01-01') == 0
+        assert _count_new_datalog_journals('2101-01-01') == 3
+        assert _count_new_datalog_journals('2101-01-01T22:22:22Z') == 2
+        assert _count_new_datalog_journals('2102-01-01') == 2
+        assert _count_new_datalog_journals('2103-01-01') == 1
+        assert _count_new_datalog_journals('2104-01-01') == 0
+
+    def test_is_datalog_exist(self):
+        save_test_datalog()
+        # save_all_block_last(testdata_datalog_list)
+
+        assert datalog_journal_added.is_issn_exists('1234-3000', '2021-01-01')
+        assert datalog_journal_added.is_issn_exists('1234-1000', datetime(2020, 1, 1))
+        assert not datalog_journal_added.is_issn_exists('9999-9999', datetime(2021, 1, 1))


 def save_test_datalog():
@@ -147,17 +167,29 @@ def save_test_datalog():
     time.sleep(2)
+    DatalogJournalAdded.refresh()


-def save_test_journals(n_journals: int) -> List[Journal]:
+
+def create_test_journals(n_journals):
     journals = JournalFixtureFactory.make_many_journal_sources(count=n_journals, in_doaj=True)
     journals = map(lambda d: Journal(**d), journals)
     journals = list(journals)
     assert len(journals) == n_journals
-    journals[0]['created_date'] = '2103-01-01'
-    journals[1]['created_date'] = '2102-01-01'
-    journals[2]['created_date'] = '2101-01-01'
-    save_and_block_last(journals)
+    test_dates = [
+        '2103-01-01T03:00:00Z',
+        '2102-01-01T02:00:00Z',
+        '2101-01-01T22:22:22Z',
+        '2101-01-01T01:00:00Z',
+    ]
+    for i, j in enumerate(journals):
+        if i < len(test_dates):
+            j['created_date'] = test_dates[i]
+    return journals
+
+def save_test_journals(n_journals: int) -> List[Journal]:
+    journals = create_test_journals(n_journals)
+    save_and_block_last(journals)
     return journals
diff --git a/portality/dao.py b/portality/dao.py
index 176a74148..948f136f2 100644
--- a/portality/dao.py
+++ b/portality/dao.py
@@ -1,8 +1,8 @@
 from __future__ import annotations

 import json
-import re
 import os
+import re
 import sys
 import time
 import urllib.parse
@@ -1087,6 +1087,14 @@ def _yield_index_alias():
     return index_aliases


+def is_exist(query: dict, index):
+    query['size'] = 1
+    query['_source'] = False
+    res = ES.search(body=query, index=index, size=1, ignore=[404])
+
+    return res.get('hits', {}).get('total', {}).get('value', 0) > 0
+
+
 class BlockTimeOutException(Exception):
     pass
diff --git a/portality/migrate/4031_journals_added_sheet_duplicated/README.md b/portality/migrate/4031_journals_added_sheet_duplicated/README.md
new file mode 100644
index 000000000..f65290ad6
--- /dev/null
+++ b/portality/migrate/4031_journals_added_sheet_duplicated/README.md
@@ -0,0 +1,10 @@
+# 2024-12-25; Issue 4031 - Journals added sheet duplicated
+
+## Execution
+
+Run the script to remove all records in DatalogJournalAdded:
+
+    python portality/migrate/4031_journals_added_sheet_duplicated/cleanup_4031_journals_added_sheet_duplicated.py
+
+
+* After running the script, manually **BACK UP** and remove all records in the Google Sheet
\ No newline at end of file
diff --git a/portality/migrate/4031_journals_added_sheet_duplicated/__init__.py b/portality/migrate/4031_journals_added_sheet_duplicated/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/portality/migrate/4031_journals_added_sheet_duplicated/cleanup_4031_journals_added_sheet_duplicated.py b/portality/migrate/4031_journals_added_sheet_duplicated/cleanup_4031_journals_added_sheet_duplicated.py
new file mode 100644
index 000000000..633e882a3
--- /dev/null
+++ b/portality/migrate/4031_journals_added_sheet_duplicated/cleanup_4031_journals_added_sheet_duplicated.py
@@ -0,0 +1,15 @@
+from portality.models.datalog_journal_added import DatalogJournalAdded
+
+
+def main():
+    # remove all records
+    query = {
+        "query": {
+            "match_all": {}
+        },
+    }
+    DatalogJournalAdded.delete_by_query(query)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/portality/models/datalog_journal_added.py b/portality/models/datalog_journal_added.py
index 51bb22d42..6f1971f40 100644
--- a/portality/models/datalog_journal_added.py
+++ b/portality/models/datalog_journal_added.py
@@ -1,4 +1,9 @@
-from portality.dao import DomainObject
+from __future__ import annotations
+
+import elasticsearch
+
+from portality import dao
+from portality.dao import DomainObject, ScrollInitialiseException
 from portality.lib import coerce
 from portality.lib.coerce import COERCE_MAP
 from portality.lib.seamless import SeamlessMixin
@@ -77,3 +82,61 @@ def journal_id(self):
     @journal_id.setter
     def journal_id(self, val):
         self.__seamless__.set_single('journal_id', val)
+
+
+class LastDatalogJournalAddedQuery:
+
+    def query(self):
+        return {
+            "size": 1,
+            "sort": [
+                {
+                    "date_added": {
+                        "order": "desc"
+                    }
+                }
+            ],
+            "query": {
+                "match_all": {}
+            }
+        }
+
+
+def find_last_datalog():
+    try:
+        record = next(DatalogJournalAdded.iterate(LastDatalogJournalAddedQuery().query()), None)
+    except (elasticsearch.exceptions.NotFoundError, ScrollInitialiseException):
+        record = None
+    return record
+
+
+class DateAddedDescQuery:
+
+    def query(self):
+        return {
+            'sort': [
+                {'date_added': {'order': 'desc'}}
+            ]
+        }
+
+
+class IssnDateMatchQuery:
+    def __init__(self, issn, date_added):
+        self.issn = issn
+        self.date_added = date_added
+
+    def query(self):
+        return {
+            "query": {
+                "bool": {
+                    "filter": [
+                        {"term": {"issn.keyword": self.issn}},
+                        {"term": {"date_added": self.date_added}}
+                    ]
+                }
+            },
+        }
+
+
+def is_issn_exists(issn, date_added):
+    return dao.is_exist(IssnDateMatchQuery(issn, date_added).query(), DatalogJournalAdded.index_name())
diff --git a/portality/settings.py b/portality/settings.py
index d5b68b014..60571782e 100644
--- a/portality/settings.py
+++ b/portality/settings.py
@@ -442,7 +442,7 @@
     "old_data_cleanup": {"month": "*", "day": "12", "day_of_week": "*", "hour": "6", "minute": "30"},
     "monitor_bgjobs": {"month": "*", "day": "*/6", "day_of_week": "*", "hour": "10", "minute": "0"},
     "find_discontinued_soon": {"month": "*", "day": "*", "day_of_week": "*", "hour": "0", "minute": "3"},
-    "datalog_journal_added_update": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "*/30"}
+    "datalog_journal_added_update": {"month": "*", "day": "*", "day_of_week": "*", "hour": "4", "minute": "30"}
 }
diff --git a/portality/tasks/anon_export.py b/portality/tasks/anon_export.py
index 1b06ddd42..c04805014 100644
--- a/portality/tasks/anon_export.py
+++ b/portality/tasks/anon_export.py
@@ -3,7 +3,7 @@
 import os
 import shutil
 import uuid
-from typing import Callable, NoReturn
+from typing import Callable

 from portality import models, dao
 from portality.background import BackgroundTask
@@ -111,7 +111,7 @@ def _copy_on_complete(path, logger_fn, tmpStore, mainStore, container):


 def run_anon_export(tmpStore, mainStore, container, clean=False, limit=None, batch_size=100000,
-                    logger_fn: Callable[[str], NoReturn] = None):
+                    logger_fn: Callable[[str], None] = None):
     if logger_fn is None:
         logger_fn = print
     if clean:
diff --git a/portality/tasks/datalog_journal_added_update.py b/portality/tasks/datalog_journal_added_update.py
index a56259d50..76e839713 100644
--- a/portality/tasks/datalog_journal_added_update.py
+++ b/portality/tasks/datalog_journal_added_update.py
@@ -9,12 +9,14 @@
 * [No longer display Seal](https://github.com/DOAJ/doajPM/issues/3829)
 """
+from __future__ import annotations
+
 import datetime
 import itertools
 import logging
 import re
 import time
-from typing import Callable, NoReturn, List, Iterable
+from typing import Callable, List, Iterable

 import gspread

@@ -22,8 +24,8 @@
 from portality.background import BackgroundTask
 from portality.core import app
 from portality.lib import dates, gsheet
-from portality.models import Journal
-from portality.models.datalog_journal_added import DatalogJournalAdded
+from portality.models import Journal, datalog_journal_added
+from portality.models.datalog_journal_added import DatalogJournalAdded, find_last_datalog, DateAddedDescQuery
 from portality.tasks.helpers import background_helper
 from portality.tasks.redis_huey import scheduled_short_queue as queue
@@ -51,20 +53,8 @@ def query(self):
         }


-class LatestDatalogJournalQuery:
-    def __init__(self):
-        pass
-
-    def query(self):
-        return {
-            'sort': [
-                {'date_added': {'order': 'desc'}}
-            ]
-        }
-
-
-def find_new_datalog_journals(latest_date: datetime.datetime) -> Iterable[DatalogJournalAdded]:
-    records = Journal.iterate(NewDatalogJournalQuery(latest_date).query())
+def find_new_datalog_journals(fetch_date: datetime.datetime) -> Iterable[DatalogJournalAdded]:
+    records = Journal.iterate(NewDatalogJournalQuery(fetch_date).query())
     return (to_datalog_journal_added(j) for j in records)


@@ -85,32 +75,15 @@ def to_datalog_journal_added(journal: Journal) -> DatalogJournalAdded:
     return record


-class LastDatalogJournalAddedQuery:
-    def __init__(self):
-        pass
-
-    def query(self):
-        return {
-            "size": 1,
-            "sort": [
-                {
-                    "date_added": {
-                        "order": "desc"
-                    }
-                }
-            ],
-            "query": {
-                "match_all": {}
-            }
-        }
-
-
-def get_latest_date_added():
-    record = next(DatalogJournalAdded.iterate(LastDatalogJournalAddedQuery().query()), None)
+def get_fetch_datalog_date(n_days=30):
+    record = find_last_datalog()
     if record is None:
-        return datetime.datetime.now()
+        return datetime.datetime(2024, 2, 1)
     else:
-        return dates.parse(record.date_added)
+        d = dates.parse(record.date_added)
+        # subtract n days to avoid missing records
+        d -= datetime.timedelta(days=n_days)
+        return d


 def find_latest_row_index(records: List[List[str]]):
@@ -126,21 +99,26 @@ def find_latest_row_index(records: List[List[str]]):
     return latest_row_index


-def find_first_issn(rows):
+def find_latest_issn_list(rows, n=10) -> list[str]:
+    issn_list = []
     for row in rows:
         for c in row:
             if re.match(regex.ISSN, c):
-                return c
-    return None
+                issn_list.append(c)
+
+                if len(issn_list) >= n:
+                    return issn_list
+
+    return issn_list


-def find_new_xlsx_rows(last_issn, page_size=400) -> list:
+def find_new_xlsx_rows(latest_issn_list, page_size=400) -> list:
    """
    find new datalog records and convert to xlsx display format

    Parameters
    ----------
-    last_issn
+    latest_issn_list
     page_size
     Returns
     -------
@@ -150,8 +128,8 @@

    """

-    new_records = DatalogJournalAdded.iterate(LatestDatalogJournalQuery().query(), page_size=page_size)
-    new_records = itertools.takewhile(lambda r: r.issn != last_issn, new_records)
+    new_records = DatalogJournalAdded.iterate(DateAddedDescQuery().query(), page_size=page_size)
+    new_records = itertools.takewhile(lambda r: r.issn not in latest_issn_list, new_records)
     new_records = list(new_records)
     new_xlsx_rows = [to_display_data(j) for j in new_records]
     return new_xlsx_rows
@@ -166,22 +144,12 @@ def to_display_data(datalog: DatalogJournalAdded) -> list:
 def records_new_journals(filename,
                          worksheet_name,
                          google_key_path,
-                         logger_fn: Callable[[str], NoReturn] = None
+                         logger_fn: Callable[[str], None] = None
                          ):
     if logger_fn is None:
         logger_fn = print

-    latest_date = get_latest_date_added()
-    new_datalog_list = find_new_datalog_journals(latest_date)
-    new_datalog_list = (dao.patch_model_for_bulk(r) for r in new_datalog_list)
-    new_datalog_list = list(new_datalog_list)
-    if new_datalog_list:
-        # save new records to DB
-        DatalogJournalAdded.bulk([r.data for r in new_datalog_list], )
-        logger_fn(f'saved new records to datalog [{len(new_datalog_list)}]')
-        time.sleep(6)  # wait for bulk save to complete
-    else:
-        logger_fn('No new records found')
+    sync_datalog_journal_added(logger_fn)

     # save new records to google sheet
     client = gsheet.load_client(google_key_path)
@@ -199,14 +167,33 @@ def records_new_journals(filename,
     org_rows = worksheet.get_all_values()
     org_rows = list(org_rows)
     latest_row_idx = find_latest_row_index(org_rows)
-    last_issn = find_first_issn(org_rows[latest_row_idx:])
-    logger_fn(f'last_issn: {last_issn}')
+    latest_issn_list = find_latest_issn_list(org_rows[latest_row_idx:])
+    logger_fn(f'latest_issn_list: {latest_issn_list}')

-    new_xlsx_rows = find_new_xlsx_rows(last_issn)
+    new_xlsx_rows = find_new_xlsx_rows(latest_issn_list)
     worksheet.insert_rows(new_xlsx_rows, latest_row_idx + 1)
     logger_fn(f'inserted rows to google sheet [{len(new_xlsx_rows)}]')


+def sync_datalog_journal_added(logger_fn: Callable[[str], None] = None):
+    if logger_fn is None:
+        logger_fn = print
+
+    fetch_date = get_fetch_datalog_date()
+    logger_fn(f'latest_date of Datalog: {fetch_date}')
+    new_datalog_list = find_new_datalog_journals(fetch_date)
+    new_datalog_list = (r for r in new_datalog_list if r.issn and not datalog_journal_added.is_issn_exists(r.issn, r.date_added))
+    new_datalog_list = (dao.patch_model_for_bulk(r) for r in new_datalog_list)
+    new_datalog_list = list(new_datalog_list)
+    if new_datalog_list:
+        # save new records to DB
+        DatalogJournalAdded.bulk([r.data for r in new_datalog_list], )
+        logger_fn(f'saved new records to datalog [{len(new_datalog_list)}]')
+        time.sleep(6)  # wait for bulk save to complete
+    else:
+        logger_fn('No new records found')
+
+
 class DatalogJournalAddedUpdate(BackgroundTask):
     """
     ~~DatalogJournalAddedUpdate:Feature->BackgroundTask:Process~~
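
Reviewer note (illustrative, not part of the changeset): the sketch below shows how the pieces added in this diff compose. `sync_datalog_journal_added()` fetches journals created since the last recorded date minus the 30-day look-back, skips any ISSN/date pair that `datalog_journal_added.is_issn_exists()` (backed by the new `dao.is_exist()` helper) already finds in the index, and bulk-saves the rest; `records_new_journals()` then pushes the new rows to the Google Sheet. Running the sync step by hand with a custom `logger_fn` is a quick way to see what it would write. The snippet assumes a configured DOAJ app and Elasticsearch connection.

    # Sketch only: exercises functions introduced in this diff.
    from portality.models import datalog_journal_added
    from portality.tasks import datalog_journal_added_update

    messages = []
    # Collect the task's log lines instead of printing them.
    datalog_journal_added_update.sync_datalog_journal_added(logger_fn=messages.append)
    print('\n'.join(messages))

    # The duplicate guard the sync relies on: an ISSN + date_added pair
    # already present in DatalogJournalAdded is skipped.
    print(datalog_journal_added.is_issn_exists('1234-3000', '2021-01-01'))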