Skip to content

Commit

Permalink
Make fork id the default and try to recover the DiscoveryPeer for inc…
Browse files Browse the repository at this point in the history
…oming connections from the PeerTable (#5628)

* make the request for the ENR the default and try to recover the DiscoveryPeer for incoming connections from the PeerTable

Signed-off-by: Stefan <[email protected]>
Signed-off-by: [email protected] <[email protected]>
  • Loading branch information
pinges authored Jan 22, 2024
1 parent cfea3ab commit 921bc17
Show file tree
Hide file tree
Showing 29 changed files with 308 additions and 272 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
- New `EXECUTION_HALTED` error returned if there is an error executing or simulating a transaction, with the reason for execution being halted. Replaces the generic `INTERNAL_ERROR` return code in certain cases which some applications may be checking for [#6343](https://github.com/hyperledger/besu/pull/6343)
- The Besu Docker images with `openjdk-latest` tags since 23.10.3 were incorrectly using UID 1001 instead of 1000 for the container's `besu` user. The user now uses 1000 again. Containers created from or migrated to images using UID 1001 will need to chown their persistent database files to UID 1000 [#6360](https://github.com/hyperledger/besu/pull/6360)
- The deprecated `--privacy-onchain-groups-enabled` option has now been removed. Use the `--privacy-flexible-groups-enabled` option instead. [#6411](https://github.com/hyperledger/besu/pull/6411)
- The time that can be spent selecting transactions during block creation is not capped at 5 seconds for PoS and PoW networks, and for PoA networks, at 75% of the block period specified in the genesis, this to prevent possible DoS in case a single transaction is taking too long to execute, and to have a stable block production rate, but it could be a breaking change if an existing network used to have transactions that takes more time to executed that the newly introduced limit, if it is mandatory for these network to keep processing these long processing transaction, then the default value of `block-txs-selection-max-time` or `poa-block-txs-selection-max-time` needs to be tuned accordingly.
- Requesting the Ethereum Node Record (ENR) to acquire the fork id from bonded peers is now enabled by default, so the following change has been made [#5628](https://github.com/hyperledger/besu/pull/5628):
- `--Xfilter-on-enr-fork-id` has been removed. To disable the feature use `--filter-on-enr-fork-id=false`.
- The time that can be spent selecting transactions during block creation is not capped at 5 seconds for PoS and PoW networks, and for PoA networks, at 75% of the block period specified in the genesis, this to prevent possible DoS in case a single transaction is taking too long to execute, and to have a stable block production rate, but it could be a breaking change if an existing network used to have transactions that takes more time to executed that the newly introduced limit, if it is mandatory for these network to keep processing these long processing transaction, then the default value of `block-txs-selection-max-time` or `poa-block-txs-selection-max-time` needs to be tuned accordingly.

### Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
private final String DNS_DISCOVERY_SERVER_OVERRIDE_FLAG = "--Xp2p-dns-discovery-server";
private final String DISCOVERY_PROTOCOL_V5_ENABLED = "--Xv5-discovery-enabled";
/** The constant FILTER_ON_ENR_FORK_ID. */
public static final String FILTER_ON_ENR_FORK_ID = "--Xfilter-on-enr-fork-id";
public static final String FILTER_ON_ENR_FORK_ID = "--filter-on-enr-fork-id";

@CommandLine.Option(
names = INITIATE_CONNECTIONS_FREQUENCY_FLAG,
Expand Down Expand Up @@ -76,9 +76,9 @@ public class NetworkingOptions implements CLIOptions<NetworkingConfiguration> {
@CommandLine.Option(
names = FILTER_ON_ENR_FORK_ID,
hidden = true,
defaultValue = "false",
defaultValue = "true",
description = "Whether to enable filtering of peers based on the ENR field ForkId)")
private final Boolean filterOnEnrForkId = false;
private final Boolean filterOnEnrForkId = NetworkingConfiguration.DEFAULT_FILTER_ON_ENR_FORK_ID;

@CommandLine.Option(
hidden = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void checkFilterByForkIdNotSet() {

final NetworkingOptions options = cmd.getNetworkingOptions();
final NetworkingConfiguration networkingConfig = options.toDomainObject();
assertThat(networkingConfig.getDiscovery().isFilterOnEnrForkIdEnabled()).isEqualTo(false);
assertThat(networkingConfig.getDiscovery().isFilterOnEnrForkIdEnabled()).isEqualTo(true);

assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
assertThat(commandOutput.toString(UTF_8)).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public EthPeers(
"peer_limit",
"The maximum number of peers this node allows to connect",
() -> peerUpperBound);

connectedPeersCounter =
metricsSystem.createCounter(
BesuMetricCategory.PEERS, "connected_total", "Total number of peers connected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public EthProtocolManager(

this.blockBroadcaster = new BlockBroadcaster(ethContext);

supportedCapabilities =
this.supportedCapabilities =
calculateCapabilities(synchronizerConfiguration, ethereumWireProtocolConfiguration);

// Run validators
Expand Down Expand Up @@ -252,19 +252,25 @@ public List<Capability> getSupportedCapabilities() {
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.info("Stopping {} Subprotocol.", getSupportedProtocol());
LOG.atInfo().setMessage("Stopping {} Subprotocol.").addArgument(getSupportedProtocol()).log();
scheduler.stop();
shutdown.countDown();
} else {
LOG.error("Attempted to stop already stopped {} Subprotocol.", getSupportedProtocol());
LOG.atInfo()
.setMessage("Attempted to stop already stopped {} Subprotocol.")
.addArgument(this::getSupportedProtocol)
.log();
}
}

@Override
public void awaitStop() throws InterruptedException {
shutdown.await();
scheduler.awaitStop();
LOG.info("{} Subprotocol stopped.", getSupportedProtocol());
LOG.atInfo()
.setMessage("{} Subprotocol stopped.")
.addArgument(this::getSupportedProtocol)
.log();
}

@Override
Expand All @@ -277,8 +283,10 @@ public void processMessage(final Capability cap, final Message message) {
EthProtocolLogger.logProcessMessage(cap, code);
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
if (ethPeer == null) {
LOG.debug(
"Ignoring message received from unknown peer connection: {}", message.getConnection());
LOG.atDebug()
.setMessage("Ignoring message received from unknown peer connection: {}")
.addArgument(message::getConnection)
.log();
return;
}

Expand All @@ -288,19 +296,24 @@ public void processMessage(final Capability cap, final Message message) {
return;
} else if (!ethPeer.statusHasBeenReceived()) {
// Peers are required to send status messages before any other message type
LOG.debug(
"{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.",
this.getClass().getSimpleName(),
EthPV62.STATUS,
code,
ethPeer);
LOG.atDebug()
.setMessage(
"{} requires a Status ({}) message to be sent first. Instead, received message {} (BREACH_OF_PROTOCOL). Disconnecting from {}.")
.addArgument(() -> this.getClass().getSimpleName())
.addArgument(EthPV62.STATUS)
.addArgument(code)
.addArgument(ethPeer::toString)
.log();
ethPeer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
return;
}

if (this.mergePeerFilter.isPresent()) {
if (this.mergePeerFilter.get().disconnectIfGossipingBlocks(message, ethPeer)) {
LOG.debug("Post-merge disconnect: peer still gossiping blocks {}", ethPeer);
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still gossiping blocks {}")
.addArgument(ethPeer::toString)
.log();
handleDisconnect(ethPeer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
return;
}
Expand Down Expand Up @@ -333,11 +346,12 @@ public void processMessage(final Capability cap, final Message message) {
maybeResponseData = ethMessages.dispatch(ethMessage);
}
} catch (final RLPException e) {
LOG.debug(
"Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}",
messageData.getData(),
ethPeer,
e);
LOG.atDebug()
.setMessage("Received malformed message {} (BREACH_OF_PROTOCOL), disconnecting: {}, {}")
.addArgument(messageData::getData)
.addArgument(ethPeer::toString)
.addArgument(e::toString)
.log();

ethPeer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
Expand Down Expand Up @@ -368,23 +382,31 @@ public void handleNewConnection(final PeerConnection connection) {
genesisHash,
latestForkId);
try {
LOG.trace("Sending status message to {} for connection {}.", peer.getId(), connection);
LOG.atTrace()
.setMessage("Sending status message to {} for connection {}.")
.addArgument(peer::getId)
.addArgument(connection::toString)
.log();
peer.send(status, getSupportedProtocol(), connection);
peer.registerStatusSent(connection);
} catch (final PeerNotConnected peerNotConnected) {
// Nothing to do.
}
LOG.trace("{}", ethPeers);
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}

@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
if (peer.getForkId().map(forkId -> forkIdManager.peerCheck(forkId)).orElse(true)) {
LOG.trace("ForkId OK or not available");
if (peer.getForkId().map(forkIdManager::peerCheck).orElse(true)) {
LOG.atDebug()
.setMessage("ForkId OK or not available for peer {}")
.addArgument(peer::getId)
.log();
if (ethPeers.shouldConnect(peer, incoming)) {
return true;
}
}
LOG.atDebug().setMessage("ForkId check failed for peer {}").addArgument(peer::getId).log();
return false;
}

Expand All @@ -397,11 +419,11 @@ public void handleDisconnect(
LOG.atDebug()
.setMessage("Disconnect - {} - {} - {}... - {} peers left")
.addArgument(initiatedByPeer ? "Inbound" : "Outbound")
.addArgument(reason)
.addArgument(connection.getPeer().getId().slice(0, 8))
.addArgument(ethPeers.peerCount())
.addArgument(reason::toString)
.addArgument(() -> connection.getPeer().getId().slice(0, 8))
.addArgument(ethPeers::peerCount)
.log();
LOG.trace("{}", ethPeers);
LOG.atTrace().setMessage("{}").addArgument(ethPeers::toString).log();
}
}

Expand All @@ -412,43 +434,41 @@ private void handleStatusMessage(final EthPeer peer, final Message message) {
try {
if (!status.networkId().equals(networkId)) {
LOG.atDebug()
.setMessage("Mismatched network id: {}, EthPeer {}...")
.addArgument(status.networkId())
.addArgument(peer.getShortNodeId())
.log();
LOG.atTrace()
.setMessage("Mismatched network id: {}, EthPeer {}")
.addArgument(status.networkId())
.addArgument(peer)
.setMessage("Mismatched network id: {}, peer {}")
.addArgument(status::networkId)
.addArgument(() -> getPeerOrPeerId(peer))
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (!forkIdManager.peerCheck(forkId) && status.protocolVersion() > 63) {
LOG.debug(
"{} has matching network id ({}), but non-matching fork id: {}",
peer,
networkId,
forkId);
LOG.atDebug()
.setMessage("{} has matching network id ({}), but non-matching fork id: {}")
.addArgument(() -> getPeerOrPeerId(peer))
.addArgument(networkId::toString)
.addArgument(forkId)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (forkIdManager.peerCheck(status.genesisHash())) {
LOG.debug(
"{} has matching network id ({}), but non-matching genesis hash: {}",
peer,
networkId,
status.genesisHash());
LOG.atDebug()
.setMessage("{} has matching network id ({}), but non-matching genesis hash: {}")
.addArgument(() -> getPeerOrPeerId(peer))
.addArgument(networkId::toString)
.addArgument(status::genesisHash)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (mergePeerFilter.isPresent()
&& mergePeerFilter.get().disconnectIfPoW(status, peer)) {
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still PoW {}")
.addArgument(peer.getShortNodeId())
.addArgument(() -> getPeerOrPeerId(peer))
.log();
handleDisconnect(peer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
} else {
LOG.debug(
"Received status message from {}: {} with connection {}",
peer,
status,
message.getConnection());
LOG.atDebug()
.setMessage("Received status message from {}: {} with connection {}")
.addArgument(peer::toString)
.addArgument(status::toString)
.addArgument(message::getConnection)
.log();
peer.registerStatusReceived(
status.bestHash(),
status.totalDifficulty(),
Expand All @@ -467,6 +487,10 @@ private void handleStatusMessage(final EthPeer peer, final Message message) {
}
}

private Object getPeerOrPeerId(final EthPeer peer) {
return LOG.isTraceEnabled() ? peer : peer.getShortNodeId();
}

@Override
public void blockMined(final Block block) {
// This assumes the block has already been included in the chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DiscoveryConfiguration {
private List<EnodeURL> bootnodes = new ArrayList<>();
private String dnsDiscoveryURL;
private boolean discoveryV5Enabled = false;
private boolean filterOnEnrForkId = false;
private boolean filterOnEnrForkId = NetworkingConfiguration.DEFAULT_FILTER_ON_ENR_FORK_ID;

public static DiscoveryConfiguration create() {
return new DiscoveryConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class NetworkingConfiguration {
public static final int DEFAULT_INITIATE_CONNECTIONS_FREQUENCY_SEC = 30;
public static final int DEFAULT_CHECK_MAINTAINED_CONNECTIONS_FREQUENCY_SEC = 60;
public static final int DEFAULT_PEER_LOWER_BOUND = 25;
public static final boolean DEFAULT_FILTER_ON_ENR_FORK_ID = true;

private DiscoveryConfiguration discovery = new DiscoveryConfiguration();
private RlpxConfiguration rlpx = new RlpxConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerRequirement;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PingPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURLImpl;
Expand Down Expand Up @@ -81,6 +82,7 @@ public abstract class PeerDiscoveryAgent {
private final MetricsSystem metricsSystem;
private final RlpxAgent rlpxAgent;
private final ForkIdManager forkIdManager;
private final PeerTable peerTable;

/* The peer controller, which takes care of the state machine of peers. */
protected Optional<PeerDiscoveryController> controller = Optional.empty();
Expand Down Expand Up @@ -109,7 +111,8 @@ protected PeerDiscoveryAgent(
final MetricsSystem metricsSystem,
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent) {
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
this.metricsSystem = metricsSystem;
checkArgument(nodeKey != null, "nodeKey cannot be null");
checkArgument(config != null, "provided configuration cannot be null");
Expand All @@ -130,6 +133,7 @@ protected PeerDiscoveryAgent(
this.forkIdManager = forkIdManager;
this.forkIdSupplier = () -> forkIdManager.getForkIdForChainHead().getForkIdAsBytesList();
this.rlpxAgent = rlpxAgent;
this.peerTable = peerTable;
}

protected abstract TimerUtil createTimer();
Expand Down Expand Up @@ -263,9 +267,9 @@ private PeerDiscoveryController createController(final DiscoveryPeer localNode)
.peerRequirement(PeerRequirement.combine(peerRequirements))
.peerPermissions(peerPermissions)
.metricsSystem(metricsSystem)
.forkIdManager(forkIdManager)
.filterOnEnrForkId((config.isFilterOnEnrForkIdEnabled()))
.rlpxAgent(rlpxAgent)
.peerTable(peerTable)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.VertxTimerUtil;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
Expand Down Expand Up @@ -73,7 +74,8 @@ public VertxPeerDiscoveryAgent(
final MetricsSystem metricsSystem,
final StorageProvider storageProvider,
final ForkIdManager forkIdManager,
final RlpxAgent rlpxAgent) {
final RlpxAgent rlpxAgent,
final PeerTable peerTable) {
super(
nodeKey,
config,
Expand All @@ -82,7 +84,8 @@ public VertxPeerDiscoveryAgent(
metricsSystem,
storageProvider,
forkIdManager,
rlpxAgent);
rlpxAgent,
peerTable);
checkArgument(vertx != null, "vertx instance cannot be null");
this.vertx = vertx;

Expand Down
Loading

0 comments on commit 921bc17

Please sign in to comment.