diff --git a/README.md b/README.md
index fd5191c..3cbee7a 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,10 @@
 Please refer to https://github.com/PacificBiosciences/FALCON_unzip/wiki
+
+Needs:
+
+* blasr
+* samtools
+* nucmer
+* variantCaller.py
+
+Tested with smrttools-4.1.0
diff --git a/falcon_unzip/dedup_h_tigs.py b/falcon_unzip/dedup_h_tigs.py
index 61bd8c3..9674958 100644
--- a/falcon_unzip/dedup_h_tigs.py
+++ b/falcon_unzip/dedup_h_tigs.py
@@ -20,7 +20,7 @@ def main(args):
                 row = row.strip().split()
                 q_cov = float(row[10])
                 idt = float(row[6])
-                if q_cov > 99 and idt > 99:
+                if q_cov > 99 and idt > 99.9:
                     filter_out.add(row[-1])
 
     p_ctg_to_phase = {}
diff --git a/falcon_unzip/phasing.py b/falcon_unzip/phasing.py
index 6bd3151..30aefd5 100644
--- a/falcon_unzip/phasing.py
+++ b/falcon_unzip/phasing.py
@@ -17,11 +17,14 @@ def make_het_call(self):
     ctg_id = self.parameters["ctg_id"]
     ref_seq = self.parameters["ref_seq"]
     base_dir = self.parameters["base_dir"]
+    samtools = self.parameters["samtools"]
     vmap_fn = fn(self.vmap_file)
     vpos_fn = fn(self.vpos_file)
     q_id_map_fn = fn(self.q_id_map_file)
 
-    p = subprocess.Popen(shlex.split("samtools view %s %s" % (bam_fn, ctg_id) ), stdout=subprocess.PIPE)
+
+    # maybe we should check if the samtools path is valid
+    p = subprocess.Popen(shlex.split("%s view %s %s" % (samtools, bam_fn, ctg_id) ), stdout=subprocess.PIPE)
     pileup = {}
     q_id_map = {}
     q_max_id = 0
@@ -75,13 +78,12 @@ def make_het_call(self):
             adv = int(m.group(1))
             if m.group(2) == "S":
                 qp += adv
-            if m.group(2) == "M":
+            if m.group(2) in ("M", "=", "X"):
                 matches = []
                 for i in range(adv):
                     matches.append( (rp, SEQ[qp]) )
                     rp += 1
                     qp += 1
-                matches = matches[1:-1]
                 for pos, b in matches:
                     pileup.setdefault(pos, {})
                     pileup[pos].setdefault(b, [])
@@ -482,6 +484,7 @@ def phasing(args):
     fasta_fn = args.fasta
     ctg_id = args.ctg_id
     base_dir = args.base_dir
+    samtools = args.samtools
 
     ref_seq = ""
     for r in FastaReader(fasta_fn):
@@ -502,6 +505,7 @@ def phasing(args):
     parameters["ctg_id"] = ctg_id
     parameters["ref_seq"] = ref_seq
     parameters["base_dir"] = base_dir
+    parameters["samtools"] = samtools
 
     make_het_call_task = PypeTask( inputs = { "bam_file": bam_file },
                                    outputs = { "vmap_file": vmap_file, "vpos_file": vpos_file, "q_id_map_file": q_id_map_file },
@@ -559,6 +563,7 @@ def parse_args(argv):
     parser.add_argument('--fasta', type=str, help='path to the fasta file of contain the contig', required=True)
     parser.add_argument('--ctg_id', type=str, help='contig identifier in the bam file', required=True)
     parser.add_argument('--base_dir', type=str, default="./", help='the output base_dir, default to current working directory')
+    parser.add_argument('--samtools', type=str, default="samtools", help='path to samtools')
 
     args = parser.parse_args(argv[1:])
 
diff --git a/falcon_unzip/rr_hctg_track.py b/falcon_unzip/rr_hctg_track.py
index 0028a21..18e7f26 100644
--- a/falcon_unzip/rr_hctg_track.py
+++ b/falcon_unzip/rr_hctg_track.py
@@ -175,7 +175,7 @@ def parse_args(argv):
     parser = argparse.ArgumentParser(description='scan the raw read overlap information to identify the best hit from the reads \
 to the contigs with read_to_contig_map generated by `fc_get_read_hctg_map`. Write rawread_ids.',
                                      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('--n-core', type=int, default=4,
+    parser.add_argument('--n-core', type=int, default=48,
             help='number of processes used for for tracking reads; '
             '0 for main process only')
     #parser.add_argument('--fofn', type=str, help='file contains the path of all LAS file to be processed in parallel')
diff --git a/falcon_unzip/run_quiver.py b/falcon_unzip/run_quiver.py
index 642f461..881fb51 100644
--- a/falcon_unzip/run_quiver.py
+++ b/falcon_unzip/run_quiver.py
@@ -62,7 +62,7 @@ def task_track_reads(self):
 
 def task_run_quiver(self):
     ref_fasta = fn(self.ref_fasta)
-    read_sam = fn(self.read_sam)
+    read_bam = fn(self.read_bam)
 
     cns_fasta = fn(self.cns_fasta)
     cns_fastq = fn(self.cns_fastq)
@@ -86,13 +86,11 @@ def task_run_quiver(self):
 date
 
 {samtools} faidx {ref_fasta}
-{samtools} view -b -S {read_sam} > {ctg_id}.bam
 {pbalign} --tmpDir=/localdisk/scratch/ --nproc=24 --minAccuracy=0.75 --minLength=50\
     --minAnchorSize=12 --maxDivergence=30 --concordant --algorithm=blasr\
-    --algorithmOptions=-useQuality --maxHits=1 --hitPolicy=random --seed=1\
-    {ctg_id}.bam {ref_fasta} aln-{ctg_id}.bam
-#{makePbi} --referenceFasta {ref_fasta} aln-{ctg_id}.bam
-({variantCaller} -x 5 -X 120 -q 20 -j 24 -r {ref_fasta} aln-{ctg_id}.bam\
+    --algorithmOptions=--useQuality --maxHits=1 --hitPolicy=random --seed=1\
+    {read_bam} {ref_fasta} aln-{ctg_id}.bam
+({variantCaller} --algorithm=arrow -x 5 -X 120 -q 20 -j 24 -r {ref_fasta} aln-{ctg_id}.bam\
     -o {cns_fasta} -o {cns_fastq}) || echo quvier failed
 date
 touch {job_done}
@@ -125,11 +123,14 @@ def task_cns_zcat(self):
             cns_fasta_fn, cns_fastq_fn = line.split()
             system('zcat {cns_fasta_fn} >> {cns_p_ctg_fasta_fn}'.format(**locals()))
             system('zcat {cns_fastq_fn} >> {cns_p_ctg_fastq_fn}'.format(**locals()))
-    with open(gathered_p_ctg_fn) as ifs:
-        for line in ifs:
-            cns_fasta_fn, cns_fastq_fn = line.split()
-            rm(cns_fasta_fn)
-            rm(cns_fasta_fn)
+
+    # comment out this for now for recovering purpose
+    #with open(gathered_p_ctg_fn) as ifs:
+    #    for line in ifs:
+    #        cns_fasta_fn, cns_fastq_fn = line.split()
+    #        rm(cns_fasta_fn)
+    #        rm(cns_fasta_fn)
+
     rm(cns_h_ctg_fasta_fn)
     touch(cns_h_ctg_fasta_fn)
     rm(cns_h_ctg_fastq_fn)
@@ -139,11 +140,13 @@ def task_cns_zcat(self):
             cns_fasta_fn, cns_fastq_fn = line.split()
             system('zcat {cns_fasta_fn} >> {cns_h_ctg_fasta_fn}'.format(**locals()))
             system('zcat {cns_fastq_fn} >> {cns_h_ctg_fastq_fn}'.format(**locals()))
-    with open(gathered_h_ctg_fn) as ifs:
-        for line in ifs:
-            cns_fasta_fn, cns_fastq_fn = line.split()
-            rm(cns_fasta_fn)
-            rm(cns_fasta_fn)
+
+    # comment out this for now for recovering purpose
+    #with open(gathered_h_ctg_fn) as ifs:
+    #    for line in ifs:
+    #        cns_fasta_fn, cns_fastq_fn = line.split()
+    #        rm(cns_fasta_fn)
+    #        rm(cns_fasta_fn)
 
     touch(job_done_fn)
 
@@ -152,7 +155,7 @@ def task_scatter_quiver(self):
     h_ctg_fn = fn(self.h_ctg_fa)
     out_json = fn(self.scattered_quiver_json)
     track_reads_h_done_fn = fn(self.track_reads_h_done)
-    sam_dir = os.path.dirname(track_reads_h_done_fn)
+    bam_dir = os.path.dirname(track_reads_h_done_fn)
     config = self.parameters['config']
 
     ref_seq_data = {}
@@ -183,12 +186,12 @@ def task_scatter_quiver(self):
         m_ctg_id = ctg_id.split('-')[0]
         wd = os.path.join(os.getcwd(), m_ctg_id)
         ref_fasta = os.path.join(wd, '{ctg_id}_ref.fa'.format(ctg_id = ctg_id))
-        read_sam = os.path.join(sam_dir, '{ctg_id}.sam'.format(ctg_id = ctg_id))
+        read_bam = os.path.join(bam_dir, '{ctg_id}.bam'.format(ctg_id = ctg_id))
         #cns_fasta = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fasta.gz'.format(ctg_id = ctg_id)))
         #cns_fastq = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fastq.gz'.format(ctg_id = ctg_id)))
         #job_done = makePypeLocalFile(os.path.join(wd, '{ctg_id}_quiver_done'.format(ctg_id = ctg_id)))
 
-        if os.path.exists(read_sam):
+        if os.path.exists(read_bam):
             # *.sam are created in task_track_reads, fc_select_reads_from_bam.py
             # Network latency should not matter because we have already waited for the 'done' file.
             mkdir(wd)
@@ -204,7 +207,7 @@ def task_scatter_quiver(self):
             new_job['smrt_bin'] = config['smrt_bin']
             new_job['sge_option'] = config['sge_quiver']
             new_job['ref_fasta'] = ref_fasta
-            new_job['read_sam'] = read_sam
+            new_job['read_bam'] = read_bam
             jobs.append(new_job)
 
     open(out_json, 'w').write(json.dumps(jobs))
@@ -221,16 +224,16 @@ def create_quiver_jobs(wf, scattered_quiver_plf):
         smrt_bin = job['smrt_bin']
         sge_option = job['sge_option']
         ref_fasta = makePypeLocalFile(job['ref_fasta'])
-        read_sam = makePypeLocalFile(job['read_sam'])
+        read_bam = makePypeLocalFile(job['read_bam'])
         m_ctg_id = ctg_id.split('-')[0]
         wd = os.path.join(os.getcwd(), './4-quiver/', m_ctg_id)
 
         #ref_fasta = makePypeLocalFile(os.path.join(wd, '{ctg_id}_ref.fa'.format(ctg_id = ctg_id)))
-        #read_sam = makePypeLocalFile(os.path.join(os.getcwd(), './4-quiver/reads/' '{ctg_id}.sam'.format(ctg_id = ctg_id)))
+        #read_bam = makePypeLocalFile(os.path.join(os.getcwd(), './4-quiver/reads/' '{ctg_id}.sam'.format(ctg_id = ctg_id)))
         cns_fasta = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fasta.gz'.format(ctg_id = ctg_id)))
         cns_fastq = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fastq.gz'.format(ctg_id = ctg_id)))
         job_done = makePypeLocalFile(os.path.join(wd, '{ctg_id}_quiver_done'.format(ctg_id = ctg_id)))
-        if os.path.exists(fn(read_sam)): # TODO(CD): Ask Jason what we should do if missing SAM.
+        if os.path.exists(fn(read_bam)): # TODO(CD): Ask Jason what we should do if missing SAM.
             if ctg_types[ctg_id] == 'p':
                 p_ctg_out.append( (fn(cns_fasta), fn(cns_fastq)) )
             elif ctg_types[ctg_id] == 'h':
@@ -243,7 +246,7 @@ def create_quiver_jobs(wf, scattered_quiver_plf):
                 'smrt_bin': smrt_bin,
                 'sge_option': sge_option,
             }
-            make_quiver_task = PypeTask(inputs = {'ref_fasta': ref_fasta, 'read_sam': read_sam,
+            make_quiver_task = PypeTask(inputs = {'ref_fasta': ref_fasta, 'read_bam': read_bam,
                                                   'scattered_quiver': scattered_quiver_plf,
                                                  },
                                         outputs = {'cns_fasta': cns_fasta, 'cns_fastq': cns_fastq, 'job_done': job_done},
diff --git a/falcon_unzip/select_reads_from_bam.py b/falcon_unzip/select_reads_from_bam.py
index cd6dcdc..66a80c2 100644
--- a/falcon_unzip/select_reads_from_bam.py
+++ b/falcon_unzip/select_reads_from_bam.py
@@ -47,8 +47,10 @@ def abs_fn(maybe_rel_fn):
         else:
             header['RG'].extend(samfile.header['RG'])
         samfile.close()
-
-    PG = header.pop('PG') #remove PG line as there might be a bug that generates no readable chrs
+    try:
+        PG = header.pop('PG') #remove PG line as there might be a bug that generates no readable chrs
+    except KeyError:
+        pass
     #print PG
 
     #base_dir = os.getcwd()
@@ -80,9 +82,9 @@ def abs_fn(maybe_rel_fn):
                 #print "Not selected:", ctg
                 continue
             if ctg not in outfile:
-                samfile_fn = os.path.join(sam_dir, '%s.sam' % ctg)
+                samfile_fn = os.path.join(sam_dir, '%s.bam' % ctg)
                 print >>sys.stderr, 'samfile_fn:{!r}'.format(samfile_fn)
-                outfile[ctg] = pysam.AlignmentFile(samfile_fn, 'wh', header=header)
+                outfile[ctg] = pysam.AlignmentFile(samfile_fn, 'wb', header=header)
             outfile[ctg].write(r)
         samfile.close()
 
diff --git a/falcon_unzip/unzip.py b/falcon_unzip/unzip.py
index fa75288..1cdd2d6 100644
--- a/falcon_unzip/unzip.py
+++ b/falcon_unzip/unzip.py
@@ -83,12 +83,13 @@ def task_run_blasr(self):
 hostname
 date
 cd {wd}
-time {blasr} {read_fasta} {ref_fasta} -noSplitSubreads -clipping subread\
-    -hitPolicy randombest -randomSeed 42 -bestn 1 -minPctIdentity 70.0\
-    -minMatch 12 -nproc 24 -sam -out tmp_aln.sam
-{samtools} view -bS tmp_aln.sam | {samtools} sort - {ctg_id}_sorted
+time {blasr} {read_fasta} {ref_fasta} --noSplitSubreads --clipping subread\
+    --hitPolicy randombest --randomSeed 42 --bestn 1 --minPctIdentity 70.0\
+    --minMatch 12 --nproc 24 --bam --out tmp_aln.bam
+#{samtools} view -bS tmp_aln.sam | {samtools} sort - {ctg_id}_sorted
+{samtools} sort tmp_aln.bam -o {ctg_id}_sorted.bam
 {samtools} index {ctg_id}_sorted.bam
-rm tmp_aln.sam
+rm tmp_aln.bam
 date
 touch {job_done}
 """.format(**locals())
@@ -108,7 +109,9 @@ def task_phasing(self):
 
     wd = self.parameters['wd']
     ctg_id = self.parameters['ctg_id']
-    #config = self.parameters['config']
+    config = self.parameters['config']
+    smrt_bin = config['smrt_bin']
+    samtools = os.path.join(smrt_bin, 'samtools')
 
     script_fn = os.path.join(wd , 'p_%s.sh' % (ctg_id))
 
@@ -119,7 +122,7 @@ def task_phasing(self):
 hostname
 date
 cd {wd}
-fc_phasing.py --bam {aln_bam} --fasta {ref_fasta} --ctg_id {ctg_id} --base_dir ..
+fc_phasing.py --bam {aln_bam} --fasta {ref_fasta} --ctg_id {ctg_id} --base_dir .. --samtools {samtools}
 fc_phasing_readmap.py --ctg_id {ctg_id} --read_map_dir ../../../2-asm-falcon/read_maps --phased_reads phased_reads
 date
 touch {job_done}
@@ -147,7 +150,7 @@ def task_hasm(self):
 date
 cd {wd}
 
-fc_ovlp_filter_with_phase.py --fofn {las_fofn} --max_diff 120 --max_cov 120 --min_cov 1 --n_core 12 --min_len 2500 --db ../../1-preads_ovl/preads.db --rid_phase_map {rid_to_phase_all} > preads.p_ovl
+fc_ovlp_filter_with_phase.py --fofn {las_fofn} --max_diff 120 --max_cov 120 --min_cov 1 --n_core 48 --min_len 2500 --db ../../1-preads_ovl/preads.db --rid_phase_map {rid_to_phase_all} > preads.p_ovl
 fc_phased_ovlp_to_graph.py preads.p_ovl --min_len 2500 > fc.log
 
 if [ -e ../../1-preads_ovl/preads4falcon.fasta ]; then
@@ -186,9 +189,10 @@ def task_hasm(self):
     self.generated_script_fn = script_fn
 
 def unzip_all(config):
-    unzip_concurrent_jobs = config['unzip_concurrent_jobs']
+    unzip_blasr_concurrent_jobs = config['unzip_blasr_concurrent_jobs']
+    unzip_phasing_concurrent_jobs = config['unzip_phasing_concurrent_jobs']
     wf = PypeProcWatcherWorkflow(
-            max_jobs=unzip_concurrent_jobs,
+            max_jobs=unzip_blasr_concurrent_jobs,
             job_type=config['job_type'],
             job_queue=config.get('job_queue'),
             sge_option=config.get('sge_option'),
@@ -246,6 +250,19 @@ def unzip_all(config):
         blasr_task = make_blasr_task(task_run_blasr)
         aln1_outs[ctg_id] = (ctg_aln_out, job_done)
         wf.addTask(blasr_task)
+    wf.refreshTargets()
+
+    wf.max_jobs = unzip_phasing_concurrent_jobs
+    for ctg_id in ctg_ids:
+        # inputs
+        ref_fasta = makePypeLocalFile('./3-unzip/reads/{ctg_id}_ref.fa'.format(ctg_id = ctg_id))
+        read_fasta = makePypeLocalFile('./3-unzip/reads/{ctg_id}_reads.fa'.format(ctg_id = ctg_id))
+
+        # outputs
+        wd = os.path.join(os.getcwd(), './3-unzip/0-phasing/{ctg_id}/'.format(ctg_id = ctg_id))
+
+        blasr_dir = os.path.join(wd, 'blasr')
+        ctg_aln_out = makePypeLocalFile(os.path.join(blasr_dir, '{ctg_id}_sorted.bam'.format(ctg_id = ctg_id)))
 
         phasing_dir = os.path.join(wd, 'phasing')
         job_done = makePypeLocalFile(os.path.join(phasing_dir, 'p_{ctg_id}_done'.format(ctg_id = ctg_id)))
@@ -261,7 +278,6 @@ def unzip_all(config):
                 )
         phasing_task = make_phasing_task(task_phasing)
         wf.addTask(phasing_task)
-    wf.refreshTargets()
 
     hasm_wd = os.path.abspath('./3-unzip/1-hasm/')
 
@@ -343,9 +359,13 @@ def main(argv=sys.argv):
     if config.has_option('Unzip', 'sge_track_reads'):
         sge_track_reads = config.get('Unzip', 'sge_track_reads')
 
-    unzip_concurrent_jobs = 8
-    if config.has_option('Unzip', 'unzip_concurrent_jobs'):
-        unzip_concurrent_jobs = config.getint('Unzip', 'unzip_concurrent_jobs')
+    unzip_blasr_concurrent_jobs = 8
+    if config.has_option('Unzip', 'unzip_blasr_concurrent_jobs'):
+        unzip_blasr_concurrent_jobs = config.getint('Unzip', 'unzip_blasr_concurrent_jobs')
+
+    unzip_phasing_concurrent_jobs = 8
+    if config.has_option('Unzip', 'unzip_phasing_concurrent_jobs'):
+        unzip_phasing_concurrent_jobs = config.getint('Unzip', 'unzip_phasing_concurrent_jobs')
 
     config = {'job_type': job_type,
               'job_queue': job_queue,
@@ -354,7 +374,8 @@ def main(argv=sys.argv):
               'sge_phasing': sge_phasing,
               'sge_hasm': sge_hasm,
               'sge_track_reads': sge_track_reads,
-              'unzip_concurrent_jobs': unzip_concurrent_jobs,
+              'unzip_blasr_concurrent_jobs': unzip_blasr_concurrent_jobs,
+              'unzip_phasing_concurrent_jobs': unzip_phasing_concurrent_jobs,
               'pwatcher_type': pwatcher_type,
               }
 
diff --git a/setup.py b/setup.py
index b54466c..bde842a 100755
--- a/setup.py
+++ b/setup.py
@@ -11,10 +11,12 @@ scripts = glob.glob("src/py_scripts/*.py")
 
 
 setup(name='falcon_unzip',
-      version='0.1.0',
+      version='0.4.0',
       description='Falcon unzip',
       author='Jason Chin',
       author_email='jchin@pacificbiosciences.com',
+      maintainer='Christopher Dunn',
+      maintainer_email='pb.cdunn@gmail.com',
      packages=['falcon_unzip'],
       package_dir={'falcon_unzip':'falcon_unzip/'},
       scripts = scripts,
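
Note on configuration: this change replaces the single `unzip_concurrent_jobs` option with `unzip_blasr_concurrent_jobs` and `unzip_phasing_concurrent_jobs`, both read from the `[Unzip]` section by `main()` above, so existing unzip config files (typically `fc_unzip.cfg`) need updating. The sketch below is illustrative only: `sge_track_reads` and the two concurrency options are the keys visible in this diff, while the other keys are assumed to be read from the same section, and all paths, SGE options, and job counts are placeholder values for a smrttools-4.1.0 install.

[Unzip]
# placeholder path to the smrttools-4.1.0 binaries (blasr, samtools, pbalign, variantCaller)
smrt_bin = /opt/smrttools-4.1.0/smrtcmds/bin
# per-stage SGE options (placeholders; adjust to your cluster)
sge_phasing = -pe smp 12
sge_quiver = -pe smp 24
sge_hasm = -pe smp 48
sge_track_reads = -pe smp 12
# replaces the old unzip_concurrent_jobs: separate limits for the blasr and phasing stages
unzip_blasr_concurrent_jobs = 32
unzip_phasing_concurrent_jobs = 32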