diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim index da33499be1..be8b300fcd 100644 --- a/beacon_chain/consensus_object_pools/block_quarantine.nim +++ b/beacon_chain/consensus_object_pools/block_quarantine.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -381,6 +381,16 @@ func popBlobless*( else: Opt.none(ForkedSignedBeaconBlock) +func getBlobless*( + quarantine: var Quarantine, + root: Eth2Digest): Opt[ForkedSignedBeaconBlock] = + if quarantine.blobless.hasKey(root): + Opt.some((quarantine.blobless.getOrDefault( + root, + default(ForkedSignedBeaconBlock)))) + else: + Opt.none(ForkedSignedBeaconBlock) + func popColumnless*( quarantine: var Quarantine, root: Eth2Digest): Opt[ForkedSignedBeaconBlock] = diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim index c5c2e3c93c..20c6864148 100644 --- a/beacon_chain/el/el_manager.nim +++ b/beacon_chain/el/el_manager.nim @@ -108,6 +108,8 @@ const # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2 GETPAYLOAD_TIMEOUT = 1.seconds + # https://github.com/ethereum/execution-apis/blob/ad9102b11212d51b736a0413c8655a8da93e55fc/src/engine/cancun.md#request-3 + GETBLOBS_TIMEOUT = 1.seconds connectionStateChangeHysteresisThreshold = 15 ## How many unsuccesful/successful requests we must see ## before declaring the connection as degraded/restored @@ -862,6 +864,13 @@ proc sendNewPayloadToSingleEL( payload, versioned_hashes, Hash32 parent_beacon_block_root, executionRequests) +proc sendGetBlobsToSingleEL( + connection: ELConnection, + versioned_hashes: seq[engine_api.VersionedHash] +): Future[GetBlobsV1Response] {.async: (raises: [CatchableError]).} = + let rpcClient = await connection.connectedRpcClient() + await rpcClient.engine_getBlobsV1(versioned_hashes) + type StatusRelation = enum newStatusIsPreferable @@ -990,6 +999,56 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} = if len(pending) > 0: await noCancel allFutures(pending) +proc sendGetBlobs*( + m: ELManager, + blck: electra.SignedBeaconBlock | fulu.SignedBeaconBlock, +): Future[Opt[seq[BlobAndProofV1]]] {.async: (raises: [CancelledError]).} = + if m.elConnections.len == 0: + return err() + let + deadline = sleepAsync(GETBLOBS_TIMEOUT) + + var bestResponse = Opt.none(int) + + while true: + let + requests = m.elConnections.mapIt( + sendGetBlobsToSingleEL(it, mapIt( + blck.message.body.blob_kzg_commitments, + engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it))))) + timeoutExceeded = + try: + await allFutures(requests).wait(deadline) + false + except AsyncTimeoutError: + true + except CancelledError as exc: + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + + for idx, req in requests: + if not(req.finished()): + warn "Timeout while getting blob and proof", + url = m.elConnections[idx].engineUrl.url, + reason = req.error.msg + else: + if bestResponse.isNone: + bestResponse = Opt.some(idx) + + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + + if bestResponse.isSome(): + return ok(requests[bestResponse.get()].value()) + + if timeoutExceeded: + break + + err() + proc sendNewPayload*( m: ELManager, blck: SomeForkyBeaconBlock, diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 276d7e7c3c..45a65f7fd1 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -8,10 +8,12 @@ {.push raises: [].} import - std/tables, + std/[tables, sequtils], chronicles, chronos, metrics, taskpools, + ssz_serialization/types, ../spec/[helpers, forks], + ../el/el_manager, ../consensus_object_pools/[ blob_quarantine, block_clearance, block_quarantine, blockchain_dag, attestation_pool, light_client_pool, sync_committee_msg_pool, @@ -138,6 +140,10 @@ type # ---------------------------------------------------------------- batchCrypto*: ref BatchCrypto + # EL integration + # ---------------------------------------------------------------- + elManager*: ELManager + # Missing information # ---------------------------------------------------------------- quarantine*: ref Quarantine @@ -169,6 +175,7 @@ proc new*(T: type Eth2Processor, blobQuarantine: ref BlobQuarantine, rng: ref HmacDrbgContext, getBeaconTime: GetBeaconTimeFn, + elManager: ELManager, taskpool: Taskpool ): ref Eth2Processor = (ref Eth2Processor)( @@ -186,6 +193,7 @@ proc new*(T: type Eth2Processor, quarantine: quarantine, blobQuarantine: blobQuarantine, getCurrentBeaconTime: getBeaconTime, + elManager: elManager, batchCrypto: BatchCrypto.new( rng = rng, # Only run eager attestation signature verification if we're not @@ -266,10 +274,63 @@ proc processSignedBeaconBlock*( v +proc validateBlobSidecarFromEL( + self: ref Eth2Processor, + block_root: Eth2Digest): + Future[Result[void, ValidationError]] + {.async: (raises: [CancelledError]).} = + + if (let o = self.quarantine[].getBlobless(block_root); o.isSome): + let blobless = o.get() + withBlck(blobless): + when consensusFork >= ConsensusFork.Electra: + let blobsFromElOpt = + await self.elManager.sendGetBlobs(forkyBlck) + if blobsFromElOpt.get.len > 0 and blobsFromElOpt.isSome(): + let blobsEl = blobsFromElOpt.get() + + # check lengths of array[BlobAndProofV1] with blobs + # kzg commitments of the signed block + if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len: + + # we have got all blobs from EL, now we can + # conveniently the blobless block from quarantine + discard self.quarantine[].popBlobless(block_root) + + let blob_sidecars_el = + create_blob_sidecars( + forkyBlck, + deneb.KzgProofs.init(blobsEl.mapIt(kzg.KzgProof( + bytes: it.proof.data))), + deneb.Blobs.init(blobsEl.mapIt(it.blob.data))) + + for blb_el in blob_sidecars_el: + self.blobQuarantine[].put(newClone blb_el) + + if self.blobQuarantine[].hasBlobs(forkyBlck): + self.blockProcessor[].enqueueBlock( + MsgSource.gossip, blobless, + Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck))) + + debug "Pulled blobs from EL, bypassing blob gossip validation", + blobs_from_el = blobsEl.len + return ok() + + else: + return errIgnore("EL did not respond with blobs and proofs") + proc processBlobSidecar*( - self: var Eth2Processor, src: MsgSource, - blobSidecar: deneb.BlobSidecar, subnet_id: BlobId): ValidationRes = + self: ref Eth2Processor, src: MsgSource, + blobSidecar: deneb.BlobSidecar, subnet_id: BlobId): + Future[ValidationRes] {.async: (raises: [CancelledError]).} = template block_header: untyped = blobSidecar.signed_block_header.message + let block_root = hash_tree_root(block_header) + + let vEl = + await self.validateBlobSidecarFromEL(block_root) + + if vEl.isOk(): + return vEl let wallTime = self.getCurrentBeaconTime() @@ -295,9 +356,8 @@ proc processBlobSidecar*( debug "Blob validated, putting in blob quarantine" self.blobQuarantine[].put(newClone(blobSidecar)) - let block_root = hash_tree_root(block_header) if (let o = self.quarantine[].popBlobless(block_root); o.isSome): - let blobless = o.unsafeGet() + let blobless = o.get() withBlck(blobless): when consensusFork >= ConsensusFork.Deneb: if self.blobQuarantine[].hasBlobs(forkyBlck): diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index e4d8152ac0..924e526939 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -11,6 +11,7 @@ import std/[os, random, terminal, times, exitprocs], chronos, chronicles, metrics, metrics/chronos_httpserver, + ssz_serialization/types, stew/[byteutils, io2], eth/p2p/discoveryv5/[enr, random2], ./consensus_object_pools/[ @@ -471,8 +472,9 @@ proc initFullNode( else: let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.some(blobs), - maybeFinalized = maybeFinalized) + Opt.some(blobs), + maybeFinalized = maybeFinalized) + else: await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), @@ -499,7 +501,8 @@ proc initFullNode( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, - lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) + lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, + node.elManager, taskpool) syncManagerFlags = if node.config.longRangeSync != LongRangeSyncMode.Lenient: {SyncManagerFlag.NoGenesisSync} @@ -2111,12 +2114,12 @@ proc installMessageValidators(node: BeaconNode) = for it in 0.BlobId ..< subnetCount.BlobId: closureScope: # Needed for inner `proc`; don't lift it out of loop. let subnet_id = it - node.network.addValidator( + node.network.addAsyncValidator( getBlobSidecarTopic(digest, subnet_id), proc ( blobSidecar: deneb.BlobSidecar - ): ValidationResult = - toValidationResult( - node.processor[].processBlobSidecar( + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processBlobSidecar( MsgSource.gossip, blobSidecar, subnet_id))) node.installLightClientMessageValidators() diff --git a/vendor/nim-web3 b/vendor/nim-web3 index a3bc5ad48e..e9640d65ec 160000 --- a/vendor/nim-web3 +++ b/vendor/nim-web3 @@ -1 +1 @@ -Subproject commit a3bc5ad48e2b05fa253ba68bbd5b84e4ea234f50 +Subproject commit e9640d65eca5618291438bf6e98f6ea21f4c1d03