From f2e63d0504359d018fede2e3b085f7814a7e5b8e Mon Sep 17 00:00:00 2001 From: Michael Hipp Date: Fri, 29 Jul 2022 18:47:10 -0700 Subject: [PATCH 1/2] Expose chunkSize parameter in CallDuplexConsensusReads --- .../com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala | 7 ++++++- .../com/fulcrumgenomics/umi/ConsensusCallingIterator.scala | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala index 01ee0c6a3..55ad31513 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala @@ -28,6 +28,7 @@ import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.OverlappingBasesConsensusCaller import com.fulcrumgenomics.bam.api.{SamOrder, SamSource, SamWriter} import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} +import com.fulcrumgenomics.commons.collection.ParIterator.DefaultChunkSize import com.fulcrumgenomics.sopt.clp import com.fulcrumgenomics.commons.io.Io import com.fulcrumgenomics.commons.util.LazyLogging @@ -108,6 +109,10 @@ class CallDuplexConsensusReads val maxReadsPerStrand: Option[Int] = None, @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1, @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true, + @arg(doc=""" + |The number of consensus groups to include in the buffer when using multiple threads. + |For input with very large family sizes, a smaller chunk size, such as 2* may be suitable. 
+ """) val chunkSize: Int = DefaultChunkSize ) extends FgBioTool with LazyLogging { Io.assertReadable(input) @@ -142,7 +147,7 @@ class CallDuplexConsensusReads maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads) ) val progress = ProgressLogger(logger, unit=1000000) - val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads) + val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize) out ++= iterator progress.logLast() diff --git a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala index 449133c01..9b752669b 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger * @param caller the consensus caller to use to call consensus reads * @param progress an optional progress logger to which to log progress in input reads * @param threads the number of threads to use. 
- * @param chunkSize parallel process in chunkSize units; will cause 8 * chunkSize records to be held in memory + * @param chunkSize parallel process in chunkSize units; will cause chunkSize consensus groups to be held in memory */ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord], caller: UmiConsensusCaller[ConsensusRead], @@ -67,10 +67,10 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter groupingIterator.flatMap(caller.consensusReadsFromSamRecords) } else { - ParIterator(groupingIterator, threads=threads).flatMap { rs => + ParIterator(groupingIterator, threads=threads, chunkSize=chunkSize).map { rs => val caller = callers.get() caller.synchronized { caller.consensusReadsFromSamRecords(rs) } - }.toAsync(chunkSize * 8) + }.toAsync(chunkSize).flatten } } From f02a67d0e6fe8936c6aced7cf034194b5900d613 Mon Sep 17 00:00:00 2001 From: Michael Hipp Date: Tue, 2 Aug 2022 10:22:41 -0700 Subject: [PATCH 2/2] review comments from @clintval --- .../fulcrumgenomics/umi/CallDuplexConsensusReads.scala | 10 ++++++---- .../fulcrumgenomics/umi/ConsensusCallingIterator.scala | 3 ++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala index 55ad31513..a8b794ca0 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/CallDuplexConsensusReads.scala @@ -110,9 +110,11 @@ class CallDuplexConsensusReads @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1, @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true, @arg(doc=""" - |The number of consensus groups to include in the buffer when using multiple threads. 
- |For input with very large family sizes, a smaller chunk size, such as 2* may be suitable. - """) val chunkSize: Int = DefaultChunkSize + |Pull reads from this many source molecules into memory for multi-threaded processing. + |Using a smaller value will require less memory but will negatively impact processing speed. + |For very large family sizes, a smaller value may be necessary to reduce memory usage. + |This value is only used when `--threads > 1`. + """) val maxSourceMoleculesInMemory: Int = DefaultChunkSize ) extends FgBioTool with LazyLogging { Io.assertReadable(input) @@ -147,7 +149,7 @@ class CallDuplexConsensusReads maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads) ) val progress = ProgressLogger(logger, unit=1000000) - val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize) + val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, maxSourceMoleculesInMemory) out ++= iterator progress.logLast() diff --git a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala index 9b752669b..5ca2eafbe 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger * @param caller the consensus caller to use to call consensus reads * @param progress an optional progress logger to which to log progress in input reads * @param threads the number of threads to use. 
- * @param chunkSize parallel process in chunkSize units; will cause chunkSize consensus groups to be held in memory + * @param chunkSize parallel process across the input [[SamRecord]]s from this many source molecules at a time */ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord], caller: UmiConsensusCaller[ConsensusRead], @@ -71,6 +71,7 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter val caller = callers.get() caller.synchronized { caller.consensusReadsFromSamRecords(rs) } }.toAsync(chunkSize).flatten + // Flatten AFTER pulling through ParIterator to keep input chunks in phase with output } }