assembly_stats_global.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Purpose
-------
Get the basic statistics from a raw assembly.
Outputs the statistics for all the contigs in the assembly, and for the contigs longer than 1000 bp, into a CSV and a JSON file.
Assembly metrics collected:
    * Assembler - assembler name
    * Contigs - number of contigs in the assembly
    * basepairs - total number of nucleotides in the assembly
    * Max contig size - size of the largest contig in the assembly
    * n50 - sequence length of the shortest contig at 50% of the total assembly length
    * contigs>1000bp (%) - number of contigs longer than 1000 bp (and % over "Contigs")
    * bp in contigs>1000bp (%) - total number of nucleotides in contigs longer than 1000 bp (and % over "basepairs")
    * n50 in contigs>1000bp - sequence length of the shortest contig at 50% of the total length of contigs longer than 1000 bp
Expected input
--------------
The following variables are expected whether using Nextflow or the
:py:func:`main` executor.
- ``sample_id``: Sample Identification string.
    - e.g.: ``'SampleA'``
- ``assembler``: String with assembler name.
    - e.g.: ``'SPAdes'``
- ``assembly``: fasta file from the assembler.
    - e.g.: ``'spades.fasta'``
- ``read_mapping``: JSON file with the read mapping statistics for the assembly.
    - e.g.: ``'read_mapping.json'``
- ``min_len``: value for the minimum contig length.
    - e.g.: ``'1000'``
- ``n_target``: target fraction for the Nx metric (0.5 for N50).
    - e.g.: ``0.5``
Generated output
----------------
- ``<sample_id>_<assembler>_report.json``: Data structure for the report
  (see the illustrative shape sketched after this docstring).
- ``<sample_id>_<assembler>_global_assembly_stats_global.csv``: CSV file with the global assembly metrics.
Authorship
----------
Inês Mendes, [email protected]
https://github.com/cimendes
"""
import os
import json
import re
try:
    import utils
except ImportError:
    from templates import utils
__version__ = "0.0.1"
__build__ = "22.10.2020"
__template__ = "ASSEMBLY_STATS_GLOBAL-nf"
logger = utils.get_logger(__file__)
if __file__.endswith(".command.sh"):
    SAMPLE_ID = '$sample_id'
    ASSEMBLER = '$assembler'
    ASSEMBLY = '$assembly'
    READ_MAPPING_STATS = '$read_mapping'
    MIN_LEN = "$params.minLength"
    N_TARGET = float("$params.n_target")
    logger.debug("Running {} with parameters:".format(
        os.path.basename(__file__)))
    logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
    logger.debug("ASSEMBLER: {}".format(ASSEMBLER))
    logger.debug("ASSEMBLY: {}".format(ASSEMBLY))
    logger.debug("READ_MAPPING_STATS: {}".format(READ_MAPPING_STATS))
    logger.debug("MIN_LEN: {}".format(MIN_LEN))
    logger.debug("N_TARGET: {}".format(N_TARGET))
def get_contig_lists(fasta, min_len):
    """
    From a fasta iterator, get lists of contig lengths and N counts.

    :param fasta: iterator yielding tuples of (header, sequence)
    :param min_len: minimum contig length
    :return:
        - contigs_len: list with all contig lengths in the assembly
        - contigs_len_over_1000: list with contig lengths for contigs longer than min_len
        - Ns_all: total number of N bases in all contigs
        - Ns_over_1000: total number of N bases in contigs longer than min_len
    """
    contigs_len_over_1000 = []
    contigs_len = []
    Ns_all = 0
    Ns_over_1000 = 0

    for header, seq in fasta:
        Ns_all += len(re.findall("N", seq.upper()))
        if len(seq) > int(min_len):
            contigs_len_over_1000.append(len(seq))
            Ns_over_1000 += len(re.findall("N", seq.upper()))
        contigs_len.append(len(seq))

    return contigs_len, contigs_len_over_1000, Ns_all, Ns_over_1000
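

# The Nx values reported by main() are computed with utils.get_Nx. As a point
# of reference for the N50 definition in the module docstring, the sketch
# below shows a minimal, assumed-equivalent Nx calculation; it is an
# illustration only and is not used by this template.
def _nx_sketch(contig_lengths, n_target=0.5):
    """Length of the shortest contig such that contigs of that length or
    longer cover at least a fraction n_target of the total assembly length."""
    if not contig_lengths:
        return 0
    total_length = sum(contig_lengths)
    cumulative = 0
    # Walk contigs from longest to shortest, accumulating length until the
    # target fraction of the assembly is covered.
    for length in sorted(contig_lengths, reverse=True):
        cumulative += length
        if cumulative >= total_length * n_target:
            return length
    return 0
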
def main(sample_id, assembler, assembly, read_mapping_stats, min_len, n_target):
    """
    Compute global and filtered assembly metrics for a sample and write the
    report JSON and the global assembly stats CSV.
    """
    contigs, contigs_over_min_len, Ns_all, Ns_over_1000 = get_contig_lists(utils.fasta_iter(assembly), min_len)

    n50_contigs = utils.get_Nx(contigs, n_target)
    n50_contigs_over_min_len = utils.get_Nx(contigs_over_min_len, n_target)

    # get read mapping stats
    with open(read_mapping_stats) as f:
        assembly_stats_json = json.load(f)
        if assembly_stats_json[sample_id]["assembler"] == assembler:
            mapped_reads_all = assembly_stats_json[sample_id]["mapped_reads_original"]
            mapped_reads_filtered = assembly_stats_json[sample_id]["mapped_reads_filtered"]
        else:
            mapped_reads_all = 0
            mapped_reads_filtered = 0
            logger.error(assembly_stats_json)

    with open("{}_{}_report.json".format(sample_id, assembler), "w") as json_report:
        json_dic = {
            "assembler": assembler,
            "sample_id": sample_id,
            "global": {
                "contigs": len(contigs),
                "basepairs": sum(contigs),
                "max_contig_size": max(contigs) if len(contigs) > 0 else 0,
                "N{}".format(int(n_target * 100)): n50_contigs,
                "mapped_reads": mapped_reads_all,
                "Ns": Ns_all},
            "filtered": {
                "contigs": len(contigs_over_min_len),
                "basepairs": sum(contigs_over_min_len),
                "N{}".format(int(n_target * 100)): n50_contigs_over_min_len,
                "mapped_reads": mapped_reads_filtered,
                "Ns": Ns_over_1000}
        }
        json_report.write(json.dumps(json_dic, separators=(",", ":")))

    with open(sample_id + '_' + assembler + "_global_assembly_stats_global.csv", "w") as csv_file:
        csv_file.write(','.join([assembler, f'{len(contigs)}', f'{sum(contigs)}',
                                 f'{max(contigs) if len(contigs) > 0 else 0}', f'{n50_contigs}',
                                 f'{len(contigs_over_min_len)}', f'{sum(contigs_over_min_len)}',
                                 f'{n50_contigs_over_min_len}']))
if __name__ == '__main__':
    main(SAMPLE_ID, ASSEMBLER, ASSEMBLY, READ_MAPPING_STATS, MIN_LEN, N_TARGET)
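
# Standalone usage outside Nextflow (illustrative; the file names below are
# assumptions, and the read-mapping JSON must follow the structure read in
# main(), i.e. {sample_id: {"assembler": ..., "mapped_reads_original": ...,
# "mapped_reads_filtered": ...}}):
#
#   main("SampleA", "SPAdes", "spades.fasta", "read_mapping.json",
#        min_len="1000", n_target=0.5)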