Skip to content

Commit

Permalink
separated download and extract processes, per #86
Browse files Browse the repository at this point in the history
  • Loading branch information
mykle hoban committed Aug 17, 2024
1 parent 4a10647 commit bcad553
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 66 deletions.
5 changes: 3 additions & 2 deletions bin/collapse_taxonomy.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ opt = parse_args(
usage="%prog [options] <blast_result> <lineage_dump> <output_table>"
),
convert_hyphens_to_underscores = TRUE,
positional_arguments = 3
positional_arguments = 4
# args = debug_args
)

Expand All @@ -87,7 +87,8 @@ if (any(!fe)) {
# store filenames
blast_file <- opt$args[1]
lineage_dump <- opt$args[2]
output_table <- opt$args[3]
nodes_dump <- opt$args[3]
output_table <- opt$args[4]
qcov_thresh <- opt$options$qcov
pid_thresh <- opt$options$pid
diff_thresh <- opt$options$diff
Expand Down
22 changes: 17 additions & 5 deletions modules/modules.nf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ process lca {
publishDir "${params.outDir}/taxonomy/lca/qcov${params.lcaQcov}_pid${params.lcaPid}_diff${params.lcaDiff}", mode: params.publishMode

input:
tuple path(blast_result), path(lineage), path(merged), val(curated)
tuple path(blast_result), path('*')

output:
tuple path("lca_intermediate*.tsv"), path("lca_taxonomy*.tsv"), emit: result
Expand All @@ -14,7 +14,6 @@ process lca {

script:
pf = params.keepUncultured ? "-u" : ""
c = curated ? "_lulu" : ""
"""
# save settings
echo "Minimum query coverage %: ${params.lcaQcov}" > lca_settings.txt
Expand All @@ -26,11 +25,11 @@ process lca {
--qcov ${params.lcaQcov} \
--pid ${params.lcaPid} \
--diff ${params.lcaDiff} \
--merged ${merged} \
--merged merged.dmp \
--dropped ${params.dropped} \
${pf} \
--intermediate "lca_intermediate${c}.tsv" \
${blast_result} ${lineage} "lca_taxonomy${c}.tsv"
--intermediate "lca_intermediate.tsv" \
${blast_result} rankedlineage.dmp nodes.dmp "lca_taxonomy.tsv"
"""
}

Expand Down Expand Up @@ -77,3 +76,16 @@ process multiqc {
multiqc .
"""
}

// get a file from a URL
process get_web {
input:
tuple val(location), path(localfile)
output:
path(localfile)

exec:
classifier = task.workDir / localfile
url = new URL(location)
url.withInputStream { stream -> classifier << stream }
}
3 changes: 1 addition & 2 deletions nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ params.bindDir = ""
params.singularityCache = ""

/* taxonomy assignment / collapse options */
params.lineage = ""
params.merged = "++--++___++---++--+"
params.taxdump = ""
params.dropped = "dropped"
params.assignTaxonomy = false
params.collapseTaxonomy = false
Expand Down
135 changes: 78 additions & 57 deletions rainbow_bridge.nf
Original file line number Diff line number Diff line change
Expand Up @@ -575,20 +575,6 @@ process lulu {
"""
}

// retrieve one of the pre-trained insect models from https://github.com/shaunpwilkinson/insect#classifying-sequences
// because of previous sanity checks, we assume the value passed in `model` is a real one
process get_model {
input:
val(model)
output:
path('insect_model.rds')

exec:
m = model.toLowerCase()
classifier = task.workDir / 'insect_model.rds'
url = new URL(helper.insect_classifiers[m])
url.withInputStream { stream -> classifier << stream }
}

// run insect classifier model
process insect {
Expand Down Expand Up @@ -637,38 +623,50 @@ process insect {
"""
}

// retrieve the NCBI blast taxonomy database
// this is used in blast queries to assign taxids to names
process get_taxdb {
// extract the NCBI blast taxonomy database
process extract_taxdb {
label 'python3'

input:
tuple path(taxdb), val(to_extract)

output:
tuple path('taxdb.btd'), path('taxdb.bti')
path(to_extract)

script:
"""
curl -LO https://ftp.ncbi.nlm.nih.gov/blast/db/taxdb.tar.gz
tar --wildcards -zxvf taxdb.tar.gz 'taxdb.bt*'
tar -zxvf taxdb.tar.gz ${to_extract}
"""
}

// retrieve the NCBI ranked taxonomic lineage dump
// also get the info about merged taxids
process get_lineage {
// extract arbitrary files from a zip archive
process extract_taxonomy {
label 'python3'

input:
tuple path(zipfile), val(f)
output:
path(f)

script:
"""
unzip -p ${zipfile} ${f} > ${f}
"""
}

// dummy process to generate published file
process save_taxdump {
publishDir 'output/taxonomy/ncbi_taxdump'

output:
tuple path('ranked_lineage.tsv'), path('merged.dmp'), emit: dumps
path('new_taxdump.zip')
input:
path(taxdump)

output:
path(taxdump)

script:
"""
curl -LO https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_taxdump/new_taxdump.zip
unzip -p new_taxdump.zip rankedlineage.dmp > ranked_lineage.tsv
unzip -p new_taxdump.zip merged.dmp > merged.dmp
# rm new_taxdump.zip
echo "linking taxdump to publish dir"
"""
}

Expand Down Expand Up @@ -798,8 +796,9 @@ include { fastqc as second_fastqc } from './modules/modules.nf'
include { multiqc as first_multiqc } from './modules/modules.nf'
include { multiqc as second_multiqc } from './modules/modules.nf'
include { lca as collapse_taxonomy } from './modules/modules.nf'
include { lca as collapse_taxonomy_lulu } from './modules/modules.nf'

include { get_web as get_model } from './modules/modules.nf'
include { get_web as get_taxdb } from './modules/modules.nf'
include { get_web as get_taxdump } from './modules/modules.nf'

workflow {
// make sure our arguments are all in order
Expand All @@ -815,26 +814,33 @@ workflow {
blast_result = Channel.fromPath(params.blastFile, checkIfExists: true)

// load the ranked lineage and merged channels
if (!helper.file_exists(params.lineage)) {
if (params.lineage != "") {
println(colors.yellow("Lineage file '${params.lineage}' does not exist and will be downloaded"))
if (!helper.file_exists(params.taxdump)) {
if (params.taxdump != "") {
println(colors.yellow("Taxonomy dump archive '${params.taxdump}' does not exist and will be downloaded"))
}
get_lineage()
get_lineage.out.dumps |
set{ lineage }

Channel.of('https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_taxdump/new_taxdump.zip') |
combine(Channel.fromPath('new_taxdump.zip')) |
get_taxdump |
set { taxdump }
taxdump |
combine(Channel.of('rankedlineage.dmp','merged.dmp','nodes.dmp')) |
extract_taxonomy | collect | toList |
set { lineage }

taxdump |
save_taxdump
} else {
lineage = Channel.fromPath(params.lineage, checkIfExists: true)
merged = Channel.fromPath(params.merged, checkIfExists: false)
lineage |
combine(merged) |
Channel.fromPath(params.taxdump) |
combine(Channel.of('rankedlineage.dmp','merged.dmp','nodes.dmp')) |
extract_taxonomy | collect | toList |
set { lineage }
}


// run taxonomy process
blast_result |
combine(lineage) |
combine(Channel.of(false)) |
collapse_taxonomy

} else {
Expand Down Expand Up @@ -1147,9 +1153,13 @@ workflow {
toList |
set { taxdb }
} else {
// download taxdb from ncbi website
get_taxdb |
toList |
// download and extract taxdb from ncbi website
Channel.of('https://ftp.ncbi.nlm.nih.gov/blast/db/taxdb.tar.gz') |
combine(Channel.fromPath('taxdb.tar.gz')) |
get_taxdb |
combine(Channel.of('taxdb.btd','taxdb.bti')) |
extract_taxdb |
collect | toList |
set { taxdb }
}
} else {
Expand Down Expand Up @@ -1196,16 +1206,24 @@ workflow {
// get NCBI lineage dump if needed
if ((lca && !params.skipBlast) || params.insect) {
// get the NCBI ranked taxonomic lineage dump
if (!helper.file_exists(params.lineage)) {
get_lineage()
get_lineage.out.dumps |
set { lineage }
if (!helper.file_exists(params.taxdump)) {

Channel.of('https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/new_taxdump/new_taxdump.zip') |
combine(Channel.fromPath('new_taxdump.zip')) |
get_taxdump |
set { taxdump }
taxdump |
combine(Channel.of('rankedlineage.dmp','merged.dmp','nodes.dmp')) |
extract_taxonomy | collect | toList |
set { lineage }

taxdump |
save_taxdump
} else {
lineage = Channel.fromPath(params.lineage, checkIfExists: true)
merged = Channel.fromPath(params.merged, checkIfExists: false)
lineage |
combine(merged) |
set { lineage }
Channel.fromPath(params.taxdump) |
combine(Channel.of('rankedlineage.dmp','merged.dmp','nodes.dmp')) |
extract_taxonomy | collect | toList |
set { lineage }
}
}

Expand All @@ -1222,7 +1240,11 @@ workflow {
classifier = Channel.fromPath(params.insect)
} else {
// download the classifier model if it's one of the supported ones
Channel.of(params.insect) |
// previous sanity checks ensure the model is in our helper map
m = params.insect.toLowerCase()
url = helper.insect_classifiers[m]
Channel.of(url) |
combine(Channel.fromPath('insect_model.rds')) |
get_model |
set { classifier }
}
Expand All @@ -1241,7 +1263,6 @@ workflow {
// and run the taxonomy assignment/collapser script
blast_result |
combine(lineage) |
combine(Channel.of(false)) |
collapse_taxonomy |
set { taxonomized }
}
Expand Down

0 comments on commit bcad553

Please sign in to comment.