diff --git a/wwpdb/utils/tests-nmr/run_annotation_onedep.py b/wwpdb/utils/tests-nmr/run_annotation_onedep.py
index 7c22033e..48d19ad8 100644
--- a/wwpdb/utils/tests-nmr/run_annotation_onedep.py
+++ b/wwpdb/utils/tests-nmr/run_annotation_onedep.py
@@ -3,25 +3,22 @@
 import json
 import re
 import pynmrstar
-from packaging import version as package_version
+import hashlib
 from mmcif.io.IoAdapterPy import IoAdapterPy
+from typing import Any, List, Tuple, Union, Optional
 
 
 try:
     from wwpdb.utils.nmr.NmrDpUtility import NmrDpUtility
+    from wwpdb.utils.nmr.io.CifReader import CifReader
     auth_view = 'mock-data-combine-at-upload'
 except ImportError:
     from nmr.NmrDpUtility import NmrDpUtility
+    from nmr.io.CifReader import CifReader
     auth_view = 'auth_view'
 
 
-__pynmrstar_v3_3__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.3.0")
-__pynmrstar_v3_2__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.2.0")
-__pynmrstar_v3_1__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.1.0")
-__pynmrstar_v3__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.0.0")
-
-
-def get_inventory_list(star_data):
+def get_inventory_list(star_data: Union[pynmrstar.Entry, pynmrstar.Saveframe, pynmrstar.Loop]) -> Tuple[List[str], List[str]]:
     """ Return lists of saveframe category names and loop category names in an NEF/NMR-STAR file.
         @return: list of saveframe category names, list of loop category names
     """
@@ -48,7 +45,7 @@ def get_inventory_list(star_data):
     return sf_list, lp_list
 
 
-def is_empty_loop(star_data, lp_category):
+def is_empty_loop(star_data: Union[pynmrstar.Entry, pynmrstar.Saveframe, pynmrstar.Loop], lp_category: str) -> bool:
     """ Return whether one of specified loops is empty loop.
         @return: True for empty loop exists, False otherwise
     """
@@ -59,47 +56,56 @@ def is_empty_loop(star_data, lp_category):
         return any(len(loop) == 0 for loop in loops)
 
     if isinstance(star_data, pynmrstar.Saveframe):
-        if __pynmrstar_v3_2__:
-            loop = star_data.get_loop(lp_category)
-        else:
-            loop = star_data.get_loop_by_category(lp_category)
+        loop = star_data.get_loop(lp_category)
 
         return len(loop) == 0
 
     return len(star_data) == 0
 
 
-def get_lp_tag(lp, tags):
-    """ Return the selected loop tags by row as a list of lists.
+def get_first_sf_tag(sf: pynmrstar.Saveframe, tag: str, default: str = '') -> Any:
+    """ Return the first value of a given saveframe tag with decoding symbol notation.
+        @return: The first tag value, '' (by default) otherwise.
     """
-    return lp.get_tag(tags) if __pynmrstar_v3__ else lp.get_data_by_tag(tags)
+
+    if not isinstance(sf, pynmrstar.Saveframe) or tag is None:
+        return default
 
+    array = sf.get_tag(tag)
 
-def get_first_sf_tag(sf=None, tag=None):
-    """ Return the first value of a given saveframe tag.
-        @return: The first tag value, empty string otherwise.
-    """
+    if len(array) == 0 or array[0] is None:
+        return default
 
-    if sf is None or tag is None:
-        return ''
+    if not isinstance(array[0], str):
+        return array[0]
 
-    array = sf.get_tag(tag)
+    value = array[0]
+
+    while value.startswith('$$'):
+        value = value[1:]
 
-    if len(array) == 0:
-        return ''
+    if len(value) == 0 or value == '$':
+        return default
 
-    return array[0] if array[0] is not None else ''
+    return value if len(value) < 2 or value[0] != '$' else value[1:]
 
 
-def set_sf_tag(sf, tag, value):
-    """ Set saveframe tag.
+def set_sf_tag(sf: pynmrstar.Saveframe, tag: str, value: Any):
+    """ Set saveframe tag with a given value.
""" tagNames = [t[0] for t in sf.tags] - if isinstance(value, str) and len(value) == 0: - value = None + if isinstance(value, str): + + if len(value) == 0: + value = None + + while value.startswith('$$'): + value = value[1:] + + if len(value) == 0 or value == '$': + value = None if tag not in tagNames: sf.add_tag(tag, value) @@ -108,7 +114,10 @@ def set_sf_tag(sf, tag, value): sf.tags[tagNames.index(tag)][1] = value -def is_combined_nmr_data(file_path): +def is_combined_nmr_data(file_path: str) -> Tuple[bool, Optional[dict]]: + """ Return whether input file is combined NMR data file. + @return: Whether input file is combined NMR data file, dictionary of list of associated constraint file names for each format + """ emptyValue = (None, '', '.', '?', 'null', 'None') @@ -120,9 +129,11 @@ def is_combined_nmr_data(file_path): bmrb_id_pattern = re.compile(r'^(bmr)?[0-9]{5,}$') try: + star_data = pynmrstar.Entry.from_file(file_path) for sf in star_data.frame_list: + if sf.category == 'constraint_statistics': data_file_name = get_first_sf_tag(sf, 'Data_file_name') entry_id = get_first_sf_tag(sf, 'Entry_ID') @@ -132,12 +143,9 @@ def is_combined_nmr_data(file_path): lp_category = '_Constraint_file' - if __pynmrstar_v3_2__: - lp = sf.get_loop(lp_category) - else: - lp = sf.get_loop_by_category(lp_category) + lp = sf.get_loop(lp_category) - dat = get_lp_tag(lp, ['ID', 'Constraint_filename', 'Software_name']) + dat = lp.get_tag(['ID', 'Constraint_filename', 'Software_name']) original_file_name = {} @@ -197,8 +205,10 @@ def is_combined_nmr_data(file_path): original_file_name['nm-res-cha'].append(row[1]) return combined, original_file_name + except Exception: pass + return False, None @@ -818,6 +828,18 @@ def __init__(self): else: self.__mr_file_path.append(d['file_name']) + pk_file_lists = [] + + try: + + cR = CifReader(False, sys.stderr) + + if cR.parse(self.__cif_file_path): + pk_file_lists = cR.getDictList('pdbx_nmr_spectral_peak_list') + + except Exception: + pass + pk_dic = {} for file_name in sorted(os.listdir(self.__data_dir)): @@ -841,6 +863,35 @@ def __init__(self): self.__has_peak = True + if len(pk_dic) < len(pk_file_lists): + pk_file_lists, md5_list = [], [] + + for key in sorted(pk_dic.keys()): + file_path = pk_dic[key]['file_name'] + pk_file_lists.append(file_path) + with open(file_path, 'r', encoding='utf-8', errors='ignore') as ifh: + hash_key = hashlib.md5(ifh.read().encode('utf-8')).hexdigest() + md5_list.append(hash_key) + + for file_name in sorted(os.listdir(self.__data_dir)): + if self.__pk_name_pattern.match(file_name): + file_path = os.path.join(self.__data_dir, file_name) + + if file_path in pk_file_lists: + continue + + with open(file_path, 'r', encoding='utf-8', errors='ignore') as ifh: + hash_key = hashlib.md5(ifh.read().encode('utf-8')).hexdigest() + + if hash_key in md5_list: + continue + + pk_file_lists.append(file_path) + md5_list.append(hash_key) + + key = max(list(pk_dic.keys())) + 1 + pk_dic[key] = {'file_name': file_path} + for key in sorted(pk_dic.keys()): self.__ar_file_type.append('nm-pea-any') self.__ar_file_path.append(pk_dic[key]['file_name']) @@ -863,7 +914,7 @@ def __init__(self): else: print(f'Spectral peaks : {ar_file_path} ({ar_file_type})') - def is_star_file(self, file_path): + def is_star_file(self, file_path: str) -> bool: has_datablock = False has_anonymous_saveframe = False @@ -886,7 +937,7 @@ def is_star_file(self, file_path): return has_datablock or has_anonymous_saveframe or has_save or has_loop or has_stop - def is_amb_top_file(self, file_path): + 
+    def is_amb_top_file(self, file_path: str) -> bool:
 
         with open(file_path, "r") as ifh:
             for line in ifh:
@@ -895,7 +946,7 @@ def is_amb_top_file(self, file_path):
 
         return False
 
-    def is_amb_rst_file(self, file_path):
+    def is_amb_rst_file(self, file_path) -> bool:
 
         with open(file_path, "r") as ifh:
             for line in ifh:
@@ -904,7 +955,7 @@ def is_amb_rst_file(self, file_path):
 
         return False
 
-    def is_gro_top_file(self, file_path):
+    def is_gro_top_file(self, file_path) -> bool:
 
         with open(file_path, "r") as ifh:
             for line in ifh:
@@ -913,7 +964,7 @@ def is_gro_top_file(self, file_path):
 
         return False
 
-    def is_gro_rst_file(self, file_path):
+    def is_gro_rst_file(self, file_path) -> bool:
 
         with open(file_path, "r") as ifh:
             for line in ifh:
@@ -922,7 +973,7 @@ def is_gro_rst_file(self, file_path):
 
         return False
 
-    def is_cha_top_file(self, file_path):
+    def is_cha_top_file(self, file_path) -> bool:
 
         with open(file_path, "r") as ifh:
             for line in ifh:
@@ -931,7 +982,7 @@ def is_cha_top_file(self, file_path):
 
         return False
 
-    def is_cha_rst_file(self, file_path):
+    def is_cha_rst_file(self, file_path) -> bool:
 
         with open(file_path, "r") as ifh:
             for line in ifh:
@@ -940,7 +991,7 @@ def is_cha_rst_file(self, file_path):
 
         return False
 
-    def has_cs_loop(self, file_path):  # pylint: disable=no-self-use
+    def has_cs_loop(self, file_path) -> bool:  # pylint: disable=no-self-use
 
         try:
             star_data = pynmrstar.Entry.from_file(file_path)
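
A minimal standalone sketch (not part of the patch) of the '$' decoding that the reworked get_first_sf_tag applies to raw values returned by pynmrstar's Saveframe.get_tag(); the helper name decode_symbol_notation and the sample values are illustrative only:

def decode_symbol_notation(value: str, default: str = '') -> str:
    # collapse '$$' escaping one leading character at a time, as in the patched helper
    while value.startswith('$$'):
        value = value[1:]
    # treat an empty value or a bare '$' as a missing tag value
    if len(value) == 0 or value == '$':
        return default
    # a remaining single leading '$' reads as a framecode-style reference; return the bare name
    return value if len(value) < 2 or value[0] != '$' else value[1:]


if __name__ == '__main__':
    for raw in ('2mtv', '$entry', '$$entry', '$', ''):
        print(f'{raw!r:>10} -> {decode_symbol_notation(raw)!r}')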
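The new peak-list branch in __init__ keys duplicate detection on an MD5 digest of each candidate file's text, so a re-uploaded copy of the same peak list under a different name is skipped. Below is a self-contained sketch of that pattern only; the function names and directory argument are hypothetical, and it deliberately omits the pdbx_nmr_spectral_peak_list lookup that the patch performs through CifReader:

import hashlib
import os
from typing import List


def md5_of_text(file_path: str) -> str:
    # digest the decoded text, mirroring the read()/encode('utf-8') round trip used in the patch
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as ifh:
        return hashlib.md5(ifh.read().encode('utf-8')).hexdigest()


def merge_unique_peak_files(known_paths: List[str], data_dir: str) -> List[str]:
    """ Append files from data_dir whose content is not already present among known_paths. """
    merged = list(known_paths)
    seen = {md5_of_text(path) for path in merged}

    for file_name in sorted(os.listdir(data_dir)):
        file_path = os.path.join(data_dir, file_name)

        if file_path in merged or not os.path.isfile(file_path):
            continue

        digest = md5_of_text(file_path)

        if digest in seen:
            continue  # same content already registered under another name

        merged.append(file_path)
        seen.add(digest)

    return merged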