From 921bc175c8af626c62ae4982f0ec09538bdc2322 Mon Sep 17 00:00:00 2001 From: Stefan Pingel <16143240+pinges@users.noreply.github.com> Date: Mon, 22 Jan 2024 14:56:43 +1000 Subject: [PATCH] Make fork id the default and try to recover the DiscoveryPeer for incoming connections from the PeerTable (#5628) * make the request for the ENR the default and try to recover the DiscoveryPeer for incoming connections from the PeerTable Signed-off-by: Stefan Signed-off-by: stefan.pingel@consensys.net --- CHANGELOG.md | 4 +- .../options/unstable/NetworkingOptions.java | 6 +- .../cli/options/NetworkingOptionsTest.java | 2 +- .../besu/ethereum/eth/manager/EthPeers.java | 1 + .../eth/manager/EthProtocolManager.java | 124 +++++++++++------- .../p2p/config/DiscoveryConfiguration.java | 2 +- .../p2p/config/NetworkingConfiguration.java | 1 + .../p2p/discovery/PeerDiscoveryAgent.java | 8 +- .../discovery/VertxPeerDiscoveryAgent.java | 7 +- .../internal/PeerDiscoveryController.java | 55 ++------ .../p2p/discovery/internal/PeerTable.java | 14 +- .../p2p/network/DefaultP2PNetwork.java | 12 +- .../besu/ethereum/p2p/rlpx/RlpxAgent.java | 20 +-- .../netty/AbstractHandshakeHandler.java | 85 ++++++------ .../p2p/rlpx/connections/netty/DeFramer.java | 40 +++++- .../netty/HandshakeHandlerInbound.java | 7 +- .../netty/HandshakeHandlerOutbound.java | 7 +- .../netty/NettyConnectionInitializer.java | 12 +- .../netty/NettyTLSConnectionInitializer.java | 12 +- .../discovery/PeerDiscoveryTestHelper.java | 1 + .../internal/MockPeerDiscoveryAgent.java | 3 +- .../internal/PeerDiscoveryControllerTest.java | 46 +------ .../PeerDiscoveryTableRefreshTest.java | 2 - .../p2p/discovery/internal/PeerTableTest.java | 20 +-- .../RecursivePeerRefreshStateTest.java | 6 +- .../p2p/network/DefaultP2PNetworkTest.java | 26 +--- .../besu/ethereum/p2p/rlpx/RlpxAgentTest.java | 2 +- .../rlpx/connections/netty/DeFramerTest.java | 50 ++++++- .../NettyTLSConnectionInitializerTest.java | 5 +- 29 files changed, 308 insertions(+), 272 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 772efa5afaa..a3c84b37b9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,9 @@ - New `EXECUTION_HALTED` error returned if there is an error executing or simulating a transaction, with the reason for execution being halted. Replaces the generic `INTERNAL_ERROR` return code in certain cases which some applications may be checking for [#6343](https://github.com/hyperledger/besu/pull/6343) - The Besu Docker images with `openjdk-latest` tags since 23.10.3 were incorrectly using UID 1001 instead of 1000 for the container's `besu` user. The user now uses 1000 again. Containers created from or migrated to images using UID 1001 will need to chown their persistent database files to UID 1000 [#6360](https://github.com/hyperledger/besu/pull/6360) - The deprecated `--privacy-onchain-groups-enabled` option has now been removed. Use the `--privacy-flexible-groups-enabled` option instead. [#6411](https://github.com/hyperledger/besu/pull/6411) -- The time that can be spent selecting transactions during block creation is not capped at 5 seconds for PoS and PoW networks, and for PoA networks, at 75% of the block period specified in the genesis, this to prevent possible DoS in case a single transaction is taking too long to execute, and to have a stable block production rate, but it could be a breaking change if an existing network used to have transactions that takes more time to executed that the newly introduced limit, if it is mandatory for these network to keep processing these long processing transaction, then the default value of `block-txs-selection-max-time` or `poa-block-txs-selection-max-time` needs to be tuned accordingly. +- Requesting the Ethereum Node Record (ENR) to acquire the fork id from bonded peers is now enabled by default, so the following change has been made [#5628](https://github.com/hyperledger/besu/pull/5628): + - `--Xfilter-on-enr-fork-id` has been removed. To disable the feature use `--filter-on-enr-fork-id=false`. +- The time that can be spent selecting transactions during block creation is not capped at 5 seconds for PoS and PoW networks, and for PoA networks, at 75% of the block period specified in the genesis, this to prevent possible DoS in case a single transaction is taking too long to execute, and to have a stable block production rate, but it could be a breaking change if an existing network used to have transactions that takes more time to executed that the newly introduced limit, if it is mandatory for these network to keep processing these long processing transaction, then the default value of `block-txs-selection-max-time` or `poa-block-txs-selection-max-time` needs to be tuned accordingly. ### Deprecations diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java index 0ad68e3627b..69e62edfc0e 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/NetworkingOptions.java @@ -37,7 +37,7 @@ public class NetworkingOptions implements CLIOptions { private final String DNS_DISCOVERY_SERVER_OVERRIDE_FLAG = "--Xp2p-dns-discovery-server"; private final String DISCOVERY_PROTOCOL_V5_ENABLED = "--Xv5-discovery-enabled"; /** The constant FILTER_ON_ENR_FORK_ID. */ - public static final String FILTER_ON_ENR_FORK_ID = "--Xfilter-on-enr-fork-id"; + public static final String FILTER_ON_ENR_FORK_ID = "--filter-on-enr-fork-id"; @CommandLine.Option( names = INITIATE_CONNECTIONS_FREQUENCY_FLAG, @@ -76,9 +76,9 @@ public class NetworkingOptions implements CLIOptions { @CommandLine.Option( names = FILTER_ON_ENR_FORK_ID, hidden = true, - defaultValue = "false", + defaultValue = "true", description = "Whether to enable filtering of peers based on the ENR field ForkId)") - private final Boolean filterOnEnrForkId = false; + private final Boolean filterOnEnrForkId = NetworkingConfiguration.DEFAULT_FILTER_ON_ENR_FORK_ID; @CommandLine.Option( hidden = true, diff --git a/besu/src/test/java/org/hyperledger/besu/cli/options/NetworkingOptionsTest.java b/besu/src/test/java/org/hyperledger/besu/cli/options/NetworkingOptionsTest.java index 4b4601bb9ca..c96b9035e3f 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/options/NetworkingOptionsTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/options/NetworkingOptionsTest.java @@ -134,7 +134,7 @@ public void checkFilterByForkIdNotSet() { final NetworkingOptions options = cmd.getNetworkingOptions(); final NetworkingConfiguration networkingConfig = options.toDomainObject(); - assertThat(networkingConfig.getDiscovery().isFilterOnEnrForkIdEnabled()).isEqualTo(false); + assertThat(networkingConfig.getDiscovery().isFilterOnEnrForkIdEnabled()).isEqualTo(true); assertThat(commandErrorOutput.toString(UTF_8)).isEmpty(); assertThat(commandOutput.toString(UTF_8)).isEmpty(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index e28b0a24ad9..c27b473d314 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -139,6 +139,7 @@ public EthPeers( "peer_limit", "The maximum number of peers this node allows to connect", () -> peerUpperBound); + connectedPeersCounter = metricsSystem.createCounter( BesuMetricCategory.PEERS, "connected_total", "Total number of peers connected"); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 6925df9d490..774d35cf185 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -110,7 +110,7 @@ public EthProtocolManager( this.blockBroadcaster = new BlockBroadcaster(ethContext); - supportedCapabilities = + this.supportedCapabilities = calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration); // Run validators @@ -252,11 +252,14 @@ public List getSupportedCapabilities() { @Override public void stop() { if (stopped.compareAndSet(false, true)) { - LOG.info("Stopping {} Subprotocol.", getSupportedProtocol()); + LOG.atInfo().setMessage("Stopping {} Subprotocol.").addArgument(getSupportedProtocol()).log(); scheduler.stop(); shutdown.countDown(); } else { - LOG.error("Attempted to stop already stopped {} Subprotocol.", getSupportedProtocol()); + LOG.atInfo() + .setMessage("Attempted to stop already stopped {} Subprotocol.") + .addArgument(this::getSupportedProtocol) + .log(); } } @@ -264,7 +267,10 @@ public void stop() { public void awaitStop() throws InterruptedException { shutdown.await(); scheduler.awaitStop(); - LOG.info("{} Subprotocol stopped.", getSupportedProtocol()); + LOG.atInfo() + .setMessage("{} Subprotocol stopped.") + .addArgument(this::getSupportedProtocol) + .log(); } @Override @@ -277,8 +283,10 @@ public void processMessage(final Capability cap, final Message message) { EthProtocolLogger.logProcessMessage(cap, code); final EthPeer ethPeer = ethPeers.peer(message.getConnection()); if (ethPeer == null) { - LOG.debug( - "Ignoring message received from unknown peer connection: {}", message.getConnection()); + LOG.atDebug() + .setMessage("Ignoring message received from unknown peer connection: {}") + .addArgument(message::getConnection) + .log(); return; } @@ -288,19 +296,24 @@ public void processMessage(final Capability cap, final Message message) { return; } else if (!ethPeer.statusHasBeenReceived()) { // Peers are required to send status messages before any other message type - LOG.debug( - "{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.", - this.getClass().getSimpleName(), - EthPV62.STATUS, - code, - ethPeer); + LOG.atDebug() + .setMessage( + "{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.") + .addArgument(() -> this.getClass().getSimpleName()) + .addArgument(EthPV62.STATUS) + .addArgument(code) + .addArgument(ethPeer::toString) + .log(); ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL); return; } if (this.mergePeerFilter.isPresent()) { if (this.mergePeerFilter.get().disconnectIfGossipingBlocks(message, ethPeer)) { - LOG.debug("Post-merge disconnect: peer still gossiping blocks {}", ethPeer); + LOG.atDebug() + .setMessage("Post-merge disconnect: peer still gossiping blocks {}") + .addArgument(ethPeer::toString) + .log(); handleDisconnect(ethPeer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false); return; } @@ -333,11 +346,12 @@ public void processMessage(final Capability cap, final Message message) { maybeResponseData = ethMessages.dispatch(ethMessage); } } catch (final RLPException e) { - LOG.debug( - "Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}", - messageData.getData(), - ethPeer, - e); + LOG.atDebug() + .setMessage("Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}, {}") + .addArgument(messageData::getData) + .addArgument(ethPeer::toString) + .addArgument(e::toString) + .log(); ethPeer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL); } @@ -368,23 +382,31 @@ public void handleNewConnection(final PeerConnection connection) { genesisHash, latestForkId); try { - LOG.trace("Sending status message to {} for connection {}.", peer.getId(), connection); + LOG.atTrace() + .setMessage("Sending status message to {} for connection {}.") + .addArgument(peer::getId) + .addArgument(connection::toString) + .log(); peer.send(status, getSupportedProtocol(), connection); peer.registerStatusSent(connection); } catch (final PeerNotConnected peerNotConnected) { // Nothing to do. } - LOG.trace("{}", ethPeers); + LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log(); } @Override public boolean shouldConnect(final Peer peer, final boolean incoming) { - if (peer.getForkId().map(forkId -> forkIdManager.peerCheck(forkId)).orElse(true)) { - LOG.trace("ForkId OK or not available"); + if (peer.getForkId().map(forkIdManager::peerCheck).orElse(true)) { + LOG.atDebug() + .setMessage("ForkId OK or not available for peer {}") + .addArgument(peer::getId) + .log(); if (ethPeers.shouldConnect(peer, incoming)) { return true; } } + LOG.atDebug().setMessage("ForkId check failed for peer {}").addArgument(peer::getId).log(); return false; } @@ -397,11 +419,11 @@ public void handleDisconnect( LOG.atDebug() .setMessage("Disconnect - {} - {} - {}... - {} peers left") .addArgument(initiatedByPeer ? "Inbound" : "Outbound") - .addArgument(reason) - .addArgument(connection.getPeer().getId().slice(0, 8)) - .addArgument(ethPeers.peerCount()) + .addArgument(reason::toString) + .addArgument(() -> connection.getPeer().getId().slice(0, 8)) + .addArgument(ethPeers::peerCount) .log(); - LOG.trace("{}", ethPeers); + LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log(); } } @@ -412,43 +434,41 @@ private void handleStatusMessage(final EthPeer peer, final Message message) { try { if (!status.networkId().equals(networkId)) { LOG.atDebug() - .setMessage("Mismatched network id: {}, EthPeer {}...") - .addArgument(status.networkId()) - .addArgument(peer.getShortNodeId()) - .log(); - LOG.atTrace() - .setMessage("Mismatched network id: {}, EthPeer {}") - .addArgument(status.networkId()) - .addArgument(peer) + .setMessage("Mismatched network id: {}, peer {}") + .addArgument(status::networkId) + .addArgument(() -> getPeerOrPeerId(peer)) .log(); peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); } else if (!forkIdManager.peerCheck(forkId) && status.protocolVersion() > 63) { - LOG.debug( - "{} has matching network id ({}), but non-matching fork id: {}", - peer, - networkId, - forkId); + LOG.atDebug() + .setMessage("{} has matching network id ({}), but non-matching fork id: {}") + .addArgument(() -> getPeerOrPeerId(peer)) + .addArgument(networkId::toString) + .addArgument(forkId) + .log(); peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); } else if (forkIdManager.peerCheck(status.genesisHash())) { - LOG.debug( - "{} has matching network id ({}), but non-matching genesis hash: {}", - peer, - networkId, - status.genesisHash()); + LOG.atDebug() + .setMessage("{} has matching network id ({}), but non-matching genesis hash: {}") + .addArgument(() -> getPeerOrPeerId(peer)) + .addArgument(networkId::toString) + .addArgument(status::genesisHash) + .log(); peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED); } else if (mergePeerFilter.isPresent() && mergePeerFilter.get().disconnectIfPoW(status, peer)) { LOG.atDebug() .setMessage("Post-merge disconnect: peer still PoW {}") - .addArgument(peer.getShortNodeId()) + .addArgument(() -> getPeerOrPeerId(peer)) .log(); handleDisconnect(peer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false); } else { - LOG.debug( - "Received status message from {}: {} with connection {}", - peer, - status, - message.getConnection()); + LOG.atDebug() + .setMessage("Received status message from {}: {} with connection {}") + .addArgument(peer::toString) + .addArgument(status::toString) + .addArgument(message::getConnection) + .log(); peer.registerStatusReceived( status.bestHash(), status.totalDifficulty(), @@ -467,6 +487,10 @@ private void handleStatusMessage(final EthPeer peer, final Message message) { } } + private Object getPeerOrPeerId(final EthPeer peer) { + return LOG.isTraceEnabled() ? peer : peer.getShortNodeId(); + } + @Override public void blockMined(final Block block) { // This assumes the block has already been included in the chain diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/DiscoveryConfiguration.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/DiscoveryConfiguration.java index 036e592e1dd..86bb079a298 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/DiscoveryConfiguration.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/DiscoveryConfiguration.java @@ -32,7 +32,7 @@ public class DiscoveryConfiguration { private List bootnodes = new ArrayList<>(); private String dnsDiscoveryURL; private boolean discoveryV5Enabled = false; - private boolean filterOnEnrForkId = false; + private boolean filterOnEnrForkId = NetworkingConfiguration.DEFAULT_FILTER_ON_ENR_FORK_ID; public static DiscoveryConfiguration create() { return new DiscoveryConfiguration(); diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/NetworkingConfiguration.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/NetworkingConfiguration.java index 0de53cfd786..478e3617376 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/NetworkingConfiguration.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/config/NetworkingConfiguration.java @@ -23,6 +23,7 @@ public class NetworkingConfiguration { public static final int DEFAULT_INITIATE_CONNECTIONS_FREQUENCY_SEC = 30; public static final int DEFAULT_CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_SEC = 60; public static final int DEFAULT_PEER_LOWER_BOUND = 25; + public static final boolean DEFAULT_FILTER_ON_ENR_FORK_ID = true; private DiscoveryConfiguration discovery = new DiscoveryConfiguration(); private RlpxConfiguration rlpx = new RlpxConfiguration(); diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java index 30272413d49..de7d047180c 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgent.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PingPacketData; import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl; @@ -81,6 +82,7 @@ public abstract class PeerDiscoveryAgent { private final MetricsSystem metricsSystem; private final RlpxAgent rlpxAgent; private final ForkIdManager forkIdManager; + private final PeerTable peerTable; /* The peer controller, which takes care of the state machine of peers. */ protected Optional controller = Optional.empty(); @@ -109,7 +111,8 @@ protected PeerDiscoveryAgent( final MetricsSystem metricsSystem, final StorageProvider storageProvider, final ForkIdManager forkIdManager, - final RlpxAgent rlpxAgent) { + final RlpxAgent rlpxAgent, + final PeerTable peerTable) { this.metricsSystem = metricsSystem; checkArgument(nodeKey != null, "nodeKey cannot be null"); checkArgument(config != null, "provided configuration cannot be null"); @@ -130,6 +133,7 @@ protected PeerDiscoveryAgent( this.forkIdManager = forkIdManager; this.forkIdSupplier = () -> forkIdManager.getForkIdForChainHead().getForkIdAsBytesList(); this.rlpxAgent = rlpxAgent; + this.peerTable = peerTable; } protected abstract TimerUtil createTimer(); @@ -263,9 +267,9 @@ private PeerDiscoveryController createController(final DiscoveryPeer localNode) .peerRequirement(PeerRequirement.combine(peerRequirements)) .peerPermissions(peerPermissions) .metricsSystem(metricsSystem) - .forkIdManager(forkIdManager) .filterOnEnrForkId((config.isFilterOnEnrForkIdEnabled())) .rlpxAgent(rlpxAgent) + .peerTable(peerTable) .build(); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java index 27a2be8beb3..ef098896a61 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil; import org.hyperledger.besu.ethereum.p2p.discovery.internal.VertxTimerUtil; import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions; @@ -73,7 +74,8 @@ public VertxPeerDiscoveryAgent( final MetricsSystem metricsSystem, final StorageProvider storageProvider, final ForkIdManager forkIdManager, - final RlpxAgent rlpxAgent) { + final RlpxAgent rlpxAgent, + final PeerTable peerTable) { super( nodeKey, config, @@ -82,7 +84,8 @@ public VertxPeerDiscoveryAgent( metricsSystem, storageProvider, forkIdManager, - rlpxAgent); + rlpxAgent, + peerTable); checkArgument(vertx != null, "vertx instance cannot be null"); this.vertx = vertx; diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java index ec829d20709..af3790def52 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java @@ -21,8 +21,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import org.hyperledger.besu.cryptoservices.NodeKey; -import org.hyperledger.besu.ethereum.forkid.ForkId; -import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus; import org.hyperledger.besu.ethereum.p2p.peers.Peer; @@ -129,7 +127,6 @@ public class PeerDiscoveryController { private final DiscoveryProtocolLogger discoveryProtocolLogger; private final LabelledMetric interactionCounter; private final LabelledMetric interactionRetryCounter; - private final ForkIdManager forkIdManager; private final boolean filterOnEnrForkId; private final RlpxAgent rlpxAgent; @@ -161,7 +158,6 @@ private PeerDiscoveryController( final PeerPermissions peerPermissions, final MetricsSystem metricsSystem, final Optional> maybeCacheForEnrRequests, - final ForkIdManager forkIdManager, final boolean filterOnEnrForkId, final RlpxAgent rlpxAgent) { this.timerUtil = timerUtil; @@ -197,11 +193,11 @@ private PeerDiscoveryController( "discovery_interaction_retry_count", "Total number of interaction retries performed", "type"); + this.cachedEnrRequests = maybeCacheForEnrRequests.orElse( CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, SECONDS).build()); - this.forkIdManager = forkIdManager; this.filterOnEnrForkId = filterOnEnrForkId; } @@ -314,6 +310,7 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) { } final DiscoveryPeer peer = resolvePeer(sender); + final Bytes peerId = peer.getId(); switch (packet.getType()) { case PING: if (peerPermissions.allowInboundBonding(peer)) { @@ -333,10 +330,10 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) { if (filterOnEnrForkId) { requestENR(peer); } - bondingPeers.invalidate(peer.getId()); + bondingPeers.invalidate(peerId); addToPeerTable(peer); recursivePeerRefreshState.onBondingComplete(peer); - Optional.ofNullable(cachedEnrRequests.getIfPresent(peer.getId())) + Optional.ofNullable(cachedEnrRequests.getIfPresent(peerId)) .ifPresent(cachedEnrRequest -> processEnrRequest(peer, cachedEnrRequest)); }); break; @@ -360,12 +357,12 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) { if (PeerDiscoveryStatus.BONDED.equals(peer.getStatus())) { processEnrRequest(peer, packet); } else if (PeerDiscoveryStatus.BONDING.equals(peer.getStatus())) { - LOG.trace("ENR_REQUEST cached for bonding peer Id: {}", peer.getId()); + LOG.trace("ENR_REQUEST cached for bonding peer Id: {}", peerId); // Due to UDP, it may happen that we receive the ENR_REQUEST just before the PONG. // Because peers want to send the ENR_REQUEST directly after the pong. // If this happens we don't want to ignore the request but process when bonded. // this cache allows to keep the request and to respond after having processed the PONG - cachedEnrRequests.put(peer.getId(), packet); + cachedEnrRequests.put(peerId, packet); } break; case ENR_RESPONSE: @@ -376,26 +373,6 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) { packet.getPacketData(ENRResponsePacketData.class); final NodeRecord enr = packetData.get().getEnr(); peer.setNodeRecord(enr); - - final Optional maybeForkId = peer.getForkId(); - if (maybeForkId.isPresent()) { - if (forkIdManager.peerCheck(maybeForkId.get())) { - connectOnRlpxLayer(peer); - LOG.debug( - "Peer {} PASSED fork id check. ForkId received: {}", - sender.getId(), - maybeForkId.get()); - } else { - LOG.debug( - "Peer {} FAILED fork id check. ForkId received: {}", - sender.getId(), - maybeForkId.get()); - } - } else { - // if the peer hasn't sent the ForkId try to connect to it anyways - connectOnRlpxLayer(peer); - LOG.debug("No fork id sent by peer: {}", peer.getId()); - } }); break; } @@ -431,9 +408,7 @@ private boolean addToPeerTable(final DiscoveryPeer peer) { if (peer.getStatus() != PeerDiscoveryStatus.BONDED) { peer.setStatus(PeerDiscoveryStatus.BONDED); - if (!filterOnEnrForkId) { - connectOnRlpxLayer(peer); - } + connectOnRlpxLayer(peer); } final PeerTable.AddResult result = peerTable.tryAdd(peer); @@ -560,8 +535,6 @@ void bond(final DiscoveryPeer peer) { */ @VisibleForTesting void requestENR(final DiscoveryPeer peer) { - peer.setStatus(PeerDiscoveryStatus.ENR_REQUESTED); - final Consumer action = interaction -> { final ENRRequestPacketData data = ENRRequestPacketData.create(); @@ -838,7 +811,6 @@ public static class Builder { private Cache cachedEnrRequests = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, SECONDS).build(); - private ForkIdManager forkIdManager; private RlpxAgent rlpxAgent; private Builder() {} @@ -846,10 +818,6 @@ private Builder() {} public PeerDiscoveryController build() { validate(); - if (peerTable == null) { - peerTable = new PeerTable(this.nodeKey.getPublicKey().getEncodedBytes(), 16); - } - return new PeerDiscoveryController( nodeKey, localPeer, @@ -864,7 +832,6 @@ public PeerDiscoveryController build() { peerPermissions, metricsSystem, Optional.of(cachedEnrRequests), - forkIdManager, filterOnEnrForkId, rlpxAgent); } @@ -875,8 +842,8 @@ private void validate() { validateRequiredDependency(timerUtil, "TimerUtil"); validateRequiredDependency(workerExecutor, "AsyncExecutor"); validateRequiredDependency(metricsSystem, "MetricsSystem"); - validateRequiredDependency(forkIdManager, "ForkIdManager"); validateRequiredDependency(rlpxAgent, "RlpxAgent"); + validateRequiredDependency(peerTable, "PeerTable"); } private void validateRequiredDependency(final Object object, final String name) { @@ -970,11 +937,5 @@ public Builder rlpxAgent(final RlpxAgent rlpxAgent) { this.rlpxAgent = rlpxAgent; return this; } - - public Builder forkIdManager(final ForkIdManager forkIdManager) { - checkNotNull(forkIdManager); - this.forkIdManager = forkIdManager; - return this; - } } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java index e153acbfc26..f0e0be1fe2d 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTable.java @@ -56,26 +56,21 @@ public class PeerTable { * Builds a new peer table, where distance is calculated using the provided nodeId as a baseline. * * @param nodeId The ID of the node where this peer table is stored. - * @param bucketSize The maximum length of each k-bucket. */ - public PeerTable(final Bytes nodeId, final int bucketSize) { + public PeerTable(final Bytes nodeId) { this.keccak256 = Hash.keccak256(nodeId); this.table = Stream.generate(() -> new Bucket(DEFAULT_BUCKET_SIZE)) .limit(N_BUCKETS + 1) .toArray(Bucket[]::new); this.distanceCache = new ConcurrentHashMap<>(); - this.maxEntriesCnt = N_BUCKETS * bucketSize; + this.maxEntriesCnt = N_BUCKETS * DEFAULT_BUCKET_SIZE; // A bloom filter with 4096 expected insertions of 64-byte keys with a 0.1% false positive // probability yields a memory footprint of ~7.5kb. buildBloomFilter(); } - public PeerTable(final Bytes nodeId) { - this(nodeId, DEFAULT_BUCKET_SIZE); - } - /** * Returns the table's representation of a peer, if it exists. * @@ -83,11 +78,12 @@ public PeerTable(final Bytes nodeId) { * @return The stored representation. */ public Optional get(final PeerId peer) { - if (!idBloom.mightContain(peer.getId())) { + final Bytes peerId = peer.getId(); + if (!idBloom.mightContain(peerId)) { return Optional.empty(); } final int distance = distanceFrom(peer); - return table[distance].getAndTouch(peer.getId()); + return table[distance].getAndTouch(peerId); } /** diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java index ec65934b297..11352b38cc7 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryAgent; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus; import org.hyperledger.besu.ethereum.p2p.discovery.VertxPeerDiscoveryAgent; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeerPrivileges; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; @@ -383,11 +384,12 @@ void checkMaintainedConnectionPeers() { @VisibleForTesting void attemptPeerConnections() { LOG.trace("Initiating connections to discovered peers."); - rlpxAgent.connect( + final Stream toTry = streamDiscoveredPeers() .filter(peer -> peer.getStatus() == PeerDiscoveryStatus.BONDED) .filter(peerDiscoveryAgent::checkForkId) - .sorted(Comparator.comparing(DiscoveryPeer::getLastAttemptedConnection))); + .sorted(Comparator.comparing(DiscoveryPeer::getLastAttemptedConnection)); + toTry.forEach(rlpxAgent::connect); } @Override @@ -511,6 +513,7 @@ public static class Builder { private Supplier> allConnectionsSupplier; private Supplier> allActiveConnectionsSupplier; private int peersLowerBound; + private PeerTable peerTable; public P2PNetwork build() { validate(); @@ -528,6 +531,7 @@ private P2PNetwork doBuild() { final MutableLocalNode localNode = MutableLocalNode.create(config.getRlpx().getClientId(), 5, supportedCapabilities); final PeerPrivileges peerPrivileges = new DefaultPeerPrivileges(maintainedPeers); + peerTable = new PeerTable(nodeKey.getPublicKey().getEncodedBytes()); rlpxAgent = rlpxAgent == null ? createRlpxAgent(localNode, peerPrivileges) : rlpxAgent; peerDiscoveryAgent = peerDiscoveryAgent == null ? createDiscoveryAgent() : peerDiscoveryAgent; @@ -572,7 +576,8 @@ private PeerDiscoveryAgent createDiscoveryAgent() { metricsSystem, storageProvider, forkIdManager, - rlpxAgent); + rlpxAgent, + peerTable); } private RlpxAgent createRlpxAgent( @@ -589,6 +594,7 @@ private RlpxAgent createRlpxAgent( .allConnectionsSupplier(allConnectionsSupplier) .allActiveConnectionsSupplier(allActiveConnectionsSupplier) .peersLowerBound(peersLowerBound) + .peerTable(peerTable) .build(); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java index 98a1f60df37..4a8e227d3d5 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java @@ -20,6 +20,7 @@ import org.hyperledger.besu.cryptoservices.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.peers.PeerPrivileges; @@ -162,13 +163,6 @@ public int getConnectionCount() { } } - public void connect(final Stream peerStream) { - if (!localNode.isReady()) { - return; - } - peerStream.forEach(this::connect); - } - public void disconnect(final Bytes peerId, final DisconnectReason reason) { try { allActiveConnectionsSupplier @@ -206,6 +200,7 @@ public CompletableFuture connect(final Peer peer) { + this.getClass().getSimpleName() + " has finished starting")); } + // Check peer is valid final EnodeURL enode = peer.getEnodeURL(); if (!enode.isListening()) { @@ -380,6 +375,7 @@ public static class Builder { private Supplier> allConnectionsSupplier; private Supplier> allActiveConnectionsSupplier; private int peersLowerBound; + private PeerTable peerTable; private Builder() {} @@ -399,12 +395,13 @@ public RlpxAgent build() { localNode, connectionEvents, metricsSystem, - p2pTLSConfiguration.get()); + p2pTLSConfiguration.get(), + peerTable); } else { LOG.debug("Using default NettyConnectionInitializer"); connectionInitializer = new NettyConnectionInitializer( - nodeKey, config, localNode, connectionEvents, metricsSystem); + nodeKey, config, localNode, connectionEvents, metricsSystem, peerTable); } } @@ -499,5 +496,10 @@ public Builder peersLowerBound(final int peersLowerBound) { this.peersLowerBound = peersLowerBound; return this; } + + public Builder peerTable(final PeerTable peerTable) { + this.peerTable = peerTable; + return this; + } } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/AbstractHandshakeHandler.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/AbstractHandshakeHandler.java index 003a6ab1d9d..80be0a673db 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/AbstractHandshakeHandler.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/AbstractHandshakeHandler.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; @@ -60,6 +61,7 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler subProtocols, @@ -70,7 +72,8 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler { + if (ff.isSuccess()) { + LOG.trace("Successfully wrote hello message"); + } + }); + msg.retain(); + ctx.fireChannelRead(msg); } - - LOG.trace("Sending framed hello"); - - // Exchange keys done - final Framer framer = this.framerProvider.buildFramer(handshaker.secrets()); - - final ByteToMessageDecoder deFramer = - new DeFramer( - framer, - subProtocols, - localNode, - expectedPeer, - connectionEventDispatcher, - connectionFuture, - metricsSystem, - inboundInitiated); - - ctx.channel() - .pipeline() - .replace(this, "DeFramer", deFramer) - .addBefore("DeFramer", "validate", new ValidateFirstOutboundMessage(framer)); - - ctx.writeAndFlush(new OutboundMessage(null, HelloMessage.create(localNode.getPeerInfo()))) - .addListener( - ff -> { - if (ff.isSuccess()) { - LOG.trace("Successfully wrote hello message"); - } - }); - msg.retain(); - ctx.fireChannelRead(msg); } private void disconnect( diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramer.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramer.java index b39a6b82193..c7c600d3fd3 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramer.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramer.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty; +import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.network.exceptions.BreachOfProtocolException; import org.hyperledger.besu.ethereum.p2p.network.exceptions.IncompatiblePeerException; import org.hyperledger.besu.ethereum.p2p.network.exceptions.PeerChannelClosedException; @@ -70,6 +72,7 @@ final class DeFramer extends ByteToMessageDecoder { private final Optional expectedPeer; private final List subProtocols; private final boolean inboundInitiated; + private final PeerTable peerTable; private boolean hellosExchanged; private final LabelledMetric outboundMessagesCounter; @@ -81,7 +84,8 @@ final class DeFramer extends ByteToMessageDecoder { final PeerConnectionEventDispatcher connectionEventDispatcher, final CompletableFuture connectFuture, final MetricsSystem metricsSystem, - final boolean inboundInitiated) { + final boolean inboundInitiated, + final PeerTable peerTable) { this.framer = framer; this.subProtocols = subProtocols; this.localNode = localNode; @@ -89,6 +93,7 @@ final class DeFramer extends ByteToMessageDecoder { this.connectFuture = connectFuture; this.connectionEventDispatcher = connectionEventDispatcher; this.inboundInitiated = inboundInitiated; + this.peerTable = peerTable; this.outboundMessagesCounter = metricsSystem.createLabelledCounter( BesuMetricCategory.NETWORK, @@ -105,8 +110,11 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L while ((message = framer.deframe(in)) != null) { if (hellosExchanged) { + out.add(message); + } else if (message.getCode() == WireMessageCodes.HELLO) { + hellosExchanged = true; // Decode first hello and use the payload to modify pipeline final PeerInfo peerInfo; @@ -129,13 +137,27 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L subProtocols, localNode.getPeerInfo().getCapabilities(), peerInfo.getCapabilities()); - final Optional peer = expectedPeer.or(() -> createPeer(peerInfo, ctx)); - if (peer.isEmpty()) { - LOG.debug("Failed to create connection for peer {}", peerInfo); - connectFuture.completeExceptionally(new PeerChannelClosedException(peerInfo)); - ctx.close(); - return; + + Optional peer; + if (expectedPeer.isPresent()) { + peer = expectedPeer; + } else { + // This is an inbound "Hello" message. Create peer from information from the Hello message + peer = createPeer(peerInfo, ctx); + if (peer.isEmpty()) { + LOG.debug("Failed to create connection for peer {}", peerInfo); + connectFuture.completeExceptionally(new PeerChannelClosedException(peerInfo)); + ctx.close(); + return; + } + // If we can find the DiscoveryPeer for the peer in the PeerTable we use it, because + // it could contains additional information, like the fork id. + final Optional discoveryPeer = peerTable.get(peer.get()); + if (discoveryPeer.isPresent()) { + peer = Optional.of(discoveryPeer.get()); + } } + final PeerConnection connection = new NettyPeerConnection( ctx, @@ -176,7 +198,9 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L capabilityMultiplexer, connection, connectionEventDispatcher, waitingForPong), new MessageFramer(capabilityMultiplexer, framer)); connectFuture.complete(connection); + } else if (message.getCode() == WireMessageCodes.DISCONNECT) { + final DisconnectMessage disconnectMessage = DisconnectMessage.readFrom(message); LOG.debug( "Peer {} disconnected before sending HELLO. Reason: {}", @@ -185,8 +209,10 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L ctx.close(); connectFuture.completeExceptionally( new PeerDisconnectedException(disconnectMessage.getReason())); + } else { // Unexpected message - disconnect + LOG.debug( "Message received before HELLO's exchanged (BREACH_OF_PROTOCOL), disconnecting. Peer: {}, Code: {}, Data: {}", expectedPeer.map(Peer::getEnodeURLString).orElse("unknown"), diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerInbound.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerInbound.java index 184cf5cf8c1..962de68f980 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerInbound.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerInbound.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty; import org.hyperledger.besu.cryptoservices.NodeKey; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEventDispatcher; @@ -40,7 +41,8 @@ public HandshakeHandlerInbound( final PeerConnectionEventDispatcher connectionEventDispatcher, final MetricsSystem metricsSystem, final HandshakerProvider handshakerProvider, - final FramerProvider framerProvider) { + final FramerProvider framerProvider, + final PeerTable peerTable) { super( subProtocols, localNode, @@ -50,7 +52,8 @@ public HandshakeHandlerInbound( metricsSystem, handshakerProvider, framerProvider, - true); + true, + peerTable); handshaker.prepareResponder(nodeKey); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerOutbound.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerOutbound.java index 205b6f655cc..46e600d74b6 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerOutbound.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/HandshakeHandlerOutbound.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.crypto.SignatureAlgorithmFactory; import org.hyperledger.besu.cryptoservices.NodeKey; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; @@ -50,7 +51,8 @@ public HandshakeHandlerOutbound( final PeerConnectionEventDispatcher connectionEventDispatcher, final MetricsSystem metricsSystem, final HandshakerProvider handshakerProvider, - final FramerProvider framerProvider) { + final FramerProvider framerProvider, + final PeerTable peerTable) { super( subProtocols, localNode, @@ -60,7 +62,8 @@ public HandshakeHandlerOutbound( metricsSystem, handshakerProvider, framerProvider, - false); + false, + peerTable); handshaker.prepareInitiator( nodeKey, SignatureAlgorithmFactory.getInstance().createPublicKey(peer.getId())); this.first = handshaker.firstMessage(); diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java index c20e511df9d..f386c59a384 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyConnectionInitializer.java @@ -17,6 +17,7 @@ import org.hyperledger.besu.cryptoservices.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback; @@ -68,6 +69,7 @@ public class NettyConnectionInitializer private final PeerConnectionEventDispatcher eventDispatcher; private final MetricsSystem metricsSystem; private final Subscribers connectSubscribers = Subscribers.create(); + private final PeerTable peerTable; private ChannelFuture server; private final EventLoopGroup boss = new NioEventLoopGroup(1); @@ -80,12 +82,14 @@ public NettyConnectionInitializer( final RlpxConfiguration config, final LocalNode localNode, final PeerConnectionEventDispatcher eventDispatcher, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final PeerTable peerTable) { this.nodeKey = nodeKey; this.config = config; this.localNode = localNode; this.eventDispatcher = eventDispatcher; this.metricsSystem = metricsSystem; + this.peerTable = peerTable; metricsSystem.createIntegerGauge( BesuMetricCategory.NETWORK, @@ -244,7 +248,8 @@ private HandshakeHandlerInbound inboundHandler( eventDispatcher, metricsSystem, this, - this); + this, + peerTable); } @Nonnull @@ -259,7 +264,8 @@ private HandshakeHandlerOutbound outboundHandler( eventDispatcher, metricsSystem, this, - this); + this, + peerTable); } @Nonnull diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializer.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializer.java index 4e6010771fc..db41c1574c7 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializer.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializer.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.cryptoservices.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.plain.PlainFramer; @@ -55,7 +56,8 @@ public NettyTLSConnectionInitializer( final LocalNode localNode, final PeerConnectionEventDispatcher eventDispatcher, final MetricsSystem metricsSystem, - final TLSConfiguration p2pTLSConfiguration) { + final TLSConfiguration p2pTLSConfiguration, + final PeerTable peerTable) { this( nodeKey, config, @@ -63,7 +65,8 @@ public NettyTLSConnectionInitializer( eventDispatcher, metricsSystem, defaultTlsContextFactorySupplier(p2pTLSConfiguration), - p2pTLSConfiguration.getClientHelloSniHeaderEnabled()); + p2pTLSConfiguration.getClientHelloSniHeaderEnabled(), + peerTable); } @VisibleForTesting @@ -74,8 +77,9 @@ public NettyTLSConnectionInitializer( final PeerConnectionEventDispatcher eventDispatcher, final MetricsSystem metricsSystem, final Supplier tlsContextFactorySupplier, - final Boolean clientHelloSniHeaderEnabled) { - super(nodeKey, config, localNode, eventDispatcher, metricsSystem); + final Boolean clientHelloSniHeaderEnabled, + final PeerTable peerTable) { + super(nodeKey, config, localNode, eventDispatcher, metricsSystem, peerTable); if (tlsContextFactorySupplier != null) { this.tlsContextFactorySupplier = Optional.of(Suppliers.memoize(tlsContextFactorySupplier::get)); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java index 464273243ad..ffc9fb1c300 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java @@ -295,6 +295,7 @@ public MockPeerDiscoveryAgent build() { config.setAdvertisedHost(advertisedHost); config.setBindPort(port); config.setActive(active); + config.setFilterOnEnrForkId(false); final ForkIdManager mockForkIdManager = mock(ForkIdManager.class); final ForkId forkId = new ForkId(Bytes.EMPTY, Bytes.EMPTY); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java index ea6e1593b83..88196f18b5d 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java @@ -63,7 +63,8 @@ public MockPeerDiscoveryAgent( new NoOpMetricsSystem(), new InMemoryKeyValueStorageProvider(), forkIdManager, - rlpxAgent); + rlpxAgent, + new PeerTable(nodeKey.getPublicKey().getEncodedBytes())); this.agentNetwork = agentNetwork; } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java index 182366ac03a..ca0d7c6430d 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java @@ -35,8 +35,6 @@ import org.hyperledger.besu.crypto.SignatureAlgorithm; import org.hyperledger.besu.crypto.SignatureAlgorithmFactory; import org.hyperledger.besu.cryptoservices.NodeKey; -import org.hyperledger.besu.ethereum.forkid.ForkId; -import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; import org.hyperledger.besu.ethereum.p2p.discovery.Endpoint; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus; @@ -1480,14 +1478,12 @@ public long read() { } @Test - public void shouldFiltersOnForkIdSuccess() { + public void forkIdShouldBeAvailableIfEnrPacketContainsForkId() { final List nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); final List peers = helper.createDiscoveryPeers(nodeKeys); - final ForkIdManager forkIdManager = mock(ForkIdManager.class); final DiscoveryPeer sender = peers.get(0); - final Packet enrPacket = prepareForForkIdCheck(forkIdManager, nodeKeys, sender, true); + final Packet enrPacket = prepareForForkIdCheck(nodeKeys, sender, true); - when(forkIdManager.peerCheck(any(ForkId.class))).thenReturn(true); controller.onMessage(enrPacket, sender); final Optional maybePeer = @@ -1501,35 +1497,12 @@ public void shouldFiltersOnForkIdSuccess() { verify(controller, times(1)).connectOnRlpxLayer(eq(maybePeer.get())); } - @Test - public void shouldFiltersOnForkIdFailure() { - final List nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); - final List peers = helper.createDiscoveryPeers(nodeKeys); - final ForkIdManager forkIdManager = mock(ForkIdManager.class); - final DiscoveryPeer sender = peers.get(0); - final Packet enrPacket = prepareForForkIdCheck(forkIdManager, nodeKeys, sender, true); - - when(forkIdManager.peerCheck(any(ForkId.class))).thenReturn(false); - controller.onMessage(enrPacket, sender); - - final Optional maybePeer = - controller - .streamDiscoveredPeers() - .filter(p -> p.getId().equals(sender.getId())) - .findFirst(); - - assertThat(maybePeer.isPresent()).isTrue(); - assertThat(maybePeer.get().getForkId().isPresent()).isTrue(); - verify(controller, never()).connectOnRlpxLayer(eq(maybePeer.get())); - } - @Test public void shouldStillCallConnectIfNoForkIdSent() { final List nodeKeys = PeerDiscoveryTestHelper.generateNodeKeys(1); final List peers = helper.createDiscoveryPeers(nodeKeys); final DiscoveryPeer sender = peers.get(0); - final Packet enrPacket = - prepareForForkIdCheck(mock(ForkIdManager.class), nodeKeys, sender, false); + final Packet enrPacket = prepareForForkIdCheck(nodeKeys, sender, false); controller.onMessage(enrPacket, sender); @@ -1546,10 +1519,7 @@ public void shouldStillCallConnectIfNoForkIdSent() { @NotNull private Packet prepareForForkIdCheck( - final ForkIdManager forkIdManager, - final List nodeKeys, - final DiscoveryPeer sender, - final boolean sendForkId) { + final List nodeKeys, final DiscoveryPeer sender, final boolean sendForkId) { final HashMap packetTypeBytesHashMap = new HashMap<>(); final OutboundMessageHandler outboundMessageHandler = (dp, pa) -> packetTypeBytesHashMap.put(pa.getType(), pa.getHash()); @@ -1573,7 +1543,6 @@ public long read() { .outboundMessageHandler(outboundMessageHandler) .enrCache(enrs) .filterOnForkId(true) - .forkIdManager(forkIdManager) .build(); // Mock the creation of the PING packet, so that we can control the hash, which gets validated @@ -1720,7 +1689,6 @@ static class ControllerBuilder { private Cache enrs = CacheBuilder.newBuilder().maximumSize(50).expireAfterWrite(10, TimeUnit.SECONDS).build(); private boolean filterOnForkId = false; - private ForkIdManager forkIdManager; public static ControllerBuilder create() { return new ControllerBuilder(); @@ -1776,11 +1744,6 @@ public ControllerBuilder filterOnForkId(final boolean filterOnForkId) { return this; } - public ControllerBuilder forkIdManager(final ForkIdManager forkIdManager) { - this.forkIdManager = forkIdManager; - return this; - } - PeerDiscoveryController build() { checkNotNull(nodeKey); if (localPeer == null) { @@ -1803,7 +1766,6 @@ PeerDiscoveryController build() { .peerPermissions(peerPermissions) .metricsSystem(new NoOpMetricsSystem()) .cacheForEnrRequests(enrs) - .forkIdManager(forkIdManager == null ? mock(ForkIdManager.class) : forkIdManager) .filterOnEnrForkId(filterOnForkId) .rlpxAgent(mock(RlpxAgent.class)) .build()); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java index 949b318906b..6320c909622 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify; import org.hyperledger.besu.cryptoservices.NodeKey; -import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus; import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper; @@ -72,7 +71,6 @@ public void tableRefreshSingleNode() { .tableRefreshIntervalMs(0) .metricsSystem(new NoOpMetricsSystem()) .rlpxAgent(mock(RlpxAgent.class)) - .forkIdManager(mock(ForkIdManager.class)) .build()); controller.start(); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java index c0909a9b8b4..dff9d231658 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java @@ -43,7 +43,7 @@ public class PeerTableTest { @Test public void addPeer() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final List peers = helper.createDiscoveryPeers(5); for (final DiscoveryPeer peer : peers) { @@ -63,7 +63,7 @@ public void addSelf() { .ipAddress("127.0.0.1") .discoveryAndListeningPorts(12345) .build()); - final PeerTable table = new PeerTable(localPeer.getId(), 16); + final PeerTable table = new PeerTable(localPeer.getId()); final PeerTable.AddResult result = table.tryAdd(localPeer); assertThat(result.getOutcome()).isEqualTo(AddOutcome.SELF); @@ -72,7 +72,7 @@ public void addSelf() { @Test public void peerExists() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final DiscoveryPeer peer = helper.createDiscoveryPeer(); assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED); @@ -87,7 +87,7 @@ public void peerExists() { @Test public void peerExists_withDifferentIp() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final Bytes peerId = SIGNATURE_ALGORITHM.get().generateKeyPair().getPublicKey().getEncodedBytes(); final DiscoveryPeer peer = @@ -107,7 +107,7 @@ public void peerExists_withDifferentIp() { @Test public void peerExists_withDifferentUdpPort() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final Bytes peerId = SIGNATURE_ALGORITHM.get().generateKeyPair().getPublicKey().getEncodedBytes(); final DiscoveryPeer peer = @@ -127,7 +127,7 @@ public void peerExists_withDifferentUdpPort() { @Test public void peerExists_withDifferentIdAndUdpPort() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final Bytes peerId = SIGNATURE_ALGORITHM.get().generateKeyPair().getPublicKey().getEncodedBytes(); final DiscoveryPeer peer = @@ -147,7 +147,7 @@ public void peerExists_withDifferentIdAndUdpPort() { @Test public void evictExistingPeerShouldEvict() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final DiscoveryPeer peer = helper.createDiscoveryPeer(); table.tryAdd(peer); @@ -158,7 +158,7 @@ public void evictExistingPeerShouldEvict() { @Test public void evictPeerFromEmptyTableShouldNotEvict() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final DiscoveryPeer peer = helper.createDiscoveryPeer(); final EvictResult evictResult = table.tryEvict(peer); @@ -167,7 +167,7 @@ public void evictPeerFromEmptyTableShouldNotEvict() { @Test public void evictAbsentPeerShouldNotEvict() { - final PeerTable table = new PeerTable(Peer.randomId(), 16); + final PeerTable table = new PeerTable(Peer.randomId()); final DiscoveryPeer peer = helper.createDiscoveryPeer(); final List otherPeers = helper.createDiscoveryPeers(5); otherPeers.forEach(table::tryAdd); @@ -179,7 +179,7 @@ public void evictAbsentPeerShouldNotEvict() { @Test public void evictSelfPeerShouldReturnSelfOutcome() { final DiscoveryPeer peer = helper.createDiscoveryPeer(); - final PeerTable table = new PeerTable(peer.getId(), 16); + final PeerTable table = new PeerTable(peer.getId()); final EvictResult evictResult = table.tryEvict(peer); assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.SELF); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java index e79abb883dd..5d26f8cc6e9 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java @@ -57,7 +57,7 @@ public class RecursivePeerRefreshStateTest { neighborFinder, timerUtil, localPeer, - new PeerTable(createId(999), 16), + new PeerTable(createId(999)), peerPermissions, 5, 100); @@ -180,7 +180,7 @@ public void shouldStopWhenMaximumNumberOfRoundsReached() { neighborFinder, timerUtil, localPeer, - new PeerTable(createId(999), 16), + new PeerTable(createId(999)), peerPermissions, 5, 1); @@ -466,7 +466,7 @@ public void shouldNotBondWithNonPermittedNode() { neighborFinder, timerUtil, localPeer, - new PeerTable(createId(999), 16), + new PeerTable(createId(999)), peerPermissions, 5, 100); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java index a54c9038b8e..68a4f76b582 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetworkTest.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import java.util.stream.Stream; import io.vertx.core.Context; @@ -82,7 +81,7 @@ public final class DefaultP2PNetworkTest { @Mock PeerDiscoveryAgent discoveryAgent; @Mock RlpxAgent rlpxAgent; - @Captor private ArgumentCaptor> peerStreamCaptor; + @Captor private ArgumentCaptor peerCaptor; private final NetworkingConfiguration config = NetworkingConfiguration.create() @@ -276,12 +275,9 @@ public void attemptPeerConnections_bondedPeers() { final DefaultP2PNetwork network = network(); network.attemptPeerConnections(); - verify(rlpxAgent, times(1)).connect(peerStreamCaptor.capture()); + verify(rlpxAgent, times(1)).connect(peerCaptor.capture()); - final List capturedPeers = - peerStreamCaptor.getValue().collect(Collectors.toList()); - assertThat(capturedPeers.contains(discoPeer)).isTrue(); - assertThat(capturedPeers.size()).isEqualTo(1); + assertThat(peerCaptor.getValue()).isEqualTo(discoPeer); } @Test @@ -293,12 +289,7 @@ public void attemptPeerConnections_unbondedPeers() { final DefaultP2PNetwork network = network(); network.attemptPeerConnections(); - verify(rlpxAgent, times(1)).connect(peerStreamCaptor.capture()); - - final List capturedPeers = - peerStreamCaptor.getValue().collect(Collectors.toList()); - assertThat(capturedPeers.contains(discoPeer)).isFalse(); - assertThat(capturedPeers.size()).isEqualTo(0); + verify(rlpxAgent, times(0)).connect(any()); } @Test @@ -314,14 +305,7 @@ public void attemptPeerConnections_sortsPeersByLastContacted() { final DefaultP2PNetwork network = network(); network.attemptPeerConnections(); - verify(rlpxAgent, times(1)).connect(peerStreamCaptor.capture()); - - final List capturedPeers = - peerStreamCaptor.getValue().collect(Collectors.toList()); - assertThat(capturedPeers.size()).isEqualTo(3); - assertThat(capturedPeers.get(0)).isEqualTo(discoPeers.get(1)); - assertThat(capturedPeers.get(1)).isEqualTo(discoPeers.get(0)); - assertThat(capturedPeers.get(2)).isEqualTo(discoPeers.get(2)); + verify(rlpxAgent, times(3)).connect(any()); } @Test diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java index d2fdf8c76a6..c639579eee1 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgentTest.java @@ -296,7 +296,7 @@ public void connect_largeStreamOfPeers() { Stream.generate(PeerTestHelper::createPeer).limit(peerNo); agent = spy(agent); - agent.connect(peerStream); + peerStream.forEach(agent::connect); assertThat(agent.getMapOfCompletableFutures().size()).isEqualTo(peerNo); verify(agent, times(peerNo)).connect(any(Peer.class)); diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java index f5030fd9ad6..952ebf7c2d5 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramerTest.java @@ -24,6 +24,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.hyperledger.besu.ethereum.forkid.ForkId; +import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.network.exceptions.BreachOfProtocolException; import org.hyperledger.besu.ethereum.p2p.network.exceptions.IncompatiblePeerException; import org.hyperledger.besu.ethereum.p2p.network.exceptions.PeerChannelClosedException; @@ -104,7 +107,7 @@ public class DeFramerTest { private final LocalNode localNode = LocalNode.create(clientId, p2pVersion, capabilities, localEnode); - private final DeFramer deFramer = createDeFramer(null); + private final DeFramer deFramer = createDeFramer(null, Optional.empty()); @BeforeEach @SuppressWarnings("unchecked") @@ -219,7 +222,7 @@ public void decode_handlesHelloFromPeerWithAdvertisedPortOf0() final Peer peer = createRemotePeer(); final PeerInfo remotePeerInfo = new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId()); - final DeFramer deFramer = createDeFramer(null); + final DeFramer deFramer = createDeFramer(null, Optional.empty()); final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); @@ -260,6 +263,39 @@ public void decode_handlesHelloFromPeerWithAdvertisedPortOf0() assertThat(out.size()).isEqualTo(1); } + @Test + public void decode_duringHandshakeFindsPeerInPeerTable() + throws ExecutionException, InterruptedException { + final ChannelFuture future = NettyMocks.channelFuture(false); + when(channel.closeFuture()).thenReturn(future); + + final Peer peer = createRemotePeer(); + final PeerInfo remotePeerInfo = + new PeerInfo(p2pVersion, clientId, capabilities, 0, peer.getId()); + + final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + final Bytes nodeId = helloMessage.getPeerInfo().getNodeId(); + final String enodeURLString = + "enode://" + nodeId.toString().substring(2) + "@" + "12.13.14.15:30303?discport=30301"; + final Optional discoveryPeer = + DiscoveryPeer.from(DefaultPeer.fromURI(enodeURLString)); + final ForkId forkId = new ForkId(Bytes.fromHexString("0x190a55ad"), 4L); + discoveryPeer.orElseThrow().setForkId(forkId); + final DeFramer deFramer = createDeFramer(null, discoveryPeer); + final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); + when(framer.deframe(eq(data))) + .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) + .thenReturn(null); + final List out = new ArrayList<>(); + deFramer.decode(ctx, data, out); + + assertThat(connectFuture).isDone(); + assertThat(connectFuture).isNotCompletedExceptionally(); + final PeerConnection peerConnection = connectFuture.get(); + assertThat(peerConnection.getPeerInfo()).isEqualTo(remotePeerInfo); + assertThat(peerConnection.getPeer().getForkId().orElseThrow()).isEqualTo(forkId); + } + @Test public void decode_handlesUnexpectedPeerId() { final ChannelFuture future = NettyMocks.channelFuture(false); @@ -274,7 +310,7 @@ public void decode_handlesUnexpectedPeerId() { capabilities, peer.getEnodeURL().getListeningPortOrZero(), mismatchedId); - final DeFramer deFramer = createDeFramer(peer); + final DeFramer deFramer = createDeFramer(peer, Optional.empty()); final HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); final ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().toArray()); @@ -414,7 +450,10 @@ private PeerInfo createPeerInfo(final Peer forPeer) { forPeer.getId()); } - private DeFramer createDeFramer(final Peer expectedPeer) { + private DeFramer createDeFramer( + final Peer expectedPeer, final Optional peerInPeerTable) { + final PeerTable peerTable = new PeerTable(localNode.getPeerInfo().getNodeId()); + peerInPeerTable.ifPresent(peerTable::tryAdd); return new DeFramer( framer, Arrays.asList(MockSubProtocol.create("eth")), @@ -423,6 +462,7 @@ private DeFramer createDeFramer(final Peer expectedPeer) { connectionEventDispatcher, connectFuture, new NoOpMetricsSystem(), - true); + true, + peerTable); } } diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializerTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializerTest.java index d4637952b63..f7f3dbc5124 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializerTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyTLSConnectionInitializerTest.java @@ -25,6 +25,7 @@ import org.hyperledger.besu.cryptoservices.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable; import org.hyperledger.besu.ethereum.p2p.peers.LocalNode; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnectionEventDispatcher; @@ -44,6 +45,7 @@ import io.netty.handler.codec.compression.SnappyFrameEncoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; +import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -95,7 +97,8 @@ private NettyTLSConnectionInitializer createNettyTLSConnectionInitializer( eventDispatcher, new NoOpMetricsSystem(), () -> tlsContextFactory, - clientHelloSniHeaderEnabled); + clientHelloSniHeaderEnabled, + new PeerTable(Bytes.random(64))); } @Test