Skip to content

Commit

Permalink
Fix to avoid broadcasting full blob txs (#6835)
Browse files Browse the repository at this point in the history
* separate queue for tx hashes

Signed-off-by: Gabriel Fukushima <[email protected]>

* Refinements

Signed-off-by: Fabio Di Fabio <[email protected]>

* Update tests

Signed-off-by: Fabio Di Fabio <[email protected]>

* Update CHANGELOG

Signed-off-by: Fabio Di Fabio <[email protected]>

* Refinements

Signed-off-by: Fabio Di Fabio <[email protected]>

---------

Signed-off-by: Gabriel Fukushima <[email protected]>
Signed-off-by: Fabio Di Fabio <[email protected]>
Co-authored-by: Gabriel Fukushima <[email protected]>
  • Loading branch information
fab-10 and gfukushima authored Mar 28, 2024
1 parent ceafa2a commit 3a2eb4e
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- Don't enable the BFT mining coordinator when running sub commands such as `blocks export` [#6675](https://github.com/hyperledger/besu/pull/6675)
- In JSON-RPC return optional `v` fields for type 1 and type 2 transactions [#6762](https://github.com/hyperledger/besu/pull/6762)
- Fix Shanghai/QBFT block import bug when syncing new nodes [#6765](https://github.com/hyperledger/besu/pull/6765)
- Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835)

### Download Links

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void sendTransactionHashesToPeer(final EthPeer peer) {
final Capability capability = peer.getConnection().capability(EthProtocol.NAME);
for (final List<Transaction> txBatch :
Iterables.partition(
transactionTracker.claimTransactionsToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
transactionTracker.claimTransactionHashesToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
try {
final List<Hash> txHashes = toHashList(txBatch);
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashesToSend = new ConcurrentHashMap<>();

public void reset() {
seenTransactions.clear();
transactionsToSend.clear();
transactionHashesToSend.clear();
}

public synchronized void markTransactionsAsSeen(
Expand All @@ -55,6 +57,15 @@ public synchronized void addToPeerSendQueue(final EthPeer peer, final Transactio
}
}

public synchronized void addToPeerHashSendQueue(
final EthPeer peer, final Transaction transaction) {
if (!hasPeerSeenTransaction(peer, transaction)) {
transactionHashesToSend
.computeIfAbsent(peer, key -> createTransactionsSet())
.add(transaction);
}
}

public Iterable<EthPeer> getEthPeersWithUnsentTransactions() {
return transactionsToSend.keySet();
}
Expand All @@ -69,6 +80,16 @@ public synchronized Set<Transaction> claimTransactionsToSendToPeer(final EthPeer
}
}

public synchronized Set<Transaction> claimTransactionHashesToSendToPeer(final EthPeer peer) {
final Set<Transaction> transactionHashesToSend = this.transactionHashesToSend.remove(peer);
if (transactionHashesToSend != null) {
markTransactionHashesAsSeen(peer, toHashList(transactionHashesToSend));
return transactionHashesToSend;
} else {
return emptySet();
}
}

public boolean hasSeenTransaction(final Hash txHash) {
return seenTransactions.values().stream().anyMatch(seen -> seen.contains(txHash));
}
Expand Down Expand Up @@ -100,5 +121,6 @@ protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
public void onDisconnect(final EthPeer peer) {
seenTransactions.remove(peer);
transactionsToSend.remove(peer);
transactionHashesToSend.remove(peer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,16 +49,33 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final Random random;

public TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
this(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
null);
}

@VisibleForTesting
protected TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final Long seed) {
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
this.ethContext = ethContext;
this.random = seed != null ? new Random(seed) : new Random();
}

public void relayTransactionPoolTo(
Expand All @@ -65,7 +84,13 @@ public void relayTransactionPoolTo(
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendTransactionHashes(toTransactionList(pendingTransactions), List.of(peer));
} else {
sendFullTransactions(toTransactionList(pendingTransactions), List.of(peer));
// we need to exclude txs that support hash only broadcasting
final var fullBroadcastTxs =
pendingTransactions.stream()
.map(PendingTransaction::getTransaction)
.filter(tx -> !ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType()))
.toList();
sendFullTransactions(fullBroadcastTxs, List.of(peer));
}
}
}
Expand All @@ -77,7 +102,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
return;
}

final int numPeersToSendFullTransactions = (int) Math.ceil(Math.sqrt(currPeerCount));
final int numPeersToSendFullTransactions = (int) Math.round(Math.sqrt(currPeerCount));

final Map<Boolean, List<Transaction>> transactionByBroadcastMode =
transactions.stream()
Expand Down Expand Up @@ -107,7 +132,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
numPeersToSendFullTransactions - sendOnlyFullTransactionPeers.size(),
sendOnlyHashPeers.size());

Collections.shuffle(sendOnlyHashPeers);
Collections.shuffle(sendOnlyHashPeers, random);

// move peers from the mixed list to reach the required size for full transaction peers
movePeersBetweenLists(sendOnlyHashPeers, sendMixedPeers, delta);
Expand All @@ -121,7 +146,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
.addArgument(sendOnlyHashPeers::size)
.addArgument(sendMixedPeers::size)
.addArgument(sendOnlyFullTransactionPeers)
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers.toString())
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers)
.log();

sendToFullTransactionsPeers(
Expand All @@ -141,7 +166,7 @@ private void sendToOnlyHashPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> hashOnlyPeers) {
final List<Transaction> allTransactions =
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).collect(Collectors.toList());
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).toList();

sendTransactionHashes(allTransactions, hashOnlyPeers);
}
Expand Down Expand Up @@ -175,7 +200,7 @@ private void sendTransactionHashes(
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
transaction -> transactionTracker.addToPeerHashSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public void setUp() {
@Test
public void shouldSendPendingTransactionsToEachPeer() throws Exception {

transactionTracker.addToPeerSendQueue(peer1, transaction1);
transactionTracker.addToPeerSendQueue(peer1, transaction2);
transactionTracker.addToPeerSendQueue(peer2, transaction3);
transactionTracker.addToPeerHashSendQueue(peer1, transaction1);
transactionTracker.addToPeerHashSendQueue(peer1, transaction2);
transactionTracker.addToPeerHashSendQueue(peer2, transaction3);

List.of(peer1, peer2).forEach(messageSender::sendTransactionHashesToPeer);

Expand All @@ -96,7 +96,8 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> transactions =
generator.transactions(6000).stream().collect(Collectors.toSet());

transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
transactions.forEach(
transaction -> transactionTracker.addToPeerHashSendQueue(peer1, transaction));

messageSender.sendTransactionHashesToPeer(peer1);
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class TransactionBroadcasterTest {

private static final Long FIXED_RANDOM_SEED = 0L;
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private EthScheduler ethScheduler;
Expand Down Expand Up @@ -92,12 +92,14 @@ public void setUp() {
when(ethContext.getEthPeers()).thenReturn(ethPeers);
when(ethContext.getScheduler()).thenReturn(ethScheduler);

// we use the fixed random seed to have a predictable shuffle of peers
txBroadcaster =
new TransactionBroadcaster(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender);
newPooledTransactionHashesMessageSender,
FIXED_RANDOM_SEED);
}

@Test
Expand Down Expand Up @@ -132,7 +134,7 @@ public void relayTransactionHashesFromPoolWhenPeerSupportEth65() {

txBroadcaster.relayTransactionPoolTo(ethPeerWithEth65, pendingTxs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);

sendTaskCapture.getValue().run();

Expand Down Expand Up @@ -177,14 +179,16 @@ public void onTransactionsAddedWithOnlyFewEth65PeersSendFullTransactions() {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));

txBroadcaster.onTransactionsAdded(txs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65_3, ethPeerWithEth65_2, ethPeerWithEth65]
// so ethPeerWithEth65 and ethPeerWithEth65_2 are moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);

sendTaskCapture.getAllValues().forEach(Runnable::run);

verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class));
verifyNoInteractions(newPooledTransactionHashesMessageSender);
verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerWithEth65_2);
verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(ethPeerWithEth65);
}

@Test
Expand All @@ -196,10 +200,12 @@ public void onTransactionsAddedWithOnlyEth65PeersSendFullTransactionsAndTransact
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));

txBroadcaster.onTransactionsAdded(txs);

// the shuffled hash only peer list is always:
// [ethPeerWithEth65_3, ethPeerWithEth65_2, ethPeerWithEth65]
// so ethPeerWithEth65 and ethPeerWithEth65_2 are moved to the mixed broadcast list
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_3, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_3, txs);

sendTaskCapture.getAllValues().forEach(Runnable::run);

Expand All @@ -218,8 +224,10 @@ public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionH
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));

txBroadcaster.onTransactionsAdded(txs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth65, txs);

Expand Down Expand Up @@ -250,9 +258,11 @@ public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionH
List<Transaction> txs = toTransactionList(setupTransactionPool(BLOB, 0, 1));

txBroadcaster.onTransactionsAdded(txs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_2, txs);
verifyNoTransactionAddedToPeerSendingQueue(ethPeerNoEth65);

sendTaskCapture.getAllValues().forEach(Runnable::run);
Expand All @@ -268,7 +278,6 @@ public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionH

@Test
public void onTransactionsAddedWithMixedPeersAndMixedBroadcastKind() {

List<EthPeer> eth65Peers = List.of(ethPeerWithEth65, ethPeerWithEth65_2);

when(ethPeers.peerCount()).thenReturn(3);
Expand All @@ -285,9 +294,12 @@ public void onTransactionsAddedWithMixedPeersAndMixedBroadcastKind() {
mixedTxs.addAll(hashBroadcastTxs);

txBroadcaster.onTransactionsAdded(mixedTxs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, mixedTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, mixedTxs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, mixedTxs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_2, hashBroadcastTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, fullBroadcastTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth65, fullBroadcastTxs);

sendTaskCapture.getAllValues().forEach(Runnable::run);
Expand Down Expand Up @@ -348,6 +360,16 @@ private void verifyTransactionAddedToPeerSendingQueue(
.containsExactlyInAnyOrderElementsOf(transactions);
}

private void verifyTransactionAddedToPeerHashSendingQueue(
final EthPeer peer, final Collection<Transaction> transactions) {

ArgumentCaptor<Transaction> trackedTransactions = ArgumentCaptor.forClass(Transaction.class);
verify(transactionTracker, times(transactions.size()))
.addToPeerHashSendQueue(eq(peer), trackedTransactions.capture());
assertThat(trackedTransactions.getAllValues())
.containsExactlyInAnyOrderElementsOf(transactions);
}

private void verifyNoTransactionAddedToPeerSendingQueue(final EthPeer peer) {

verify(transactionTracker, times(0)).addToPeerSendQueue(eq(peer), any());
Expand Down

0 comments on commit 3a2eb4e

Please sign in to comment.