Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fallback for parallelization #8084

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1816,10 +1816,8 @@ public BesuControllerBuilder setupControllerBuilder() {
if (DataStorageFormat.BONSAI.equals(getDataStorageConfiguration().getDataStorageFormat())) {
final DiffBasedSubStorageConfiguration subStorageConfiguration =
getDataStorageConfiguration().getDiffBasedSubStorageConfiguration();
if (subStorageConfiguration.getLimitTrieLogsEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a git merge thing? odd that we were gating parallel tx on trielog pruning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a bug I found so I removed this invalid check

besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
return besuControllerBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.requests.ProcessRequestContext;
import org.hyperledger.besu.ethereum.mainnet.requests.RequestProcessorCoordinator;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
Expand Down Expand Up @@ -99,6 +100,26 @@ public BlockProcessingResult processBlock(
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
return processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}

protected BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater,
final PreprocessingFunction preprocessingBlockFunction) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
long currentBlobGasUsed = 0;
Expand All @@ -125,7 +146,7 @@ public BlockProcessingResult processBlock(
.orElse(Wei.ZERO);

final Optional<PreprocessingContext> preProcessingContext =
runBlockPreProcessing(
preprocessingBlockFunction.run(
worldState,
privateMetadataUpdater,
blockHeader,
Expand Down Expand Up @@ -255,17 +276,6 @@ public BlockProcessingResult processBlock(
parallelizedTxFound ? Optional.of(nbParallelTx) : Optional.empty());
}

protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}

protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
final MutableWorldState worldState,
Expand Down Expand Up @@ -318,5 +328,30 @@ abstract boolean rewardCoinbase(
final boolean skipZeroBlockRewards);

public interface PreprocessingContext {}
;

public interface PreprocessingFunction {
Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice);

class NoPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.BlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
Expand All @@ -37,8 +41,13 @@
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainnetParallelBlockProcessor extends MainnetBlockProcessor {

private static final Logger LOG = LoggerFactory.getLogger(MainnetParallelBlockProcessor.class);

private final Optional<MetricsSystem> metricsSystem;
private final Optional<Counter> confirmedParallelizedTransactionCounter;
private final Optional<Counter> conflictingButCachedTransactionCounter;
Expand Down Expand Up @@ -78,34 +87,6 @@ public MainnetParallelBlockProcessor(
"Counter for the number of conflicted transactions during block processing"));
}

@Override
protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}

@Override
protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
Expand All @@ -126,7 +107,7 @@ protected TransactionProcessingResult getTransactionProcessingResult(
(ParallelizedPreProcessingContext) preProcessingContext.get();
transactionProcessingResult =
parallelizedPreProcessingContext
.getParallelizedConcurrentTransactionProcessor()
.parallelizedConcurrentTransactionProcessor()
.applyParallelizedTransactionResult(
worldState,
miningBeneficiary,
Expand Down Expand Up @@ -154,21 +135,48 @@ protected TransactionProcessingResult getTransactionProcessingResult(
}
}

static class ParallelizedPreProcessingContext implements PreprocessingContext {
final ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor;

public ParallelizedPreProcessingContext(
final ParallelizedConcurrentTransactionProcessor
parallelizedConcurrentTransactionProcessor) {
this.parallelizedConcurrentTransactionProcessor = parallelizedConcurrentTransactionProcessor;
}

public ParallelizedConcurrentTransactionProcessor
getParallelizedConcurrentTransactionProcessor() {
return parallelizedConcurrentTransactionProcessor;
@Override
public BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
final BlockProcessingResult blockProcessingResult =
super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new ParallelTransactionPreprocessing());
if (blockProcessingResult.isFailed()) {
// Fallback to non-parallel processing if there is a block processing exception .
LOG.info(
"Parallel transaction processing failure. Falling back to non-parallel processing for block #{} ({})",
blockHeader.getNumber(),
blockHeader.getBlockHash());
return super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}
return blockProcessingResult;
}

record ParallelizedPreProcessingContext(
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor)
implements PreprocessingContext {}

public static class ParallelBlockProcessorBuilder
implements ProtocolSpecBuilder.BlockProcessorBuilder {

Expand Down Expand Up @@ -196,4 +204,35 @@ public BlockProcessor apply(
metricsSystem);
}
}

class ParallelTransactionPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// When enabled, runAsyncBlock performs non-conflicting parallel execution of transactions
// in the background using an optimistic approach.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}
}
}
Loading