Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix-Memory-Bug #104

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "minimap2"
version = "0.1.21+minimap2.2.28"
version = "0.1.22+minimap2.2.28"
edition = "2021"
authors = ["Joseph Guhlin <[email protected]>"]
license = "MIT OR Apache-2.0"
Expand Down Expand Up @@ -37,6 +37,9 @@ rust-htslib = { version = "0.48", default-features = false, optional = true }

[dev-dependencies]
rayon = "1.10"
crossbeam = "0.8.4"
clap = { version = "4.5.21", features = ["derive"] }
needletail = { version = "0.6", default-features = false}

# The end-user should decide this...
# [profile.release]
Expand All @@ -60,3 +63,13 @@ sse2only = ["minimap2-sys/sse2only"]

[package.metadata.docs.rs]
features = ["map-file", "htslib"]

[[example]]
name = "channels"
path = "examples/channels.rs"
doc-scrape-examples = true

[[example]]
name = "rayon"
path = "examples/rayon.rs"
doc-scrape-examples = true
29 changes: 26 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,27 @@ self.idxopt.flag |= MM_I_HPC as i32;
See [full list of options](#minimap2-mapping-and-indexing-options) below.

### Working Example
There is a binary called "fakeminimap2" which demonstrates basic usage and multithreading using mpsc channels. You can find it [in this repo](https://github.com/jguhlin/minimap2-rs/tree/main/fakeminimap2) for an example.

#### Examples Directory
There are two working examples directly in this repo. In both instances below, 64 is the number of threads to allocate.

**Channel-based multi-threading**
```
cargo run --example channels -- reference_genome.fasta reads.fasta 64
```

**Rayon-based multi-threading**
```
cargo run --example rayon -- reference_genome.fasta reads.fasta 64
```

**Depending on your needs** you can probably do just fine with Rayon. But for very large implementations, interactivity, or limited memory, using channels may be the way to go.

#### Fakeminimap2

There is a binary called "fakeminimap2" which demonstrates basic usage and multithreading using channels or rayon. You can find it [in this repo](https://github.com/jguhlin/minimap2-rs/tree/main/fakeminimap2) for an example. It it much more fully featured example, with an output interface, some mouse support, and interaction.

#### Code Examples

Alignment functions return a [Mapping](https://docs.rs/minimap2/latest/minimap2/struct.Mapping.html) struct. The [Alignment](https://docs.rs/minimap2/latest/minimap2/struct.Alignment.html) struct is only returned when the [Aligner](https://docs.rs/minimap2/latest/minimap2/struct.Aligner.html) is created using [.with_cigar()](https://docs.rs/minimap2/latest/minimap2/struct.Aligner.html#method.with_cigar).

Expand Down Expand Up @@ -161,6 +181,7 @@ Minimap2 is tested on x86_64 and aarch64 (arm64). Other platforms may work, plea
- [HiFiHLA](https://github.com/PacificBiosciences/hifihla) - HLA star-calling tool for PacBio HiFi data
- [STRdust](https://github.com/wdecoster/STRdust) - Tandem repeat genotyper for long reads
- [oarfish](https://github.com/COMBINE-lab/oarfish) - transcript quantification from long-read RNA-seq data
- [lrge](https://github.com/mbhall88/lrge) - Long Read-based Genome size Estimation from overlaps

# Next things todo
* Iterator interface for map_file
Expand Down Expand Up @@ -312,6 +333,10 @@ See [customization](#customization) for how to use these.
| `MM_I_NO_NAME` | `4` | Do not store sequence names in the index. |

# Changelog
### 0.1.22 minimap2 2.28
#### Changes
+ Fixed a memory segfault when re-using a thread local buffer. Not sure why it occurs, but this fix seems to solve it.

### 0.1.21 minimap2 2.28
Contributors to this release: @mbhall88 @rob-p @Sam-Sims @charlesgregory @PB-DB
#### Breaking Changes
Expand All @@ -335,8 +360,6 @@ Contributors to this release: @mbhall88 @rob-p @Sam-Sims @charlesgregory @PB-DB
+ Static str's and now static CStr's
+ FIX: memory leak due to sequences allocated by minimap2 not being freed @charlesgregory
+ Add Send + Sync to Aligner, along with unit test @PB-DB
+ Only use rust-htslib/curl when curl feature is enabled @PB-DB
+ Mark bam::Record objects as supplementary @PB-DB
+ Experimental Android support (tested on aarch64 and x86_64), solves [#66](https://github.com/jguhlin/minimap2-rs/issues/66)
+ Added flag and option documents
+ Added with_gap_open penalty ergonomic function
Expand Down
196 changes: 196 additions & 0 deletions examples/channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use crossbeam::queue::ArrayQueue;
use minimap2::*;
use needletail::{parse_fastx_file, FastxReader};

use std::path::PathBuf;
use std::{error::Error, path::Path, sync::Arc, time::Duration};

use clap::Parser;

/// We use a worker queue to pass around work between threads.
/// We do it this way to be generic over the type.
enum WorkQueue<T> {
Work(T),
Result(T),
}

// Not necessary to make types, but it helps keep things straightforward

// We work on distinct WorkUnits (aka a single sequence)
type WorkUnit = (Vec<u8>, Vec<u8>); // Sequence ID, Sequence

// We return the original sequence, and the vector of mappings (results)
type WorkResult = (WorkUnit, Vec<Mapping>);
// We could also choose to return just the Seq ID, and the Mappings should also have the query name too
// You can return whatever would be most useful

#[derive(Parser, Debug)]
#[command(
name = "minimap2-channels-example",
about = "An example of how to use the minimap2 crate with multithreading"
)]
struct Cli {
/// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format)
pub target: PathBuf,

/// The query file to align (e.g. reads - can be FASTA or FASTQ)
pub query: PathBuf,

/// The number of threads to use
pub threads: usize,
}

fn main() {
// Parse command line arguments
let args = Cli::parse();

map(args.target, args.query, args.threads).expect("Unable to map");
}

fn map(
target_file: impl AsRef<Path>,
query_file: impl AsRef<Path>,
threads: usize,
) -> Result<(), Box<dyn Error>> {
// Aligner gets created using the build pattern.
// Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
println!("Creating index");
let aligner = Aligner::builder()
.map_ont()
.with_cigar()
.with_index_threads(threads)
.with_index(target_file, None)
.expect("Unable to build index");

println!("Index created");

// Create a queue for work and for results
let work_queue = Arc::new(ArrayQueue::<WorkQueue<WorkUnit>>::new(1024));
let results_queue = Arc::new(ArrayQueue::<WorkQueue<WorkResult>>::new(1024));

// I use a shutdown flag
let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

// Store join handles, it's just good practice to clean up threads
let mut jh = Vec::new();

let aligner = Arc::new(aligner);

// Spin up the threads
for _ in 0..threads {
// Clone everything we will need...
let work_queue = Arc::clone(&work_queue);
let results_queue = Arc::clone(&results_queue);
let shutdown = Arc::clone(&shutdown);
let aligner = Arc::clone(&aligner);

let handle =
std::thread::spawn(move || worker(work_queue, results_queue, shutdown, aligner));

jh.push(handle);
}

// Let's split this into another thread

{
let work_queue = Arc::clone(&work_queue);
let shutdown = Arc::clone(&shutdown);
let query_file = query_file.as_ref().to_path_buf();

let handle = std::thread::spawn(move || {
// Now that the threads are running, read the input file and push the work to the queue
let mut reader: Box<dyn FastxReader> =
parse_fastx_file(query_file).unwrap_or_else(|_| panic!("Can't find query FASTA file"));

// I just do this in the main thread, but you can split threads
let backoff = crossbeam::utils::Backoff::new();
while let Some(Ok(record)) = reader.next() {
let mut work = WorkQueue::Work((record.id().to_vec(), record.seq().to_vec()));
while let Err(work_packet) = work_queue.push(work) {
work = work_packet; // Simple way to maintain ownership
// If we have an error, it's 99% because the queue is full
backoff.snooze();
}
}

// Set the shutdown flag
shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
});

jh.push(handle);
}

let mut num_alignments = 0;

let backoff = crossbeam::utils::Backoff::new();
loop {
match results_queue.pop() {
// This is where we processs mapping results as they come in...
Some(WorkQueue::Result((record, alignments))) => {
num_alignments += alignments.len();

}
Some(_) => unimplemented!("Unexpected result type"),
None => {
backoff.snooze();

// If all join handles are finished, we can break
if jh.iter().all(|h| h.is_finished()) {
break;
}
if backoff.is_completed() {
backoff.reset();
std::thread::sleep(Duration::from_millis(3));
}
}
}
}

// Join all the threads
for handle in jh {
handle.join().expect("Unable to join thread");
}

println!("Total alignments: {}", num_alignments);

Ok(())
}

// Convert this to a function
fn worker(
work_queue: Arc<ArrayQueue<WorkQueue<WorkUnit>>>,
results_queue: Arc<ArrayQueue<WorkQueue<WorkResult>>>,
shutdown: Arc<std::sync::atomic::AtomicBool>,
aligner: Arc<Aligner<Built>>,
) {
loop {
// We use backoff to sleep when we don't have any work
let backoff = crossbeam::utils::Backoff::new();

match work_queue.pop() {
Some(WorkQueue::Work(sequence)) => {
let alignment = aligner
.map(&sequence.1, true, false, None, None, Some(&sequence.0))
.expect("Unable to align");

// Return the original sequence, as well as the mappings
// We have to do it this way because ownership
let mut result = WorkQueue::Result((sequence, alignment));
while let Err(result_packet) = results_queue.push(result) {
result = result_packet; // Simple way to maintain ownership
// If we have an error, it's 99% because the queue is full
backoff.snooze();
}
}
Some(_) => unimplemented!("Unexpected work type"),
None => {
backoff.snooze();

// If we have the shutdown signal, we should exit
if shutdown.load(std::sync::atomic::Ordering::Relaxed) && work_queue.is_empty() {
break;
}
}
}
}
}
80 changes: 80 additions & 0 deletions examples/rayon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use minimap2::*;
use needletail::{parse_fastx_file, FastxReader};
use rayon::prelude::*;
use clap::Parser;

use std::path::PathBuf;
use std::{error::Error, path::Path};

#[derive(Parser, Debug)]
#[command(
name = "minimap2-channels-example",
about = "An example of how to use the minimap2 crate with multithreading"
)]
struct Cli {
/// The target file to align to (e.g. a reference genome - can be in FASTA, FASTQ, or mmi format)
pub target: PathBuf,

/// The query file to align (e.g. reads - can be FASTA or FASTQ)
pub query: PathBuf,

/// The number of threads to use
pub threads: usize,
}

fn main() {
// Parse command line arguments
let args = Cli::parse();

map(args.target, args.query, args.threads).expect("Unable to map");
}

fn map(
target_file: impl AsRef<Path>,
query_file: impl AsRef<Path>,
threads: usize,
) -> Result<(), Box<dyn Error>> {
// Set the number of threads to use
rayon::ThreadPoolBuilder::new()
.num_threads(threads)
.build_global()
.expect("Unable to set number of threads");

println!("Creating index");

// Aligner gets created using the build pattern.
// Once .with_index is called, the aligner is set to "Built" and can no longer be changed.
let aligner = Aligner::builder()
.map_ont()
.with_cigar()
.with_index_threads(threads) // Minimap2 uses it's own thread pool for index building
.with_index(target_file, None)
.expect("Unable to build index");

println!("Index created");

// Read in the query file
let mut reader = parse_fastx_file(query_file)?;

let mut queries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
while let Some(record) = reader.next() {
let record = record.expect("Error reading record");
queries.push((record.id().to_vec(), record.seq().to_vec()));
}

// Map the queries
let results: Vec<Vec<Mapping>> = queries
.par_iter()
.map(|(id, seq)| {
aligner
.map(&seq, false, false, None, None, Some(&id))
.expect("Error mapping")
})
.collect();

// Count total number of alignments
let total_alignments: usize = results.iter().map(|x| x.len()).sum();
println!("Iteration complete, total alignments {}", total_alignments);

Ok(())
}
Loading
Loading