From 8cf0c7f8bbb73c3d839bd011233768f978f54f02 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 02:07:13 -0700 Subject: [PATCH 01/22] wip: make it grep-like --- Cargo.toml | 12 +- src/lib/matcher.rs | 145 ++++++++++ src/lib/mod.rs | 64 +++++ src/main.rs | 694 +++++++++++++++++---------------------------- 4 files changed, 473 insertions(+), 442 deletions(-) create mode 100644 src/lib/matcher.rs create mode 100644 src/lib/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 6c141c1..10e9737 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,14 @@ readme = "README.md" categories = ["science"] keywords = ["bioinformatics", "fastq"] +[lib] +name = "fqgrep_lib" +path = "src/lib/mod.rs" + +[[bin]] +name = "fqgrep" +path = "src/main.rs" + [profile.release] lto = true @@ -24,7 +32,7 @@ env_logger = "0.9.0" flate2 = {version = "1.0.22", default-features = false, features = ["zlib-ng-compat"]} flume = "0.10.9" gzp = "0.9.1" -itertools = "0.10.1" +itertools = "0.10.3" lazy_static = "1.4.0" log = "0.4.14" mimalloc = {version = "0.1.26", default-features = false} @@ -32,7 +40,7 @@ num_cpus = "1.13.0" parking_lot = "0.11.2" proglog = {version = "0.3.0", features = ["pretty_counts"]} rayon = "1.5.1" -regex = "1.5.4" +regex = "1.6.0" seq_io = "0.3.1" serde = {version = "1.0.130", features = ["derive"]} structopt = "0.3.23" diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs new file mode 100644 index 0000000..d79a062 --- /dev/null +++ b/src/lib/matcher.rs @@ -0,0 +1,145 @@ +use crate::reverse_complement; +use crate::DNA_BASES; +use anyhow::{bail, Context, Result}; +use bstr::ByteSlice; +use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder}; +use seq_io::fastq::{OwnedRecord, Record}; + +#[derive(Copy, Clone, Debug)] +pub struct MatcherOpts { + pub invert_match: bool, + pub reverse_complement: bool, +} + +pub trait Matcher { + fn opts(&self) -> MatcherOpts; + + fn bases_match(&self, bases: &[u8]) -> bool; + + fn read_match(&self, read: &OwnedRecord) -> bool { + if self.opts().invert_match { + self.bases_match(read.seq()) + && (self.opts().reverse_complement + && self.bases_match(&reverse_complement(read.seq()))) + } else { + self.bases_match(read.seq()) + || (self.opts().reverse_complement + && self.bases_match(&reverse_complement(read.seq()))) + } + } +} + +pub struct FixedStringMatcher { + pattern: Vec, + opts: MatcherOpts, +} + +impl Matcher for FixedStringMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + bases.find(&self.pattern).is_some() != self.opts.invert_match + } + fn opts(&self) -> MatcherOpts { + self.opts + } +} + +impl FixedStringMatcher { + pub fn new(pattern: &str, opts: MatcherOpts) -> Self { + let pattern = pattern.as_bytes().to_vec(); + Self { pattern, opts } + } + + pub fn validate(pattern: &str) -> Result<()> { + for (index, base) in pattern.chars().enumerate() { + if !DNA_BASES.contains(&(base as u8)) { + bail!( + "Fixed pattern must contain only DNA bases: {} .. [{}] .. {}", + &pattern[0..index], + &pattern[index..=index], + &pattern[index + 1..], + ) + } + } + Ok(()) + } +} + +pub struct FixedStringSetMatcher { + patterns: Vec>, + opts: MatcherOpts, +} + +impl Matcher for FixedStringSetMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + self.patterns + .iter() + .any(|pattern| bases.find(&pattern).is_some()) + != self.opts.invert_match + } + fn opts(&self) -> MatcherOpts { + self.opts + } +} + +impl FixedStringSetMatcher { + pub fn new(patterns: I, opts: MatcherOpts) -> Self + where + S: AsRef, + I: IntoIterator, + { + let patterns: Vec> = patterns + .into_iter() + .map(|pattern| pattern.as_ref().to_owned().as_bytes().to_vec()) + .collect(); + Self { patterns, opts } + } +} + +pub struct RegexMatcher { + regex: Regex, + opts: MatcherOpts, +} + +impl RegexMatcher { + pub fn new(pattern: &str, opts: MatcherOpts) -> Self { + let regex = RegexBuilder::new(pattern) + .build() + .context(format!("Invalid regular expression: {}", pattern)) + .unwrap(); + Self { regex, opts } + } +} + +impl Matcher for RegexMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + self.regex.is_match(bases) != self.opts.invert_match + } + fn opts(&self) -> MatcherOpts { + self.opts + } +} + +pub struct RegexSetMatcher { + regexes: RegexSet, + opts: MatcherOpts, +} + +impl RegexSetMatcher { + pub fn new(patterns: I, opts: MatcherOpts) -> Self + where + S: AsRef, + I: IntoIterator, + { + let regexes = RegexSetBuilder::new(patterns).build().unwrap(); + Self { regexes, opts } + } +} + +impl Matcher for RegexSetMatcher { + fn bases_match(&self, bases: &[u8]) -> bool { + self.regexes.is_match(bases) != self.opts.invert_match + } + fn opts(&self) -> MatcherOpts { + self.opts + } +} diff --git a/src/lib/mod.rs b/src/lib/mod.rs new file mode 100644 index 0000000..84899de --- /dev/null +++ b/src/lib/mod.rs @@ -0,0 +1,64 @@ +#![deny(unsafe_code)] +#![allow( + clippy::must_use_candidate, + clippy::missing_panics_doc, + clippy::missing_errors_doc, + clippy::module_name_repetitions +)] +pub mod matcher; +use lazy_static::lazy_static; +use std::borrow::Borrow; + +lazy_static! { + pub static ref DNA_BASES: [u8; 5] = { + let mut bases = [0; 5]; + for (i, base) in b"ACGTN".iter().enumerate() { + bases[i] = *base; + } + bases + }; + + pub static ref IUPAC_BASES: [u8;15] = { + let mut bases = [0; 15]; + for (i, base) in b"AGCTYRWSKMDVHBN".iter().enumerate() { + bases[i] = *base; + } + bases + }; + + pub static ref IUPAC_BASES_COMPLEMENT: [u8;15] = { + let mut bases = [0; 15]; + for (i, base) in b"TCGARYWSMKHBDVN".iter().enumerate() { + bases[i] = *base; + } + bases + }; + + pub static ref COMPLEMENT: [u8; 256] = { + let mut comp = [0; 256]; + for (v, a) in comp.iter_mut().enumerate() { + *a = v as u8; + } + for (&a, &b) in IUPAC_BASES.iter().zip(IUPAC_BASES_COMPLEMENT.iter()) { + comp[a as usize] = b; + comp[a as usize + 32] = b + 32; // lowercase variants + } + comp + }; +} + +fn complement(a: u8) -> u8 { + COMPLEMENT[a as usize] +} + +fn reverse_complement(text: T) -> Vec +where + C: Borrow, + T: IntoIterator, + T::IntoIter: DoubleEndedIterator, +{ + text.into_iter() + .rev() + .map(|a| complement(*a.borrow())) + .collect() +} diff --git a/src/main.rs b/src/main.rs index 810bc8f..df2b766 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,30 +2,28 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use std::{ - borrow::Borrow, - fs, fs::File, - io::{BufReader, BufWriter}, - path::{Path, PathBuf}, + io::{BufRead, BufReader, BufWriter, Write}, + path::PathBuf, + str::FromStr, thread::JoinHandle, }; -use anyhow::{Error, Result}; -use bstr::ByteSlice; +use anyhow::{bail, ensure, Context, Result}; use env_logger::Env; use flate2::bufread::MultiGzDecoder; use flume::{bounded, Receiver, Sender}; -use gzp::{deflate::Bgzf, Compression, ZBuilder, ZWriter, BUFSIZE}; +use fqgrep_lib::matcher::{ + FixedStringMatcher, FixedStringSetMatcher, Matcher, MatcherOpts, RegexMatcher, RegexSetMatcher, +}; +use gzp::BUFSIZE; use itertools::{self, izip, Itertools}; use lazy_static::lazy_static; -use log::info; use parking_lot::Mutex; -use proglog::{CountFormatterKind, ProgLogBuilder}; -use rayon::prelude::*; -use regex::{Regex, RegexBuilder}; -use seq_io::fastq::{self, OwnedRecord}; -use serde::{Deserialize, Serialize}; -use structopt::{clap::AppSettings::ColoredHelp, StructOpt}; +use proglog::{CountFormatterKind, ProgLog, ProgLogBuilder}; +use rayon::{prelude::*, Scope}; +use seq_io::fastq::{self, OwnedRecord, Record}; +use structopt::{clap::AppSettings, StructOpt}; /// The number of reads in a chunk const CHUNKSIZE: usize = 5000; // * num_cpus::get(); @@ -33,33 +31,11 @@ const CHUNKSIZE: usize = 5000; // * num_cpus::get(); const READER_CHANNEL_SIZE: usize = 100; const WRITER_CHANNEL_SIZE: usize = 2000; -const REF_PREFIX: &str = "REF"; -const ALT_PREFIX: &str = "ALT"; -const BOTH_PREFIX: &str = "BOTH"; - -/// Helper type to represent the accumlated reads and what match type they are. -/// -/// `p` are ref reads -/// `1` are alt reads -/// `2` are reads that matched both -type MatchedReads = ( - (Vec, Vec), - (Vec, Vec), - (Vec, Vec), -); - lazy_static! { /// Return the number of cpus as a String pub static ref NUM_CPU: String = num_cpus::get().to_string(); } -lazy_static! { - static ref R1_REGEX: Regex = RegexBuilder::new(r"r1") - .case_insensitive(true) - .build() - .expect("Failed to compile R1 regex"); -} - pub mod built_info { use lazy_static::lazy_static; include!(concat!(env!("OUT_DIR"), "/built.rs")); @@ -88,179 +64,32 @@ pub mod built_info { } } -lazy_static! { - static ref COMPLEMENT: [u8; 256] = { - let mut comp = [0; 256]; - for (v, a) in comp.iter_mut().enumerate() { - *a = v as u8; - } - for (&a, &b) in b"AGCTYRWSKMDVHBN".iter().zip(b"TCGARYWSMKHBDVN".iter()) { - comp[a as usize] = b; - comp[a as usize + 32] = b + 32; // lowercase variants - } - comp - }; -} - -fn complement(a: u8) -> u8 { - COMPLEMENT[a as usize] -} - -fn revcomp(text: T) -> Vec -where - C: Borrow, - T: IntoIterator, - T::IntoIter: DoubleEndedIterator, -{ - text.into_iter() - .rev() - .map(|a| complement(*a.borrow())) - .collect() -} - -#[derive(Debug, Serialize, Deserialize)] -struct FqGrepMetrics { - total_alt_reads_found: usize, - total_ref_reads_found: usize, - total_both_reads_found: usize, - total_reads_found: usize, -} - -impl FqGrepMetrics { - fn new( - total_ref_reads_found: usize, - total_alt_reads_found: usize, - total_both_reads_found: usize, - ) -> Self { - Self { - total_ref_reads_found, - total_alt_reads_found, - total_both_reads_found, - total_reads_found: total_alt_reads_found - + total_both_reads_found - + total_ref_reads_found, - } - } -} - -#[derive(Debug, Clone)] -struct Sample { - name: String, -} - -impl Sample { - fn new(name: String) -> Self { - Self { name } - } - - fn from_r1_path>(r1_path: P, prefix: &str) -> Result { - let filename = r1_path - .as_ref() - .file_name() - .expect("Unable to extract file name from R1") - .to_string_lossy(); - let parts: Vec<&str> = R1_REGEX.splitn(&filename, 2).collect(); - if let Some(&part) = parts.first() { - let name: &str = &part[0..part.len() - 1]; - Ok(Self::new(format!("{}_{}", prefix, name.to_owned()))) - } else { - Err(Error::msg("Unable to extract a sample name from R1 path")) - } - } - - fn filenames>(&self, dir: P, illumina: bool) -> (PathBuf, PathBuf) { - if illumina { - ( - dir.as_ref() - .join(format!("{}_S1_L001_R1_001.fastq.gz", self.name)), - dir.as_ref() - .join(format!("{}_S1_L001_R2_001.fastq.gz", self.name)), - ) - } else { - ( - dir.as_ref().join(format!("{}.R1.fastq.gz", self.name)), - dir.as_ref().join(format!("{}.R2.fastq.gz", self.name)), - ) - } - } -} - -struct WriterStats { - reads_written: usize, -} - -impl WriterStats { - fn new() -> Self { - Self { reads_written: 0 } - } -} - -struct ThreadWriter { - handle: JoinHandle, - tx: Option>>, +struct FastqWriter { + tx: Sender>, + lock: Mutex<()>, } -impl ThreadWriter { - fn new(mut writer: ZWriterWrapper) -> Self { +impl FastqWriter { + fn new(pool: &Scope) -> Self { + let lock = Mutex::new(()); + let mut writer = BufWriter::with_capacity(BUFSIZE, std::io::stdout()); // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); - let handle = std::thread::spawn(move || { - let mut writer_stats = WriterStats::new(); + pool.spawn(move |_| { while let Ok(reads) = rx.recv() { for read in reads { - writer_stats.reads_written += 1; - fastq::write_to(&mut writer.0, &read.head, &read.seq, &read.qual) + fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) .expect("failed writing read"); } } - writer.0.finish().expect("Error flushing writer"); - writer_stats + writer.flush().expect("Error flushing writer"); }); - Self { - handle, - tx: Some(tx), - } - } -} -struct SampleWriter { - r1: ThreadWriter, // + Send - r2: ThreadWriter, // + Send - lock: Mutex<()>, -} -impl SampleWriter { - fn new>( - sample: &Sample, - dir: P, - compression_threads: usize, - illumina: bool, - ) -> Result { - let (r1, r2) = sample.filenames(dir, illumina); - - let r1_writer = ZBuilder::::new() - .num_threads(compression_threads) - .compression_level(Compression::new(2)) - .from_writer(BufWriter::with_capacity(BUFSIZE, File::create(r1)?)); - let r1 = ThreadWriter::new(ZWriterWrapper(r1_writer)); - - let r2_writer = ZBuilder::::new() - .num_threads(compression_threads) - .compression_level(Compression::new(2)) - .from_writer(BufWriter::with_capacity(BUFSIZE, File::create(r2)?)); - let r2 = ThreadWriter::new(ZWriterWrapper(r2_writer)); - let lock = Mutex::new(()); - - Ok(Self { r1, r2, lock }) + Self { tx, lock } } } -struct ZWriterWrapper(Box); - -// TODO: make ZWriter's return type be Send -unsafe impl Send for ZWriterWrapper {} -unsafe impl Sync for ZWriterWrapper {} - struct ThreadReader { handle: JoinHandle<()>, rx: Receiver>, @@ -270,92 +99,152 @@ impl ThreadReader { fn new(file: PathBuf) -> Self { let (tx, rx) = bounded(READER_CHANNEL_SIZE); let handle = std::thread::spawn(move || { - let reader = fastq::Reader::with_capacity( - MultiGzDecoder::new(BufReader::with_capacity( + // TODO: make the types work so we can remove duplicate code here and below + if file == PathBuf::from_str("-").unwrap() { + let handle = BufReader::with_capacity(BUFSIZE, std::io::stdin()); + let reader = fastq::Reader::with_capacity(MultiGzDecoder::new(handle), BUFSIZE) + .into_records() + .chunks(CHUNKSIZE * num_cpus::get()); + let chunks = reader.into_iter(); + for chunk in chunks { + // let now = std::time::Instant::now(); + tx.send(chunk.map(|r| r.expect("Error reading")).collect()) + .expect("Error sending"); + } + } else { + let handle = BufReader::with_capacity( BUFSIZE, File::open(&file).expect("error in opening file"), - )), - BUFSIZE, - ) - .into_records() - .chunks(CHUNKSIZE * num_cpus::get()); - let chunks = reader.into_iter(); - - for chunk in chunks { - // let now = std::time::Instant::now(); - tx.send(chunk.map(|r| r.expect("Error reading")).collect()) - .expect("Error sending"); - } + ); + let reader = fastq::Reader::with_capacity(MultiGzDecoder::new(handle), BUFSIZE) + .into_records() + .chunks(CHUNKSIZE * num_cpus::get()); + let chunks = reader.into_iter(); + for chunk in chunks { + // let now = std::time::Instant::now(); + tx.send(chunk.map(|r| r.expect("Error reading")).collect()) + .expect("Error sending"); + } + }; }); Self { handle, rx } } } -#[derive(Debug, PartialEq, PartialOrd)] -enum Matches { - Ref, - Alt, - Both, - None, -} - /// A small utility to "grep" a pair of gzipped FASTQ files, outputting the read pair if the sequence /// matches either the given ref sequence, the given alt sequence, or both. #[derive(StructOpt, Debug)] -#[structopt(name = "fqgrep", global_setting(ColoredHelp), version = built_info::VERSION.as_str())] +#[structopt( + name = "fqgrep", + global_setting(AppSettings::ColoredHelp), + global_setting(AppSettings::DeriveDisplayOrder), + version = built_info::VERSION.as_str()) + ] +#[allow(clippy::struct_excessive_bools)] struct Opts { - /// Path to the input R1 gzipped FASTQ - #[structopt(long, short = "1", display_order = 1)] - r1_fastq: PathBuf, + /// The number of threads to use for matching reads against pattern + #[structopt(long, short = "t", default_value = NUM_CPU.as_str())] + threads: usize, + + /// Treat the input files as paired. The number of input files must be a multiple of two, + /// with the first file being R1, second R2, third R1, fourth R2, and so on. If the pattern + /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). + #[structopt(long)] + paired: bool, + + /// Specify a pattern used during the search of the input: an input line is selected if it + /// matches any of the specified patterns. This option is most useful when multiple `-e` + /// options are used to specify multiple patterns. + #[structopt(long, short = "e")] + regexp: Vec, + + // Interpret pattern as a set of fixed strings + #[structopt(long, short = "F")] + fixed_strings: bool, + + /// Read one or more newline separated patterns from file. Empty pattern lines match every + /// input line. Newlines are not considered part of a pattern. If file is empty, nothing + /// is matched. + #[structopt(long, short = "f")] + file: Option, + + /// Search the reverse complement for matches. + #[structopt(long)] + reverse_complement: bool, + + /// Selected lines are those not matching any of the specified patterns + #[structopt(short = "v")] + invert_match: bool, + + /// The first argument is the pattern to match, with the remaining arguments containing the + /// files to match. If `-e` is given, then all the arguments are files to match. + /// Use standard input if either no files are given or `-` is given. + args: Vec, +} - /// Path to the input R2 gzipped FASTQ - #[structopt(long, short = "2", display_order = 2)] - r2_fastq: PathBuf, +fn read_patterns(file: &PathBuf) -> Result> { + let f = File::open(&file).expect("error in opening file"); + let r = BufReader::new(f); + let mut v = Vec::new(); + for result in r.lines() { + v.push(result?); + } + Ok(v) +} - /// The output directory to write to. - /// - /// `fqgrep` will write a pair of fastqs for the alt matched reads to: {output_dir}/ALT_{sample_name}R{1,2}.fastq.gz - /// and the fastqs for the ref matched reads to {output_dir}/REF_{sample_name}R{1,2}.fastq.gz - #[structopt(long, short = "o", display_order = 3)] - output_dir: PathBuf, +#[allow(clippy::too_many_lines)] +fn main() -> Result<()> { + let mut opts = setup(); - /// The "ref" search sequence to grep for, for R2 reads the search sequence is reverse complemented. - #[structopt(long, short = "r", display_order = 4)] - ref_search_sequence: String, + if let Some(file) = &opts.file { + for pattern in read_patterns(file)? { + opts.regexp.push(pattern); + } + } - /// The "alt" search sequence to grep for, for R2 reads the search sequence is reverse complemented. - #[structopt(long, short = "a", display_order = 5)] - alt_search_sequence: String, + let (pattern, mut files): (Option, Vec) = { + let (pattern, file_strings): (Option, Vec) = if opts.regexp.is_empty() { + ensure!( + !opts.args.is_empty(), + "Pattern must be given with -e or as the first positional argument " + ); + let files = opts.args.iter().skip(1).cloned().collect(); + (Some(opts.args[0].clone()), files) + } else { + (None, opts.args.clone()) + }; - /// The number of threads to use for matching reads against pattern - /// - /// Keep in mind that there is are two reader threads and two writer threads created by default. - #[structopt(long, short = "t", default_value = NUM_CPU.as_str(), display_order = 6)] - threads_for_matching: usize, - - /// Number of threads to use for compression. - /// - /// Zero is probably the correct answer if the searched sequence is relatively rare. - /// If it's common and many reads will be written, it may be worth decreasing the number of - /// `treads_for_matching` by a small number and making this number > 2 - #[structopt(long, short = "c", default_value = "0", display_order = 7)] - compression_threads: usize, - - /// True to name output FASTQs based on Illumin's FASTQ file name convetions. - /// - /// If true, will use the file name pattern "_S1_L001_R<1 or 2>_001.fastq.gz". - /// Otherwise, will use ".R<1 or 2>.fastq.gz" - #[structopt(long, short = "I", display_order = 8)] - illumina: bool, -} + let files = file_strings + .iter() + .map(|s| { + PathBuf::from_str(s) + .with_context(|| format!("Cannot create a path from: {}", s)) + .unwrap() + }) + .collect(); + (pattern, files) + }; -#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct Barcode<'a>(&'a [u8], &'a [u8]); + if opts.paired { + ensure!( + files.len() <= 1 || files.len() % 2 == 0, + "Input files must be a multiple of two, or either a single file or standard input (assume interleaved) with --paired" + ); + } + + if opts.fixed_strings { + if let Some(pattern) = &pattern { + FixedStringMatcher::validate(pattern)?; + } else if !opts.regexp.is_empty() { + for pattern in &opts.regexp { + FixedStringMatcher::validate(pattern)?; + } + } else { + bail!("A pattern must be given as a positional argument or with -e/--regexp") + } + } -#[allow(clippy::too_many_lines)] -fn main() -> Result<()> { - let opts = setup(); let progress_logger = ProgLogBuilder::new() .name("fqgrep-progress") .noun("reads") @@ -364,196 +253,121 @@ fn main() -> Result<()> { .count_formatter(CountFormatterKind::Comma) .build(); - let ref_sample = Sample::from_r1_path(&opts.r1_fastq, REF_PREFIX)?; - let alt_sample = Sample::from_r1_path(&opts.r1_fastq, ALT_PREFIX)?; - let both_sample = Sample::from_r1_path(&opts.r1_fastq, BOTH_PREFIX)?; - - // Create the output directory - fs::create_dir_all(&opts.output_dir)?; - - let mut ref_writer = SampleWriter::new( - &ref_sample, - &opts.output_dir, - opts.compression_threads, - opts.illumina, - )?; - let mut alt_writer = SampleWriter::new( - &alt_sample, - &opts.output_dir, - opts.compression_threads, - opts.illumina, - )?; - let mut both_writer = SampleWriter::new( - &both_sample, - &opts.output_dir, - opts.compression_threads, - opts.illumina, - )?; - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(opts.threads_for_matching) + .num_threads(opts.threads) .build() .unwrap(); - // TODO: make sure errors aren't being eaten - info!("Creating reader threads"); - let r1_reader = ThreadReader::new(opts.r1_fastq.clone()); - let r2_reader = ThreadReader::new(opts.r2_fastq.clone()); - - let ref_forward_seq = opts.ref_search_sequence.as_bytes(); - let ref_revcomp_seq = revcomp(ref_forward_seq); - let alt_forward_seq = opts.alt_search_sequence.as_bytes(); - let alt_revcomp_seq = revcomp(alt_forward_seq); - - info!("Processing reads"); - pool.install(|| { - for (r1_super, r2_super) in izip!(r1_reader.rx.iter(), r2_reader.rx.iter()) { - (r1_super, r2_super) - .into_par_iter() - .map(|(r1, r2)| { - let ref_match = r1.seq.find(ref_forward_seq).is_some() - || r1.seq.find(&ref_revcomp_seq).is_some() - || r2.seq.find(ref_forward_seq).is_some() - || r2.seq.find(&ref_revcomp_seq).is_some(); - let alt_match = r1.seq.find(alt_forward_seq).is_some() - || r1.seq.find(&alt_revcomp_seq).is_some() - || r2.seq.find(alt_forward_seq).is_some() - || r2.seq.find(&alt_revcomp_seq).is_some(); - let m = match (ref_match, alt_match) { - (true, true) => Matches::Both, - (true, false) => Matches::Ref, - (false, true) => Matches::Alt, - (false, false) => Matches::None, - }; - progress_logger.record(); - (m, (r1, r2)) - }) - .filter(|(m, _)| *m != Matches::None) - .fold( - || ((vec![], vec![]), (vec![], vec![]), (vec![], vec![])), - |(mut ref_reads, mut alt_reads, mut both_reads): MatchedReads, - (match_type, (r1, r2)): (Matches, (OwnedRecord, OwnedRecord))| { - match match_type { - Matches::Ref => { - ref_reads.0.push(r1); - ref_reads.1.push(r2); - } - Matches::Alt => { - alt_reads.0.push(r1); - alt_reads.1.push(r2); - } - Matches::Both => { - both_reads.0.push(r1); - both_reads.1.push(r2); + pool.scope(|s| { + let match_opts = MatcherOpts { + invert_match: opts.invert_match, + reverse_complement: opts.reverse_complement, + }; + let matcher: Box = match (opts.fixed_strings, &pattern) { + (true, Some(pattern)) => Box::new(FixedStringMatcher::new(pattern, match_opts)), + (false, Some(pattern)) => Box::new(RegexMatcher::new(pattern, match_opts)), + (true, None) => Box::new(FixedStringSetMatcher::new(&opts.regexp, match_opts)), + (false, None) => Box::new(RegexSetMatcher::new(&opts.regexp, match_opts)), + }; + let writer = FastqWriter::new(s); + + if opts.paired { + if files.is_empty() { + // read from standad input + files.push(PathBuf::from_str("-").unwrap()); + } + if files.len() == 1 { + // interleaved paired end FASTQ + let reader = ThreadReader::new(files[0].clone()); + for reads in izip!(reader.rx.iter()) { + let paired_reads = reads + .into_iter() + .tuples::<(OwnedRecord, OwnedRecord)>() + .collect_vec(); + process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); + } + } else { + // pairs of FASTQ files + for file_pairs in files.chunks_exact(2) { + let reader1 = ThreadReader::new(file_pairs[0].clone()); + let reader2 = ThreadReader::new(file_pairs[1].clone()); + for (reads1, reads2) in izip!(reader1.rx.iter(), reader2.rx.iter()) { + let paired_reads = reads1.into_iter().zip(reads2.into_iter()).collect_vec(); + process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); + } + reader1.handle.join().expect("Failed to join reader"); + reader2.handle.join().expect("Failed to join reader"); + } + } + } else { + if files.is_empty() { + // read from standad input + files.push(PathBuf::from_str("-").unwrap()); + } + for file in files { + let reader = ThreadReader::new(file.clone()); + for reads in reader.rx.iter() { + // Get the amtches reads + let matched_reads: Vec = reads + .into_par_iter() + .map(|read| -> Option { + progress_logger.record(); + if matcher.read_match(&read) { + Some(read) + } else { + None } - _ => unreachable!() - } - (ref_reads, alt_reads, both_reads) - }, - ) - .for_each( - |(ref_reads, alt_reads, both_reads): MatchedReads| { - // Acquire lock to ensure R1 and R2 are enqueued at the same time - { - let _lock = ref_writer.lock.lock(); - ref_writer.r1.tx.as_ref().unwrap().send(ref_reads.0).expect("Failed to send R1s"); - ref_writer.r2.tx.as_ref().unwrap().send(ref_reads.1).expect("Failed to send R1s"); - } - { - let _lock = alt_writer.lock.lock(); - alt_writer.r1.tx.as_ref().unwrap().send(alt_reads.0).expect("Failed to send R1s"); - alt_writer.r2.tx.as_ref().unwrap().send(alt_reads.1).expect("Failed to send R1s"); - } - { - let _lock = both_writer.lock.lock(); - both_writer.r1.tx.as_ref().unwrap().send(both_reads.0).expect("Failed to send R1s"); - both_writer.r2.tx.as_ref().unwrap().send(both_reads.1).expect("Failed to send R1s"); - } - }, - ); + }) + .flatten() + .collect(); + + let _lock = writer.lock.lock(); + writer.tx.send(matched_reads).expect("Failed to send read"); + } + + reader.handle.join().expect("Failed to join reader"); + } } }); - info!("Joining reader threads"); - r1_reader.handle.join().expect("Failed to join r1"); - r2_reader.handle.join().expect("Failed to join r2"); - - // Flush and close writers - info!("Joining writer threads and collecting stats"); - while !ref_writer.r1.tx.as_ref().unwrap().is_empty() {} - while !ref_writer.r2.tx.as_ref().unwrap().is_empty() {} - ref_writer.r1.tx.take(); - ref_writer.r2.tx.take(); - let ref_r1_stats = ref_writer - .r1 - .handle - .join() - .expect("Error joining r1 handle"); - let _ref_r2_stats = ref_writer - .r2 - .handle - .join() - .expect("Error joining r2 handle"); - while !alt_writer.r1.tx.as_ref().unwrap().is_empty() {} - while !alt_writer.r2.tx.as_ref().unwrap().is_empty() {} - alt_writer.r1.tx.take(); - alt_writer.r2.tx.take(); - let alt_r1_stats = alt_writer - .r1 - .handle - .join() - .expect("Error joining r1 handle"); - let _alt_r2_stats = alt_writer - .r2 - .handle - .join() - .expect("Error joining r2 handle"); - while !both_writer.r1.tx.as_ref().unwrap().is_empty() {} - while !both_writer.r2.tx.as_ref().unwrap().is_empty() {} - both_writer.r1.tx.take(); - both_writer.r2.tx.take(); - let both_r1_stats = both_writer - .r1 - .handle - .join() - .expect("Error joining r1 handle"); - let _both_r2_stats = both_writer - .r2 - .handle - .join() - .expect("Error joining r2 handle"); - drop(progress_logger); - info!( - "{} reads matched input ref sequence", - ref_r1_stats.reads_written - ); - info!( - "{} reads matched input alt sequence", - alt_r1_stats.reads_written - ); - info!( - "{} reads matched both input ref and alt sequences", - both_r1_stats.reads_written - ); - - info!("Writing stats"); - let fq_grep_metrics = FqGrepMetrics::new( - ref_r1_stats.reads_written, - alt_r1_stats.reads_written, - both_r1_stats.reads_written, - ); - - let writer = BufWriter::new(File::create(opts.output_dir.join("fqgrep_stats.tsv"))?); - let mut writer = csv::WriterBuilder::new() - .delimiter(b'\t') - .has_headers(true) - .from_writer(writer); - writer.serialize(fq_grep_metrics)?; - writer.flush()?; - Ok(()) } +fn process_paired_reads( + reads: Vec<(OwnedRecord, OwnedRecord)>, + matcher: &Box, + writer: &FastqWriter, + progress_logger: &ProgLog, +) { + reads + .into_par_iter() + .map(|(read1, read2)| { + progress_logger.record(); + progress_logger.record(); + assert!( + read1.head() == read2.head(), + "Mismatching read pair! R1: {} R2: {}", + std::str::from_utf8(read1.head()).unwrap(), + std::str::from_utf8(read2.head()).unwrap() + ); + if matcher.read_match(&read1) && matcher.read_match(&read2) { + Some((read1, read2)) + } else { + None + } + }) + .flatten() + .fold(std::vec::Vec::new, |mut matched_reads, (r1, r2)| { + matched_reads.push(r1); + matched_reads.push(r2); + matched_reads + }) + .for_each(|matched_read| { + let _lock = writer.lock.lock(); + writer.tx.send(matched_read).expect("Failed to send read"); + }); +} + /// Parse args and set up logging / tracing fn setup() -> Opts { if std::env::var("RUST_LOG").is_err() { From fad92ff5358bf97f0efea7185357ffc5c04383a8 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 08:50:17 -0700 Subject: [PATCH 02/22] clippy --- src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.rs b/src/main.rs index df2b766..9b66e90 100644 --- a/src/main.rs +++ b/src/main.rs @@ -333,6 +333,7 @@ fn main() -> Result<()> { Ok(()) } +#[allow(clippy::borrowed_box)] // FIXME: remove me later and solve fn process_paired_reads( reads: Vec<(OwnedRecord, OwnedRecord)>, matcher: &Box, From f1d0a4d643fdea669a377f8f9b35f120e13bc042 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 16:29:10 -0700 Subject: [PATCH 03/22] add -c/--count --- src/main.rs | 83 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9b66e90..ee5f8c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -70,22 +70,37 @@ struct FastqWriter { } impl FastqWriter { - fn new(pool: &Scope) -> Self { - let lock = Mutex::new(()); - let mut writer = BufWriter::with_capacity(BUFSIZE, std::io::stdout()); + fn new(pool: &Scope, count: bool, paired: bool) -> Self { // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); - pool.spawn(move |_| { - while let Ok(reads) = rx.recv() { - for read in reads { - fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) - .expect("failed writing read"); + if count { + pool.spawn(move |_| { + let mut num_matches = 0; + while let Ok(reads) = rx.recv() { + num_matches += reads.len(); } - } - writer.flush().expect("Error flushing writer"); - }); - + if paired { + num_matches /= 2; + } + // TODO: count should be divided by two for paired end reads + std::io::stdout() + .write_all(format!("{}\n", num_matches).as_bytes()) + .unwrap(); + }); + } else { + pool.spawn(move |_| { + let mut writer = BufWriter::with_capacity(BUFSIZE, std::io::stdout()); + while let Ok(reads) = rx.recv() { + for read in reads { + fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) + .expect("failed writing read"); + } + } + writer.flush().expect("Error flushing writer"); + }); + } + let lock = Mutex::new(()); Self { tx, lock } } } @@ -153,6 +168,10 @@ struct Opts { #[structopt(long)] paired: bool, + /// Only a count of selected lines is written to standard output. + #[structopt(long, short = "c")] + count: bool, + /// Specify a pattern used during the search of the input: an input line is selected if it /// matches any of the specified patterns. This option is most useful when multiple `-e` /// options are used to specify multiple patterns. @@ -177,6 +196,10 @@ struct Opts { #[structopt(short = "v")] invert_match: bool, + /// Write progress information + #[structopt(long)] + progress: bool, + /// The first argument is the pattern to match, with the remaining arguments containing the /// files to match. If `-e` is given, then all the arguments are files to match. /// Use standard input if either no files are given or `-` is given. @@ -245,13 +268,19 @@ fn main() -> Result<()> { } } - let progress_logger = ProgLogBuilder::new() - .name("fqgrep-progress") - .noun("reads") - .verb("Searched") - .unit(50_000_000) - .count_formatter(CountFormatterKind::Comma) - .build(); + let progress_logger = if opts.progress { + Some( + ProgLogBuilder::new() + .name("fqgrep-progress") + .noun("reads") + .verb("Searched") + .unit(50_000_000) + .count_formatter(CountFormatterKind::Comma) + .build(), + ) + } else { + None + }; let pool = rayon::ThreadPoolBuilder::new() .num_threads(opts.threads) @@ -269,7 +298,7 @@ fn main() -> Result<()> { (true, None) => Box::new(FixedStringSetMatcher::new(&opts.regexp, match_opts)), (false, None) => Box::new(RegexSetMatcher::new(&opts.regexp, match_opts)), }; - let writer = FastqWriter::new(s); + let writer = FastqWriter::new(s, opts.count, opts.paired); if opts.paired { if files.is_empty() { @@ -307,11 +336,13 @@ fn main() -> Result<()> { for file in files { let reader = ThreadReader::new(file.clone()); for reads in reader.rx.iter() { - // Get the amtches reads + // Get the matched reads let matched_reads: Vec = reads .into_par_iter() .map(|read| -> Option { - progress_logger.record(); + if let Some(progress) = &progress_logger { + progress.record(); + } if matcher.read_match(&read) { Some(read) } else { @@ -338,13 +369,15 @@ fn process_paired_reads( reads: Vec<(OwnedRecord, OwnedRecord)>, matcher: &Box, writer: &FastqWriter, - progress_logger: &ProgLog, + progress_logger: &Option, ) { reads .into_par_iter() .map(|(read1, read2)| { - progress_logger.record(); - progress_logger.record(); + if let Some(progress) = progress_logger { + progress.record(); + progress.record(); + } assert!( read1.head() == read2.head(), "Mismatching read pair! R1: {} R2: {}", From 91d8159b0499e260e44380f50945cc2be31a800f Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 17:50:02 -0700 Subject: [PATCH 04/22] paired end --- src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index ee5f8c0..e8d333c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -164,7 +164,8 @@ struct Opts { /// Treat the input files as paired. The number of input files must be a multiple of two, /// with the first file being R1, second R2, third R1, fourth R2, and so on. If the pattern - /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). + /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). If the input + /// is standard input, then treat the input as interlaved paired end reads. #[structopt(long)] paired: bool, From 635ae857aa2950de60a1a82f7efbc93c306a2715 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 18:01:04 -0700 Subject: [PATCH 05/22] simplify --- src/main.rs | 41 +++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/src/main.rs b/src/main.rs index e8d333c..1b64d0b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use std::{ fs::File, - io::{BufRead, BufReader, BufWriter, Write}, + io::{BufRead, BufReader, BufWriter, Read, Write}, path::PathBuf, str::FromStr, thread::JoinHandle, @@ -114,33 +114,22 @@ impl ThreadReader { fn new(file: PathBuf) -> Self { let (tx, rx) = bounded(READER_CHANNEL_SIZE); let handle = std::thread::spawn(move || { - // TODO: make the types work so we can remove duplicate code here and below - if file == PathBuf::from_str("-").unwrap() { - let handle = BufReader::with_capacity(BUFSIZE, std::io::stdin()); - let reader = fastq::Reader::with_capacity(MultiGzDecoder::new(handle), BUFSIZE) - .into_records() - .chunks(CHUNKSIZE * num_cpus::get()); - let chunks = reader.into_iter(); - for chunk in chunks { - // let now = std::time::Instant::now(); - tx.send(chunk.map(|r| r.expect("Error reading")).collect()) - .expect("Error sending"); - } + let handle = if file == PathBuf::from_str("-").unwrap() { + Box::new(std::io::stdin()) as Box } else { - let handle = BufReader::with_capacity( - BUFSIZE, - File::open(&file).expect("error in opening file"), - ); - let reader = fastq::Reader::with_capacity(MultiGzDecoder::new(handle), BUFSIZE) - .into_records() - .chunks(CHUNKSIZE * num_cpus::get()); - let chunks = reader.into_iter(); - for chunk in chunks { - // let now = std::time::Instant::now(); - tx.send(chunk.map(|r| r.expect("Error reading")).collect()) - .expect("Error sending"); - } + let handle = File::open(&file).expect("error in opening file"); + Box::new(handle) as Box }; + let buf_reader = BufReader::with_capacity(BUFSIZE, handle); + let gz_reader = fastq::Reader::with_capacity(MultiGzDecoder::new(buf_reader), BUFSIZE) + .into_records() + .chunks(CHUNKSIZE * num_cpus::get()); + let chunks = gz_reader.into_iter(); + for chunk in chunks { + // let now = std::time::Instant::now(); + tx.send(chunk.map(|r| r.expect("Error reading")).collect()) + .expect("Error sending"); + } }); Self { handle, rx } From 3241664115bc1cb342c11344a914c2d496cc8211 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 18:13:47 -0700 Subject: [PATCH 06/22] add the --plain option --- src/main.rs | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1b64d0b..67f8de5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -111,22 +111,30 @@ struct ThreadReader { } impl ThreadReader { - fn new(file: PathBuf) -> Self { + fn new(file: PathBuf, plain: bool) -> Self { let (tx, rx) = bounded(READER_CHANNEL_SIZE); let handle = std::thread::spawn(move || { - let handle = if file == PathBuf::from_str("-").unwrap() { + // Open the file or standad input + let raw_handle = if file == PathBuf::from_str("-").unwrap() { Box::new(std::io::stdin()) as Box } else { let handle = File::open(&file).expect("error in opening file"); Box::new(handle) as Box }; - let buf_reader = BufReader::with_capacity(BUFSIZE, handle); - let gz_reader = fastq::Reader::with_capacity(MultiGzDecoder::new(buf_reader), BUFSIZE) + // Wrap it in a buffer + let buf_handle = BufReader::with_capacity(BUFSIZE, raw_handle); + // Maybe wrap it in a decompressor + let maybe_decoder_handle = if plain { + Box::new(buf_handle) as Box + } else { + Box::new(MultiGzDecoder::new(buf_handle)) as Box + }; + // Open a FASTQ reader, get an iterator over the records, and chunk them + let fastq_reader = fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) .into_records() .chunks(CHUNKSIZE * num_cpus::get()); - let chunks = gz_reader.into_iter(); - for chunk in chunks { - // let now = std::time::Instant::now(); + // Iterate over the chunks + for chunk in &fastq_reader { tx.send(chunk.map(|r| r.expect("Error reading")).collect()) .expect("Error sending"); } @@ -190,9 +198,15 @@ struct Opts { #[structopt(long)] progress: bool, + /// The input FASTQ(s) is not compressed + #[structopt(long)] + plain: bool, + /// The first argument is the pattern to match, with the remaining arguments containing the /// files to match. If `-e` is given, then all the arguments are files to match. /// Use standard input if either no files are given or `-` is given. + /// + /// Input files must be gzip compressed unless `--plain` is given args: Vec, } @@ -297,7 +311,7 @@ fn main() -> Result<()> { } if files.len() == 1 { // interleaved paired end FASTQ - let reader = ThreadReader::new(files[0].clone()); + let reader = ThreadReader::new(files[0].clone(), opts.plain); for reads in izip!(reader.rx.iter()) { let paired_reads = reads .into_iter() @@ -308,8 +322,8 @@ fn main() -> Result<()> { } else { // pairs of FASTQ files for file_pairs in files.chunks_exact(2) { - let reader1 = ThreadReader::new(file_pairs[0].clone()); - let reader2 = ThreadReader::new(file_pairs[1].clone()); + let reader1 = ThreadReader::new(file_pairs[0].clone(), opts.plain); + let reader2 = ThreadReader::new(file_pairs[1].clone(), opts.plain); for (reads1, reads2) in izip!(reader1.rx.iter(), reader2.rx.iter()) { let paired_reads = reads1.into_iter().zip(reads2.into_iter()).collect_vec(); process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); @@ -324,7 +338,7 @@ fn main() -> Result<()> { files.push(PathBuf::from_str("-").unwrap()); } for file in files { - let reader = ThreadReader::new(file.clone()); + let reader = ThreadReader::new(file.clone(), opts.plain); for reads in reader.rx.iter() { // Get the matched reads let matched_reads: Vec = reads From 73aa9ee7b99b39423e7651c8715006d8868523db Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 21:00:04 -0700 Subject: [PATCH 07/22] fixes --- src/main.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index 67f8de5..25b16e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,7 @@ use itertools::{self, izip, Itertools}; use lazy_static::lazy_static; use parking_lot::Mutex; use proglog::{CountFormatterKind, ProgLog, ProgLogBuilder}; -use rayon::{prelude::*, Scope}; +use rayon::{prelude::*, Scope, ThreadPool}; use seq_io::fastq::{self, OwnedRecord, Record}; use structopt::{clap::AppSettings, StructOpt}; @@ -70,12 +70,12 @@ struct FastqWriter { } impl FastqWriter { - fn new(pool: &Scope, count: bool, paired: bool) -> Self { + fn new(pool: &ThreadPool, count: bool, paired: bool) -> Self { // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); if count { - pool.spawn(move |_| { + pool.spawn(move || { let mut num_matches = 0; while let Ok(reads) = rx.recv() { num_matches += reads.len(); @@ -83,13 +83,12 @@ impl FastqWriter { if paired { num_matches /= 2; } - // TODO: count should be divided by two for paired end reads std::io::stdout() .write_all(format!("{}\n", num_matches).as_bytes()) .unwrap(); }); } else { - pool.spawn(move |_| { + pool.spawn(move || { let mut writer = BufWriter::with_capacity(BUFSIZE, std::io::stdout()); while let Ok(reads) = rx.recv() { for read in reads { @@ -115,10 +114,12 @@ impl ThreadReader { let (tx, rx) = bounded(READER_CHANNEL_SIZE); let handle = std::thread::spawn(move || { // Open the file or standad input - let raw_handle = if file == PathBuf::from_str("-").unwrap() { + let raw_handle = if file.as_os_str() == "-" { Box::new(std::io::stdin()) as Box } else { - let handle = File::open(&file).expect("error in opening file"); + let handle = File::open(&file) + .with_context(|| format!("Error opening input: {}", file.display())) + .unwrap(); Box::new(handle) as Box }; // Wrap it in a buffer @@ -291,7 +292,9 @@ fn main() -> Result<()> { .build() .unwrap(); - pool.scope(|s| { + let writer = FastqWriter::new(&pool, opts.count, opts.paired); + + pool.install(|| { let match_opts = MatcherOpts { invert_match: opts.invert_match, reverse_complement: opts.reverse_complement, @@ -302,7 +305,6 @@ fn main() -> Result<()> { (true, None) => Box::new(FixedStringSetMatcher::new(&opts.regexp, match_opts)), (false, None) => Box::new(RegexSetMatcher::new(&opts.regexp, match_opts)), }; - let writer = FastqWriter::new(s, opts.count, opts.paired); if opts.paired { if files.is_empty() { @@ -319,6 +321,7 @@ fn main() -> Result<()> { .collect_vec(); process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); } + reader.handle.join().expect("Failed to join reader"); } else { // pairs of FASTQ files for file_pairs in files.chunks_exact(2) { @@ -365,6 +368,8 @@ fn main() -> Result<()> { } }); + while !writer.tx.is_empty() {} + Ok(()) } @@ -388,7 +393,7 @@ fn process_paired_reads( std::str::from_utf8(read1.head()).unwrap(), std::str::from_utf8(read2.head()).unwrap() ); - if matcher.read_match(&read1) && matcher.read_match(&read2) { + if matcher.read_match(&read1) || matcher.read_match(&read2) { Some((read1, read2)) } else { None From a70395a027880cb8b847e12cdde73c843230f720 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 21:05:48 -0700 Subject: [PATCH 08/22] fixes --- src/main.rs | 91 ++++++++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 53 deletions(-) diff --git a/src/main.rs b/src/main.rs index 25b16e7..2f5ed33 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,45 +104,37 @@ impl FastqWriter { } } -struct ThreadReader { - handle: JoinHandle<()>, - rx: Receiver>, -} - -impl ThreadReader { - fn new(file: PathBuf, plain: bool) -> Self { - let (tx, rx) = bounded(READER_CHANNEL_SIZE); - let handle = std::thread::spawn(move || { - // Open the file or standad input - let raw_handle = if file.as_os_str() == "-" { - Box::new(std::io::stdin()) as Box - } else { - let handle = File::open(&file) - .with_context(|| format!("Error opening input: {}", file.display())) - .unwrap(); - Box::new(handle) as Box - }; - // Wrap it in a buffer - let buf_handle = BufReader::with_capacity(BUFSIZE, raw_handle); - // Maybe wrap it in a decompressor - let maybe_decoder_handle = if plain { - Box::new(buf_handle) as Box - } else { - Box::new(MultiGzDecoder::new(buf_handle)) as Box - }; - // Open a FASTQ reader, get an iterator over the records, and chunk them - let fastq_reader = fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) - .into_records() - .chunks(CHUNKSIZE * num_cpus::get()); - // Iterate over the chunks - for chunk in &fastq_reader { - tx.send(chunk.map(|r| r.expect("Error reading")).collect()) - .expect("Error sending"); - } - }); - - Self { handle, rx } - } +fn spawn_reader(file: PathBuf, plain: bool) -> Receiver> { + let (tx, rx) = bounded(READER_CHANNEL_SIZE); + rayon::spawn(move || { + // Open the file or standad input + let raw_handle = if file.as_os_str() == "-" { + Box::new(std::io::stdin()) as Box + } else { + let handle = File::open(&file) + .with_context(|| format!("Error opening input: {}", file.display())) + .unwrap(); + Box::new(handle) as Box + }; + // Wrap it in a buffer + let buf_handle = BufReader::with_capacity(BUFSIZE, raw_handle); + // Maybe wrap it in a decompressor + let maybe_decoder_handle = if plain { + Box::new(buf_handle) as Box + } else { + Box::new(MultiGzDecoder::new(buf_handle)) as Box + }; + // Open a FASTQ reader, get an iterator over the records, and chunk them + let fastq_reader = fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) + .into_records() + .chunks(CHUNKSIZE * num_cpus::get()); + // Iterate over the chunks + for chunk in &fastq_reader { + tx.send(chunk.map(|r| r.expect("Error reading")).collect()) + .expect("Error sending"); + } + }); + rx } /// A small utility to "grep" a pair of gzipped FASTQ files, outputting the read pair if the sequence @@ -313,26 +305,23 @@ fn main() -> Result<()> { } if files.len() == 1 { // interleaved paired end FASTQ - let reader = ThreadReader::new(files[0].clone(), opts.plain); - for reads in izip!(reader.rx.iter()) { + let rx = spawn_reader(files[0].clone(), opts.plain); + for reads in izip!(rx.iter()) { let paired_reads = reads .into_iter() .tuples::<(OwnedRecord, OwnedRecord)>() .collect_vec(); process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); } - reader.handle.join().expect("Failed to join reader"); } else { // pairs of FASTQ files for file_pairs in files.chunks_exact(2) { - let reader1 = ThreadReader::new(file_pairs[0].clone(), opts.plain); - let reader2 = ThreadReader::new(file_pairs[1].clone(), opts.plain); - for (reads1, reads2) in izip!(reader1.rx.iter(), reader2.rx.iter()) { + let rx1 = spawn_reader(file_pairs[0].clone(), opts.plain); + let rx2 = spawn_reader(file_pairs[1].clone(), opts.plain); + for (reads1, reads2) in izip!(rx1.iter(), rx2.iter()) { let paired_reads = reads1.into_iter().zip(reads2.into_iter()).collect_vec(); process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); } - reader1.handle.join().expect("Failed to join reader"); - reader2.handle.join().expect("Failed to join reader"); } } } else { @@ -341,8 +330,8 @@ fn main() -> Result<()> { files.push(PathBuf::from_str("-").unwrap()); } for file in files { - let reader = ThreadReader::new(file.clone(), opts.plain); - for reads in reader.rx.iter() { + let rx = spawn_reader(file.clone(), opts.plain); + for reads in rx.iter() { // Get the matched reads let matched_reads: Vec = reads .into_par_iter() @@ -362,14 +351,10 @@ fn main() -> Result<()> { let _lock = writer.lock.lock(); writer.tx.send(matched_reads).expect("Failed to send read"); } - - reader.handle.join().expect("Failed to join reader"); } } }); - while !writer.tx.is_empty() {} - Ok(()) } From 1e9de49676e067fac76ead231d5692a5a74d8771 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 21:07:45 -0700 Subject: [PATCH 09/22] fix --- src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2f5ed33..52e2581 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,6 @@ use std::{ io::{BufRead, BufReader, BufWriter, Read, Write}, path::PathBuf, str::FromStr, - thread::JoinHandle, }; use anyhow::{bail, ensure, Context, Result}; @@ -21,7 +20,7 @@ use itertools::{self, izip, Itertools}; use lazy_static::lazy_static; use parking_lot::Mutex; use proglog::{CountFormatterKind, ProgLog, ProgLogBuilder}; -use rayon::{prelude::*, Scope, ThreadPool}; +use rayon::{prelude::*, ThreadPool}; use seq_io::fastq::{self, OwnedRecord, Record}; use structopt::{clap::AppSettings, StructOpt}; From f4bb927b81b923952dc2ab86e44e9275a81017f6 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 10 Aug 2022 22:19:44 -0700 Subject: [PATCH 10/22] improve exit code --- src/lib/mod.rs | 30 +++++++++++++++++- src/main.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 105 insertions(+), 11 deletions(-) diff --git a/src/lib/mod.rs b/src/lib/mod.rs index 84899de..a10984d 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -7,7 +7,7 @@ )] pub mod matcher; use lazy_static::lazy_static; -use std::borrow::Borrow; +use std::{borrow::Borrow, path::Path}; lazy_static! { pub static ref DNA_BASES: [u8; 5] = { @@ -62,3 +62,31 @@ where .map(|a| complement(*a.borrow())) .collect() } + +/// Returns true if the path ends with a recognized GZIP file extension +fn is_path_with_extension>(p: &P, extensions: [&str; 2]) -> bool { + if let Some(ext) = p.as_ref().extension() { + match ext.to_str() { + Some(x) => extensions.contains(&x), + None => false, + } + } else { + false + } +} + +/// The set of file extensions to treat as GZIPPED +const GZIP_EXTENSIONS: [&str; 2] = ["gz", "bgz"]; + +/// Returns true if the path ends with a recognized GZIP file extension +pub fn is_gzip_path>(p: &P) -> bool { + is_path_with_extension(p, GZIP_EXTENSIONS) +} + +/// The set of file extensions to treat as FASTQ +const FASTQ_EXTENSIONS: [&str; 2] = ["fastq", "fq"]; + +/// Returns true if the path ends with a recognized FASTQ file extension +pub fn is_fastq_path>(p: &P) -> bool { + is_path_with_extension(p, FASTQ_EXTENSIONS) +} diff --git a/src/main.rs b/src/main.rs index 52e2581..47ce4c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +use std::process::ExitCode; use std::{ fs::File, io::{BufRead, BufReader, BufWriter, Read, Write}, @@ -15,6 +16,7 @@ use flume::{bounded, Receiver, Sender}; use fqgrep_lib::matcher::{ FixedStringMatcher, FixedStringSetMatcher, Matcher, MatcherOpts, RegexMatcher, RegexSetMatcher, }; +use fqgrep_lib::{is_fastq_path, is_gzip_path}; use gzp::BUFSIZE; use itertools::{self, izip, Itertools}; use lazy_static::lazy_static; @@ -69,7 +71,7 @@ struct FastqWriter { } impl FastqWriter { - fn new(pool: &ThreadPool, count: bool, paired: bool) -> Self { + fn new(count_tx: Sender, pool: &ThreadPool, count: bool, paired: bool) -> Self { // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); @@ -85,17 +87,29 @@ impl FastqWriter { std::io::stdout() .write_all(format!("{}\n", num_matches).as_bytes()) .unwrap(); + + count_tx + .send(num_matches) + .expect("failed sending final count"); }); } else { pool.spawn(move || { + let mut num_matches = 0; let mut writer = BufWriter::with_capacity(BUFSIZE, std::io::stdout()); while let Ok(reads) = rx.recv() { + num_matches += reads.len(); for read in reads { fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) .expect("failed writing read"); } } + if paired { + num_matches /= 2; + } writer.flush().expect("Error flushing writer"); + count_tx + .send(num_matches) + .expect("failed sending final count"); }); } let lock = Mutex::new(()); @@ -118,10 +132,14 @@ fn spawn_reader(file: PathBuf, plain: bool) -> Receiver> { // Wrap it in a buffer let buf_handle = BufReader::with_capacity(BUFSIZE, raw_handle); // Maybe wrap it in a decompressor - let maybe_decoder_handle = if plain { - Box::new(buf_handle) as Box - } else { - Box::new(MultiGzDecoder::new(buf_handle)) as Box + let maybe_decoder_handle = { + // plaintext if your **not** a gzip path AND you're either a FASTQ or plain is true + let is_plain = !is_gzip_path(&file) && (is_fastq_path(&file) || plain); + if is_plain { + Box::new(buf_handle) as Box + } else { + Box::new(MultiGzDecoder::new(buf_handle)) as Box + } }; // Open a FASTQ reader, get an iterator over the records, and chunk them let fastq_reader = fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) @@ -136,8 +154,27 @@ fn spawn_reader(file: PathBuf, plain: bool) -> Receiver> { rx } -/// A small utility to "grep" a pair of gzipped FASTQ files, outputting the read pair if the sequence -/// matches either the given ref sequence, the given alt sequence, or both. +/// The fqgrep utility searches any given input FASTQ files, selecting records whose bases match +/// one or more patterns. By default, a pattern matches the bases in a FASTQ record if the +/// regular expression (RE) in the pattern matches the bases. An empty expression matches every +/// line. Each FASTQ record that matches at least one of the patterns is written to the standard +/// output. +/// +/// INPUT COMPRESSION +/// +/// By default, the input files must be GZIP compressed. If the input files are real files and end +/// with `.gz` or `.bgz`, they are assumed to be GZIP compressed. If they end with `.fastq` or +/// `.fq`, they are assumed to be plaintext (uncompressed). All other inputs are assumed to be +/// GZIP compressed, including standard input. The `--plain` option may be used to treat standard +/// input and any unrecongized inputs as plaintext (uncompressed). +/// +/// EXIT STATUS +/// The fqgrep utility exits with one of the following values: +/// +/// 0 One or more lines were selected. +/// 1 No lines were selected. +/// >1 An error occurred. +/// #[derive(StructOpt, Debug)] #[structopt( name = "fqgrep", @@ -212,8 +249,31 @@ fn read_patterns(file: &PathBuf) -> Result> { Ok(v) } +fn main() -> ExitCode { + // Set the exit code: + // - exit code 0 if there were matches + // - exit code 1 if there were no matches + // - exit code 2 if fqgrep returned an error + // - exit code 101 if fqgrep panicked + let outer = std::panic::catch_unwind(fqgrep); + match outer { + Err(_) => { + eprintln!("Error: fqgrep panicked. Please report this as a bug!"); + ExitCode::from(101) + } + Ok(inner) => match inner { + Ok(0) => ExitCode::from(1), + Ok(_) => ExitCode::SUCCESS, + Err(e) => { + eprintln!("Error: {}", e); + ExitCode::from(2) + } + }, + } +} + #[allow(clippy::too_many_lines)] -fn main() -> Result<()> { +fn fqgrep() -> Result { let mut opts = setup(); if let Some(file) = &opts.file { @@ -283,7 +343,8 @@ fn main() -> Result<()> { .build() .unwrap(); - let writer = FastqWriter::new(&pool, opts.count, opts.paired); + let (count_tx, count_rx): (Sender, Receiver) = bounded(1); + let writer = FastqWriter::new(count_tx, &pool, opts.count, opts.paired); pool.install(|| { let match_opts = MatcherOpts { @@ -354,7 +415,12 @@ fn main() -> Result<()> { } }); - Ok(()) + drop(writer); // so count_tx.send will execute + + match count_rx.recv() { + Ok(count) => Ok(count), + Err(error) => Err(error).with_context(|| "failed receive final match counts"), + } } #[allow(clippy::borrowed_box)] // FIXME: remove me later and solve From 120940fda6e1867856f946bfd5469239bf182f97 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Thu, 11 Aug 2022 23:56:02 -0700 Subject: [PATCH 11/22] Add coloring --- Cargo.lock | 92 ++++++++++++++++++--- Cargo.toml | 3 + src/lib/matcher.rs | 202 ++++++++++++++++++++++++++++++++++++++++----- src/lib/mod.rs | 1 + src/main.rs | 107 +++++++++++++++++------- 5 files changed, 343 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b734ef..e3a7760 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,18 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "bstr" version = "0.2.17" @@ -126,6 +138,12 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -174,7 +192,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -183,7 +201,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", @@ -197,7 +215,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -207,7 +225,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", "crossbeam-utils", ] @@ -219,7 +237,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1" dependencies = [ "autocfg", - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "memoffset", "once_cell", @@ -232,7 +250,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -242,7 +260,7 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", ] @@ -325,7 +343,9 @@ dependencies = [ name = "fqgrep" version = "0.1.1-alpha.1" dependencies = [ + "ansi_term", "anyhow", + "bitvec", "bstr", "built", "csv", @@ -333,6 +353,7 @@ dependencies = [ "flate2", "flume", "gzp", + "isatty", "itertools", "lazy_static", "log", @@ -347,6 +368,12 @@ dependencies = [ "structopt", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures-core" version = "0.3.21" @@ -365,7 +392,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "libc", "wasi", @@ -443,7 +470,19 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", +] + +[[package]] +name = "isatty" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31a8281fc93ec9693494da65fbf28c0c2aa60a2eaec25dc58e2f31952e95edc" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "redox_syscall 0.1.57", + "winapi 0.3.9", ] [[package]] @@ -569,7 +608,7 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -653,10 +692,10 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "winapi 0.3.9", ] @@ -745,6 +784,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rayon" version = "1.5.3" @@ -769,6 +814,12 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "redox_syscall" version = "0.2.16" @@ -924,6 +975,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "termcolor" version = "1.1.3" @@ -1067,7 +1124,7 @@ version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -1157,3 +1214,12 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "wyz" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b31594f29d27036c383b53b59ed3476874d518f0efb151b27a4c275141390e" +dependencies = [ + "tap", +] diff --git a/Cargo.toml b/Cargo.toml index 10e9737..a728e37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,13 +25,16 @@ path = "src/main.rs" lto = true [dependencies] +ansi_term = "0.12.1" anyhow = "1.0.44" +bitvec = "1.0.1" bstr = "0.2.17" csv = "1.1.6" env_logger = "0.9.0" flate2 = {version = "1.0.22", default-features = false, features = ["zlib-ng-compat"]} flume = "0.10.9" gzp = "0.9.1" +isatty = "0.1.9" itertools = "0.10.3" lazy_static = "1.4.0" log = "0.4.14" diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs index d79a062..df89c21 100644 --- a/src/lib/matcher.rs +++ b/src/lib/matcher.rs @@ -1,3 +1,7 @@ +use bitvec::prelude::*; +use std::ops::Range; + +use crate::color::{COLOR_BACKGROUND, COLOR_BASES, COLOR_QUALS}; use crate::reverse_complement; use crate::DNA_BASES; use anyhow::{bail, Context, Result}; @@ -11,11 +15,113 @@ pub struct MatcherOpts { pub reverse_complement: bool, } +fn to_bitvec(ranges: impl Iterator>, len: usize) -> BitVec { + let mut vec = bitvec![0; len]; + ranges.for_each(|range| { + for index in range { + vec.set(index, true); + } + }); + vec +} + +fn bases_colored( + bases: &[u8], + quals: &[u8], + ranges: impl Iterator>, +) -> (Vec, Vec) { + let mut colored_bases = Vec::with_capacity(bases.len()); + let mut colored_quals = Vec::with_capacity(bases.len()); + let bits = to_bitvec(ranges, bases.len()); + let mut last_color_on = false; + let mut last_bases_index = 0; + let mut cur_bases_index = 0; + for base_color_on in bits.iter() { + if *base_color_on { + // this base is to be colored + if !last_color_on { + // add up to but not including this base to the colored vector **as uncolored** + if last_bases_index + 1 < cur_bases_index { + COLOR_BACKGROUND + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_BACKGROUND + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } + // first base in a run of bases to be colored + last_bases_index = cur_bases_index; + } + + last_color_on = true; + } else { + // this base is not to be colored + if last_color_on { + // add up to but not including this base to the colored vector **as colored** + if last_bases_index + 1 < cur_bases_index { + COLOR_BASES + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_QUALS + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } + // first base in a run of bases to be colored + last_bases_index = cur_bases_index; + } + last_color_on = false; + } + cur_bases_index += 1; + } + // add up to but not including this base to the colored vector **as uncolored** + if last_bases_index + 1 < cur_bases_index { + if last_color_on { + COLOR_BASES + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_QUALS + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } else { + COLOR_BACKGROUND + .paint(&bases[last_bases_index..cur_bases_index]) + .write_to(&mut colored_bases) + .unwrap(); + COLOR_BACKGROUND + .paint(&quals[last_bases_index..cur_bases_index]) + .write_to(&mut colored_quals) + .unwrap(); + } + } + + (colored_bases, colored_quals) +} + +pub fn validate_fixed_pattern(pattern: &str) -> Result<()> { + for (index, base) in pattern.chars().enumerate() { + if !DNA_BASES.contains(&(base as u8)) { + bail!( + "Fixed pattern must contain only DNA bases: {} .. [{}] .. {}", + &pattern[0..index], + &pattern[index..=index], + &pattern[index + 1..], + ) + } + } + Ok(()) +} + pub trait Matcher { fn opts(&self) -> MatcherOpts; fn bases_match(&self, bases: &[u8]) -> bool; - + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec); fn read_match(&self, read: &OwnedRecord) -> bool { if self.opts().invert_match { self.bases_match(read.seq()) @@ -38,6 +144,15 @@ impl Matcher for FixedStringMatcher { fn bases_match(&self, bases: &[u8]) -> bool { bases.find(&self.pattern).is_some() != self.opts.invert_match } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = bases.find_iter(&self.pattern).map(|start| Range { + start, + end: start + self.pattern.len(), + }); + bases_colored(bases, quals, ranges) + } + fn opts(&self) -> MatcherOpts { self.opts } @@ -48,20 +163,6 @@ impl FixedStringMatcher { let pattern = pattern.as_bytes().to_vec(); Self { pattern, opts } } - - pub fn validate(pattern: &str) -> Result<()> { - for (index, base) in pattern.chars().enumerate() { - if !DNA_BASES.contains(&(base as u8)) { - bail!( - "Fixed pattern must contain only DNA bases: {} .. [{}] .. {}", - &pattern[0..index], - &pattern[index..=index], - &pattern[index + 1..], - ) - } - } - Ok(()) - } } pub struct FixedStringSetMatcher { @@ -76,6 +177,20 @@ impl Matcher for FixedStringSetMatcher { .any(|pattern| bases.find(&pattern).is_some()) != self.opts.invert_match } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = self.patterns.iter().flat_map(|pattern| { + bases + .find_iter(&pattern) + .map(|start| Range { + start, + end: start + pattern.len(), + }) + .collect::>() + }); + bases_colored(bases, quals, ranges) + } + fn opts(&self) -> MatcherOpts { self.opts } @@ -114,13 +229,20 @@ impl Matcher for RegexMatcher { fn bases_match(&self, bases: &[u8]) -> bool { self.regex.is_match(bases) != self.opts.invert_match } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = self.regex.find_iter(bases).map(|m| m.range()); + bases_colored(bases, quals, ranges) + } + fn opts(&self) -> MatcherOpts { self.opts } } pub struct RegexSetMatcher { - regexes: RegexSet, + regex_set: RegexSet, + regex_matchers: Vec, opts: MatcherOpts, } @@ -130,16 +252,58 @@ impl RegexSetMatcher { S: AsRef, I: IntoIterator, { - let regexes = RegexSetBuilder::new(patterns).build().unwrap(); - Self { regexes, opts } + let string_patterns: Vec = patterns + .into_iter() + .map(|p| p.as_ref().to_string()) + .collect(); + let regex_set = RegexSetBuilder::new(string_patterns.clone()) + .build() + .unwrap(); + let regex_matchers: Vec = string_patterns + .into_iter() + .map(|pattern| RegexMatcher::new(pattern.as_ref(), opts)) + .collect(); + Self { + regex_set, + regex_matchers, + opts, + } } } impl Matcher for RegexSetMatcher { fn bases_match(&self, bases: &[u8]) -> bool { - self.regexes.is_match(bases) != self.opts.invert_match + self.regex_set.is_match(bases) != self.opts.invert_match + } + + fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { + let ranges = self + .regex_matchers + .iter() + .flat_map(|r| r.regex.find_iter(bases).map(|m| m.range())); + + bases_colored(bases, quals, ranges) } + fn opts(&self) -> MatcherOpts { self.opts } } + +pub struct MatcherFactory; + +impl MatcherFactory { + pub fn new_matcher( + pattern: &Option, + fixed_strings: bool, + regexp: &Vec, + match_opts: MatcherOpts, + ) -> Box { + match (fixed_strings, &pattern) { + (true, Some(pattern)) => Box::new(FixedStringMatcher::new(pattern, match_opts)), + (false, Some(pattern)) => Box::new(RegexMatcher::new(pattern, match_opts)), + (true, None) => Box::new(FixedStringSetMatcher::new(regexp, match_opts)), + (false, None) => Box::new(RegexSetMatcher::new(regexp, match_opts)), + } + } +} diff --git a/src/lib/mod.rs b/src/lib/mod.rs index a10984d..530e56f 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -5,6 +5,7 @@ clippy::missing_errors_doc, clippy::module_name_repetitions )] +pub mod color; pub mod matcher; use lazy_static::lazy_static; use std::{borrow::Borrow, path::Path}; diff --git a/src/main.rs b/src/main.rs index 47ce4c9..69df488 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,8 @@ #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +use fqgrep_lib::color::{color_background, color_head}; +use isatty::stdout_isatty; use std::process::ExitCode; use std::{ fs::File, @@ -8,14 +10,13 @@ use std::{ path::PathBuf, str::FromStr, }; +use structopt::clap::arg_enum; use anyhow::{bail, ensure, Context, Result}; use env_logger::Env; use flate2::bufread::MultiGzDecoder; use flume::{bounded, Receiver, Sender}; -use fqgrep_lib::matcher::{ - FixedStringMatcher, FixedStringSetMatcher, Matcher, MatcherOpts, RegexMatcher, RegexSetMatcher, -}; +use fqgrep_lib::matcher::{validate_fixed_pattern, Matcher, MatcherFactory, MatcherOpts}; use fqgrep_lib::{is_fastq_path, is_gzip_path}; use gzp::BUFSIZE; use itertools::{self, izip, Itertools}; @@ -71,7 +72,13 @@ struct FastqWriter { } impl FastqWriter { - fn new(count_tx: Sender, pool: &ThreadPool, count: bool, paired: bool) -> Self { + fn new( + count_tx: Sender, + pool: &ThreadPool, + color_matcher: Option>, + count: bool, + paired: bool, + ) -> Self { // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); @@ -99,7 +106,24 @@ impl FastqWriter { while let Ok(reads) = rx.recv() { num_matches += reads.len(); for read in reads { - fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) + // Color the reads if desired + let (head, seq, qual) = if let Some(ref matcher) = color_matcher { + let bases_match = matcher.bases_match(&read.seq); + if bases_match { + let (seq, qual) = + matcher.color_matched_bases(&read.seq, &read.qual); + (color_head(&read.head), seq, qual) + } else { + ( + color_background(&read.head), + color_background(&read.seq), + color_background(&read.qual), + ) + } + } else { + (read.head, read.seq, read.qual) + }; + fastq::write_to(&mut writer, &head, &seq, &qual) .expect("failed writing read"); } } @@ -154,6 +178,15 @@ fn spawn_reader(file: PathBuf, plain: bool) -> Receiver> { rx } +arg_enum! { + #[derive(PartialEq, Debug)] + enum Color { + Never, + Always, + Auto, +} +} + /// The fqgrep utility searches any given input FASTQ files, selecting records whose bases match /// one or more patterns. By default, a pattern matches the bases in a FASTQ record if the /// regular expression (RE) in the pattern matches the bases. An empty expression matches every @@ -188,12 +221,10 @@ struct Opts { #[structopt(long, short = "t", default_value = NUM_CPU.as_str())] threads: usize, - /// Treat the input files as paired. The number of input files must be a multiple of two, - /// with the first file being R1, second R2, third R1, fourth R2, and so on. If the pattern - /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). If the input - /// is standard input, then treat the input as interlaved paired end reads. - #[structopt(long)] - paired: bool, + // TODO: support GREP_COLOR(S) (or FQGREP_COLOR(S)) environment variables + /// Mark up the matching text. The possible values of when are “never”, “always” and “auto”. + #[structopt(long = "color", default_value = "never")] + color: Color, /// Only a count of selected lines is written to standard output. #[structopt(long, short = "c")] @@ -215,14 +246,21 @@ struct Opts { #[structopt(long, short = "f")] file: Option, - /// Search the reverse complement for matches. - #[structopt(long)] - reverse_complement: bool, - /// Selected lines are those not matching any of the specified patterns #[structopt(short = "v")] invert_match: bool, + /// Treat the input files as paired. The number of input files must be a multiple of two, + /// with the first file being R1, second R2, third R1, fourth R2, and so on. If the pattern + /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). If the input + /// is standard input, then treat the input as interlaved paired end reads. + #[structopt(long)] + paired: bool, + + /// Search the reverse complement for matches. + #[structopt(long)] + reverse_complement: bool, + /// Write progress information #[structopt(long)] progress: bool, @@ -314,10 +352,10 @@ fn fqgrep() -> Result { if opts.fixed_strings { if let Some(pattern) = &pattern { - FixedStringMatcher::validate(pattern)?; + validate_fixed_pattern(pattern)?; } else if !opts.regexp.is_empty() { for pattern in &opts.regexp { - FixedStringMatcher::validate(pattern)?; + validate_fixed_pattern(pattern)?; } } else { bail!("A pattern must be given as a positional argument or with -e/--regexp") @@ -338,26 +376,35 @@ fn fqgrep() -> Result { None }; + let match_opts = MatcherOpts { + invert_match: opts.invert_match, + reverse_complement: opts.reverse_complement, + }; + + // The matcher used in the primary search + let matcher: Box = + MatcherFactory::new_matcher(&pattern, opts.fixed_strings, &opts.regexp, match_opts); + + // The matcher used if the output is to be colored, None otherwise + let color_matcher = { + if opts.color == Color::Always || (opts.color == Color::Auto && stdout_isatty()) { + let matcher: Box = + MatcherFactory::new_matcher(&pattern, opts.fixed_strings, &opts.regexp, match_opts); + Some(matcher) + } else { + None + } + }; + let pool = rayon::ThreadPoolBuilder::new() .num_threads(opts.threads) .build() .unwrap(); let (count_tx, count_rx): (Sender, Receiver) = bounded(1); - let writer = FastqWriter::new(count_tx, &pool, opts.count, opts.paired); + let writer = FastqWriter::new(count_tx, &pool, color_matcher, opts.count, opts.paired); pool.install(|| { - let match_opts = MatcherOpts { - invert_match: opts.invert_match, - reverse_complement: opts.reverse_complement, - }; - let matcher: Box = match (opts.fixed_strings, &pattern) { - (true, Some(pattern)) => Box::new(FixedStringMatcher::new(pattern, match_opts)), - (false, Some(pattern)) => Box::new(RegexMatcher::new(pattern, match_opts)), - (true, None) => Box::new(FixedStringSetMatcher::new(&opts.regexp, match_opts)), - (false, None) => Box::new(RegexSetMatcher::new(&opts.regexp, match_opts)), - }; - if opts.paired { if files.is_empty() { // read from standad input @@ -426,7 +473,7 @@ fn fqgrep() -> Result { #[allow(clippy::borrowed_box)] // FIXME: remove me later and solve fn process_paired_reads( reads: Vec<(OwnedRecord, OwnedRecord)>, - matcher: &Box, + matcher: &Box, writer: &FastqWriter, progress_logger: &Option, ) { From 37ff2b3f2d1c91c980bef2568b040f4ddec94fbf Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Fri, 12 Aug 2022 00:12:33 -0700 Subject: [PATCH 12/22] usage fix --- src/main.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 69df488..69fc75d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -202,11 +202,9 @@ arg_enum! { /// input and any unrecongized inputs as plaintext (uncompressed). /// /// EXIT STATUS -/// The fqgrep utility exits with one of the following values: /// -/// 0 One or more lines were selected. -/// 1 No lines were selected. -/// >1 An error occurred. +/// The fqgrep utility exits with one of the following values: +/// 0 if one or more lines were selected, 1 if no lines were selected, and >1 if an error occurred. /// #[derive(StructOpt, Debug)] #[structopt( @@ -236,7 +234,7 @@ struct Opts { #[structopt(long, short = "e")] regexp: Vec, - // Interpret pattern as a set of fixed strings + /// Interpret pattern as a set of fixed strings #[structopt(long, short = "F")] fixed_strings: bool, From bc9643734e9c79a1a4f6340a8b3926e19e5b923f Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Fri, 12 Aug 2022 00:14:11 -0700 Subject: [PATCH 13/22] adding missing source --- src/lib/color.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 src/lib/color.rs diff --git a/src/lib/color.rs b/src/lib/color.rs new file mode 100644 index 0000000..c4c33c8 --- /dev/null +++ b/src/lib/color.rs @@ -0,0 +1,36 @@ +use ansi_term::Color; + +use lazy_static::lazy_static; + +lazy_static! { + /// Color for matching bases in a FASTQ + pub static ref COLOR_BASES: Color = Color::Red; + /// Color for matching base qualities in a FASTQ + pub static ref COLOR_QUALS: Color = Color::Fixed(22); + /// Color for a matching read name (head) of a FASTQ + pub static ref COLOR_HEAD: Color = Color::Fixed(30); + /// Color for all non-matching text in a FASTQ + pub static ref COLOR_BACKGROUND: Color = Color::Fixed(240); +} + +/// Colors the text with the given color +pub fn color(text: &[u8], colour: &Color) -> Vec { + let mut colored: Vec = Vec::with_capacity(text.len()); + colour.paint(text).write_to(&mut colored).unwrap(); + colored +} + +/// Color for the read name (head) of a FASTQ +pub fn color_head(text: &[u8]) -> Vec { + color(text, &COLOR_HEAD) +} + +/// Color for the base qualities of a FASTQ +pub fn color_quals(text: &[u8]) -> Vec { + color(text, &COLOR_QUALS) +} + +/// Color for non-matching bases, quals, and other text +pub fn color_background(text: &[u8]) -> Vec { + color(text, &COLOR_BACKGROUND) +} From 5da7b5422e207f76ec0a131b98c2b236b5cec897 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Fri, 12 Aug 2022 00:33:41 -0700 Subject: [PATCH 14/22] improve docs --- src/lib/matcher.rs | 30 +++++++++++++++++++++++++++++- src/main.rs | 42 ++++++++++++++++++++++++++++++------------ 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs index df89c21..0292fc9 100644 --- a/src/lib/matcher.rs +++ b/src/lib/matcher.rs @@ -9,12 +9,17 @@ use bstr::ByteSlice; use regex::bytes::{Regex, RegexBuilder, RegexSet, RegexSetBuilder}; use seq_io::fastq::{OwnedRecord, Record}; +/// Common options for pattern matchers #[derive(Copy, Clone, Debug)] pub struct MatcherOpts { + /// Invert the matching. The bases are said to match if they do not contain the pattern pub invert_match: bool, + /// Include the reverse complement of the bases. The bases are said to match if they or their + /// reverse complement contains the pattern. pub reverse_complement: bool, } +/// Builds a bit vector from an iterator of ranges. The ranges may overlap each other. fn to_bitvec(ranges: impl Iterator>, len: usize) -> BitVec { let mut vec = bitvec![0; len]; ranges.for_each(|range| { @@ -25,14 +30,22 @@ fn to_bitvec(ranges: impl Iterator>, len: usize) -> BitVec { vec } +/// Color the bases and qualities based on ranges specifying where a pattern matched bases. The +/// ranges may overlap each other. fn bases_colored( bases: &[u8], quals: &[u8], ranges: impl Iterator>, ) -> (Vec, Vec) { + // The resulting colored bases let mut colored_bases = Vec::with_capacity(bases.len()); let mut colored_quals = Vec::with_capacity(bases.len()); + + // Merge the ranges into a bit mask, with 1 indicating that base is part of a pattern match let bits = to_bitvec(ranges, bases.len()); + + // Iterate over the bit mask, finding stretches of matching and non-matching bases. Color both + // in both the bases and qualities let mut last_color_on = false; let mut last_bases_index = 0; let mut cur_bases_index = 0; @@ -77,7 +90,7 @@ fn bases_colored( } cur_bases_index += 1; } - // add up to but not including this base to the colored vector **as uncolored** + // Color to the end if last_bases_index + 1 < cur_bases_index { if last_color_on { COLOR_BASES @@ -103,6 +116,7 @@ fn bases_colored( (colored_bases, colored_quals) } +/// Validates that a given FIXED pattern contains only valid DNA bases (ACGTN) pub fn validate_fixed_pattern(pattern: &str) -> Result<()> { for (index, base) in pattern.chars().enumerate() { if !DNA_BASES.contains(&(base as u8)) { @@ -117,11 +131,20 @@ pub fn validate_fixed_pattern(pattern: &str) -> Result<()> { Ok(()) } +/// Base trait for all pattern matchers pub trait Matcher { + /// The options for the pattern matcher fn opts(&self) -> MatcherOpts; + /// Returns true if the bases match the pattern, false otherwise fn bases_match(&self, bases: &[u8]) -> bool; + + /// Colors the bases and qualities based on where they match the pattern. All bases + /// that match the patter are colored. Colored in this case means adding ANSI color + /// codes for printing to a terminal. fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec); + + /// Returns true if the read's bases match the pattern, false otherwise fn read_match(&self, read: &OwnedRecord) -> bool { if self.opts().invert_match { self.bases_match(read.seq()) @@ -135,6 +158,7 @@ pub trait Matcher { } } +/// Matcher for a fixed string pattern pub struct FixedStringMatcher { pattern: Vec, opts: MatcherOpts, @@ -165,6 +189,7 @@ impl FixedStringMatcher { } } +/// Matcher for a set of fixed string patterns pub struct FixedStringSetMatcher { patterns: Vec>, opts: MatcherOpts, @@ -210,6 +235,7 @@ impl FixedStringSetMatcher { } } +/// Matcher for a regular expression pattern pub struct RegexMatcher { regex: Regex, opts: MatcherOpts, @@ -246,6 +272,7 @@ pub struct RegexSetMatcher { opts: MatcherOpts, } +/// Matcher for a set of regular expression patterns impl RegexSetMatcher { pub fn new(patterns: I, opts: MatcherOpts) -> Self where @@ -290,6 +317,7 @@ impl Matcher for RegexSetMatcher { } } +/// Factory for building a matcher pub struct MatcherFactory; impl MatcherFactory { diff --git a/src/main.rs b/src/main.rs index 69fc75d..6045c73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -312,14 +312,18 @@ fn main() -> ExitCode { fn fqgrep() -> Result { let mut opts = setup(); + // Add patterns from a file if given if let Some(file) = &opts.file { for pattern in read_patterns(file)? { opts.regexp.push(pattern); } } + // Inspect the positional arguments to extract a fixed pattern let (pattern, mut files): (Option, Vec) = { let (pattern, file_strings): (Option, Vec) = if opts.regexp.is_empty() { + // No patterns given by -e, so assume the first positional argument is the pattern and + // the rest are files ensure!( !opts.args.is_empty(), "Pattern must be given with -e or as the first positional argument " @@ -327,9 +331,11 @@ fn fqgrep() -> Result { let files = opts.args.iter().skip(1).cloned().collect(); (Some(opts.args[0].clone()), files) } else { + // Patterns given by -e, so assume all positional arguments are files (None, opts.args.clone()) }; + // Convert file strings into paths let files = file_strings .iter() .map(|s| { @@ -341,6 +347,7 @@ fn fqgrep() -> Result { (pattern, files) }; + // Ensure that if multiple files are given, its a multiple of two. if opts.paired { ensure!( files.len() <= 1 || files.len() % 2 == 0, @@ -348,6 +355,7 @@ fn fqgrep() -> Result { ); } + // Validate the fixed string pattern, if fixed-strings are specified if opts.fixed_strings { if let Some(pattern) = &pattern { validate_fixed_pattern(pattern)?; @@ -360,6 +368,7 @@ fn fqgrep() -> Result { } } + // Set up a progress logger if desired let progress_logger = if opts.progress { Some( ProgLogBuilder::new() @@ -374,6 +383,7 @@ fn fqgrep() -> Result { None }; + // Build the common pattern matching options let match_opts = MatcherOpts { invert_match: opts.invert_match, reverse_complement: opts.reverse_complement, @@ -383,7 +393,7 @@ fn fqgrep() -> Result { let matcher: Box = MatcherFactory::new_matcher(&pattern, opts.fixed_strings, &opts.regexp, match_opts); - // The matcher used if the output is to be colored, None otherwise + // Some matcher used if the output is to be colored, None otherwise let color_matcher = { if opts.color == Color::Always || (opts.color == Color::Auto && stdout_isatty()) { let matcher: Box = @@ -394,22 +404,30 @@ fn fqgrep() -> Result { } }; + // The thread pool from which threads are spanwed. let pool = rayon::ThreadPoolBuilder::new() .num_threads(opts.threads) .build() .unwrap(); + // Sender and receiver channels for the final count of matching records let (count_tx, count_rx): (Sender, Receiver) = bounded(1); + + // The writer of final counts or matching records let writer = FastqWriter::new(count_tx, &pool, color_matcher, opts.count, opts.paired); + // The main loop pool.install(|| { + // If no files, use "-" to signify standard input. + if files.is_empty() { + // read from standard input + files.push(PathBuf::from_str("-").unwrap()); + } if opts.paired { - if files.is_empty() { - // read from standad input - files.push(PathBuf::from_str("-").unwrap()); - } + // Either an interleaved paired end FASTQ, or pairs of FASTQs if files.len() == 1 { - // interleaved paired end FASTQ + // Interleaved paired end FASTQ + // The channel FASTQ record chunks are received after being read in let rx = spawn_reader(files[0].clone(), opts.plain); for reads in izip!(rx.iter()) { let paired_reads = reads @@ -419,8 +437,9 @@ fn fqgrep() -> Result { process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); } } else { - // pairs of FASTQ files + // Pairs of FASTQ files for file_pairs in files.chunks_exact(2) { + // The channels for R1 and R2 with FASTQ record chunks that are received after being read in let rx1 = spawn_reader(file_pairs[0].clone(), opts.plain); let rx2 = spawn_reader(file_pairs[1].clone(), opts.plain); for (reads1, reads2) in izip!(rx1.iter(), rx2.iter()) { @@ -430,11 +449,9 @@ fn fqgrep() -> Result { } } } else { - if files.is_empty() { - // read from standad input - files.push(PathBuf::from_str("-").unwrap()); - } + // Process one FSATQ at a time for file in files { + // The channel FASTQ record chunks are received after being read in let rx = spawn_reader(file.clone(), opts.plain); for reads in rx.iter() { // Get the matched reads @@ -461,13 +478,14 @@ fn fqgrep() -> Result { }); drop(writer); // so count_tx.send will execute - + // Get the final count of records matched match count_rx.recv() { Ok(count) => Ok(count), Err(error) => Err(error).with_context(|| "failed receive final match counts"), } } +/// Process a chunk of paired end records in parallel. #[allow(clippy::borrowed_box)] // FIXME: remove me later and solve fn process_paired_reads( reads: Vec<(OwnedRecord, OwnedRecord)>, From 99d976ef9f0d3b19f1e8465122419e238813cffd Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Fri, 12 Aug 2022 00:46:20 -0700 Subject: [PATCH 15/22] change --plain to --decompress to match GREP --- src/main.rs | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6045c73..5b91775 100644 --- a/src/main.rs +++ b/src/main.rs @@ -141,7 +141,7 @@ impl FastqWriter { } } -fn spawn_reader(file: PathBuf, plain: bool) -> Receiver> { +fn spawn_reader(file: PathBuf, decompress: bool) -> Receiver> { let (tx, rx) = bounded(READER_CHANNEL_SIZE); rayon::spawn(move || { // Open the file or standad input @@ -157,12 +157,11 @@ fn spawn_reader(file: PathBuf, plain: bool) -> Receiver> { let buf_handle = BufReader::with_capacity(BUFSIZE, raw_handle); // Maybe wrap it in a decompressor let maybe_decoder_handle = { - // plaintext if your **not** a gzip path AND you're either a FASTQ or plain is true - let is_plain = !is_gzip_path(&file) && (is_fastq_path(&file) || plain); - if is_plain { - Box::new(buf_handle) as Box - } else { + let is_gzip = is_gzip_path(&file) || (!is_fastq_path(&file) && decompress); + if is_gzip { Box::new(MultiGzDecoder::new(buf_handle)) as Box + } else { + Box::new(buf_handle) as Box } }; // Open a FASTQ reader, get an iterator over the records, and chunk them @@ -195,11 +194,11 @@ arg_enum! { /// /// INPUT COMPRESSION /// -/// By default, the input files must be GZIP compressed. If the input files are real files and end -/// with `.gz` or `.bgz`, they are assumed to be GZIP compressed. If they end with `.fastq` or -/// `.fq`, they are assumed to be plaintext (uncompressed). All other inputs are assumed to be -/// GZIP compressed, including standard input. The `--plain` option may be used to treat standard -/// input and any unrecongized inputs as plaintext (uncompressed). +/// By default, the input files are assumed to be uncompressed with the following exceptions: (1) +/// If the input files are real files and end with `.gz` or `.bgz`, they are assumed to be GZIP +/// compressed, or (2) if they end with `.fastq` or `.fq`, they are assumed to be uncompressed, or +/// (3) if the `-Z/--decompress` option is specified then any unrecongized inputs (including +/// standard input) are assumed to be GZIP compressed. /// /// EXIT STATUS /// @@ -248,6 +247,10 @@ struct Opts { #[structopt(short = "v")] invert_match: bool, + /// Assume all unrecognized inputs are GZIP compressed. + #[structopt(short = "Z", long)] + decompress: bool, + /// Treat the input files as paired. The number of input files must be a multiple of two, /// with the first file being R1, second R2, third R1, fourth R2, and so on. If the pattern /// matches either R1 or R2, then both R1 and R2 will be output (interleaved). If the input @@ -263,10 +266,6 @@ struct Opts { #[structopt(long)] progress: bool, - /// The input FASTQ(s) is not compressed - #[structopt(long)] - plain: bool, - /// The first argument is the pattern to match, with the remaining arguments containing the /// files to match. If `-e` is given, then all the arguments are files to match. /// Use standard input if either no files are given or `-` is given. @@ -428,7 +427,7 @@ fn fqgrep() -> Result { if files.len() == 1 { // Interleaved paired end FASTQ // The channel FASTQ record chunks are received after being read in - let rx = spawn_reader(files[0].clone(), opts.plain); + let rx = spawn_reader(files[0].clone(), opts.decompress); for reads in izip!(rx.iter()) { let paired_reads = reads .into_iter() @@ -440,8 +439,8 @@ fn fqgrep() -> Result { // Pairs of FASTQ files for file_pairs in files.chunks_exact(2) { // The channels for R1 and R2 with FASTQ record chunks that are received after being read in - let rx1 = spawn_reader(file_pairs[0].clone(), opts.plain); - let rx2 = spawn_reader(file_pairs[1].clone(), opts.plain); + let rx1 = spawn_reader(file_pairs[0].clone(), opts.decompress); + let rx2 = spawn_reader(file_pairs[1].clone(), opts.decompress); for (reads1, reads2) in izip!(rx1.iter(), rx2.iter()) { let paired_reads = reads1.into_iter().zip(reads2.into_iter()).collect_vec(); process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); @@ -452,7 +451,7 @@ fn fqgrep() -> Result { // Process one FSATQ at a time for file in files { // The channel FASTQ record chunks are received after being read in - let rx = spawn_reader(file.clone(), opts.plain); + let rx = spawn_reader(file.clone(), opts.decompress); for reads in rx.iter() { // Get the matched reads let matched_reads: Vec = reads From e8fe3baa24f1670fc24d1696fc28df4dff376b18 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Sun, 14 Aug 2022 09:01:41 -0700 Subject: [PATCH 16/22] simplify matching with coloring --- src/lib/matcher.rs | 24 ++++++++++++++++++-- src/main.rs | 55 +++++++++++----------------------------------- 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs index 0292fc9..204a99c 100644 --- a/src/lib/matcher.rs +++ b/src/lib/matcher.rs @@ -1,6 +1,7 @@ use bitvec::prelude::*; use std::ops::Range; +use crate::color::{color_background, color_head}; use crate::color::{COLOR_BACKGROUND, COLOR_BASES, COLOR_QUALS}; use crate::reverse_complement; use crate::DNA_BASES; @@ -17,6 +18,8 @@ pub struct MatcherOpts { /// Include the reverse complement of the bases. The bases are said to match if they or their /// reverse complement contains the pattern. pub reverse_complement: bool, + /// Color the read based on the match + pub color: bool, } /// Builds a bit vector from an iterator of ranges. The ranges may overlap each other. @@ -145,8 +148,8 @@ pub trait Matcher { fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec); /// Returns true if the read's bases match the pattern, false otherwise - fn read_match(&self, read: &OwnedRecord) -> bool { - if self.opts().invert_match { + fn read_match(&self, read: &mut OwnedRecord) -> bool { + let match_found = if self.opts().invert_match { self.bases_match(read.seq()) && (self.opts().reverse_complement && self.bases_match(&reverse_complement(read.seq()))) @@ -154,7 +157,23 @@ pub trait Matcher { self.bases_match(read.seq()) || (self.opts().reverse_complement && self.bases_match(&reverse_complement(read.seq()))) + }; + + if self.opts().color { + if match_found { + let (seq, qual) = self.color_matched_bases(&read.seq, &read.qual); + read.head = color_head(&read.head); + read.seq = seq; + read.qual = qual; + } else { + // always color, in case the read is paired + read.head = color_background(&read.head); + read.seq = color_background(&read.seq); + read.qual = color_background(&read.qual); + } } + + match_found } } @@ -318,6 +337,7 @@ impl Matcher for RegexSetMatcher { } /// Factory for building a matcher +/// pub struct MatcherFactory; impl MatcherFactory { diff --git a/src/main.rs b/src/main.rs index 5b91775..a380dbe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -use fqgrep_lib::color::{color_background, color_head}; use isatty::stdout_isatty; use std::process::ExitCode; use std::{ @@ -72,13 +71,7 @@ struct FastqWriter { } impl FastqWriter { - fn new( - count_tx: Sender, - pool: &ThreadPool, - color_matcher: Option>, - count: bool, - paired: bool, - ) -> Self { + fn new(count_tx: Sender, pool: &ThreadPool, count: bool, paired: bool) -> Self { // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); @@ -106,24 +99,7 @@ impl FastqWriter { while let Ok(reads) = rx.recv() { num_matches += reads.len(); for read in reads { - // Color the reads if desired - let (head, seq, qual) = if let Some(ref matcher) = color_matcher { - let bases_match = matcher.bases_match(&read.seq); - if bases_match { - let (seq, qual) = - matcher.color_matched_bases(&read.seq, &read.qual); - (color_head(&read.head), seq, qual) - } else { - ( - color_background(&read.head), - color_background(&read.seq), - color_background(&read.qual), - ) - } - } else { - (read.head, read.seq, read.qual) - }; - fastq::write_to(&mut writer, &head, &seq, &qual) + fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) .expect("failed writing read"); } } @@ -386,23 +362,13 @@ fn fqgrep() -> Result { let match_opts = MatcherOpts { invert_match: opts.invert_match, reverse_complement: opts.reverse_complement, + color: opts.color == Color::Always || (opts.color == Color::Auto && stdout_isatty()), }; // The matcher used in the primary search let matcher: Box = MatcherFactory::new_matcher(&pattern, opts.fixed_strings, &opts.regexp, match_opts); - // Some matcher used if the output is to be colored, None otherwise - let color_matcher = { - if opts.color == Color::Always || (opts.color == Color::Auto && stdout_isatty()) { - let matcher: Box = - MatcherFactory::new_matcher(&pattern, opts.fixed_strings, &opts.regexp, match_opts); - Some(matcher) - } else { - None - } - }; - // The thread pool from which threads are spanwed. let pool = rayon::ThreadPoolBuilder::new() .num_threads(opts.threads) @@ -413,7 +379,7 @@ fn fqgrep() -> Result { let (count_tx, count_rx): (Sender, Receiver) = bounded(1); // The writer of final counts or matching records - let writer = FastqWriter::new(count_tx, &pool, color_matcher, opts.count, opts.paired); + let writer = FastqWriter::new(count_tx, &pool, opts.count, opts.paired); // The main loop pool.install(|| { @@ -456,11 +422,11 @@ fn fqgrep() -> Result { // Get the matched reads let matched_reads: Vec = reads .into_par_iter() - .map(|read| -> Option { + .map(|mut read| -> Option { if let Some(progress) = &progress_logger { progress.record(); } - if matcher.read_match(&read) { + if matcher.read_match(&mut read) { Some(read) } else { None @@ -494,7 +460,7 @@ fn process_paired_reads( ) { reads .into_par_iter() - .map(|(read1, read2)| { + .map(|(mut read1, mut read2)| { if let Some(progress) = progress_logger { progress.record(); progress.record(); @@ -505,7 +471,12 @@ fn process_paired_reads( std::str::from_utf8(read1.head()).unwrap(), std::str::from_utf8(read2.head()).unwrap() ); - if matcher.read_match(&read1) || matcher.read_match(&read2) { + // NB: if the output is to be colored, always call read_match on read2, regardless of + // whether or not read1 had a match, so that read2 is always colored. If the output + // isn't to be colored, only search for a match in read2 if read1 does not have a match + let match1 = matcher.read_match(&mut read1); + let match2 = (!matcher.opts().color && match1) || matcher.read_match(&mut read2); + if match1 || match2 { Some((read1, read2)) } else { None From 649fb645271059db39f9dcfa9d68f936036f97b8 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 24 Aug 2022 20:33:42 -0700 Subject: [PATCH 17/22] move reader and writer into their own threads --- src/main.rs | 85 +++++++++++++++++++++++++++++------------------------ 1 file changed, 47 insertions(+), 38 deletions(-) diff --git a/src/main.rs b/src/main.rs index a380dbe..fad492a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use isatty::stdout_isatty; +use std::io::Stdout; use std::process::ExitCode; use std::{ fs::File, @@ -22,7 +23,7 @@ use itertools::{self, izip, Itertools}; use lazy_static::lazy_static; use parking_lot::Mutex; use proglog::{CountFormatterKind, ProgLog, ProgLogBuilder}; -use rayon::{prelude::*, ThreadPool}; +use rayon::prelude::*; use seq_io::fastq::{self, OwnedRecord, Record}; use structopt::{clap::AppSettings, StructOpt}; @@ -71,47 +72,46 @@ struct FastqWriter { } impl FastqWriter { - fn new(count_tx: Sender, pool: &ThreadPool, count: bool, paired: bool) -> Self { + fn new(count_tx: Sender, count: bool, paired: bool) -> Self { // TODO: try making this unbounded let (tx, rx): (Sender>, Receiver>) = bounded(WRITER_CHANNEL_SIZE); - if count { - pool.spawn(move || { - let mut num_matches = 0; - while let Ok(reads) = rx.recv() { - num_matches += reads.len(); + std::thread::spawn(move || { + let mut maybe_writer: Option> = { + if count { + None + } else { + Some(BufWriter::with_capacity(BUFSIZE, std::io::stdout())) } - if paired { - num_matches /= 2; - } - std::io::stdout() - .write_all(format!("{}\n", num_matches).as_bytes()) - .unwrap(); + }; - count_tx - .send(num_matches) - .expect("failed sending final count"); - }); - } else { - pool.spawn(move || { - let mut num_matches = 0; - let mut writer = BufWriter::with_capacity(BUFSIZE, std::io::stdout()); - while let Ok(reads) = rx.recv() { - num_matches += reads.len(); + let mut num_matches = 0; + while let Ok(reads) = rx.recv() { + num_matches += reads.len(); + if let Some(ref mut writer) = maybe_writer { for read in reads { - fastq::write_to(&mut writer, &read.head, &read.seq, &read.qual) + fastq::write_to(&mut *writer, &read.head, &read.seq, &read.qual) .expect("failed writing read"); } - } - if paired { - num_matches /= 2; - } + }; + } + if paired { + num_matches /= 2; + } + + if count { + std::io::stdout() + .write_all(format!("{}\n", num_matches).as_bytes()) + .unwrap(); + } + + if let Some(mut writer) = maybe_writer { writer.flush().expect("Error flushing writer"); - count_tx - .send(num_matches) - .expect("failed sending final count"); - }); - } + }; + count_tx + .send(num_matches) + .expect("failed sending final count"); + }); let lock = Mutex::new(()); Self { tx, lock } } @@ -119,7 +119,7 @@ impl FastqWriter { fn spawn_reader(file: PathBuf, decompress: bool) -> Receiver> { let (tx, rx) = bounded(READER_CHANNEL_SIZE); - rayon::spawn(move || { + std::thread::spawn(move || { // Open the file or standad input let raw_handle = if file.as_os_str() == "-" { Box::new(std::io::stdin()) as Box @@ -163,7 +163,7 @@ arg_enum! { } /// The fqgrep utility searches any given input FASTQ files, selecting records whose bases match -/// one or more patterns. By default, a pattern matches the bases in a FASTQ record if the +/// one or more patterns. By default, a pattern matches the bases in a FASTQ record if the /// regular expression (RE) in the pattern matches the bases. An empty expression matches every /// line. Each FASTQ record that matches at least one of the patterns is written to the standard /// output. @@ -176,6 +176,14 @@ arg_enum! { /// (3) if the `-Z/--decompress` option is specified then any unrecongized inputs (including /// standard input) are assumed to be GZIP compressed. /// +/// THREADS +/// +/// The `--threads` option controls the number of threads used to _search_ the reads. +/// Independently, for single end reads or interleaved paired end reads, a single thread will be +/// used to read each input FASTQ. For paired end reads across pairs of FASTQs, two threads will +/// be used to read the FASTQs for each end of a pair. Finally, a single thread will be created +/// for the writer. +/// /// EXIT STATUS /// /// The fqgrep utility exits with one of the following values: @@ -190,7 +198,8 @@ arg_enum! { ] #[allow(clippy::struct_excessive_bools)] struct Opts { - /// The number of threads to use for matching reads against pattern + /// The number of threads to use for matching reads against pattern. See the full usage for + /// threads specific to reading and writing. #[structopt(long, short = "t", default_value = NUM_CPU.as_str())] threads: usize, @@ -379,7 +388,7 @@ fn fqgrep() -> Result { let (count_tx, count_rx): (Sender, Receiver) = bounded(1); // The writer of final counts or matching records - let writer = FastqWriter::new(count_tx, &pool, opts.count, opts.paired); + let writer = FastqWriter::new(count_tx, opts.count, opts.paired); // The main loop pool.install(|| { @@ -414,7 +423,7 @@ fn fqgrep() -> Result { } } } else { - // Process one FSATQ at a time + // Process one FASTQ at a time for file in files { // The channel FASTQ record chunks are received after being read in let rx = spawn_reader(file.clone(), opts.decompress); From e4224292ee7e0e1e5ecd6d164bfe6f572965b660 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Sun, 11 Sep 2022 15:26:56 -0700 Subject: [PATCH 18/22] minor updates --- src/lib/color.rs | 20 ++++++++------------ src/lib/matcher.rs | 1 - src/lib/mod.rs | 28 ++++------------------------ 3 files changed, 12 insertions(+), 37 deletions(-) diff --git a/src/lib/color.rs b/src/lib/color.rs index c4c33c8..f23f060 100644 --- a/src/lib/color.rs +++ b/src/lib/color.rs @@ -1,17 +1,13 @@ use ansi_term::Color; -use lazy_static::lazy_static; - -lazy_static! { - /// Color for matching bases in a FASTQ - pub static ref COLOR_BASES: Color = Color::Red; - /// Color for matching base qualities in a FASTQ - pub static ref COLOR_QUALS: Color = Color::Fixed(22); - /// Color for a matching read name (head) of a FASTQ - pub static ref COLOR_HEAD: Color = Color::Fixed(30); - /// Color for all non-matching text in a FASTQ - pub static ref COLOR_BACKGROUND: Color = Color::Fixed(240); -} +/// Color for matching bases in a FASTQ +pub const COLOR_BASES: Color = Color::Red; +/// Color for matching base qualities in a FASTQ +pub const COLOR_QUALS: Color = Color::Fixed(22); +/// Color for a matching read name (head) of a FASTQ +pub const COLOR_HEAD: Color = Color::Fixed(30); +/// Color for all non-matching text in a FASTQ +pub const COLOR_BACKGROUND: Color = Color::Fixed(240); /// Colors the text with the given color pub fn color(text: &[u8], colour: &Color) -> Vec { diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs index 204a99c..9e3c7bc 100644 --- a/src/lib/matcher.rs +++ b/src/lib/matcher.rs @@ -337,7 +337,6 @@ impl Matcher for RegexSetMatcher { } /// Factory for building a matcher -/// pub struct MatcherFactory; impl MatcherFactory { diff --git a/src/lib/mod.rs b/src/lib/mod.rs index 530e56f..aa8451b 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -10,31 +10,11 @@ pub mod matcher; use lazy_static::lazy_static; use std::{borrow::Borrow, path::Path}; -lazy_static! { - pub static ref DNA_BASES: [u8; 5] = { - let mut bases = [0; 5]; - for (i, base) in b"ACGTN".iter().enumerate() { - bases[i] = *base; - } - bases - }; - - pub static ref IUPAC_BASES: [u8;15] = { - let mut bases = [0; 15]; - for (i, base) in b"AGCTYRWSKMDVHBN".iter().enumerate() { - bases[i] = *base; - } - bases - }; - - pub static ref IUPAC_BASES_COMPLEMENT: [u8;15] = { - let mut bases = [0; 15]; - for (i, base) in b"TCGARYWSMKHBDVN".iter().enumerate() { - bases[i] = *base; - } - bases - }; +pub const DNA_BASES: [u8; 5] = *b"ACGTN"; +pub const IUPAC_BASES: [u8; 15] = *b"AGCTYRWSKMDVHBN"; +pub const IUPAC_BASES_COMPLEMENT: [u8; 15] = *b"TCGARYWSMKHBDVN"; +lazy_static! { pub static ref COMPLEMENT: [u8; 256] = { let mut comp = [0; 256]; for (v, a) in comp.iter_mut().enumerate() { From d8e6e6ab587afebe83268bbd04f01c71ee47c035 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Sun, 11 Sep 2022 16:25:48 -0700 Subject: [PATCH 19/22] fix coloring with reverse complement --- src/lib/matcher.rs | 65 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs index 9e3c7bc..90d44d5 100644 --- a/src/lib/matcher.rs +++ b/src/lib/matcher.rs @@ -143,7 +143,7 @@ pub trait Matcher { fn bases_match(&self, bases: &[u8]) -> bool; /// Colors the bases and qualities based on where they match the pattern. All bases - /// that match the patter are colored. Colored in this case means adding ANSI color + /// that match the pattern are colored. Colored in this case means adding ANSI color /// codes for printing to a terminal. fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec); @@ -193,7 +193,19 @@ impl Matcher for FixedStringMatcher { start, end: start + self.pattern.len(), }); - bases_colored(bases, quals, ranges) + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = bases_revcomp + .find_iter(&self.pattern) + .map(|start| bases.len() - start - self.pattern.len()) + .map(|start| Range { + start, + end: start + self.pattern.len(), + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } } fn opts(&self) -> MatcherOpts { @@ -232,7 +244,22 @@ impl Matcher for FixedStringSetMatcher { }) .collect::>() }); - bases_colored(bases, quals, ranges) + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = self.patterns.iter().flat_map(|pattern| { + bases_revcomp + .find_iter(&pattern) + .map(|start| bases.len() - start - pattern.len()) + .map(|start| Range { + start, + end: start + pattern.len(), + }) + .collect::>() + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } } fn opts(&self) -> MatcherOpts { @@ -277,7 +304,20 @@ impl Matcher for RegexMatcher { fn color_matched_bases(&self, bases: &[u8], quals: &[u8]) -> (Vec, Vec) { let ranges = self.regex.find_iter(bases).map(|m| m.range()); - bases_colored(bases, quals, ranges) + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = + self.regex + .find_iter(bases_revcomp) + .map(|m| m.range()) + .map(|range| Range { + start: bases.len() - range.start - range.len(), + end: bases.len() - range.start, + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } } fn opts(&self) -> MatcherOpts { @@ -327,8 +367,21 @@ impl Matcher for RegexSetMatcher { .regex_matchers .iter() .flat_map(|r| r.regex.find_iter(bases).map(|m| m.range())); - - bases_colored(bases, quals, ranges) + if self.opts().reverse_complement { + let bases_revcomp = &reverse_complement(bases); + let ranges_revcomp = self.regex_matchers.iter().flat_map(|r| { + r.regex + .find_iter(bases_revcomp) + .map(|m| m.range()) + .map(|range| Range { + start: bases.len() - range.start - range.len(), + end: bases.len() - range.start, + }) + }); + bases_colored(bases, quals, ranges.chain(ranges_revcomp)) + } else { + bases_colored(bases, quals, ranges) + } } fn opts(&self) -> MatcherOpts { From bba454cecf8678eb9118fe4fc6d1f983bcd2d276 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Sun, 11 Sep 2022 16:39:44 -0700 Subject: [PATCH 20/22] fixing invert match --- src/lib/matcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib/matcher.rs b/src/lib/matcher.rs index 90d44d5..d6055dd 100644 --- a/src/lib/matcher.rs +++ b/src/lib/matcher.rs @@ -151,8 +151,8 @@ pub trait Matcher { fn read_match(&self, read: &mut OwnedRecord) -> bool { let match_found = if self.opts().invert_match { self.bases_match(read.seq()) - && (self.opts().reverse_complement - && self.bases_match(&reverse_complement(read.seq()))) + && (!self.opts().reverse_complement + || self.bases_match(&reverse_complement(read.seq()))) } else { self.bases_match(read.seq()) || (self.opts().reverse_complement From a3f9a674cb05bbb875a14fe3ed834731e6c24ea9 Mon Sep 17 00:00:00 2001 From: tfenne Date: Wed, 19 Oct 2022 14:44:19 -0600 Subject: [PATCH 21/22] Add a rust-toolchain file. --- rust-toolchain.toml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 rust-toolchain.toml diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..cc8f987 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.64.0" +components = ["rustfmt", "clippy"] From d80dc1451f8dfe767e9595c5472842367c2abce8 Mon Sep 17 00:00:00 2001 From: tfenne Date: Thu, 27 Oct 2022 04:25:39 -0600 Subject: [PATCH 22/22] Added standard pre-commit script that runs formatting, clippy and tests. --- precommit.sh | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100755 precommit.sh diff --git a/precommit.sh b/precommit.sh new file mode 100755 index 0000000..0b93435 --- /dev/null +++ b/precommit.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash + +############################################################################### +# Script that should be run pre-commit after making any changes. +############################################################################### + +set -e + +failures="" + +function banner() { + echo + echo "================================================================================" + echo $* + echo "================================================================================" + echo +} + +##################################################################### +# Takes two parameters, a "name" and a "command". +# Runs the command and prints out whether it succeeded or failed, and +# also tracks a list of failed steps in $failures. +##################################################################### +function run() { + local name=$1 + local cmd=$2 + + banner "Running $name" + set +e + $cmd + exit_code=$? + set -e + + if [[ $exit_code == 0 ]]; then + echo Passed $name + else + echo Failed $name + if [ -z "$failures" ]; then + failures="$name" + else + failures="$failures, $name" + fi + fi +} + +run "cargo fmt" "cargo fmt --all" +run "cargo clippy" "cargo clippy --all-features --locked -- -D warnings" +run "cargo test" "cargo test --locked --quiet" + +if [ -z "$failures" ]; then + banner "Precommit Passed" +else + banner "Precommit Failed with failures in: $failures" + exit 1 +fi