diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index 029f87b005b..edcc248ab34 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -1816,10 +1816,8 @@ public BesuControllerBuilder setupControllerBuilder() { if (DataStorageFormat.BONSAI.equals(getDataStorageConfiguration().getDataStorageFormat())) { final DiffBasedSubStorageConfiguration subStorageConfiguration = getDataStorageConfiguration().getDiffBasedSubStorageConfiguration(); - if (subStorageConfiguration.getLimitTrieLogsEnabled()) { - besuControllerBuilder.isParallelTxProcessingEnabled( - subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled()); - } + besuControllerBuilder.isParallelTxProcessingEnabled( + subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled()); } return besuControllerBuilder; } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java index 993478fe50c..97a8e1f7d9c 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java @@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.core.Withdrawal; +import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing; import org.hyperledger.besu.ethereum.mainnet.requests.ProcessRequestContext; import org.hyperledger.besu.ethereum.mainnet.requests.RequestProcessorCoordinator; import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater; @@ -99,6 +100,26 @@ public BlockProcessingResult processBlock( final List ommers, final Optional> maybeWithdrawals, final PrivateMetadataUpdater privateMetadataUpdater) { + return processBlock( + blockchain, + worldState, + blockHeader, + transactions, + ommers, + maybeWithdrawals, + privateMetadataUpdater, + new NoPreprocessing()); + } + + protected BlockProcessingResult processBlock( + final Blockchain blockchain, + final MutableWorldState worldState, + final BlockHeader blockHeader, + final List transactions, + final List ommers, + final Optional> maybeWithdrawals, + final PrivateMetadataUpdater privateMetadataUpdater, + final PreprocessingFunction preprocessingBlockFunction) { final List receipts = new ArrayList<>(); long currentGasUsed = 0; long currentBlobGasUsed = 0; @@ -125,7 +146,7 @@ public BlockProcessingResult processBlock( .orElse(Wei.ZERO); final Optional preProcessingContext = - runBlockPreProcessing( + preprocessingBlockFunction.run( worldState, privateMetadataUpdater, blockHeader, @@ -255,17 +276,6 @@ public BlockProcessingResult processBlock( parallelizedTxFound ? Optional.of(nbParallelTx) : Optional.empty()); } - protected Optional runBlockPreProcessing( - final MutableWorldState worldState, - final PrivateMetadataUpdater privateMetadataUpdater, - final BlockHeader blockHeader, - final List transactions, - final Address miningBeneficiary, - final BlockHashLookup blockHashLookup, - final Wei blobGasPrice) { - return Optional.empty(); - } - protected TransactionProcessingResult getTransactionProcessingResult( final Optional preProcessingContext, final MutableWorldState worldState, @@ -318,5 +328,30 @@ abstract boolean rewardCoinbase( final boolean skipZeroBlockRewards); public interface PreprocessingContext {} - ; + + public interface PreprocessingFunction { + Optional run( + final MutableWorldState worldState, + final PrivateMetadataUpdater privateMetadataUpdater, + final BlockHeader blockHeader, + final List transactions, + final Address miningBeneficiary, + final BlockHashLookup blockHashLookup, + final Wei blobGasPrice); + + class NoPreprocessing implements PreprocessingFunction { + + @Override + public Optional run( + final MutableWorldState worldState, + final PrivateMetadataUpdater privateMetadataUpdater, + final BlockHeader blockHeader, + final List transactions, + final Address miningBeneficiary, + final BlockHashLookup blockHashLookup, + final Wei blobGasPrice) { + return Optional.empty(); + } + } + } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/parallelization/MainnetParallelBlockProcessor.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/parallelization/MainnetParallelBlockProcessor.java index 688ff918cb4..ce602c72756 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/parallelization/MainnetParallelBlockProcessor.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/parallelization/MainnetParallelBlockProcessor.java @@ -16,9 +16,13 @@ import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.Wei; +import org.hyperledger.besu.ethereum.BlockProcessingResult; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MutableWorldState; import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.core.Withdrawal; +import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing; import org.hyperledger.besu.ethereum.mainnet.BlockProcessor; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockProcessor; import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor; @@ -37,8 +41,13 @@ import java.util.List; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MainnetParallelBlockProcessor extends MainnetBlockProcessor { + private static final Logger LOG = LoggerFactory.getLogger(MainnetParallelBlockProcessor.class); + private final Optional metricsSystem; private final Optional confirmedParallelizedTransactionCounter; private final Optional conflictingButCachedTransactionCounter; @@ -78,34 +87,6 @@ public MainnetParallelBlockProcessor( "Counter for the number of conflicted transactions during block processing")); } - @Override - protected Optional runBlockPreProcessing( - final MutableWorldState worldState, - final PrivateMetadataUpdater privateMetadataUpdater, - final BlockHeader blockHeader, - final List transactions, - final Address miningBeneficiary, - final BlockHashLookup blockHashLookup, - final Wei blobGasPrice) { - if ((worldState instanceof DiffBasedWorldState)) { - ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor = - new ParallelizedConcurrentTransactionProcessor(transactionProcessor); - // runAsyncBlock, if activated, facilitates the non-blocking parallel execution of - // transactions in the background through an optimistic strategy. - parallelizedConcurrentTransactionProcessor.runAsyncBlock( - worldState, - blockHeader, - transactions, - miningBeneficiary, - blockHashLookup, - blobGasPrice, - privateMetadataUpdater); - return Optional.of( - new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor)); - } - return Optional.empty(); - } - @Override protected TransactionProcessingResult getTransactionProcessingResult( final Optional preProcessingContext, @@ -126,7 +107,7 @@ protected TransactionProcessingResult getTransactionProcessingResult( (ParallelizedPreProcessingContext) preProcessingContext.get(); transactionProcessingResult = parallelizedPreProcessingContext - .getParallelizedConcurrentTransactionProcessor() + .parallelizedConcurrentTransactionProcessor() .applyParallelizedTransactionResult( worldState, miningBeneficiary, @@ -154,21 +135,48 @@ protected TransactionProcessingResult getTransactionProcessingResult( } } - static class ParallelizedPreProcessingContext implements PreprocessingContext { - final ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor; - - public ParallelizedPreProcessingContext( - final ParallelizedConcurrentTransactionProcessor - parallelizedConcurrentTransactionProcessor) { - this.parallelizedConcurrentTransactionProcessor = parallelizedConcurrentTransactionProcessor; - } - - public ParallelizedConcurrentTransactionProcessor - getParallelizedConcurrentTransactionProcessor() { - return parallelizedConcurrentTransactionProcessor; + @Override + public BlockProcessingResult processBlock( + final Blockchain blockchain, + final MutableWorldState worldState, + final BlockHeader blockHeader, + final List transactions, + final List ommers, + final Optional> maybeWithdrawals, + final PrivateMetadataUpdater privateMetadataUpdater) { + final BlockProcessingResult blockProcessingResult = + super.processBlock( + blockchain, + worldState, + blockHeader, + transactions, + ommers, + maybeWithdrawals, + privateMetadataUpdater, + new ParallelTransactionPreprocessing()); + if (blockProcessingResult.isFailed()) { + // Fallback to non-parallel processing if there is a block processing exception . + LOG.info( + "Parallel transaction processing failure. Falling back to non-parallel processing for block #{} ({})", + blockHeader.getNumber(), + blockHeader.getBlockHash()); + return super.processBlock( + blockchain, + worldState, + blockHeader, + transactions, + ommers, + maybeWithdrawals, + privateMetadataUpdater, + new NoPreprocessing()); } + return blockProcessingResult; } + record ParallelizedPreProcessingContext( + ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor) + implements PreprocessingContext {} + public static class ParallelBlockProcessorBuilder implements ProtocolSpecBuilder.BlockProcessorBuilder { @@ -196,4 +204,35 @@ public BlockProcessor apply( metricsSystem); } } + + class ParallelTransactionPreprocessing implements PreprocessingFunction { + + @Override + public Optional run( + final MutableWorldState worldState, + final PrivateMetadataUpdater privateMetadataUpdater, + final BlockHeader blockHeader, + final List transactions, + final Address miningBeneficiary, + final BlockHashLookup blockHashLookup, + final Wei blobGasPrice) { + if ((worldState instanceof DiffBasedWorldState)) { + ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor = + new ParallelizedConcurrentTransactionProcessor(transactionProcessor); + // When enabled, runAsyncBlock performs non-conflicting parallel execution of transactions + // in the background using an optimistic approach. + parallelizedConcurrentTransactionProcessor.runAsyncBlock( + worldState, + blockHeader, + transactions, + miningBeneficiary, + blockHashLookup, + blobGasPrice, + privateMetadataUpdater); + return Optional.of( + new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor)); + } + return Optional.empty(); + } + } }