Skip to content

Commit

Permalink
fix(server): make standby on instance tracker synchronized for multip…
Browse files Browse the repository at this point in the history
…le threads
  • Loading branch information
eduwercamacaro committed Feb 21, 2025
1 parent a948821 commit 012df19
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class StandbyStoresOnInstance {
* @param currentOffset batch end offset
* @param endOffset topic partition end offset
*/
public void recordOffsets(TopicPartition topicPartition, final long currentOffset, final long endOffset) {
public synchronized void recordOffsets(TopicPartition topicPartition, final long currentOffset, final long endOffset) {
StandbyTopicPartitionMetrics newMetric =
new StandbyTopicPartitionMetrics(topicPartition, currentOffset, endOffset);
partitions.remove(newMetric);
Expand All @@ -43,7 +43,7 @@ public void recordOffsets(TopicPartition topicPartition, final long currentOffse
* Calculates the total lag across all partitions in the store
* @return sum of lag values for all partitions.
*/
public long totalLag() {
public synchronized long totalLag() {
return partitions.stream()
.map(StandbyTopicPartitionMetrics::getCurrentLag)
.map(lag -> Math.max(0, lag)) // ignore sentinel values (-1)
Expand All @@ -56,11 +56,11 @@ public long totalLag() {
*
* @return The number of registered partitions for this store.
*/
public int registeredPartitions() {
public synchronized int registeredPartitions() {
return partitions.size();
}

public Optional<StandbyTopicPartitionMetrics> lagInfoForPartition(int partition) {
public synchronized Optional<StandbyTopicPartitionMetrics> lagInfoForPartition(int partition) {
return partitions.stream()
.filter(standbyTopicPartitionMetrics -> standbyTopicPartitionMetrics.getPartition() == partition)
.findFirst();
Expand All @@ -73,7 +73,7 @@ public Optional<StandbyTopicPartitionMetrics> lagInfoForPartition(int partition)
* @param endOffset partition end offset
* @param reason standby suspension reason
*/
public void suspendPartition(
public synchronized void suspendPartition(
final TopicPartition topicPartition,
final long currentOffset,
final long endOffset,
Expand Down

0 comments on commit 012df19

Please sign in to comment.