diff --git a/Cargo.lock b/Cargo.lock index 3b734ef..e3a7760 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,18 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "bstr" version = "0.2.17" @@ -126,6 +138,12 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -174,7 +192,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -183,7 +201,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", @@ -197,7 +215,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -207,7 +225,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", "crossbeam-utils", ] @@ -219,7 +237,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1" dependencies = [ "autocfg", - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "memoffset", "once_cell", @@ -232,7 +250,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -242,7 +260,7 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", ] @@ -325,7 +343,9 @@ dependencies = [ name = "fqgrep" version = "0.1.1-alpha.1" dependencies = [ + "ansi_term", "anyhow", + "bitvec", "bstr", "built", "csv", @@ -333,6 +353,7 @@ dependencies = [ "flate2", "flume", "gzp", + "isatty", "itertools", "lazy_static", "log", @@ -347,6 +368,12 @@ dependencies = [ "structopt", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures-core" version = "0.3.21" @@ -365,7 +392,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "libc", "wasi", @@ -443,7 +470,19 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", +] + +[[package]] +name = "isatty" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31a8281fc93ec9693494da65fbf28c0c2aa60a2eaec25dc58e2f31952e95edc" +dependencies = [ 
+ "cfg-if 0.1.10", + "libc", + "redox_syscall 0.1.57", + "winapi 0.3.9", ] [[package]] @@ -569,7 +608,7 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -653,10 +692,10 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "winapi 0.3.9", ] @@ -745,6 +784,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rayon" version = "1.5.3" @@ -769,6 +814,12 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "redox_syscall" version = "0.2.16" @@ -924,6 +975,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "termcolor" version = "1.1.3" @@ -1067,7 +1124,7 @@ version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -1157,3 +1214,12 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "wyz" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b31594f29d27036c383b53b59ed3476874d518f0efb151b27a4c275141390e" +dependencies = [ + "tap", +] diff --git a/Cargo.toml b/Cargo.toml index 6c141c1..a728e37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,19 +12,30 @@ readme = "README.md" categories = ["science"] keywords = ["bioinformatics", "fastq"] +[lib] +name = "fqgrep_lib" +path = "src/lib/mod.rs" + +[[bin]] +name = "fqgrep" +path = "src/main.rs" + [profile.release] lto = true [dependencies] +ansi_term = "0.12.1" anyhow = "1.0.44" +bitvec = "1.0.1" bstr = "0.2.17" csv = "1.1.6" env_logger = "0.9.0" flate2 = {version = "1.0.22", default-features = false, features = ["zlib-ng-compat"]} flume = "0.10.9" gzp = "0.9.1" -itertools = "0.10.1" +isatty = "0.1.9" +itertools = "0.10.3" lazy_static = "1.4.0" log = "0.4.14" mimalloc = {version = "0.1.26", default-features = false} @@ -32,7 +43,7 @@ num_cpus = "1.13.0" parking_lot = "0.11.2" proglog = {version = "0.3.0", features = ["pretty_counts"]} rayon = "1.5.1" -regex = "1.5.4" +regex = "1.6.0" seq_io = "0.3.1" serde = {version = "1.0.130", features = ["derive"]} structopt = "0.3.23" diff --git a/precommit.sh b/precommit.sh new file mode 100755 index 0000000..0b93435 --- /dev/null +++ b/precommit.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash + +############################################################################### +# Script that should be run pre-commit after making any changes. 
+############################################################################### + +set -e + +failures="" + +function banner() { + echo + echo "================================================================================" + echo $* + echo "================================================================================" + echo +} + +##################################################################### +# Takes two parameters, a "name" and a "command". +# Runs the command and prints out whether it succeeded or failed, and +# also tracks a list of failed steps in $failures. +##################################################################### +function run() { + local name=$1 + local cmd=$2 + + banner "Running $name" + set +e + $cmd + exit_code=$? + set -e + + if [[ $exit_code == 0 ]]; then + echo Passed $name + else + echo Failed $name + if [ -z "$failures" ]; then + failures="$name" + else + failures="$failures, $name" + fi + fi +} + +run "cargo fmt" "cargo fmt --all" +run "cargo clippy" "cargo clippy --all-features --locked -- -D warnings" +run "cargo test" "cargo test --locked --quiet" + +if [ -z "$failures" ]; then + banner "Precommit Passed" +else + banner "Precommit Failed with failures in: $failures" + exit 1 +fi diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..cc8f987 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.64.0" +components = ["rustfmt", "clippy"] diff --git a/src/lib/color.rs b/src/lib/color.rs new file mode 100644 index 0000000..f23f060 --- /dev/null +++ b/src/lib/color.rs @@ -0,0 +1,32 @@ +use ansi_term::Color; + +/// Color for matching bases in a FASTQ +pub const COLOR_BASES: Color = Color::Red; +/// Color for matching base qualities in a FASTQ +pub const COLOR_QUALS: Color = Color::Fixed(22); +/// Color for a matching read name (head) of a FASTQ +pub const COLOR_HEAD: Color = Color::Fixed(30); +/// Color for all non-matching text in a FASTQ +pub const 
COLOR_BACKGROUND: Color = Color::Fixed(240); + +/// Colors the text with the given color +pub fn color(text: &[u8], colour: &Color) -> Vec { + let mut colored: Vec = Vec::with_capacity(text.len()); + colour.paint(text).write_to(&mut colored).unwrap(); + colored +} + +/// Color for the read name (head) of a FASTQ +pub fn color_head(text: &[u8]) -> Vec { + color(text, &COLOR_HEAD) +} + +/// Color for the base qualities of a FASTQ +pub fn color_quals(text: &[u8]) -> Vec { + color(text, &COLOR_QUALS) +} + +/// Color for non-matching bases, quals, and other text +pub fn color_background(text: &[u8]) -> Vec { + color(text, &COLOR_BACKGROUND) +} diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs new file mode 100644 index 0000000..d6055dd --- /dev/null +++ b/src/lib/matcher.rs @@ -0,0 +1,409 @@ +use bitvec::prelude::*; +use std::ops::Range; + +use crate::color::{color_background, color_head}; +use crate::color::{COLOR_BACKGROUND, COLOR_BASES, COLOR_QUALS}; +use crate::reverse_complement; +use crate::DNA_BASES; +use anyhow::{bail, Context, Result}; +use bstr::ByteSlice; +use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder}; +use seq_io::fastq::{OwnedRecord, Record}; + +/// Common options for pattern matchers +#[derive(Copy, Clone, Debug)] +pub struct MatcherOpts { + /// Invert the matching. The bases are said to match if they do not contain the pattern + pub invert_match: bool, + /// Include the reverse complement of the bases. The bases are said to match if they or their + /// reverse complement contains the pattern. + pub reverse_complement: bool, + /// Color the read based on the match + pub color: bool, +} + +/// Builds a bit vector from an iterator of ranges. The ranges may overlap each other. 
+fn to_bitvec(ranges: impl Iterator>, len: usize) -> BitVec { + let mut vec = bitvec![0; len]; + ranges.for_each(|range| { + for index in range { + vec.set(index, true); + } + }); + vec +} + +/// Color the bases and qualities based on ranges specifying where a pattern matched bases. The +/// ranges may overlap each other. +fn bases_colored( + bases: &[u8], + quals: &[u8], + ranges: impl Iterator>, +) -> (Vec, Vec) { + // The resulting colored bases + let mut colored_bases = Vec::with_capacity(bases.len()); + let mut colored_quals = Vec::with_capacity(bases.len()); + + // Merge the ranges into a bit mask, with 1 indicating that base is part of a pattern match + let bits = to_bitvec(ranges, bases.len()); + + // Iterate over the bit mask, finding stretches of matching and non-matching bases. Color both + // in both the bases and qualities + let mut last_color_on = false; + let mut last_bases_index = 0; + let mut cur_bases_index = 0; + for base_color_on in bits.iter() { + if *base_color_on { + // this base is to be colored + if !last_color_on { + // add up to but not including this base to the colored vector **as uncolored** + if last_bases_index + 1 < cur_bases_index { + COLOR_BACKGROUND + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_BACKGROUND + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } + // first base in a run of bases to be colored + last_bases_index = cur_bases_index; + } + + last_color_on = true; + } else { + // this base is not to be colored + if last_color_on { + // add up to but not including this base to the colored vector **as colored** + if last_bases_index + 1 < cur_bases_index { + COLOR_BASES + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_QUALS + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } + // first base in a run of bases to be 
colored + last_bases_index = cur_bases_index; + } + last_color_on = false; + } + cur_bases_index += 1; + } + // Color to the end + if last_bases_index + 1 < cur_bases_index { + if last_color_on { + COLOR_BASES + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_QUALS + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } else { + COLOR_BACKGROUND + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_BACKGROUND + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } + } + + (colored_bases, colored_quals) +} + +/// Validates that a given FIXED pattern contains only valid DNA bases (ACGTN) +pub fn validate_fixed_pattern(pattern: &str) -> Result<()> { + for (index, base) in pattern.chars().enumerate() { + if !DNA_BASES.contains(&(base as u8)) { + bail!( + "Fixed pattern must contain only DNA bases: {} .. [{}] .. {}", + &pattern[0..index], + &pattern[index..=index], + &pattern[index + 1..], + ) + } + } + Ok(()) +} + +/// Base trait for all pattern matchers +pub trait Matcher { + /// The options for the pattern matcher + fn opts(&self) -> MatcherOpts; + + /// Returns true if the bases match the pattern, false otherwise + fn bases_match(&self, bases: &[u8]) -> bool; + + /// Colors the bases and qualities based on where they match the pattern. All bases + /// that match the pattern are colored. Colored in this case means adding ANSI color + /// codes for printing to a terminal. 
+ fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec); + + /// Returns true if the read's bases match the pattern, false otherwise + fn read_match(&self, read: &mut OwnedRecord) -> bool { + let match_found = if self.opts().invert_match { + self.bases_match(read.seq()) + && (!self.opts().reverse_complement + || self.bases_match(&reverse_complement(read.seq()))) + } else { + self.bases_match(read.seq()) + || (self.opts().reverse_complement + && self.bases_match(&reverse_complement(read.seq()))) + }; + + if self.opts().color { + if match_found { + let (seq, qual) = self.color_matched_bases(&read.seq, &read.qual); + read.head = color_head(&read.head); + read.seq = seq; + read.qual = qual; + } else { + // always color, in case the read is paired + read.head = color_background(&read.head); + read.seq = color_background(&read.seq); + read.qual = color_background(&read.qual); + } + } + + match_found + } +} + +/// Matcher for a fixed string pattern +pub struct FixedStringMatcher { + pattern: Vec, + opts: MatcherOpts, +} + +impl Matcher for FixedStringMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + bases.find(&self.pattern).is_some() != self.opts.invert_match + } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = bases.find_iter(&self.pattern).map(|start| Range { + start, + end: start + self.pattern.len(), + }); + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = bases_revcomp + .find_iter(&self.pattern) + .map(|start| bases.len() - start - self.pattern.len()) + .map(|start| Range { + start, + end: start + self.pattern.len(), + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } + } + + fn opts(&self) -> MatcherOpts { + self.opts + } +} + +impl FixedStringMatcher { + pub fn new(pattern: &str, opts: MatcherOpts) -> Self { + let pattern = pattern.as_bytes().to_vec(); + Self { 
pattern, opts } + } +} + +/// Matcher for a set of fixed string patterns +pub struct FixedStringSetMatcher { + patterns: Vec>, + opts: MatcherOpts, +} + +impl Matcher for FixedStringSetMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + self.patterns + .iter() + .any(|pattern| bases.find(&pattern).is_some()) + != self.opts.invert_match + } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = self.patterns.iter().flat_map(|pattern| { + bases + .find_iter(&pattern) + .map(|start| Range { + start, + end: start + pattern.len(), + }) + .collect::>() + }); + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = self.patterns.iter().flat_map(|pattern| { + bases_revcomp + .find_iter(&pattern) + .map(|start| bases.len() - start - pattern.len()) + .map(|start| Range { + start, + end: start + pattern.len(), + }) + .collect::>() + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } + } + + fn opts(&self) -> MatcherOpts { + self.opts + } +} + +impl FixedStringSetMatcher { + pub fn new(patterns: I, opts: MatcherOpts) -> Self + where + S: AsRef, + I: IntoIterator, + { + let patterns: Vec> = patterns + .into_iter() + .map(|pattern| pattern.as_ref().to_owned().as_bytes().to_vec()) + .collect(); + Self { patterns, opts } + } +} + +/// Matcher for a regular expression pattern +pub struct RegexMatcher { + regex: Regex, + opts: MatcherOpts, +} + +impl RegexMatcher { + pub fn new(pattern: &str, opts: MatcherOpts) -> Self { + let regex = RegexBuilder::new(pattern) + .build() + .context(format!("Invalid regular expression: {}", pattern)) + .unwrap(); + Self { regex, opts } + } +} + +impl Matcher for RegexMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + self.regex.is_match(bases) != self.opts.invert_match + } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = 
self.regex.find_iter(bases).map(|m| m.range()); + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = + self.regex + .find_iter(bases_revcomp) + .map(|m| m.range()) + .map(|range| Range { + start: bases.len() - range.start - range.len(), + end: bases.len() - range.start, + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } + } + + fn opts(&self) -> MatcherOpts { + self.opts + } +} + +pub struct RegexSetMatcher { + regex_set: RegexSet, + regex_matchers: Vec, + opts: MatcherOpts, +} + +/// Matcher for a set of regular expression patterns +impl RegexSetMatcher { + pub fn new(patterns: I, opts: MatcherOpts) -> Self + where + S: AsRef, + I: IntoIterator, + { + let string_patterns: Vec = patterns + .into_iter() + .map(|p| p.as_ref().to_string()) + .collect(); + let regex_set = RegexSetBuilder::new(string_patterns.clone()) + .build() + .unwrap(); + let regex_matchers: Vec = string_patterns + .into_iter() + .map(|pattern| RegexMatcher::new(pattern.as_ref(), opts)) + .collect(); + Self { + regex_set, + regex_matchers, + opts, + } + } +} + +impl Matcher for RegexSetMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + self.regex_set.is_match(bases) != self.opts.invert_match + } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = self + .regex_matchers + .iter() + .flat_map(|r| r.regex.find_iter(bases).map(|m| m.range())); + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = self.regex_matchers.iter().flat_map(|r| { + r.regex + .find_iter(bases_revcomp) + .map(|m| m.range()) + .map(|range| Range { + start: bases.len() - range.start - range.len(), + end: bases.len() - range.start, + }) + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } + } + + fn opts(&self) -> MatcherOpts { + self.opts 
+ } +} + +/// Factory for building a matcher +pub struct MatcherFactory; + +impl MatcherFactory { + pub fn new_matcher( + pattern: &Option, + fixed_strings: bool, + regexp: &Vec, + match_opts: MatcherOpts, + ) -> Box { + match (fixed_strings, &pattern) { + (true, Some(pattern)) => Box::new(FixedStringMatcher::new(pattern, match_opts)), + (false, Some(pattern)) => Box::new(RegexMatcher::new(pattern, match_opts)), + (true, None) => Box::new(FixedStringSetMatcher::new(regexp, match_opts)), + (false, None) => Box::new(RegexSetMatcher::new(regexp, match_opts)), + } + } +} diff --git a/src/lib/mod.rs b/src/lib/mod.rs new file mode 100644 index 0000000..aa8451b --- /dev/null +++ b/src/lib/mod.rs @@ -0,0 +1,73 @@ +#![deny(unsafe_code)] +#![allow( + clippy::must_use_candidate, + clippy::missing_panics_doc, + clippy::missing_errors_doc, + clippy::module_name_repetitions +)] +pub mod color; +pub mod matcher; +use lazy_static::lazy_static; +use std::{borrow::Borrow, path::Path}; + +pub const DNA_BASES: [u8; 5] = *b"ACGTN"; +pub const IUPAC_BASES: [u8; 15] = *b"AGCTYRWSKMDVHBN"; +pub const IUPAC_BASES_COMPLEMENT: [u8; 15] = *b"TCGARYWSMKHBDVN"; + +lazy_static! 
{ + pub static ref COMPLEMENT: [u8; 256] = { + let mut comp = [0; 256]; + for (v, a) in comp.iter_mut().enumerate() { + *a = v as u8; + } + for (&a, &b) in IUPAC_BASES.iter().zip(IUPAC_BASES_COMPLEMENT.iter()) { + comp[a as usize] = b; + comp[a as usize + 32] = b + 32; // lowercase variants + } + comp + }; +} + +fn complement(a: u8) -> u8 { + COMPLEMENT[a as usize] +} + +fn reverse_complement(text: T) -> Vec +where + C: Borrow, + T: IntoIterator, + T::IntoIter: DoubleEndedIterator, +{ + text.into_iter() + .rev() + .map(|a| complement(*a.borrow())) + .collect() +} + +/// Returns true if the path ends with one of the given file extensions +fn is_path_with_extension>(p: &P, extensions: [&str; 2]) -> bool { + if let Some(ext) = p.as_ref().extension() { + match ext.to_str() { + Some(x) => extensions.contains(&x), + None => false, + } + } else { + false + } +} + +/// The set of file extensions to treat as GZIPPED +const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"]; + +/// Returns true if the path ends with a recognized GZIP file extension +pub fn is_gzip_path>(p: &P) -> bool { + is_path_with_extension(p, GZIP_EXTENSIONS) +} + +/// The set of file extensions to treat as FASTQ +const FASTQ_EXTENSIONS: [&str; 2] = ["fastq", "fq"]; + +/// Returns true if the path ends with a recognized FASTQ file extension +pub fn is_fastq_path>(p: &P) -> bool { + is_path_with_extension(p, FASTQ_EXTENSIONS) +} diff --git a/src/main.rs b/src/main.rs index 810bc8f..fad492a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,31 +1,31 @@ #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +use isatty::stdout_isatty; +use std::io::Stdout; +use std::process::ExitCode; use std::{ - borrow::Borrow, - fs, fs::File, - io::{BufReader, BufWriter}, - path::{Path, PathBuf}, - thread::JoinHandle, + io::{BufRead, BufReader, BufWriter, Read, Write}, + path::PathBuf, + str::FromStr, }; +use structopt::clap::arg_enum; -use anyhow::{Error, Result}; -use bstr::ByteSlice; +use anyhow::{bail, 
ensure, Context, Result}; use env_logger::Env; use flate2::bufread::MultiGzDecoder; use flume::{bounded, Receiver, Sender}; -use gzp::{deflate::Bgzf, Compression, ZBuilder, ZWriter, BUFSIZE}; +use fqgrep_lib::matcher::{validate_fixed_pattern, Matcher, MatcherFactory, MatcherOpts}; +use fqgrep_lib::{is_fastq_path, is_gzip_path}; +use gzp::BUFSIZE; use itertools::{self, izip, Itertools}; use lazy_static::lazy_static; -use log::info; use parking_lot::Mutex; -use proglog::{CountFormatterKind, ProgLogBuilder}; +use proglog::{CountFormatterKind, ProgLog, ProgLogBuilder}; use rayon::prelude::*; -use regex::{Regex, RegexBuilder}; -use seq_io::fastq::{self, OwnedRecord}; -use serde::{Deserialize, Serialize}; -use structopt::{clap::AppSettings::ColoredHelp, StructOpt}; +use seq_io::fastq::{self, OwnedRecord, Record}; +use structopt::{clap::AppSettings, StructOpt}; /// The number of reads in a chunk const CHUNKSIZE: usize = 5000; // * num_cpus::get(); @@ -33,33 +33,11 @@ const CHUNKSIZE: usize = 5000; // * num_cpus::get(); const READER_CHANNEL_SIZE: usize = 100; const WRITER_CHANNEL_SIZE: usize = 2000; -const REF_PREFIX: &str = "REF"; -const ALT_PREFIX: &str = "ALT"; -const BOTH_PREFIX: &str = "BOTH"; - -/// Helper type to represent the accumlated reads and what match type they are. -/// -/// `p` are ref reads -/// `1` are alt reads -/// `2` are reads that matched both -type MatchedReads = ( - (Vec, Vec), - (Vec, Vec), - (Vec, Vec), -); - lazy_static! { /// Return the number of cpus as a String pub static ref NUM_CPU: String = num_cpus::get().to_string(); } -lazy_static! { - static ref R1_REGEX: Regex = RegexBuilder::new(r"r1") - .case_insensitive(true) - .build() - .expect("Failed to compile R1 regex"); -} - pub mod built_info { use lazy_static::lazy_static; include!(concat!(env!("OUT_DIR"), "/built.rs")); @@ -88,470 +66,441 @@ pub mod built_info { } } -lazy_static! 
{ - static ref COMPLEMENT: [u8; 256] = { - let mut comp = [0; 256]; - for (v, a) in comp.iter_mut().enumerate() { - *a = v as u8; - } - for (&a, &b) in b"AGCTYRWSKMDVHBN".iter().zip(b"TCGARYWSMKHBDVN".iter()) { - comp[a as usize] = b; - comp[a as usize + 32] = b + 32; // lowercase variants - } - comp - }; +struct FastqWriter { + tx: Sender>, + lock: Mutex<()>, } -fn complement(a: u8) -> u8 { - COMPLEMENT[a as usize] -} +impl FastqWriter { + fn new(count_tx: Sender, count: bool, paired: bool) -> Self { + // TODO: try making this unbounded + let (tx, rx): (Sender>, Receiver>) = + bounded(WRITER_CHANNEL_SIZE); + std::thread::spawn(move || { + let mut maybe_writer: Option> = { + if count { + None + } else { + Some(BufWriter::with_capacity(BUFSIZE, std::io::stdout())) + } + }; -fn revcomp(text: T) -> Vec -where - C: Borrow, - T: IntoIterator, - T::IntoIter: DoubleEndedIterator, -{ - text.into_iter() - .rev() - .map(|a| complement(*a.borrow())) - .collect() -} + let mut num_matches = 0; + while let Ok(reads) = rx.recv() { + num_matches += reads.len(); + if let Some(ref mut writer) = maybe_writer { + for read in reads { + fastq::write_to(&mut *writer, &read.head, &read.seq, &read.qual) + .expect("failed writing read"); + } + }; + } + if paired { + num_matches /= 2; + } -#[derive(Debug, Serialize, Deserialize)] -struct FqGrepMetrics { - total_alt_reads_found: usize, - total_ref_reads_found: usize, - total_both_reads_found: usize, - total_reads_found: usize, -} + if count { + std::io::stdout() + .write_all(format!("{}\n", num_matches).as_bytes()) + .unwrap(); + } -impl FqGrepMetrics { - fn new( - total_ref_reads_found: usize, - total_alt_reads_found: usize, - total_both_reads_found: usize, - ) -> Self { - Self { - total_ref_reads_found, - total_alt_reads_found, - total_both_reads_found, - total_reads_found: total_alt_reads_found - + total_both_reads_found - + total_ref_reads_found, - } + if let Some(mut writer) = maybe_writer { + writer.flush().expect("Error flushing 
writer"); + }; + count_tx + .send(num_matches) + .expect("failed sending final count"); + }); + let lock = Mutex::new(()); + Self { tx, lock } + } +} -#[derive(Debug, Clone)] -struct Sample { - name: String, -} - -impl Sample { - fn new(name: String) -> Self { - Self { name } - } - - fn from_r1_path>(r1_path: P, prefix: &str) -> Result { - let filename = r1_path - .as_ref() - .file_name() - .expect("Unable to extract file name from R1") - .to_string_lossy(); - let parts: Vec<&str> = R1_REGEX.splitn(&filename, 2).collect(); - if let Some(&part) = parts.first() { - let name: &str = &part[0..part.len() - 1]; - Ok(Self::new(format!("{}_{}", prefix, name.to_owned()))) - } else { - Err(Error::msg("Unable to extract a sample name from R1 path")) - } - } - - fn filenames>(&self, dir: P, illumina: bool) -> (PathBuf, PathBuf) { - if illumina { - ( - dir.as_ref() - .join(format!("{}_S1_L001_R1_001.fastq.gz", self.name)), - dir.as_ref() - .join(format!("{}_S1_L001_R2_001.fastq.gz", self.name)), - ) +fn spawn_reader(file: PathBuf, decompress: bool) -> Receiver> { + let (tx, rx) = bounded(READER_CHANNEL_SIZE); + std::thread::spawn(move || { + // Open the file or standard input + let raw_handle = if file.as_os_str() == "-" { + Box::new(std::io::stdin()) as Box } else { - ( - dir.as_ref().join(format!("{}.R1.fastq.gz", self.name)), - dir.as_ref().join(format!("{}.R2.fastq.gz", self.name)), - ) + let handle = File::open(&file) + .with_context(|| format!("Error opening input: {}", file.display())) + .unwrap(); + Box::new(handle) as Box + }; + // Wrap it in a buffer + let buf_handle = BufReader::with_capacity(BUFSIZE, raw_handle); + // Maybe wrap it in a decompressor + let maybe_decoder_handle = { + let is_gzip = is_gzip_path(&file) || (!is_fastq_path(&file) && decompress); + if is_gzip { + Box::new(MultiGzDecoder::new(buf_handle)) as Box + } else { + Box::new(buf_handle) as Box + } + }; + // Open a FASTQ reader, get an iterator over the records, and chunk them + let fastq_reader = 
fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) + .into_records() + .chunks(CHUNKSIZE * num_cpus::get()); + // Iterate over the chunks + for chunk in &fastq_reader { + tx.send(chunk.map(|r| r.expect("Error reading")).collect()) + .expect("Error sending"); + } + }); + rx +} + +arg_enum! { + #[derive(PartialEq, Debug)] + enum Color { + Never, + Always, + Auto, +} } + +/// The fqgrep utility searches any given input FASTQ files, selecting records whose bases match +/// one or more patterns. By default, a pattern matches the bases in a FASTQ record if the +/// regular expression (RE) in the pattern matches the bases. An empty expression matches every +/// line. Each FASTQ record that matches at least one of the patterns is written to the standard +/// output. +/// +/// INPUT COMPRESSION +/// +/// By default, the input files are assumed to be uncompressed with the following exceptions: (1) +/// If the input files are real files and end with `.gz` or `.bgz`, they are assumed to be GZIP +/// compressed, or (2) if they end with `.fastq` or `.fq`, they are assumed to be uncompressed, or +/// (3) if the `-Z/--decompress` option is specified then any unrecognized inputs (including +/// standard input) are assumed to be GZIP compressed. +/// +/// THREADS +/// +/// The `--threads` option controls the number of threads used to _search_ the reads. +/// Independently, for single end reads or interleaved paired end reads, a single thread will be +/// used to read each input FASTQ. For paired end reads across pairs of FASTQs, two threads will +/// be used to read the FASTQs for each end of a pair. Finally, a single thread will be created +/// for the writer. +/// +/// EXIT STATUS +/// +/// The fqgrep utility exits with one of the following values: +/// 0 if one or more lines were selected, 1 if no lines were selected, and >1 if an error occurred. 
+/// +#[derive(StructOpt, Debug)] +#[structopt( + name = "fqgrep", + global_setting(AppSettings::ColoredHelp), + global_setting(AppSettings::DeriveDisplayOrder), + version = built_info::VERSION.as_str()) + ] +#[allow(clippy::struct_excessive_bools)] +struct Opts { + /// The number of threads to use for matching reads against pattern. See the full usage for + /// threads specific to reading and writing. + #[structopt(long, short = "t", default_value = NUM_CPU.as_str())] + threads: usize, + + // TODO: support GREP_COLOR(S) (or FQGREP_COLOR(S)) environment variables + /// Mark up the matching text. The possible values of when are “never”, “always” and “auto”. + #[structopt(long = "color", default_value = "never")] + color: Color, + + /// Only a count of selected lines is written to standard output. + #[structopt(long, short = "c")] + count: bool, + + /// Specify a pattern used during the search of the input: an input line is selected if it + /// matches any of the specified patterns. This option is most useful when multiple `-e` + /// options are used to specify multiple patterns. + #[structopt(long, short = "e")] + regexp: Vec, + + /// Interpret pattern as a set of fixed strings + #[structopt(long, short = "F")] + fixed_strings: bool, + + /// Read one or more newline separated patterns from file. Empty pattern lines match every + /// input line. Newlines are not considered part of a pattern. If file is empty, nothing + /// is matched. + #[structopt(long, short = "f")] + file: Option, + + /// Selected lines are those not matching any of the specified patterns + #[structopt(short = "v")] + invert_match: bool, + + /// Assume all unrecognized inputs are GZIP compressed. + #[structopt(short = "Z", long)] + decompress: bool, + + /// Treat the input files as paired. The number of input files must be a multiple of two, + /// with the first file being R1, second R2, third R1, fourth R2, and so on. 
If the pattern + /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). If the input + /// is standard input, then treat the input as interlaved paired end reads. + #[structopt(long)] + paired: bool, + + /// Search the reverse complement for matches. + #[structopt(long)] + reverse_complement: bool, + + /// Write progress information + #[structopt(long)] + progress: bool, + + /// The first argument is the pattern to match, with the remaining arguments containing the + /// files to match. If `-e` is given, then all the arguments are files to match. + /// Use standard input if either no files are given or `-` is given. + /// + /// Input files must be gzip compressed unless `--plain` is given + args: Vec, } -struct ThreadWriter { - handle: JoinHandle, - tx: Option>>, +fn read_patterns(file: &PathBuf) -> Result> { + let f = File::open(&file).expect("error in opening file"); + let r = BufReader::new(f); + let mut v = Vec::new(); + for result in r.lines() { + v.push(result?); + } + Ok(v) } -impl ThreadWriter { - fn new(mut writer: ZWriterWrapper) -> Self { - // TODO: try making this unbounded - let (tx, rx): (Sender>, Receiver>) = - bounded(WRITER_CHANNEL_SIZE); - let handle = std::thread::spawn(move || { - let mut writer_stats = WriterStats::new(); - while let Ok(reads) = rx.recv() { - for read in reads { - writer_stats.reads_written += 1; - fastq::write_to(&mut writer.0, &read.head, &read.seq, &read.qual) - .expect("failed writing read"); - } - } - writer.0.finish().expect("Error flushing writer"); - writer_stats - }); - Self { - handle, - tx: Some(tx), +fn main() -> ExitCode { + // Set the exit code: + // - exit code 0 if there were matches + // - exit code 1 if there were no matches + // - exit code 2 if fqgrep returned an error + // - exit code 101 if fqgrep panicked + let outer = std::panic::catch_unwind(fqgrep); + match outer { + Err(_) => { + eprintln!("Error: fqgrep panicked. 
Please report this as a bug!"); + ExitCode::from(101) } + Ok(inner) => match inner { + Ok(0) => ExitCode::from(1), + Ok(_) => ExitCode::SUCCESS, + Err(e) => { + eprintln!("Error: {}", e); + ExitCode::from(2) + } + }, } } -struct SampleWriter { - r1: ThreadWriter, // + Send - r2: ThreadWriter, // + Send - lock: Mutex<()>, -} -impl SampleWriter { - fn new>( - sample: &Sample, - dir: P, - compression_threads: usize, - illumina: bool, - ) -> Result { - let (r1, r2) = sample.filenames(dir, illumina); - - let r1_writer = ZBuilder::::new() - .num_threads(compression_threads) - .compression_level(Compression::new(2)) - .from_writer(BufWriter::with_capacity(BUFSIZE, File::create(r1)?)); - let r1 = ThreadWriter::new(ZWriterWrapper(r1_writer)); - - let r2_writer = ZBuilder::::new() - .num_threads(compression_threads) - .compression_level(Compression::new(2)) - .from_writer(BufWriter::with_capacity(BUFSIZE, File::create(r2)?)); - let r2 = ThreadWriter::new(ZWriterWrapper(r2_writer)); - let lock = Mutex::new(()); +#[allow(clippy::too_many_lines)] +fn fqgrep() -> Result { + let mut opts = setup(); - Ok(Self { r1, r2, lock }) + // Add patterns from a file if given + if let Some(file) = &opts.file { + for pattern in read_patterns(file)? 
{ + opts.regexp.push(pattern); + } } -} -struct ZWriterWrapper(Box); - -// TODO: make ZWriter's return type be Send -unsafe impl Send for ZWriterWrapper {} -unsafe impl Sync for ZWriterWrapper {} + // Inspect the positional arguments to extract a fixed pattern + let (pattern, mut files): (Option, Vec) = { + let (pattern, file_strings): (Option, Vec) = if opts.regexp.is_empty() { + // No patterns given by -e, so assume the first positional argument is the pattern and + // the rest are files + ensure!( + !opts.args.is_empty(), + "Pattern must be given with -e or as the first positional argument " + ); + let files = opts.args.iter().skip(1).cloned().collect(); + (Some(opts.args[0].clone()), files) + } else { + // Patterns given by -e, so assume all positional arguments are files + (None, opts.args.clone()) + }; -struct ThreadReader { - handle: JoinHandle<()>, - rx: Receiver>, -} + // Convert file strings into paths + let files = file_strings + .iter() + .map(|s| { + PathBuf::from_str(s) + .with_context(|| format!("Cannot create a path from: {}", s)) + .unwrap() + }) + .collect(); + (pattern, files) + }; -impl ThreadReader { - fn new(file: PathBuf) -> Self { - let (tx, rx) = bounded(READER_CHANNEL_SIZE); - let handle = std::thread::spawn(move || { - let reader = fastq::Reader::with_capacity( - MultiGzDecoder::new(BufReader::with_capacity( - BUFSIZE, - File::open(&file).expect("error in opening file"), - )), - BUFSIZE, - ) - .into_records() - .chunks(CHUNKSIZE * num_cpus::get()); - let chunks = reader.into_iter(); + // Ensure that if multiple files are given, its a multiple of two. 
+ if opts.paired { + ensure!( + files.len() <= 1 || files.len() % 2 == 0, + "Input files must be a multiple of two, or either a single file or standard input (assume interleaved) with --paired" + ); + } - for chunk in chunks { - // let now = std::time::Instant::now(); - tx.send(chunk.map(|r| r.expect("Error reading")).collect()) - .expect("Error sending"); + // Validate the fixed string pattern, if fixed-strings are specified + if opts.fixed_strings { + if let Some(pattern) = &pattern { + validate_fixed_pattern(pattern)?; + } else if !opts.regexp.is_empty() { + for pattern in &opts.regexp { + validate_fixed_pattern(pattern)?; } - }); - - Self { handle, rx } + } else { + bail!("A pattern must be given as a positional argument or with -e/--regexp") + } } -} - -#[derive(Debug, PartialEq, PartialOrd)] -enum Matches { - Ref, - Alt, - Both, - None, -} - -/// A small utility to "grep" a pair of gzipped FASTQ files, outputting the read pair if the sequence -/// matches either the given ref sequence, the given alt sequence, or both. -#[derive(StructOpt, Debug)] -#[structopt(name = "fqgrep", global_setting(ColoredHelp), version = built_info::VERSION.as_str())] -struct Opts { - /// Path to the input R1 gzipped FASTQ - #[structopt(long, short = "1", display_order = 1)] - r1_fastq: PathBuf, - - /// Path to the input R2 gzipped FASTQ - #[structopt(long, short = "2", display_order = 2)] - r2_fastq: PathBuf, - /// The output directory to write to. - /// - /// `fqgrep` will write a pair of fastqs for the alt matched reads to: {output_dir}/ALT_{sample_name}R{1,2}.fastq.gz - /// and the fastqs for the ref matched reads to {output_dir}/REF_{sample_name}R{1,2}.fastq.gz - #[structopt(long, short = "o", display_order = 3)] - output_dir: PathBuf, - - /// The "ref" search sequence to grep for, for R2 reads the search sequence is reverse complemented. 
- #[structopt(long, short = "r", display_order = 4)] - ref_search_sequence: String, - - /// The "alt" search sequence to grep for, for R2 reads the search sequence is reverse complemented. - #[structopt(long, short = "a", display_order = 5)] - alt_search_sequence: String, - - /// The number of threads to use for matching reads against pattern - /// - /// Keep in mind that there is are two reader threads and two writer threads created by default. - #[structopt(long, short = "t", default_value = NUM_CPU.as_str(), display_order = 6)] - threads_for_matching: usize, - - /// Number of threads to use for compression. - /// - /// Zero is probably the correct answer if the searched sequence is relatively rare. - /// If it's common and many reads will be written, it may be worth decreasing the number of - /// `treads_for_matching` by a small number and making this number > 2 - #[structopt(long, short = "c", default_value = "0", display_order = 7)] - compression_threads: usize, - - /// True to name output FASTQs based on Illumin's FASTQ file name convetions. - /// - /// If true, will use the file name pattern "_S1_L001_R<1 or 2>_001.fastq.gz". 
- /// Otherwise, will use ".R<1 or 2>.fastq.gz" - #[structopt(long, short = "I", display_order = 8)] - illumina: bool, -} + // Set up a progress logger if desired + let progress_logger = if opts.progress { + Some( + ProgLogBuilder::new() + .name("fqgrep-progress") + .noun("reads") + .verb("Searched") + .unit(50_000_000) + .count_formatter(CountFormatterKind::Comma) + .build(), + ) + } else { + None + }; -#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct Barcode<'a>(&'a [u8], &'a [u8]); + // Build the common pattern matching options + let match_opts = MatcherOpts { + invert_match: opts.invert_match, + reverse_complement: opts.reverse_complement, + color: opts.color == Color::Always || (opts.color == Color::Auto && stdout_isatty()), + }; -#[allow(clippy::too_many_lines)] -fn main() -> Result<()> { - let opts = setup(); - let progress_logger = ProgLogBuilder::new() - .name("fqgrep-progress") - .noun("reads") - .verb("Searched") - .unit(50_000_000) - .count_formatter(CountFormatterKind::Comma) - .build(); - - let ref_sample = Sample::from_r1_path(&opts.r1_fastq, REF_PREFIX)?; - let alt_sample = Sample::from_r1_path(&opts.r1_fastq, ALT_PREFIX)?; - let both_sample = Sample::from_r1_path(&opts.r1_fastq, BOTH_PREFIX)?; - - // Create the output directory - fs::create_dir_all(&opts.output_dir)?; - - let mut ref_writer = SampleWriter::new( - &ref_sample, - &opts.output_dir, - opts.compression_threads, - opts.illumina, - )?; - let mut alt_writer = SampleWriter::new( - &alt_sample, - &opts.output_dir, - opts.compression_threads, - opts.illumina, - )?; - let mut both_writer = SampleWriter::new( - &both_sample, - &opts.output_dir, - opts.compression_threads, - opts.illumina, - )?; + // The matcher used in the primary search + let matcher: Box = + MatcherFactory::new_matcher(&pattern, opts.fixed_strings, &opts.regexp, match_opts); + // The thread pool from which threads are spanwed. 
let pool = rayon::ThreadPoolBuilder::new() - .num_threads(opts.threads_for_matching) + .num_threads(opts.threads) .build() .unwrap(); - // TODO: make sure errors aren't being eaten - info!("Creating reader threads"); - let r1_reader = ThreadReader::new(opts.r1_fastq.clone()); - let r2_reader = ThreadReader::new(opts.r2_fastq.clone()); + // Sender and receiver channels for the final count of matching records + let (count_tx, count_rx): (Sender, Receiver) = bounded(1); - let ref_forward_seq = opts.ref_search_sequence.as_bytes(); - let ref_revcomp_seq = revcomp(ref_forward_seq); - let alt_forward_seq = opts.alt_search_sequence.as_bytes(); - let alt_revcomp_seq = revcomp(alt_forward_seq); + // The writer of final counts or matching records + let writer = FastqWriter::new(count_tx, opts.count, opts.paired); - info!("Processing reads"); + // The main loop pool.install(|| { - for (r1_super, r2_super) in izip!(r1_reader.rx.iter(), r2_reader.rx.iter()) { - (r1_super, r2_super) - .into_par_iter() - .map(|(r1, r2)| { - let ref_match = r1.seq.find(ref_forward_seq).is_some() - || r1.seq.find(&ref_revcomp_seq).is_some() - || r2.seq.find(ref_forward_seq).is_some() - || r2.seq.find(&ref_revcomp_seq).is_some(); - let alt_match = r1.seq.find(alt_forward_seq).is_some() - || r1.seq.find(&alt_revcomp_seq).is_some() - || r2.seq.find(alt_forward_seq).is_some() - || r2.seq.find(&alt_revcomp_seq).is_some(); - let m = match (ref_match, alt_match) { - (true, true) => Matches::Both, - (true, false) => Matches::Ref, - (false, true) => Matches::Alt, - (false, false) => Matches::None, - }; - progress_logger.record(); - (m, (r1, r2)) - }) - .filter(|(m, _)| *m != Matches::None) - .fold( - || ((vec![], vec![]), (vec![], vec![]), (vec![], vec![])), - |(mut ref_reads, mut alt_reads, mut both_reads): MatchedReads, - (match_type, (r1, r2)): (Matches, (OwnedRecord, OwnedRecord))| { - match match_type { - Matches::Ref => { - ref_reads.0.push(r1); - ref_reads.1.push(r2); - } - Matches::Alt => { - 
alt_reads.0.push(r1); - alt_reads.1.push(r2); + // If no files, use "-" to signify standard input. + if files.is_empty() { + // read from standard input + files.push(PathBuf::from_str("-").unwrap()); + } + if opts.paired { + // Either an interleaved paired end FASTQ, or pairs of FASTQs + if files.len() == 1 { + // Interleaved paired end FASTQ + // The channel FASTQ record chunks are received after being read in + let rx = spawn_reader(files[0].clone(), opts.decompress); + for reads in izip!(rx.iter()) { + let paired_reads = reads + .into_iter() + .tuples::<(OwnedRecord, OwnedRecord)>() + .collect_vec(); + process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); + } + } else { + // Pairs of FASTQ files + for file_pairs in files.chunks_exact(2) { + // The channels for R1 and R2 with FASTQ record chunks that are received after being read in + let rx1 = spawn_reader(file_pairs[0].clone(), opts.decompress); + let rx2 = spawn_reader(file_pairs[1].clone(), opts.decompress); + for (reads1, reads2) in izip!(rx1.iter(), rx2.iter()) { + let paired_reads = reads1.into_iter().zip(reads2.into_iter()).collect_vec(); + process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); + } + } + } + } else { + // Process one FASTQ at a time + for file in files { + // The channel FASTQ record chunks are received after being read in + let rx = spawn_reader(file.clone(), opts.decompress); + for reads in rx.iter() { + // Get the matched reads + let matched_reads: Vec = reads + .into_par_iter() + .map(|mut read| -> Option { + if let Some(progress) = &progress_logger { + progress.record(); } - Matches::Both => { - both_reads.0.push(r1); - both_reads.1.push(r2); + if matcher.read_match(&mut read) { + Some(read) + } else { + None } - _ => unreachable!() - } - (ref_reads, alt_reads, both_reads) - }, - ) - .for_each( - |(ref_reads, alt_reads, both_reads): MatchedReads| { - // Acquire lock to ensure R1 and R2 are enqueued at the same time - { - let _lock = 
ref_writer.lock.lock(); - ref_writer.r1.tx.as_ref().unwrap().send(ref_reads.0).expect("Failed to send R1s"); - ref_writer.r2.tx.as_ref().unwrap().send(ref_reads.1).expect("Failed to send R1s"); - } - { - let _lock = alt_writer.lock.lock(); - alt_writer.r1.tx.as_ref().unwrap().send(alt_reads.0).expect("Failed to send R1s"); - alt_writer.r2.tx.as_ref().unwrap().send(alt_reads.1).expect("Failed to send R1s"); - } - { - let _lock = both_writer.lock.lock(); - both_writer.r1.tx.as_ref().unwrap().send(both_reads.0).expect("Failed to send R1s"); - both_writer.r2.tx.as_ref().unwrap().send(both_reads.1).expect("Failed to send R1s"); - } - }, - ); + }) + .flatten() + .collect(); + + let _lock = writer.lock.lock(); + writer.tx.send(matched_reads).expect("Failed to send read"); + } + } } }); - info!("Joining reader threads"); - r1_reader.handle.join().expect("Failed to join r1"); - r2_reader.handle.join().expect("Failed to join r2"); - - // Flush and close writers - info!("Joining writer threads and collecting stats"); - while !ref_writer.r1.tx.as_ref().unwrap().is_empty() {} - while !ref_writer.r2.tx.as_ref().unwrap().is_empty() {} - ref_writer.r1.tx.take(); - ref_writer.r2.tx.take(); - let ref_r1_stats = ref_writer - .r1 - .handle - .join() - .expect("Error joining r1 handle"); - let _ref_r2_stats = ref_writer - .r2 - .handle - .join() - .expect("Error joining r2 handle"); - while !alt_writer.r1.tx.as_ref().unwrap().is_empty() {} - while !alt_writer.r2.tx.as_ref().unwrap().is_empty() {} - alt_writer.r1.tx.take(); - alt_writer.r2.tx.take(); - let alt_r1_stats = alt_writer - .r1 - .handle - .join() - .expect("Error joining r1 handle"); - let _alt_r2_stats = alt_writer - .r2 - .handle - .join() - .expect("Error joining r2 handle"); - while !both_writer.r1.tx.as_ref().unwrap().is_empty() {} - while !both_writer.r2.tx.as_ref().unwrap().is_empty() {} - both_writer.r1.tx.take(); - both_writer.r2.tx.take(); - let both_r1_stats = both_writer - .r1 - .handle - .join() - .expect("Error 
joining r1 handle"); - let _both_r2_stats = both_writer - .r2 - .handle - .join() - .expect("Error joining r2 handle"); - drop(progress_logger); - info!( - "{} reads matched input ref sequence", - ref_r1_stats.reads_written - ); - info!( - "{} reads matched input alt sequence", - alt_r1_stats.reads_written - ); - info!( - "{} reads matched both input ref and alt sequences", - both_r1_stats.reads_written - ); - - info!("Writing stats"); - let fq_grep_metrics = FqGrepMetrics::new( - ref_r1_stats.reads_written, - alt_r1_stats.reads_written, - both_r1_stats.reads_written, - ); - - let writer = BufWriter::new(File::create(opts.output_dir.join("fqgrep_stats.tsv"))?); - let mut writer = csv::WriterBuilder::new() - .delimiter(b'\t') - .has_headers(true) - .from_writer(writer); - writer.serialize(fq_grep_metrics)?; - writer.flush()?; - - Ok(()) + drop(writer); // so count_tx.send will execute + // Get the final count of records matched + match count_rx.recv() { + Ok(count) => Ok(count), + Err(error) => Err(error).with_context(|| "failed receive final match counts"), + } +} + +/// Process a chunk of paired end records in parallel. +#[allow(clippy::borrowed_box)] // FIXME: remove me later and solve +fn process_paired_reads( + reads: Vec<(OwnedRecord, OwnedRecord)>, + matcher: &Box, + writer: &FastqWriter, + progress_logger: &Option, +) { + reads + .into_par_iter() + .map(|(mut read1, mut read2)| { + if let Some(progress) = progress_logger { + progress.record(); + progress.record(); + } + assert!( + read1.head() == read2.head(), + "Mismatching read pair! R1: {} R2: {}", + std::str::from_utf8(read1.head()).unwrap(), + std::str::from_utf8(read2.head()).unwrap() + ); + // NB: if the output is to be colored, always call read_match on read2, regardless of + // whether or not read1 had a match, so that read2 is always colored. 
If the output + // isn't to be colored, only search for a match in read2 if read1 does not have a match + let match1 = matcher.read_match(&mut read1); + let match2 = (!matcher.opts().color && match1) || matcher.read_match(&mut read2); + if match1 || match2 { + Some((read1, read2)) + } else { + None + } + }) + .flatten() + .fold(std::vec::Vec::new, |mut matched_reads, (r1, r2)| { + matched_reads.push(r1); + matched_reads.push(r2); + matched_reads + }) + .for_each(|matched_read| { + let _lock = writer.lock.lock(); + writer.tx.send(matched_read).expect("Failed to send read"); + }); } /// Parse args and set up logging / tracing