Skip to content

Commit

Permalink
DAOTHER-8905, BMRB internal annotation: Rescue spectral peak lists mi…
Browse files Browse the repository at this point in the history
…ssing by ancient critical OneDep bugs
  • Loading branch information
yokochi47 committed Feb 21, 2025
1 parent 519f010 commit 9f2a732
Showing 1 changed file with 95 additions and 44 deletions.
139 changes: 95 additions & 44 deletions wwpdb/utils/tests-nmr/run_annotation_onedep.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@
import json
import re
import pynmrstar
from packaging import version as package_version
import hashlib

from mmcif.io.IoAdapterPy import IoAdapterPy
from typing import Any, List, Tuple, Union, Optional

try:
from wwpdb.utils.nmr.NmrDpUtility import NmrDpUtility
from wwpdb.utils.nmr.io.CifReader import CifReader
auth_view = 'mock-data-combine-at-upload'
except ImportError:
from nmr.NmrDpUtility import NmrDpUtility
from nmr.io.CifReader import CifReader
auth_view = 'auth_view'


__pynmrstar_v3_3__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.3.0")
__pynmrstar_v3_2__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.2.0")
__pynmrstar_v3_1__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.1.0")
__pynmrstar_v3__ = package_version.parse(pynmrstar.__version__) >= package_version.parse("3.0.0")


def get_inventory_list(star_data):
def get_inventory_list(star_data: Union[pynmrstar.Entry, pynmrstar.Saveframe, pynmrstar.Loop]) -> Tuple[List[str], List[str]]:
""" Return lists of saveframe category names and loop category names in an NEF/NMR-STAR file.
@return: list of saveframe category names, list of loop category names
"""
Expand All @@ -48,7 +45,7 @@ def get_inventory_list(star_data):
return sf_list, lp_list


def is_empty_loop(star_data, lp_category):
def is_empty_loop(star_data: Union[pynmrstar.Entry, pynmrstar.Saveframe, pynmrstar.Loop], lp_category: str) -> bool:
""" Return whether one of specified loops is empty loop.
@return: True for empty loop exists, False otherwise
"""
Expand All @@ -59,47 +56,56 @@ def is_empty_loop(star_data, lp_category):
return any(len(loop) == 0 for loop in loops)

if isinstance(star_data, pynmrstar.Saveframe):
if __pynmrstar_v3_2__:
loop = star_data.get_loop(lp_category)
else:
loop = star_data.get_loop_by_category(lp_category)
loop = star_data.get_loop(lp_category)

return len(loop) == 0

return len(star_data) == 0


def get_lp_tag(lp, tags):
""" Return the selected loop tags by row as a list of lists.
def get_first_sf_tag(sf: pynmrstar.Saveframe, tag: str, default: str = '') -> Any:
""" Return the first value of a given saveframe tag with decoding symbol notation.
@return: The first tag value, '' (by default) otherwise.
"""

return lp.get_tag(tags) if __pynmrstar_v3__ else lp.get_data_by_tag(tags)
if not isinstance(sf, pynmrstar.Saveframe) or tag is None:
return default

array = sf.get_tag(tag)

def get_first_sf_tag(sf=None, tag=None):
""" Return the first value of a given saveframe tag.
@return: The first tag value, empty string otherwise.
"""
if len(array) == 0 or array[0] is None:
return default

if sf is None or tag is None:
return ''
if not isinstance(array[0], str):
return array[0]

array = sf.get_tag(tag)
value = array[0]

while value.startswith('$$'):
value = value[1:]

if len(array) == 0:
return ''
if len(value) == 0 or value == '$':
return default

return array[0] if array[0] is not None else ''
return value if len(value) < 2 or value[0] != '$' else value[1:]


def set_sf_tag(sf, tag, value):
""" Set saveframe tag.
def set_sf_tag(sf: pynmrstar.Saveframe, tag: str, value: Any):
""" Set saveframe tag with a given value.
"""

tagNames = [t[0] for t in sf.tags]

if isinstance(value, str) and len(value) == 0:
value = None
if isinstance(value, str):

if len(value) == 0:
value = None

while value.startswith('$$'):
value = value[1:]

if len(value) == 0 or value == '$':
value = None

if tag not in tagNames:
sf.add_tag(tag, value)
Expand All @@ -108,7 +114,10 @@ def set_sf_tag(sf, tag, value):
sf.tags[tagNames.index(tag)][1] = value


def is_combined_nmr_data(file_path):
def is_combined_nmr_data(file_path: str) -> Tuple[bool, Optional[dict]]:
""" Return whether input file is combined NMR data file.
@return: Whether input file is combined NMR data file, dictionary of list of associated constraint file names for each format
"""

emptyValue = (None, '', '.', '?', 'null', 'None')

Expand All @@ -120,9 +129,11 @@ def is_combined_nmr_data(file_path):
bmrb_id_pattern = re.compile(r'^(bmr)?[0-9]{5,}$')

try:

star_data = pynmrstar.Entry.from_file(file_path)

for sf in star_data.frame_list:

if sf.category == 'constraint_statistics':
data_file_name = get_first_sf_tag(sf, 'Data_file_name')
entry_id = get_first_sf_tag(sf, 'Entry_ID')
Expand All @@ -132,12 +143,9 @@ def is_combined_nmr_data(file_path):

lp_category = '_Constraint_file'

if __pynmrstar_v3_2__:
lp = sf.get_loop(lp_category)
else:
lp = sf.get_loop_by_category(lp_category)
lp = sf.get_loop(lp_category)

dat = get_lp_tag(lp, ['ID', 'Constraint_filename', 'Software_name'])
dat = lp.get_tag(['ID', 'Constraint_filename', 'Software_name'])

original_file_name = {}

Expand Down Expand Up @@ -197,8 +205,10 @@ def is_combined_nmr_data(file_path):
original_file_name['nm-res-cha'].append(row[1])

return combined, original_file_name

except Exception:
pass

return False, None


Expand Down Expand Up @@ -818,6 +828,18 @@ def __init__(self):
else:
self.__mr_file_path.append(d['file_name'])

pk_file_lists = []

try:

cR = CifReader(False, sys.stderr)

if cR.parse(self.__cif_file_path):
pk_file_lists = cR.getDictList('pdbx_nmr_spectral_peak_list')

except Exception:
pass

pk_dic = {}

for file_name in sorted(os.listdir(self.__data_dir)):
Expand All @@ -841,6 +863,35 @@ def __init__(self):

self.__has_peak = True

if len(pk_dic) < len(pk_file_lists):
pk_file_lists, md5_list = [], []

for key in sorted(pk_dic.keys()):
file_path = pk_dic[key]['file_name']
pk_file_lists.append(file_path)
with open(file_path, 'r', encoding='utf-8', errors='ignore') as ifh:
hash_key = hashlib.md5(ifh.read().encode('utf-8')).hexdigest()
md5_list.append(hash_key)

for file_name in sorted(os.listdir(self.__data_dir)):
if self.__pk_name_pattern.match(file_name):
file_path = os.path.join(self.__data_dir, file_name)

if file_path in pk_file_lists:
continue

with open(file_path, 'r', encoding='utf-8', errors='ignore') as ifh:
hash_key = hashlib.md5(ifh.read().encode('utf-8')).hexdigest()

if hash_key in md5_list:
continue

pk_file_lists.append(file_path)
md5_list.append(hash_key)

key = max(list(pk_dic.keys())) + 1
pk_dic[key] = {'file_name': file_path}

for key in sorted(pk_dic.keys()):
self.__ar_file_type.append('nm-pea-any')
self.__ar_file_path.append(pk_dic[key]['file_name'])
Expand All @@ -863,7 +914,7 @@ def __init__(self):
else:
print(f'Spectral peaks : {ar_file_path} ({ar_file_type})')

def is_star_file(self, file_path):
def is_star_file(self, file_path: str) -> bool:

has_datablock = False
has_anonymous_saveframe = False
Expand All @@ -886,7 +937,7 @@ def is_star_file(self, file_path):

return has_datablock or has_anonymous_saveframe or has_save or has_loop or has_stop

def is_amb_top_file(self, file_path):
def is_amb_top_file(self, file_path: str) -> bool:

with open(file_path, "r") as ifh:
for line in ifh:
Expand All @@ -895,7 +946,7 @@ def is_amb_top_file(self, file_path):

return False

def is_amb_rst_file(self, file_path):
def is_amb_rst_file(self, file_path) -> bool:

with open(file_path, "r") as ifh:
for line in ifh:
Expand All @@ -904,7 +955,7 @@ def is_amb_rst_file(self, file_path):

return False

def is_gro_top_file(self, file_path):
def is_gro_top_file(self, file_path) -> bool:

with open(file_path, "r") as ifh:
for line in ifh:
Expand All @@ -913,7 +964,7 @@ def is_gro_top_file(self, file_path):

return False

def is_gro_rst_file(self, file_path):
def is_gro_rst_file(self, file_path) -> bool:

with open(file_path, "r") as ifh:
for line in ifh:
Expand All @@ -922,7 +973,7 @@ def is_gro_rst_file(self, file_path):

return False

def is_cha_top_file(self, file_path):
def is_cha_top_file(self, file_path) -> bool:

with open(file_path, "r") as ifh:
for line in ifh:
Expand All @@ -931,7 +982,7 @@ def is_cha_top_file(self, file_path):

return False

def is_cha_rst_file(self, file_path):
def is_cha_rst_file(self, file_path) -> bool:

with open(file_path, "r") as ifh:
for line in ifh:
Expand All @@ -940,7 +991,7 @@ def is_cha_rst_file(self, file_path):

return False

def has_cs_loop(self, file_path): # pylint: disable=no-self-use
def has_cs_loop(self, file_path) -> bool: # pylint: disable=no-self-use
try:

star_data = pynmrstar.Entry.from_file(file_path)
Expand Down

0 comments on commit 9f2a732

Please sign in to comment.