From a25cdc7c3af149ef926e60670ed5302c6d09b7ec Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Date: Tue, 15 Oct 2024 21:05:06 -0600 Subject: [PATCH] simple changes to allow the simulator to stream List instead of individual block items. Pending to add a config with the desired batch size. Pending to add more tests with different batch sizes. Signed-off-by: Alfredo Gutierrez --- .../simulator/BlockStreamSimulatorApp.java | 3 ++- .../grpc/PublishStreamGrpcClient.java | 5 +++-- .../grpc/PublishStreamGrpcClientImpl.java | 21 ++++++++++++------- .../grpc/PublishStreamGrpcClientImplTest.java | 3 ++- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index 96bec9f94..f0e9ce29a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -26,6 +26,7 @@ import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; @@ -130,7 +131,7 @@ private void constantRateStreaming() break; } - publishStreamGrpcClient.streamBlockItem(blockItem); + publishStreamGrpcClient.streamBlockItem(List.of(blockItem)); blockItemsStreamed++; Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems); diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java index 51cead506..4795acc7f 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java @@ -18,6 +18,7 @@ import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; +import java.util.List; /** * The PublishStreamGrpcClient interface provides the methods to stream the block and block item. @@ -26,10 +27,10 @@ public interface PublishStreamGrpcClient { /** * Streams the block item. * - * @param blockItem the block item to be streamed + * @param blockItems list of the block item to be streamed * @return true if the block item is streamed successfully, false otherwise */ - boolean streamBlockItem(BlockItem blockItem); + boolean streamBlockItem(List blockItems); /** * Streams the block. diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java index d8d85803d..41d8e27d8 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java @@ -26,6 +26,8 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; import javax.inject.Inject; /** @@ -57,11 +59,15 @@ public PublishStreamGrpcClientImpl(@NonNull GrpcConfig grpcConfig) { * stream. */ @Override - public boolean streamBlockItem(BlockItem blockItem) { + public boolean streamBlockItem(List blockItems) { + + List blockItemsProtoc = new ArrayList<>(); + for (BlockItem blockItem : blockItems) { + blockItemsProtoc.add(Translator.fromPbj(blockItem)); + } + requestStreamObserver.onNext( - PublishStreamRequest.newBuilder() - .setBlockItem(Translator.fromPbj(blockItem)) - .build()); + PublishStreamRequest.newBuilder().addAllBlockItems(blockItemsProtoc).build()); return true; } @@ -72,12 +78,13 @@ public boolean streamBlockItem(BlockItem blockItem) { */ @Override public boolean streamBlock(Block block) { + List blockItemsProtoc = new ArrayList<>(); for (BlockItem blockItem : block.items()) { - streamBlockItem(blockItem); + blockItemsProtoc.add(Translator.fromPbj(blockItem)); } - // wait for ack on the block - // if and when the ack is received return true + requestStreamObserver.onNext( + PublishStreamRequest.newBuilder().addAllBlockItems(blockItemsProtoc).build()); return true; } diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java index 526aaafce..69a2b0ea9 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java @@ -23,6 +23,7 @@ import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; import java.io.IOException; +import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,7 +46,7 @@ void streamBlockItem() { BlockItem blockItem = BlockItem.newBuilder().build(); PublishStreamGrpcClientImpl publishStreamGrpcClient = new PublishStreamGrpcClientImpl(grpcConfig); - boolean result = publishStreamGrpcClient.streamBlockItem(blockItem); + boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem)); assertTrue(result); }