Skip to content

Commit

Permalink
simple changes to allow the simulator to stream List<BlockItems> inst…
Browse files Browse the repository at this point in the history
…ead of individual block items.

Pending to add a config with the desired batch size.
Pending to add more tests with different batch sizes.

Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Oct 18, 2024
1 parent 499e007 commit a25cdc7
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;

Expand Down Expand Up @@ -130,7 +131,7 @@ private void constantRateStreaming()
break;
}

publishStreamGrpcClient.streamBlockItem(blockItem);
publishStreamGrpcClient.streamBlockItem(List.of(blockItem));
blockItemsStreamed++;

Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import java.util.List;

/**
* The PublishStreamGrpcClient interface provides the methods to stream the block and block item.
Expand All @@ -26,10 +27,10 @@ public interface PublishStreamGrpcClient {
/**
* Streams the block item.
*
* @param blockItem the block item to be streamed
* @param blockItems list of the block item to be streamed
* @return true if the block item is streamed successfully, false otherwise
*/
boolean streamBlockItem(BlockItem blockItem);
boolean streamBlockItem(List<BlockItem> blockItems);

/**
* Streams the block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;

/**
Expand Down Expand Up @@ -57,11 +59,15 @@ public PublishStreamGrpcClientImpl(@NonNull GrpcConfig grpcConfig) {
* stream.
*/
@Override
public boolean streamBlockItem(BlockItem blockItem) {
public boolean streamBlockItem(List<BlockItem> blockItems) {

List<com.hedera.hapi.block.stream.protoc.BlockItem> blockItemsProtoc = new ArrayList<>();
for (BlockItem blockItem : blockItems) {
blockItemsProtoc.add(Translator.fromPbj(blockItem));
}

requestStreamObserver.onNext(
PublishStreamRequest.newBuilder()
.setBlockItem(Translator.fromPbj(blockItem))
.build());
PublishStreamRequest.newBuilder().addAllBlockItems(blockItemsProtoc).build());

return true;
}
Expand All @@ -72,12 +78,13 @@ public boolean streamBlockItem(BlockItem blockItem) {
*/
@Override
public boolean streamBlock(Block block) {
List<com.hedera.hapi.block.stream.protoc.BlockItem> blockItemsProtoc = new ArrayList<>();
for (BlockItem blockItem : block.items()) {
streamBlockItem(blockItem);
blockItemsProtoc.add(Translator.fromPbj(blockItem));
}

// wait for ack on the block
// if and when the ack is received return true
requestStreamObserver.onNext(
PublishStreamRequest.newBuilder().addAllBlockItems(blockItemsProtoc).build());

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import java.io.IOException;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -45,7 +46,7 @@ void streamBlockItem() {
BlockItem blockItem = BlockItem.newBuilder().build();
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig);
boolean result = publishStreamGrpcClient.streamBlockItem(blockItem);
boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem));
assertTrue(result);
}

Expand Down

0 comments on commit a25cdc7

Please sign in to comment.