From 0e6bff67b171c336ad8f0d221b5f34f17818c2ed Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Thu, 13 Feb 2025 12:55:20 -0700 Subject: [PATCH] fix: pr feedback Signed-off-by: Matt Peterson --- ...angeHistoricStreamEventHandlerBuilder.java | 18 +++-- .../ConsumerStreamResponseObserver.java | 27 ++++--- .../consumer/HistoricBlockStreamSupplier.java | 13 ++-- .../LiveStreamEventHandlerBuilder.java | 8 +- .../pbj/PbjBlockStreamServiceProxy.java | 6 +- .../ConsumerStreamResponseObserverTest.java | 49 ++++++++++-- .../HistoricBlockStreamSupplierTest.java | 25 ++++-- .../mediator/LiveStreamMediatorImplTest.java | 77 ++++++++++++++++--- 8 files changed, 171 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/com/hedera/block/server/consumer/ClosedRangeHistoricStreamEventHandlerBuilder.java b/server/src/main/java/com/hedera/block/server/consumer/ClosedRangeHistoricStreamEventHandlerBuilder.java index 9df282486..246f8d36c 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ClosedRangeHistoricStreamEventHandlerBuilder.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ClosedRangeHistoricStreamEventHandlerBuilder.java @@ -1,11 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 package com.hedera.block.server.consumer; -import com.hedera.block.server.config.BlockNodeContext; +import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.persistence.storage.read.BlockReader; import com.hedera.hapi.block.BlockUnparsed; import com.hedera.hapi.block.SubscribeStreamResponseUnparsed; import com.hedera.pbj.runtime.grpc.Pipeline; +import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; /** @@ -21,7 +22,8 @@ private ClosedRangeHistoricStreamEventHandlerBuilder() {} * @param endBlockNumber - the end of the requested range of blocks * @param blockReader - the block reader to query for blocks * @param helidonConsumerObserver - the consumer observer used to send data to the consumer - * @param blockNodeContext - the block node context with the configuration and metrics + * @param metricsService - the service responsible for handling metrics + * @param configuration - the configuration settings for the block node * @return a new instance of a closed range historic stream event handler */ @NonNull @@ -30,11 +32,17 @@ public static Runnable build( long endBlockNumber, @NonNull final BlockReader blockReader, @NonNull final Pipeline helidonConsumerObserver, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final MetricsService metricsService, + @NonNull final Configuration configuration) { final var consumerStreamResponseObserver = - new ConsumerStreamResponseObserver(helidonConsumerObserver, blockNodeContext); + new ConsumerStreamResponseObserver(helidonConsumerObserver, metricsService); return new HistoricBlockStreamSupplier( - startBlockNumber, endBlockNumber, blockReader, consumerStreamResponseObserver, blockNodeContext); + startBlockNumber, + endBlockNumber, + blockReader, + consumerStreamResponseObserver, + metricsService, + configuration); } } diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java index 1a0c58f26..6603ff292 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java @@ -6,7 +6,6 @@ import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; -import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.LivenessCalculator; import com.hedera.block.server.events.ObjectEvent; @@ -18,6 +17,7 @@ import com.hedera.pbj.runtime.OneOf; import com.hedera.pbj.runtime.ParseException; import com.hedera.pbj.runtime.grpc.Pipeline; +import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.InstantSource; import java.util.List; @@ -48,48 +48,47 @@ class ConsumerStreamResponseObserver implements BlockNodeEventHandler helidonConsumerObserver, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final MetricsService metricsService, + @NonNull final Configuration configuration) { this.livenessCalculator = new LivenessCalculator( producerLivenessClock, - blockNodeContext - .configuration() + Objects.requireNonNull(configuration) .getConfigData(ConsumerConfig.class) .timeoutThresholdMillis()); - this.metricsService = blockNodeContext.metricsService(); + this.metricsService = Objects.requireNonNull(metricsService); this.helidonConsumerObserver = helidonConsumerObserver; } /** - * Constructor for the ConsumerBlockItemObserver class. It is responsible for observing the + * Constructor for the ConsumerStreamResponseObserver class. It is responsible for observing the * SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer * via the subscribeStreamResponseObserver. Use this constructor when the producer liveness is not * required. * * @param helidonConsumerObserver the observer to use to send responses to the consumer - * @param blockNodeContext contains the context with metrics and configuration for the - * application + * @param metricsService - the service responsible for handling metrics */ public ConsumerStreamResponseObserver( @NonNull final Pipeline helidonConsumerObserver, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final MetricsService metricsService) { this.livenessCalculator = null; - this.metricsService = blockNodeContext.metricsService(); - this.helidonConsumerObserver = helidonConsumerObserver; + this.metricsService = Objects.requireNonNull(metricsService); + this.helidonConsumerObserver = Objects.requireNonNull(helidonConsumerObserver); } /** diff --git a/server/src/main/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplier.java b/server/src/main/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplier.java index b71b9123f..2567aa36d 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplier.java +++ b/server/src/main/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplier.java @@ -7,7 +7,6 @@ import static java.lang.System.Logger.Level.ERROR; import com.hedera.block.common.utils.ChunkUtils; -import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.metrics.MetricsService; @@ -16,6 +15,7 @@ import com.hedera.hapi.block.BlockItemUnparsed; import com.hedera.hapi.block.BlockUnparsed; import com.hedera.hapi.block.SubscribeStreamResponseUnparsed; +import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.Objects; @@ -42,7 +42,8 @@ class HistoricBlockStreamSupplier implements Runnable { * @param endBlockNumber - the end of the requested range of blocks * @param blockReader - the block reader to query for blocks * @param consumerStreamResponseObserver - the consumer stream response observer to send the blocks - * @param blockNodeContext - the block node context with the configuration and metrics + * @param metricsService - the service responsible for handling metrics + * @param configuration - the configuration settings for the block node */ public HistoricBlockStreamSupplier( long startBlockNumber, @@ -51,13 +52,15 @@ public HistoricBlockStreamSupplier( @NonNull final BlockNodeEventHandler> consumerStreamResponseObserver, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final MetricsService metricsService, + @NonNull final Configuration configuration) { this.startBlockNumber = startBlockNumber; this.endBlockNumber = endBlockNumber; this.blockReader = Objects.requireNonNull(blockReader); - final ConsumerConfig consumerConfig = blockNodeContext.configuration().getConfigData(ConsumerConfig.class); - this.metricsService = blockNodeContext.metricsService(); + this.metricsService = Objects.requireNonNull(metricsService); + final ConsumerConfig consumerConfig = + Objects.requireNonNull(configuration).getConfigData(ConsumerConfig.class); this.maxBlockItemBatchSize = Objects.requireNonNull(consumerConfig).maxBlockItemBatchSize(); this.consumerStreamResponseObserver = Objects.requireNonNull(consumerStreamResponseObserver); } diff --git a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamEventHandlerBuilder.java b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamEventHandlerBuilder.java index 2fca7f073..a2dc91d0a 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/LiveStreamEventHandlerBuilder.java +++ b/server/src/main/java/com/hedera/block/server/consumer/LiveStreamEventHandlerBuilder.java @@ -1,12 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 package com.hedera.block.server.consumer; -import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.mediator.SubscriptionHandler; +import com.hedera.block.server.metrics.MetricsService; import com.hedera.hapi.block.SubscribeStreamResponseUnparsed; import com.hedera.pbj.runtime.grpc.Pipeline; +import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.InstantSource; import java.util.concurrent.CompletionService; @@ -21,11 +22,12 @@ public static BlockNodeEventHandler @NonNull final InstantSource producerLivenessClock, @NonNull final SubscriptionHandler subscriptionHandler, @NonNull final Pipeline observer, - @NonNull final BlockNodeContext blockNodeContext) { + @NonNull final MetricsService metricsService, + @NonNull final Configuration configuration) { // Set the links forward through the chain final var consumerStreamResponseObserver = - new ConsumerStreamResponseObserver(producerLivenessClock, observer, blockNodeContext); + new ConsumerStreamResponseObserver(producerLivenessClock, observer, metricsService, configuration); final var asyncConsumerStreamResponseObserver = new AsyncConsumerStreamResponseObserver( completionService, subscriptionHandler, consumerStreamResponseObserver); diff --git a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java index eb4d13729..141a08c5d 100644 --- a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java +++ b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java @@ -227,7 +227,8 @@ void subscribeBlockStream( Clock.systemDefaultZone(), streamMediator, helidonConsumerObserver, - blockNodeContext); + blockNodeContext.metricsService(), + blockNodeContext.configuration()); streamMediator.subscribe(liveStreamEventHandler); } else { @@ -237,7 +238,8 @@ void subscribeBlockStream( subscribeStreamRequest.endBlockNumber(), blockReader, helidonConsumerObserver, - blockNodeContext); + blockNodeContext.metricsService(), + blockNodeContext.configuration()); // Submit the runnable to the executor service closedRangeHistoricStreamingExecutorService.submit(closedRangeHistoricStreamingRunnable); diff --git a/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java b/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java index e836538c0..8d5acf635 100644 --- a/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserverTest.java @@ -75,7 +75,12 @@ public void testProducerTimeoutWithinWindow() throws Exception { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); final BlockHeader blockHeader = BlockHeader.newBuilder().number(1).build(); final BlockItemUnparsed blockItem = BlockItemUnparsed.newBuilder() @@ -106,7 +111,12 @@ public void testProducerTimeoutOutsideWindow() throws Exception { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); consumerBlockItemObserver.onEvent(objectEvent, 0, true); verify(streamMediator, timeout(testTimeout)).unsubscribe(consumerBlockItemObserver); @@ -120,7 +130,12 @@ public void testConsumerNotToSendBeforeBlockHeader() throws Exception { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); // Send non-header BlockItems to validate that the observer does not send them for (int i = 1; i <= 10; i++) { @@ -183,7 +198,12 @@ public void testSubscriberStreamResponseIsBlockItemWhenBlockItemIsNull() throws when(objectEvent.get()).thenReturn(subscribeStreamResponse); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); // This call will throw an exception but, because of the async // service executor, the exception will not get caught until the @@ -203,7 +223,12 @@ public void testSubscribeStreamResponseTypeNotSupported() throws Exception { when(objectEvent.get()).thenReturn(subscribeStreamResponse); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); // This call will throw an exception but, because of the async // service executor, the exception will not get caught until the @@ -230,7 +255,12 @@ public void testUncheckedIOExceptionException() throws Exception { doThrow(UncheckedIOException.class).when(responseStreamObserver).onNext(subscribeStreamResponse); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); // This call will throw an exception but, because of the async // service executor, the exception will not get caught until the @@ -259,7 +289,12 @@ public void testRuntimeException() throws Exception { doThrow(RuntimeException.class).when(responseStreamObserver).onNext(subscribeStreamResponse); final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, responseStreamObserver, testContext); + completionService, + testClock, + streamMediator, + responseStreamObserver, + testContext.metricsService(), + testContext.configuration()); // This call will throw an exception but, because of the async // service executor, the exception will not get caught until the diff --git a/server/src/test/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplierTest.java b/server/src/test/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplierTest.java index 331e649ea..cb8ca3b54 100644 --- a/server/src/test/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplierTest.java +++ b/server/src/test/java/com/hedera/block/server/consumer/HistoricBlockStreamSupplierTest.java @@ -64,8 +64,13 @@ public void setUp() throws IOException { .maxBlockItemBatchSize(); // The startBlockNumber and endBlockNumber don't matter for these tests - this.historicBlockStreamSupplier = - new HistoricBlockStreamSupplier(0L, 10L, blockReader, helidonConsumerObserver, blockNodeContext); + this.historicBlockStreamSupplier = new HistoricBlockStreamSupplier( + 0L, + 10L, + blockReader, + helidonConsumerObserver, + blockNodeContext.metricsService(), + blockNodeContext.configuration()); } @ParameterizedTest @@ -92,7 +97,12 @@ public void testClosedRangeHistoricStreamingHappyPath() throws IOException, Pars } final Runnable closedRangeHistoricStreamingRunnable = ClosedRangeHistoricStreamEventHandlerBuilder.build( - 1L, numberOfBlocks, blockReader, closedRangeHistoricStreamObserver, blockNodeContext); + 1L, + numberOfBlocks, + blockReader, + closedRangeHistoricStreamObserver, + blockNodeContext.metricsService(), + blockNodeContext.configuration()); closedRangeHistoricStreamingRunnable.run(); @@ -108,8 +118,13 @@ public void testClosedRangeHistoricStreamingHappyPath() throws IOException, Pars public void testClosedRangeHistoricStreamingBlockNotFound() throws Exception { when(blockReader.read(1)).thenReturn(Optional.empty()); - final HistoricBlockStreamSupplier historicBlockStreamSupplier = - new HistoricBlockStreamSupplier(1L, 1L, blockReader, helidonConsumerObserver, blockNodeContext); + final HistoricBlockStreamSupplier historicBlockStreamSupplier = new HistoricBlockStreamSupplier( + 1L, + 1L, + blockReader, + helidonConsumerObserver, + blockNodeContext.metricsService(), + blockNodeContext.configuration()); historicBlockStreamSupplier.run(); diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index d550f2897..033f3859c 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -168,11 +168,26 @@ public void testMediatorPublishEventToSubscribers() throws IOException, ParseExc when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var concreteObserver1 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver1, + testContext.metricsService(), + testContext.configuration()); final var concreteObserver2 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver2, + testContext.metricsService(), + testContext.configuration()); final var concreteObserver3 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver3, + testContext.metricsService(), + testContext.configuration()); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -226,11 +241,26 @@ public void testSubAndUnsubHandling() throws IOException { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); final var concreteObserver1 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver1, + testContext.metricsService(), + testContext.configuration()); final var concreteObserver2 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver2, + testContext.metricsService(), + testContext.configuration()); final var concreteObserver3 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver3, + testContext.metricsService(), + testContext.configuration()); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -254,7 +284,12 @@ public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException { .build(); final var concreteObserver1 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver1, + testContext.metricsService(), + testContext.configuration()); streamMediator.subscribe(concreteObserver1); assertTrue(streamMediator.isSubscribed(concreteObserver1)); @@ -384,11 +419,26 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr .build(); final var concreteObserver1 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver1, + testContext.metricsService(), + testContext.configuration()); final var concreteObserver2 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver2, + testContext.metricsService(), + testContext.configuration()); final var concreteObserver3 = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver3, + testContext.metricsService(), + testContext.configuration()); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -464,7 +514,12 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException { streamMediator.subscribe(handler); final var testConsumerBlockItemObserver = LiveStreamEventHandlerBuilder.build( - completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + completionService, + testClock, + streamMediator, + helidonSubscribeStreamObserver1, + testContext.metricsService(), + testContext.configuration()); // Confirm the observer is not subscribed assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver));