Skip to content

Commit

Permalink
disable unverified marker logic, deferring it for a future pr
Browse files Browse the repository at this point in the history
Signed-off-by: Atanas Atanasov <[email protected]>
  • Loading branch information
ata-nas committed Feb 6, 2025
1 parent f97e0b6 commit 7454755
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.hedera.block.server.block.BlockInfo;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult.BlockPersistenceStatus;
Expand All @@ -26,16 +25,13 @@
* consecutive ACKs for all blocks that are both persisted and verified.
*/
public class AckHandlerImpl implements AckHandler {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final Map<Long, BlockInfo> blockInfo = new ConcurrentHashMap<>();
private volatile long lastAcknowledgedBlockNumber = -1;
private final Notifier notifier;
private final boolean skipAcknowledgement;
private final ServiceStatus serviceStatus;
private final BlockRemover blockRemover;
private final BlockPathResolver blockPathResolver;

/**
* Constructor. If either skipPersistence or skipVerification is true,
Expand All @@ -46,13 +42,11 @@ public AckHandlerImpl(
@NonNull final Notifier notifier,
final boolean skipAcknowledgement,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockRemover blockRemover,
@NonNull final BlockPathResolver blockPathResolver) {
@NonNull final BlockRemover blockRemover) {
this.notifier = Objects.requireNonNull(notifier);
this.skipAcknowledgement = skipAcknowledgement;
this.serviceStatus = Objects.requireNonNull(serviceStatus);
this.blockRemover = Objects.requireNonNull(blockRemover);
this.blockPathResolver = Objects.requireNonNull(blockPathResolver);
}

@Override
Expand Down Expand Up @@ -145,12 +139,6 @@ private void attemptAcks() {

// Remove from map if desired (so we don't waste memory)
blockInfo.remove(nextBlock);

try {
blockPathResolver.markVerified(nextBlock);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

// Loop again in case the next block is also ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.verification.VerificationConfig;
Expand All @@ -30,13 +29,11 @@ static AckHandler provideBlockManager(
@NonNull final PersistenceStorageConfig persistenceStorageConfig,
@NonNull final VerificationConfig verificationConfig,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockRemover blockRemover,
@NonNull final BlockPathResolver blockPathResolver) {
@NonNull final BlockRemover blockRemover) {

boolean skipPersistence = persistenceStorageConfig.type().equals(PersistenceStorageConfig.StorageType.NO_OP);
boolean skipVerification = verificationConfig.type().equals(VerificationConfig.VerificationServiceType.NO_OP);

return new AckHandlerImpl(
notifier, skipPersistence | skipVerification, serviceStatus, blockRemover, blockPathResolver);
return new AckHandlerImpl(notifier, skipPersistence | skipVerification, serviceStatus, blockRemover);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,22 @@ public TransferQueue<BlockItemUnparsed> getQueue() {
}

private Path getResolvedUnverifiedBlockPath() throws IOException {
final Path rawUnverifiedBlockPath = blockPathResolver.resolveLiveRawUnverifiedPathToBlock(blockNumber);
// for now, we will not use the unverified block logic, deferring it for
// a future PR - simply, we resolve the raw path to the block instead
// of resolving it as unverified
final Path rawBlockPath = blockPathResolver.resolveLiveRawPathToBlock(blockNumber);
final Path resolvedBlockPath =
FileUtilities.appendExtension(rawUnverifiedBlockPath, compression.getCompressionFileExtension());
if (Files.notExists(resolvedBlockPath)) {
// We should not throw, unverified blocks are allowed to be overwritten
// in the beginning of the task we check if the block is already persisted
// and verified. Those must never be overwritten! If we have reached
// here, we must know that the block has never been persisted before.
// Else we need not create the file, it will be overwritten since is
// unverified. If the createFile method throws an exception here,
// it is either a bug, or a potential race condition!
FileUtilities.createFile(resolvedBlockPath);
}
FileUtilities.appendExtension(rawBlockPath, compression.getCompressionFileExtension());
// if (Files.notExists(resolvedBlockPath)) {
// We should not throw, unverified blocks are allowed to be overwritten
// in the beginning of the task we check if the block is already persisted
// and verified. Those must never be overwritten! If we have reached
// here, we must know that the block has never been persisted before.
// Else we need not create the file, it will be overwritten since is
// unverified. If the createFile method throws an exception here,
// it is either a bug, or a potential race condition!
FileUtilities.createFile(resolvedBlockPath);
// }
return resolvedBlockPath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import com.hedera.block.server.ack.AckHandlerImpl;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult.BlockPersistenceStatus;
Expand Down Expand Up @@ -38,23 +37,19 @@ class AckHandlerImplTest {
@Mock
private BlockRemover blockRemover;

@Mock
private BlockPathResolver blockPathResolverMock;

private AckHandlerImpl toTest;

@BeforeEach
void setUp() {
// By default, we do NOT skip acknowledgements
toTest = new AckHandlerImpl(notifier, false, serviceStatus, blockRemover, blockPathResolverMock);
toTest = new AckHandlerImpl(notifier, false, serviceStatus, blockRemover);
}

@Test
@DisplayName("blockVerified + blockPersisted should do nothing if skipAcknowledgement == true")
void blockVerified_skippedAcknowledgement() {
// given
final AckHandlerImpl managerWithSkip =
new AckHandlerImpl(notifier, true, serviceStatus, blockRemover, blockPathResolverMock);
final AckHandlerImpl managerWithSkip = new AckHandlerImpl(notifier, true, serviceStatus, blockRemover);

// when
final long blockNumber = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.hedera.block.server.ack.AckHandlerInjectionModule;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.verification.VerificationConfig;
Expand All @@ -26,7 +25,6 @@ void testProvideBlockManager() {
final Notifier notifier = mock(Notifier.class);
final ServiceStatus serviceStatus = mock(ServiceStatus.class);
final BlockRemover blockRemover = mock(BlockRemover.class);
final BlockPathResolver blockPathResolver = mock(BlockPathResolver.class);
final PersistenceStorageConfig persistenceStorageConfig = new PersistenceStorageConfig(
"",
"",
Expand All @@ -40,7 +38,7 @@ void testProvideBlockManager() {

// when
final AckHandler ackHandler = AckHandlerInjectionModule.provideBlockManager(
notifier, persistenceStorageConfig, verificationConfig, serviceStatus, blockRemover, blockPathResolver);
notifier, persistenceStorageConfig, verificationConfig, serviceStatus, blockRemover);

// then
// AckHandlerImpl is the default and only implementation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import com.hedera.block.server.notifier.NotifierImpl;
import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriterFactory;
Expand Down Expand Up @@ -81,7 +79,6 @@
@ExtendWith(MockitoExtension.class)
public class PbjBlockStreamServiceIntegrationTest {
private static final int testTimeout = 2000;
private BlockPathResolver pathResolverMock;

@Mock
private Notifier notifier;
Expand Down Expand Up @@ -146,7 +143,6 @@ public void setUp() throws IOException {

final String testConfigLiveRootPath = testConfig.liveRootPath();
assertThat(testConfigLiveRootPath).isEqualTo(testLiveRootPath.toString());
pathResolverMock = spy(BlockAsLocalFilePathResolver.of(testConfig));
}

@Test
Expand Down Expand Up @@ -701,12 +697,10 @@ private BlockVerificationSessionFactory getBlockVerificationSessionFactory() {

private PbjBlockStreamServiceProxy buildBlockStreamService() {
final BlockRemover blockRemover = mock(BlockRemover.class);
final BlockPathResolver blockPathResolver = mock(BlockPathResolver.class);
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final LiveStreamMediator streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), serviceStatus);
final Notifier notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus);
final AckHandler blockManager =
new AckHandlerImpl(notifier, false, serviceStatus, blockRemover, blockPathResolver);
final AckHandler blockManager = new AckHandlerImpl(notifier, false, serviceStatus, blockRemover);
final BlockVerificationSessionFactory blockVerificationSessionFactory = getBlockVerificationSessionFactory();
final BlockVerificationService BlockVerificationService = new BlockVerificationServiceImpl(
blockNodeContext.metricsService(), blockVerificationSessionFactory, blockManager);
Expand Down

0 comments on commit 7454755

Please sign in to comment.