-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclass_orthologs.py
239 lines (206 loc) · 10.6 KB
/
class_orthologs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File Name: class_input.py
# Author: Hannah Nguyen
# Date: 9/22/22
# Description: orthologs class
from Bio import Seq, Entrez, SeqIO, Align
from Bio.SeqRecord import SeqRecord
import time
class theOrtholog:
def __init__(self, the_input, hit):
# content from the hit
self.hit = hit
# content from the input
self.json_file = the_input.json_file
self.ref_proteins = the_input.ref_proteins
self.target_clade = the_input.target_clade
self.blast_param = the_input.blast_param
self.entrez_param = the_input.entrez_param
# result from get_nuc_rec
self.nuc_rec = []
# result from get_nec_seq
self.nuc_seq = []
def get_nuc_rec(self):
print("Getting IPG nucelotide records for " + str(self.hit[1]) + "...")
print("\tFetching the protein records...")
# Fetches protein records for each hit
for i in range(self.entrez_param["retry_number"]):
try:
handle = Entrez.efetch(db="protein", id=self.hit[1], rettype="ipg",
retmode="xml")
records = Entrez.read(handle)
time.sleep(self.entrez_param["sleep_time"])
break
except:
print("\t\tNCBI exception raised on attempt " + str(i+1) + "\n\t\treattempting now...")
time.sleep(self.entrez_param["sleep_time"])
if i == (self.entrez_param["retry_number"] - 1):
print("\t\tCould not download record after " + str(self.entrez_param["retry_number"]) + " attempts")
print("\tNo gene records found for " + str(self.hit[1]))
return None
# The priority scores for the types of gene records available
# NC and ac = ref seq sequences (prefer those bc complete genomes)
# AE and CP = complete not as good
# everything else not as good
p_scores = {"NC_": 7, "AC_": 7,
"AE": 6, "CP": 6, "CY": 6,
"NZ_": 5, "NT_": 5, "NW_": 5,
"AAAA-AZZZ": 4,
"U": 3, "AF": 3, "AY": 3, "DQ": 3}
# Gene records are appended to this list
genome_list = []
print("\tRecording the gene records with priorities...")
if 'ProteinList' in records['IPGReport'].keys():
for idprotein in records['IPGReport']['ProteinList']:
# focuses on coding regions in the record
if 'CDSList' in idprotein.keys():
for cds in idprotein['CDSList']:
cds_acc = cds.attributes['accver']
cds_start = cds.attributes['start']
cds_stop = cds.attributes['stop']
cds_strand = cds.attributes['strand']
cds_scr = 0
# assign priority
for key in p_scores:
if cds_acc.startswith(key):
cds_scr = p_scores[key]
# create and append record
cds_rec = {'acc': cds_acc, 'start': cds_start,
'stop': cds_stop, 'strand': cds_strand,
'p_score': cds_scr}
genome_list.append(cds_rec)
else:
print("\tNo gene records found for " + self.hit[1])
return None
# Finds the genome with the max p-score
if len(genome_list) > 0:
max_p_record = genome_list[0]
for genome in genome_list:
if genome["p_score"] > max_p_record["p_score"]:
max_p_record = genome
print("Record Name: " + max_p_record['acc'])
print("Original Region: " + str(max_p_record['start']) + ", " + str(max_p_record['stop']))
print("\tReturning gene record for " + self.hit[1] + ". p-score: " +
str(max_p_record["p_score"]))
self.nuc_rec = max_p_record
return max_p_record
else:
print("\tNo gene records found for " + self.hit[1])
return None
def get_nuc_seq(self):
# if no record then we cant find seq
if self.nuc_rec is None:
print(self.nuc_rec)
print("\tRecord is not valid")
return None
print("Get nucleotide sequence for " + str(self.nuc_rec['acc']) + "...")
# grabs upstream region some parameter base pairs away from start to check for operons
if self.nuc_rec['strand'] == '+':
s_start = int(self.nuc_rec['start']) - self.blast_param["base_pairs_to_grab"]
if s_start <= 0:
s_start = 1
s_stop = int(self.nuc_rec['start'])
s_strand = 1
else:
s_stop = int(self.nuc_rec['stop']) + self.blast_param["base_pairs_to_grab"]
s_start = int(self.nuc_rec['stop'])
s_strand = 2
print("Region after going upstream: " + str(s_start) + ", " + str(s_stop))
print("\t\tGetting genbank record...")
# Fetch and read the annotated GenBank record
for i in range(self.entrez_param["retry_number"]):
try:
handle = Entrez.efetch(db="nuccore", id=self.nuc_rec['acc'], strand=s_strand,
seq_start=s_start, seq_stop=s_stop,
rettype='gbwithparts', retmode="text")
time.sleep(self.entrez_param["sleep_time"])
genome_record = SeqIO.read(handle, "genbank")
break
except:
print("\t\tNCBI exception raised on attempt " + str(i+1) + "\n\t\treattempting now...")
time.sleep(self.entrez_param["sleep_time"])
if i == (self.entrez_param["retry_number"] - 1):
print("\t\tCould not download record after " + str(self.entrez_param["retry_number"]) + " attempts")
sequence = genome_record.seq
# setting up return id
return_id = "p_" + str(self.nuc_rec['acc'])
#Setting up the description for the FASTA record
return_description = "Original_Query_Protein " + str(self.hit[0]) + " BLAST_Hit_Accession " + str(self.hit[1])
# If there is only one coding region in the selected sequence, then
# the sequence is returned unmodified.
gb_features = genome_record.features
if len(gb_features) == 1:
# Appends information to record description
print("\t\tOnly one coding regions found. Returning full sequence...")
return SeqRecord(Seq.Seq(sequence), id=return_id, description=return_description)
# If no coding intervals are indentified, None is returned.
elif len(gb_features) == 0:
print("\t\t|~> No coding intervals found for record: " + str(self.nuc_rec['acc']) + ".")
return None
genes = []
for i in range(len(gb_features)):
if gb_features[i].type == 'gene':
genes.append(gb_features[i])
upstream_regions = []
# starts at end of list of features since we are going upstream
gene = len(genes) - 1
# LATER make sure to implement a way to add 10,000 more when you dont reach the end
while gene > 0:
print("here")
if len(upstream_regions) == 0:
upstream_regions.append((genes[gene - 1].location.end, genes[gene].location.start))
elif genes[gene].location.strand == genes[gene - 1].location.strand:
if abs(genes[gene].location.start - genes[gene - 1].location.end) <= self.blast_param["min_intergenic_distance"]:
if abs(genes[gene].location.start - genes[gene - 1].location.end) > 0:
upstream_regions.append((genes[gene - 1].location.end, genes[gene].location.start))
else:
break
else:
break
gene -= 1
'''
else:
# starts at beginning of list of features since we are going upstream on reverse strand
gene = 0
# LATER make sure to implement a way to add 10,000 more when you dont reach the end
while gene < len(genes) - 1:
distance = genes[gene].location.start - genes[gene + 1].location.end
if len(upstream_regions) == 0:
upstream_regions.append((genes[gene + 1].location.end, genes[gene].location.start))
elif genes[gene].location.strand == genes[gene + 1].location.strand:
if distance <= self.blast_param["min_intergenic_distance"]:
print("region: ")
print(genes[gene+1].location.end, genes[gene].location.start)
print("distance: " + str(genes[gene].location.start - genes[gene + 1].location.end))
if (genes[gene].location.start - genes[gene + 1].location.end) > 0:
upstream_regions.append((genes[gene + 1].location.end, genes[gene].location.start))
else:
break
else:
break
gene += 1
'''
print("Upstream Regions to Take: ")
print(upstream_regions)
# get sequences of each upstream region
upstream_sequences = []
for i in range(len(upstream_regions)):
upstream_sequence = sequence
if sequence[upstream_regions[i][0]:upstream_regions[i][1]] != "":
upstream_sequence = sequence[upstream_regions[i][0]:upstream_regions[i][1]]
upstream_sequences.append(SeqRecord(Seq.Seq(upstream_sequence), id=return_id, description=return_description))
else:
upstream_sequence = sequence[upstream_regions[i][1]:upstream_regions[i][0]]
upstream_sequences.append(SeqRecord(Seq.Seq(upstream_sequence), id=return_id, description=return_description))
print(upstream_sequence)
self.nuc_seq = upstream_sequence
return upstream_sequence
def percent_similarity(self, ortholog2):
# percent_similarity = (matches / matches + mismatches)
aligner = Align.PairwiseAligner()
alignments = aligner.align(self.nuc_seq, ortholog2.nuc_seq)
alignment = alignments[0]
# counts number of matches and mismatches to show percent similarity
matches = alignment.counts().identities
mismatches = alignment.counts().mismatches
per_similarity = (matches / (matches+mismatches))
return per_similarity