Commit
Merge branch 'main' into tmorse-nt-compression
phoenixAja authored Apr 8, 2024
2 parents 192710b + 63c9aed commit 0d4b64f
Showing 17 changed files with 569 additions and 9 deletions.
27 changes: 27 additions & 0 deletions lib/idseq_utils/idseq_utils/diamond_scatter.md
@@ -0,0 +1,27 @@
# Diamond Scatter

`diamond_scatter.py` works with a [modified version of diamond](https://github.com/morsecodist/diamond) to use diamond's parallelization across separate jobs that can be run at any time, rather than on an HPC cluster with a shared file system. This lets us run alignment on AWS Batch with spot instances and scale dynamically to meet uneven demand.


The approach was to change the diamond code base as little as possible. That is why the `diamond_scatter.py` script exists: it controls some aspects of managing diamond's parallelization without adding them to diamond itself. If we wanted to fold this into diamond, we would need to upstream what is in this script in addition to the fork, but I am not sure this sort of parallelization is a goal of the diamond project, so they may not be receptive.

# How Diamond Parallelization Works

Refer to [the docs](https://github.com/bbuchfink/diamond/wiki/6.-Distributed-computing) for a detailed explanation.

Diamond expects to run as multiple processes sharing a directory on a file system accessible to all of them. That directory contains a record of the work to be done, which the processes update to communicate with one another; it also lets the processes share files. A parallel run starts with an initialization command that creates the list of work needed for a particular alignment. Then you can run as many processes as you want. Each process picks up a chunk of alignment work from the list and updates the list when it begins. Each alignment chunk produces intermediate files that are stored in the directory. The final chunk can tell from the work list that it is the final chunk, and since it has access to all of the intermediate files, it joins them.
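
For concreteness, a stock distributed run against a shared file system looks roughly like the sketch below. The flag names (`--multiprocessing`, `--mp-init`, `--parallel-tmpdir`) come from the linked docs and may differ between diamond versions; the paths and remaining arguments are illustrative.

```python
import subprocess

SHARED = "/shared/diamond-par"  # directory every process can see

base = [
    "diamond", "blastx",
    "-d", "ref.dmnd", "-q", "reads.fasta", "-o", "out.tsv",
    "--multiprocessing", "--parallel-tmpdir", SHARED,
]

# one-time initialization: writes the work list into the shared directory
subprocess.run(base + ["--mp-init"], check=True)

# then launch as many of these as you like on machines that mount SHARED;
# each claims chunks from the work list, and the last one to finish joins
subprocess.run(base, check=True)
```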

# How Diamond was Modified

To make diamond run in parallel without a shared directory you need two main elements:

1. We need to split the joining from the alignment, because an alignment chunk won't have all of the intermediate files. We want to be able to run the join later, after we've gathered up all the intermediate files.
2. We need diamond to be able to run a single, specific chunk and output intermediate files without deleting them.

To achieve (1), diamond was modified slightly to split the alignment command in two by adding two flags: `--single-chunk` and `--join-chunks`. `--single-chunk` skips the check for joining and cleaning up intermediate files, and `--join-chunks` skips alignment and goes straight to that check.
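
A sketch of the two phases this enables; aside from `--single-chunk` and `--join-chunks` themselves, the arguments here are illustrative, and in practice `diamond_scatter.py` assembles the real invocations.

```python
import subprocess

common = [
    "diamond", "blastx",
    "-d", "chunk.dmnd", "-q", "reads.fasta", "-o", "out.tsv",
    "--multiprocessing", "--parallel-tmpdir", "par-tmp",
]

# phase 1: align exactly one chunk and leave its intermediate files in place
subprocess.run(common + ["--single-chunk"], check=True)

# phase 2, any time later, once all the intermediates are back in par-tmp:
# skip alignment and go straight to the join
subprocess.run(common + ["--join-chunks"], check=True)
```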

Now all you need is to make sure the directory is in the correct state at each step of the process. That is where the `diamond_scatter.py` script comes in: it fills the role of initializing the parallel directory.
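
The coordination directory it initializes looks roughly like this (names as in `make_par_dir` and the todo files written by `blastx_chunk` and `blastx_join`; contents abridged):

```
par-tmp/
  parallelizer/        # diamond's coordination state lives here
  align_todo_000000    # work list: one entry per alignment chunk
  join_todo_000000     # written for the join phase; contains a TOKEN line
```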

To do a single chunk of alignment work, instead of specifying all of the work in the work list, the script specifies that just one chunk, with a particular number, needs to be completed. The number matters because the intermediate file naming scheme uses chunk numbers, and those names are needed for the final join. The modified diamond won't clean these files up, so you can copy them into remote storage and join at any time.
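
A sketch of the scatter side, using the script's functions with the signatures shown in `diamond_scatter.py`; the module path, file names, and diamond arguments are illustrative.

```python
from idseq_utils.diamond_scatter import blastx_chunk, make_db

# split the reference into 8 independently alignable index chunks; each chunk
# file name encodes its chunk number, sequence count, and letter count
make_db("nr.fasta", "db-chunks", 8)

# align one chunk; this can run as its own batch job at any time, and the
# ref_block_* / ref_dict_* intermediates are copied into chunk-outputs/
blastx_chunk("db-chunks/0-125000-31000000.dmnd", "chunk-outputs",
             "--mid-sensitive", "reads.fasta")
```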

To join the chunks, you just need to copy all of the intermediate files onto the node you are using for the join. The script initializes the directory in the correct format and runs diamond with the `--join-chunks` flag.
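
And the gather side, once every chunk's intermediate files have been copied into one local directory (again with illustrative paths):

```python
from idseq_utils.diamond_scatter import blastx_join

# chunk-outputs/ holds the ref_block_* / ref_dict_* files from every chunk;
# this sets up a join-only parallel directory and concatenates the results
blastx_join("chunk-outputs", "alignments.tsv", "--mid-sensitive", "reads.fasta")
```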
35 changes: 35 additions & 0 deletions lib/idseq_utils/idseq_utils/diamond_scatter.py
@@ -1,3 +1,7 @@
"""
See diamond_scatter.md for a detailed description
"""

import os
import shutil
import sys
@@ -107,6 +111,9 @@ def zero_pad(n: int, m: int):


def make_par_dir(cwd: str, par_tmpdir: str):
"""
This creates a parallelization coordination directory in the format diamond expects
"""
os.mkdir(join(cwd, par_tmpdir))
p_dir = join(cwd, par_tmpdir, "parallelizer")
os.mkdir(p_dir)
@@ -119,6 +126,11 @@ def make_par_dir(cwd: str, par_tmpdir: str):


def make_db(reference_fasta: str, output_dir: str, chunks: int):
"""
This creates a chunked diamond index. The chunks are named with a naming scheme such
that they have their chunk number, number of sequences, and number of letters. This
allows us to align these chunks without needing to store these parameters separately.
"""
os.mkdir(output_dir)
chunk_size = (sum(1 for _ in SeqIO.parse(reference_fasta, "fasta")) // chunks) + 1
seqs = SeqIO.parse(reference_fasta, "fasta")
@@ -138,16 +150,22 @@ def make_db(reference_fasta: str, output_dir: str, chunks: int):


def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str):
"""
Align one index chunk, as created by `make_db`.
"""
try:
os.mkdir(output_dir)
except OSError as e:
if e.errno != errno.EEXIST:
raise
# unpack the chunk number, n_seqs, and n_letters from the chunk naming scheme
chunk, n_seqs, n_letters = basename(db_chunk)[:-5].split("-")
with TemporaryDirectory() as tmp_dir:
make_par_dir(tmp_dir, "par-tmp")
# specify in the parallelization directory that we need to align this one chunk
with open(join(tmp_dir, "par-tmp", f"align_todo_{zero_pad(0, 6)}"), "w") as f:
f.writelines([align_chunk(int(chunk), 0, int(n_seqs), 0)])
# run the alignment
diamond_blastx(
cwd=tmp_dir,
par_tmpdir="par-tmp",
@@ -158,6 +176,8 @@ def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str)
diamond_args=diamond_args,
queries=(abspath(q) for q in query),
)
# the intermediate output files will be named with this scheme
# construct their names so we can copy them as outputs
ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}"
ref_dict_name = f"ref_dict_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}"
shutil.copy(
@@ -169,6 +189,12 @@ def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str)


def mock_reference_fasta(chunks: int, chunk_size: int):
"""
This creates a valid dummy fasta, which is required to build a dummy alignment index
to feed into the join command. With the join flag the command doesn't actually use
the index, but the blastx command always expects a valid one.
"""
letters = chunk = i = 0
while chunk < chunks:
n = 100
@@ -182,17 +208,25 @@
def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str):
with TemporaryDirectory() as tmp_dir:
make_par_dir(tmp_dir, "par-tmp")
# specify in the parallelization directory that we need to join all the chunks
with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f:
f.write("TOKEN\n")

# copy all of the intermediate files into the right spot
for f in os.listdir(chunk_dir):
shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f))

# each aligned chunk contributed two intermediate files (ref_block_* and ref_dict_*)
chunks = len(os.listdir(chunk_dir)) // 2
with NamedTemporaryFile() as ref_fasta, NamedTemporaryFile() as db:
# make fake db to appease diamond
# With the join flag the command doesn't actually use the index, but the blastx
# command always expects a valid one

# make a dummy fasta
SeqIO.write(SeqRecord(Seq("M"), "ID"), ref_fasta.name, "fasta")
# turn it into a dummy index
diamond_makedb(tmp_dir, ref_fasta.name, db.name)
# join the chunks with diamond
diamond_blastx(
cwd=tmp_dir,
par_tmpdir="par-tmp",
@@ -204,6 +238,7 @@ def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str):
queries=(abspath(q) for q in query),
)

# the join writes the final results to separate files; concatenate them
with open(out, "w") as out_f:
for out_chunk in glob(join(tmp_dir, f"{out}_*")):
with open(out_chunk) as out_chunk_f:
6 changes: 5 additions & 1 deletion workflows/amr/Dockerfile
@@ -21,7 +21,11 @@ COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

# RGI version 6.0.3
RUN pip3 install git+https://github.com/lvreynoso/rgi.git@a14f8dd5457a6220a06024b6aebd05e8cb6181d7
COPY rgi_bam_parsing.patch rgi_bam_parsing.patch
RUN curl -L https://github.com/arpcard/rgi/archive/refs/tags/6.0.3.tar.gz | tar xz && \
cd rgi-6.0.3/ && \
git apply ../rgi_bam_parsing.patch && \
pip3 install .

# Install SPAdes
RUN curl -LO https://github.com/ablab/spades/releases/download/v3.11.1/SPAdes-3.11.1-Linux.tar.gz && \
6 changes: 6 additions & 0 deletions workflows/amr/README.md
@@ -4,6 +4,12 @@ CZ ID's AMR workflow implements the [Resistance Gene Identifier (RGI)](https://g

# Changelog

## 1.4.1 - 2024-04-07

### Changed

- Docker build now uses the official RGI 6.0.3 release and applies a patch, instead of installing from a fork.

## 1.4.0 - 2024-02-26

### Added
69 changes: 69 additions & 0 deletions workflows/amr/rgi_bam_parsing.patch
@@ -0,0 +1,69 @@
diff --git a/app/kmer_query.py b/app/kmer_query.py
index b062aaf..207855c 100644
--- a/app/kmer_query.py
+++ b/app/kmer_query.py
@@ -98,7 +98,7 @@ class CARDkmers(object):
type_hit = value[hsp]["type_match"]
dna = value[hsp]["orf_dna_sequence"]

- fasta.write(">{orf}__{hsp}__{model}__{type_hit}\n{dna}\n"
+ fasta.write(">{orf}___{hsp}___{model}___{type_hit}\n{dna}\n"
.format(orf=contig_id, hsp=hsp, model=model, type_hit=type_hit, dna=dna))

except Exception as e:
@@ -113,28 +113,15 @@ class CARDkmers(object):
"""

os.system("samtools index {input_bam}".format(input_bam=self.input_bam_file))
- aligner = ""
- bam_file = pysam.AlignmentFile(self.input_bam_file, "rb")
- header = bam_file.text.split("\n")
-
- for h in header:
- if "@PG" in h:
- aligner = h.split("\t")[1]
-
- if aligner == "ID:KMA":
- os.system("""samtools view -F 4 -F 2048 {bam} | while read line; do awk '{cmd}'; done > {out}"""
- .format(bam=self.input_bam_file, cmd="""{print ">"$1"__"$4"__"$3"__"$5"\\n"$11}""", out=self.fasta_file))
- else:
- os.system("""samtools view -F 4 -F 2048 {bam} | while read line; do awk '{cmd}'; done > {out}"""
- .format(bam=self.input_bam_file, cmd="""{print ">"$1"__"$3"__"$2"__"$5"\\n"$10}""", out=self.fasta_file))
-
+ os.system("""samtools view -F 4 -F 2048 {bam} | while read line; do awk -F '\t' '{cmd}'; done > {out}"""
+ .format(bam=self.input_bam_file, cmd="""{print ">"$1"___"$3"___"$2"___"$5"\\n"$10}""", out=self.fasta_file))


def get_bwt_alignment_data(self, header):
"""
bit-wise flag reference: http://blog.nextgenetics.net/?e=18
"""
- qname, model, flag, mapq = header.split("__")
+ qname, model, flag, mapq = header.split("___")

if flag.isdigit() is False:
logger.error("failed to parse BAM file: {}, please check which aligner software was used to produce the BAM file in the RGI BWT step.".format(self.input_bam_file))
@@ -149,7 +136,7 @@ class CARDkmers(object):
return read, model, flag, mapq

# def get_rgi_data(self, header):
- # orf, hsp, model, type_hit = header.split("__")
+ # orf, hsp, model, type_hit = header.split("___")
# return orf, hsp, model, type_hit

def populate_rgi_json(self, orf, hsp, model, type_hit, num_kmers, amr_c, tax, gen, o):
@@ -221,12 +208,12 @@ class CARDkmers(object):
dna = str(entry.seq)
num_kmers = len(dna) - k + 1
if type == 'rgi':
- contig_id, hsp, model, type_hit = entry.id.split("__")
+ contig_id, hsp, model, type_hit = entry.id.split("___")
for x in self.orf_list:
if x.split()[0] == contig_id:
orf_id = x
elif type == 'bwt':
- read, model, flag, mapq = self.get_bwt_alignment_data(entry.id)
+ read, model, flag, mapq = self.get_bwt_alignment_data(entry.description)
elif type == "fasta":
read = entry.id
else:
10 changes: 10 additions & 0 deletions workflows/bulk-download/Dockerfile
@@ -0,0 +1,10 @@
# syntax=docker/dockerfile:1.4
FROM ubuntu:20.04

LABEL maintainer="CZ ID Team <[email protected]>"

RUN apt-get update && apt-get -y install python3 python3-pip zip
RUN ln -s /usr/bin/python3 /usr/bin/python

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
4 changes: 4 additions & 0 deletions workflows/bulk-download/README.md
@@ -0,0 +1,4 @@
# CZ ID Bulk Downloads Pipeline

# Changelog

50 changes: 50 additions & 0 deletions workflows/bulk-download/manifest.yml
@@ -0,0 +1,50 @@
workflow_name: bulk-download
specification_language: WDL
description: Generate bulk downloads for the CZID web application
entity_inputs:
consensus_genomes:
name: Consensus Genomes
description: Consensus genomes to bulk download
entity_type: consensus_genome
multivalue: True
required: False
raw_inputs:
aggregate_action:
name: Aggregate Action
description: Whether to concatenate or zip files to create the bulk download
type: str
values:
- concatenate
- zip
bulk_download_type:
name: Bulk Download Type
description: The type of bulk download this represents
type: str
values:
- consensus_genome
- consensus_genome_intermediate_output_files
input_loaders:
- name: bulk_download
version: ">=0.0.1"
inputs:
consensus_genomes: ~
bulk_download_type: ~
outputs:
files: ~
- name: passthrough
version: ">=0.0.1"
inputs:
aggregate_action: ~
outputs:
aggregate_action: action
- name: czid_docker
version: ">=0.0.1"
outputs:
docker_image_id: ~
output_loaders:
- name: bulk_download
version: ">=0.0.1"
inputs:
bulk_download_type: ~
workflow_outputs:
file: "bulk_download.file"
3 changes: 3 additions & 0 deletions workflows/bulk-download/requirements.txt
@@ -0,0 +1,3 @@
boto3
awscli
stream-unzip