This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Merge pull request #1 from PacificBiosciences/bam
Update for smrttools-4.1.0
Christopher Dunn authored Dec 21, 2016
2 parents 83dfd69 + f6cc4ce commit 7ebc99c
Showing 8 changed files with 91 additions and 49 deletions.
9 changes: 9 additions & 0 deletions README.md
@@ -1 +1,10 @@
Please refer to https://github.com/PacificBiosciences/FALCON_unzip/wiki

Needs:

* blasr
* samtools
* nucmer
* variantCaller.py

Tested with smrttools-4.1.0
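
The four tools above all ship with the smrttools-4.1.0 bundle. As a quick pre-flight check (not part of this commit), a small Python 3 sketch can confirm that each executable is reachable on PATH before the pipeline is started; the tool names come from the README, everything else is illustrative:

```python
import shutil
import sys

# Tool names as listed in the README above; `variantCaller.py` is the
# spelling used there, newer smrttools bundles may expose it as `variantCaller`.
REQUIRED_TOOLS = ["blasr", "samtools", "nucmer", "variantCaller.py"]

def missing_tools(tools):
    """Return the subset of `tools` not found on PATH (needs Python 3.3+)."""
    return [t for t in tools if shutil.which(t) is None]

if __name__ == "__main__":
    missing = missing_tools(REQUIRED_TOOLS)
    if missing:
        sys.exit("missing executables: " + ", ".join(missing))
    print("all FALCON_unzip dependencies found on PATH")
```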
2 changes: 1 addition & 1 deletion falcon_unzip/dedup_h_tigs.py
@@ -20,7 +20,7 @@ def main(args):
row = row.strip().split()
q_cov = float(row[10])
idt = float(row[6])
if q_cov > 99 and idt > 99:
if q_cov > 99 and idt > 99.9:
filter_out.add(row[-1])

p_ctg_to_phase = {}
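The one-line change above tightens the duplicate-haplotig filter: a haplotig is now discarded only when it is more than 99% covered by and more than 99.9% identical to its primary contig, rather than 99% on both axes. A standalone sketch of the same filter, assuming the column layout implied by the hunk (0-based field 6 holds the identity, field 10 the query coverage, and the last field the haplotig name):

```python
def find_duplicate_htigs(coords_rows, min_q_cov=99.0, min_idt=99.9):
    """Collect haplotig names that look like near-exact copies of their
    primary contig.  `coords_rows` is an iterable of whitespace-separated
    alignment rows (e.g. from nucmer/show-coords), using the same column
    positions as dedup_h_tigs.py."""
    filter_out = set()
    for row in coords_rows:
        fields = row.strip().split()
        q_cov = float(fields[10])   # query coverage (%)
        idt = float(fields[6])      # alignment identity (%)
        if q_cov > min_q_cov and idt > min_idt:
            filter_out.add(fields[-1])  # haplotig identifier
    return filter_out
```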
11 changes: 8 additions & 3 deletions falcon_unzip/phasing.py
@@ -17,11 +17,14 @@ def make_het_call(self):
ctg_id = self.parameters["ctg_id"]
ref_seq = self.parameters["ref_seq"]
base_dir = self.parameters["base_dir"]
samtools = self.parameters["samtools"]
vmap_fn = fn(self.vmap_file)
vpos_fn = fn(self.vpos_file)
q_id_map_fn = fn(self.q_id_map_file)

p = subprocess.Popen(shlex.split("samtools view %s %s" % (bam_fn, ctg_id) ), stdout=subprocess.PIPE)

# maybe we should check if the samtools path is valid
p = subprocess.Popen(shlex.split("%s view %s %s" % (samtools, bam_fn, ctg_id) ), stdout=subprocess.PIPE)
pileup = {}
q_id_map = {}
q_max_id = 0
@@ -75,13 +78,12 @@ def make_het_call(self):
adv = int(m.group(1))
if m.group(2) == "S":
qp += adv
if m.group(2) == "M":
if m.group(2) in ("M", "=", "X"):
matches = []
for i in range(adv):
matches.append( (rp, SEQ[qp]) )
rp += 1
qp += 1
matches = matches[1:-1]
for pos, b in matches:
pileup.setdefault(pos, {})
pileup[pos].setdefault(b, [])
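Two behavioural changes are visible in this hunk: the pileup walk now treats '=' (sequence match) and 'X' (sequence mismatch) CIGAR operations the same as the generic 'M', because BAMs produced by newer blasr/pbalign builds can report matches explicitly, and the first and last base of each aligned block are no longer trimmed away. A minimal sketch of the same CIGAR bookkeeping outside the workflow; the regex is an assumption, since the hunk only shows how the match groups are consumed:

```python
import re

CIGAR_OP = re.compile(r"(\d+)([MIDNSHP=X])")

def aligned_pairs(cigar, seq, ref_start):
    """Yield (reference_position, query_base) for every aligned base.

    'M', '=' and 'X' all consume both reference and query; 'S' and 'I'
    consume only the query; 'D' and 'N' consume only the reference.
    """
    rp, qp = ref_start, 0
    for length, op in CIGAR_OP.findall(cigar):
        length = int(length)
        if op in ("S", "I"):
            qp += length
        elif op in ("D", "N"):
            rp += length
        elif op in ("M", "=", "X"):
            for _ in range(length):
                yield rp, seq[qp]
                rp += 1
                qp += 1
        # 'H' and 'P' consume neither sequence

# e.g. list(aligned_pairs("2S3=1X2=", "ttACGTAGC", 100)) pairs reference
# positions 100..105 with the bases "ACGTAG".
```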
@@ -482,6 +484,7 @@ def phasing(args):
fasta_fn = args.fasta
ctg_id = args.ctg_id
base_dir = args.base_dir
samtools = args.samtools

ref_seq = ""
for r in FastaReader(fasta_fn):
@@ -502,6 +505,7 @@
parameters["ctg_id"] = ctg_id
parameters["ref_seq"] = ref_seq
parameters["base_dir"] = base_dir
parameters["samtools"] = samtools

make_het_call_task = PypeTask( inputs = { "bam_file": bam_file },
outputs = { "vmap_file": vmap_file, "vpos_file": vpos_file, "q_id_map_file": q_id_map_file },
@@ -559,6 +563,7 @@ def parse_args(argv):
parser.add_argument('--fasta', type=str, help='path to the fasta file containing the contig', required=True)
parser.add_argument('--ctg_id', type=str, help='contig identifier in the bam file', required=True)
parser.add_argument('--base_dir', type=str, default="./", help='the output base_dir, default to current working directory')
parser.add_argument('--samtools', type=str, default="samtools", help='path to samtools')


args = parser.parse_args(argv[1:])
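Taken together, the phasing.py changes let the caller pin a specific samtools binary (for example the one under smrt_bin) instead of relying on PATH: the new --samtools argument defaults to 'samtools', is stored in the parameters dict, and is substituted into the samtools view command. A condensed sketch of that flow with the PypeTask machinery stripped away; the smrttools path in the usage comment is only an example:

```python
import argparse
import shlex
import subprocess

def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('--bam', required=True)
    parser.add_argument('--ctg_id', required=True)
    # New in this commit: let the caller point at a specific samtools.
    parser.add_argument('--samtools', default='samtools',
                        help='path to samtools')
    return parser.parse_args(argv)

def stream_contig_alignments(samtools, bam_fn, ctg_id):
    """Yield SAM text lines for one contig via `samtools view`."""
    cmd = "%s view %s %s" % (samtools, bam_fn, ctg_id)
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE,
                         universal_newlines=True)
    for line in p.stdout:
        yield line
    p.wait()

# args = parse_args(['--bam', 'aln.bam', '--ctg_id', '000000F',
#                    '--samtools', '/opt/smrttools/smrtcmds/bin/samtools'])
# for sam_line in stream_contig_alignments(args.samtools, args.bam, args.ctg_id):
#     ...
```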
2 changes: 1 addition & 1 deletion falcon_unzip/rr_hctg_track.py
@@ -175,7 +175,7 @@ def parse_args(argv):
parser = argparse.ArgumentParser(description='scan the raw read overlap information to identify the best hit from the reads \
to the contigs with read_to_contig_map generated by `fc_get_read_hctg_map`. Write rawread_ids.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--n-core', type=int, default=4,
parser.add_argument('--n-core', type=int, default=48,
help='number of processes used for tracking reads; '
'0 for main process only')
#parser.add_argument('--fofn', type=str, help='file contains the path of all LAS file to be processed in parallel')
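rr_hctg_track.py simply raises the default worker count from 4 to 48, which matches the large compute nodes the pipeline usually runs on but oversubscribes smaller machines. A hedged alternative (not what this commit does) is to cap the default at the local core count:

```python
import argparse
import multiprocessing

def parse_args(argv):
    parser = argparse.ArgumentParser(
        description='track raw reads to contigs',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # The commit hard-codes 48; capping at the local CPU count is a more
    # portable default for workstations (an alternative, not the shipped
    # behaviour).
    parser.add_argument('--n-core', type=int,
                        default=min(48, multiprocessing.cpu_count()),
                        help='number of processes used for tracking reads; '
                             '0 for main process only')
    return parser.parse_args(argv[1:])
```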
51 changes: 27 additions & 24 deletions falcon_unzip/run_quiver.py
@@ -62,7 +62,7 @@ def task_track_reads(self):

def task_run_quiver(self):
ref_fasta = fn(self.ref_fasta)
read_sam = fn(self.read_sam)
read_bam = fn(self.read_bam)

cns_fasta = fn(self.cns_fasta)
cns_fastq = fn(self.cns_fastq)
@@ -86,13 +86,11 @@
date
{samtools} faidx {ref_fasta}
{samtools} view -b -S {read_sam} > {ctg_id}.bam
{pbalign} --tmpDir=/localdisk/scratch/ --nproc=24 --minAccuracy=0.75 --minLength=50\
--minAnchorSize=12 --maxDivergence=30 --concordant --algorithm=blasr\
--algorithmOptions=-useQuality --maxHits=1 --hitPolicy=random --seed=1\
{ctg_id}.bam {ref_fasta} aln-{ctg_id}.bam
#{makePbi} --referenceFasta {ref_fasta} aln-{ctg_id}.bam
({variantCaller} -x 5 -X 120 -q 20 -j 24 -r {ref_fasta} aln-{ctg_id}.bam\
--algorithmOptions=--useQuality --maxHits=1 --hitPolicy=random --seed=1\
{read_bam} {ref_fasta} aln-{ctg_id}.bam
({variantCaller} --algorithm=arrow -x 5 -X 120 -q 20 -j 24 -r {ref_fasta} aln-{ctg_id}.bam\
-o {cns_fasta} -o {cns_fastq}) || echo quiver failed
date
touch {job_done}
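task_run_quiver no longer converts a per-contig SAM itself: it consumes the BAM written upstream, hands it straight to pbalign, and calls variantCaller with an explicit --algorithm=arrow, which the smrttools-4.1.0 caller expects. A condensed sketch of how the per-contig command block is assembled; the tool names and scratch path are illustrative defaults, and the option strings are copied from the hunk above:

```python
def quiver_script(ctg_id, ref_fasta, read_bam, cns_fasta, cns_fastq,
                  samtools='samtools', pbalign='pbalign',
                  variantCaller='variantCaller', nproc=24,
                  tmp_dir='/localdisk/scratch/'):
    """Render the per-contig consensus commands used by task_run_quiver.

    Condensed sketch: the real task also changes into the work directory,
    logs timestamps and writes a job_done marker.
    """
    return """\
set -vex
{samtools} faidx {ref_fasta}
{pbalign} --tmpDir={tmp_dir} --nproc={nproc} --minAccuracy=0.75 --minLength=50\\
 --minAnchorSize=12 --maxDivergence=30 --concordant --algorithm=blasr\\
 --algorithmOptions=--useQuality --maxHits=1 --hitPolicy=random --seed=1\\
 {read_bam} {ref_fasta} aln-{ctg_id}.bam
({variantCaller} --algorithm=arrow -x 5 -X 120 -q 20 -j {nproc} -r {ref_fasta} aln-{ctg_id}.bam\\
 -o {cns_fasta} -o {cns_fastq}) || echo consensus failed
""".format(**locals())

# print(quiver_script('000000F_001', '000000F_001_ref.fa', '000000F_001.bam',
#                     'cns-000000F_001.fasta.gz', 'cns-000000F_001.fastq.gz'))
```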
@@ -125,11 +123,14 @@ def task_cns_zcat(self):
cns_fasta_fn, cns_fastq_fn = line.split()
system('zcat {cns_fasta_fn} >> {cns_p_ctg_fasta_fn}'.format(**locals()))
system('zcat {cns_fastq_fn} >> {cns_p_ctg_fastq_fn}'.format(**locals()))
with open(gathered_p_ctg_fn) as ifs:
for line in ifs:
cns_fasta_fn, cns_fastq_fn = line.split()
rm(cns_fasta_fn)
rm(cns_fasta_fn)

# comment out this for now for recovering purpose
#with open(gathered_p_ctg_fn) as ifs:
# for line in ifs:
# cns_fasta_fn, cns_fastq_fn = line.split()
# rm(cns_fasta_fn)
# rm(cns_fasta_fn)

rm(cns_h_ctg_fasta_fn)
touch(cns_h_ctg_fasta_fn)
rm(cns_h_ctg_fastq_fn)
@@ -139,11 +140,13 @@
cns_fasta_fn, cns_fastq_fn = line.split()
system('zcat {cns_fasta_fn} >> {cns_h_ctg_fasta_fn}'.format(**locals()))
system('zcat {cns_fastq_fn} >> {cns_h_ctg_fastq_fn}'.format(**locals()))
with open(gathered_h_ctg_fn) as ifs:
for line in ifs:
cns_fasta_fn, cns_fastq_fn = line.split()
rm(cns_fasta_fn)
rm(cns_fasta_fn)

# comment out this for now for recovering purpose
#with open(gathered_h_ctg_fn) as ifs:
# for line in ifs:
# cns_fasta_fn, cns_fastq_fn = line.split()
# rm(cns_fasta_fn)
# rm(cns_fasta_fn)

touch(job_done_fn)
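The gather step still concatenates the per-contig gzipped consensus files into the combined primary and haplotig outputs, but the follow-up rm calls are commented out so the intermediates survive for reruns and recovery. A pure-Python sketch of the same non-destructive gather, replacing the shelled-out 'zcat ... >>' pattern; the file names are placeholders:

```python
import gzip
import shutil

def gather_consensus(gathered_fn, out_fasta_fn, out_fastq_fn):
    """Append every per-contig cns FASTA/FASTQ (gzipped) listed in
    `gathered_fn` to the combined outputs, without deleting the inputs."""
    with open(out_fasta_fn, 'wb') as out_fa, open(out_fastq_fn, 'wb') as out_fq:
        with open(gathered_fn) as listing:
            for line in listing:
                cns_fasta_fn, cns_fastq_fn = line.split()
                with gzip.open(cns_fasta_fn, 'rb') as src:
                    shutil.copyfileobj(src, out_fa)   # zcat cns.fasta.gz >> out
                with gzip.open(cns_fastq_fn, 'rb') as src:
                    shutil.copyfileobj(src, out_fq)   # zcat cns.fastq.gz >> out

# gather_consensus('gathered_p_ctg.txt', 'cns_p_ctg.fasta', 'cns_p_ctg.fastq')
```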

@@ -152,7 +155,7 @@ def task_scatter_quiver(self):
h_ctg_fn = fn(self.h_ctg_fa)
out_json = fn(self.scattered_quiver_json)
track_reads_h_done_fn = fn(self.track_reads_h_done)
sam_dir = os.path.dirname(track_reads_h_done_fn)
bam_dir = os.path.dirname(track_reads_h_done_fn)
config = self.parameters['config']

ref_seq_data = {}
@@ -183,12 +186,12 @@
m_ctg_id = ctg_id.split('-')[0]
wd = os.path.join(os.getcwd(), m_ctg_id)
ref_fasta = os.path.join(wd, '{ctg_id}_ref.fa'.format(ctg_id = ctg_id))
read_sam = os.path.join(sam_dir, '{ctg_id}.sam'.format(ctg_id = ctg_id))
read_bam = os.path.join(bam_dir, '{ctg_id}.bam'.format(ctg_id = ctg_id))
#cns_fasta = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fasta.gz'.format(ctg_id = ctg_id)))
#cns_fastq = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fastq.gz'.format(ctg_id = ctg_id)))
#job_done = makePypeLocalFile(os.path.join(wd, '{ctg_id}_quiver_done'.format(ctg_id = ctg_id)))

if os.path.exists(read_sam):
if os.path.exists(read_bam):
# *.sam are created in task_track_reads, fc_select_reads_from_bam.py
# Network latency should not matter because we have already waited for the 'done' file.
mkdir(wd)
@@ -204,7 +207,7 @@
new_job['smrt_bin'] = config['smrt_bin']
new_job['sge_option'] = config['sge_quiver']
new_job['ref_fasta'] = ref_fasta
new_job['read_sam'] = read_sam
new_job['read_bam'] = read_bam
jobs.append(new_job)
open(out_json, 'w').write(json.dumps(jobs))

@@ -221,16 +224,16 @@ def create_quiver_jobs(wf, scattered_quiver_plf):
smrt_bin = job['smrt_bin']
sge_option = job['sge_option']
ref_fasta = makePypeLocalFile(job['ref_fasta'])
read_sam = makePypeLocalFile(job['read_sam'])
read_bam = makePypeLocalFile(job['read_bam'])
m_ctg_id = ctg_id.split('-')[0]
wd = os.path.join(os.getcwd(), './4-quiver/', m_ctg_id)
#ref_fasta = makePypeLocalFile(os.path.join(wd, '{ctg_id}_ref.fa'.format(ctg_id = ctg_id)))
#read_sam = makePypeLocalFile(os.path.join(os.getcwd(), './4-quiver/reads/' '{ctg_id}.sam'.format(ctg_id = ctg_id)))
#read_bam = makePypeLocalFile(os.path.join(os.getcwd(), './4-quiver/reads/' '{ctg_id}.sam'.format(ctg_id = ctg_id)))
cns_fasta = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fasta.gz'.format(ctg_id = ctg_id)))
cns_fastq = makePypeLocalFile(os.path.join(wd, 'cns-{ctg_id}.fastq.gz'.format(ctg_id = ctg_id)))
job_done = makePypeLocalFile(os.path.join(wd, '{ctg_id}_quiver_done'.format(ctg_id = ctg_id)))

if os.path.exists(fn(read_sam)): # TODO(CD): Ask Jason what we should do if missing SAM.
if os.path.exists(fn(read_bam)): # TODO(CD): Ask Jason what we should do if missing SAM.
if ctg_types[ctg_id] == 'p':
p_ctg_out.append( (fn(cns_fasta), fn(cns_fastq)) )
elif ctg_types[ctg_id] == 'h':
@@ -243,7 +246,7 @@ def create_quiver_jobs(wf, scattered_quiver_plf):
'smrt_bin': smrt_bin,
'sge_option': sge_option,
}
make_quiver_task = PypeTask(inputs = {'ref_fasta': ref_fasta, 'read_sam': read_sam,
make_quiver_task = PypeTask(inputs = {'ref_fasta': ref_fasta, 'read_bam': read_bam,
'scattered_quiver': scattered_quiver_plf,
},
outputs = {'cns_fasta': cns_fasta, 'cns_fastq': cns_fastq, 'job_done': job_done},
10 changes: 6 additions & 4 deletions falcon_unzip/select_reads_from_bam.py
@@ -47,8 +47,10 @@ def abs_fn(maybe_rel_fn):
else:
header['RG'].extend(samfile.header['RG'])
samfile.close()

PG = header.pop('PG') #remove PG line as there might be a bug that generates no readable chrs
try:
PG = header.pop('PG') #remove PG line as there might be a bug that generates no readable chrs
except KeyError:
pass
#print PG

#base_dir = os.getcwd()
@@ -80,9 +82,9 @@ def abs_fn(maybe_rel_fn):
#print "Not selected:", ctg
continue
if ctg not in outfile:
samfile_fn = os.path.join(sam_dir, '%s.sam' % ctg)
samfile_fn = os.path.join(sam_dir, '%s.bam' % ctg)
print >>sys.stderr, 'samfile_fn:{!r}'.format(samfile_fn)
outfile[ctg] = pysam.AlignmentFile(samfile_fn, 'wh', header=header)
outfile[ctg] = pysam.AlignmentFile(samfile_fn, 'wb', header=header)
outfile[ctg].write(r)
samfile.close()

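select_reads_from_bam.py now tolerates merged headers that carry no PG section and writes compressed per-contig BAMs ('wb') instead of text SAM ('wh'), which is what the reworked quiver stage reads. A small pysam sketch of that output path; input and output names are placeholders:

```python
import os
import pysam

def write_reads_per_contig(in_bam_fn, wanted_ctgs, out_dir='.'):
    """Split the reads of selected contigs into one BAM file per contig."""
    samfile = pysam.AlignmentFile(in_bam_fn, 'rb')
    header = samfile.header.to_dict()  # older pysam returned a plain dict
    # Merged headers sometimes lack a PG section, so guard the removal
    # (this mirrors the try/except KeyError added above).
    header.pop('PG', None)

    outfile = {}
    for r in samfile.fetch(until_eof=True):
        if r.is_unmapped:
            continue
        ctg = r.reference_name
        if ctg not in wanted_ctgs:
            continue
        if ctg not in outfile:
            out_fn = os.path.join(out_dir, '%s.bam' % ctg)
            # 'wb' writes compressed BAM; the old code used 'wh' (text SAM).
            outfile[ctg] = pysam.AlignmentFile(out_fn, 'wb', header=header)
        outfile[ctg].write(r)
    for f in outfile.values():
        f.close()
    samfile.close()
```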
51 changes: 36 additions & 15 deletions falcon_unzip/unzip.py
@@ -83,12 +83,13 @@ def task_run_blasr(self):
hostname
date
cd {wd}
time {blasr} {read_fasta} {ref_fasta} -noSplitSubreads -clipping subread\
-hitPolicy randombest -randomSeed 42 -bestn 1 -minPctIdentity 70.0\
-minMatch 12 -nproc 24 -sam -out tmp_aln.sam
{samtools} view -bS tmp_aln.sam | {samtools} sort - {ctg_id}_sorted
time {blasr} {read_fasta} {ref_fasta} --noSplitSubreads --clipping subread\
--hitPolicy randombest --randomSeed 42 --bestn 1 --minPctIdentity 70.0\
--minMatch 12 --nproc 24 --bam --out tmp_aln.bam
#{samtools} view -bS tmp_aln.sam | {samtools} sort - {ctg_id}_sorted
{samtools} sort tmp_aln.bam -o {ctg_id}_sorted.bam
{samtools} index {ctg_id}_sorted.bam
rm tmp_aln.sam
rm tmp_aln.bam
date
touch {job_done}
""".format(**locals())
@@ -108,7 +109,9 @@ def task_phasing(self):
wd = self.parameters['wd']
ctg_id = self.parameters['ctg_id']

#config = self.parameters['config']
config = self.parameters['config']
smrt_bin = config['smrt_bin']
samtools = os.path.join(smrt_bin, 'samtools')

script_fn = os.path.join(wd , 'p_%s.sh' % (ctg_id))

@@ -119,7 +122,7 @@
hostname
date
cd {wd}
fc_phasing.py --bam {aln_bam} --fasta {ref_fasta} --ctg_id {ctg_id} --base_dir ..
fc_phasing.py --bam {aln_bam} --fasta {ref_fasta} --ctg_id {ctg_id} --base_dir .. --samtools {samtools}
fc_phasing_readmap.py --ctg_id {ctg_id} --read_map_dir ../../../2-asm-falcon/read_maps --phased_reads phased_reads
date
touch {job_done}
@@ -147,7 +150,7 @@ def task_hasm(self):
date
cd {wd}
fc_ovlp_filter_with_phase.py --fofn {las_fofn} --max_diff 120 --max_cov 120 --min_cov 1 --n_core 12 --min_len 2500 --db ../../1-preads_ovl/preads.db --rid_phase_map {rid_to_phase_all} > preads.p_ovl
fc_ovlp_filter_with_phase.py --fofn {las_fofn} --max_diff 120 --max_cov 120 --min_cov 1 --n_core 48 --min_len 2500 --db ../../1-preads_ovl/preads.db --rid_phase_map {rid_to_phase_all} > preads.p_ovl
fc_phased_ovlp_to_graph.py preads.p_ovl --min_len 2500 > fc.log
if [ -e ../../1-preads_ovl/preads4falcon.fasta ];
then
@@ -186,9 +189,10 @@ def task_hasm(self):
self.generated_script_fn = script_fn

def unzip_all(config):
unzip_concurrent_jobs = config['unzip_concurrent_jobs']
unzip_blasr_concurrent_jobs = config['unzip_blasr_concurrent_jobs']
unzip_phasing_concurrent_jobs = config['unzip_phasing_concurrent_jobs']
wf = PypeProcWatcherWorkflow(
max_jobs=unzip_concurrent_jobs,
max_jobs=unzip_blasr_concurrent_jobs,
job_type=config['job_type'],
job_queue=config.get('job_queue'),
sge_option=config.get('sge_option'),
@@ -246,6 +250,19 @@ def unzip_all(config):
blasr_task = make_blasr_task(task_run_blasr)
aln1_outs[ctg_id] = (ctg_aln_out, job_done)
wf.addTask(blasr_task)
wf.refreshTargets()

wf.max_jobs = unzip_phasing_concurrent_jobs
for ctg_id in ctg_ids:
# inputs
ref_fasta = makePypeLocalFile('./3-unzip/reads/{ctg_id}_ref.fa'.format(ctg_id = ctg_id))
read_fasta = makePypeLocalFile('./3-unzip/reads/{ctg_id}_reads.fa'.format(ctg_id = ctg_id))

# outputs
wd = os.path.join(os.getcwd(), './3-unzip/0-phasing/{ctg_id}/'.format(ctg_id = ctg_id))

blasr_dir = os.path.join(wd, 'blasr')
ctg_aln_out = makePypeLocalFile(os.path.join(blasr_dir, '{ctg_id}_sorted.bam'.format(ctg_id = ctg_id)))

phasing_dir = os.path.join(wd, 'phasing')
job_done = makePypeLocalFile(os.path.join(phasing_dir, 'p_{ctg_id}_done'.format(ctg_id = ctg_id)))
@@ -261,7 +278,6 @@
)
phasing_task = make_phasing_task(task_phasing)
wf.addTask(phasing_task)

wf.refreshTargets()

hasm_wd = os.path.abspath('./3-unzip/1-hasm/')
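unzip_all now schedules work in two phases: every blasr task is submitted under unzip_blasr_concurrent_jobs, wf.refreshTargets() blocks until the alignments finish, and only then are the phasing tasks created under their own unzip_phasing_concurrent_jobs cap, since the two stages have very different resource profiles. Schematically (the workflow attribute and method names follow the hunk above; the task factories are placeholders):

```python
def unzip_all_schematic(wf, config, ctg_ids, make_blasr_task, make_phasing_task):
    """Two-phase scheduling sketch: align everything, then phase everything.

    `wf` stands in for the PypeProcWatcherWorkflow instance; the make_*_task
    callables stand in for the PypeTask factories built inside unzip_all.
    """
    # Phase 1: blasr alignments, capped by the blasr-specific job limit.
    wf.max_jobs = config['unzip_blasr_concurrent_jobs']
    for ctg_id in ctg_ids:
        wf.addTask(make_blasr_task(ctg_id))
    wf.refreshTargets()   # block until every alignment is done

    # Phase 2: per-contig phasing, under its own (often smaller) cap.
    wf.max_jobs = config['unzip_phasing_concurrent_jobs']
    for ctg_id in ctg_ids:
        wf.addTask(make_phasing_task(ctg_id))
    wf.refreshTargets()
```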
Expand Down Expand Up @@ -343,9 +359,13 @@ def main(argv=sys.argv):
if config.has_option('Unzip', 'sge_track_reads'):
sge_track_reads = config.get('Unzip', 'sge_track_reads')

unzip_concurrent_jobs = 8
if config.has_option('Unzip', 'unzip_concurrent_jobs'):
unzip_concurrent_jobs = config.getint('Unzip', 'unzip_concurrent_jobs')
unzip_blasr_concurrent_jobs = 8
if config.has_option('Unzip', 'unzip_blasr_concurrent_jobs'):
unzip_blasr_concurrent_jobs = config.getint('Unzip', 'unzip_blasr_concurrent_jobs')

unzip_phasing_concurrent_jobs = 8
if config.has_option('Unzip', 'unzip_phasing_concurrent_jobs'):
unzip_phasing_concurrent_jobs = config.getint('Unzip', 'unzip_phasing_concurrent_jobs')

config = {'job_type': job_type,
'job_queue': job_queue,
Expand All @@ -354,7 +374,8 @@ def main(argv=sys.argv):
'sge_phasing': sge_phasing,
'sge_hasm': sge_hasm,
'sge_track_reads': sge_track_reads,
'unzip_concurrent_jobs': unzip_concurrent_jobs,
'unzip_blasr_concurrent_jobs': unzip_blasr_concurrent_jobs,
'unzip_phasing_concurrent_jobs': unzip_phasing_concurrent_jobs,
'pwatcher_type': pwatcher_type,
}

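The single unzip_concurrent_jobs option is split into unzip_blasr_concurrent_jobs and unzip_phasing_concurrent_jobs, each falling back to 8 when absent from the [Unzip] section. A sketch of a config fragment and of how main() reads it; the two option names come from the hunk above, while the other keys and values are only examples:

```python
try:
    import configparser                   # Python 3
except ImportError:
    import ConfigParser as configparser   # Python 2, as used by the pipeline

SAMPLE_CFG = """\
[Unzip]
job_type = sge
smrt_bin = /opt/smrttools/smrtcmds/bin
unzip_blasr_concurrent_jobs = 32
unzip_phasing_concurrent_jobs = 16
"""

def read_concurrency(config):
    """Mirror main(): fall back to 8 whenever an option is missing."""
    blasr_jobs = 8
    if config.has_option('Unzip', 'unzip_blasr_concurrent_jobs'):
        blasr_jobs = config.getint('Unzip', 'unzip_blasr_concurrent_jobs')
    phasing_jobs = 8
    if config.has_option('Unzip', 'unzip_phasing_concurrent_jobs'):
        phasing_jobs = config.getint('Unzip', 'unzip_phasing_concurrent_jobs')
    return blasr_jobs, phasing_jobs

if __name__ == '__main__':
    cfg = configparser.ConfigParser()
    cfg.read_string(SAMPLE_CFG)    # Python 2: use readfp(io.StringIO(...))
    print(read_concurrency(cfg))   # -> (32, 16)
```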
4 changes: 3 additions & 1 deletion setup.py
@@ -11,10 +11,12 @@
scripts = glob.glob("src/py_scripts/*.py")

setup(name='falcon_unzip',
version='0.1.0',
version='0.4.0',
description='Falcon unzip',
author='Jason Chin',
author_email='[email protected]',
maintainer='Christopher Dunn',
maintainer_email='[email protected]',
packages=['falcon_unzip'],
package_dir={'falcon_unzip':'falcon_unzip/'},
scripts = scripts,
