diff --git a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala index 55ad31513..a8b794ca0 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala @@ -110,9 +110,11 @@ class CallDuplexConsensusReads @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1, @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true, @arg(doc=""" - |The number of consensus groups to include in the buffer when using multiple threads. - |For input with very large family sizes, a smaller chunk size, such as 2* may be suitable. - """) val chunkSize: Int = DefaultChunkSize + |Pull reads from this many source molecules into memory for multi-threaded processing. + |Using a smaller value will require less memory but will negatively impact processing speed. + |For very large family sizes, a smaller value may be necessary to reduce memory usage. + |This value is only used when `--threads > 1`. + """) val maxSourceMoleculesInMemory: Int = DefaultChunkSize ) extends FgBioTool with LazyLogging { Io.assertReadable(input) @@ -147,7 +149,7 @@ class CallDuplexConsensusReads maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads) ) val progress = ProgressLogger(logger, unit=1000000) - val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize) + val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, maxSourceMoleculesInMemory) out ++= iterator progress.logLast() diff --git a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala index 9b752669b..5ca2eafbe 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger * @param caller the consensus caller to use to call consensus reads * @param progress an optional progress logger to which to log progress in input reads * @param threads the number of threads to use. - * @param chunkSize parallel process in chunkSize units; will cause chunkSize consensus groups to be held in memory + * @param chunkSize across the input [[SamRecord]]s from this many source molecules at a time */ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord], caller: UmiConsensusCaller[ConsensusRead], @@ -71,6 +71,7 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter val caller = callers.get() caller.synchronized { caller.consensusReadsFromSamRecords(rs) } }.toAsync(chunkSize).flatten + // Flatten AFTER pulling through ParIterator to keep input chunks in phase with output } }