diff --git a/Cargo.toml b/Cargo.toml index bce364d..73e01b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "minimap2" -version = "0.1.21+minimap2.2.28" +version = "0.1.22+minimap2.2.28" edition = "2021" authors = ["Joseph Guhlin "] license = "MIT OR Apache-2.0" @@ -37,6 +37,9 @@ rust-htslib = { version = "0.48", default-features = false, optional = true } [dev-dependencies] rayon = "1.10" +crossbeam = "0.8.4" +clap = { version = "4.5.21", features = ["derive"] } +needletail = { version = "0.6", default-features = false} # The end-user should decide this... # [profile.release] @@ -60,3 +63,13 @@ sse2only = ["minimap2-sys/sse2only"] [package.metadata.docs.rs] features = ["map-file", "htslib"] + +[[example]] +name = "channels" +path = "examples/channels.rs" +doc-scrape-examples = true + +[[example]] +name = "rayon" +path = "examples/rayon.rs" +doc-scrape-examples = true diff --git a/README.md b/README.md index ca2eabc..a481da9 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,27 @@ self.idxopt.flag |= MM_I_HPC as i32; See [full list of options](#minimap2-mapping-and-indexing-options) below. ### Working Example -There is a binary called "fakeminimap2" which demonstrates basic usage and multithreading using mpsc channels. You can find it [in this repo](https://github.com/jguhlin/minimap2-rs/tree/main/fakeminimap2) for an example. + +#### Examples Directory +There are two working examples directly in this repo. In both instances below, 64 is the number of threads to allocate. + +**Channel-based multi-threading** +``` +cargo run --example channels -- reference_genome.fasta reads.fasta 64 +``` + +**Rayon-based multi-threading** +``` +cargo run --example rayon -- reference_genome.fasta reads.fasta 64 +``` + +**Depending on your needs** you can probably do just fine with Rayon. But for very large implementations, interactivity, or limited memory, using channels may be the way to go. + +#### Fakeminimap2 + +There is a binary called "fakeminimap2" which demonstrates basic usage and multithreading using channels or rayon. You can find it [in this repo](https://github.com/jguhlin/minimap2-rs/tree/main/fakeminimap2) for an example. It it much more fully featured example, with an output interface, some mouse support, and interaction. + +#### Code Examples Alignment functions return a [Mapping](https://docs.rs/minimap2/latest/minimap2/struct.Mapping.html) struct. The [Alignment](https://docs.rs/minimap2/latest/minimap2/struct.Alignment.html) struct is only returned when the [Aligner](https://docs.rs/minimap2/latest/minimap2/struct.Aligner.html) is created using [.with_cigar()](https://docs.rs/minimap2/latest/minimap2/struct.Aligner.html#method.with_cigar). @@ -161,6 +181,7 @@ Minimap2 is tested on x86_64 and aarch64 (arm64). Other platforms may work, plea - [HiFiHLA](https://github.com/PacificBiosciences/hifihla) - HLA star-calling tool for PacBio HiFi data - [STRdust](https://github.com/wdecoster/STRdust) - Tandem repeat genotyper for long reads - [oarfish](https://github.com/COMBINE-lab/oarfish) - transcript quantification from long-read RNA-seq data +- [lrge](https://github.com/mbhall88/lrge) - Long Read-based Genome size Estimation from overlaps # Next things todo * Iterator interface for map_file @@ -312,6 +333,10 @@ See [customization](#customization) for how to use these. | `MM_I_NO_NAME` | `4` | Do not store sequence names in the index. | # Changelog +### 0.1.22 minimap2 2.28 +#### Changes ++ Fixed a memory segfault when re-using a thread local buffer. Not sure why it occurs, but this fix seems to solve it. + ### 0.1.21 minimap2 2.28 Contributors to this release: @mbhall88 @rob-p @Sam-Sims @charlesgregory @PB-DB #### Breaking Changes @@ -335,8 +360,6 @@ Contributors to this release: @mbhall88 @rob-p @Sam-Sims @charlesgregory @PB-DB + Static str's and now static CStr's + FIX: memory leak due to sequences allocated by minimap2 not being freed @charlesgregory + Add Send + Sync to Aligner, along with unit test @PB-DB -+ Only use rust-htslib/curl when curl feature is enabled @PB-DB -+ Mark bam::Record objects as supplementary @PB-DB + Experimental Android support (tested on aarch64 and x86_64), solves [#66](https://github.com/jguhlin/minimap2-rs/issues/66) + Added flag and option documents + Added with_gap_open penalty ergonomic function diff --git a/examples/channels.rs b/examples/channels.rs new file mode 100644 index 0000000..775cf02 --- /dev/null +++ b/examples/channels.rs @@ -0,0 +1,196 @@ +use crossbeam::queue::ArrayQueue; +use minimap2::*; +use needletail::{parse_fastx_file, FastxReader}; + +use std::path::PathBuf; +use std::{error::Error, path::Path, sync::Arc, time::Duration}; + +use clap::Parser; + +/// We use a worker queue to pass around work between threads. +/// We do it this way to be generic over the type. +enum WorkQueue { + Work(T), + Result(T), +} + +// Not necessary to make types, but it helps keep things straightforward + +// We work on distinct WorkUnits (aka a single sequence) +type WorkUnit = (Vec, Vec); // Sequence ID, Sequence + +// We return the original sequence, and the vector of mappings (results) +type WorkResult = (WorkUnit, Vec); +// We could also choose to return just the Seq ID, and the Mappings should also have the query name too +// You can return whatever would be most useful + +#[derive(Parser, Debug)] +#[command( + name = "minimap2-channels-example", + about = "An example of how to use the minimap2 crate with multithreading" +)] +struct Cli { + /// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format) + pub target: PathBuf, + + /// The query file to align (e.g. reads - can be FASTA or FASTQ) + pub query: PathBuf, + + /// The number of threads to use + pub threads: usize, +} + +fn main() { + // Parse command line arguments + let args = Cli::parse(); + + map(args.target, args.query, args.threads).expect("Unable to map"); +} + +fn map( + target_file: impl AsRef, + query_file: impl AsRef, + threads: usize, +) -> Result<(), Box> { + // Aligner gets created using the build pattern. + // Once .with_index is called, the aligner is set to "Built" and can no longer be changed. + println!("Creating index"); + let aligner = Aligner::builder() + .map_ont() + .with_cigar() + .with_index_threads(threads) + .with_index(target_file, None) + .expect("Unable to build index"); + + println!("Index created"); + + // Create a queue for work and for results + let work_queue = Arc::new(ArrayQueue::>::new(1024)); + let results_queue = Arc::new(ArrayQueue::>::new(1024)); + + // I use a shutdown flag + let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // Store join handles, it's just good practice to clean up threads + let mut jh = Vec::new(); + + let aligner = Arc::new(aligner); + + // Spin up the threads + for _ in 0..threads { + // Clone everything we will need... + let work_queue = Arc::clone(&work_queue); + let results_queue = Arc::clone(&results_queue); + let shutdown = Arc::clone(&shutdown); + let aligner = Arc::clone(&aligner); + + let handle = + std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner)); + + jh.push(handle); + } + + // Let's split this into another thread + + { + let work_queue = Arc::clone(&work_queue); + let shutdown = Arc::clone(&shutdown); + let query_file = query_file.as_ref().to_path_buf(); + + let handle = std::thread::spawn(move || { + // Now that the threads are running, read the input file and push the work to the queue + let mut reader: Box = + parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file")); + + // I just do this in the main thread, but you can split threads + let backoff = crossbeam::utils::Backoff::new(); + while let Some(Ok(record)) = reader.next() { + let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec())); + while let Err(work_packet) = work_queue.push(work) { + work = work_packet; // Simple way to maintain ownership + // If we have an error, it's 99% because the queue is full + backoff.snooze(); + } + } + + // Set the shutdown flag + shutdown.store(true, std::sync::atomic::Ordering::Relaxed); + }); + + jh.push(handle); + } + + let mut num_alignments = 0; + + let backoff = crossbeam::utils::Backoff::new(); + loop { + match results_queue.pop() { + // This is where we processs mapping results as they come in... + Some(WorkQueue::Result((record, alignments))) => { + num_alignments += alignments.len(); + + } + Some(_) => unimplemented!("Unexpected result type"), + None => { + backoff.snooze(); + + // If all join handles are finished, we can break + if jh.iter().all(|h| h.is_finished()) { + break; + } + if backoff.is_completed() { + backoff.reset(); + std::thread::sleep(Duration::from_millis(3)); + } + } + } + } + + // Join all the threads + for handle in jh { + handle.join().expect("Unable to join thread"); + } + + println!("Total alignments: {}", num_alignments); + + Ok(()) +} + +// Convert this to a function +fn worker( + work_queue: Arc>>, + results_queue: Arc>>, + shutdown: Arc, + aligner: Arc>, +) { + loop { + // We use backoff to sleep when we don't have any work + let backoff = crossbeam::utils::Backoff::new(); + + match work_queue.pop() { + Some(WorkQueue::Work(sequence)) => { + let alignment = aligner + .map(&sequence.1, true, false, None, None, Some(&sequence.0)) + .expect("Unable to align"); + + // Return the original sequence, as well as the mappings + // We have to do it this way because ownership + let mut result = WorkQueue::Result((sequence, alignment)); + while let Err(result_packet) = results_queue.push(result) { + result = result_packet; // Simple way to maintain ownership + // If we have an error, it's 99% because the queue is full + backoff.snooze(); + } + } + Some(_) => unimplemented!("Unexpected work type"), + None => { + backoff.snooze(); + + // If we have the shutdown signal, we should exit + if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() { + break; + } + } + } + } +} diff --git a/examples/rayon.rs b/examples/rayon.rs new file mode 100644 index 0000000..8f68070 --- /dev/null +++ b/examples/rayon.rs @@ -0,0 +1,80 @@ +use minimap2::*; +use needletail::{parse_fastx_file, FastxReader}; +use rayon::prelude::*; +use clap::Parser; + +use std::path::PathBuf; +use std::{error::Error, path::Path}; + +#[derive(Parser, Debug)] +#[command( + name = "minimap2-channels-example", + about = "An example of how to use the minimap2 crate with multithreading" +)] +struct Cli { + /// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format) + pub target: PathBuf, + + /// The query file to align (e.g. reads - can be FASTA or FASTQ) + pub query: PathBuf, + + /// The number of threads to use + pub threads: usize, +} + +fn main() { + // Parse command line arguments + let args = Cli::parse(); + + map(args.target, args.query, args.threads).expect("Unable to map"); +} + +fn map( + target_file: impl AsRef, + query_file: impl AsRef, + threads: usize, +) -> Result<(), Box> { + // Set the number of threads to use + rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build_global() + .expect("Unable to set number of threads"); + + println!("Creating index"); + + // Aligner gets created using the build pattern. + // Once .with_index is called, the aligner is set to "Built" and can no longer be changed. + let aligner = Aligner::builder() + .map_ont() + .with_cigar() + .with_index_threads(threads) // Minimap2 uses it's own thread pool for index building + .with_index(target_file, None) + .expect("Unable to build index"); + + println!("Index created"); + + // Read in the query file + let mut reader = parse_fastx_file(query_file)?; + + let mut queries: Vec<(Vec, Vec)> = Vec::new(); + while let Some(record) = reader.next() { + let record = record.expect("Error reading record"); + queries.push((record.id().to_vec(), record.seq().to_vec())); + } + + // Map the queries + let results: Vec> = queries + .par_iter() + .map(|(id, seq)| { + aligner + .map(&seq, false, false, None, None, Some(&id)) + .expect("Error mapping") + }) + .collect(); + + // Count total number of alignments + let total_alignments: usize = results.iter().map(|x| x.len()).sum(); + println!("Iteration complete, total alignments {}", total_alignments); + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 71e198d..f07ca0a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -223,8 +223,8 @@ thread_local! { #[derive(Debug)] struct ThreadLocalBuffer { buf: *mut mm_tbuf_t, - max_uses: usize, - uses: usize, + // max_uses: usize, + // uses: usize, } impl ThreadLocalBuffer { @@ -232,22 +232,22 @@ impl ThreadLocalBuffer { let buf = unsafe { mm_tbuf_init() }; Self { buf, - max_uses: 15, - uses: 0, + // max_uses: 15, + // uses: 0, } } /// Return the buffer, checking how many times it has been borrowed. /// Free the memory of the old buffer and reinitialise a new one If /// num_uses exceeds max_uses. pub fn get_buf(&mut self) -> *mut mm_tbuf_t { - if self.uses > self.max_uses { + /* if self.uses > self.max_uses { // println!("renewing threadbuffer"); self.free_buffer(); let buf = unsafe { mm_tbuf_init() }; self.buf = buf; self.uses = 0; } - self.uses += 1; + self.uses += 1; */ self.buf } @@ -259,7 +259,8 @@ impl ThreadLocalBuffer { /// Handle destruction of thread local buffer properly. impl Drop for ThreadLocalBuffer { fn drop(&mut self) { - unsafe { mm_tbuf_destroy(self.buf) }; + // unsafe { mm_tbuf_destroy(self.buf) }; + self.free_buffer(); } } @@ -580,7 +581,7 @@ where // Make sure MM_F_CIGAR flag isn't already set assert!((self.mapopt.flag & MM_F_CIGAR as i64) == 0); - self.mapopt.flag |= MM_F_CIGAR as i64; + self.mapopt.flag |= MM_F_CIGAR as i64 | MM_F_OUT_CS as i64; self } @@ -605,12 +606,11 @@ where self } - - /// Sets the gap open penalty for minimap2. - /// + /// Sets the gap open penalty for minimap2. + /// /// minimap2 -O 4 sets both the short and long gap open penalty to 4. /// [minimap2 code](https://github.com/lh3/minimap2/blob/618d33515e5853c4576d5a3d126fdcda28f0e8a4/main.c#L315) - /// + /// /// To set the long gap open penalty, simply provide a value for `penalty_long`. pub fn with_gap_open_penalty(mut self, penalty: i32, penalty_long: Option) -> Self { self.mapopt.q = penalty; @@ -938,7 +938,7 @@ impl Aligner { &self, seq: &[u8], cs: bool, - md: bool, // TODO + md: bool, max_frag_len: Option, extra_flags: Option<&[u64]>, query_name: Option<&[u8]>, @@ -995,8 +995,8 @@ impl Aligner { Some(qname) => qname.as_ref().as_ptr() as *const ::std::os::raw::c_char, }; - let mappings = BUF.with(|buf| { - let km = unsafe { mm_tbuf_get_km(buf.borrow_mut().get_buf()) }; + let mappings = BUF.with_borrow_mut(|buf| { + let km: *mut c_void = unsafe { mm_tbuf_get_km(buf.get_buf()) }; mm_reg = MaybeUninit::new(unsafe { mm_map( @@ -1004,7 +1004,7 @@ impl Aligner { seq.len() as i32, seq.as_ptr() as *const ::std::os::raw::c_char, &mut n_regs, - buf.borrow_mut().get_buf(), + buf.get_buf(), &map_opt, qname, ) @@ -1014,9 +1014,9 @@ impl Aligner { for i in 0..n_regs { unsafe { - let reg_ptr = (*mm_reg.as_ptr()).offset(i as isize); - let const_ptr = reg_ptr as *const mm_reg1_t; - let reg: mm_reg1_t = *reg_ptr; + let mm_reg1_mut_ptr = (*mm_reg.as_ptr()).offset(i as isize); + let mm_reg1_const_ptr = mm_reg1_mut_ptr as *const mm_reg1_t; + let reg: mm_reg1_t = *mm_reg1_mut_ptr; let idx = Arc::as_ptr(self.idx.as_ref().unwrap()); let contig = @@ -1113,46 +1113,64 @@ impl Aligner { }; let (cs_str, md_str) = if cs || md { - let mut cs_string: *mut libc::c_char = std::ptr::null_mut(); - let mut m_cs_string: libc::c_int = 0i32; + let idx: *const mm_idx_t = *Arc::as_ptr(self.idx.as_ref().unwrap()); let cs_str = if cs { + let mut cs_string: *mut libc::c_char = std::ptr::null_mut(); + let mut m_cs_string: libc::c_int = 0i32; + + // This solves a weird segfault... + let km = km_init(); + let _cs_len = mm_gen_cs( km, &mut cs_string, &mut m_cs_string, - &**self.idx.as_ref().unwrap().as_ref() as *const mm_idx_t, - const_ptr, + idx, + mm_reg1_const_ptr, seq.as_ptr() as *const libc::c_char, true.into(), ); + let _cs_string = std::ffi::CStr::from_ptr(cs_string) .to_str() .unwrap() .to_string(); + + libc::free(cs_string as *mut c_void); + km_destroy(km); Some(_cs_string) } else { None }; let md_str = if md { + let mut cs_string: *mut libc::c_char = std::ptr::null_mut(); + let mut m_cs_string: libc::c_int = 0i32; + + // This solves a weird segfault... + let km = km_init(); + let _md_len = mm_gen_MD( km, &mut cs_string, &mut m_cs_string, - &**self.idx.as_ref().unwrap().as_ref() as *const mm_idx_t, - const_ptr, + idx, + mm_reg1_const_ptr, seq.as_ptr() as *const libc::c_char, ); let _md_string = std::ffi::CStr::from_ptr(cs_string) .to_str() .unwrap() .to_string(); + + libc::free(cs_string as *mut c_void); + km_destroy(km); Some(_md_string) } else { None }; - libc::free(cs_string as *mut c_void); + (cs_str, md_str) } else { (None, None) @@ -1207,6 +1225,7 @@ impl Aligner { }); // free some stuff here unsafe { + // Free mm_regs let ptr: *mut mm_reg1_t = mm_reg.assume_init(); let c_void_ptr: *mut c_void = ptr as *mut c_void; libc::free(c_void_ptr);