diff --git a/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java b/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java index 5aba1dd05..3a81e711c 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java @@ -32,7 +32,7 @@ public final class StandbyStoresOnInstance { * @param currentOffset batch end offset * @param endOffset topic partition end offset */ - public void recordOffsets(TopicPartition topicPartition, final long currentOffset, final long endOffset) { + public synchronized void recordOffsets(TopicPartition topicPartition, final long currentOffset, final long endOffset) { StandbyTopicPartitionMetrics newMetric = new StandbyTopicPartitionMetrics(topicPartition, currentOffset, endOffset); partitions.remove(newMetric); @@ -43,7 +43,7 @@ public void recordOffsets(TopicPartition topicPartition, final long currentOffse * Calculates the total lag across all partitions in the store * @return sum of lag values for all partitions. */ - public long totalLag() { + public synchronized long totalLag() { return partitions.stream() .map(StandbyTopicPartitionMetrics::getCurrentLag) .map(lag -> Math.max(0, lag)) // ignore sentinel values (-1) @@ -56,11 +56,11 @@ public long totalLag() { * * @return The number of registered partitions for this store. */ - public int registeredPartitions() { + public synchronized int registeredPartitions() { return partitions.size(); } - public Optional lagInfoForPartition(int partition) { + public synchronized Optional lagInfoForPartition(int partition) { return partitions.stream() .filter(standbyTopicPartitionMetrics -> standbyTopicPartitionMetrics.getPartition() == partition) .findFirst(); @@ -73,7 +73,7 @@ public Optional lagInfoForPartition(int partition) * @param endOffset partition end offset * @param reason standby suspension reason */ - public void suspendPartition( + public synchronized void suspendPartition( final TopicPartition topicPartition, final long currentOffset, final long endOffset,