Skip to content

Commit

Permalink
re-add a "sync check" before accepting/processing remote transactions…
Browse files Browse the repository at this point in the history
…, but only check that we have completed an initial sync and have a chain head (#4035)

Signed-off-by: garyschulte <[email protected]>
  • Loading branch information
garyschulte committed Jul 7, 2022
1 parent bd52c40 commit 68bfc99
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public BesuController build() {
ethContext,
clock,
metricsSystem,
syncState,
syncState::isInitialSyncPhaseDone,
miningParameters,
transactionPoolConfiguration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void setUp() {
mockEthContext,
TestClock.fixed(),
new NoOpMetricsSystem(),
syncState,
syncState::isInitialSyncPhaseDone,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
txPoolConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum TransactionInvalidReason {
INTRINSIC_GAS_EXCEEDS_GAS_LIMIT,
EXCEEDS_BLOCK_GAS_LIMIT,
TX_SENDER_NOT_AUTHORIZED,
CHAIN_HEAD_NOT_AVAILABLE,
CHAIN_HEAD_WORLD_STATE_NOT_AVAILABLE,
EXCEEDS_PER_TRANSACTION_GAS_LIMIT,
INVALID_TRANSACTION_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.BufferedGetPooledTransactionsFromPeerFetcher;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.metrics.BesuMetricCategory;
Expand All @@ -34,6 +33,7 @@
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand All @@ -55,19 +55,21 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final Supplier<Boolean> shouldProcessMessages;

public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final SyncState syncState) {
final Supplier<Boolean> shouldProcessMessages) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.transactionPoolConfiguration = transactionPoolConfiguration;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.shouldProcessMessages = shouldProcessMessages;
this.totalSkippedNewPooledTransactionHashesMessageCounter =
new RunnableCounter(
metricsSystem.createCounter(
Expand Down Expand Up @@ -108,25 +110,26 @@ private void processNewPooledTransactionHashesMessage(
incomingTransactionHashes::size,
incomingTransactionHashes::toString);

final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
scheduledTasks.computeIfAbsent(
peer,
ethPeer -> {
ethContext
.getScheduler()
.scheduleFutureTask(
new FetcherCreatorTask(peer),
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());

return new BufferedGetPooledTransactionsFromPeerFetcher(
ethContext, peer, transactionPool, transactionTracker, metricsSystem);
});

bufferedTask.addHashes(
incomingTransactionHashes.stream()
.filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
.collect(Collectors.toList()));

if (shouldProcessMessages.get()) {
final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
scheduledTasks.computeIfAbsent(
peer,
ethPeer -> {
ethContext
.getScheduler()
.scheduleFutureTask(
new FetcherCreatorTask(peer),
transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod());

return new BufferedGetPooledTransactionsFromPeerFetcher(
ethContext, peer, transactionPool, transactionTracker, metricsSystem);
});

bufferedTask.addHashes(
incomingTransactionHashes.stream()
.filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
.collect(Collectors.toList()));
}
} catch (final RLPException ex) {
if (peer != null) {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import static java.util.Collections.singletonList;
import static java.util.Optional.ofNullable;
import static org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter.TransactionAddedStatus.ADDED;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.CHAIN_HEAD_NOT_AVAILABLE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.CHAIN_HEAD_WORLD_STATE_NOT_AVAILABLE;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.Wei;
Expand Down Expand Up @@ -213,7 +215,16 @@ private ValidationResult<TransactionInvalidReason> validateRemoteTransaction(

private ValidationResult<TransactionInvalidReason> validateTransaction(
final Transaction transaction, final boolean isLocal) {
final BlockHeader chainHeadBlockHeader = getChainHeadBlockHeader();

final BlockHeader chainHeadBlockHeader = getChainHeadBlockHeader().orElse(null);
if (chainHeadBlockHeader == null) {
traceLambda(
LOG,
"rejecting transaction {} due to chain head not available yet",
() -> transaction.getHash());
return ValidationResult.invalid(CHAIN_HEAD_NOT_AVAILABLE);
}

final FeeMarket feeMarket =
protocolSchedule.getByBlockNumber(chainHeadBlockHeader.getNumber()).getFeeMarket();

Expand Down Expand Up @@ -291,17 +302,20 @@ public Optional<Transaction> getTransactionByHash(final Hash hash) {
return pendingTransactions.getTransactionByHash(hash);
}

private BlockHeader getChainHeadBlockHeader() {
private Optional<BlockHeader> getChainHeadBlockHeader() {
final MutableBlockchain blockchain = protocolContext.getBlockchain();
return blockchain.getBlockHeader(blockchain.getChainHeadHash()).get();
return blockchain.getBlockHeader(blockchain.getChainHeadHash());
}

private Wei minTransactionGasPrice(final Transaction transaction) {
final BlockHeader chainHeadBlockHeader = getChainHeadBlockHeader();
return protocolSchedule
.getByBlockNumber(chainHeadBlockHeader.getNumber())
.getFeeMarket()
.minTransactionPriceInNextBlock(transaction, chainHeadBlockHeader::getBaseFee);
return getChainHeadBlockHeader()
.map(
chainHeadBlockHeader ->
protocolSchedule
.getByBlockNumber(chainHeadBlockHeader.getNumber())
.getFeeMarket()
.minTransactionPriceInNextBlock(transaction, chainHeadBlockHeader::getBaseFee))
.orElse(Wei.ZERO);
}

public interface TransactionBatchAddedListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
Expand All @@ -29,6 +28,7 @@
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.time.Clock;
import java.util.function.Supplier;

public class TransactionPoolFactory {

Expand All @@ -38,7 +38,7 @@ public static TransactionPool createTransactionPool(
final EthContext ethContext,
final Clock clock,
final MetricsSystem metricsSystem,
final SyncState syncState,
final Supplier<Boolean> shouldProcessTransactions,
final MiningParameters miningParameters,
final TransactionPoolConfiguration transactionPoolConfiguration) {

Expand All @@ -58,7 +58,7 @@ public static TransactionPool createTransactionPool(
protocolContext,
ethContext,
metricsSystem,
syncState,
shouldProcessTransactions,
miningParameters,
transactionPoolConfiguration,
pendingTransactions,
Expand All @@ -72,7 +72,7 @@ static TransactionPool createTransactionPool(
final ProtocolContext protocolContext,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final SyncState syncState,
final Supplier<Boolean> shouldProcessTransactions,
final MiningParameters miningParameters,
final TransactionPoolConfiguration transactionPoolConfiguration,
final AbstractPendingTransactionsSorter pendingTransactions,
Expand Down Expand Up @@ -110,7 +110,7 @@ static TransactionPool createTransactionPool(
transactionPoolConfiguration,
ethContext,
metricsSystem,
syncState),
shouldProcessTransactions),
transactionPoolConfiguration.getTxMessageKeepAliveSeconds());
ethContext
.getEthMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
Expand Down Expand Up @@ -1027,7 +1026,7 @@ public void transactionMessagesGoToTheCorrectExecutor() {
ethManager.ethContext(),
TestClock.fixed(),
metricsSystem,
mock(SyncState.class),
() -> true,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
TransactionPoolConfiguration.DEFAULT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void setupTest() {
ethContext,
TestClock.fixed(),
metricsSystem,
syncState,
syncState::isInitialSyncPhaseDone,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(),
TransactionPoolConfiguration.DEFAULT);
ethProtocolManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public class NewPooledTransactionHashesMessageProcessorTest {
@Mock private TransactionPoolConfiguration transactionPoolConfiguration;
@Mock private PeerTransactionTracker transactionTracker;
@Mock private EthPeer peer1;
@Mock private SyncState syncState;
@Mock private EthContext ethContext;
@Mock private EthScheduler ethScheduler;
@Mock private SyncState syncState;

private final BlockDataGenerator generator = new BlockDataGenerator();
private final Hash hash1 = generator.transaction().getHash();
Expand All @@ -72,15 +72,15 @@ public void setup() {
metricsSystem = new StubMetricsSystem();
when(transactionPoolConfiguration.getEth65TrxAnnouncedBufferingPeriod())
.thenReturn(Duration.ofMillis(500));

when(syncState.isInitialSyncPhaseDone()).thenReturn(true);
messageHandler =
new NewPooledTransactionHashesMessageProcessor(
transactionTracker,
transactionPool,
transactionPoolConfiguration,
ethContext,
metricsSystem,
syncState);
syncState::isInitialSyncPhaseDone);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
}

Expand Down Expand Up @@ -187,4 +187,17 @@ public void shouldNotScheduleGetPooledTransactionsTaskTwice() {
verify(ethScheduler, times(1))
.scheduleFutureTask(any(FetcherCreatorTask.class), any(Duration.class));
}

@Test
public void shouldNotAddTransactionsWhenDisabled() {

when(syncState.isInitialSyncPhaseDone()).thenReturn(false);
messageHandler.processNewPooledTransactionHashesMessage(
peer1,
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)),
now(),
ofMinutes(1));

verifyNoInteractions(transactionPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public TestNode(
ethContext,
TestClock.fixed(),
metricsSystem,
syncState,
syncState::isInitialSyncPhaseDone,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
TransactionPoolConfiguration.DEFAULT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.ForkIdManager;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
Expand Down Expand Up @@ -70,7 +69,6 @@ public void testDisconnect() {
when(ethContext.getEthPeers()).thenReturn(ethPeers);
final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
final SyncState state = mock(SyncState.class);
final GasPricePendingTransactionsSorter pendingTransactions =
mock(GasPricePendingTransactionsSorter.class);
final PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
Expand All @@ -85,7 +83,7 @@ public void testDisconnect() {
context,
ethContext,
new NoOpMetricsSystem(),
state,
() -> true,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ONE).build(),
ImmutableTransactionPoolConfiguration.of(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private boolean buildContext(
ethContext,
retestethClock,
metricsSystem,
syncState,
syncState::isInitialSyncPhaseDone,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
transactionPoolConfiguration);

Expand Down

0 comments on commit 68bfc99

Please sign in to comment.