Skip to content

Commit

Permalink
Reduce lock contention on transaction pool when building a block (#7180)
Browse files Browse the repository at this point in the history
* Avoid keeping txpool lock during block creation

Signed-off-by: Fabio Di Fabio <[email protected]>

* Update CHANGELOG

Signed-off-by: Fabio Di Fabio <[email protected]>

* Remove unneeded synchronized

Signed-off-by: Fabio Di Fabio <[email protected]>

---------

Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored Jun 13, 2024
1 parent e3e86c7 commit 19d2079
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 131 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Enable continuous profiling with default setting [#7006](https://github.com/hyperledger/besu/pull/7006)
- A full and up to date implementation of EOF for Prague [#7169](https://github.com/hyperledger/besu/pull/7169)
- Add Subnet-Based Peer Permissions. [#7168](https://github.com/hyperledger/besu/pull/7168)
- Reduce lock contention on transaction pool when building a block [#7180](https://github.com/hyperledger/besu/pull/7180)

### Bug fixes
- Make `eth_gasPrice` aware of the base fee market [#7102](https://github.com/hyperledger/besu/pull/7102)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private List<AbstractTransactionSelector> createTransactionSelectors(
public TransactionSelectionResults buildTransactionListForBlock() {
LOG.atDebug()
.setMessage("Transaction pool stats {}")
.addArgument(blockSelectionContext.transactionPool().logStats())
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();
timeLimitedSelection();
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public abstract class AbstractIsolationTests {
txPoolMetrics,
transactionReplacementTester,
new BlobCache(),
MiningParameters.newDefault()));
MiningParameters.newDefault()),
ethScheduler);

protected final List<GenesisAccount> accounts =
GenesisConfigFile.fromResource("/dev.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ private static PendingTransactions createLayeredPendingTransactions(
miningParameters);
}

return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter);
return new LayeredPendingTransactions(
transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
Expand Down Expand Up @@ -167,9 +167,25 @@ protected int[] getRemainingPromotionsPerType() {
return remainingPromotionsPerType;
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* <p>Returned sender list order detail: first the sender of the most profitable tx.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
return orderByFee.descendingSet().stream();
public List<SenderPendingTransactions> getBySender() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static java.util.Collections.unmodifiableList;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
Expand Down Expand Up @@ -54,7 +55,6 @@
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -138,6 +138,14 @@ public boolean contains(final Transaction transaction) {
|| nextLayer.contains(transaction);
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* @return a list of sender pending txs
*/
public abstract List<SenderPendingTransactions> getBySender();

@Override
public List<PendingTransaction> getAll() {
final List<PendingTransaction> allNextLayers = nextLayer.getAll();
Expand Down Expand Up @@ -548,17 +556,17 @@ public List<Transaction> getAllPriority() {
return priorityTxs;
}

Stream<PendingTransaction> stream(final Address sender) {
return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream();
}

@Override
public List<PendingTransaction> getAllFor(final Address sender) {
return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList();
public synchronized List<PendingTransaction> getAllFor(final Address sender) {
final var fromNextLayers = nextLayer.getAllFor(sender);
final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values();
final var concatLayers =
new ArrayList<PendingTransaction>(fromThisLayer.size() + fromNextLayers.size());
concatLayers.addAll(fromThisLayer);
concatLayers.addAll(fromNextLayers);
return unmodifiableList(concatLayers);
}

abstract Stream<PendingTransaction> stream();

@Override
public int count() {
return pendingTransactions.size() + nextLayer.count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
Expand All @@ -41,13 +42,10 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;

Expand All @@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions {
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
private final TransactionPoolConfiguration poolConfig;
private final AbstractPrioritizedTransactions prioritizedTransactions;
private final EthScheduler ethScheduler;

public LayeredPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final AbstractPrioritizedTransactions prioritizedTransactions) {
final AbstractPrioritizedTransactions prioritizedTransactions,
final EthScheduler ethScheduler) {
this.poolConfig = poolConfig;
this.prioritizedTransactions = prioritizedTransactions;
this.ethScheduler = ethScheduler;
}

@Override
Expand Down Expand Up @@ -311,79 +312,57 @@ public synchronized List<Transaction> getPriorityTransactions() {
}

@Override
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);

prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(
candidatePendingTx ->
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash())
&& Long.compareUnsigned(
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
<= 0)
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
final var res = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(res)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (res.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, res);
}

if (res.stop()) {
completed.set(true);
}

if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));

invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}

private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
final List<SenderPendingTransactions> candidateTxsBySender;
synchronized (this) {
// since selecting transactions for block creation is a potential long operation
// we want to avoid to keep the lock for all the process, but we just lock to get
// the candidate transactions
candidateTxsBySender = prioritizedTransactions.getBySender();
}

selection:
for (final var senderTxs : candidateTxsBySender) {
LOG.trace("highPrioSenderTxs {}", senderTxs);

for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
}

if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
break;
}
}
}

ethScheduler.scheduleTxWorkerTask(
() ->
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadyTransactions extends AbstractSequentialTransactionsLayer {

Expand Down Expand Up @@ -137,11 +136,24 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
return true;
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* <p>Returned sender list order detail: first the sender of the tx with the highest max gas
* price.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
public List<SenderPendingTransactions> getBySender() {
return orderByMaxFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.flatMap(sender -> txsBySender.get(sender).values().stream());
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;

import java.util.List;
import java.util.stream.Collectors;

/**
* A list of pending transactions of a specific sender, ordered by nonce asc
*
* @param sender the sender
* @param pendingTransactions the list of pending transactions order by nonce asc
*/
public record SenderPendingTransactions(
Address sender, List<PendingTransaction> pendingTransactions) {

@Override
public String toString() {
return "Sender "
+ sender
+ " has "
+ pendingTransactions.size()
+ " pending transactions "
+ pendingTransactions.stream()
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading

0 comments on commit 19d2079

Please sign in to comment.