Skip to content

Commit

Permalink
Changes to Helidon and Services
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Oct 18, 2024
1 parent a25cdc7 commit 48e4ba0
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server.consumer;

import static com.hedera.block.server.Translator.fromPbj;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItemsConsumed;
import static java.lang.System.Logger;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
Expand All @@ -27,14 +26,17 @@
import com.hedera.block.server.events.LivenessCalculator;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.BlockNodeMetricTypes;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.OneOf;
import com.swirlds.metrics.api.Counter;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.time.InstantSource;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -55,7 +57,7 @@ public class ConsumerStreamResponseObserver

private final AtomicBoolean isResponsePermitted = new AtomicBoolean(true);
private final ResponseSender statusResponseSender = new StatusResponseSender();
private final ResponseSender blockItemResponseSender = new BlockItemResponseSender();
private final ResponseSender blockItemsResponseSender = new BlockItemsResponseSender();

private static final String PROTOCOL_VIOLATION_MESSAGE =
"Protocol Violation. %s is OneOf type %s but %s is null.\n%s";
Expand Down Expand Up @@ -186,7 +188,7 @@ private ResponseSender getResponseSender(
subscribeStreamResponse.response();
return switch (oneOfTypeOneOf.kind()) {
case STATUS -> statusResponseSender;
case BLOCK_ITEM -> blockItemResponseSender;
case BLOCK_ITEMS -> blockItemsResponseSender;
default -> throw new IllegalArgumentException(
"Unknown response type: " + oneOfTypeOneOf.kind());
};
Expand All @@ -196,35 +198,39 @@ private interface ResponseSender {
void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse);
}

private final class BlockItemResponseSender implements ResponseSender {
private final class BlockItemsResponseSender implements ResponseSender {
private boolean streamStarted = false;

public void send(@NonNull final SubscribeStreamResponse subscribeStreamResponse) {

// Only start sending BlockItems after we've reached
// the beginning of a block.
final BlockItem blockItem = subscribeStreamResponse.blockItem();
if (blockItem == null) {
if (subscribeStreamResponse.blockItems() == null) {
final String message =
PROTOCOL_VIOLATION_MESSAGE.formatted(
"SubscribeStreamResponse",
"BLOCK_ITEM",
"block_item",
"BLOCK_ITEMS",
"block_items",
subscribeStreamResponse);
LOGGER.log(ERROR, message);
throw new IllegalArgumentException(message);
} else {
if (!streamStarted && blockItem.hasBlockHeader()) {
streamStarted = true;
}
}

final List<BlockItem> blockItems = subscribeStreamResponse.blockItems().blockItems();
// Only start sending BlockItems after we've reached
// the beginning of a block.
if (!streamStarted && blockItems.getFirst().hasBlockHeader()) {
streamStarted = true;
}

if (streamStarted) {
LOGGER.log(DEBUG, "Sending BlockItem downstream: " + blockItem);
if (streamStarted) {
LOGGER.log(DEBUG, "Sending BlockItem Batch downstream");

// Increment counter
metricsService.get(LiveBlockItemsConsumed).increment();
subscribeStreamResponseObserver.onNext(fromPbj(subscribeStreamResponse));
final Counter liveBlockItemsConsumed =
metricsService.get(BlockNodeMetricTypes.Counter.LiveBlockItemsConsumed);
// Increment counter manually
for (int i = 0; i < blockItems.size(); i++) {
liveBlockItemsConsumed.increment();
}
subscribeStreamResponseObserver.onNext(fromPbj(subscribeStreamResponse));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.hedera.block.server.notifier.Notifiable;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import java.util.List;

/**
* Use this interface to combine the contract for mediating the live stream of blocks from the
* Hedera network with the contract to be notified of critical system events.
*/
public interface LiveStreamMediator
extends StreamMediator<BlockItem, SubscribeStreamResponse>, Notifiable {}
extends StreamMediator<List<BlockItem>, SubscribeStreamResponse>, Notifiable {}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.SubscribeStreamResponseSet;
import com.hedera.hapi.block.stream.BlockItem;
import com.lmax.disruptor.BatchEventProcessor;
import com.swirlds.metrics.api.Counter;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -87,24 +90,30 @@ class LiveStreamMediatorImpl extends SubscriptionHandlerBase<SubscribeStreamResp

/**
* Publishes the given block item to all subscribers. If an exception occurs while persisting
* the block item, the service status is set to not running, and all downstream consumers are
* the block items, the service status is set to not running, and all downstream consumers are
* unsubscribed.
*
* @param blockItem the block item from the upstream producer to publish to downstream consumers
* @param blockItems the block item from the upstream producer to publish to downstream
* consumers
*/
@Override
public void publish(@NonNull final BlockItem blockItem) {
public void publish(@NonNull final List<BlockItem> blockItems) {

if (serviceStatus.isRunning()) {

// Publish the block for all subscribers to receive
LOGGER.log(DEBUG, "Publishing BlockItem: " + blockItem);
LOGGER.log(DEBUG, "Publishing BlockItem");
final SubscribeStreamResponseSet blockItemsSet =
SubscribeStreamResponseSet.newBuilder().blockItems(blockItems).build();
final var subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().blockItem(blockItem).build();
SubscribeStreamResponse.newBuilder().blockItems(blockItemsSet).build();
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));

// Increment the block item counter
metricsService.get(LiveBlockItems).increment();
Counter liveBlockItems = metricsService.get(LiveBlockItems);
for (int i = 0; i < blockItems.size(); i++) {
liveBlockItems.increment();
}

} else {
LOGGER.log(ERROR, "StreamMediator is not accepting BlockItems");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.hapi.block.stream.BlockItem;
import java.util.List;

/**
* Use this interface to combine the contract for streaming block items with the contract to be
* notified of critical system events.
*/
public interface Notifier extends StreamMediator<BlockItem, PublishStreamResponse>, Notifiable {}
public interface Notifier
extends StreamMediator<List<BlockItem>, PublishStreamResponse>, Notifiable {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -104,18 +105,18 @@ public void notifyUnrecoverableError() {
/**
* Publishes the given block item to all subscribed producers.
*
* @param blockItem the block item from the persistence layer to publish a response to upstream
* producers
* @param blockItems the block items from the persistence layer to publish a response to
* upstream producers
*/
@Override
public void publish(@NonNull BlockItem blockItem) {
public void publish(@NonNull List<BlockItem> blockItems) {

try {
if (serviceStatus.isRunning()) {
// Publish the block item to the subscribers
final var publishStreamResponse =
PublishStreamResponse.newBuilder()
.acknowledgement(buildAck(blockItem))
.acknowledgement(buildAck(blockItems))
.build();
ringBuffer.publishEvent((event, sequence) -> event.set(publishStreamResponse));

Expand Down Expand Up @@ -150,16 +151,17 @@ static PublishStreamResponse buildErrorStreamResponse() {
/**
* Protected method meant for testing. Builds an Acknowledgement for the block item.
*
* @param blockItem the block item to build the Acknowledgement for
* @param blockItems the block items to build the Acknowledgement for
* @return the Acknowledgement for the block item
* @throws NoSuchAlgorithmException if the hash algorithm is not supported
*/
@NonNull
Acknowledgement buildAck(@NonNull final BlockItem blockItem) throws NoSuchAlgorithmException {
Acknowledgement buildAck(@NonNull final List<BlockItem> blockItems)
throws NoSuchAlgorithmException {
final ItemAcknowledgement itemAck =
ItemAcknowledgement.newBuilder()
// TODO: Replace this with a real hash generator
.itemHash(Bytes.wrap(getFakeHash(blockItem)))
.itemsHash(Bytes.wrap(getFakeHash(blockItems)))
.build();

return Acknowledgement.newBuilder().itemAck(itemAck).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import dagger.Module;
import dagger.Provides;
import java.io.IOException;
import java.util.List;
import javax.inject.Singleton;

/** A Dagger module for providing dependencies for Persistence Module. */
Expand All @@ -45,7 +46,7 @@ public interface PersistenceInjectionModule {
*/
@Provides
@Singleton
static BlockWriter<BlockItem> providesBlockWriter(BlockNodeContext blockNodeContext) {
static BlockWriter<List<BlockItem>> providesBlockWriter(BlockNodeContext blockNodeContext) {
try {
return BlockAsDirWriterBuilder.newBuilder(blockNodeContext).build();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.hedera.pbj.runtime.OneOf;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -53,7 +54,7 @@ public class StreamPersistenceHandlerImpl
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler;
private final BlockWriter<BlockItem> blockWriter;
private final BlockWriter<List<BlockItem>> blockWriter;
private final Notifier notifier;
private final MetricsService metricsService;
private final ServiceStatus serviceStatus;
Expand All @@ -78,7 +79,7 @@ public class StreamPersistenceHandlerImpl
public StreamPersistenceHandlerImpl(
@NonNull final SubscriptionHandler<SubscribeStreamResponse> subscriptionHandler,
@NonNull final Notifier notifier,
@NonNull final BlockWriter<BlockItem> blockWriter,
@NonNull final BlockWriter<List<BlockItem>> blockWriter,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final ServiceStatus serviceStatus) {
this.subscriptionHandler = subscriptionHandler;
Expand All @@ -105,9 +106,8 @@ public void onEvent(ObjectEvent<SubscribeStreamResponse> event, long l, boolean
final OneOf<SubscribeStreamResponse.ResponseOneOfType> oneOfTypeOneOf =
subscribeStreamResponse.response();
switch (oneOfTypeOneOf.kind()) {
case BLOCK_ITEM -> {
final BlockItem blockItem = subscribeStreamResponse.blockItem();
if (blockItem == null) {
case BLOCK_ITEMS -> {
if (subscribeStreamResponse.blockItems() == null) {
final String message =
PROTOCOL_VIOLATION_MESSAGE.formatted(
"SubscribeStreamResponse",
Expand All @@ -118,12 +118,14 @@ public void onEvent(ObjectEvent<SubscribeStreamResponse> event, long l, boolean
metricsService.get(StreamPersistenceHandlerError).increment();
throw new BlockStreamProtocolException(message);
} else {
// Persist the BlockItem
Optional<BlockItem> result = blockWriter.write(blockItem);
// Persist the BlockItems
List<BlockItem> blockItems =
subscribeStreamResponse.blockItems().blockItems();
Optional<List<BlockItem>> result = blockWriter.write(blockItems);
if (result.isPresent()) {
// Publish the block item back upstream to the notifier
// to send responses to producers.
notifier.publish(blockItem);
notifier.publish(blockItems);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.util.List;
import java.util.Optional;
import java.util.Set;

Expand All @@ -48,7 +49,7 @@
* to remove the current, incomplete block (directory) before re-throwing the exception to the
* caller.
*/
class BlockAsDirWriter implements BlockWriter<BlockItem> {
class BlockAsDirWriter implements BlockWriter<List<BlockItem>> {

private final Logger LOGGER = System.getLogger(getClass().getName());

Expand Down Expand Up @@ -94,20 +95,23 @@ class BlockAsDirWriter implements BlockWriter<BlockItem> {
/**
* Writes the given block item to the filesystem.
*
* @param blockItem the block item to write
* @param blockItems the block item to write
* @throws IOException if an error occurs while writing the block item
*/
@Override
public Optional<BlockItem> write(@NonNull final BlockItem blockItem) throws IOException {
public Optional<List<BlockItem>> write(@NonNull final List<BlockItem> blockItems)
throws IOException {

if (blockItem.hasBlockHeader()) {
resetState(blockItem);
if (blockItems.getFirst().hasBlockHeader()) {
resetState(blockItems.getFirst());
}

final Path blockItemFilePath = calculateBlockItemPath();
for (int retries = 0; ; retries++) {
try {
write(blockItemFilePath, blockItem);
for (BlockItem blockItem : blockItems) {
write(blockItemFilePath, blockItem);
}
break;
} catch (IOException e) {

Expand All @@ -128,9 +132,9 @@ public Optional<BlockItem> write(@NonNull final BlockItem blockItem) throws IOEx
}
}

if (blockItem.hasBlockProof()) {
if (blockItems.getLast().hasBlockProof()) {
metricsService.get(BlocksPersisted).increment();
return Optional.of(blockItem);
return Optional.of(blockItems);
}

return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -103,7 +104,7 @@ public BlockAsDirWriterBuilder blockRemover(@NonNull BlockRemover blockRemover)
* @throws IOException when an error occurs while persisting block items to storage.
*/
@NonNull
public BlockWriter<BlockItem> build() throws IOException {
public BlockWriter<List<BlockItem>> build() throws IOException {
return new BlockAsDirWriter(blockRemover, filePerms, blockNodeContext);
}
}
Loading

0 comments on commit 48e4ba0

Please sign in to comment.