Skip to content

Commit

Permalink
review comments from @clintval
Browse files Browse the repository at this point in the history
  • Loading branch information
mjhipp committed Aug 2, 2022
1 parent f2e63d0 commit 042266f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ class CallDuplexConsensusReads
@arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1,
@arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true,
@arg(doc="""
|The number of consensus groups to include in the buffer when using multiple threads.
|For input with very large family sizes, a smaller chunk size, such as 2*<threads> may be suitable.
""") val chunkSize: Int = DefaultChunkSize
|Pull reads from this many source molecules into memory for multi-threaded processing.
|Using a smaller value will require less memory but will negatively impact processing speed.
|For very large family sizes, a smaller value may be necessary to reduce memory usage.
""") val maxSourceMoleculesInMemory: Int = DefaultChunkSize
) extends FgBioTool with LazyLogging {

Io.assertReadable(input)
Expand Down Expand Up @@ -147,7 +148,7 @@ class CallDuplexConsensusReads
maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads)
)
val progress = ProgressLogger(logger, unit=1000000)
val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize)
val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, maxSourceMoleculesInMemory)
out ++= iterator
progress.logLast()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger
* @param caller the consensus caller to use to call consensus reads
* @param progress an optional progress logger to which to log progress in input reads
* @param threads the number of threads to use.
* @param chunkSize parallel process in chunkSize units; will cause chunkSize consensus groups to be held in memory
* @param chunkSize process in parallel the input [[SamRecord]]s from this many source molecules at a time
*/
class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iterator[SamRecord],
caller: UmiConsensusCaller[ConsensusRead],
Expand Down Expand Up @@ -71,6 +71,7 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter
val caller = callers.get()
caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
}.toAsync(chunkSize).flatten
// Flatten AFTER pulling through ParIterator to keep input chunks in phase with output
}
}

Expand Down

0 comments on commit 042266f

Please sign in to comment.