diff --git a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala index 01ee0c6a3..55ad31513 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala @@ -28,6 +28,7 @@ import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.OverlappingBasesConsensusCaller import com.fulcrumgenomics.bam.api.{SamOrder, SamSource, SamWriter} import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} +import com.fulcrumgenomics.commons.collection.ParIterator.DefaultChunkSize import com.fulcrumgenomics.sopt.clp import com.fulcrumgenomics.commons.io.Io import com.fulcrumgenomics.commons.util.LazyLogging @@ -108,6 +109,10 @@ class CallDuplexConsensusReads val maxReadsPerStrand: Option[Int] = None, @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1, @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true, + @arg(doc=""" + |The number of consensus groups to include in the buffer when using multiple threads. + |For input with very large family sizes, a smaller chunk size, such as 2* may be suitable. + """) val chunkSize: Int = DefaultChunkSize ) extends FgBioTool with LazyLogging { Io.assertReadable(input) @@ -142,7 +147,7 @@ class CallDuplexConsensusReads maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads) ) val progress = ProgressLogger(logger, unit=1000000) - val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads) + val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize) out ++= iterator progress.logLast() diff --git a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala index 449133c01..31ec98174 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger * @param caller the consensus caller to use to call consensus reads * @param progress an optional progress logger to which to log progress in input reads * @param threads the number of threads to use. - * @param chunkSize parallel process in chunkSize units; will cause 8 * chunkSize records to be held in memory + * @param chunkSize parallel process in chunkSize units; will cause chunkSize consensus groups to be held in memory */ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord], caller: UmiConsensusCaller[ConsensusRead], @@ -67,10 +67,10 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter groupingIterator.flatMap(caller.consensusReadsFromSamRecords) } else { - ParIterator(groupingIterator, threads=threads).flatMap { rs => + ParIterator(groupingIterator, threads=threads, chunkSize=threads*2).map { rs => val caller = callers.get() caller.synchronized { caller.consensusReadsFromSamRecords(rs) } - }.toAsync(chunkSize * 8) + }.toAsync(chunkSize * 8).flatten } }