Expose chunkSize parameter in CallDuplexConsensusReads #867

Open · wants to merge 2 commits into base: main
@@ -28,6 +28,7 @@ import com.fulcrumgenomics.FgBioDef._
  import com.fulcrumgenomics.bam.OverlappingBasesConsensusCaller
  import com.fulcrumgenomics.bam.api.{SamOrder, SamSource, SamWriter}
  import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
+ import com.fulcrumgenomics.commons.collection.ParIterator.DefaultChunkSize
  import com.fulcrumgenomics.sopt.clp
  import com.fulcrumgenomics.commons.io.Io
  import com.fulcrumgenomics.commons.util.LazyLogging
@@ -108,6 +109,10 @@ class CallDuplexConsensusReads
  val maxReadsPerStrand: Option[Int] = None,
  @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1,
  @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true,
+ @arg(doc="""
+   |The number of consensus groups to include in the buffer when using multiple threads.
+   |For input with very large family sizes, a smaller chunk size, such as 2 * <threads>, may be suitable.
+   """) val chunkSize: Int = DefaultChunkSize
) extends FgBioTool with LazyLogging {

  Io.assertReadable(input)
@@ -142,7 +147,7 @@ class CallDuplexConsensusReads
maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads)
)
val progress = ProgressLogger(logger, unit=1000000)
- val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads)
+ val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize)
out ++= iterator
progress.logLast()
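For readers skimming the diff, the effect of the new parameter can be sketched with a stdlib-only toy model (this approximates ParIterator's chunking with `Iterator.grouped`; it is not the fgbio-commons implementation, and all names and sizes here are illustrative):

```scala
// Toy model of chunked parallel consensus calling: the grouping iterator yields
// one Seq of raw reads per tag family ("consensus group"), and work would be
// dispatched to worker threads in chunks of `chunkSize` families. With very
// large family sizes, a smaller chunkSize means far fewer raw reads buffered
// per unit of work.
object ChunkingSketch {
  def main(args: Array[String]): Unit = {
    val familySize = 4
    // 10 hypothetical tag families of raw reads.
    val families: Iterator[Seq[String]] =
      Iterator.tabulate(10)(i => Seq.fill(familySize)(s"family$i-read"))

    val chunkSize = 3
    // Each chunk of `chunkSize` families is one unit of parallel work.
    families.grouped(chunkSize).zipWithIndex.foreach { case (chunk, i) =>
      println(s"chunk $i: ${chunk.size} families, ${chunk.map(_.size).sum} raw reads")
    }
  }
}
```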

@@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger
* @param caller the consensus caller to use to call consensus reads
* @param progress an optional progress logger to which to log progress in input reads
* @param threads the number of threads to use.
- * @param chunkSize parallel process in chunkSize units; will cause 8 * chunkSize records to be held in memory
+ * @param chunkSize parallel process in chunkSize units; will cause chunkSize consensus groups to be held in memory
*/
class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord],
caller: UmiConsensusCaller[ConsensusRead],
@@ -67,10 +67,10 @@ class ConsensusCallingIterator
groupingIterator.flatMap(caller.consensusReadsFromSamRecords)
}
else {
- ParIterator(groupingIterator, threads=threads).flatMap { rs =>
+ ParIterator(groupingIterator, threads=threads, chunkSize=chunkSize).map { rs =>
val caller = callers.get()
caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
- }.toAsync(chunkSize * 8)
+ }.toAsync(chunkSize).flatten
@nh13 (Member) commented on Aug 6, 2022:

I am confused about why the chunk size passed to toAsync was reduced. Don't we want this value to be a multiple of the input chunkSize? Furthermore, since the ParIterator is presumably processing threads chunks at any one time, I think this should be chunkSize * threads.

Suggested change:
- }.toAsync(chunkSize).flatten
+ }.toAsync(chunkSize * threads).flatten

}
}
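The cache sizes under discussion can be written out in a few lines of stdlib-only Scala. This is illustrative arithmetic only: it assumes (per the fgbio-commons docs quoted in this thread) that toAsync's argument counts elements of the iterator it is called on, and the values below are hypothetical, not fgbio defaults.

```scala
// After `.map`, each element of the iterator is the output of one consensus
// group, so `.toAsync(n)` caches up to n groups' worth of consensus reads:
// `chunkSize` caches one chunk's worth, `chunkSize * threads` caches one
// chunk's worth per worker thread.
object CacheSizeSketch {
  def main(args: Array[String]): Unit = {
    val chunkSize = 10 // consensus groups per ParIterator chunk (hypothetical)
    val threads   = 4

    val thisPr    = chunkSize            // one chunk's worth of groups
    val suggested = chunkSize * threads  // `threads` chunks' worth of groups

    println(s".toAsync(chunkSize):           up to $thisPr groups cached")
    println(s".toAsync(chunkSize * threads): up to $suggested groups cached")
  }
}
```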
@mjhipp (Contributor, Author) commented on Jul 30, 2022:
  1. Before, the chunkSize parameter was not actually passed down to the ParIterator, only to the toAsync part. The default value of 1024 was hard-coded here, which is unexpected, since chunkSize is an input member of the ConsensusCallingIterator class and would be expected to be passed on.
  2. My suggestion is to move the flattening of this mapping operation until after the .toAsync call. My reasoning is that, according to the documentation, this value should be a multiple of chunkSize. If we flatten before calling toAsync, we are buffering that number of records, rather than that number of tag families, which is what the input ParIterator works on.
    • For example, if your chunkSize is 10 and your family size is 100, each chunk would contain 1000 input reads.
    • If we assume each family produces 2 consensus reads, then each chunk would produce 20 consensus reads.
    • If we flatten before .toAsync, we buffer 8 * chunkSize = 80 consensus reads; the buffer would not even hold one chunk's worth (1000) of input reads in memory.
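The arithmetic in that example, written out as a stdlib-only sketch (the numbers are the comment's hypotheticals, not defaults; the old cache size comes from the replaced `.toAsync(chunkSize * 8)` call):

```scala
// Worked numbers for the buffering example: chunkSize = 10 families per chunk,
// 100 raw reads per family, 2 consensus reads produced per family.
object BufferArithmetic {
  def main(args: Array[String]): Unit = {
    val chunkSize          = 10
    val readsPerFamily     = 100
    val consensusPerFamily = 2

    val inputReadsPerChunk     = chunkSize * readsPerFamily     // 1000 raw reads
    val consensusReadsPerChunk = chunkSize * consensusPerFamily // 20 consensus reads

    // Flatten-first (old code): the cache counts flattened consensus reads,
    // so it holds far less than one chunk's worth of input.
    val oldCache = chunkSize * 8 // 80 consensus reads

    println(s"input reads per chunk:     $inputReadsPerChunk")
    println(s"consensus reads per chunk: $consensusReadsPerChunk")
    println(s"old flatten-first cache:   $oldCache consensus reads")
  }
}
```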

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I convinced myself that flattening after we pull through the parallel iterator is the right move (see pic).

I also agree that the cache of .toAsync should be a multiple of chunkSize, and 1 is the smallest multiple.

My only curiosity is whether speed is affected by not buffering more consensus reads before sending them on their way.

