From 5b096eccc1a1ad4bd170cccd977cbcbcb49db310 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Date: Fri, 18 Oct 2024 17:19:05 -0600 Subject: [PATCH] Refactor BlockStreamService to extract BlockAccessService Signed-off-by: Alfredo Gutierrez --- .../server/BlockNodeAppInjectionModule.java | 1 + .../com/hedera/block/server/Constants.java | 3 +- .../block/server/grpc/BlockAccessService.java | 176 ++++++++++++ .../server/{ => grpc}/BlockStreamService.java | 106 +------- server/src/main/java/module-info.java | 1 + .../hedera/block/server/BlockNodeAppTest.java | 1 + .../server/grpc/BlockAccessServiceTest.java | 250 ++++++++++++++++++ .../BlockStreamServiceIntegrationTest.java | 8 +- .../{ => grpc}/BlockStreamServiceTest.java | 206 +-------------- .../server/notifier/NotifierImplTest.java | 2 +- 10 files changed, 448 insertions(+), 306 deletions(-) create mode 100644 server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java rename server/src/main/java/com/hedera/block/server/{ => grpc}/BlockStreamService.java (62%) create mode 100644 server/src/test/java/com/hedera/block/server/grpc/BlockAccessServiceTest.java rename server/src/test/java/com/hedera/block/server/{ => grpc}/BlockStreamServiceIntegrationTest.java (99%) rename server/src/test/java/com/hedera/block/server/{ => grpc}/BlockStreamServiceTest.java (50%) diff --git a/server/src/main/java/com/hedera/block/server/BlockNodeAppInjectionModule.java b/server/src/main/java/com/hedera/block/server/BlockNodeAppInjectionModule.java index f23cab5a7..3aafbc4b9 100644 --- a/server/src/main/java/com/hedera/block/server/BlockNodeAppInjectionModule.java +++ b/server/src/main/java/com/hedera/block/server/BlockNodeAppInjectionModule.java @@ -17,6 +17,7 @@ package com.hedera.block.server; import com.hedera.block.server.config.BlockNodeContext; +import com.hedera.block.server.grpc.BlockStreamService; import com.hedera.block.server.metrics.MetricsService; import com.swirlds.config.api.Configuration; import dagger.Binds; diff --git a/server/src/main/java/com/hedera/block/server/Constants.java b/server/src/main/java/com/hedera/block/server/Constants.java index 4b7a894d2..e471d213d 100644 --- a/server/src/main/java/com/hedera/block/server/Constants.java +++ b/server/src/main/java/com/hedera/block/server/Constants.java @@ -31,9 +31,10 @@ private Constants() {} */ @NonNull public static final String LOGGING_PROPERTIES = "logging.properties"; - /** Constant mapped to the name of the service in the .proto file */ + /** Constant mapped to the name of the BlockStream service in the .proto file */ @NonNull public static final String SERVICE_NAME_BLOCK_STREAM = "BlockStreamService"; + /** Constant mapped to the name of the BlockAccess service in the .proto file */ @NonNull public static final String SERVICE_NAME_BLOCK_ACCESS = "BlockAccessService"; /** Constant mapped to the publishBlockStream service method name in the .proto file */ diff --git a/server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java b/server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java new file mode 100644 index 000000000..aae176ef2 --- /dev/null +++ b/server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.grpc; + +import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME; +import static com.hedera.block.server.Translator.fromPbj; +import static com.hedera.block.server.Translator.toPbj; +import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksNotFound; +import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksRetrieved; +import static java.lang.System.Logger.Level.DEBUG; +import static java.lang.System.Logger.Level.ERROR; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.InvalidProtocolBufferException; +import com.hedera.block.server.Constants; +import com.hedera.block.server.metrics.MetricsService; +import com.hedera.block.server.persistence.storage.read.BlockReader; +import com.hedera.block.server.service.ServiceStatus; +import com.hedera.hapi.block.SingleBlockRequest; +import com.hedera.hapi.block.SingleBlockResponseCode; +import com.hedera.hapi.block.protoc.BlockService; +import com.hedera.hapi.block.protoc.SingleBlockResponse; +import com.hedera.hapi.block.stream.Block; +import com.hedera.pbj.runtime.ParseException; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.grpc.stub.StreamObserver; +import io.helidon.webserver.grpc.GrpcService; +import java.io.IOException; +import java.util.Optional; +import javax.inject.Inject; + +/** + * The BlockAccessService class provides a gRPC service to access blocks. + * + *

This service provides a unary gRPC method to retrieve a single block by block number. + */ +public class BlockAccessService implements GrpcService { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private final ServiceStatus serviceStatus; + private final BlockReader blockReader; + private final MetricsService metricsService; + + /** + * Constructs a new BlockAccessService instance with the given dependencies. + * + * @param serviceStatus used to query the service status + * @param blockReader used to retrieve blocks + * @param metricsService used to observe metrics + */ + @Inject + public BlockAccessService( + @NonNull ServiceStatus serviceStatus, + @NonNull BlockReader blockReader, + @NonNull MetricsService metricsService) { + this.serviceStatus = serviceStatus; + this.blockReader = blockReader; + this.metricsService = metricsService; + } + + @Override + public Descriptors.FileDescriptor proto() { + return BlockService.getDescriptor(); + } + + @Override + public String serviceName() { + return Constants.SERVICE_NAME_BLOCK_ACCESS; + } + + @Override + public void update(Routing routing) { + routing.unary(SINGLE_BLOCK_METHOD_NAME, this::protocSingleBlock); + } + + void protocSingleBlock( + @NonNull final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest, + @NonNull final StreamObserver singleBlockResponseStreamObserver) { + LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method"); + + try { + final SingleBlockRequest pbjSingleBlockRequest = + toPbj(SingleBlockRequest.PROTOBUF, singleBlockRequest.toByteArray()); + singleBlock(pbjSingleBlockRequest, singleBlockResponseStreamObserver); + } catch (ParseException e) { + LOGGER.log(ERROR, "Error parsing protoc SingleBlockRequest: {0}", singleBlockRequest); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); + } + } + + private void singleBlock( + @NonNull final SingleBlockRequest singleBlockRequest, + @NonNull + final StreamObserver + singleBlockResponseStreamObserver) { + + LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method"); + + if (serviceStatus.isRunning()) { + final long blockNumber = singleBlockRequest.blockNumber(); + try { + final Optional blockOpt = blockReader.read(blockNumber); + if (blockOpt.isPresent()) { + LOGGER.log(DEBUG, "Successfully returning block number: {0}", blockNumber); + singleBlockResponseStreamObserver.onNext( + fromPbjSingleBlockSuccessResponse(blockOpt.get())); + + metricsService.get(SingleBlocksRetrieved).increment(); + } else { + LOGGER.log(DEBUG, "Block number {0} not found", blockNumber); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotFoundResponse()); + metricsService.get(SingleBlocksNotFound).increment(); + } + } catch (IOException e) { + LOGGER.log(ERROR, "Error reading block number: {0}", blockNumber); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); + } catch (ParseException e) { + LOGGER.log(ERROR, "Error parsing block number: {0}", blockNumber); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); + } + } else { + LOGGER.log(ERROR, "Unary singleBlock gRPC method is not currently running"); + singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); + } + + // Send the response + singleBlockResponseStreamObserver.onCompleted(); + } + + @NonNull + static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotAvailableResponse() { + final com.hedera.hapi.block.SingleBlockResponse response = + com.hedera.hapi.block.SingleBlockResponse.newBuilder() + .status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE) + .build(); + + return fromPbj(response); + } + + @NonNull + static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotFoundResponse() + throws InvalidProtocolBufferException { + final com.hedera.hapi.block.SingleBlockResponse response = + com.hedera.hapi.block.SingleBlockResponse.newBuilder() + .status(SingleBlockResponseCode.READ_BLOCK_NOT_FOUND) + .build(); + + return fromPbj(response); + } + + @NonNull + static com.hedera.hapi.block.protoc.SingleBlockResponse fromPbjSingleBlockSuccessResponse( + @NonNull final Block block) { + final com.hedera.hapi.block.SingleBlockResponse singleBlockResponse = + com.hedera.hapi.block.SingleBlockResponse.newBuilder() + .status(SingleBlockResponseCode.READ_BLOCK_SUCCESS) + .block(block) + .build(); + + return fromPbj(singleBlockResponse); + } +} diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/grpc/BlockStreamService.java similarity index 62% rename from server/src/main/java/com/hedera/block/server/BlockStreamService.java rename to server/src/main/java/com/hedera/block/server/grpc/BlockStreamService.java index dbd8462aa..a7295c6e0 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/grpc/BlockStreamService.java @@ -14,22 +14,17 @@ * limitations under the License. */ -package com.hedera.block.server; +package com.hedera.block.server.grpc; import static com.hedera.block.server.Constants.CLIENT_STREAMING_METHOD_NAME; import static com.hedera.block.server.Constants.SERVER_STREAMING_METHOD_NAME; import static com.hedera.block.server.Constants.SERVICE_NAME_BLOCK_STREAM; -import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME; import static com.hedera.block.server.Translator.fromPbj; -import static com.hedera.block.server.Translator.toPbj; -import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksNotFound; -import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksRetrieved; import static java.lang.System.Logger; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import com.google.protobuf.Descriptors; -import com.google.protobuf.InvalidProtocolBufferException; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.consumer.ConsumerStreamResponseObserver; import com.hedera.block.server.events.BlockNodeEventHandler; @@ -40,26 +35,19 @@ import com.hedera.block.server.persistence.storage.read.BlockReader; import com.hedera.block.server.producer.ProducerBlockItemObserver; import com.hedera.block.server.service.ServiceStatus; -import com.hedera.hapi.block.SingleBlockRequest; -import com.hedera.hapi.block.SingleBlockResponse; -import com.hedera.hapi.block.SingleBlockResponseCode; import com.hedera.hapi.block.SubscribeStreamResponse; import com.hedera.hapi.block.SubscribeStreamResponseCode; import com.hedera.hapi.block.protoc.BlockService; import com.hedera.hapi.block.stream.Block; -import com.hedera.pbj.runtime.ParseException; import edu.umd.cs.findbugs.annotations.NonNull; import io.grpc.stub.StreamObserver; import io.helidon.webserver.grpc.GrpcService; -import java.io.IOException; import java.time.Clock; -import java.util.Optional; import javax.inject.Inject; /** * The BlockStreamService class defines the gRPC service for the block stream service. It provides - * the implementation for the bidirectional streaming, server streaming, and unary methods defined - * in the proto file. + * the implementation for the bidirectional streaming, server streaming as defined in the proto file. */ public class BlockStreamService implements GrpcService { @@ -140,7 +128,6 @@ public String serviceName() { public void update(@NonNull final Routing routing) { routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::protocPublishBlockStream); routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::protocSubscribeBlockStream); - routing.unary(SINGLE_BLOCK_METHOD_NAME, this::protocSingleBlock); } StreamObserver protocPublishBlockStream( @@ -198,62 +185,6 @@ void protocSubscribeBlockStream( } } - void protocSingleBlock( - @NonNull final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest, - @NonNull - final StreamObserver - singleBlockResponseStreamObserver) { - LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method"); - - try { - final SingleBlockRequest pbjSingleBlockRequest = - toPbj(SingleBlockRequest.PROTOBUF, singleBlockRequest.toByteArray()); - singleBlock(pbjSingleBlockRequest, singleBlockResponseStreamObserver); - } catch (ParseException e) { - LOGGER.log(ERROR, "Error parsing protoc SingleBlockRequest: {0}", singleBlockRequest); - singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); - } - } - - private void singleBlock( - @NonNull final SingleBlockRequest singleBlockRequest, - @NonNull - final StreamObserver - singleBlockResponseStreamObserver) { - - LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method"); - - if (serviceStatus.isRunning()) { - final long blockNumber = singleBlockRequest.blockNumber(); - try { - final Optional blockOpt = blockReader.read(blockNumber); - if (blockOpt.isPresent()) { - LOGGER.log(DEBUG, "Successfully returning block number: {0}", blockNumber); - singleBlockResponseStreamObserver.onNext( - fromPbjSingleBlockSuccessResponse(blockOpt.get())); - - metricsService.get(SingleBlocksRetrieved).increment(); - } else { - LOGGER.log(DEBUG, "Block number {0} not found", blockNumber); - singleBlockResponseStreamObserver.onNext(buildSingleBlockNotFoundResponse()); - metricsService.get(SingleBlocksNotFound).increment(); - } - } catch (IOException e) { - LOGGER.log(ERROR, "Error reading block number: {0}", blockNumber); - singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); - } catch (ParseException e) { - LOGGER.log(ERROR, "Error parsing block number: {0}", blockNumber); - singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); - } - } else { - LOGGER.log(ERROR, "Unary singleBlock gRPC method is not currently running"); - singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse()); - } - - // Send the response - singleBlockResponseStreamObserver.onCompleted(); - } - // TODO: Fix this error type once it's been standardized in `hedera-protobufs` // this should not be success @NonNull @@ -266,37 +197,4 @@ private void singleBlock( return fromPbj(response); } - - @NonNull - static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotAvailableResponse() { - final SingleBlockResponse response = - SingleBlockResponse.newBuilder() - .status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE) - .build(); - - return fromPbj(response); - } - - @NonNull - static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotFoundResponse() - throws InvalidProtocolBufferException { - final SingleBlockResponse response = - SingleBlockResponse.newBuilder() - .status(SingleBlockResponseCode.READ_BLOCK_NOT_FOUND) - .build(); - - return fromPbj(response); - } - - @NonNull - static com.hedera.hapi.block.protoc.SingleBlockResponse fromPbjSingleBlockSuccessResponse( - @NonNull final Block block) { - final SingleBlockResponse singleBlockResponse = - SingleBlockResponse.newBuilder() - .status(SingleBlockResponseCode.READ_BLOCK_SUCCESS) - .block(block) - .build(); - - return fromPbj(singleBlockResponse); - } } diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index ccd73557f..579ce946b 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -17,6 +17,7 @@ exports com.hedera.block.server.persistence; exports com.hedera.block.server.notifier; exports com.hedera.block.server.service; + exports com.hedera.block.server.grpc; requires com.hedera.block.stream; requires com.google.protobuf; diff --git a/server/src/test/java/com/hedera/block/server/BlockNodeAppTest.java b/server/src/test/java/com/hedera/block/server/BlockNodeAppTest.java index f12a1bc27..41e0a3a85 100644 --- a/server/src/test/java/com/hedera/block/server/BlockNodeAppTest.java +++ b/server/src/test/java/com/hedera/block/server/BlockNodeAppTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.hedera.block.server.grpc.BlockStreamService; import com.hedera.block.server.health.HealthService; import com.hedera.block.server.service.ServiceStatus; import io.helidon.webserver.WebServer; diff --git a/server/src/test/java/com/hedera/block/server/grpc/BlockAccessServiceTest.java b/server/src/test/java/com/hedera/block/server/grpc/BlockAccessServiceTest.java new file mode 100644 index 000000000..2ff8ace12 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/grpc/BlockAccessServiceTest.java @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server.grpc; + +import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME; +import static com.hedera.block.server.grpc.BlockAccessService.buildSingleBlockNotAvailableResponse; +import static com.hedera.block.server.grpc.BlockAccessService.buildSingleBlockNotFoundResponse; +import static com.hedera.block.server.grpc.BlockAccessService.fromPbjSingleBlockSuccessResponse; +import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; +import static java.lang.System.Logger.Level.INFO; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.hedera.block.server.config.BlockNodeContext; +import com.hedera.block.server.mediator.LiveStreamMediator; +import com.hedera.block.server.notifier.Notifier; +import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.read.BlockAsDirReaderBuilder; +import com.hedera.block.server.persistence.storage.read.BlockReader; +import com.hedera.block.server.persistence.storage.write.BlockAsDirWriterBuilder; +import com.hedera.block.server.persistence.storage.write.BlockWriter; +import com.hedera.block.server.service.ServiceStatus; +import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.block.server.util.TestUtils; +import com.hedera.hapi.block.protoc.SingleBlockResponse; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; +import io.helidon.webserver.grpc.GrpcService; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class BlockAccessServiceTest { + + @Mock private Notifier notifier; + + @Mock private StreamObserver responseObserver; + + @Mock private LiveStreamMediator streamMediator; + + @Mock private BlockReader blockReader; + + @Mock private BlockWriter> blockWriter; + + @Mock private ServiceStatus serviceStatus; + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private static final String TEMP_DIR = "block-node-unit-test-dir"; + + private static final int testTimeout = 1000; + + private Path testPath; + private BlockNodeContext blockNodeContext; + private PersistenceStorageConfig config; + + @BeforeEach + public void setUp() throws IOException { + testPath = Files.createTempDirectory(TEMP_DIR); + LOGGER.log(INFO, "Created temp directory: " + testPath.toString()); + + blockNodeContext = + TestConfigUtil.getTestBlockNodeContext( + Map.of("persistence.storage.rootPath", testPath.toString())); + config = blockNodeContext.configuration().getConfigData(PersistenceStorageConfig.class); + } + + @AfterEach + public void tearDown() { + TestUtils.deleteDirectory(testPath.toFile()); + } + + @Test + void testSingleBlockHappyPath() throws IOException, ParseException { + + final BlockReader blockReader = BlockAsDirReaderBuilder.newBuilder(config).build(); + + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + + // Enable the serviceStatus + when(serviceStatus.isRunning()).thenReturn(true); + + // Generate and persist a block + final BlockWriter> blockWriter = + BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build(); + final List blockItems = generateBlockItems(1); + blockWriter.write(blockItems); + + // Get the block so we can verify the response payload + final Optional blockOpt = blockReader.read(1); + if (blockOpt.isEmpty()) { + fail("Block 1 should be present"); + return; + } + + // Build a response to verify what's passed to the response observer + final com.hedera.hapi.block.protoc.SingleBlockResponse expectedSingleBlockResponse = + fromPbjSingleBlockSuccessResponse(blockOpt.get()); + + // Build a request to invoke the service + final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = + com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() + .setBlockNumber(1) + .build(); + + // Call the service + blockAccessService.protocSingleBlock(singleBlockRequest, responseObserver); + verify(responseObserver, times(1)).onNext(expectedSingleBlockResponse); + } + + @Test + void testSingleBlockNotFoundPath() throws IOException, ParseException { + + // Get the block so we can verify the response payload + when(blockReader.read(1)).thenReturn(Optional.empty()); + + // Build a response to verify what's passed to the response observer + final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotFound = + buildSingleBlockNotFoundResponse(); + + // Build a request to invoke the service + final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = + com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() + .setBlockNumber(1) + .build(); + + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + + // Enable the serviceStatus + when(serviceStatus.isRunning()).thenReturn(true); + + blockAccessService.protocSingleBlock(singleBlockRequest, responseObserver); + verify(responseObserver, times(1)).onNext(expectedNotFound); + } + + @Test + void testSingleBlockServiceNotAvailable() { + + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + + // Set the service status to not running + when(serviceStatus.isRunning()).thenReturn(false); + + final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotAvailable = + buildSingleBlockNotAvailableResponse(); + + // Build a request to invoke the service + final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = + com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() + .setBlockNumber(1) + .build(); + blockAccessService.protocSingleBlock(singleBlockRequest, responseObserver); + verify(responseObserver, times(1)).onNext(expectedNotAvailable); + } + + @Test + public void testSingleBlockIOExceptionPath() throws IOException, ParseException { + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + + when(serviceStatus.isRunning()).thenReturn(true); + when(blockReader.read(1)).thenThrow(new IOException("Test exception")); + + final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotAvailable = + buildSingleBlockNotAvailableResponse(); + + // Build a request to invoke the service + final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = + com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() + .setBlockNumber(1) + .build(); + blockAccessService.protocSingleBlock(singleBlockRequest, responseObserver); + verify(responseObserver, times(1)).onNext(expectedNotAvailable); + } + + @Test + public void testSingleBlockParseExceptionPath() throws IOException, ParseException { + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + + when(serviceStatus.isRunning()).thenReturn(true); + when(blockReader.read(1)).thenThrow(new ParseException("Test exception")); + + final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotAvailable = + buildSingleBlockNotAvailableResponse(); + + // Build a request to invoke the service + final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = + com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() + .setBlockNumber(1) + .build(); + blockAccessService.protocSingleBlock(singleBlockRequest, responseObserver); + verify(responseObserver, times(1)).onNext(expectedNotAvailable); + } + + @Test + public void testUpdateInvokesRoutingWithLambdas() { + + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + + GrpcService.Routing routing = mock(GrpcService.Routing.class); + blockAccessService.update(routing); + + verify(routing, timeout(testTimeout).times(1)) + .unary(eq(SINGLE_BLOCK_METHOD_NAME), any(ServerCalls.UnaryMethod.class)); + } +} diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java similarity index 99% rename from server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java rename to server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java index 3665c4ea6..0ebd46c9c 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceIntegrationTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.hedera.block.server; +package com.hedera.block.server.grpc; import static com.hedera.block.server.Translator.fromPbj; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems; @@ -578,6 +578,10 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep notifier, blockNodeContext); + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); + // Subscribe the consumers blockStreamService.protocSubscribeBlockStream( subscribeStreamRequest, subscribeStreamObserver1); @@ -616,7 +620,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep .build(); // Simulate a consumer attempting to connect to the Block Node after the exception. - blockStreamService.protocSingleBlock(singleBlockRequest, singleBlockResponseStreamObserver); + blockAccessService.protocSingleBlock(singleBlockRequest, singleBlockResponseStreamObserver); // Build a request to invoke the subscribeBlockStream service final SubscribeStreamRequest subscribeStreamRequest = diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java similarity index 50% rename from server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java rename to server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java index 458b3d0d6..f3becede7 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java +++ b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java @@ -14,21 +14,15 @@ * limitations under the License. */ -package com.hedera.block.server; +package com.hedera.block.server.grpc; -import static com.hedera.block.server.BlockStreamService.buildSingleBlockNotAvailableResponse; -import static com.hedera.block.server.BlockStreamService.buildSingleBlockNotFoundResponse; -import static com.hedera.block.server.BlockStreamService.fromPbjSingleBlockSuccessResponse; import static com.hedera.block.server.Constants.CLIENT_STREAMING_METHOD_NAME; import static com.hedera.block.server.Constants.SERVER_STREAMING_METHOD_NAME; -import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME; import static com.hedera.block.server.Translator.fromPbj; -import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; import static com.hedera.block.server.util.PersistTestUtils.reverseByteArray; import static java.lang.System.Logger; import static java.lang.System.Logger.Level.INFO; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -39,14 +33,13 @@ import static org.mockito.Mockito.when; import com.google.protobuf.Descriptors; +import com.hedera.block.server.Constants; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.mediator.LiveStreamMediator; import com.hedera.block.server.notifier.Notifier; import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; -import com.hedera.block.server.persistence.storage.read.BlockAsDirReaderBuilder; import com.hedera.block.server.persistence.storage.read.BlockReader; -import com.hedera.block.server.persistence.storage.write.BlockAsDirWriterBuilder; import com.hedera.block.server.persistence.storage.write.BlockWriter; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.util.TestConfigUtil; @@ -55,7 +48,6 @@ import com.hedera.hapi.block.SingleBlockResponseCode; import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; -import com.hedera.pbj.runtime.ParseException; import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; import io.helidon.webserver.grpc.GrpcService; @@ -64,7 +56,6 @@ import java.nio.file.Path; import java.util.List; import java.util.Map; -import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -131,7 +122,8 @@ public void testServiceName() { blockNodeContext); // Verify the service name - assertEquals(Constants.SERVICE_NAME_BLOCK_STREAM, blockStreamService.serviceName()); + Assertions.assertEquals( + Constants.SERVICE_NAME_BLOCK_STREAM, blockStreamService.serviceName()); // Verify other methods not invoked verify(streamMediator, timeout(testTimeout).times(0)).publish(any()); @@ -186,177 +178,6 @@ public void testProto() { .publish(Mockito.>any()); } - @Test - void testSingleBlockHappyPath() throws IOException, ParseException { - - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockReader blockReader = BlockAsDirReaderBuilder.newBuilder(config).build(); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - blockReader, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); - - // Enable the serviceStatus - when(serviceStatus.isRunning()).thenReturn(true); - - // Generate and persist a block - final BlockWriter> blockWriter = - BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build(); - final List blockItems = generateBlockItems(1); - blockWriter.write(blockItems); - - // Get the block so we can verify the response payload - final Optional blockOpt = blockReader.read(1); - if (blockOpt.isEmpty()) { - fail("Block 1 should be present"); - return; - } - - // Build a response to verify what's passed to the response observer - final com.hedera.hapi.block.protoc.SingleBlockResponse expectedSingleBlockResponse = - fromPbjSingleBlockSuccessResponse(blockOpt.get()); - - // Build a request to invoke the service - final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = - com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() - .setBlockNumber(1) - .build(); - - // Call the service - blockStreamService.protocSingleBlock(singleBlockRequest, responseObserver); - verify(responseObserver, times(1)).onNext(expectedSingleBlockResponse); - } - - @Test - void testSingleBlockNotFoundPath() throws IOException, ParseException { - - // Get the block so we can verify the response payload - when(blockReader.read(1)).thenReturn(Optional.empty()); - - // Build a response to verify what's passed to the response observer - final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotFound = - buildSingleBlockNotFoundResponse(); - - // Build a request to invoke the service - final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = - com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() - .setBlockNumber(1) - .build(); - - // Call the service - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - blockReader, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); - - // Enable the serviceStatus - when(serviceStatus.isRunning()).thenReturn(true); - - blockStreamService.protocSingleBlock(singleBlockRequest, responseObserver); - verify(responseObserver, times(1)).onNext(expectedNotFound); - } - - @Test - void testSingleBlockServiceNotAvailable() { - - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - blockReader, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); - - // Set the service status to not running - when(serviceStatus.isRunning()).thenReturn(false); - - final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotAvailable = - buildSingleBlockNotAvailableResponse(); - - // Build a request to invoke the service - final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = - com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() - .setBlockNumber(1) - .build(); - blockStreamService.protocSingleBlock(singleBlockRequest, responseObserver); - verify(responseObserver, times(1)).onNext(expectedNotAvailable); - } - - @Test - public void testSingleBlockIOExceptionPath() throws IOException, ParseException { - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - blockReader, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); - - when(serviceStatus.isRunning()).thenReturn(true); - when(blockReader.read(1)).thenThrow(new IOException("Test exception")); - - final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotAvailable = - buildSingleBlockNotAvailableResponse(); - - // Build a request to invoke the service - final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = - com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() - .setBlockNumber(1) - .build(); - blockStreamService.protocSingleBlock(singleBlockRequest, responseObserver); - verify(responseObserver, times(1)).onNext(expectedNotAvailable); - } - - @Test - public void testSingleBlockParseExceptionPath() throws IOException, ParseException { - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - blockReader, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); - - when(serviceStatus.isRunning()).thenReturn(true); - when(blockReader.read(1)).thenThrow(new ParseException("Test exception")); - - final com.hedera.hapi.block.protoc.SingleBlockResponse expectedNotAvailable = - buildSingleBlockNotAvailableResponse(); - - // Build a request to invoke the service - final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = - com.hedera.hapi.block.protoc.SingleBlockRequest.newBuilder() - .setBlockNumber(1) - .build(); - blockStreamService.protocSingleBlock(singleBlockRequest, responseObserver); - verify(responseObserver, times(1)).onNext(expectedNotAvailable); - } - @Test public void testUpdateInvokesRoutingWithLambdas() { @@ -371,8 +192,6 @@ public void testUpdateInvokesRoutingWithLambdas() { .serverStream( eq(SERVER_STREAMING_METHOD_NAME), any(ServerCalls.ServerStreamingMethod.class)); - verify(routing, timeout(testTimeout).times(1)) - .unary(eq(SINGLE_BLOCK_METHOD_NAME), any(ServerCalls.UnaryMethod.class)); } private BlockStreamService getBlockStreamService() { @@ -393,18 +212,9 @@ private BlockStreamService getBlockStreamService() { @Test public void testProtocParseExceptionHandling() { // TODO: We might be able to remove this test once we can remove the Translator class - - final var blockNodeEventHandler = - new StreamPersistenceHandlerImpl( - streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - final BlockStreamService blockStreamService = - new BlockStreamService( - streamMediator, - blockReader, - serviceStatus, - blockNodeEventHandler, - notifier, - blockNodeContext); + final BlockAccessService blockAccessService = + new BlockAccessService( + serviceStatus, blockReader, blockNodeContext.metricsService()); // Build a request to invoke the service final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest = @@ -422,7 +232,7 @@ public void testProtocParseExceptionHandling() { .status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE) .build(); // Call the service - blockStreamService.protocSingleBlock(singleBlockRequest, responseObserver); + blockAccessService.protocSingleBlock(singleBlockRequest, responseObserver); verify(responseObserver, times(1)).onNext(fromPbj(expectedSingleBlockErrorResponse)); } } diff --git a/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java b/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java index d56df4c91..a4c926180 100644 --- a/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java +++ b/server/src/test/java/com/hedera/block/server/notifier/NotifierImplTest.java @@ -16,8 +16,8 @@ package com.hedera.block.server.notifier; -import static com.hedera.block.server.BlockStreamServiceIntegrationTest.buildAck; import static com.hedera.block.server.Translator.fromPbj; +import static com.hedera.block.server.grpc.BlockStreamServiceIntegrationTest.buildAck; import static com.hedera.block.server.notifier.NotifierImpl.buildErrorStreamResponse; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems; import static org.junit.jupiter.api.Assertions.assertFalse;