Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce lock contention on transaction pool when building a block #7180

Merged
merged 6 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Enable continuous profiling with default setting [#7006](https://github.com/hyperledger/besu/pull/7006)
- A full and up to date implementation of EOF for Prague [#7169](https://github.com/hyperledger/besu/pull/7169)
- Add Subnet-Based Peer Permissions. [#7168](https://github.com/hyperledger/besu/pull/7168)
- Reduce lock contention on transaction pool when building a block [#7180](https://github.com/hyperledger/besu/pull/7180)

### Bug fixes
- Make `eth_gasPrice` aware of the base fee market [#7102](https://github.com/hyperledger/besu/pull/7102)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private List<AbstractTransactionSelector> createTransactionSelectors(
public TransactionSelectionResults buildTransactionListForBlock() {
LOG.atDebug()
.setMessage("Transaction pool stats {}")
.addArgument(blockSelectionContext.transactionPool().logStats())
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();
timeLimitedSelection();
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public abstract class AbstractIsolationTests {
txPoolMetrics,
transactionReplacementTester,
new BlobCache(),
MiningParameters.newDefault()));
MiningParameters.newDefault()),
ethScheduler);

protected final List<GenesisAccount> accounts =
GenesisConfigFile.fromResource("/dev.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ private static PendingTransactions createLayeredPendingTransactions(
miningParameters);
}

return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter);
return new LayeredPendingTransactions(
transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
Expand Down Expand Up @@ -167,9 +167,25 @@ protected int[] getRemainingPromotionsPerType() {
return remainingPromotionsPerType;
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* <p>Returned sender list order detail: first the sender of the most profitable tx.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
return orderByFee.descendingSet().stream();
public List<SenderPendingTransactions> getBySender() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static java.util.Collections.unmodifiableList;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
Expand Down Expand Up @@ -54,7 +55,6 @@
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -138,6 +138,14 @@ public boolean contains(final Transaction transaction) {
|| nextLayer.contains(transaction);
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* @return a list of sender pending txs
*/
public abstract List<SenderPendingTransactions> getBySender();

@Override
public List<PendingTransaction> getAll() {
final List<PendingTransaction> allNextLayers = nextLayer.getAll();
Expand Down Expand Up @@ -548,17 +556,17 @@ public List<Transaction> getAllPriority() {
return priorityTxs;
}

Stream<PendingTransaction> stream(final Address sender) {
return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream();
}

@Override
public List<PendingTransaction> getAllFor(final Address sender) {
return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList();
public synchronized List<PendingTransaction> getAllFor(final Address sender) {
final var fromNextLayers = nextLayer.getAllFor(sender);
final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values();
final var concatLayers =
new ArrayList<PendingTransaction>(fromThisLayer.size() + fromNextLayers.size());
concatLayers.addAll(fromThisLayer);
concatLayers.addAll(fromNextLayers);
return unmodifiableList(concatLayers);
}

abstract Stream<PendingTransaction> stream();

@Override
public int count() {
return pendingTransactions.size() + nextLayer.count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
Expand All @@ -41,13 +42,10 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;

Expand All @@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions {
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
private final TransactionPoolConfiguration poolConfig;
private final AbstractPrioritizedTransactions prioritizedTransactions;
private final EthScheduler ethScheduler;

public LayeredPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final AbstractPrioritizedTransactions prioritizedTransactions) {
final AbstractPrioritizedTransactions prioritizedTransactions,
final EthScheduler ethScheduler) {
this.poolConfig = poolConfig;
this.prioritizedTransactions = prioritizedTransactions;
this.ethScheduler = ethScheduler;
}

@Override
Expand Down Expand Up @@ -311,79 +312,57 @@ public synchronized List<Transaction> getPriorityTransactions() {
}

@Override
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);

prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(
candidatePendingTx ->
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash())
&& Long.compareUnsigned(
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
<= 0)
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
final var res = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(res)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (res.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, res);
}

if (res.stop()) {
completed.set(true);
}

if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));

invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}

private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
final List<SenderPendingTransactions> candidateTxsBySender;
synchronized (this) {
// since selecting transactions for block creation is a potential long operation
// we want to avoid to keep the lock for all the process, but we just lock to get
// the candidate transactions
candidateTxsBySender = prioritizedTransactions.getBySender();
}

selection:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GOTO coding is generaly discouraged as it is hard to read, can create spaghetti code and difficult to debug. I would suggest to review the code with a while loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally agree with you, but in this specific case, probably this version is better, the code fragment is short, I also tried to choose a label that make sense when read: break selection

this is my version with the while, it is more verbose, wdyt?

    boolean continueSelection = true;
    final var itCandidateTxsBySender = candidateTxsBySender.iterator();

    while(itCandidateTxsBySender.hasNext() && continueSelection) {
      final var senderTxs = itCandidateTxsBySender.next();
      LOG.trace("highPrioSenderTxs {}", senderTxs);

      boolean continueWithSender = true;
      final var itSenderTxs = senderTxs.pendingTransactions().iterator();
      while (itSenderTxs.hasNext() && continueWithSender) {
        final var candidatePendingTx = itSenderTxs.next();
        final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

        LOG.atTrace()
            .setMessage("Selection result {} for transaction {}")
            .addArgument(selectionResult)
            .addArgument(candidatePendingTx::toTraceLog)
            .log();

        if (selectionResult.discard()) {
          invalidTransactions.add(candidatePendingTx);
          logDiscardedTransaction(candidatePendingTx, selectionResult);
        }

        if (selectionResult.stop()) {
          LOG.trace("Stopping selection");
          continueSelection = false;
          continueWithSender = false;
        } else {
          if (!selectionResult.selected()) {
            // avoid processing other txs from this sender if this one is skipped
            // since the following will not be selected due to the nonce gap
            LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
            continueWithSender = false;
          }
        }
      }
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm primarily concerned about setting a precedent. While I still believe we should avoid using Goto expressions and prefer the while method, I don't consider this a blocker for merging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, then will prefer to keep it as is for now, but available to review future suggestions.

for (final var senderTxs : candidateTxsBySender) {
LOG.trace("highPrioSenderTxs {}", senderTxs);

for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
}

if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
break;
}
}
}

ethScheduler.scheduleTxWorkerTask(
() ->
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadyTransactions extends AbstractSequentialTransactionsLayer {

Expand Down Expand Up @@ -137,11 +136,24 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
return true;
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* <p>Returned sender list order detail: first the sender of the tx with the highest max gas
* price.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
public List<SenderPendingTransactions> getBySender() {
return orderByMaxFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.flatMap(sender -> txsBySender.get(sender).values().stream());
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;

import java.util.List;
import java.util.stream.Collectors;

/**
* A list of pending transactions of a specific sender, ordered by nonce asc
*
* @param sender the sender
* @param pendingTransactions the list of pending transactions order by nonce asc
*/
public record SenderPendingTransactions(
Address sender, List<PendingTransaction> pendingTransactions) {

@Override
public String toString() {
return "Sender "
+ sender
+ " has "
+ pendingTransactions.size()
+ " pending transactions "
+ pendingTransactions.stream()
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading
Loading