Expose chunkSize parameter in CallDuplexConsensusReads #867

Open · mjhipp wants to merge 2 commits into main from mh_call_duplex_chunk

Conversation

@mjhipp (Contributor) commented Jul 30, 2022

I am suggesting that the chunkSize parameter of the ConsensusCallingIterator be exposed to the CLI for CallDuplexConsensusReads.

I have hit a use case where the default value of 1024 UMI groups per chunk has prevented me from successfully running the tool in multi-threaded mode for an input file with very large and variable family sizes. By reducing the chunkSize to something like 2 * threads I was able to get it to work.
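For context, the workaround amounts to constructing the iterator with a much smaller chunkSize, roughly as sketched below (a sketch only, with hypothetical values; `inIter`, `caller`, and `progress` are the surrounding values at the CallDuplexConsensusReads call site, and the five-argument constructor matches the change proposed in this PR):

    // Sketch only: pass a small chunkSize (e.g. 2 * threads) so that only a handful of
    // very large UMI groups are in flight per chunk at any one time.
    val threads   = 8
    val chunkSize = 2 * threads  // 16 UMI groups per chunk instead of the default 1024
    val iterator  = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, chunkSize)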

@codecov-commenter commented Jul 30, 2022

Codecov Report

Merging #867 (f02a67d) into main (46d4346) will decrease coverage by 0.01%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #867      +/-   ##
==========================================
- Coverage   95.67%   95.66%   -0.02%     
==========================================
  Files         125      125              
  Lines        7234     7239       +5     
  Branches      489      501      +12     
==========================================
+ Hits         6921     6925       +4     
- Misses        313      314       +1     
| Flag | Coverage Δ |
|------|------------|
| unittests | 95.66% <100.00%> (-0.02%) ⬇️ |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
|----------------|------------|
| ...fulcrumgenomics/umi/CallDuplexConsensusReads.scala | 100.00% <100.00%> (ø) |
| ...fulcrumgenomics/umi/ConsensusCallingIterator.scala | 100.00% <100.00%> (ø) |
| ...cala/com/fulcrumgenomics/alignment/Alignment.scala | 98.76% <0.00%> (-1.24%) ⬇️ |
| ...ulcrumgenomics/umi/VanillaUmiConsensusCaller.scala | 91.50% <0.00%> (-0.08%) ⬇️ |
| ...umgenomics/bam/pileup/StreamingPileupBuilder.scala | 100.00% <0.00%> (ø) |
| ...ala/com/fulcrumgenomics/bam/SamRecordClipper.scala | 97.60% <0.00%> (+0.05%) ⬆️ |
| ...ala/com/fulcrumgenomics/util/GeneAnnotations.scala | 86.79% <0.00%> (+0.25%) ⬆️ |
| ...om/fulcrumgenomics/umi/DuplexConsensusCaller.scala | 96.75% <0.00%> (+0.70%) ⬆️ |


Comment on lines 69 to 75
      else {
    -   ParIterator(groupingIterator, threads=threads).flatMap { rs =>
    +   ParIterator(groupingIterator, threads=threads, chunkSize=chunkSize).map { rs =>
          val caller = callers.get()
          caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
    -   }.toAsync(chunkSize * 8)
    +   }.toAsync(chunkSize).flatten
      }
    }
@mjhipp (Contributor, Author) commented Jul 30, 2022

  1. Before, the chunkSize parameter was not actually passed down to the ParIterator, only to the .toAsync call. The ParIterator therefore fell back to its default of 1024, which is unexpected since chunkSize is a constructor parameter of the ConsensusCallingIterator class and would be expected to be passed through.
  2. My suggestion is to move the flattening of this mapping operation to after the .toAsync call. My reasoning is that, according to the documentation, the .toAsync cache size should be a multiple of chunkSize. If we flatten before calling .toAsync, we are buffering that number of individual records rather than that number of tag families, which are the units the input ParIterator works on.
    • For example, if your chunkSize is 10 and your family size is 100, each chunk contains 1,000 raw reads.
    • If we assume each family produces 2 consensus reads, then each chunk produces 20 consensus reads.
    • If we flatten before .toAsync, the cache of chunkSize * 8 = 80 consensus records is not a multiple of one chunk of input; in this case the buffer would not even hold one chunk's worth of input reads in memory (see the sketch below).
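Spelling out the same arithmetic (a toy Scala sketch using the hypothetical numbers above, not fgbio code):

    val chunkSize       = 10   // source molecules (UMI groups) per ParIterator chunk
    val rawPerMolecule  = 100  // raw reads per source molecule
    val consensusPerMol = 2    // consensus reads produced per source molecule

    val rawReadsPerChunk       = chunkSize * rawPerMolecule  // 1000 raw reads enter each chunk
    val consensusReadsPerChunk = chunkSize * consensusPerMol // 20 consensus reads leave each chunk

    // Old code: .toAsync(chunkSize * 8) buffers flattened consensus records.
    val oldAsyncBuffer = chunkSize * 8                       // 80 records, well short of one chunk of input (1000)

    // New code: .toAsync(chunkSize) buffers per-molecule results before flattening, so the
    // async cache stays in units of source molecules, matching the ParIterator's chunking.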

Member commented:

I convinced myself that flattening after we pull through the parallel iterator is the right move (see pic).

I also agree that the cache of .toAsync should be a multiple of chunkSize and 1 is the smallest multiple.

My only curiosity is whether speed is affected by not buffering more consensus reads before sending them on their way.

@mjhipp mjhipp force-pushed the mh_call_duplex_chunk branch from aeb4a26 to 5ce8295 Compare July 30, 2022 02:14
@mjhipp mjhipp force-pushed the mh_call_duplex_chunk branch from 5ce8295 to f2e63d0 Compare July 30, 2022 02:16
@mjhipp mjhipp requested a review from clintval July 30, 2022 02:19
@clintval (Member) left a comment

I approve of the code change but am curious about accidentally causing a speed regression.

I will leave final approval on this one up to @tfenne since he recently modified this code.


@mjhipp mjhipp force-pushed the mh_call_duplex_chunk branch from 9d0c517 to 042266f Compare August 2, 2022 17:24
@mjhipp mjhipp force-pushed the mh_call_duplex_chunk branch from 042266f to f02a67d Compare August 2, 2022 17:25
@mjhipp (Contributor, Author) commented Aug 2, 2022

> curious about accidentally causing a speed regression.

If a user uses a smaller cache, it will pretty much always be slower, but the default is not changed here.

The only newly hard-coded piece that I can see causing a speed regression is:

    - .toAsync(chunkSize * 8)
    + .toAsync(chunkSize)

However, my understanding is that this should be roughly equivalent in the ideal case, since we are now flattening after the async step: it would be exactly the same number of reads if all families had 8 raw reads. In most cases the average family size will probably be higher, in my experience, so it would potentially give a small speedup while using slightly more memory.

@clintval clintval requested a review from tfenne August 2, 2022 17:57
@clintval (Member) commented Aug 2, 2022

@tfenne are you OK if we merge this?

        val caller = callers.get()
        caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
    -   }.toAsync(chunkSize * 8)
    +   }.toAsync(chunkSize).flatten
    +   // Flatten AFTER pulling through ParIterator to keep input chunks in phase with output
Member commented:

If I understand this correctly, we previously got chunks out of order in some cases?

Suggested change

    - // Flatten AFTER pulling through ParIterator to keep input chunks in phase with output
    + // Flatten AFTER pulling through ParIterator to keep the iterator in a stable order

@mjhipp (Contributor, Author) commented:

The concern is not about records coming out of order, but about the number of chunks that get pulled through the iterator at a time. Clint's diagram sums it up better than I can, I think:

#867 (comment)

In this instance, by "source molecule" I mean all of the records within a single group output by GroupReadsByUmi.

The goal of the change in flattening is that the same number of source molecules (not SAM records) is pulled through the end of the pipeline as goes in at the beginning. There will be many fewer SAM records after consensus calling, since each molecule goes from <group size> raw reads to 2 consensus reads.

If we flatten before, and call .toAsync on chunkSize records (not groups), then roughly half a chunk's worth of groups gets pulled through .toAsync for every chunk that goes in. By flattening after, we pull the same number of groups through the async iterator at a time as the ParIterator does.
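A toy illustration of where the flattening happens (plain Scala iterators with hypothetical Read/callConsensus stand-ins, not fgbio's types):

    // Each inner Seq is one source molecule's raw reads; callConsensus stands in for
    // consensusReadsFromSamRecords and emits 2 consensus reads per molecule.
    case class Read(name: String)
    def callConsensus(molecule: Seq[Read]): Seq[Read] =
      Seq(Read(molecule.head.name + "/1"), Read(molecule.head.name + "/2"))

    def molecules: Iterator[Seq[Read]] =
      Iterator.tabulate(100)(i => Seq.fill(100)(Read(s"mol$i")))

    // Flatten first: anything downstream (e.g. an async buffer) now counts individual
    // consensus records, so a buffer of chunkSize records covers only ~chunkSize/2 molecules.
    val flattenFirst: Iterator[Read] = molecules.flatMap(callConsensus)

    // Flatten last: downstream buffering counts whole source molecules, so a buffer of
    // chunkSize elements stays in phase with one input chunk of the ParIterator.
    val flattenLast: Iterator[Read] = molecules.map(callConsensus).flatten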

        val caller = callers.get()
        caller.synchronized { caller.consensusReadsFromSamRecords(rs) }
    -   }.toAsync(chunkSize * 8)
    +   }.toAsync(chunkSize).flatten
@nh13 (Member) commented Aug 6, 2022

I am confused about why the chunk size passed to toAsync was reduced. Don't we want this value to be a multiple of the input chunkSize? Furthermore, since the ParIterator is presumably processing threads chunks at any one time, I think this should be chunkSize * threads?

Suggested change

    - }.toAsync(chunkSize).flatten
    + }.toAsync(chunkSize * threads).flatten

    @@ -40,7 +40,7 @@ import com.fulcrumgenomics.util.ProgressLogger
       * @param caller the consensus caller to use to call consensus reads
       * @param progress an optional progress logger to which to log progress in input reads
       * @param threads the number of threads to use.
    -  * @param chunkSize parallel process in chunkSize units; will cause 8 * chunkSize records to be held in memory
    +  * @param chunkSize across the input [[SamRecord]]s from this many source molecules at a time
Member commented:

The new param doc doesn't track for me. Based on my comment below:

Suggested change

    -  * @param chunkSize across the input [[SamRecord]]s from this many source molecules at a time
    +  * @param chunkSize the number of source molecules to process in a single chunk, where chunks
    +                     are processed in parallel. This will cause `2 * threads * chunkSize` source
    +                     molecules to be held in memory, where the number of [[SamRecord]]s
    +                     held in memory depends on the number of [[SamRecord]]s assigned to
    +                     each source molecule at any given time.

Member commented:

If this is true, then presumably in CallDuplexConsensusReads the number given should be divided by threads before being supplied here? I think it will be very confusing to say "only hold 10 source molecules' worth of reads in memory" and then find out that with 8 threads I've got 160 molecules' worth.

@mjhipp (Contributor, Author) commented:

@tfenne @nh13 Many of these comments, here and elsewhere in the review, go against my interpretation of how ParIterator works. The way I understand it, these iterable functions operate on a single chunk at a time (not counting .toAsync), using the multi-threaded task support, rather than grabbing one chunk per thread:

https://github.com/fulcrumgenomics/commons/blob/9372c5585084a1d7486c1b6036d0e06824b2c8d4/src/main/scala/com/fulcrumgenomics/commons/collection/ParIterator.scala#L140-L141

Only one chunk is pulled into memory at a time at the "front" of the ParIterator (the documentation does not mention threads * chunkSize):

https://github.com/fulcrumgenomics/commons/blob/9372c5585084a1d7486c1b6036d0e06824b2c8d4/src/main/scala/com/fulcrumgenomics/commons/collection/ParIterator.scala#L119-L120

The parWith function splits a single chunk across threads:

https://github.com/fulcrumgenomics/commons/blob/9372c5585084a1d7486c1b6036d0e06824b2c8d4/src/main/scala/com/fulcrumgenomics/commons/CommonsDef.scala#L360-L364

It was based on this interpretation that I do not divide by threads before parallelizing. Can you please let me know if I have missed something or where my interpretation is wrong?
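For what it is worth, here is a minimal sketch of that interpretation (plain Scala futures; one chunk resident at a time, split across a fixed pool; this illustrates the reading above and is not ParIterator's actual implementation):

    import java.util.concurrent.Executors
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    // One chunk is pulled into memory at a time; that single chunk is then split across
    // `threads` workers. (The pool is never shut down here; this is only a sketch.)
    def mapOneChunkAtATime[A, B](in: Iterator[A], chunkSize: Int, threads: Int)(f: A => B): Iterator[B] = {
      implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.newFixedThreadPool(threads))
      in.grouped(chunkSize).flatMap { chunk =>
        Await.result(Future.traverse(chunk)(a => Future(f(a))), Duration.Inf)
      }
    }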

    @@ -142,7 +149,7 @@ class CallDuplexConsensusReads
         maxReadsPerStrand = maxReadsPerStrand.getOrElse(VanillaUmiConsensusCallerOptions.DefaultMaxReads)
       )
       val progress = ProgressLogger(logger, unit=1000000)
    -  val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads)
    +  val iterator = new ConsensusCallingIterator(inIter, caller, Some(progress), threads, maxSourceMoleculesInMemory)
Member commented:

Can you please add the same to CallMolecularConsensusReads? It is likely not immune from the same issue, so it would be nice to have the control there too (and symmetry of options).

    @@ -108,6 +109,12 @@ class CallDuplexConsensusReads
       val maxReadsPerStrand: Option[Int] = None,
       @arg(doc="The number of threads to use while consensus calling.") val threads: Int = 1,
       @arg(doc="Consensus call overlapping bases in mapped paired end reads") val consensusCallOverlappingBases: Boolean = true,
    +  @arg(doc="""
    +    |Pull reads from this many source molecules into memory for multi-threaded processing.
Member commented:

Not strictly true, since the ParIterator will hold threads * maxSourceMoleculesInMemory, and so will the AsyncIterator that wraps it, so it should be 2 * threads * maxSourceMoleculesInMemory, I think.
