Expose chunkSize parameter in CallDuplexConsensusReads #867

Open · wants to merge 2 commits into base: main
```diff
@@ -28,6 +28,7 @@ import com.fulcrumgenomics.FgBioDef._
 import com.fulcrumgenomics.bam.OverlappingBasesConsensusCaller
 import com.fulcrumgenomics.bam.api.{SamOrder, SamSource, SamWriter}
 import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
+import com.fulcrumgenomics.commons.collection.ParIterator.DefaultChunkSize
 import com.fulcrumgenomics.sopt.clp
 import com.fulcrumgenomics.commons.io.Io
 import com.fulcrumgenomics.commons.util.LazyLogging
```
```diff
@@ -108,6 +109,12 @@ class CallDuplexConsensusReads
     val maxReadsPerStrand: Option[Int] = None,
     @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1,
     @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true,
+    @arg(doc="""
+        |Pull reads from this many source molecules into memory for multi-threaded processing.
+        |Using a smaller value will require less memory but will negatively impact processing speed.
+        |For very large family sizes, a smaller value may be necessary to reduce memory usage.
+        |This value is only used when `--threads > 1`.
+        """) val maxSourceMoleculesInMemory: Int = DefaultChunkSize
 ) extends FgBioTool with LazyLogging {

   Io.assertReadable(input)
```

Reviewer (Member), on the new doc text "Pull reads from this many source molecules into memory": not strictly true, since the ParIterator will have `threads * maxSourceMoleculesInMemory`, and so will the AsyncIterator that wraps it, so it should be `2 * threads * maxSourceMoleculesInMemory`, I think.
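The reviewer's bound can be made concrete with a worked example. The numbers below are hypothetical; the formula is the `2 * threads * maxSourceMoleculesInMemory` claim from the comment above (and it matches the "8 threads, 160 molecules" figure that comes up later in the review):

```scala
// Hypothetical worked example of the reviewer's claimed memory bound:
// the ParIterator holds up to threads * maxSourceMoleculesInMemory source
// molecules, and the AsyncIterator wrapping it can hold as many again.
val threads                    = 8
val maxSourceMoleculesInMemory = 10

val parIteratorMolecules   = threads * maxSourceMoleculesInMemory // 80
val asyncIteratorMolecules = threads * maxSourceMoleculesInMemory // 80
val totalMolecules         = parIteratorMolecules + asyncIteratorMolecules // 160
```

Note that whether the `threads` factor actually applies is exactly what the author disputes below; this sketch only restates the reviewer's arithmetic.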
```diff
@@ -142,7 +149,7 @@ class CallDuplexConsensusReads
       maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads)
     )
     val progress = ProgressLogger(logger, unit=1000000)
-    val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads)
+    val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, maxSourceMoleculesInMemory)
     out ++= iterator
     progress.logLast()
```

Reviewer (Member): Can you please add the same to CallMolecularConsensusReads? Likely it is not immune from the same issue, so it'd be nice to control there too (and have symmetry of options).
In the second file:

```diff
@@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger
  * @param caller the consensus caller to use to call consensus reads
  * @param progress an optional progress logger to which to log progress in input reads
  * @param threads the number of threads to use.
- * @param chunkSize parallel process in chunkSize units; will cause 8 * chunkSize records to be held in memory
+ * @param chunkSize across the input [[SamRecord]]s from this many source molecules at a time
  */
 class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord],
                                                             caller: UmiConsensusCaller[ConsensusRead],
```

Reviewer (Member): The new param doc doesn't track for me. Based on my comment below, suggested change:

```
 * @param chunkSize the number of source molecules to process in a single chunk, where chunks
 *                  are processed in parallel. This will cause `2 * threads * chunkSize` source
 *                  molecules to be held in memory, where the number of [[SamRecord]]s held in
 *                  memory depends on the number of [[SamRecord]]s assigned to each source
 *                  molecule at any given time.
```

Reviewer (Member): If this is true, then presumably in CallDuplexConsensusReads the number given should be divided before supplying here? I think it will be very confusing to say "only hold 10 source molecules' worth of reads in memory" and then find out that with 8 threads I've got 160 molecules' worth.

Author (Contributor): @tfenne @nh13 Many of these comments, here and elsewhere in the review, go against my interpretation of how ParIterator works. The way I understand it, these iterable functions operate on a single chunk at a time (not counting `.toAsync`), using the multi-threaded task support, rather than grabbing one chunk per thread:

https://github.com/fulcrumgenomics/commons/blob/9372c5585084a1d7486c1b6036d0e06824b2c8d4/src/main/scala/com/fulcrumgenomics/commons/collection/ParIterator.scala#L140-L141

Only one chunk is pulled into memory at a time at the "front" of the ParIterator (it does not mention `threads * chunkSize`):

https://github.com/fulcrumgenomics/commons/blob/9372c5585084a1d7486c1b6036d0e06824b2c8d4/src/main/scala/com/fulcrumgenomics/commons/collection/ParIterator.scala#L119-L120

The parWith function splits a single chunk across threads:

https://github.com/fulcrumgenomics/commons/blob/9372c5585084a1d7486c1b6036d0e06824b2c8d4/src/main/scala/com/fulcrumgenomics/commons/CommonsDef.scala#L360-L364

It was based on this interpretation that I do not divide by threads before parallelizing. Can you please let me know if I have missed something or where my interpretation is wrong?
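The author's interpretation can be sketched as a toy model. This is a simplified, hypothetical stand-in for `ParIterator` (the real class lives in fulcrumgenomics/commons and is linked above); `chunkedParMap` is an invented helper that pulls one chunk of `chunkSize` elements into memory, fans that single chunk out across `threads`, and emits results in input order before pulling the next chunk:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

/** Hypothetical sketch, NOT the real ParIterator: one chunk resident at a time,
  * split across a fixed pool of `threads`, results emitted in input order. */
def chunkedParMap[A, B](in: Iterator[A], chunkSize: Int, threads: Int)(f: A => B): Iterator[B] = {
  val pool = Executors.newFixedThreadPool(threads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  in.grouped(chunkSize).flatMap { chunk =>
    // Only this one chunk is in memory; each element becomes a task on the pool,
    // so memory is bounded by chunkSize elements, not threads * chunkSize.
    val futures = chunk.map(a => Future(f(a)))
    futures.map(fut => Await.result(fut, Duration.Inf))
  } // (pool shutdown omitted for brevity)
}

// Order is preserved: chunks of 4 are each fanned out over 2 threads in turn.
val doubled = chunkedParMap((1 to 10).iterator, chunkSize = 4, threads = 2)(_ * 2).toList
```

Under this model, dividing `chunkSize` by `threads` before parallelizing would be unnecessary, which is the author's point; whether the real `ParIterator` behaves this way is the open question in the thread.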
```diff
@@ -67,10 +67,11 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter
       groupingIterator.flatMap(caller.consensusReadsFromSamRecords)
     }
     else {
-      ParIterator(groupingIterator, threads=threads).flatMap { rs =>
+      ParIterator(groupingIterator, threads=threads, chunkSize=chunkSize).map { rs =>
         val caller = callers.get()
         caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
-      }.toAsync(chunkSize * 8)
+      }.toAsync(chunkSize).flatten
+      // Flatten AFTER pulling through ParIterator to keep input chunks in phase with output
     }
   }
```
nh13 (Member), Aug 6, 2022: I am confused about why the chunk size passed to `toAsync` was reduced. Don't we want this value to be a multiple of the input `chunkSize`? Furthermore, since the ParIterator is presumably processing `threads` chunks at any one time, I think this should be `chunkSize * threads`. Suggested change:

```
-      }.toAsync(chunkSize).flatten
+      }.toAsync(chunkSize * threads).flatten
```

Reviewer (Member), on the added comment line `// Flatten AFTER pulling through ParIterator to keep input chunks in phase with output`: If I understand this correctly, we previously got chunks out of order in some cases? Suggested change:

```
-      // Flatten AFTER pulling through ParIterator to keep input chunks in phase with output
+      // Flatten AFTER pulling through ParIterator to keep the iterator in a stable order
```

Author (Contributor): The concern is not about records being out of order, but about the number of chunks that are getting pulled through the iterator at a time. Clint's diagram sums it up better, I think:

#867 (comment)

In this instance, I am referring to a "source molecule" as all records within a group output by GroupReadsByUmi.

The goal of the change in flattening is that the same number of source molecules (not SAM records) are pulled through the end as the beginning. There will be many fewer SAM records after flattening, since we go from `<group size>` records down to 2.

If we flatten before, and use `.toAsync` on `chunkSize` records (not groups), then there would be roughly half a chunk's worth of groups getting pulled through `.toAsync` for every chunk that goes in. By flattening after, we are pulling the same number of groups through the async iterator at a time as through the ParIterator.
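The phase argument can be sketched numerically. The sizes below are hypothetical, and the sketch assumes each source molecule yields a single consensus read pair (two records), as the comment above describes:

```scala
// Hypothetical illustration: why flattening AFTER .toAsync keeps the async
// buffer counted in source molecules rather than flattened consensus records.
val chunkSize         = 1024 // source molecules per ParIterator chunk
val consensusPerGroup = 2    // each source molecule yields ~2 consensus records (R1 + R2)

// Flatten BEFORE .toAsync(chunkSize): the buffer counts flattened records, so
// only chunkSize / consensusPerGroup source molecules fit per buffered chunk,
// i.e. half a chunk, out of phase with the ParIterator's 1024-molecule chunks.
val moleculesBufferedIfFlattenedFirst = chunkSize / consensusPerGroup

// Flatten AFTER: the buffer counts groups, so exactly chunkSize source
// molecules are in flight, in phase with the input chunking.
val moleculesBufferedIfFlattenedLast = chunkSize
```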
