Skip to content

Commit

Permalink
Fixing Unit Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Oct 18, 2024
1 parent 48e4ba0 commit bee9d6d
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.hedera.block.server.Constants.CLIENT_STREAMING_METHOD_NAME;
import static com.hedera.block.server.Constants.SERVER_STREAMING_METHOD_NAME;
import static com.hedera.block.server.Constants.SERVICE_NAME;
import static com.hedera.block.server.Constants.SERVICE_NAME_BLOCK_STREAM;
import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME;
import static com.hedera.block.server.Translator.fromPbj;
import static com.hedera.block.server.Translator.toPbj;
Expand Down Expand Up @@ -126,7 +126,7 @@ public Descriptors.FileDescriptor proto() {
@NonNull
@Override
public String serviceName() {
return SERVICE_NAME;
return SERVICE_NAME_BLOCK_STREAM;
}

/**
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ private Constants() {}
@NonNull public static final String LOGGING_PROPERTIES = "logging.properties";

/** Constant mapped to the name of the service in the .proto file */
@NonNull public static final String SERVICE_NAME = "BlockStreamService";
@NonNull public static final String SERVICE_NAME_BLOCK_STREAM = "BlockStreamService";

@NonNull public static final String SERVICE_NAME_BLOCK_ACCESS = "BlockAccessService";

/** Constant mapped to the publishBlockStream service method name in the .proto file */
@NonNull public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,28 @@ public Optional<List<BlockItem>> write(@NonNull final List<BlockItem> blockItems
resetState(blockItems.getFirst());
}

final Path blockItemFilePath = calculateBlockItemPath();
for (int retries = 0; ; retries++) {
try {
for (BlockItem blockItem : blockItems) {
for (BlockItem blockItem : blockItems) {
final Path blockItemFilePath = calculateBlockItemPath();
for (int retries = 0; ; retries++) {
try {
write(blockItemFilePath, blockItem);
}
break;
} catch (IOException e) {

LOGGER.log(ERROR, "Error writing the BlockItem protobuf to a file: ", e);

// Remove the block if repairing the permissions fails
if (retries > 0) {
// Attempt to remove the block
blockRemover.remove(Long.parseLong(currentBlockDir.toString()));
throw e;
} else {
// Attempt to repair the permissions on the block path
// and the blockItem path
repairPermissions(blockNodeRootPath);
repairPermissions(calculateBlockPath());
LOGGER.log(INFO, "Retrying to write the BlockItem protobuf to a file");
break;
} catch (IOException e) {

LOGGER.log(ERROR, "Error writing the BlockItem protobuf to a file: ", e);

// Remove the block if repairing the permissions fails
if (retries > 0) {
// Attempt to remove the block
blockRemover.remove(Long.parseLong(currentBlockDir.toString()));
throw e;
} else {
// Attempt to repair the permissions on the block path
// and the blockItem path
repairPermissions(blockNodeRootPath);
repairPermissions(calculateBlockPath());
LOGGER.log(INFO, "Retrying to write the BlockItem protobuf to a file");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.hedera.hapi.block.SubscribeStreamRequest;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.SubscribeStreamResponseSet;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.io.buffer.Bytes;
Expand Down Expand Up @@ -133,7 +134,7 @@ public class BlockStreamServiceIntegrationTest {
@Mock private WebServer webServer;

@Mock private BlockReader<Block> blockReader;
@Mock private BlockWriter<BlockItem> blockWriter;
@Mock private BlockWriter<List<BlockItem>> blockWriter;

private static final String TEMP_DIR = "block-node-unit-test-dir";

Expand Down Expand Up @@ -196,16 +197,16 @@ public void testPublishBlockStreamRegistrationAndExecution()
List<BlockItem> blockItems = generateBlockItems(1);
for (int i = 0; i < blockItems.size(); i++) {
if (i == 9) {
when(blockWriter.write(blockItems.get(i)))
.thenReturn(Optional.of(blockItems.get(i)));
when(blockWriter.write(List.of(blockItems.get(i))))
.thenReturn(Optional.of(List.of(blockItems.get(i))));
} else {
when(blockWriter.write(blockItems.get(i))).thenReturn(Optional.empty());
when(blockWriter.write(List.of(blockItems.get(i)))).thenReturn(Optional.empty());
}
}

for (BlockItem blockItem : blockItems) {
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().blockItem(blockItem).build();
PublishStreamRequest.newBuilder().blockItems(blockItem).build();

// Calling onNext() as Helidon does with each block item for
// the first producer.
Expand Down Expand Up @@ -235,7 +236,7 @@ public void testPublishBlockStreamRegistrationAndExecution()
.onNext(fromPbj(buildSubscribeStreamResponse(blockItems.get(9))));

// Only 1 response is expected per block sent
final Acknowledgement itemAck = buildAck(blockItems.get(9));
final Acknowledgement itemAck = buildAck(List.of(blockItems.get(9)));
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().acknowledgement(itemAck).build();

Expand Down Expand Up @@ -295,18 +296,24 @@ public void testSubscribeBlockStream() throws IOException {
// Build the BlockItem
final List<BlockItem> blockItems = generateBlockItems(1);
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().blockItem(blockItems.getFirst()).build();
PublishStreamRequest.newBuilder()
.blockItems(List.of(blockItems.getFirst()))
.build();

// Calling onNext() with a BlockItem
streamObserver.onNext(fromPbj(publishStreamRequest));

// Verify the counter was incremented
assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get());

verify(blockWriter, timeout(testTimeout).times(1)).write(blockItems.getFirst());
verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItems.getFirst()));

final SubscribeStreamResponseSet subscribeStreamResponseSet =
SubscribeStreamResponseSet.newBuilder()
.blockItems(List.of(blockItems.getFirst()))
.build();
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().blockItem(blockItems.getFirst()).build();
SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build();

verify(subscribeStreamObserver1, timeout(testTimeout).times(1))
.onNext(fromPbj(subscribeStreamResponse));
Expand All @@ -320,7 +327,7 @@ public void testSubscribeBlockStream() throws IOException {
public void testFullHappyPath() throws IOException {
int numberOfBlocks = 100;

final BlockWriter<BlockItem> blockWriter =
final BlockWriter<List<BlockItem>> blockWriter =
BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
final BlockStreamService blockStreamService = buildBlockStreamService(blockWriter);

Expand All @@ -338,7 +345,7 @@ public void testFullHappyPath() throws IOException {
final List<BlockItem> blockItems = generateBlockItems(numberOfBlocks);
for (BlockItem blockItem : blockItems) {
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().blockItem(blockItem).build();
PublishStreamRequest.newBuilder().blockItems(blockItem).build();
streamObserver.onNext(fromPbj(publishStreamRequest));
}

Expand Down Expand Up @@ -377,7 +384,7 @@ public void testFullWithSubscribersAddedDynamically() {

for (int i = 0; i < blockItems.size(); i++) {
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().blockItem(blockItems.get(i)).build();
PublishStreamRequest.newBuilder().blockItems(blockItems.get(i)).build();

// Add a new subscriber
if (i == 51) {
Expand Down Expand Up @@ -467,7 +474,7 @@ public void testSubAndUnsubWhileStreaming() throws InterruptedException {
streamObserver.onNext(
fromPbj(
PublishStreamRequest.newBuilder()
.blockItem(blockItems.get(i))
.blockItems(blockItems.get(i))
.build()));

// Remove 1st subscriber
Expand Down Expand Up @@ -553,9 +560,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep
final List<BlockItem> blockItems = generateBlockItems(1);

// Use a spy to make sure the write() method throws an IOException
final BlockWriter<BlockItem> blockWriter =
final BlockWriter<List<BlockItem>> blockWriter =
spy(BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build());
doThrow(IOException.class).when(blockWriter).write(blockItems.getFirst());
doThrow(IOException.class).when(blockWriter).write(blockItems);

final var streamMediator = buildStreamMediator(consumers, serviceStatus);
final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus);
Expand Down Expand Up @@ -585,7 +592,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep

// Transmit a BlockItem
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().blockItem(blockItems.getFirst()).build();
PublishStreamRequest.newBuilder().blockItems(blockItems).build();
streamObserver.onNext(fromPbj(publishStreamRequest));

// Use verify to make sure the serviceStatus.stopRunning() method is called
Expand Down Expand Up @@ -620,8 +627,11 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOExcep

// The BlockItem expected to pass through since it was published
// before the IOException was thrown.
final SubscribeStreamResponseSet subscribeStreamResponseSet =
SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build();

final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().blockItem(blockItems.getFirst()).build();
SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build();
verify(subscribeStreamObserver1, timeout(testTimeout).times(1))
.onNext(fromPbj(subscribeStreamResponse));
verify(subscribeStreamObserver2, timeout(testTimeout).times(1))
Expand Down Expand Up @@ -709,7 +719,9 @@ private static void verifySubscribeStreamResponse(
}

private static SubscribeStreamResponse buildSubscribeStreamResponse(BlockItem blockItem) {
return SubscribeStreamResponse.newBuilder().blockItem(blockItem).build();
final SubscribeStreamResponseSet subscribeStreamResponseSet =
SubscribeStreamResponseSet.newBuilder().blockItems(blockItem).build();
return SubscribeStreamResponse.newBuilder().blockItems(subscribeStreamResponseSet).build();
}

private static PublishStreamResponse buildEndOfStreamResponse() {
Expand All @@ -720,7 +732,8 @@ private static PublishStreamResponse buildEndOfStreamResponse() {
return PublishStreamResponse.newBuilder().status(endOfStream).build();
}

private BlockStreamService buildBlockStreamService(final BlockWriter<BlockItem> blockWriter) {
private BlockStreamService buildBlockStreamService(
final BlockWriter<List<BlockItem>> blockWriter) {

final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final var streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), serviceStatus);
Expand Down Expand Up @@ -752,11 +765,11 @@ private LiveStreamMediator buildStreamMediator(
.build();
}

public static Acknowledgement buildAck(@NonNull final BlockItem blockItem)
public static Acknowledgement buildAck(@NonNull final List<BlockItem> blockItems)
throws NoSuchAlgorithmException {
ItemAcknowledgement itemAck =
ItemAcknowledgement.newBuilder()
.itemHash(Bytes.wrap(getFakeHash(blockItem)))
.itemsHash(Bytes.wrap(getFakeHash(blockItems)))
.build();

return Acknowledgement.newBuilder().itemAck(itemAck).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
Expand All @@ -83,7 +85,7 @@ public class BlockStreamServiceTest {

@Mock private BlockReader<Block> blockReader;

@Mock private BlockWriter<BlockItem> blockWriter;
@Mock private BlockWriter<List<BlockItem>> blockWriter;

@Mock private ServiceStatus serviceStatus;

Expand Down Expand Up @@ -129,10 +131,10 @@ public void testServiceName() {
blockNodeContext);

// Verify the service name
assertEquals(Constants.SERVICE_NAME, blockStreamService.serviceName());
assertEquals(Constants.SERVICE_NAME_BLOCK_STREAM, blockStreamService.serviceName());

// Verify other methods not invoked
verify(streamMediator, timeout(testTimeout).times(0)).publish(any(BlockItem.class));
verify(streamMediator, timeout(testTimeout).times(0)).publish(any());
}

@Test
Expand All @@ -151,11 +153,37 @@ public void testProto() {
blockNodeContext);
Descriptors.FileDescriptor fileDescriptor = blockStreamService.proto();

// Verify the current rpc methods
assertEquals(5, fileDescriptor.getServices().getFirst().getMethods().size());
// Verify the current rpc methods on
Descriptors.ServiceDescriptor blockStreamServiceDescriptor =
fileDescriptor.getServices().stream()
.filter(
service ->
service.getName()
.equals(Constants.SERVICE_NAME_BLOCK_STREAM))
.findFirst()
.orElse(null);

Assertions.assertNotNull(
blockStreamServiceDescriptor,
"Service descriptor not found for: " + Constants.SERVICE_NAME_BLOCK_STREAM);
assertEquals(2, blockStreamServiceDescriptor.getMethods().size());

Descriptors.ServiceDescriptor blockAccessServiceDescriptor =
fileDescriptor.getServices().stream()
.filter(
service ->
service.getName()
.equals(Constants.SERVICE_NAME_BLOCK_ACCESS))
.findFirst()
.orElse(null);
Assertions.assertNotNull(
blockAccessServiceDescriptor,
"Service descriptor not found for: " + Constants.SERVICE_NAME_BLOCK_ACCESS);
assertEquals(1, blockAccessServiceDescriptor.getMethods().size());

// Verify other methods not invoked
verify(streamMediator, timeout(testTimeout).times(0)).publish(any(BlockItem.class));
verify(streamMediator, timeout(testTimeout).times(0))
.publish(Mockito.<List<BlockItem>>any());
}

@Test
Expand All @@ -178,12 +206,10 @@ void testSingleBlockHappyPath() throws IOException, ParseException {
when(serviceStatus.isRunning()).thenReturn(true);

// Generate and persist a block
final BlockWriter<BlockItem> blockWriter =
final BlockWriter<List<BlockItem>> blockWriter =
BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
final List<BlockItem> blockItems = generateBlockItems(1);
for (BlockItem blockItem : blockItems) {
blockWriter.write(blockItem);
}
blockWriter.write(blockItems);

// Get the block so we can verify the response payload
final Optional<Block> blockOpt = blockReader.read(1);
Expand Down
Loading

0 comments on commit bee9d6d

Please sign in to comment.