Skip to content

Commit

Permalink
fix: injected map to make it more testable
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 24, 2024
1 parent 7916f42 commit 6988dde
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

package com.hedera.block.server.consumer;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.time.InstantSource;
import java.util.concurrent.CountDownLatch;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;

/**
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
Expand All @@ -41,8 +41,6 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
private final InstantSource producerLivenessClock;
private long producerLivenessMillis;

private final CountDownLatch shutdownLatch = new CountDownLatch(1);

private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;

private boolean streamStarted;
Expand Down Expand Up @@ -118,6 +116,5 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool

@Override
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockI
private final Map<
BlockItemEventHandler<ObjectEvent<BlockItem>>,
BatchEventProcessor<ObjectEvent<BlockItem>>>
subscribers = new HashMap<>();
subscribers;

private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
private final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback;
Expand All @@ -61,8 +61,13 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockI
* @param blockPersistenceHandler the block persistence handler
*/
public LiveStreamMediatorImpl(
final Map<
BlockItemEventHandler<ObjectEvent<BlockItem>>,
BatchEventProcessor<ObjectEvent<BlockItem>>>
subscribers,
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback) {
this.subscribers = subscribers;
this.blockPersistenceHandler = blockPersistenceHandler;

// Initialize and start the disruptor
Expand All @@ -73,6 +78,12 @@ public LiveStreamMediatorImpl(
this.shutdownCallback = shutdownCallback;
}

public LiveStreamMediatorImpl(
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback) {
this(new HashMap<>(), blockPersistenceHandler, shutdownCallback);
}

@Override
public void publishEvent(BlockItem blockItem) {

Expand Down Expand Up @@ -101,6 +112,7 @@ public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockItem>> handle
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
executor.execute(batchEventProcessor);

// Keep track of the subscriber
subscribers.put(handler, batchEventProcessor);
}

Expand Down
Loading

0 comments on commit 6988dde

Please sign in to comment.