diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 2ce2a1e954..3db998c369 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -27,11 +27,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import org.astraea.common.Utils; @@ -39,6 +41,7 @@ import org.astraea.common.consumer.ConsumerConfigs; import org.astraea.common.consumer.Deserializer; import org.astraea.common.consumer.Record; +import org.astraea.common.cost.NoSufficientMetricsException; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.ClusterBean; @@ -67,6 +70,9 @@ static Builder builder() { */ Map> sensors(); + /** Wait for the checker to be true or timeout. */ + void wait(Predicate checker, Duration timeout); + @Override void close(); @@ -193,6 +199,8 @@ class MetricStoreImpl implements MetricStore { private final Set identities = new ConcurrentSkipListSet<>(); private volatile Map> lastSensors = Map.of(); + private final Map> waitingList = + new ConcurrentHashMap<>(); private MetricStoreImpl( Supplier>> sensorsSupplier, @@ -243,8 +251,11 @@ private MetricStoreImpl( } }); }); - // generate new cluster bean - if (!allBeans.isEmpty()) updateClusterBean(); + if (!allBeans.isEmpty()) { + // generate new cluster bean + updateClusterBean(); + checkWaitingList(this.waitingList, clusterBean()); + } } catch (Exception e) { // TODO: it needs better error handling e.printStackTrace(); @@ -278,6 +289,25 @@ public void close() { receiver.close(); } + /** User thread will "wait" until being awakened by the metric store or being timeout. */ + @Override + public void wait(Predicate checker, Duration timeout) { + var latch = new CountDownLatch(1); + try { + waitingList.put(latch, checker); + // Check the newly added checker immediately + checkWaitingList(Map.of(latch, checker), clusterBean()); + // Wait until being awake or timeout + if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Timeout waiting for the checker"); + } + } catch (InterruptedException ie) { + throw new IllegalStateException("Interrupted while waiting for the checker"); + } finally { + waitingList.remove(latch); + } + } + private void updateClusterBean() { lastClusterBean = ClusterBean.of( @@ -287,5 +317,20 @@ private void updateClusterBean() { Collectors.toUnmodifiableMap( Map.Entry::getKey, e -> List.copyOf(e.getValue())))); } + + /** + * Check the checkers in the waiting list. If the checker returns true, count down the latch. + */ + private static void checkWaitingList( + Map> waitingList, ClusterBean clusterBean) { + waitingList.forEach( + (latch, checker) -> { + try { + if (checker.test(clusterBean)) latch.countDown(); + } catch (NoSufficientMetricsException e) { + // Check failed. Try again next time. + } + }); + } } } diff --git a/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java b/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java index e8fc23555c..c70651a759 100644 --- a/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java +++ b/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java @@ -88,4 +88,32 @@ void testBeanExpiration() { Assertions.assertEquals(0, store.clusterBean().all().size()); } } + + @Test + void testWait() { + var queue = new LinkedBlockingQueue>>(); + + try (var store = + MetricStore.builder() + .receiver(timeout -> Utils.packException(queue::take)) + .sensorsSupplier( + // Metric sensor provide fake hasBeanObject + () -> + Map.of( + (client, bean) -> + List.of(() -> new BeanObject(Utils.randomString(), Map.of(), Map.of())), + (id, exception) -> {})) + .build()) { + Assertions.assertThrows( + IllegalStateException.class, () -> store.wait((ignore) -> false, Duration.ofSeconds(1))); + Assertions.assertDoesNotThrow(() -> store.wait((ignore) -> true, Duration.ofSeconds(1))); + + Assertions.assertThrows( + IllegalStateException.class, + () -> store.wait((clusterBean) -> !clusterBean.all().isEmpty(), Duration.ofSeconds(1))); + queue.add(Map.of(1000, List.of(new BeanObject(Utils.randomString(), Map.of(), Map.of())))); + Assertions.assertDoesNotThrow( + () -> store.wait((clusterBean) -> !clusterBean.all().isEmpty(), Duration.ofSeconds(1))); + } + } }