diff --git a/subworkflows/local/correlation/main.nf b/subworkflows/local/correlation/main.nf index bb46c5de..9e2a4b57 100644 --- a/subworkflows/local/correlation/main.nf +++ b/subworkflows/local/correlation/main.nf @@ -5,7 +5,7 @@ include {PROPR_PROPR as PROPR} from "../../../modules/local/propr/propr/main.nf" workflow CORRELATION { take: - ch_counts // [ meta, counts] with meta keys: method, args_cor + ch_counts // [[meta_input], counts, analysis method] main: @@ -15,22 +15,19 @@ workflow CORRELATION { ch_versions = Channel.empty() // branch tools to select the correct correlation analysis method - ch_counts - .branch { - propr: it[0]["method"] == "propr" + inputs = ch_counts + .map { meta, abundance, analysis_method -> + [ meta + ['method': analysis_method ], abundance] } - .set { ch_counts } // ---------------------------------------------------- // Perform correlation analysis with propr // ---------------------------------------------------- - PROPR(ch_counts.propr.unique()) + PROPR( inputs.filter{it[0].method == 'propr'}) ch_matrix = PROPR.out.matrix.mix(ch_matrix) ch_adjacency = PROPR.out.adjacency.mix(ch_adjacency) ch_versions = ch_versions.mix(PROPR.out.versions) - // TODO: divide propr module into cor, propr, pcor, pcorbshrink, etc. - emit: matrix = ch_matrix // channel: [ csv ] adjacency = ch_adjacency // channel: [ csv ] diff --git a/subworkflows/local/differential/main.nf b/subworkflows/local/differential/main.nf index a271f871..782dbd3c 100644 --- a/subworkflows/local/differential/main.nf +++ b/subworkflows/local/differential/main.nf @@ -10,11 +10,12 @@ include { FILTER_DIFFTABLE as FILTER_DIFFTABLE_DESEQ2 } from '../../../modules/l workflow DIFFERENTIAL { take: - ch_counts // [ meta_exp, counts ] with meta keys: method, args_diff + ch_input // [[meta_input], counts, analysis method] + ch_samplesheet // [ meta_exp, samplesheet ] ch_contrasts // [ meta_contrast, contrast_variable, reference, target ] - ch_transcript_lengths - ch_control_features + ch_transcript_lengths // [ meta_exp, transcript_lengths] + ch_control_features // [ meta_exp, control_features] main: @@ -26,14 +27,17 @@ workflow DIFFERENTIAL { ch_adjacency = Channel.empty() ch_versions = Channel.empty() - // branch tools to select the correct differential analysis method - ch_counts - .branch { - propd: it[0]["method"] == "propd" - deseq2: it[0]["method"] == "deseq2" - limma: it[0]["method"] == "limma" + // We need to cross the things we're iterating + inputs = ch_input + .combine(ch_samplesheet) + .filter{ meta_input, abundance, analysis_method, meta_exp, samplesheet -> meta_input.subMap(meta_exp.keySet()) == meta_exp } + .combine(ch_contrasts) + .multiMap { meta_input, abundance, analysis_method, meta_exp, samplesheet, meta_contrast, variable, reference, target -> + def meta = meta_input.clone() + meta_contrast.clone() + ['method': analysis_method ] // ToDo: If modules are updated to combine their input metas, then this merge could be removed + samples_and_matrix: [ meta, samplesheet, abundance] + contrasts: [ meta, variable, reference, target] + propd_input: [ meta + ['contrast': meta_contrast.id, 'method': analysis_method], abundance, samplesheet, variable, reference, target ] // ToDo: remove this, once propd is updated and uses standardized inputs } - .set { ch_counts } // ---------------------------------------------------- // Perform differential analysis with propd @@ -42,18 +46,7 @@ workflow DIFFERENTIAL { // TODO propd currently don't support blocking, so we should not run propd with same contrast_variable, reference and target, // but different blocking variable, since it will simply run the same analysis again. - ch_counts.propd - .combine(ch_samplesheet) - .filter{ meta_counts, counts, meta_samplesheet, samplesheet -> meta_counts.subMap(meta_samplesheet.keySet()) == meta_samplesheet } - .combine(ch_contrasts) - .map { - meta_data, counts, meta_samplesheet, samplesheet, meta_contrast, contrast_variable, reference, target -> - def meta = meta_data.clone() + ['contrast': meta_contrast.id] - return [ meta, counts, samplesheet, contrast_variable, reference, target ] - } - .set { ch_propd } - - PROPD(ch_propd.unique()) + PROPD( inputs.propd_input.filter{it[0].method == 'propd'}) ch_results_pairwise = PROPD.out.results.mix(ch_results_pairwise) ch_results_pairwise_filtered = PROPD.out.results_filtered.mix(ch_results_pairwise_filtered) ch_results_genewise = PROPD.out.connectivity.mix(ch_results_genewise) @@ -65,29 +58,17 @@ workflow DIFFERENTIAL { // Perform differential analysis with DESeq2 // ---------------------------------------------------- - ch_counts.deseq2 - .combine(ch_samplesheet) - .filter{ meta_counts, counts, meta_samplesheet, samplesheet -> meta_counts.subMap(meta_samplesheet.keySet()) == meta_samplesheet } - .combine(ch_contrasts) - .multiMap { - meta_data, counts, meta_samplesheet, samplesheet, meta_contrast, contrast_variable, reference, target -> - def meta = meta_data.clone() + meta_contrast.clone() - contrast: [ meta, contrast_variable, reference, target ] - samples_and_matrix: [ meta, samplesheet, counts ] - } - .set { ch_deseq2 } - // do we need this process DESEQ2_NORM? DESEQ2_NORM ( - ch_deseq2.contrast.first(), - ch_deseq2.samples_and_matrix, + inputs.contrasts.filter{it[0].method == 'deseq2'}.first(), + inputs.samples_and_matrix.filter{it[0].method == 'deseq2'}, ch_control_features, ch_transcript_lengths ) DESEQ2_DIFFERENTIAL ( - ch_deseq2.contrast, - ch_deseq2.samples_and_matrix, + inputs.contrasts.filter{it[0].method == 'deseq2'}, + inputs.samples_and_matrix.filter{it[0].method == 'deseq2'}, ch_control_features, ch_transcript_lengths ) @@ -97,16 +78,6 @@ workflow DIFFERENTIAL { ch_model_deseq2 = DESEQ2_DIFFERENTIAL.out.model ch_versions = DESEQ2_DIFFERENTIAL.out.versions.mix(ch_versions) - ch_processed_matrices = ch_norm_deseq2 - if ('rlog' in params.deseq2_vs_method){ - ch_processed_matrices = ch_processed_matrices.join(DESEQ2_NORM.out.rlog_counts) - } - if ('vst' in params.deseq2_vs_method){ - ch_processed_matrices = ch_processed_matrices.join(DESEQ2_NORM.out.vst_counts) - } - ch_processed_matrices = ch_processed_matrices - .map{ it.tail() } - // TODO modify the module to accept these parameters as meta/ext.args in the same way how propd does ch_logfc_deseq2 = Channel.value([ "log2FoldChange", params.differential_min_fold_change ]) ch_padj_deseq2 = Channel.value([ "padj", params.differential_max_qval ]) @@ -125,24 +96,11 @@ workflow DIFFERENTIAL { // Perform differential analysis with limma // ---------------------------------------------------- - // combine the input channels with the tools information - // in this way, limma will only be run if the user have specified it, as informed by ch_tools - ch_counts.limma - .combine(ch_samplesheet) - .filter{ meta_counts, counts, meta_samplesheet, samplesheet -> meta_counts.subMap(meta_samplesheet.keySet()) == meta_samplesheet } - .combine(ch_contrasts) - .unique() - .multiMap { - meta_data, counts, meta_samplesheet, samplesheet, meta_contrast, contrast_variable, reference, target -> - def meta = meta_data.clone() + meta_contrast.clone() - input1: [ meta, contrast_variable, reference, target ] - input2: [ meta, samplesheet, counts ] - } - .set { ch_limma } - - // run limma - LIMMA_DIFFERENTIAL(ch_limma.input1, ch_limma.input2) + LIMMA_DIFFERENTIAL( + inputs.contrasts.filter{it[0].method == 'limma'}, + inputs.samples_and_matrix.filter { it[0].method == 'limma' } + ) ch_versions = LIMMA_DIFFERENTIAL.out.versions.mix(ch_versions) // filter results diff --git a/subworkflows/local/enrichment/main.nf b/subworkflows/local/enrichment/main.nf index 30006c19..826cc985 100644 --- a/subworkflows/local/enrichment/main.nf +++ b/subworkflows/local/enrichment/main.nf @@ -12,10 +12,10 @@ include { TABULAR_TO_GSEA_CHIP } from '../../../modules/local/tabular_to_gsea_ch workflow ENRICHMENT { take: - ch_counts // [ meta, counts] with meta keys: method, args_cor - ch_results_genewise // [ meta, results] with meta keys: method, args_cor - ch_results_genewise_filtered // [ meta, results] with meta keys: method, args_cor - ch_adjacency // [ meta, adj_matrix] with meta keys: method, args_cor + ch_counts // [[meta_input], counts, analysis method] + ch_results_genewise // [[meta_results], results, analysis method] + ch_results_genewise_filtered // [[meta_results], results, analysis method] + ch_adjacency // [[meta_adj_matrix], adj_matrix, analysis method] ch_contrasts ch_samplesheet ch_featuresheet @@ -31,14 +31,14 @@ workflow ENRICHMENT { ch_versions = Channel.empty() ch_counts - .branch { - gsea: it[0]["method"] == "gsea" + .map { meta, abundance, analysis_method -> + [ meta + ['method': analysis_method ], abundance] } .set { ch_counts } ch_adjacency - .branch { - grea: it[0]["method"] == "grea" + .map { meta, adj_matrix, analysis_method -> + [ meta + ['method': analysis_method ], adj_matrix] } .set { ch_adjacency } @@ -56,7 +56,7 @@ workflow ENRICHMENT { // Perform enrichment analysis with GREA // ---------------------------------------------------- - GREA(ch_adjacency.grea.unique(), ch_gmt.collect()) + GREA(ch_adjacency.filter{it[0].method == 'grea'}, ch_gmt.collect()) ch_enriched = ch_enriched.mix(GREA.out.results) ch_versions = ch_versions.mix(GREA.out.versions) @@ -68,7 +68,7 @@ workflow ENRICHMENT { // input, and process the sample sheet to generate class definitions // (CLS) for the variable used in each contrast - CUSTOM_TABULARTOGSEAGCT ( ch_counts ) + CUSTOM_TABULARTOGSEAGCT ( ch_counts.filter{it[0].method == 'gsea'} ) // TODO: update CUSTOM_TABULARTOGSEACLS for value channel input per new // guidlines (rather than meta usage employed here) @@ -94,7 +94,6 @@ workflow ENRICHMENT { .map{ tuple(it[1], it[0], it[2]) } .combine( ch_gmt.map { meta, gmt -> gmt } ) - println("__"+TABULAR_TO_GSEA_CHIP.out.chip) GSEA_GSEA( ch_gsea_inputs, ch_gsea_inputs.map{ tuple(it[0].reference, it[0].target) }, // * @@ -130,21 +129,20 @@ workflow ENRICHMENT { ch_background = [] } else if (params.gprofiler2_background_file == "auto") { // If auto, use input matrix as background - ch_background = ch_counts.map { meta, counts -> counts } + ch_background = ch_counts.filter{it[0].method == 'gprofiler2'}.map { meta, counts -> counts } } else { ch_background = Channel.from(file(params.gprofiler2_background_file, checkIfExists: true)) } // rearrange channel for GPROFILER2_GOST process ch_gmt = ch_gmt.map { meta, gmt -> gmt } - ch_results_genewise_filtered - .branch { - grea: it[0]["method"] == "gprofiler2" + .map { meta, results, analysis_method -> + [ meta + ['method': analysis_method ], results] } .set { ch_results_genewise_filtered } - GPROFILER2_GOST(ch_results_genewise_filtered, ch_gmt, ch_background) + GPROFILER2_GOST(ch_results_genewise_filtered.filter{it[0].method == 'gprofiler2'}, ch_gmt, ch_background) ch_versions = ch_versions.mix(GPROFILER2_GOST.out.versions) emit: diff --git a/subworkflows/local/experimental/main.nf b/subworkflows/local/experimental/main.nf index b08e92b9..25df3c22 100644 --- a/subworkflows/local/experimental/main.nf +++ b/subworkflows/local/experimental/main.nf @@ -9,11 +9,11 @@ def preprocess_subworkflow_output( ch_input, ch_tools_args, method_field_name) { // add method arguments to channel meta return ch_input .combine(ch_tools_args) - .filter{ meta, input, pathway, arg_maps -> meta["pathway_name"] ? meta["pathway_name"] == pathway["pathway_name"] : true } + .filter{ meta, input, pathway, arg_map -> meta["pathway_name"] ? meta["pathway_name"] == pathway["pathway_name"] : true } .map{ meta, input, pathway, arg_map -> def meta_clone = meta.clone() + pathway + arg_map.clone() - meta_clone["method"] = meta_clone.remove(method_field_name) - return [meta_clone, input] + def method = meta_clone.remove(method_field_name) + return [meta_clone, input, method] } }