Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Stream of repeated block items instead of block items #269

Merged
merged 15 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
ata-nas marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.common.utils;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Utility class for chunking collections. */
public final class ChunkUtils {
/** Chunks a collection into a list of lists of the specified size.
* @param collection the collection to chunk, if the collection is empty, an empty list is returned.
* @param chunkSize the size of each chunk
* @param <T> the type of the collection
* @return a list of lists of the specified size
* */
AlfredoG87 marked this conversation as resolved.
Show resolved Hide resolved
public static <T> List<List<T>> chunkify(
@NonNull final Collection<T> collection, final int chunkSize) {
AlfredoG87 marked this conversation as resolved.
Show resolved Hide resolved
Objects.requireNonNull(collection);
if (chunkSize <= 0) {
throw new IllegalArgumentException("Chunk size must be greater than 0");
}
if (collection.isEmpty()) {
return Collections.emptyList(); // or throw, depends on how we want to handle
ata-nas marked this conversation as resolved.
Show resolved Hide resolved
}
final List<T> localCollection = List.copyOf(collection);
final int localCollectionSize = localCollection.size();
return IntStream.iterate(0, i -> i < localCollectionSize, i -> i + chunkSize)
.mapToObj(
i ->
localCollection.subList(
i, Math.min(i + chunkSize, localCollectionSize)))
.collect(Collectors.toList());
AlfredoG87 marked this conversation as resolved.
Show resolved Hide resolved
}

private ChunkUtils() {}
}
3 changes: 3 additions & 0 deletions common/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* Module info for common module.
*/
module com.hedera.block.common {
exports com.hedera.block.common.constants;
exports com.hedera.block.common.utils;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.common.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;

public class ChunkUtilsTest {

@Test
public void testEmptyCollection() {
List<Integer> emptyList = Collections.emptyList();
List<List<Integer>> chunks = ChunkUtils.chunkify(emptyList, 3);
assertTrue(chunks.isEmpty(), "Chunks of empty collection should be empty");
}

@Test
public void testChunkSizeZero() {
List<Integer> list = Arrays.asList(1, 2, 3);
Exception exception =
assertThrows(IllegalArgumentException.class, () -> ChunkUtils.chunkify(list, 0));
assertEquals("Chunk size must be greater than 0", exception.getMessage());
}

@Test
public void testChunkSizeNegative() {
List<Integer> list = Arrays.asList(1, 2, 3);
Exception exception =
assertThrows(IllegalArgumentException.class, () -> ChunkUtils.chunkify(list, -1));
assertEquals("Chunk size must be greater than 0", exception.getMessage());
}

@Test
public void testChunkifyCollectionSmallerThanChunkSize() {
List<Integer> list = Arrays.asList(1, 2);
List<List<Integer>> chunks = ChunkUtils.chunkify(list, 5);
assertEquals(1, chunks.size(), "Should return one chunk");
assertEquals(list, chunks.get(0), "Chunk should contain the entire collection");
}

@Test
public void testChunkifyCollectionExactlyDivisible() {
List<Integer> list = Arrays.asList(1, 2, 3, 4);
List<List<Integer>> chunks = ChunkUtils.chunkify(list, 2);
assertEquals(2, chunks.size(), "Should return two chunks");
assertEquals(Arrays.asList(1, 2), chunks.get(0), "First chunk mismatch");
assertEquals(Arrays.asList(3, 4), chunks.get(1), "Second chunk mismatch");
}

@Test
public void testChunkifyCollectionNotExactlyDivisible() {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<List<Integer>> chunks = ChunkUtils.chunkify(list, 2);
assertEquals(3, chunks.size(), "Should return three chunks");
assertEquals(Arrays.asList(1, 2), chunks.get(0), "First chunk mismatch");
assertEquals(Arrays.asList(3, 4), chunks.get(1), "Second chunk mismatch");
assertEquals(Arrays.asList(5), chunks.get(2), "Third chunk mismatch");
}

@Test
public void testNullCollection() {
assertThrows(NullPointerException.class, () -> ChunkUtils.chunkify(null, 3));
}
}
18 changes: 12 additions & 6 deletions server/src/main/java/com/hedera/block/server/BlockNodeApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import static java.lang.System.Logger;
import static java.lang.System.Logger.Level.INFO;

import com.hedera.block.server.grpc.BlockAccessService;
import com.hedera.block.server.grpc.BlockStreamService;
import com.hedera.block.server.health.HealthService;
import com.hedera.block.server.service.ServiceStatus;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.WebServerConfig;
import io.helidon.webserver.grpc.GrpcRouting;
import io.helidon.webserver.grpc.GrpcService;
import io.helidon.webserver.http.HttpRouting;
import java.io.IOException;
import javax.inject.Inject;
Expand All @@ -42,27 +43,31 @@ public class BlockNodeApp {

private final ServiceStatus serviceStatus;
private final HealthService healthService;
private final GrpcService blockStreamService;
private final BlockStreamService blockStreamService;
private final BlockAccessService blockAccessService;
private final WebServerConfig.Builder webServerBuilder;

/**
* Constructs a new BlockNodeApp with the specified dependencies.
*
* @param serviceStatus has the status of the service
* @param healthService handles the health API requests
* @param blockStreamService handles the GRPC API requests
* @param blockStreamService handles the block stream requests
* @param webServerBuilder used to build the web server and start it
* @param blockAccessService grpc service for block access
*/
@Inject
public BlockNodeApp(
@NonNull ServiceStatus serviceStatus,
@NonNull HealthService healthService,
@NonNull GrpcService blockStreamService,
@NonNull WebServerConfig.Builder webServerBuilder) {
@NonNull BlockStreamService blockStreamService,
@NonNull WebServerConfig.Builder webServerBuilder,
@NonNull BlockAccessService blockAccessService) {
this.serviceStatus = serviceStatus;
this.healthService = healthService;
this.blockStreamService = blockStreamService;
this.webServerBuilder = webServerBuilder;
this.blockAccessService = blockAccessService;
}

/**
Expand All @@ -72,7 +77,8 @@ public BlockNodeApp(
*/
public void start() throws IOException {

final GrpcRouting.Builder grpcRouting = GrpcRouting.builder().service(blockStreamService);
final GrpcRouting.Builder grpcRouting =
GrpcRouting.builder().service(blockStreamService).service(blockAccessService);

final HttpRouting.Builder httpRouting =
HttpRouting.builder().register(healthService.getHealthRootPath(), healthService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.metrics.MetricsService;
import com.swirlds.config.api.Configuration;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import io.helidon.webserver.WebServerConfig;
import io.helidon.webserver.grpc.GrpcService;
import javax.inject.Singleton;

/**
Expand All @@ -47,16 +45,6 @@ static BlockNodeContext provideBlockNodeContext(
return new BlockNodeContext(metricsService, config);
}

/**
* Provides a block stream service singleton using DI.
*
* @param blockStreamService should come from DI
* @return a block stream service singleton
*/
@Singleton
@Binds
GrpcService bindBlockStreamService(BlockStreamService blockStreamService);

/**
* Provides a web server config builder singleton using DI.
*
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ private Constants() {}
*/
@NonNull public static final String LOGGING_PROPERTIES = "logging.properties";

/** Constant mapped to the name of the service in the .proto file */
@NonNull public static final String SERVICE_NAME = "BlockStreamService";
/** Constant mapped to the name of the BlockStream service in the .proto file */
@NonNull public static final String SERVICE_NAME_BLOCK_STREAM = "BlockStreamService";

/** Constant mapped to the name of the BlockAccess service in the .proto file */
@NonNull public static final String SERVICE_NAME_BLOCK_ACCESS = "BlockAccessService";

/** Constant mapped to the publishBlockStream service method name in the .proto file */
@NonNull public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server.consumer;

import static com.hedera.block.server.Translator.fromPbj;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItemsConsumed;
import static java.lang.System.Logger;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
Expand All @@ -27,6 +26,7 @@
import com.hedera.block.server.events.LivenessCalculator;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.BlockNodeMetricTypes;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
Expand All @@ -35,6 +35,8 @@
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.time.InstantSource;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -55,7 +57,7 @@ public class ConsumerStreamResponseObserver

private final AtomicBoolean isResponsePermitted = new AtomicBoolean(true);
private final ResponseSender statusResponseSender = new StatusResponseSender();
private final ResponseSender blockItemResponseSender = new BlockItemResponseSender();
private final ResponseSender blockItemsResponseSender = new BlockItemsResponseSender();

private static final String PROTOCOL_VIOLATION_MESSAGE =
"Protocol Violation. %s is OneOf type %s but %s is null.\n%s";
Expand Down Expand Up @@ -186,7 +188,7 @@ private ResponseSender getResponseSender(
subscribeStreamResponse.response();
return switch (oneOfTypeOneOf.kind()) {
AlfredoG87 marked this conversation as resolved.
Show resolved Hide resolved
case STATUS -> statusResponseSender;
case BLOCK_ITEM -> blockItemResponseSender;
case BLOCK_ITEMS -> blockItemsResponseSender;
default -> throw new IllegalArgumentException(
"Unknown response type: " + oneOfTypeOneOf.kind());
};
Expand All @@ -196,35 +198,39 @@ private interface ResponseSender {
void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse);
}

private final class BlockItemResponseSender implements ResponseSender {
private final class BlockItemsResponseSender implements ResponseSender {
private boolean streamStarted = false;

public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse) {

// Only start sending BlockItems after we've reached
// the beginning of a block.
final BlockItem blockItem = subscribeStreamResponse.blockItem();
if (blockItem == null) {
if (subscribeStreamResponse.blockItems() == null) {
final String message =
PROTOCOL_VIOLATION_MESSAGE.formatted(
"SubscribeStreamResponse",
"BLOCK_ITEM",
"block_item",
"BLOCK_ITEMS",
"block_items",
subscribeStreamResponse);
LOGGER.log(ERROR, message);
throw new IllegalArgumentException(message);
} else {
if (!streamStarted && blockItem.hasBlockHeader()) {
streamStarted = true;
}
}

if (streamStarted) {
LOGGER.log(DEBUG, "Sending BlockItem downstream: " + blockItem);
final List<BlockItem> blockItems =
Objects.requireNonNull(subscribeStreamResponse.blockItems()).blockItems();
// Only start sending BlockItems after we've reached
// the beginning of a block.
if (!streamStarted && blockItems.getFirst().hasBlockHeader()) {
LOGGER.log(
AlfredoG87 marked this conversation as resolved.
Show resolved Hide resolved
DEBUG,
"Sending BlockItem Batch downstream for block: "
+ blockItems.getFirst().blockHeader().number());
streamStarted = true;
}

// Increment counter
metricsService.get(LiveBlockItemsConsumed).increment();
subscribeStreamResponseObserver.onNext(fromPbj(subscribeStreamResponse));
}
if (streamStarted) {
metricsService
.get(BlockNodeMetricTypes.Counter.LiveBlockItemsReceived)
.add(blockItems.size());
subscribeStreamResponseObserver.onNext(fromPbj(subscribeStreamResponse));
mattp-swirldslabs marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Loading
Loading