diff --git a/.github/workflows/end-to-end.yml b/.github/workflows/end-to-end.yml new file mode 100644 index 00000000..800abb3a --- /dev/null +++ b/.github/workflows/end-to-end.yml @@ -0,0 +1,46 @@ +name: End-to-end MGS workflow test + +on: [pull_request] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'adopt' + + - name: Setup Nextflow latest-edge + uses: nf-core/setup-nextflow@v1 + with: + version: "latest-edge" + + - name: Install nf-test + run: | + wget -qO- https://get.nf-test.com | bash + sudo mv nf-test /usr/local/bin/ + + - name: Run index workflow + run: nf-test test --tag index --verbose + + - name: Clean docker for more space + run: | + docker kill $(docker ps -q) 2>/dev/null || true + docker rm $(docker ps -a -q) 2>/dev/null || true + docker rmi $(docker images -q) -f 2>/dev/null || true + docker system prune -af --volumes + + - name: Clean up nf-test dir + run: sudo rm -rf .nf-test + + - name: Run run workflow + run: nf-test test --tag run --verbose + + - name: Run run_validation workflow + run: nf-test test --tag validation --verbose diff --git a/.gitignore b/.gitignore index 86e686b2..9ad97427 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ test/output test/.nextflow* *.Rhistory pipeline_report.txt + +.nf-test/ +.nf-test.log \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 14d80d14..c049cb27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,29 @@ +# v2.5.2 +- Changes to default read filtering: + - Relaxed FASTP quality filtering (`--cut_mean_quality` and `--average_qual` reduced from 25 to 20). + - Relaxed BBDUK viral filtering (switched from 3 21-mers to 1 24-mer). +- Overhauled BLAST validation functionality: + - BLAST now runs on forward and reverse reads independently + - BLAST output filtering no longer assumes specific filename suffixes + - Paired BLAST output includes more information + - RUN_VALIDATION can now directly take in FASTA files instead of a virus read DB + - Fixed issues with publishing BLAST output under new Nextflow version +- Implemented nf-test for end-to-end testing of pipeline functionality + - Implemented test suite in `tests/main.nf.test` + - Reconfigured INDEX workflow to enable generation of miniature index directories for testing + - Added Github Actions workflow in `.github/workflows/end-to-end.yml` + - Pull requests will now fail if any of INDEX, RUN, or RUN_VALIDATION crashes when run on test data. + - Generated first version of new, curated test dataset for testing RUN workflow. Samplesheet and config file are available in `test-data`. The previous test dataset in `test` has been removed. +- Implemented S3 auto-cleanup: + - Added tags to published files to facilitate S3 auto-cleanup + - Added S3 lifecycle configuration file to `ref`, along with a script in `bin` to add it to an S3 bucket +- Minor changes + - Added logic to check if `grouping` variable in `nextflow.config` matches the input samplesheet, if it doesn't, the code throws an error. + - Externalized resource specifications to `resources.config`, removing hardcoded CPU/memory values + - Renamed `index-params.json` to `params-index.json` to avoid clash with Github Actions + - Removed redundant subsetting statement from TAXONOMY workflow. 
+ - Added --group_across_illumina_lanes option to generate_samplesheet + # v2.5.1 - Enabled extraction of BBDuk-subset putatively-host-viral raw reads for downstream chimera detection. - Added back viral read fields accidentally being discarded by COLLAPSE_VIRUS_READS. @@ -16,7 +42,7 @@ - Reconfigured QC subworkflow to run FASTQC and MultiQC on each pair of input files separately (fixes bug arising from allowing arbitrary filenames for forward and reverse read files). # v2.4.0 -- Created a new output directory where we put log files called `logging`. +- Created a new output directory where we put log files called `logging`. - Added the trace file from Nextflow to the `logging` directory which can be used for understanding cpu, memory usage, and other infromation like runtime. After running the pipeline, `plot-timeline-script.R` can be used to generate a useful summary plot of the runtime for each process in the pipeline. - Removed CONCAT_GZIPPED. - Replaced the sample input format with something more similar to nf-core, called `samplesheet.csv`. This new input file can be generated using the script `generate_samplesheet.sh`. diff --git a/README.md b/README.md index 70c96868..63984ece 100644 --- a/README.md +++ b/README.md @@ -179,6 +179,7 @@ To run this workflow with full functionality, you need access to the following d 2. **Docker:** To install Docker Engine for command-line use, follow the installation instructions available [here](https://docs.docker.com/engine/install/) (or [here](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-docker.html) for installation on an AWS EC2 instance). 3. **AWS CLI:** If not already installed, install the AWS CLI by following the instructions available [here](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html). 4. **Git:** To install the Git version control tool, follow the installation instructions available [here](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). +5. **nf-test**: To install nf-test, follow the install instructions available [here](https://www.nf-test.com/docs/getting-started/). #### 2. Configure AWS & Docker @@ -245,25 +246,33 @@ Wait for the workflow to run to completion; this is likely to take several hours ### Testing & validation -To confirm that the pipeline works in your hands, we provide a small test dataset (`test/raw`) to run through the run workflow. This can be used to test any of the pipeline profiles described above. +To confirm that the pipeline works in your hands, we provide a small test dataset (`s3://nao-testing/gold-standard-test/raw/`) to run through the run workflow. This can be used to test any of the pipeline profiles described above. If your EC2 instance has the resources to handle it, the simplest way to start using the pipeline is to run the test data through it locally on that instance (i.e. without using S3). To do this: -1. Navigate to the `test` directory. -2. Edit `nextflow.config` to set `params.ref_dir` to the index directory you chose or created above (specifically `PATH_TO_REF_DIR/output`). -3. Still within the `test` directory, run `nextflow run -profile ec2_local .. -resume`. -4. Wait for the workflow to finish. Inspect the `output` directory to view the processed output files. +1. Create a new directory outside the repo directory and copy over the run workflow config file as `nextflow.config` in that directory: + +``` +mkdir launch +cd launch +cp REPO_DIR/configs/run.config nextflow.config +``` + +2. 
Edit `nextflow.config` to set `params.ref_dir` to the index directory you chose or created above (specifically `PATH_TO_REF_DIR/output`). +3. Then set the samplesheet path to the test dataset samplesheet `${projectDir}/test-data/samplesheet.csv`. +4. Within this directory, run `nextflow run -profile ec2_local .. -resume`. Wait for the workflow to finish. +5. Inspect the `output` directory to view the processed output files. If this is successful, the next level of complexity is to run the workflow with a working directory on S3. To do this: -1. Within the `test` directory, edit `nextflow.config` to set `params.base_dir` to the S3 directory of your choice. -2. Still within that directory, run `nextflow run -profile ec2_s3 .. -resume`. +1. Edit `nextflow.config` to set `params.base_dir` to the S3 directory of your choice. +2. Still within that directory, run `nextflow run -profile ec2_s3 .. -resume`. 3. Wait for the workflow to finish, and inspect the output on S3. Finally, you can run the test dataset through the pipeline on AWS Batch. To do this, configure Batch as described [here](https://data.securebio.org/wills-public-notebook/notebooks/2024-06-11_batch.html) (steps 1-3), then: -1. Within the `test` directory, edit `nextflow.config` to set `params.base_dir` to a different S3 directory of your choice and `process.queue` to the name of your Batch job queue. -2. Still within that directory, run `nextflow run -profile batch .. -resume` (or simply `nextflow run .. -resume`). +1. Edit `nextflow.config` to set `params.base_dir` to a different S3 directory of your choice and `process.queue` to the name of your Batch job queue. +2. Still within that directory, run `nextflow run -profile batch .. -resume` (or simply `nextflow run .. -resume`). 3. Wait for the workflow to finish, and inspect the output on S3. ### Running on new data @@ -271,9 +280,8 @@ Finally, you can run the test dataset through the pipeline on AWS Batch. To do t To run the workflow on another dataset, you need: 1. Accessible raw data files in Gzipped FASTQ format, named appropriately. -2. A sample sheet file specifying the samples, along with paths to the forward and reverse read files for each sample. +2. A sample sheet file specifying the samples, along with paths to the forward and reverse read files for each sample. `generate_samplesheet.sh` (see below) can make this for you. 3. A config file in a clean launch directory, pointing to: - - The directory containing the raw data (`params.raw_dir`). - The base directory in which to put the working and output directories (`params.base_dir`). - The directory containing the outputs of the reference workflow (`params.ref_dir`). - The sample sheet (`params.sample_sheet`). @@ -285,18 +293,45 @@ To run the workflow on another dataset, you need: > - Second column: Path to FASTQ file 1 which should be the forward read for this sample > - Third column: Path to FASTQ file 2 which should be the reverse read for this sample > -> The easiest way to get this file is by using the `generate_samplesheet.sh` script. As input, this script takes a path to raw FASTQ files (`dir_path`), and forward (`forward_suffix`) and reverse (`reverse_suffix`) read suffixes, both of which support regex, and an optional output path (`output_path`). Those using data from s3 should make sure to pass the `s3` parameter. 
Those who would like to group samples by some metadata can pass a path to a CSV file containing a header column named `sample,group`, where each row gives the sample name and the group to group by (`group_file`) or edit the samplesheet manually after generation (since manually editing the samplesheet will be easier when the groups CSV isn't readily available). As output, the script generates a CSV file named (`samplesheet.csv` by default), which can be used as input for the pipeline. - +> The easiest way to get this file is by using the `generate_samplesheet.sh` script. As input, this script takes a path to raw FASTQ files (`dir_path`) and forward (`forward_suffix`) and reverse (`reverse_suffix`) read suffixes, both of which support regex, plus an optional output path (`output_path`). Those using data from S3 should make sure to pass the `--s3` flag. To group samples by metadata, you can pass a CSV file (`group_file`) whose header starts with the columns `sample,group` in that order, where each subsequent row gives a sample name and its group; edit the samplesheet manually after generation (easier when a groups CSV isn't readily available); or pass the `--group_across_illumina_lanes` option to group files from a single library that was run across multiple lanes of one Illumina flowcell. As output, the script generates a CSV file (named `samplesheet.csv` by default), which can be used as input for the pipeline. +> +> For example: +> ``` +> ../bin/generate_samplesheet.sh \ +> --s3 \ +> --dir_path s3://nao-restricted/MJ-2024-10-21/raw/ \ +> --forward_suffix _1 \ +> --reverse_suffix _2 +> ``` If running on Batch, a good process for starting the pipeline on a new dataset is as follows: 1. Process the raw data to have appropriate filenames (see above) and deposit it in an accessible S3 directory. 2. Create a clean launch directory and copy `configs/run.config` to a file named `nextflow.config` in that directory. -3. Create a library metadata file in that launch directory, specifying library/sample mappings and any other metadata (see above). +3. Create a sample sheet in that launch directory (see above). 4. Edit `nextflow.config` to specify each item in `params` as appropriate, as well as setting `process.queue` to the appropriate Batch queue. 5. Run `nextflow run PATH_TO_REPO_DIR -resume`. 6. Navigate to `{params.base_dir}/output` to view and download output files. +## Run tests using `nf-test` before making pull requests + +During development, we now request that users run the pipeline with `nf-test` locally before making pull requests (a test runs automatically on the PR, but it's often useful to run it locally first). To do this, you need an EC2 instance with enough resources; we recommend an `m5.xlarge` with at least 32 GB of EBS storage, as this closely matches the VMs used by GitHub Actions. Once you have an instance, run `nf-test test tests/main.nf.test`, which will run all workflows of the pipeline and check that they run to completion. To run a specific workflow, use one of the following commands: + +``` +nf-test test --tag index # Runs the index workflow +nf-test test --tag run # Runs the run workflow +nf-test test --tag validation # Runs the validation workflow +``` + +Importantly, make sure to periodically delete Docker images to free up space on your instance.
You can do this by running the following command, although note that this will delete all docker images: + +``` +docker kill $(docker ps -q) 2>/dev/null || true +docker rm $(docker ps -a -q) 2>/dev/null || true +docker rmi $(docker images -q) -f 2>/dev/null || true +docker system prune -af --volumes +``` + # Troubleshooting When attempting to run a released version of the pipeline, the most common sources of errors are AWS permission issues. Before debugging a persistent error in-depth, make sure that you have all the permissions specified in Step 0 of [our Batch workflow guide](https://data.securebio.org/wills-public-notebook/notebooks/2024-06-11_batch.html). Next, make sure Nextflow has access to your AWS credentials, such as by running `eval "$(aws configure export-credentials --format env)"`. diff --git a/bin/apply-lifecycle-rules.py b/bin/apply-lifecycle-rules.py new file mode 100755 index 00000000..ae346c13 --- /dev/null +++ b/bin/apply-lifecycle-rules.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +import argparse +import json +import boto3 +import sys +from botocore.exceptions import ClientError + +def load_lifecycle_config(config_path): + try: + with open(config_path, 'r') as f: + return json.load(f) + except json.JSONDecodeError: + print(f"Error: {config_path} contains invalid JSON") + sys.exit(1) + except FileNotFoundError: + print(f"Error: Could not find file {config_path}") + sys.exit(1) + +def print_lifecycle_rules(rules): + if not rules: + print("No lifecycle rules configured") + return + + for rule in rules: + print(f"- {rule['ID']}") + print(f" Status: {rule['Status']}") + if 'Expiration' in rule: + print(f" Expiration: {rule['Expiration'].get('Days', 'N/A')} days") + print() + +def get_current_rules(s3, bucket_name): + try: + response = s3.get_bucket_lifecycle_configuration(Bucket=bucket_name) + return response.get('Rules', []) + except ClientError as e: + if e.response['Error']['Code'] == 'NoSuchLifecycleConfiguration': + return [] + raise + +def apply_lifecycle_rules(bucket_name, lifecycle_config): + s3 = boto3.client('s3') + + try: + # First verify the bucket exists and we have access + s3.head_bucket(Bucket=bucket_name) + + # Show current configuration + print(f"\nCurrent lifecycle rules for bucket {bucket_name}:") + current_rules = get_current_rules(s3, bucket_name) + print_lifecycle_rules(current_rules) + + # Apply the new configuration + s3.put_bucket_lifecycle_configuration( + Bucket=bucket_name, + LifecycleConfiguration=lifecycle_config + ) + print(f"\nSuccessfully applied new lifecycle rules to bucket: {bucket_name}") + + # Show the updated configuration + print("\nUpdated lifecycle rules:") + new_rules = get_current_rules(s3, bucket_name) + print_lifecycle_rules(new_rules) + + except ClientError as e: + error_code = e.response.get('Error', {}).get('Code', 'Unknown') + if error_code == '404': + print(f"Error: Bucket {bucket_name} does not exist") + elif error_code == '403': + print(f"Error: Permission denied for bucket {bucket_name}") + else: + print(f"Error applying lifecycle rules: {str(e)}") + sys.exit(1) + +def main(): + parser = argparse.ArgumentParser(description='Apply S3 lifecycle rules to a bucket') + parser.add_argument('config_file', help='Path to lifecycle configuration JSON file') + parser.add_argument('bucket_name', help='Name of the S3 bucket') + + args = parser.parse_args() + + # Load the configuration + lifecycle_config = load_lifecycle_config(args.config_file) + + # Apply the rules + apply_lifecycle_rules(args.bucket_name, lifecycle_config) + 
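+# Illustrative usage (a sketch; "my-mgs-bucket" is a placeholder bucket name):
+#
+#     python3 bin/apply-lifecycle-rules.py ref/s3-lifecycle-policy.json my-mgs-bucket
+#
+# The script prints the bucket's current lifecycle rules, applies the rules from the
+# given JSON file (e.g. the tag-based expiration rules in ref/s3-lifecycle-policy.json),
+# and prints the updated configuration. It assumes boto3 can find AWS credentials with
+# permission to read and write the bucket's lifecycle configuration.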
+if __name__ == '__main__': + main() diff --git a/bin/generate_samplesheet.sh b/bin/generate_samplesheet.sh index e5f49c96..9d531dfa 100755 --- a/bin/generate_samplesheet.sh +++ b/bin/generate_samplesheet.sh @@ -1,5 +1,7 @@ #!/bin/bash +set -u +set -e ##### Input parameters ##### @@ -10,7 +12,7 @@ reverse_suffix="" s3=0 output_path="samplesheet.csv" # Default output path group_file="" # Optional parameter for the group file - +group_across_illumina_lanes=false # Parse command-line arguments while [[ $# -gt 0 ]]; do @@ -39,6 +41,10 @@ while [[ $# -gt 0 ]]; do group_file="$2" shift 2 ;; + --group_across_illumina_lanes) + group_across_illumina_lanes=true + shift + ;; *) echo "Unknown option: $1" exit 1 @@ -58,6 +64,13 @@ if [[ -z "$dir_path" || -z "$forward_suffix" || -z "$reverse_suffix" ]]; then echo -e " --s3 Use if files are stored in S3 bucket" echo -e " --output_path Output path for samplesheet [default: samplesheet.csv]" echo -e " --group_file Path to group file for sample grouping [header column must have the names 'sample,group' in that order; additional columns may be included, however they will be ignored by the script]" + echo -e + " --group_across_illumina_lanes Create groups by assuming that files that differ only by a terminal _Lnnn are the same library split across multiple lanes." + exit 1 +fi + +if $group_across_illumina_lanes && [[ -n "$group_file" ]]; then + echo "Provide at most one of --group_file and --group_across_illumina_lanes" exit 1 fi @@ -69,11 +82,12 @@ echo "reverse_suffix: $reverse_suffix" echo "s3: $s3" echo "output_path: $output_path" echo "group_file: $group_file" +echo "group_across_illumina_lanes: $group_across_illumina_lanes" #### EXAMPLES #### -# dir_path="" # Cannot share this as it's restricted, but imagine the read looks like this +# dir_path="" # Cannot share this as it's restricted, but imagine the read looks like this # forward_suffix="_S[0-9]_L[0-9][0-9][0-9]_R1_001" # reverse_suffix="_S[0-9]_L[0-9][0-9][0-9]_R2_001" # s3=1 @@ -125,6 +139,17 @@ if [[ -n "$group_file" ]]; then # Perform left join with group file awk -F',' 'NR==FNR{a[$1]=$2; next} FNR==1{print $0",group"} FNR>1{print $0","(a[$1]?a[$1]:"NA")}' "$group_file" "$temp_samplesheet" > "$output_path" echo "CSV file '$output_path' has been created with group information." +elif $group_across_illumina_lanes; then + cat "$temp_samplesheet" | tr ',' ' ' | \ + while read sample fastq_1 fastq_2; do + if [[ $sample = "sample" ]]; then + echo $sample $fastq_1 $fastq_2 "group" + else + echo $sample $fastq_1 $fastq_2 \ + $(echo "$sample" | sed 's/_L[0-9][0-9][0-9]$//') + fi + done | tr ' ' ',' > "$output_path" + echo "CSV file '$output_path' has been created with grouping across illumina lanes." 
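+    # Example (hypothetical sample names): rows for "libA_L001" and "libA_L002" both
+    # receive group "libA", because the sed expression above strips a trailing _Lnnn
+    # lane suffix (underscore, "L", three digits) from each sample name to form the group.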
else # If no group file, just use the temporary samplesheet as the final output mv "$temp_samplesheet" "$output_path" @@ -133,4 +158,3 @@ fi # Remove temporary file if it still exists [ -f "$temp_samplesheet" ] && rm "$temp_samplesheet" - diff --git a/configs/containers.config b/configs/containers.config index b5d29917..dbff9cab 100644 --- a/configs/containers.config +++ b/configs/containers.config @@ -1,12 +1,22 @@ // Specify Docker containers for workflow processes process { - withLabel: base { - container = "eclipse/alpine_jdk8:latest" - // NB: As of 2024-07-01, no more specific tag available + withLabel: curl { + container = "community.wave.seqera.io/library/curl:8.10.1--43150f2d543ef413" + } + withLabel: unzip { + container = "community.wave.seqera.io/library/unzip:6.0--0e729f0c20458893" + } + withLabel: coreutils { + container = "community.wave.seqera.io/library/coreutils:9.5--ae99c88a9b28c264" + } + withLabel: coreutils_gzip_gawk { + container = "community.wave.seqera.io/library/coreutils_gawk_gzip:c49bfad0a858f99a" } withLabel: MultiQC { // NB: As of 2024-07-01, newer versions currently cause errors - container = "multiqc/multiqc:v1.21" +// container = "multiqc/multiqc:v1.21" +// container = "staphb/multiqc:1.22.2" + container = "thatdnaguy/multiqc:v1.21_01" } withLabel: FASTQC { container = "staphb/fastqc:0.12.1" diff --git a/configs/index-for-run-test.config b/configs/index-for-run-test.config new file mode 100644 index 00000000..227a3ce2 --- /dev/null +++ b/configs/index-for-run-test.config @@ -0,0 +1,55 @@ +/*********************************************************************** +| CONFIGURATION FILE FOR NAO VIRAL MGS WORKFLOW - REFERENCES & INDEXES | +***********************************************************************/ + +params { + mode = "index" + + // Directories + base_dir = "s3://nao-testing/index-test" // Parent for working and output directories (can be S3) + + // URLs for downloading reference genomes etc + taxonomy_url = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump_archive/taxdmp_2024-06-01.zip" + virus_host_db_url = "https://www.genome.jp/ftp/db/virushostdb/virushostdb.tsv" + + // 21st chromosome + human_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nucleotide&id=NC_000021.9&rettype=fasta" + + // Look up genome assembly ncbi + genome_urls = [ + cow_ch28: "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nucleotide&id=NC_037355.1&rettype=fasta", + ecoli: "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nucleotide&id=NC_002695.2&rettype=fasta" + ] + + ssu_url = "https://www.arb-silva.de/fileadmin/silva_databases/release_138.1/Exports/SILVA_138.1_SSURef_NR99_tax_silva.fasta.gz" + lsu_url = "https://www.arb-silva.de/fileadmin/silva_databases/release_138.1/Exports/SILVA_138.1_LSURef_NR99_tax_silva.fasta.gz" + + // Other reference files + host_taxon_db = "${projectDir}/ref/host-taxa.tsv" + contaminants = "${projectDir}/ref/contaminants.fasta.gz" + genome_patterns_exclude = "${projectDir}/ref/hv_patterns_exclude.txt" + + // Kraken viral DB + kraken_db = "https://genome-idx.s3.amazonaws.com/kraken/k2_viral_20240904.tar.gz" + // Smallest possible BLAST DB + blast_db_name = "nt_others" + + // Pull information from GenBank or Ref Seq + ncbi_viral_params = "--section refseq --assembly-level complete" + + // Other input values + virus_taxid = "10239" + viral_taxids_exclude = "2731619 2732413 2732411" // Exclude Caudoviricetes, Malgrantaviricetes, Faserviricetes + host_taxa_screen = "vertebrate human" // Host taxa to screen 
for when building reference virus DB + + // Initializing run params to avoid warnings + kraken_memory = "" + classify_dedup_subset = "" +} + +includeConfig "${projectDir}/configs/logging.config" +includeConfig "${projectDir}/configs/containers.config" +includeConfig "${projectDir}/configs/resources.config" +includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/output.config" +process.queue = "harmon-queue" // AWS Batch job queue diff --git a/configs/index.config b/configs/index.config index 9219f7a3..af5406c8 100644 --- a/configs/index.config +++ b/configs/index.config @@ -12,11 +12,15 @@ params { taxonomy_url = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump_archive/taxdmp_2024-06-01.zip" virus_host_db_url = "https://www.genome.jp/ftp/db/virushostdb/virushostdb.tsv" human_url = "https://s3-us-west-2.amazonaws.com/human-pangenomics/T2T/CHM13/assemblies/analysis_set/chm13v2.0.fa.gz" - cow_url = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/002/263/795/GCF_002263795.3_ARS-UCD2.0/GCF_002263795.3_ARS-UCD2.0_genomic.fna.gz" - pig_url = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/003/025/GCF_000003025.6_Sscrofa11.1/GCF_000003025.6_Sscrofa11.1_genomic.fna.gz" - carp_url = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/951/615/GCF_000951615.1_common_carp_genome/GCF_000951615.1_common_carp_genome_genomic.fna.gz" - mouse_url = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/001/635/GCF_000001635.27_GRCm39/GCF_000001635.27_GRCm39_genomic.fna.gz" - ecoli_url = "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/030/389/925/GCF_030389925.1_ASM3038992v1/GCF_030389925.1_ASM3038992v1_genomic.fna.gz" + + genome_urls = [ + cow: "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/002/263/795/GCF_002263795.3_ARS-UCD2.0/GCF_002263795.3_ARS-UCD2.0_genomic.fna.gz", + pig: "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/003/025/GCF_000003025.6_Sscrofa11.1/GCF_000003025.6_Sscrofa11.1_genomic.fna.gz", + carp: "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/951/615/GCF_000951615.1_common_carp_genome/GCF_000951615.1_common_carp_genome_genomic.fna.gz", + mouse: "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/001/635/GCF_000001635.27_GRCm39/GCF_000001635.27_GRCm39_genomic.fna.gz", + ecoli: "https://ftp.ncbi.nlm.nih.gov/genomes/all/GCF/030/389/925/GCF_030389925.1_ASM3038992v1/GCF_030389925.1_ASM3038992v1_genomic.fna.gz" + ] + ssu_url = "https://www.arb-silva.de/fileadmin/silva_databases/release_138.1/Exports/SILVA_138.1_SSURef_NR99_tax_silva.fasta.gz" lsu_url = "https://www.arb-silva.de/fileadmin/silva_databases/release_138.1/Exports/SILVA_138.1_LSURef_NR99_tax_silva.fasta.gz" @@ -25,6 +29,8 @@ params { contaminants = "${projectDir}/ref/contaminants.fasta.gz" genome_patterns_exclude = "${projectDir}/ref/hv_patterns_exclude.txt" kraken_db = "s3://genome-idx/kraken/k2_standard_20240605.tar.gz" // Path to tarball containing Kraken reference DB + blast_db_name = "core_nt" + ncbi_viral_params = "--section genbank" // Other input values virus_taxid = "10239" diff --git a/configs/resources.config b/configs/resources.config index 4adb5a73..d58e296f 100644 --- a/configs/resources.config +++ b/configs/resources.config @@ -6,6 +6,16 @@ process { memory = 4.GB } + withLabel: single_cpu_16GB_memory { + cpus = 1 + memory = 16.GB + } + + withLabel: single_cpu_32GB_memory { + cpus = 1 + memory = 32.GB + } + // Small multi-core processes withLabel: small { cpus = 8 @@ -23,4 +33,14 @@ process { cpus = 32 memory = 64.GB } + + withLabel: kraken_resources { + cpus = 16 + memory = 128.GB + } + + 
withLabel: blast_resources { + cpus = 32 + memory = 256.GB + } } diff --git a/configs/run.config b/configs/run.config index 7e87daa5..98199092 100644 --- a/configs/run.config +++ b/configs/run.config @@ -14,15 +14,16 @@ params { adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming // Numerical - grouping = true // Whether to group samples by 'group' column in samplesheet + grouping = false // Whether to group samples by 'group' column in samplesheet n_reads_trunc = 0 // Number of reads per sample to run through pipeline (0 = all reads) n_reads_profile = 1000000 // Number of reads per sample to run through taxonomic profiling bt2_score_threshold = 20 // Normalized score threshold for calling a host-infecting virus read (typically 15 or 20) blast_viral_fraction = 0 // Fraction of putative host-infecting virus reads to BLAST vs nt (0 = don't run BLAST) - kraken_memory = "128 GB" // Memory needed to safely load Kraken DB quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2) host_taxon = "vertebrate" + + blast_db_prefix = "core_nt" } includeConfig "${projectDir}/configs/logging.config" diff --git a/configs/run_validation.config b/configs/run_validation.config index 587719ac..d9fa9ab8 100644 --- a/configs/run_validation.config +++ b/configs/run_validation.config @@ -7,16 +7,21 @@ params { // Directories base_dir = "s3://nao-mgs-wb/test-remote" // Parent for working and output directories (can be S3) - ref_dir = "s3://nao-mgs-wb/index-20240714/output" // Reference/index directory (generated by index workflow) + ref_dir = "s3://nao-mgs-wb/index-20241113/output" // Reference/index directory (generated by index workflow) // Files - hv_tsv_collapsed = "${ref_dir}/results/hv/hv_hits_putative_collapsed.tsv.gz" + viral_tsv_collapsed = "${base_dir}/results/virus_hits_db.tsv.gz" + viral_fasta_1 = "" + viral_fasta_2 = "" // Numerical - blast_hv_fraction = 0.2 // Fraction of putative HV reads to BLAST vs nt (0 = don't run BLAST) + blast_viral_fraction = 1 // Fraction of putative viral reads to BLAST vs nt (0 = don't run BLAST) + blast_db_prefix = "core_nt" } +includeConfig "${projectDir}/configs/logging.config" includeConfig "${projectDir}/configs/containers.config" includeConfig "${projectDir}/configs/resources.config" includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/output.config" process.queue = "will-batch-queue" // AWS Batch job queue diff --git a/main.nf b/main.nf index 9a08093b..6ce890c4 100644 --- a/main.nf +++ b/main.nf @@ -15,11 +15,22 @@ workflow { output { "input" { path "input" + tags nextflow_file_class: "publish" } "logging" { path "logging" + tags nextflow_file_class: "publish" } "results" { path "results" + tags nextflow_file_class: "publish" + } + reads_cleaned { + path "intermediates/reads/cleaned" + tags nextflow_file_class: "intermediate" + } + reads_raw_viral { + path "intermediates/reads/raw_viral" + tags nextflow_file_class: "intermediate" } } diff --git a/modules/local/addFragDupToVirusReads/main.nf b/modules/local/addFragDupToVirusReads/main.nf index 764c75aa..2875201b 100644 --- a/modules/local/addFragDupToVirusReads/main.nf +++ b/modules/local/addFragDupToVirusReads/main.nf @@ -1,8 +1,7 @@ // Add fragment length and duplication info to viral read DB process ADD_FRAG_DUP_TO_VIRUS_READS { 
label "tidyverse" - cpus 1 - memory "16.GB" + label "single_cpu_16GB_memory" input: path(collapsed_ch) path(merged_bbmerge_results) diff --git a/modules/local/bbduk/main.nf b/modules/local/bbduk/main.nf index b7d61a2d..df19e871 100644 --- a/modules/local/bbduk/main.nf +++ b/modules/local/bbduk/main.nf @@ -25,7 +25,7 @@ process BBDUK { ref=!{contaminant_ref} io="in=${in1} in2=${in2} ref=${ref} out=${op1} out2=${op2} outm=${of1} outm2=${of2} stats=${stats}" # Define parameters - par="minkmerfraction=!{min_kmer_fraction} k=!{k} t=!{task.cpus} -Xmx30g" + par="minkmerfraction=!{min_kmer_fraction} k=!{k} t=!{task.cpus} -Xmx!{task.memory.toGiga()}g" # Execute bbduk.sh ${io} ${par} ''' @@ -58,7 +58,7 @@ process BBDUK_HITS { ref=!{contaminant_ref} io="in=${in1} in2=${in2} ref=${ref} out=${op1} out2=${op2} outm=${of1} outm2=${of2} stats=${stats}" # Define parameters - par="minkmerhits=!{min_kmer_hits} k=!{k} t=!{task.cpus} -Xmx30g" + par="minkmerhits=!{min_kmer_hits} k=!{k} t=!{task.cpus} -Xmx!{task.memory.toGiga()}g" # Execute bbduk.sh ${io} ${par} ''' @@ -83,7 +83,7 @@ process BBDUK_MASK { out=!{label}_masked.fasta.gz stats=!{label}_mask.stats.txt ref=!{contaminant_ref} - par="k=!{k} hdist=1 mink=8 mm=f rcomp=t maskmiddle=t mask=N t=!{task.cpus} -Xmx30g" + par="k=!{k} hdist=1 mink=8 mm=f rcomp=t maskmiddle=t mask=N t=!{task.cpus} -Xmx!{task.memory.toGiga()}g" # Execute bbduk.sh in=${in} out=${out} ref=${ref} stats=${stats} ${par} ''' diff --git a/modules/local/bbmap/main.nf b/modules/local/bbmap/main.nf index 2e67e594..4b6e2473 100644 --- a/modules/local/bbmap/main.nf +++ b/modules/local/bbmap/main.nf @@ -22,7 +22,7 @@ process BBMAP { stats=!{sample}_!{suffix}_bbmap.stats.txt io="in=${in1} in2=${in2} outu=${ou1} outu2=${ou2} outm=${om1} outm2=${om2} statsfile=${stats} path=!{index_dir}" # Define parameters - par="minid=0.8 maxindel=4 bwr=0.25 bw=25 quickmatch minhits=2 t=!{task.cpus} -Xmx60g" + par="minid=0.8 maxindel=4 bwr=0.25 bw=25 quickmatch minhits=2 t=!{task.cpus} -Xmx!{task.memory.toGiga()}g" # Execute bbmap.sh ${io} ${par} ''' @@ -43,7 +43,7 @@ process BBMAP_INDEX { mkdir ${odir} cp !{reference_fasta} ${odir}/reference.fasta.gz cd ${odir} - bbmap.sh ref=reference.fasta.gz t=!{task.cpus} -Xmx60g + bbmap.sh ref=reference.fasta.gz t=!{task.cpus} -Xmx!{task.memory.toGiga()}g #tar -czf human-ref-index.tar.gz human_ref_index ''' } diff --git a/modules/local/blast/main.nf b/modules/local/blast/main.nf index f837a9b7..42b7f7c5 100644 --- a/modules/local/blast/main.nf +++ b/modules/local/blast/main.nf @@ -1,14 +1,11 @@ // BLAST paired reads against a downloaded DB and return a single output file process BLAST_PAIRED_LOCAL { label "BLAST" - cpus "${cpus}" - memory "${mem}" + label "blast_resources" input: path(gzipped_reads) path(blast_db_dir) val(db_prefix) - val(cpus) - val(mem) output: path("hits.blast.gz") shell: @@ -30,3 +27,32 @@ process BLAST_PAIRED_LOCAL { gzip hits.blast ''' } + +// BLAST a single input FASTA file against a downloaded DB and return a single output file +process BLAST_LOCAL { + label "BLAST" + label "blast_resources" + input: + path(gzipped_reads_fasta) + path(blast_db_dir) + val(db_prefix) + output: + path("hits.blast.gz") + shell: + ''' + # Specify BLAST parameters + in=!{gzipped_reads_fasta} + out=hits.blast + db=!{blast_db_dir}/!{db_prefix} + threads=!{task.cpus} + # Decompress reads + zcat ${in} > reads.fasta + # Set up command + io="-query reads.fasta -out ${out} -db ${db}" + par="-perc_identity 60 -max_hsps 5 -num_alignments 250 -qcov_hsp_perc 30 -num_threads 
${threads}" + # Run BLAST + blastn ${io} ${par} -outfmt "6 qseqid sseqid sgi staxid qlen evalue bitscore qcovs length pident mismatch gapopen sstrand qstart qend sstart send" + # Gzip output + gzip ${out} + ''' +} diff --git a/modules/local/clumpify/main.nf b/modules/local/clumpify/main.nf index fb9c5f3e..02eafe92 100644 --- a/modules/local/clumpify/main.nf +++ b/modules/local/clumpify/main.nf @@ -16,7 +16,7 @@ process CLUMPIFY_PAIRED { op2=!{sample}_dedup_2.fastq.gz io="in=${in1} in2=${in2} out=${op1} out2=${op2}" # Define parameters - par="reorder dedupe passes=6 addcount=t t=!{task.cpus} -Xmx30g" + par="reorder dedupe passes=6 addcount=t t=!{task.cpus} -Xmx!{task.memory.toGiga()}g" # Execute clumpify.sh ${io} ${par} ''' @@ -38,7 +38,7 @@ process CLUMPIFY_SINGLE { out=!{sample}_dedup.fastq.gz io="in=${in} out=${out}" # Define parameters - par="reorder dedupe rcomp passes=6 addcount=t t=!{task.cpus} -Xmx30g" + par="reorder dedupe rcomp passes=6 addcount=t t=!{task.cpus} -Xmx!{task.memory.toGiga()}g" # Execute clumpify.sh ${io} ${par} ''' diff --git a/modules/local/collapseVirusReads/main.nf b/modules/local/collapseVirusReads/main.nf index 42070a4e..c2181c5b 100644 --- a/modules/local/collapseVirusReads/main.nf +++ b/modules/local/collapseVirusReads/main.nf @@ -1,8 +1,7 @@ // Collapse separate read pair entries in viral read DB process COLLAPSE_VIRUS_READS { label "tidyverse" - cpus 1 - memory "32.GB" + label "single_cpu_32GB_memory" input: path(virus_hits_filtered) output: diff --git a/modules/local/concatGroup/main.nf b/modules/local/concatGroup/main.nf index 8544327a..f81b45d0 100644 --- a/modules/local/concatGroup/main.nf +++ b/modules/local/concatGroup/main.nf @@ -1,6 +1,6 @@ // Copy a file to a new location with a custom path process CONCAT_GROUP { - label "base" + label "coreutils" label "single" input: tuple val(samples), path(fastq_1_list), path(fastq_2_list), val(group) diff --git a/modules/local/concatenateFasta/main.nf b/modules/local/concatenateFasta/main.nf index 93286fa8..9a36fb9b 100644 --- a/modules/local/concatenateFasta/main.nf +++ b/modules/local/concatenateFasta/main.nf @@ -1,7 +1,7 @@ // Concatenate gzipped FASTA files process CONCATENATE_FASTA_GZIPPED { label "single" - label "base" + label "coreutils" input: path(files) val(name) @@ -16,7 +16,7 @@ process CONCATENATE_FASTA_GZIPPED { // Concatenate gzipped FASTA files within a directory process CONCATENATE_FASTA_GZIPPED_DIR { label "single" - label "base" + label "coreutils" input: path(dir) val(name) @@ -31,7 +31,7 @@ process CONCATENATE_FASTA_GZIPPED_DIR { // Concatenate gzipped FASTA files within a directory of subdirectories process CONCATENATE_FASTA_GZIPPED_DIR_DEEP { label "single" - label "base" + label "coreutils" input: path(dir) val(name) diff --git a/modules/local/downloadGenome/main.nf b/modules/local/downloadGenome/main.nf index aed011f8..16e6ffeb 100644 --- a/modules/local/downloadGenome/main.nf +++ b/modules/local/downloadGenome/main.nf @@ -3,13 +3,17 @@ process DOWNLOAD_GENOME { label "BBTools" label "single" input: - val(genome_url) - val(name) + tuple val(genome_url), val(name) output: path("${name}.fasta.gz"), emit: genome shell: ''' - path=!{name}.fasta.gz - wget !{genome_url} -O ${path} + path=!{name}.fasta + if [[ "!{genome_url}" == *.gz ]]; then + wget "!{genome_url}" -O ${path}.gz + else + wget "!{genome_url}" -O ${path} + gzip ${path} + fi ''' } diff --git a/modules/local/downloadViralGenbank/main.nf b/modules/local/downloadViralGenbank/main.nf deleted file mode 100644 index 
3d34c883..00000000 --- a/modules/local/downloadViralGenbank/main.nf +++ /dev/null @@ -1,14 +0,0 @@ -// Download entire viral Genbank DB -process DOWNLOAD_VIRAL_GENBANK { - label "biopython" - label "max" - output: - path("genbank_metadata.txt"), emit: metadata - path("genbank_genomes"), emit: genomes - shell: - ''' - par="--section genbank --formats fasta --flat-output --verbose --parallel !{task.cpus}" - io="--output-folder genbank_genomes --metadata-table genbank_metadata.txt" - ncbi-genome-download ${par} ${io} viral - ''' -} diff --git a/modules/local/downloadViralNCBI/main.nf b/modules/local/downloadViralNCBI/main.nf new file mode 100644 index 00000000..db61ce0f --- /dev/null +++ b/modules/local/downloadViralNCBI/main.nf @@ -0,0 +1,16 @@ +// Download entire viral Genbank DB +process DOWNLOAD_VIRAL_NCBI { + label "biopython" + label "max" + input: + val(ncbi_viral_params) + output: + path("ncbi_metadata.txt"), emit: metadata + path("ncbi_genomes"), emit: genomes + shell: + ''' + par="--formats fasta --flat-output --verbose --parallel !{task.cpus}" + io="--output-folder ncbi_genomes --metadata-table ncbi_metadata.txt" + ncbi-genome-download !{ncbi_viral_params} ${par} ${io} viral + ''' +} diff --git a/modules/local/downloadVirusHostDB/main.nf b/modules/local/downloadVirusHostDB/main.nf index 0b21b70d..a5345c07 100644 --- a/modules/local/downloadVirusHostDB/main.nf +++ b/modules/local/downloadVirusHostDB/main.nf @@ -1,7 +1,7 @@ // Download entire VirusHostDB process DOWNLOAD_VIRUS_HOST_DB { label "single" - label "base" + label "curl" input: val(virus_host_db_url) output: diff --git a/modules/local/extractNcbiTaxonomy/main.nf b/modules/local/extractNcbiTaxonomy/main.nf index a83d9fd2..fcdb3acb 100644 --- a/modules/local/extractNcbiTaxonomy/main.nf +++ b/modules/local/extractNcbiTaxonomy/main.nf @@ -1,6 +1,6 @@ // Extract NCBI taxonomy archive and access nodes and names files process EXTRACT_NCBI_TAXONOMY { - label "base" + label "unzip" label "single" input: path(taxonomy_zip) diff --git a/modules/local/fastp/main.nf b/modules/local/fastp/main.nf index f54afd32..beb3de24 100644 --- a/modules/local/fastp/main.nf +++ b/modules/local/fastp/main.nf @@ -26,7 +26,42 @@ process FASTP { oh=!{sample}_fastp.html ad=!{adapters} io="--in1 !{reads[0]} --in2 !{reads[1]} --out1 ${o1} --out2 ${o2} --failed_out ${of} --html ${oh} --json ${oj} --adapter_fasta ${ad}" - par="--cut_front --cut_tail --correction --detect_adapter_for_pe --trim_poly_x --cut_mean_quality 25 --average_qual 25 --qualified_quality_phred 20 --verbose --dont_eval_duplication --thread !{task.cpus} --low_complexity_filter" + par="--cut_front --cut_tail --correction --detect_adapter_for_pe --trim_poly_x --cut_mean_quality 20 --average_qual 20 --qualified_quality_phred 20 --verbose --dont_eval_duplication --thread !{task.cpus} --low_complexity_filter" + # Execute + fastp ${io} ${par} + ''' +} + +// Run FASTP for adapter trimming but don't trim for quality +process FASTP_NOTRIM { + label "max" + label "fastp" + input: + // reads is a list of two files: forward/reverse reads + tuple val(sample), path(reads) + path(adapters) + output: + tuple val(sample), path("${sample}_fastp_{1,2}.fastq.gz"), emit: reads + tuple val(sample), path("${sample}_fastp_failed.fastq.gz"), emit: failed + tuple val(sample), path("${sample}_fastp.{json,html}"), emit: log + shell: + /* Cleaning not done in CUTADAPT or TRIMMOMATIC: + * Higher quality threshold for sliding window trimming; + * Removing poly-X tails; + * Automatic adapter detection; + * Base correction 
in overlapping paired-end reads; + * Filter low complexity reads. + */ + ''' + # Define paths and subcommands + o1=!{sample}_fastp_1.fastq.gz + o2=!{sample}_fastp_2.fastq.gz + of=!{sample}_fastp_failed.fastq.gz + oj=!{sample}_fastp.json + oh=!{sample}_fastp.html + ad=!{adapters} + io="--in1 !{reads[0]} --in2 !{reads[1]} --out1 ${o1} --out2 ${o2} --failed_out ${of} --html ${oh} --json ${oj} --adapter_fasta ${ad}" + par="--detect_adapter_for_pe --disable_quality_filtering --disable_length_filtering --verbose --dont_eval_duplication --thread !{task.cpus}" # Execute fastp ${io} ${par} ''' diff --git a/modules/local/filterBlast/main.nf b/modules/local/filterBlast/main.nf index 8038b8ad..7783deef 100644 --- a/modules/local/filterBlast/main.nf +++ b/modules/local/filterBlast/main.nf @@ -1,11 +1,10 @@ // Process & filter BLAST output into TSV process FILTER_BLAST { label "tidyverse" - cpus 1 - memory "${mem}" + label "single_cpu_32GB_memory" input: path(blast_hits) - val(mem) + val(max_rank) output: path("blast_hits_filtered.tsv.gz") shell: @@ -25,9 +24,7 @@ process FILTER_BLAST { # Rank hits for each query and filter for high-ranking hits blast_hits_ranked <- blast_hits_best %>% group_by(qseqid) %>% mutate(rank = dense_rank(desc(bitscore))) - blast_hits_highrank <- blast_hits_ranked %>% filter(rank <= 5) %>% - mutate(read_pair = str_split(qseqid, "_") %>% sapply(nth, n=-1), - seq_id = str_split(qseqid, "_") %>% sapply(nth, n=1)) + blast_hits_highrank <- blast_hits_ranked %>% filter(rank <= !{max_rank}) # Write output write_tsv(blast_hits_highrank, "blast_hits_filtered.tsv.gz") ''' diff --git a/modules/local/filterVirusReads/main.nf b/modules/local/filterVirusReads/main.nf index 929372e1..c9deb5f1 100644 --- a/modules/local/filterVirusReads/main.nf +++ b/modules/local/filterVirusReads/main.nf @@ -1,8 +1,7 @@ // Filter virus reads by alignment score and assignment status process FILTER_VIRUS_READS { label "tidyverse" - cpus 1 - memory "16.GB" + label "single_cpu_16GB_memory" input: path(virus_hits) val(score_threshold) diff --git a/modules/local/kraken/main.nf b/modules/local/kraken/main.nf index ed762af8..d8b8b0b1 100644 --- a/modules/local/kraken/main.nf +++ b/modules/local/kraken/main.nf @@ -1,12 +1,10 @@ // 7.3. 
Perform taxonomic assignment with Kraken2 process KRAKEN { label "Kraken2" - cpus 16 - memory "${mem}" + label "kraken_resources" input: tuple val(sample), path(reads) path db_path - val mem output: tuple val(sample), path("${sample}.output"), emit: output tuple val(sample), path("${sample}.report"), emit: report diff --git a/modules/local/makeVirusReadsFasta/main.nf b/modules/local/makeVirusReadsFasta/main.nf index 9abfc91a..2bd7e37f 100644 --- a/modules/local/makeVirusReadsFasta/main.nf +++ b/modules/local/makeVirusReadsFasta/main.nf @@ -1,8 +1,7 @@ // Extract FASTA from virus read TSV process MAKE_VIRUS_READS_FASTA { label "tidyverse" - cpus 1 - memory "16.GB" + label "single_cpu_16GB_memory" input: path(virus_hits_db) output: @@ -13,8 +12,8 @@ process MAKE_VIRUS_READS_FASTA { library(tidyverse) tab <- read_tsv("!{virus_hits_db}", col_names = TRUE, show_col_types = FALSE) %>% mutate(seq_head = paste0(">", seq_id), - header1 = paste0(seq_head, "_1"), - header2 = paste0(seq_head, "_2")) + header1 = paste0(seq_head, " 1"), + header2 = paste0(seq_head, " 2")) fasta_1_tab <- select(tab, header=header1, seq=query_seq_fwd) fasta_2_tab <- select(tab, header=header2, seq=query_seq_rev) fasta_1_out <- do.call(paste, c(fasta_1_tab, sep="\n")) %>% paste(collapse="\n") diff --git a/modules/local/mergeSamKraken/main.nf b/modules/local/mergeSamKraken/main.nf index 06907528..77f3b769 100644 --- a/modules/local/mergeSamKraken/main.nf +++ b/modules/local/mergeSamKraken/main.nf @@ -1,8 +1,7 @@ // Merge processed SAM and Kraken TSVs and compute length-normalized alignment scores process MERGE_SAM_KRAKEN { label "tidyverse" - cpus 1 - memory "16.GB" + label "single_cpu_16GB_memory" input: tuple val(sample), path(kraken_processed), path(sam_processed) output: diff --git a/modules/local/mergeTsvs/main.nf b/modules/local/mergeTsvs/main.nf index 16f2ee26..22a77267 100644 --- a/modules/local/mergeTsvs/main.nf +++ b/modules/local/mergeTsvs/main.nf @@ -1,8 +1,7 @@ // Combine multiple TSVs with identical headers into a single output file process MERGE_TSVS { label "tidyverse" - cpus 1 - memory "16.GB" + label "single_cpu_16GB_memory" input: path(tsvs) val(name) diff --git a/modules/local/pairBlast/main.nf b/modules/local/pairBlast/main.nf index 564a9303..10e3e698 100644 --- a/modules/local/pairBlast/main.nf +++ b/modules/local/pairBlast/main.nf @@ -3,22 +3,30 @@ process PAIR_BLAST { label "tidyverse" label "single" input: - path(blast_hits_filtered) + path(blast_filtered_fwd) + path(blast_filtered_rev) output: path("blast_hits_paired.tsv.gz") shell: ''' #!/usr/bin/env Rscript library(tidyverse) - blast_hits_filtered_path <- "!{blast_hits_filtered}" - blast_hits_filtered <- read_tsv(blast_hits_filtered_path, show_col_types = FALSE) - # Summarize by read pair and taxid - blast_hits_paired <- blast_hits_filtered %>% - mutate(bitscore = as.numeric(bitscore)) %>% - group_by(seq_id, staxid) %>% - summarize(bitscore_max = max(bitscore), bitscore_min = min(bitscore), - n_reads = n(), .groups = "drop") + # Set input paths + path_fwd <- "!{blast_filtered_fwd}" + path_rev <- "!{blast_filtered_rev}" + # Import data + ctypes <- cols(qseqid="c", sseqid="c", sgi="c", staxid="i", qlen="i", evalue="d", + bitscore="i", qcovs="i", length="i", pident="d", mismatch="i", gapopen="i", + sstrand="c", qstart="i", qend="i", sstart="i", send="i", rank="i") + hits_fwd <- read_tsv(path_fwd, col_types = ctypes) + hits_rev <- read_tsv(path_rev, col_types = ctypes) + # Join on query ID and subject taxid + hits_pair <- full_join(hits_fwd, 
hits_rev, by=c("qseqid", "staxid"), + suffix = c("_fwd", "_rev")) %>% + mutate(n_reads = as.numeric(!is.na(sseqid_fwd))+as.numeric(!is.na(sseqid_rev)), + bitscore_max = pmax(bitscore_fwd, bitscore_rev), + bitscore_min = pmin(bitscore_fwd, bitscore_rev)) # Write output - write_tsv(blast_hits_paired, "blast_hits_paired.tsv.gz") + write_tsv(hits_pair, "blast_hits_paired.tsv.gz") ''' } diff --git a/modules/local/summarizeBBMerge/main.nf b/modules/local/summarizeBBMerge/main.nf index b4ae9b2b..b5463860 100644 --- a/modules/local/summarizeBBMerge/main.nf +++ b/modules/local/summarizeBBMerge/main.nf @@ -1,5 +1,5 @@ process SUMMARIZE_BBMERGE { - label "base" + label "coreutils_gzip_gawk" label "single" input: tuple val(sample), path(merged) @@ -14,4 +14,4 @@ process SUMMARIZE_BBMERGE { ' > ${sample}_bbmerge_summary.txt echo Processing complete. Output saved to ${sample}_bbmerge_summary.txt """ -} \ No newline at end of file +} diff --git a/modules/local/summarizeDedup/main.nf b/modules/local/summarizeDedup/main.nf index f914bf7c..34728067 100644 --- a/modules/local/summarizeDedup/main.nf +++ b/modules/local/summarizeDedup/main.nf @@ -1,5 +1,5 @@ process SUMMARIZE_DEDUP { - label "base" + label "coreutils_gzip_gawk" label "single" input: tuple val(sample), path(merged) @@ -22,4 +22,4 @@ process SUMMARIZE_DEDUP { ' > ${sample}_dedup_summary.txt echo Processing complete. Output saved to ${sample}_dedup_summary.txt """ -} \ No newline at end of file +} diff --git a/nf-test.config b/nf-test.config new file mode 100644 index 00000000..b4d9e7a2 --- /dev/null +++ b/nf-test.config @@ -0,0 +1,8 @@ +config { + + testsDir "tests" + workDir ".nf-test" + configFile "tests/nextflow.config" + profile "ec2_local" + +} diff --git a/pipeline-version.txt b/pipeline-version.txt index 73462a5a..f225a78a 100644 --- a/pipeline-version.txt +++ b/pipeline-version.txt @@ -1 +1 @@ -2.5.1 +2.5.2 diff --git a/ref/s3-lifecycle-policy.json b/ref/s3-lifecycle-policy.json new file mode 100644 index 00000000..01f585f0 --- /dev/null +++ b/ref/s3-lifecycle-policy.json @@ -0,0 +1,43 @@ +{ + "Rules": [ + { + "ID": "Delete intermediate outputs after 14 days", + "Filter": { + "Tag": { + "Key": "nextflow_file_class", + "Value": "intermediate" + } + }, + "Status": "Enabled", + "Expiration": { + "Days": 14 + } + }, + { + "ID": "Delete temporary workdir files after 1 week", + "Filter": { + "Tag": { + "Key": "nextflow.io/temporary", + "Value": "true" + } + }, + "Status": "Enabled", + "Expiration": { + "Days": 7 + } + }, + { + "ID": "Delete workdir logging files after 4 weeks", + "Filter": { + "Tag": { + "Key": "nextflow.io/metadata", + "Value": "true" + } + }, + "Status": "Enabled", + "Expiration": { + "Days": 28 + } + } + ] +} diff --git a/subworkflows/local/blastViral/main.nf b/subworkflows/local/blastViral/main.nf index 1287ab6d..aae56eaf 100644 --- a/subworkflows/local/blastViral/main.nf +++ b/subworkflows/local/blastViral/main.nf @@ -3,8 +3,10 @@ ***************************/ include { SUBSET_READS_PAIRED_MERGED } from "../../../modules/local/subsetReads" -include { BLAST_PAIRED_LOCAL } from "../../../modules/local/blast" -include { FILTER_BLAST } from "../../../modules/local/filterBlast" +include { BLAST_LOCAL as BLAST_LOCAL_FWD } from "../../../modules/local/blast" +include { BLAST_LOCAL as BLAST_LOCAL_REV } from "../../../modules/local/blast" +include { FILTER_BLAST as FILTER_BLAST_FWD } from "../../../modules/local/filterBlast" +include { FILTER_BLAST as FILTER_BLAST_REV } from "../../../modules/local/filterBlast" include { PAIR_BLAST 
} from "../../../modules/local/pairBlast" /*********** @@ -17,9 +19,6 @@ workflow BLAST_VIRAL { blast_db_dir blast_db_prefix read_fraction - blast_cpus - blast_mem - blast_filter_mem main: // Subset viral reads for BLAST if ( read_fraction < 1 ) { @@ -27,16 +26,25 @@ workflow BLAST_VIRAL { } else { subset_ch = viral_fasta } - // BLAST putative viral hits against prepared DB - blast_ch = BLAST_PAIRED_LOCAL(subset_ch, blast_db_dir, blast_db_prefix, blast_cpus, blast_mem) - // Process BLAST output - filter_ch = FILTER_BLAST(blast_ch, blast_filter_mem) - pair_ch = PAIR_BLAST(filter_ch) + // BLAST forward and reverse reads separately against prepared DB + blast_ch_1 = BLAST_LOCAL_FWD(subset_ch.map{it[0]}, blast_db_dir, blast_db_prefix) + blast_ch_2 = BLAST_LOCAL_REV(subset_ch.map{it[1]}, blast_db_dir, blast_db_prefix) + // Filter BLAST output (keeping forward and reverse separate) + filter_ch_1 = FILTER_BLAST_FWD(blast_ch_1, 5) + filter_ch_2 = FILTER_BLAST_REV(blast_ch_2, 5) + // Combine alignment information across pairs + filter_out_1 = filter_ch_1.map{ file -> + def newFile = file.parent / "fwd_${file.name}" + file.copyTo(newFile) + return newFile + } + filter_out_2 = filter_ch_2.map{ file -> + def newFile = file.parent / "rev_${file.name}" + file.copyTo(newFile) + return newFile + } + pair_ch = PAIR_BLAST(filter_out_1, filter_out_2) emit: - blast_raw = blast_ch - blast_filtered = filter_ch + blast_subset = subset_ch blast_paired = pair_ch - publish: - subset_ch >> "results" - pair_ch >> "results" } diff --git a/subworkflows/local/extractViralReads/main.nf b/subworkflows/local/extractViralReads/main.nf index 0b4f5b6a..44e45510 100644 --- a/subworkflows/local/extractViralReads/main.nf +++ b/subworkflows/local/extractViralReads/main.nf @@ -47,7 +47,6 @@ workflow EXTRACT_VIRAL_READS { bbduk_suffix encoding fuzzy_match - kraken_memory grouping main: // Get reference paths @@ -91,7 +90,7 @@ workflow EXTRACT_VIRAL_READS { human_bbm_ch = BBMAP_HUMAN(other_bt2_ch.reads_unconc, bbm_human_index_path, "human") other_bbm_ch = BBMAP_OTHER(human_bbm_ch.reads_unmapped, bbm_other_index_path, "other") // Run Kraken on filtered viral candidates - tax_ch = TAXONOMY(other_bbm_ch.reads_unmapped, kraken_db_ch, true, "F", 1, kraken_memory) + tax_ch = TAXONOMY(other_bbm_ch.reads_unmapped, kraken_db_ch, true, "F") // Process Kraken output and merge with Bowtie2 output across samples kraken_output_ch = PROCESS_KRAKEN_VIRAL(tax_ch.kraken_output, virus_db_path, host_taxon) bowtie2_kraken_merged_ch = MERGE_SAM_KRAKEN(kraken_output_ch.combine(bowtie2_sam_ch, by: 0)) diff --git a/subworkflows/local/makeContaminantIndex/main.nf b/subworkflows/local/makeContaminantIndex/main.nf index c5dccd8d..03b5980e 100644 --- a/subworkflows/local/makeContaminantIndex/main.nf +++ b/subworkflows/local/makeContaminantIndex/main.nf @@ -2,11 +2,7 @@ | MODULES AND SUBWORKFLOWS | ***************************/ -include { DOWNLOAD_GENOME as DOWNLOAD_COW } from "../../../modules/local/downloadGenome" -include { DOWNLOAD_GENOME as DOWNLOAD_PIG } from "../../../modules/local/downloadGenome" -include { DOWNLOAD_GENOME as DOWNLOAD_MOUSE } from "../../../modules/local/downloadGenome" -include { DOWNLOAD_GENOME as DOWNLOAD_CARP } from "../../../modules/local/downloadGenome" -include { DOWNLOAD_GENOME as DOWNLOAD_ECOLI } from "../../../modules/local/downloadGenome" +include { DOWNLOAD_GENOME } from "../../../modules/local/downloadGenome" include { CONCATENATE_FASTA_GZIPPED } from "../../../modules/local/concatenateFasta" include { BBMAP_INDEX } from 
"../../../modules/local/bbmap" include { BOWTIE2_INDEX } from "../../../modules/local/bowtie2" @@ -17,22 +13,25 @@ include { BOWTIE2_INDEX } from "../../../modules/local/bowtie2" workflow MAKE_CONTAMINANT_INDEX { take: - cow_url - pig_url - mouse_url - carp_url - ecoli_url + genome_urls contaminants_path main: // Download reference genomes - cow_ch = DOWNLOAD_COW(cow_url, "cow_genome") - pig_ch = DOWNLOAD_PIG(pig_url, "pig_genome") - mouse_ch = DOWNLOAD_MOUSE(mouse_url, "mouse_genome") - carp_ch = DOWNLOAD_CARP(carp_url, "carp_genome") - ecoli_ch = DOWNLOAD_ECOLI(ecoli_url, "ecoli_genome") - ref_ch = cow_ch.mix(pig_ch, mouse_ch, carp_ch, ecoli_ch, channel.fromPath(contaminants_path)).collect() - // Concatenate - genome_ch = CONCATENATE_FASTA_GZIPPED(ref_ch, "ref_concat") + ref_ch = channel + .fromList(genome_urls.entrySet()) + .map { entry -> + tuple(entry.value, entry.key) // (url, name) + } + + downloaded_ch = DOWNLOAD_GENOME(ref_ch) + + combined_ch = downloaded_ch + .mix(channel.fromPath(contaminants_path)) + .collect() + + // Then use combined_ch for the rest of your workflow + genome_ch = CONCATENATE_FASTA_GZIPPED(combined_ch, "ref_concat") + // Make indexes bbmap_ch = BBMAP_INDEX(genome_ch, "bbm-other-index") bowtie2_ch = BOWTIE2_INDEX(genome_ch, "bt2-other-index") diff --git a/subworkflows/local/makeHumanIndex/main.nf b/subworkflows/local/makeHumanIndex/main.nf index e623eaad..7f0bf592 100644 --- a/subworkflows/local/makeHumanIndex/main.nf +++ b/subworkflows/local/makeHumanIndex/main.nf @@ -14,7 +14,9 @@ workflow MAKE_HUMAN_INDEX { take: human_genome_url main: - genome_ch = DOWNLOAD_GENOME(human_genome_url, "human_genome") + ref_ch = channel + .of(tuple(human_genome_url, "human")) // (url, name) + genome_ch = DOWNLOAD_GENOME(ref_ch) bbmap_ch = BBMAP_INDEX(genome_ch, "bbm-human-index") bowtie2_ch = BOWTIE2_INDEX(genome_ch, "bt2-human-index") emit: diff --git a/subworkflows/local/makeVirusGenomeDB/main.nf b/subworkflows/local/makeVirusGenomeDB/main.nf index 57f2a8be..0bcaa216 100644 --- a/subworkflows/local/makeVirusGenomeDB/main.nf +++ b/subworkflows/local/makeVirusGenomeDB/main.nf @@ -2,7 +2,7 @@ | MODULES AND SUBWORKFLOWS | ***************************/ -include { DOWNLOAD_VIRAL_GENBANK } from "../../../modules/local/downloadViralGenbank" +include { DOWNLOAD_VIRAL_NCBI } from "../../../modules/local/downloadViralNCBI" include { FILTER_VIRAL_GENBANK_METADATA } from "../../../modules/local/filterViralGenbankMetadata" include { ADD_GENBANK_GENOME_IDS } from "../../../modules/local/addGenbankGenomeIDs" include { CONCATENATE_GENOME_FASTA } from "../../../modules/local/concatenateGenomeFasta" @@ -14,12 +14,13 @@ include { FILTER_GENOME_FASTA } from "../../../modules/local/filterGenomeFasta" workflow MAKE_VIRUS_GENOME_DB { take: + ncbi_viral_params // Parameters for downloading genomic sequences from NCBI's GenBank or RefSeq databases. Any additional sequence-specific options can be provided here, supplementing the default download settings. virus_db // TSV giving taxonomic structure and host infection status of virus taxids patterns_exclude // File of sequence header patterns to exclude from genome DB host_taxa // Tuple of host taxa to include main: // 1. Download viral Genbank - dl_ch = DOWNLOAD_VIRAL_GENBANK() + dl_ch = DOWNLOAD_VIRAL_NCBI(ncbi_viral_params) // 2. Filter genome metadata by taxid to identify genomes to retain meta_ch = FILTER_VIRAL_GENBANK_METADATA(dl_ch.metadata, virus_db, host_taxa, "virus-genome") // 3. 
Add genome IDs to Genbank metadata file diff --git a/subworkflows/local/profile/main.nf b/subworkflows/local/profile/main.nf index 24b4353f..efb4cbb3 100644 --- a/subworkflows/local/profile/main.nf +++ b/subworkflows/local/profile/main.nf @@ -27,7 +27,6 @@ workflow PROFILE { min_kmer_fraction k bbduk_suffix - kraken_memory grouping main: // Randomly subset reads to target number @@ -55,8 +54,8 @@ workflow PROFILE { ribo_path = "${ref_dir}/results/ribo-ref-concat.fasta.gz" ribo_ch = BBDUK(grouped_ch, ribo_path, min_kmer_fraction, k, bbduk_suffix) // Run taxonomic profiling separately on ribo and non-ribo reads - tax_ribo_ch = TAXONOMY_RIBO(ribo_ch.fail, kraken_db_ch, false, "D", 1, kraken_memory) - tax_noribo_ch = TAXONOMY_NORIBO(ribo_ch.reads, kraken_db_ch, false, "D", 1, kraken_memory) + tax_ribo_ch = TAXONOMY_RIBO(ribo_ch.fail, kraken_db_ch, false, "D") + tax_noribo_ch = TAXONOMY_NORIBO(ribo_ch.reads, kraken_db_ch, false, "D") // Merge ribo and non-ribo outputs kr_ribo = tax_ribo_ch.kraken_reports.collectFile(name: "kraken_reports_ribo.tsv.gz") kr_noribo = tax_noribo_ch.kraken_reports.collectFile(name: "kraken_reports_noribo.tsv.gz") diff --git a/subworkflows/local/taxonomy/main.nf b/subworkflows/local/taxonomy/main.nf index f9fbc352..425d03a3 100644 --- a/subworkflows/local/taxonomy/main.nf +++ b/subworkflows/local/taxonomy/main.nf @@ -6,7 +6,6 @@ | MODULES AND SUBWORKFLOWS | ***************************/ -include { SUBSET_READS_PAIRED } from "../../../modules/local/subsetReads" include { BBMERGE } from "../../../modules/local/bbmerge" include { SUMMARIZE_BBMERGE } from "../../../modules/local/summarizeBBMerge" include { SUMMARIZE_DEDUP } from "../../../modules/local/summarizeDedup" @@ -30,21 +29,12 @@ workflow TAXONOMY { kraken_db_ch dedup_rc classification_level - read_fraction - kraken_memory main: - // Subset reads (if applicable) - if ( read_fraction == 1 ){ - subset_ch = reads_ch - } else { - subset_ch = SUBSET_READS_PAIRED(reads_ch, read_fraction, "fastq") - } - // Deduplicate reads (if applicable) if ( dedup_rc ){ - paired_dedup_ch = CLUMPIFY_PAIRED(subset_ch) + paired_dedup_ch = CLUMPIFY_PAIRED(reads_ch) } else { - paired_dedup_ch = subset_ch + paired_dedup_ch = reads_ch } // Prepare reads merged_ch = BBMERGE(paired_dedup_ch) @@ -61,7 +51,7 @@ workflow TAXONOMY { summarize_dedup_ch = SUMMARIZE_DEDUP(dedup_ch) // Run Kraken and munge reports - kraken_ch = KRAKEN(dedup_ch, kraken_db_ch, kraken_memory) + kraken_ch = KRAKEN(dedup_ch, kraken_db_ch) kraken_label_ch = LABEL_KRAKEN_REPORTS(kraken_ch.report) kraken_merge_ch = MERGE_KRAKEN_REPORTS(kraken_label_ch.collect().ifEmpty([]), "kraken_reports") // Run Bracken and munge reports diff --git a/test-data/grouping-samplesheet.csv b/test-data/grouping-samplesheet.csv new file mode 100644 index 00000000..7be90f2e --- /dev/null +++ b/test-data/grouping-samplesheet.csv @@ -0,0 +1,2 @@ +sample,fastq_1,fastq_2,group +gold_standard,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz,s3://nao-testing/gold-standard-test/raw/gold_standard_R2.fastq.gz,gold_standard diff --git a/test/nextflow.config b/test-data/nextflow.config similarity index 91% rename from test/nextflow.config rename to test-data/nextflow.config index 7e87daa5..98199092 100644 --- a/test/nextflow.config +++ b/test-data/nextflow.config @@ -14,15 +14,16 @@ params { adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming // Numerical - grouping = true // Whether to group samples by 'group' column in samplesheet + grouping = false // 
Whether to group samples by 'group' column in samplesheet n_reads_trunc = 0 // Number of reads per sample to run through pipeline (0 = all reads) n_reads_profile = 1000000 // Number of reads per sample to run through taxonomic profiling bt2_score_threshold = 20 // Normalized score threshold for calling a host-infecting virus read (typically 15 or 20) blast_viral_fraction = 0 // Fraction of putative host-infecting virus reads to BLAST vs nt (0 = don't run BLAST) - kraken_memory = "128 GB" // Memory needed to safely load Kraken DB quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2) host_taxon = "vertebrate" + + blast_db_prefix = "core_nt" } includeConfig "${projectDir}/configs/logging.config" diff --git a/test-data/samplesheet.csv b/test-data/samplesheet.csv new file mode 100644 index 00000000..18b1ffe5 --- /dev/null +++ b/test-data/samplesheet.csv @@ -0,0 +1,2 @@ +sample,fastq_1,fastq_2 +gold_standard,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz,s3://nao-testing/gold-standard-test/raw/gold_standard_R2.fastq.gz diff --git a/test/grouping.csv b/test/grouping.csv deleted file mode 100644 index e9fa59d2..00000000 --- a/test/grouping.csv +++ /dev/null @@ -1,9 +0,0 @@ -sample,group -230926Esv_D23-14904-1,A -230926Esv_D23-14905-1,A -230926Esv_D23-14906-1,A -230926Esv_D23-14907-1,A -230926Esv_D23-14908-1,B -230926Esv_D23-14909-1,C -230926Esv_D23-14910-1,D -230926Esv_D23-14911-1,E diff --git a/test/raw/230926Esv_D23-14904-1_1.fastq.gz b/test/raw/230926Esv_D23-14904-1_1.fastq.gz deleted file mode 100644 index ee884038..00000000 Binary files a/test/raw/230926Esv_D23-14904-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14904-1_2.fastq.gz b/test/raw/230926Esv_D23-14904-1_2.fastq.gz deleted file mode 100644 index 9e78d7fe..00000000 Binary files a/test/raw/230926Esv_D23-14904-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14905-1_1.fastq.gz b/test/raw/230926Esv_D23-14905-1_1.fastq.gz deleted file mode 100644 index 663c68e4..00000000 Binary files a/test/raw/230926Esv_D23-14905-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14905-1_2.fastq.gz b/test/raw/230926Esv_D23-14905-1_2.fastq.gz deleted file mode 100644 index a9cf0a6d..00000000 Binary files a/test/raw/230926Esv_D23-14905-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14906-1_1.fastq.gz b/test/raw/230926Esv_D23-14906-1_1.fastq.gz deleted file mode 100644 index 844f727f..00000000 Binary files a/test/raw/230926Esv_D23-14906-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14906-1_2.fastq.gz b/test/raw/230926Esv_D23-14906-1_2.fastq.gz deleted file mode 100644 index 1dd25f42..00000000 Binary files a/test/raw/230926Esv_D23-14906-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14907-1_1.fastq.gz b/test/raw/230926Esv_D23-14907-1_1.fastq.gz deleted file mode 100644 index 1956f9da..00000000 Binary files a/test/raw/230926Esv_D23-14907-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14907-1_2.fastq.gz b/test/raw/230926Esv_D23-14907-1_2.fastq.gz deleted file mode 100644 index afd2c5cd..00000000 Binary files a/test/raw/230926Esv_D23-14907-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14908-1_1.fastq.gz b/test/raw/230926Esv_D23-14908-1_1.fastq.gz 
deleted file mode 100644 index a58fb672..00000000 Binary files a/test/raw/230926Esv_D23-14908-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14908-1_2.fastq.gz b/test/raw/230926Esv_D23-14908-1_2.fastq.gz deleted file mode 100644 index 8a80ac0b..00000000 Binary files a/test/raw/230926Esv_D23-14908-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14909-1_1.fastq.gz b/test/raw/230926Esv_D23-14909-1_1.fastq.gz deleted file mode 100644 index aa4b1585..00000000 Binary files a/test/raw/230926Esv_D23-14909-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14909-1_2.fastq.gz b/test/raw/230926Esv_D23-14909-1_2.fastq.gz deleted file mode 100644 index 9341aac9..00000000 Binary files a/test/raw/230926Esv_D23-14909-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14910-1_1.fastq.gz b/test/raw/230926Esv_D23-14910-1_1.fastq.gz deleted file mode 100644 index ea40e59e..00000000 Binary files a/test/raw/230926Esv_D23-14910-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14910-1_2.fastq.gz b/test/raw/230926Esv_D23-14910-1_2.fastq.gz deleted file mode 100644 index d6c417d1..00000000 Binary files a/test/raw/230926Esv_D23-14910-1_2.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14911-1_1.fastq.gz b/test/raw/230926Esv_D23-14911-1_1.fastq.gz deleted file mode 100644 index 13cc6972..00000000 Binary files a/test/raw/230926Esv_D23-14911-1_1.fastq.gz and /dev/null differ diff --git a/test/raw/230926Esv_D23-14911-1_2.fastq.gz b/test/raw/230926Esv_D23-14911-1_2.fastq.gz deleted file mode 100644 index 1257f7d6..00000000 Binary files a/test/raw/230926Esv_D23-14911-1_2.fastq.gz and /dev/null differ diff --git a/test/samplesheet.csv b/test/samplesheet.csv deleted file mode 100644 index 67505e5f..00000000 --- a/test/samplesheet.csv +++ /dev/null @@ -1,9 +0,0 @@ -sample,fastq_1,fastq_2 -230926Esv_D23-14904-1,raw/230926Esv_D23-14904-1_1.fastq.gz,raw/230926Esv_D23-14904-1_2.fastq.gz -230926Esv_D23-14905-1,raw/230926Esv_D23-14905-1_1.fastq.gz,raw/230926Esv_D23-14905-1_2.fastq.gz -230926Esv_D23-14906-1,raw/230926Esv_D23-14906-1_1.fastq.gz,raw/230926Esv_D23-14906-1_2.fastq.gz -230926Esv_D23-14907-1,raw/230926Esv_D23-14907-1_1.fastq.gz,raw/230926Esv_D23-14907-1_2.fastq.gz -230926Esv_D23-14908-1,raw/230926Esv_D23-14908-1_1.fastq.gz,raw/230926Esv_D23-14908-1_2.fastq.gz -230926Esv_D23-14909-1,raw/230926Esv_D23-14909-1_1.fastq.gz,raw/230926Esv_D23-14909-1_2.fastq.gz -230926Esv_D23-14910-1,raw/230926Esv_D23-14910-1_1.fastq.gz,raw/230926Esv_D23-14910-1_2.fastq.gz -230926Esv_D23-14911-1,raw/230926Esv_D23-14911-1_1.fastq.gz,raw/230926Esv_D23-14911-1_2.fastq.gz diff --git a/tests/index.config b/tests/index.config new file mode 100644 index 00000000..8526ae2e --- /dev/null +++ b/tests/index.config @@ -0,0 +1,53 @@ +/* +======================================================================================== + Nextflow config file for running tests +======================================================================================== +*/ + +params { + mode = "index" + + // Directories + base_dir = "./" // Parent for working and output directories (can be S3) + + // Cow & E. 
coli, among others, were previously downloaded for the contaminant index; this test config downloads only E. coli + genome_urls = [ + ecoli: "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nucleotide&id=NZ_CP128970.1&rettype=fasta" + ] + + // URLs for downloading reference genomes etc + taxonomy_url = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump_archive/taxdmp_2024-06-01.zip" + virus_host_db_url = "https://www.genome.jp/ftp/db/virushostdb/virushostdb.tsv" + + // Note: this is actually the E. coli genome, used as a stand-in for the human genome in the test index + human_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?db=nucleotide&id=NZ_CP128970.1&rettype=fasta" + + ssu_url = "https://www.arb-silva.de/fileadmin/silva_databases/release_138.1/Exports/SILVA_138.1_SSURef_NR99_tax_silva.fasta.gz" + lsu_url = "https://www.arb-silva.de/fileadmin/silva_databases/release_138.1/Exports/SILVA_138.1_LSURef_NR99_tax_silva.fasta.gz" + + // Other reference files + host_taxon_db = "${projectDir}/ref/host-taxa.tsv" + contaminants = "${projectDir}/ref/contaminants.fasta.gz" + genome_patterns_exclude = "${projectDir}/ref/hv_patterns_exclude.txt" + + // Kraken DB - https://benlangmead.github.io/aws-indexes/k2 + kraken_db = "https://genome-idx.s3.amazonaws.com/kraken/k2_viral_20240904.tar.gz" // Path to tarball containing Kraken reference DB + blast_db_name = "nt_others" + + // Circovirus and Anellovirus don't work with these download settings + // Betacoronavirus also works + ncbi_viral_params = "--section refseq --assembly-level complete --genera 'Rhinovirus'" + + // Other input values + virus_taxid = "10239" + viral_taxids_exclude = "2731619 2732413 2732411" // Exclude Caudoviricetes, Malgrandaviricetes, Faserviricetes + host_taxa_screen = "human" // Host taxa to screen for when building reference virus DB + + // Initializing run params to avoid warnings + kraken_memory = "" + classify_dedup_subset = "" +} + +includeConfig "${projectDir}/configs/containers.config" +includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/output.config" \ No newline at end of file diff --git a/tests/main.nf.test b/tests/main.nf.test new file mode 100644 index 00000000..5156cf93 --- /dev/null +++ b/tests/main.nf.test @@ -0,0 +1,31 @@ +nextflow_pipeline { + + name "End-to-end test of MGS workflow" + script "main.nf" + + test("Test index workflow") { + config "tests/index.config" + tag "index" + + then { + assert workflow.success + } + } + test("Test run workflow") { + config "tests/run.config" + tag "run" + + then { + assert workflow.success + } + } + test("Test validation workflow") { + config "tests/run_validation.config" + tag "validation" + + then { + assert workflow.success + } + } + +} diff --git a/tests/nextflow.config b/tests/nextflow.config new file mode 100644 index 00000000..c7f738cf --- /dev/null +++ b/tests/nextflow.config @@ -0,0 +1,49 @@ +// Default config file that nf-test relies on +// Specify resources for processes based on labels +// The free resources provided by GitHub Actions are limited to 4 CPUs, 16GB RAM, and 14GB SSD for Linux VMs.
More information here: https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories + +process { + // Single-core processes + withLabel: single { + cpus = 1 + memory = 4.GB + } + + withLabel: single_cpu_16GB_memory { + cpus = 1 + memory = 15.GB + } + + withLabel: single_cpu_32GB_memory { + cpus = 1 + memory = 15.GB + } + + // Small multi-core processes + withLabel: small { + cpus = 4 + memory = 15.GB + } + + // Large multi-core processes + withLabel: large { + cpus = 4 + memory = 15.GB + } + + // Very large multi-core processes + withLabel: max { + cpus = 4 + memory = 15.GB + } + + withLabel: kraken_resources { + cpus = 4 + memory = 15.GB + } + + withLabel: blast_resources { + cpus = 4 + memory = 15.GB + } +} diff --git a/tests/run.config b/tests/run.config new file mode 100644 index 00000000..fc157874 --- /dev/null +++ b/tests/run.config @@ -0,0 +1,33 @@ +/* +======================================================================================== + Nextflow config file for running tests +======================================================================================== +*/ + +params { + mode = "run" + + // Directories + base_dir = "./" // Parent for working and output directories (can be S3) + ref_dir = "s3://nao-testing/index-test/output" // Reference/index directory (generated by index workflow) + + // Files + sample_sheet = "${projectDir}/test-data/samplesheet.csv" // Path to sample sheet CSV + adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming + + // Numerical + grouping = false // Whether to group samples by 'group' column in samplesheet + n_reads_trunc = 0 // Number of reads per sample to run through pipeline (0 = all reads) + n_reads_profile = 1000000 // Number of reads per sample to run through taxonomic profiling + bt2_score_threshold = 20 // Normalized score threshold for calling a host-infecting virus read (typically 15 or 20) + blast_viral_fraction = 1 // Fraction of putative host-infecting virus reads to BLAST vs nt (0 = don't run BLAST) + quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) + fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2) + host_taxon = "vertebrate" + + blast_db_prefix = "nt_others" +} + +includeConfig "${projectDir}/configs/containers.config" +includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/output.config" \ No newline at end of file diff --git a/tests/run_validation.config b/tests/run_validation.config new file mode 100644 index 00000000..987f480e --- /dev/null +++ b/tests/run_validation.config @@ -0,0 +1,23 @@ +/************************************************ +| CONFIGURATION FILE FOR NAO VIRAL MGS WORKFLOW | +************************************************/ + +params { + mode = "run_validation" + + // Directories + base_dir = "./" // Parent for working and output directories (can be S3) + ref_dir = "s3://nao-testing/index-test/output" // Reference/index directory (generated by index workflow) + + // Files + viral_tsv_collapsed = "s3://nao-testing/gold-standard-test/results/hv/hv_hits_putative_collapsed.tsv.gz" + viral_fasta_1 = "" + viral_fasta_2 = "" + + // Numerical + blast_viral_fraction = 0.2 // Fraction of putative HV reads to BLAST vs nt (0 = don't run BLAST) + blast_db_prefix =
"nt_others" +} + +includeConfig "${projectDir}/configs/containers.config" +includeConfig "${projectDir}/configs/profiles.config" diff --git a/workflows/index.nf b/workflows/index.nf index 9f866bb8..faa32b4a 100644 --- a/workflows/index.nf +++ b/workflows/index.nf @@ -29,15 +29,15 @@ workflow INDEX { // Build viral taxonomy and infection DB MAKE_VIRUS_TAXONOMY_DB(params.taxonomy_url, params.virus_host_db_url, params.host_taxon_db, params.virus_taxid, params.viral_taxids_exclude) // Get reference DB of viral genomes of interest - MAKE_VIRUS_GENOME_DB(MAKE_VIRUS_TAXONOMY_DB.out.db, params.genome_patterns_exclude, params.host_taxa_screen) + MAKE_VIRUS_GENOME_DB(params.ncbi_viral_params, MAKE_VIRUS_TAXONOMY_DB.out.db, params.genome_patterns_exclude, params.host_taxa_screen) // Build viral alignment index MAKE_VIRUS_INDEX(MAKE_VIRUS_GENOME_DB.out.fasta) // Build other alignment indices MAKE_HUMAN_INDEX(params.human_url) - MAKE_CONTAMINANT_INDEX(params.cow_url, params.pig_url, params.mouse_url, params.carp_url, params.ecoli_url, params.contaminants) + MAKE_CONTAMINANT_INDEX(params.genome_urls, params.contaminants) // Other index files JOIN_RIBO_REF(params.ssu_url, params.lsu_url) - DOWNLOAD_BLAST_DB("core_nt") + DOWNLOAD_BLAST_DB(params.blast_db_name) EXTRACT_KRAKEN_DB(params.kraken_db, "kraken_db", true) // Publish results params_str = JsonOutput.prettyPrint(JsonOutput.toJson(params)) diff --git a/workflows/run.nf b/workflows/run.nf index 7ad36a0b..b8f36e92 100644 --- a/workflows/run.nf +++ b/workflows/run.nf @@ -27,6 +27,19 @@ workflow RUN { // Start time start_time = new Date() start_time_str = start_time.format("YYYY-MM-dd HH:mm:ss z (Z)") + kraken_db_path = "${params.ref_dir}/results/kraken_db" + blast_db_path = "${params.ref_dir}/results/${params.blast_db_prefix}" + + // Check if grouping column exists in samplesheet + check_grouping = new File(params.sample_sheet).text.readLines()[0].contains('group') ? 
true : false + if (params.grouping != check_grouping) { + if (params.grouping && !check_grouping) { + throw new Exception("Grouping enabled in config file, but group column absent from samplesheet.") + } else if (!params.grouping && check_grouping) { + throw new Exception("Grouping is not enabled in config file, but group column is present in the samplesheet.") + } + } + // Prepare samplesheet if ( params.grouping ) { samplesheet = Channel @@ -43,24 +56,25 @@ workflow RUN { samplesheet_ch = samplesheet.map { sample, read1, read2 -> tuple(sample, [read1, read2]) } group_ch = Channel.empty() } - // Prepare Kraken DB - kraken_db_path = "${params.ref_dir}/results/kraken_db" // Preprocessing RAW(samplesheet_ch, params.n_reads_trunc, "2", "4 GB", "raw_concat") CLEAN(RAW.out.reads, params.adapters, "2", "4 GB", "cleaned") // Extract and count human-viral reads - EXTRACT_VIRAL_READS(CLEAN.out.reads, group_ch, params.ref_dir, kraken_db_path, params.bt2_score_threshold, params.adapters, params.host_taxon, "3", "21", "viral", "${params.quality_encoding}", "${params.fuzzy_match_alignment_duplicates}", "${params.kraken_memory}", params.grouping) + EXTRACT_VIRAL_READS(CLEAN.out.reads, group_ch, params.ref_dir, kraken_db_path, params.bt2_score_threshold, params.adapters, params.host_taxon, "1", "24", "viral", "${params.quality_encoding}", "${params.fuzzy_match_alignment_duplicates}", params.grouping) // Process intermediate output for chimera detection raw_processed_ch = EXTRACT_VIRAL_READS.out.bbduk_match.join(RAW.out.reads, by: 0) EXTRACT_RAW_READS_FROM_PROCESSED(raw_processed_ch, "raw_viral_subset") // BLAST validation on host-viral reads (optional) if ( params.blast_viral_fraction > 0 ) { - blast_db_path = "${params.ref_dir}/results/core_nt" - blast_db_prefix = "core_nt" - BLAST_VIRAL(EXTRACT_VIRAL_READS.out.fasta, blast_db_path, blast_db_prefix, params.blast_viral_fraction, "32", "256 GB", "32 GB") + BLAST_VIRAL(EXTRACT_VIRAL_READS.out.fasta, blast_db_path, params.blast_db_prefix, params.blast_viral_fraction) + blast_subset_ch = BLAST_VIRAL.out.blast_subset + blast_paired_ch = BLAST_VIRAL.out.blast_paired + } else { + blast_subset_ch = Channel.empty() + blast_paired_ch = Channel.empty() } // Taxonomic profiling - PROFILE(CLEAN.out.reads, group_ch, kraken_db_path, params.n_reads_profile, params.ref_dir, "0.4", "27", "ribo", "${params.kraken_memory}", params.grouping) + PROFILE(CLEAN.out.reads, group_ch, kraken_db_path, params.n_reads_profile, params.ref_dir, "0.4", "27", "ribo", params.grouping) // Process output qc_ch = RAW.out.qc.concat(CLEAN.out.qc) PROCESS_OUTPUT(qc_ch) @@ -69,18 +83,22 @@ workflow RUN { params_ch = Channel.of(params_str).collectFile(name: "run-params.json") time_ch = Channel.of(start_time_str + "\n").collectFile(name: "time.txt") version_ch = Channel.fromPath("${projectDir}/pipeline-version.txt") + index_params_ch = Channel.fromPath("${params.ref_dir}/input/index-params.json") + .map { file -> file.copyTo("${workDir}/params-index.json") } + index_pipeline_version_ch = Channel.fromPath("${params.ref_dir}/logging/pipeline-version.txt") + .map { file -> file.copyTo("${workDir}/pipeline-version-index.txt") } publish: // Saved inputs - Channel.fromPath("${params.ref_dir}/input/index-params.json") >> "input" - Channel.fromPath("${params.ref_dir}/logging/pipeline-version.txt").collectFile(name: "pipeline-version-index.txt") >> "logging" + index_params_ch >> "input" + index_pipeline_version_ch >> "logging" Channel.fromPath(params.sample_sheet) >> "input" 
Channel.fromPath(params.adapters) >> "input" params_ch >> "input" time_ch >> "logging" version_ch >> "logging" // Intermediate files - CLEAN.out.reads >> "intermediates/reads/cleaned" - EXTRACT_RAW_READS_FROM_PROCESSED.out.reads >> "intermediates/reads/raw_viral" + CLEAN.out.reads >> "reads_cleaned" + EXTRACT_RAW_READS_FROM_PROCESSED.out.reads >> "reads_raw_viral" // QC PROCESS_OUTPUT.out.basic >> "results" PROCESS_OUTPUT.out.adapt >> "results" @@ -91,4 +109,7 @@ workflow RUN { EXTRACT_VIRAL_READS.out.counts >> "results" PROFILE.out.bracken >> "results" PROFILE.out.kraken >> "results" + // Validation output (if any) + blast_subset_ch >> "results" + blast_paired_ch >> "results" } diff --git a/workflows/run_validation.nf b/workflows/run_validation.nf index 2051ebc8..66e55cad 100644 --- a/workflows/run_validation.nf +++ b/workflows/run_validation.nf @@ -13,36 +13,48 @@ include { MAKE_VIRUS_READS_FASTA } from "../modules/local/makeVirusReadsFasta" include { BLAST_VIRAL } from "../subworkflows/local/blastViral" nextflow.preview.output = true -/***************** -| MAIN WORKFLOWS | -*****************/ +/**************** +| MAIN WORKFLOW | +****************/ // Complete primary workflow workflow RUN_VALIDATION { // Start time start_time = new Date() start_time_str = start_time.format("YYYY-MM-dd HH:mm:ss z (Z)") - // Define input - collapsed_ch = params.viral_tsv_collapsed - // Extract virus reads into FASTA format - fasta_ch = MAKE_VIRUS_READS_FASTA(collapsed_ch) + if ( params.viral_tsv_collapsed == "" ) { + // Option 1: Directly specify FASTA paths in config file (only used if no RUN output DB given) + fasta_ch = Channel.value([file(params.viral_fasta_1), file(params.viral_fasta_2)]) + } else { + // Option 2: Extract read sequences from output DB from RUN workflow (default) + // Define input + collapsed_ch = params.viral_tsv_collapsed + // Extract virus reads into FASTA format + fasta_ch = MAKE_VIRUS_READS_FASTA(collapsed_ch) + } // BLAST validation on host-viral reads if ( params.blast_viral_fraction > 0 ) { - blast_db_path = "${params.ref_dir}/results/core_nt" - blast_db_prefix = "core_nt" - BLAST_VIRAL(fasta_ch, blast_nt_path, params.blast_viral_fraction) - BLAST_VIRAL(fasta_ch, blast_db_path, blast_db_prefix, params.blast_viral_fraction, "32", "256 GB", "32 GB") + blast_db_path = "${params.ref_dir}/results/${params.blast_db_prefix}" + BLAST_VIRAL(fasta_ch, blast_db_path, params.blast_db_prefix, params.blast_viral_fraction) } // Publish results (NB: BLAST workflow has its own publish directive) params_str = JsonOutput.prettyPrint(JsonOutput.toJson(params)) params_ch = Channel.of(params_str).collectFile(name: "run-params.json") time_ch = Channel.of(start_time_str + "\n").collectFile(name: "time.txt") version_ch = Channel.fromPath("${projectDir}/pipeline-version.txt") + index_params_ch = Channel.fromPath("${params.ref_dir}/input/index-params.json") + .map { file -> file.copyTo("${workDir}/params-index.json") } + index_pipeline_version_ch = Channel.fromPath("${params.ref_dir}/logging/pipeline-version.txt") + .map { file -> file.copyTo("${workDir}/pipeline-version-index.txt") } publish: // Saved inputs - Channel.fromPath("${params.ref_dir}/input/index-params.json") >> "input" - Channel.fromPath("${params.ref_dir}/input/pipeline-version.txt").collectFile(name: "pipeline-version-index.txt") >> "input" + index_params_ch >> "input" + index_pipeline_version_ch >> "logging" + // Saved outputs params_ch >> "input" - time_ch >> "input" - version_ch >> "input" + time_ch >> "logging" + version_ch >> 
"logging" + // BLAST outputs + BLAST_VIRAL.out.blast_subset >> "results" + BLAST_VIRAL.out.blast_paired >> "results" }