Skip to content

Commit

Permalink
Implement atom name mapping history as requirement of standalone NMR data conversion service
Browse files Browse the repository at this point in the history
  • Loading branch information
yokochi47 committed Nov 27, 2024
1 parent 67c930d commit b5bb10e
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 16 deletions.
283 changes: 267 additions & 16 deletions wwpdb/utils/nmr/NmrDpUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
# 14-Nov-2024 M. Yokochi - add support for CHARMM extended CRD (topology) file. file type: 'nm-aux-cha'
# 19-Nov-2024 M. Yokochi - add support for pH titration data (NMR restraint remediation)
# 22-Nov-2024 M. Yokochi - add support for CYANA NOA (NOE Assignment) file. file type: 'nm-res-noa'
# 27-Nov-2024 M. Yokochi - implement atom name mapping history as requirement of standalone NMR data conversion service
##
""" Wrapper class for NMR data processing.
@author: Masashi Yokochi
Expand Down Expand Up @@ -36087,10 +36088,10 @@ def __calculateStatsOfExptlData__(self, file_list_id, file_name, file_type, cont
self.__calculateStatsOfDistanceRestraint(file_list_id, sf_framecode, lp_data, conflict_id_set, inconsistent, redundant, ent)

elif content_subtype == 'dihed_restraint':
self.__calculateStatsOfDihedralRestraint(file_list_id, lp_data, conflict_id_set, inconsistent, redundant, ent)
self.__calculateStatsOfDihedralRestraint(file_list_id, sf_framecode, lp_data, conflict_id_set, inconsistent, redundant, ent)

elif content_subtype == 'rdc_restraint':
self.__calculateStatsOfRdcRestraint(file_list_id, lp_data, conflict_id_set, inconsistent, redundant, ent)
self.__calculateStatsOfRdcRestraint(file_list_id, sf_framecode, lp_data, conflict_id_set, inconsistent, redundant, ent)

if content_subtype.startswith('spectral_peak'):

Expand Down Expand Up @@ -37744,7 +37745,9 @@ def __calculateStatsOfDistanceRestraint(self, file_list_id, sf_framecode, lp_dat
file_name = input_source_dic['file_name']
file_type = input_source_dic['file_type']

index_tag = self.index_tags[file_type]['dist_restraint']
content_subtype = 'dist_restraint'

index_tag = self.index_tags[file_type][content_subtype]
item_names = self.item_names_in_ds_loop[file_type]
combination_id_name = item_names['combination_id']
chain_id_1_name = item_names['chain_id_1']
Expand All @@ -37764,8 +37767,8 @@ def __calculateStatsOfDistanceRestraint(self, file_list_id, sf_framecode, lp_dat
upper_limit_name = item_names['upper_limit']
lower_linear_limit_name = item_names['lower_linear_limit']
upper_linear_limit_name = item_names['upper_linear_limit']
weight_name = self.weight_tags[file_type]['dist_restraint']
id_tag = self.consist_id_tags[file_type]['dist_restraint']
weight_name = self.weight_tags[file_type][content_subtype]
id_tag = self.consist_id_tags[file_type][content_subtype]

len_lp_data = len(lp_data)

Expand Down Expand Up @@ -38580,7 +38583,7 @@ def __calculateStatsOfDistanceRestraint(self, file_list_id, sf_framecode, lp_dat
ent['histogram_of_discrepancy'] = {'range_of_values': range_of_vals, 'number_of_values': transposed, 'annotations': dist_ann}

if file_type == 'nmr-star' and self.__star_data_type[file_list_id] == 'Entry':
lp_category = self.lp_categories[file_type]['dist_restraint']
lp_category = self.lp_categories[file_type][content_subtype]
sf = self.__star_data[file_list_id].get_saveframe_by_name(sf_framecode)
lp = next(lp for lp in sf.loops if lp.category == lp_category)

Expand Down Expand Up @@ -39544,7 +39547,7 @@ def __getTypeOfCovalentBond(self, file_type, lp_data, row_id, target_value,

return data_type

def __calculateStatsOfDihedralRestraint(self, file_list_id, lp_data, conflict_id_set, inconsistent, redundant, ent):
def __calculateStatsOfDihedralRestraint(self, file_list_id, sf_framecode, lp_data, conflict_id_set, inconsistent, redundant, ent):
""" Calculate statistics of dihedral angle restraints.
"""

Expand All @@ -39553,8 +39556,10 @@ def __calculateStatsOfDihedralRestraint(self, file_list_id, lp_data, conflict_id

file_type = input_source_dic['file_type']

index_tag = self.index_tags[file_type]['dihed_restraint']
item_names = self.potential_items[file_type]['dihed_restraint']
content_subtype = 'dihed_restraint'

index_tag = self.index_tags[file_type][content_subtype]
item_names = self.potential_items[file_type][content_subtype]
target_value_name = item_names['target_value']
lower_limit_name = item_names['lower_limit']
upper_limit_name = item_names['upper_limit']
Expand All @@ -39580,8 +39585,8 @@ def __calculateStatsOfDihedralRestraint(self, file_list_id, lp_data, conflict_id
atom_id_3_name = dh_item_names['atom_id_3']
atom_id_4_name = dh_item_names['atom_id_4']
angle_type_name = dh_item_names['angle_type']
weight_name = self.weight_tags[file_type]['dihed_restraint']
id_tag = self.consist_id_tags[file_type]['dihed_restraint']
weight_name = self.weight_tags[file_type][content_subtype]
id_tag = self.consist_id_tags[file_type][content_subtype]

try:

Expand Down Expand Up @@ -40300,6 +40305,162 @@ def __calculateStatsOfDihedralRestraint(self, file_list_id, lp_data, conflict_id
if len(range_of_vals) > 1:
ent['histogram_of_discrepancy'] = {'range_of_values': range_of_vals, 'number_of_values': transposed, 'annotations': dihed_ann}

if file_type == 'nmr-star' and self.__star_data_type[file_list_id] == 'Entry':
lp_category = self.lp_categories[file_type][content_subtype]
sf = self.__star_data[file_list_id].get_saveframe_by_name(sf_framecode)
lp = next(lp for lp in sf.loops if lp.category == lp_category)

mapping, identity_mapping = [], []
dat1 = dat2 = dat3 = dat4 = None

tags1 = ['Comp_ID_1', 'Atom_ID_1', 'Auth_atom_name_1']
if set(tags1) & set(lp.tags) == set(tags1):
dat1 = get_lp_tag(lp, tags1)

tags2 = ['Comp_ID_2', 'Atom_ID_2', 'Auth_atom_name_2']
if set(tags2) & set(lp.tags) == set(tags2):
dat2 = get_lp_tag(lp, tags2)

tags3 = ['Comp_ID_3', 'Atom_ID_3', 'Auth_atom_name_3']
if set(tags3) & set(lp.tags) == set(tags3):
dat3 = get_lp_tag(lp, tags3)

tags4 = ['Comp_ID_4', 'Atom_ID_4', 'Auth_atom_name_4']
if set(tags4) & set(lp.tags) == set(tags4):
dat4 = get_lp_tag(lp, tags4)

if dat1 is not None:

for row in dat1:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] != row[2]:
continue
key = (row[0], row[2])
if key not in identity_mapping:
identity_mapping.append(key)

if dat2 is not None:

for row in dat2:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] != row[2]:
continue
key = (row[0], row[2])
if key not in identity_mapping:
identity_mapping.append(key)

if dat3 is not None:

for row in dat3:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] != row[2]:
continue
key = (row[0], row[2])
if key not in identity_mapping:
identity_mapping.append(key)

if dat4 is not None:

for row in dat4:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] != row[2]:
continue
key = (row[0], row[2])
if key not in identity_mapping:
identity_mapping.append(key)

if dat1 is not None:

for row in dat1:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] == row[2]:
continue
comp_id = row[0]
atom_id = row[1]
atom_name = row[2]

if not any(m['comp_id'] == comp_id for m in mapping):
mapping.append({'comp_id': comp_id, 'history': []})

history = next(m['history'] for m in mapping if m['comp_id'] == comp_id)

if not any(h for h in history if h['atom_name'] == atom_name):
history.append({'atom_name': atom_name, 'atom_id': [atom_name] if (comp_id, atom_name) in identity_mapping else []})

h = next(h for h in history if h['atom_name'] == atom_name)
if atom_id not in h['atom_id']:
h['atom_id'].append(atom_id)

if dat2 is not None:

for row in dat2:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] == row[2]:
continue
comp_id = row[0]
atom_id = row[1]
atom_name = row[2]

if not any(m['comp_id'] == comp_id for m in mapping):
mapping.append({'comp_id': comp_id, 'history': []})

history = next(m['history'] for m in mapping if m['comp_id'] == comp_id)

if not any(h for h in history if h['atom_name'] == atom_name):
history.append({'atom_name': atom_name, 'atom_id': [atom_name] if (comp_id, atom_name) in identity_mapping else []})

h = next(h for h in history if h['atom_name'] == atom_name)
if atom_id not in h['atom_id']:
h['atom_id'].append(atom_id)

if dat3 is not None:

for row in dat3:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] == row[2]:
continue
comp_id = row[0]
atom_id = row[1]
atom_name = row[2]

if not any(m['comp_id'] == comp_id for m in mapping):
mapping.append({'comp_id': comp_id, 'history': []})

history = next(m['history'] for m in mapping if m['comp_id'] == comp_id)

if not any(h for h in history if h['atom_name'] == atom_name):
history.append({'atom_name': atom_name, 'atom_id': [atom_name] if (comp_id, atom_name) in identity_mapping else []})

h = next(h for h in history if h['atom_name'] == atom_name)
if atom_id not in h['atom_id']:
h['atom_id'].append(atom_id)

if dat4 is not None:

for row in dat4:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] == row[2]:
continue
comp_id = row[0]
atom_id = row[1]
atom_name = row[2]

if not any(m['comp_id'] == comp_id for m in mapping):
mapping.append({'comp_id': comp_id, 'history': []})

history = next(m['history'] for m in mapping if m['comp_id'] == comp_id)

if not any(h for h in history if h['atom_name'] == atom_name):
history.append({'atom_name': atom_name, 'atom_id': [atom_name] if (comp_id, atom_name) in identity_mapping else []})

h = next(h for h in history if h['atom_name'] == atom_name)
if atom_id not in h['atom_id']:
h['atom_id'].append(atom_id)

if len(mapping) == 0:
mapping = None

else:
for m in mapping:
for h in m['history']:
h['atom_id'] = sorted(h['atom_id'])
m['history'] = sorted(m['history'], key=itemgetter('atom_name'))
mapping = sorted(mapping, key=lambda x: (len(x['comp_id']), x['comp_id']))

ent['atom_name_mapping'] = mapping

except Exception as e:

self.report.error.appendDescription('internal_error', "+NmrDpUtility.__calculateStatsOfDihedralRestraint() ++ Error - " + str(e))
Expand Down Expand Up @@ -40330,7 +40491,7 @@ def __getTypeOfDihedralRestraint(self, data_type, peptide, nucleotide, carbohydr

return data_type

def __calculateStatsOfRdcRestraint(self, file_list_id, lp_data, conflict_id_set, inconsistent, redundant, ent):
def __calculateStatsOfRdcRestraint(self, file_list_id, sf_framecode, lp_data, conflict_id_set, inconsistent, redundant, ent):
""" Calculate statistics of RDC restraints.
"""

Expand All @@ -40339,8 +40500,10 @@ def __calculateStatsOfRdcRestraint(self, file_list_id, lp_data, conflict_id_set,

file_type = input_source_dic['file_type']

index_tag = self.index_tags[file_type]['rdc_restraint']
item_names = self.potential_items[file_type]['rdc_restraint']
content_subtype = 'rdc_restraint'

index_tag = self.index_tags[file_type][content_subtype]
item_names = self.potential_items[file_type][content_subtype]
target_value_name = item_names['target_value']
if 'target_value_alt' in item_names and target_value_name not in lp_data[0].keys():
target_value_name = item_names['target_value_alt']
Expand Down Expand Up @@ -40401,8 +40564,8 @@ def __calculateStatsOfRdcRestraint(self, file_list_id, lp_data, conflict_id_set,
# comp_id_2_name = item_names['comp_id_2']
atom_id_1_name = item_names['atom_id_1']
atom_id_2_name = item_names['atom_id_2']
weight_name = self.weight_tags[file_type]['rdc_restraint']
id_tag = self.consist_id_tags[file_type]['rdc_restraint']
weight_name = self.weight_tags[file_type][content_subtype]
id_tag = self.consist_id_tags[file_type][content_subtype]

count, comb_count, inco_count, redu_count, weights, potential_types =\
{}, {}, {}, {}, {}, {}
Expand Down Expand Up @@ -40863,6 +41026,94 @@ def __calculateStatsOfRdcRestraint(self, file_list_id, lp_data, conflict_id_set,
if len(range_of_vals) > 1:
ent['histogram_of_discrepancy'] = {'range_of_values': range_of_vals, 'number_of_values': transposed, 'annotations': rdc_ann}

if file_type == 'nmr-star' and self.__star_data_type[file_list_id] == 'Entry':
lp_category = self.lp_categories[file_type][content_subtype]
sf = self.__star_data[file_list_id].get_saveframe_by_name(sf_framecode)
lp = next(lp for lp in sf.loops if lp.category == lp_category)

mapping, identity_mapping = [], []
dat1 = dat2 = None

tags1 = ['Comp_ID_1', 'Atom_ID_1', 'Auth_atom_name_1']
if set(tags1) & set(lp.tags) == set(tags1):
dat1 = get_lp_tag(lp, tags1)

tags2 = ['Comp_ID_2', 'Atom_ID_2', 'Auth_atom_name_2']
if set(tags2) & set(lp.tags) == set(tags2):
dat2 = get_lp_tag(lp, tags2)

if dat1 is not None:

for row in dat1:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] != row[2]:
continue
key = (row[0], row[2])
if key not in identity_mapping:
identity_mapping.append(key)

if dat2 is not None:

for row in dat2:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] != row[2]:
continue
key = (row[0], row[2])
if key not in identity_mapping:
identity_mapping.append(key)

if dat1 is not None:

for row in dat1:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] == row[2]:
continue
comp_id = row[0]
atom_id = row[1]
atom_name = row[2]

if not any(m['comp_id'] == comp_id for m in mapping):
mapping.append({'comp_id': comp_id, 'history': []})

history = next(m['history'] for m in mapping if m['comp_id'] == comp_id)

if not any(h for h in history if h['atom_name'] == atom_name):
history.append({'atom_name': atom_name, 'atom_id': [atom_name] if (comp_id, atom_name) in identity_mapping else []})

h = next(h for h in history if h['atom_name'] == atom_name)
if atom_id not in h['atom_id']:
h['atom_id'].append(atom_id)

if dat2 is not None:

for row in dat2:
if row[0] in emptyValue or row[1] in emptyValue or row[2] in emptyValue or row[1] == row[2]:
continue
comp_id = row[0]
atom_id = row[1]
atom_name = row[2]

if not any(m['comp_id'] == comp_id for m in mapping):
mapping.append({'comp_id': comp_id, 'history': []})

history = next(m['history'] for m in mapping if m['comp_id'] == comp_id)

if not any(h for h in history if h['atom_name'] == atom_name):
history.append({'atom_name': atom_name, 'atom_id': [atom_name] if (comp_id, atom_name) in identity_mapping else []})

h = next(h for h in history if h['atom_name'] == atom_name)
if atom_id not in h['atom_id']:
h['atom_id'].append(atom_id)

if len(mapping) == 0:
mapping = None

else:
for m in mapping:
for h in m['history']:
h['atom_id'] = sorted(h['atom_id'])
m['history'] = sorted(m['history'], key=itemgetter('atom_name'))
mapping = sorted(mapping, key=lambda x: (len(x['comp_id']), x['comp_id']))

ent['atom_name_mapping'] = mapping

except Exception as e:

self.report.error.appendDescription('internal_error', "+NmrDpUtility.__calculateStatsOfRdcRestraint() ++ Error - " + str(e))
Expand Down
Loading

0 comments on commit b5bb10e

Please sign in to comment.