From 012df197811ac43b4daedd3f20150bd4bd28e125 Mon Sep 17 00:00:00 2001 From: Eduwer Camacaro Date: Fri, 21 Feb 2025 09:08:36 -0500 Subject: [PATCH] fix(server): make standby on instance tracker synchronized for multiple threads --- .../server/monitoring/StandbyStoresOnInstance.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java b/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java index 5aba1dd05..3a81e711c 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/StandbyStoresOnInstance.java @@ -32,7 +32,7 @@ public final class StandbyStoresOnInstance { * @param currentOffset batch end offset * @param endOffset topic partition end offset */ - public void recordOffsets(TopicPartition topicPartition, final long currentOffset, final long endOffset) { + public synchronized void recordOffsets(TopicPartition topicPartition, final long currentOffset, final long endOffset) { StandbyTopicPartitionMetrics newMetric = new StandbyTopicPartitionMetrics(topicPartition, currentOffset, endOffset); partitions.remove(newMetric); @@ -43,7 +43,7 @@ public void recordOffsets(TopicPartition topicPartition, final long currentOffse * Calculates the total lag across all partitions in the store * @return sum of lag values for all partitions. */ - public long totalLag() { + public synchronized long totalLag() { return partitions.stream() .map(StandbyTopicPartitionMetrics::getCurrentLag) .map(lag -> Math.max(0, lag)) // ignore sentinel values (-1) @@ -56,11 +56,11 @@ public long totalLag() { * * @return The number of registered partitions for this store. */ - public int registeredPartitions() { + public synchronized int registeredPartitions() { return partitions.size(); } - public Optional lagInfoForPartition(int partition) { + public synchronized Optional lagInfoForPartition(int partition) { return partitions.stream() .filter(standbyTopicPartitionMetrics -> standbyTopicPartitionMetrics.getPartition() == partition) .findFirst(); @@ -73,7 +73,7 @@ public Optional lagInfoForPartition(int partition) * @param endOffset partition end offset * @param reason standby suspension reason */ - public void suspendPartition( + public synchronized void suspendPartition( final TopicPartition topicPartition, final long currentOffset, final long endOffset,