Skip to content

Commit

Permalink
Add XEASY peak reader and parserlistener with predition for spectral …
Browse files Browse the repository at this point in the history
…region
  • Loading branch information
yokochi47 committed Dec 4, 2024
1 parent 330ccdb commit 3eb3572
Show file tree
Hide file tree
Showing 11 changed files with 2,467 additions and 1,038 deletions.
19 changes: 16 additions & 3 deletions wwpdb/utils/nmr/pk/BasePKParserListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class BasePKParserListener():

# spectral metadata
num_of_dim = -1
acq_dim_id = 1
spectral_dim = {}

# whether to allow extended sequence temporary
Expand All @@ -224,6 +225,9 @@ class BasePKParserListener():
numberSelection = []
originalNumberSelection = []

# collection of assignment (XEASY)
assignmentSelection = []

f = None
warningMessage = None

Expand Down Expand Up @@ -375,7 +379,8 @@ def validatePeak2D(self, index: int, pos_1: float, pos_2: float,
lw_1: Optional[float], lw_2: Optional[float],
pos_hz_1: Optional[float], pos_hz_2: Optional[float],
lw_hz_1: Optional[float], lw_hz_2: Optional[float],
height: Optional[str], height_uncertainty: Optional[str], volume: Optional[str]) -> Optional[dict]:
height: Optional[str], height_uncertainty: Optional[str],
volume: Optional[str], volume_uncertainty: Optional[str]) -> Optional[dict]:

dstFunc = {'position_1': str(pos_1), 'position_2': str(pos_2)}

Expand All @@ -385,6 +390,8 @@ def validatePeak2D(self, index: int, pos_1: float, pos_2: float,
dstFunc['volume'] = volume
if height_uncertainty is not None and float(height_uncertainty) != 0.0:
dstFunc['height_uncertainty'] = height_uncertainty
if volume_uncertainty is not None and float(volume_uncertainty) != 0.0:
dstFunc['volume_uncertainty'] = volume_uncertainty

if 'height' not in dstFunc and 'volume' not in dstFunc:
self.f.append(f"[Missing data] {self.__getCurrentRestraint(n=index)}"
Expand Down Expand Up @@ -427,7 +434,8 @@ def validatePeak3D(self, index: int, pos_1: float, pos_2: float, pos_3: float,
lw_1: Optional[float], lw_2: Optional[float], lw_3: Optional[float],
pos_hz_1: Optional[float], pos_hz_2: Optional[float], pos_hz_3: Optional[float],
lw_hz_1: Optional[float], lw_hz_2: Optional[float], lw_hz_3: Optional[float],
height: Optional[str], height_uncertainty: Optional[str], volume: Optional[str]) -> Optional[dict]:
height: Optional[str], height_uncertainty: Optional[str],
volume: Optional[str], volume_uncertainty: Optional[str]) -> Optional[dict]:

dstFunc = {'position_1': str(pos_1), 'position_2': str(pos_2), 'position_3': str(pos_3)}

Expand All @@ -437,6 +445,8 @@ def validatePeak3D(self, index: int, pos_1: float, pos_2: float, pos_3: float,
dstFunc['volume'] = volume
if height_uncertainty is not None and float(height_uncertainty) != 0.0:
dstFunc['height_uncertainty'] = height_uncertainty
if volume_uncertainty is not None and float(volume_uncertainty) != 0.0:
dstFunc['volume_uncertainty'] = volume_uncertainty

if 'height' not in dstFunc and 'volume' not in dstFunc:
self.f.append(f"[Missing data] {self.__getCurrentRestraint(n=index)}"
Expand Down Expand Up @@ -495,7 +505,8 @@ def validatePeak4D(self, index: int, pos_1: float, pos_2: float, pos_3: float, p
lw_1: Optional[float], lw_2: Optional[float], lw_3: Optional[float], lw_4: Optional[float],
pos_hz_1: Optional[float], pos_hz_2: Optional[float], pos_hz_3: Optional[float], pos_hz_4: Optional[float],
lw_hz_1: Optional[float], lw_hz_2: Optional[float], lw_hz_3: Optional[float], lw_hz_4: Optional[float],
height: Optional[str], height_uncertainty: Optional[str], volume: Optional[str]) -> Optional[dict]:
height: Optional[str], height_uncertainty: Optional[str],
volume: Optional[str], volume_uncertainty: Optional[str]) -> Optional[dict]:

dstFunc = {'position_1': str(pos_1), 'position_2': str(pos_2), 'position_3': str(pos_3), 'position_4': str(pos_4)}

Expand All @@ -505,6 +516,8 @@ def validatePeak4D(self, index: int, pos_1: float, pos_2: float, pos_3: float, p
dstFunc['volume'] = volume
if height_uncertainty is not None and float(height_uncertainty) != 0.0:
dstFunc['height_uncertainty'] = height_uncertainty
if volume_uncertainty is not None and float(volume_uncertainty) != 0.0:
dstFunc['volume_uncertainty'] = volume_uncertainty

if 'height' not in dstFunc and 'volume' not in dstFunc:
self.f.append(f"[Missing data] {self.__getCurrentRestraint(n=index)}"
Expand Down
113 changes: 83 additions & 30 deletions wwpdb/utils/nmr/pk/NmrPipePKParserListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import copy
import collections
import numpy as np
import math

from antlr4 import ParseTreeListener
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self, verbose=True, log=sys.stdout,
# Enter a parse tree produced by NmrPipePKParser#nmrpipe_pk.
def enterNmrpipe_pk(self, ctx: NmrPipePKParser.Nmrpipe_pkContext): # pylint: disable=unused-argument
self.num_of_dim = -1
self.acq_dim_id = 1
self.spectral_dim = {}
self.listIdInternal = {}
self.chainNumberDict = {}
Expand Down Expand Up @@ -385,14 +387,47 @@ def exitNmrpipe_pk(self, ctx: NmrPipePKParser.Nmrpipe_pkContext): # pylint: dis
del self.reasonsForReParsing['local_seq_scheme']

if len(self.spectral_dim) > 0:
for v in self.spectral_dim.values():
for _v in v.values():
for d, v in self.spectral_dim.items():
for _id, _v in v.items():
for __v in _v.values():
if __v['spectrometer_frequency'] is None and 'freq_hint' in __v and len(__v['freq_hint']) > 0:
__v['spectrometer_frequency'] = collections.Counter(__v['freq_hint']).most_common()[0][0]
if 'freq_hint' in __v:
center = np.mean(np.array(__v['freq_hint']))

if __v['spectral_region'] is None:
atom_type = __v['atom_type']
if 125 < center < 130 and atom_type == 'C':
__v['spectral_region'] = 'C_aro'
elif 115 < center < 125 and atom_type == 'N':
__v['spectral_region'] = 'N_ami'
elif 170 < center < 180 and atom_type == 'C':
__v['spectral_region'] = 'CO'
elif 6 < center < 9 and atom_type == 'H':
__v['spectral_region'] = 'H_ami_or_aro'
elif 4 < center < 6 and atom_type == 'H':
__v['spectral_region'] = 'H_all'
elif 60 < center < 90 and atom_type == 'C':
__v['spectral_region'] = 'C_all'
elif 30 < center < 50 and atom_type == 'C':
__v['spectral_region'] = 'C_ali'

del __v['freq_hint']

if __v['spectrometer_frequency'] is None and 'obs_freq_hint' in __v and len(__v['obs_freq_hint']) > 0:
__v['spectrometer_frequency'] = collections.Counter(__v['obs_freq_hint']).most_common()[0][0]

if 'obs_freq_hint' in __v:
del __v['obs_freq_hint']

for __v in _v.values():
if __v['spectral_region'] == 'H_ami_or_aro':
has_a = any(___v['spectral_region'] == 'C_aro' for ___v in _v.values())
__v['spectral_region'] = 'H_aro' if has_a else 'H_ami'

if self.debug:
print(f'num_of_dim: {d}, list_id: {_id}')
for __d, __v in _v.items():
print(f'{__d} {__v}')

finally:
self.warningMessage = sorted(list(set(self.f)), key=self.f.index)

Expand Down Expand Up @@ -436,9 +471,12 @@ def set_spectral_dim(_dim_id):

isotope_number = cur_spectral_dim['atom_isotope_number']

cur_spectral_dim['acquisition'] = 'yes' if _dim_id == 1 and (isotope_number == 1
or (isotope_number == 13
and self.exptlMethod == 'SOLID-STATE NMR')) else 'no'
cur_spectral_dim['acquisition'] = 'yes' if _dim_id == self.acq_dim_id\
and (isotope_number == 1 or (isotope_number == 13 and self.exptlMethod == 'SOLID-STATE NMR')) else 'no'

if _dim_id == 1 and cur_spectral_dim['acquisition'] == 'no':
self.acq_dim_id = self.num_of_dim

cur_spectral_dim['under_sampling_type'] = 'not observed' if cur_spectral_dim['acquisition'] == 'yes' else 'aliased'

if ctx.Ppm_value_DA(0) and ctx.Ppm_value_DA(1):
Expand Down Expand Up @@ -506,6 +544,7 @@ def enterPeak_list_2d(self, ctx: NmrPipePKParser.Peak_list_2dContext): # pylint
or _dim_id not in self.cur_spectral_dim
else self.cur_spectral_dim[_dim_id])
self.spectral_dim[self.num_of_dim][self.cur_list_id][_dim_id]['freq_hint'] = []
self.spectral_dim[self.num_of_dim][self.cur_list_id][_dim_id]['obs_freq_hint'] = []
self.peaks2D = 0
self.cur_spectral_dim = {}

Expand Down Expand Up @@ -570,22 +609,25 @@ def exitPeak_2d(self, ctx: NmrPipePKParser.Peak_2dContext):
return

dstFunc = self.validatePeak2D(index, x_ppm, y_ppm, None, None, None, None,
x_hz, y_hz, xw_hz, yw_hz, height, dheight, vol)
x_hz, y_hz, xw_hz, yw_hz, height, dheight, vol, None)

if dstFunc is None:
self.peaks2D -= 1
return

cur_spectral_dim = self.spectral_dim[self.num_of_dim][self.cur_list_id]

cur_spectral_dim[1]['freq_hint'].append(x_ppm)
cur_spectral_dim[2]['freq_hint'].append(y_ppm)

if x_ppm is not None and x_hz is not None and cur_spectral_dim[1]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[1]['value_first_point'], float):
cur_spectral_dim[1]['freq_hint'].append(float(roundString(str(x_hz / (x_ppm - cur_spectral_dim[1]['value_first_point'])),
getMaxEffDigits([str(x_ppm), str(x_hz)]))))
cur_spectral_dim[1]['obs_freq_hint'].append(float(roundString(str(x_hz / (cur_spectral_dim[1]['value_first_point'] - x_ppm)),
getMaxEffDigits([str(x_ppm), str(x_hz)]))))
if y_ppm is not None and y_hz is not None and cur_spectral_dim[2]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[2]['value_first_point'], float):
cur_spectral_dim[2]['freq_hint'].append(float(roundString(str(y_hz / (y_ppm - cur_spectral_dim[2]['value_first_point'])),
getMaxEffDigits([str(y_ppm), str(y_hz)]))))
cur_spectral_dim[2]['obs_freq_hint'].append(float(roundString(str(y_hz / (cur_spectral_dim[2]['value_first_point'] - y_ppm)),
getMaxEffDigits([str(y_ppm), str(y_hz)]))))

has_assignments = False

Expand Down Expand Up @@ -684,6 +726,7 @@ def enterPeak_list_3d(self, ctx: NmrPipePKParser.Peak_list_3dContext): # pylint
or _dim_id not in self.cur_spectral_dim
else self.cur_spectral_dim[_dim_id])
self.spectral_dim[self.num_of_dim][self.cur_list_id][_dim_id]['freq_hint'] = []
self.spectral_dim[self.num_of_dim][self.cur_list_id][_dim_id]['obs_freq_hint'] = []
self.peaks3D = 0
self.cur_spectral_dim = {}

Expand Down Expand Up @@ -751,31 +794,35 @@ def exitPeak_3d(self, ctx: NmrPipePKParser.Peak_3dContext):
if not self.hasPolySeq and not self.hasNonPolySeq:
return

if x_ppm is None or y_ppm is None or type != 1:
if x_ppm is None or y_ppm is None or z_ppm is None or type != 1:
self.peaks3D -= 1
return

dstFunc = self.validatePeak3D(index, x_ppm, y_ppm, z_ppm, None, None, None, None, None, None,
x_hz, y_hz, z_hz, xw_hz, yw_hz, zw_hz, height, dheight, vol)
x_hz, y_hz, z_hz, xw_hz, yw_hz, zw_hz, height, dheight, vol, None)

if dstFunc is None:
self.peaks3D -= 1
return

cur_spectral_dim = self.spectral_dim[self.num_of_dim][self.cur_list_id]

cur_spectral_dim[1]['freq_hint'].append(x_ppm)
cur_spectral_dim[2]['freq_hint'].append(y_ppm)
cur_spectral_dim[3]['freq_hint'].append(z_ppm)

if x_ppm is not None and x_hz is not None and cur_spectral_dim[1]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[1]['value_first_point'], float):
cur_spectral_dim[1]['freq_hint'].append(float(roundString(str(x_hz / (x_ppm - cur_spectral_dim[1]['value_first_point'])),
getMaxEffDigits([str(x_ppm), str(x_hz)]))))
cur_spectral_dim[1]['obs_freq_hint'].append(float(roundString(str(x_hz / (cur_spectral_dim[1]['value_first_point'] - x_ppm)),
getMaxEffDigits([str(x_ppm), str(x_hz)]))))
if y_ppm is not None and y_hz is not None and cur_spectral_dim[2]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[2]['value_first_point'], float):
cur_spectral_dim[2]['freq_hint'].append(float(roundString(str(y_hz / (y_ppm - cur_spectral_dim[2]['value_first_point'])),
getMaxEffDigits([str(y_ppm), str(y_hz)]))))
cur_spectral_dim[2]['obs_freq_hint'].append(float(roundString(str(y_hz / (cur_spectral_dim[2]['value_first_point'] - y_ppm)),
getMaxEffDigits([str(y_ppm), str(y_hz)]))))
if z_ppm is not None and z_hz is not None and cur_spectral_dim[3]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[3]['value_first_point'], float):
cur_spectral_dim[3]['freq_hint'].append(float(roundString(str(z_hz / (z_ppm - cur_spectral_dim[3]['value_first_point'])),
getMaxEffDigits([str(z_ppm), str(z_hz)]))))
cur_spectral_dim[3]['obs_freq_hint'].append(float(roundString(str(z_hz / (cur_spectral_dim[3]['value_first_point'] - z_ppm)),
getMaxEffDigits([str(z_ppm), str(z_hz)]))))

has_assignments = False

Expand Down Expand Up @@ -886,6 +933,7 @@ def enterPeak_list_4d(self, ctx: NmrPipePKParser.Peak_list_4dContext): # pylint
or _dim_id not in self.cur_spectral_dim
else self.cur_spectral_dim[_dim_id])
self.spectral_dim[self.num_of_dim][self.cur_list_id][_dim_id]['freq_hint'] = []
self.spectral_dim[self.num_of_dim][self.cur_list_id][_dim_id]['obs_freq_hint'] = []
self.peaks4D = 0
self.cur_spectral_dim = {}

Expand Down Expand Up @@ -961,35 +1009,40 @@ def exitPeak_4d(self, ctx: NmrPipePKParser.Peak_4dContext):
if not self.hasPolySeq and not self.hasNonPolySeq:
return

if x_ppm is None or y_ppm is None or type != 1:
if x_ppm is None or y_ppm is None or z_ppm is None or a_ppm is None or type != 1:
self.peaks4D -= 1
return

dstFunc = self.validatePeak4D(index, x_ppm, y_ppm, z_ppm, a_ppm, None, None, None, None, None, None, None, None,
x_hz, y_hz, z_hz, a_hz, xw_hz, yw_hz, zw_hz, aw_hz, height, dheight, vol)
x_hz, y_hz, z_hz, a_hz, xw_hz, yw_hz, zw_hz, aw_hz, height, dheight, vol, None)

if dstFunc is None:
self.peaks4D -= 1
return

cur_spectral_dim = self.spectral_dim[self.num_of_dim][self.cur_list_id]

cur_spectral_dim[1]['freq_hint'].append(x_ppm)
cur_spectral_dim[2]['freq_hint'].append(y_ppm)
cur_spectral_dim[3]['freq_hint'].append(z_ppm)
cur_spectral_dim[4]['freq_hint'].append(a_ppm)

if x_ppm is not None and x_hz is not None and cur_spectral_dim[1]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[1]['value_first_point'], float):
cur_spectral_dim[1]['freq_hint'].append(float(roundString(str(x_hz / (x_ppm - cur_spectral_dim[1]['value_first_point'])),
getMaxEffDigits([str(x_ppm), str(x_hz)]))))
cur_spectral_dim[1]['obs_freq_hint'].append(float(roundString(str(x_hz / (cur_spectral_dim[1]['value_first_point'] - x_ppm)),
getMaxEffDigits([str(x_ppm), str(x_hz)]))))
if y_ppm is not None and y_hz is not None and cur_spectral_dim[2]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[2]['value_first_point'], float):
cur_spectral_dim[2]['freq_hint'].append(float(roundString(str(y_hz / (y_ppm - cur_spectral_dim[2]['value_first_point'])),
getMaxEffDigits([str(y_ppm), str(y_hz)]))))
cur_spectral_dim[2]['obs_freq_hint'].append(float(roundString(str(y_hz / (cur_spectral_dim[2]['value_first_point'] - y_ppm)),
getMaxEffDigits([str(y_ppm), str(y_hz)]))))
if z_ppm is not None and z_hz is not None and cur_spectral_dim[3]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[3]['value_first_point'], float):
cur_spectral_dim[3]['freq_hint'].append(float(roundString(str(z_hz / (z_ppm - cur_spectral_dim[3]['value_first_point'])),
getMaxEffDigits([str(z_ppm), str(z_hz)]))))
cur_spectral_dim[3]['obs_freq_hint'].append(float(roundString(str(z_hz / (cur_spectral_dim[3]['value_first_point'] - z_ppm)),
getMaxEffDigits([str(z_ppm), str(z_hz)]))))
if a_ppm is not None and a_hz is not None and cur_spectral_dim[4]['spectrometer_frequency'] is None\
and isinstance(cur_spectral_dim[4]['value_first_point'], float):
cur_spectral_dim[4]['freq_hint'].append(float(roundString(str(a_hz / (a_ppm - cur_spectral_dim[4]['value_first_point'])),
getMaxEffDigits([str(a_ppm), str(a_hz)]))))
cur_spectral_dim[4]['obs_freq_hint'].append(float(roundString(str(a_hz / (cur_spectral_dim[4]['value_first_point'] - a_ppm)),
getMaxEffDigits([str(a_ppm), str(a_hz)]))))

has_assignments = False

Expand Down
Loading

0 comments on commit 3eb3572

Please sign in to comment.