Skip to content

Commit

Permalink
fix: pr feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Feb 13, 2025
1 parent d2b6f91 commit 0e6bff6
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.consumer;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.hapi.block.BlockUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
Expand All @@ -21,7 +22,8 @@ private ClosedRangeHistoricStreamEventHandlerBuilder() {}
* @param endBlockNumber - the end of the requested range of blocks
* @param blockReader - the block reader to query for blocks
* @param helidonConsumerObserver - the consumer observer used to send data to the consumer
* @param blockNodeContext - the block node context with the configuration and metrics
* @param metricsService - the service responsible for handling metrics
* @param configuration - the configuration settings for the block node
* @return a new instance of a closed range historic stream event handler
*/
@NonNull
Expand All @@ -30,11 +32,17 @@ public static Runnable build(
long endBlockNumber,
@NonNull final BlockReader<BlockUnparsed> blockReader,
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final MetricsService metricsService,
@NonNull final Configuration configuration) {

final var consumerStreamResponseObserver =
new ConsumerStreamResponseObserver(helidonConsumerObserver, blockNodeContext);
new ConsumerStreamResponseObserver(helidonConsumerObserver, metricsService);
return new HistoricBlockStreamSupplier(
startBlockNumber, endBlockNumber, blockReader, consumerStreamResponseObserver, blockNodeContext);
startBlockNumber,
endBlockNumber,
blockReader,
consumerStreamResponseObserver,
metricsService,
configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.LivenessCalculator;
import com.hedera.block.server.events.ObjectEvent;
Expand All @@ -18,6 +17,7 @@
import com.hedera.pbj.runtime.OneOf;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.InstantSource;
import java.util.List;
Expand Down Expand Up @@ -48,48 +48,47 @@ class ConsumerStreamResponseObserver implements BlockNodeEventHandler<ObjectEven
private final LivenessCalculator livenessCalculator;

/**
* Constructor for the ConsumerBlockItemObserver class. It is responsible for observing the
* Constructor for the ConsumerStreamResponseObserver class. It is responsible for observing the
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver.
*
* @param producerLivenessClock the clock to use to determine the producer liveness
* @param helidonConsumerObserver the observer to use to send responses to the consumer
* @param blockNodeContext contains the context with metrics and configuration for the
* application
* @param metricsService - the service responsible for handling metrics
* @param configuration - the configuration settings for the block node
*/
public ConsumerStreamResponseObserver(
@NonNull final InstantSource producerLivenessClock,
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final MetricsService metricsService,
@NonNull final Configuration configuration) {

this.livenessCalculator = new LivenessCalculator(
producerLivenessClock,
blockNodeContext
.configuration()
Objects.requireNonNull(configuration)
.getConfigData(ConsumerConfig.class)
.timeoutThresholdMillis());

this.metricsService = blockNodeContext.metricsService();
this.metricsService = Objects.requireNonNull(metricsService);
this.helidonConsumerObserver = helidonConsumerObserver;
}

/**
* Constructor for the ConsumerBlockItemObserver class. It is responsible for observing the
* Constructor for the ConsumerStreamResponseObserver class. It is responsible for observing the
* SubscribeStreamResponse events from the Disruptor and passing them to the downstream consumer
* via the subscribeStreamResponseObserver. Use this constructor when the producer liveness is not
* required.
*
* @param helidonConsumerObserver the observer to use to send responses to the consumer
* @param blockNodeContext contains the context with metrics and configuration for the
* application
* @param metricsService - the service responsible for handling metrics
*/
public ConsumerStreamResponseObserver(
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final MetricsService metricsService) {

this.livenessCalculator = null;
this.metricsService = blockNodeContext.metricsService();
this.helidonConsumerObserver = helidonConsumerObserver;
this.metricsService = Objects.requireNonNull(metricsService);
this.helidonConsumerObserver = Objects.requireNonNull(helidonConsumerObserver);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static java.lang.System.Logger.Level.ERROR;

import com.hedera.block.common.utils.ChunkUtils;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.metrics.MetricsService;
Expand All @@ -16,6 +15,7 @@
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.BlockUnparsed;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.Objects;
Expand All @@ -42,7 +42,8 @@ class HistoricBlockStreamSupplier implements Runnable {
* @param endBlockNumber - the end of the requested range of blocks
* @param blockReader - the block reader to query for blocks
* @param consumerStreamResponseObserver - the consumer stream response observer to send the blocks
* @param blockNodeContext - the block node context with the configuration and metrics
* @param metricsService - the service responsible for handling metrics
* @param configuration - the configuration settings for the block node
*/
public HistoricBlockStreamSupplier(
long startBlockNumber,
Expand All @@ -51,13 +52,15 @@ public HistoricBlockStreamSupplier(
@NonNull
final BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>>
consumerStreamResponseObserver,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final MetricsService metricsService,
@NonNull final Configuration configuration) {
this.startBlockNumber = startBlockNumber;
this.endBlockNumber = endBlockNumber;
this.blockReader = Objects.requireNonNull(blockReader);

final ConsumerConfig consumerConfig = blockNodeContext.configuration().getConfigData(ConsumerConfig.class);
this.metricsService = blockNodeContext.metricsService();
this.metricsService = Objects.requireNonNull(metricsService);
final ConsumerConfig consumerConfig =
Objects.requireNonNull(configuration).getConfigData(ConsumerConfig.class);
this.maxBlockItemBatchSize = Objects.requireNonNull(consumerConfig).maxBlockItemBatchSize();
this.consumerStreamResponseObserver = Objects.requireNonNull(consumerStreamResponseObserver);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.consumer;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.InstantSource;
import java.util.concurrent.CompletionService;
Expand All @@ -21,11 +22,12 @@ public static BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>
@NonNull final InstantSource producerLivenessClock,
@NonNull final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler,
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> observer,
@NonNull final BlockNodeContext blockNodeContext) {
@NonNull final MetricsService metricsService,
@NonNull final Configuration configuration) {

// Set the links forward through the chain
final var consumerStreamResponseObserver =
new ConsumerStreamResponseObserver(producerLivenessClock, observer, blockNodeContext);
new ConsumerStreamResponseObserver(producerLivenessClock, observer, metricsService, configuration);

final var asyncConsumerStreamResponseObserver = new AsyncConsumerStreamResponseObserver(
completionService, subscriptionHandler, consumerStreamResponseObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ void subscribeBlockStream(
Clock.systemDefaultZone(),
streamMediator,
helidonConsumerObserver,
blockNodeContext);
blockNodeContext.metricsService(),
blockNodeContext.configuration());

streamMediator.subscribe(liveStreamEventHandler);
} else {
Expand All @@ -237,7 +238,8 @@ void subscribeBlockStream(
subscribeStreamRequest.endBlockNumber(),
blockReader,
helidonConsumerObserver,
blockNodeContext);
blockNodeContext.metricsService(),
blockNodeContext.configuration());

// Submit the runnable to the executor service
closedRangeHistoricStreamingExecutorService.submit(closedRangeHistoricStreamingRunnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ public void testProducerTimeoutWithinWindow() throws Exception {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

final BlockHeader blockHeader = BlockHeader.newBuilder().number(1).build();
final BlockItemUnparsed blockItem = BlockItemUnparsed.newBuilder()
Expand Down Expand Up @@ -106,7 +111,12 @@ public void testProducerTimeoutOutsideWindow() throws Exception {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

consumerBlockItemObserver.onEvent(objectEvent, 0, true);
verify(streamMediator, timeout(testTimeout)).unsubscribe(consumerBlockItemObserver);
Expand All @@ -120,7 +130,12 @@ public void testConsumerNotToSendBeforeBlockHeader() throws Exception {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

// Send non-header BlockItems to validate that the observer does not send them
for (int i = 1; i <= 10; i++) {
Expand Down Expand Up @@ -183,7 +198,12 @@ public void testSubscriberStreamResponseIsBlockItemWhenBlockItemIsNull() throws
when(objectEvent.get()).thenReturn(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

// This call will throw an exception but, because of the async
// service executor, the exception will not get caught until the
Expand All @@ -203,7 +223,12 @@ public void testSubscribeStreamResponseTypeNotSupported() throws Exception {
when(objectEvent.get()).thenReturn(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

// This call will throw an exception but, because of the async
// service executor, the exception will not get caught until the
Expand All @@ -230,7 +255,12 @@ public void testUncheckedIOExceptionException() throws Exception {
doThrow(UncheckedIOException.class).when(responseStreamObserver).onNext(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

// This call will throw an exception but, because of the async
// service executor, the exception will not get caught until the
Expand Down Expand Up @@ -259,7 +289,12 @@ public void testRuntimeException() throws Exception {
doThrow(RuntimeException.class).when(responseStreamObserver).onNext(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
completionService, testClock, streamMediator, responseStreamObserver, testContext);
completionService,
testClock,
streamMediator,
responseStreamObserver,
testContext.metricsService(),
testContext.configuration());

// This call will throw an exception but, because of the async
// service executor, the exception will not get caught until the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ public void setUp() throws IOException {
.maxBlockItemBatchSize();

// The startBlockNumber and endBlockNumber don't matter for these tests
this.historicBlockStreamSupplier =
new HistoricBlockStreamSupplier(0L, 10L, blockReader, helidonConsumerObserver, blockNodeContext);
this.historicBlockStreamSupplier = new HistoricBlockStreamSupplier(
0L,
10L,
blockReader,
helidonConsumerObserver,
blockNodeContext.metricsService(),
blockNodeContext.configuration());
}

@ParameterizedTest
Expand All @@ -92,7 +97,12 @@ public void testClosedRangeHistoricStreamingHappyPath() throws IOException, Pars
}

final Runnable closedRangeHistoricStreamingRunnable = ClosedRangeHistoricStreamEventHandlerBuilder.build(
1L, numberOfBlocks, blockReader, closedRangeHistoricStreamObserver, blockNodeContext);
1L,
numberOfBlocks,
blockReader,
closedRangeHistoricStreamObserver,
blockNodeContext.metricsService(),
blockNodeContext.configuration());

closedRangeHistoricStreamingRunnable.run();

Expand All @@ -108,8 +118,13 @@ public void testClosedRangeHistoricStreamingHappyPath() throws IOException, Pars
public void testClosedRangeHistoricStreamingBlockNotFound() throws Exception {
when(blockReader.read(1)).thenReturn(Optional.empty());

final HistoricBlockStreamSupplier historicBlockStreamSupplier =
new HistoricBlockStreamSupplier(1L, 1L, blockReader, helidonConsumerObserver, blockNodeContext);
final HistoricBlockStreamSupplier historicBlockStreamSupplier = new HistoricBlockStreamSupplier(
1L,
1L,
blockReader,
helidonConsumerObserver,
blockNodeContext.metricsService(),
blockNodeContext.configuration());

historicBlockStreamSupplier.run();

Expand Down
Loading

0 comments on commit 0e6bff6

Please sign in to comment.