-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add its own commit loop which resolves partition offsets conflicts #27
Conversation
…er consumption of value buffers
… to avoid duplicates and version corruption
…er; Fix off-by-one error with offsets/versions
b2bcd38
to
9d38903
Compare
Co-authored-by: Christian Williams <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me overall 👍
|
||
// TODO if the new assignment is only an addition of partitions we don't need to reset state | ||
if partition_assignment.rebalance.is_some() { | ||
let partitions = partition_assignment.rebalance.as_ref().unwrap().clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i feel like this unwrap and clone can be avoided if we refactor how we check and reset partition assignments. but since this is not in the hot path, the overhead should be minor, so we can do the clean in future PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay so this took me a while. Yes I don't need clone, but I couldn't make it compile otherwise.
The errors I was getting is something like cannot borrow as the immutable because it's already borrowed a mutable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, to workaround that, we need to combine the rebalance check and partition_assignment.reset_with
into a single method.
Co-authored-by: QP Hou <[email protected]>
Co-authored-by: QP Hou <[email protected]>
c4bd2c2
to
9ac5f16
Compare
Fixes #25
Here's log from tests right after the new consumer is added to the group:
So what do we see here. Before, the WORKER-1 had all 4 partition, after introducing the WORKER-2, we've got first to have 0 and 1, second 2 and 3.
On rebalance, we clear all pending state and seek the consumers to the offsets from delta.
However you might notice that WORKER-2 is seeked twice. That's due to the distributive nature, first seek it to
3:9,2:10
, because delta log versions are3:8,2:9
(notice +1, because we seek to the next message). But then when WORKER-2 tries to do a commit it founds that the delta store is actually3:10,2:11
now. That is because of WORKER-1 committed version 6 right in between/before rebalance (note that we first load a table and then subscribe a consumer, so there's a time gap).But that's okay since WORKER-2 notices that (e.g. updated delta offsets) and seek its consumer to
3:11,2:12
.Right after that, WORKER-1 cannot write data to partitions
2,3
so no more conflicts are possible and hence both WORKER-1 and WORKER-2 are making commits to the delta without any conflicts or any other issues in parallel