Skip to content

Commit

Permalink
Expose chunkSize parameter in CallDuplexConsensusReads
Browse files: browse the repository at this point in the history
  • Loading branch information
mjhipp committed Jul 30, 2022
1 parent 46d4346 commit f2e63d0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.OverlappingBasesConsensusCaller
import com.fulcrumgenomics.bam.api.{SamOrder, SamSource, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.collection.ParIterator.DefaultChunkSize
import com.fulcrumgenomics.sopt.clp
import com.fulcrumgenomics.commons.io.Io
import com.fulcrumgenomics.commons.util.LazyLogging
Expand Down Expand Up @@ -108,6 +109,10 @@ class CallDuplexConsensusReads
val maxReadsPerStrand: Option[Int] = None,
@arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1,
@arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true,
@arg(doc="""
|The number of consensus groups to include in the buffer when using multiple threads.
|For input with very large family sizes, a smaller chunk size, such as 2*<threads> may be suitable.
""") val chunkSize: Int = DefaultChunkSize
) extends FgBioTool with LazyLogging {

Io.assertReadable(input)
Expand Down Expand Up @@ -142,7 +147,7 @@ class CallDuplexConsensusReads
maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads)
)
val progress = ProgressLogger(logger, unit=1000000)
val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads)
val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize)
out ++= iterator
progress.logLast()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger
* @param caller the consensus caller to use to call consensus reads
* @param progress an optional progress logger to which to log progress in input reads
* @param threads the number of threads to use.
* @param chunkSize parallel process in chunkSize units; will cause 8 * chunkSize records to be held in memory
* @param chunkSize the unit size for parallel processing; will cause chunkSize consensus groups to be held in memory
*/
class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord],
caller: UmiConsensusCaller[ConsensusRead],
Expand All @@ -67,10 +67,10 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter
groupingIterator.flatMap(caller.consensusReadsFromSamRecords)
}
else {
ParIterator(groupingIterator, threads=threads).flatMap { rs =>
ParIterator(groupingIterator, threads=threads, chunkSize=chunkSize).map { rs =>
val caller = callers.get()
caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
}.toAsync(chunkSize * 8)
}.toAsync(chunkSize).flatten
}
}

Expand Down

0 comments on commit f2e63d0

Please sign in to comment.