-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathVCFtoFASTA_onebyone.py
133 lines (112 loc) · 3.67 KB
/
VCFtoFASTA_onebyone.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""A script to create a fasta file of one sample from a vcf"""
import sys
import os
import random
import gzip
# Input file - should be in vcf format, can me .gz or uncompress
bos_file = sys.argv[1]
# If you run with only an input file, will print sample names and exit
# Second argument should be the sample name you want a fasta for
if len(sys.argv) > 2:
sample_name = sys.argv[2]
else:
sample_name = None
## Useful for running a quick trial
## Stops after cutoff number of sites
cutoff = 500000
cutoff_on = False
# Skip mitochondrial information
## TODO should be filtered before instead of hard coded here
mito_tag = "NC_006853.1"
sep = "/"
def vcf_call_to_bases(ref, alt, call):
assert(len(ref)==1), "ref is {}, why not caught as INDEL?????".format(ref)
geno = call.replace('0', ref).replace('1',alt).replace('.','?')
#todo: handle ./.?
return geno
def random_base(geno):
nucleotides=["A","T","C","G","?"]
index = random.randint(1,2)
assert(geno[0] in nucleotides), "geno 0 is {}".format(geno[0])
assert(geno[2] in nucleotides), "geno 0 is {}".format(geno[2])
if index == 1:
return geno[0]
elif index == 2:
return geno[2]
else:
print("ERRROORRRR")
index_dict = {} # tracks the names of the samples
sequence_dict = {'reference':''} #Stores the data (memory HOG)
print("About to open file")
if 'gz' in bos_file:
infi = gzip.open(bos_file, 'r')
else:
infi = open(bos_file, 'r')
#infi = gzip.open(bos_file,"r")
print("Finished open file")
for line in infi:
if line.startswith('##'):
pass
elif line.startswith('#'):
headers = line.split()
print(headers)
for i, val in enumerate(headers):
index_dict[val]=i
break
infi.close()
def get_sequence_for_sample(sample_name, bos_file):
sample_index = index_dict[sample_name]
sample_sequence = ''
if 'gz' in bos_file:
infi = gzip.open(bos_file, 'r')
else:
infi = open(bos_file, 'r')
counter = 0
outfile = open("{}.fasta".format(sample_name),'w')
outfile.write("> {}\n".format(sample_name))
pos = 0
for line in infi:
counter += 1
if(counter >= cutoff and cutoff_on):
break
if line.startswith('#'):
pass
else:
# if ".%s." % (sep) in line:
# print("skipping line {} due to missing data".format(counter))
# print(line)
# continue
if mito_tag in line:
continue
elif "INDEL" in line:
# print("skipping line {} due to indel".format(counter))
continue
vcfrow = line.split()
assert(len(vcfrow)>4), vcfrow
ref = vcfrow[3]
alt = vcfrow[4]
if len(alt) > 1: #Skips multi allele, or indels
continue
pos += 1
if sample_name == 'REF':
base_call = ref
else:
# if(nucleotides.count(vcfrow[3])!=1 or nucleotides.count(vcfrow[4])!=1):
# continue
call = vcfrow[sample_index].split(":")[0]
assert(len(ref)==1), ref
assert(len(alt)==1), alt
assert(len(call)== 3), call
geno_call = vcf_call_to_bases(ref, alt, call)
assert(len(geno_call)==3), geno_call
base_call = random_base(geno_call)
outfile.write(base_call)
if(pos % 80 == 0):
outfile.write('\n')
if(counter % 10000==0):
print(counter)
outfile.write('\n')
outfile.close()
infi.close()
if sample_name:
get_sequence_for_sample(sample_name, bos_file)