Implement getBlobs and bypass blob gossip validation on successful blob retrievals from the EL (#6913)

Open · wants to merge 15 commits into base: unstable
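
Background for the diff below: engine_getBlobsV1 lets a consensus client ask its local execution layer for blobs by versioned hash, and each versioned hash is derived from a blob's KZG commitment per EIP-4844. A minimal, self-contained Nim sketch of that derivation, assuming nimcrypto's sha256 API; the helper name is illustrative and is not the kzg_commitment_to_versioned_hash used in the code below:

  import nimcrypto/sha2

  const VERSIONED_HASH_VERSION_KZG = 0x01'u8

  proc commitmentToVersionedHash(commitment: openArray[byte]): array[32, byte] =
    ## versioned_hash = VERSIONED_HASH_VERSION_KZG ++ sha256(commitment)[1..31]
    doAssert commitment.len == 48  # a KZG commitment is a 48-byte G1 point
    result = sha256.digest(commitment).data
    result[0] = VERSIONED_HASH_VERSION_KZG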
12 changes: 11 additions & 1 deletion beacon_chain/consensus_object_pools/block_quarantine.nim
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Copyright (c) 2018-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -381,6 +381,16 @@ func popBlobless*(
  else:
    Opt.none(ForkedSignedBeaconBlock)

func getBlobless*(
    quarantine: var Quarantine,
    root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
  if quarantine.blobless.hasKey(root):
    Opt.some(quarantine.blobless.getOrDefault(
      root,
      default(ForkedSignedBeaconBlock)))
  else:
    Opt.none(ForkedSignedBeaconBlock)

func popColumnless*(
    quarantine: var Quarantine,
    root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
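The new getBlobless is a non-destructive counterpart to popBlobless: it peeks at a quarantined blobless block without removing it, so the entry survives if the EL query comes back empty. A minimal usage sketch (hypothetical call site; quarantine and root assumed in scope):

  if quarantine.getBlobless(root).isSome:
    # The blobless block stays quarantined while blobs are fetched from
    # the EL; pop it only once every blob has been secured.
    discard quarantine.popBlobless(root)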
59 changes: 59 additions & 0 deletions beacon_chain/el/el_manager.nim
@@ -108,6 +108,8 @@ const
  # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2
  GETPAYLOAD_TIMEOUT = 1.seconds

  # https://github.com/ethereum/execution-apis/blob/ad9102b11212d51b736a0413c8655a8da93e55fc/src/engine/cancun.md#request-3
  GETBLOBS_TIMEOUT = 1.seconds

  connectionStateChangeHysteresisThreshold = 15
    ## How many unsuccessful/successful requests we must see
    ## before declaring the connection as degraded/restored
@@ -862,6 +864,13 @@ proc sendNewPayloadToSingleEL(
    payload, versioned_hashes, Hash32 parent_beacon_block_root,
    executionRequests)

proc sendGetBlobsToSingleEL(
    connection: ELConnection,
    versioned_hashes: seq[engine_api.VersionedHash]
): Future[GetBlobsV1Response] {.async: (raises: [CatchableError]).} =
  let rpcClient = await connection.connectedRpcClient()
  await rpcClient.engine_getBlobsV1(versioned_hashes)

type
  StatusRelation = enum
    newStatusIsPreferable
@@ -990,6 +999,56 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
  if len(pending) > 0:
    await noCancel allFutures(pending)

proc sendGetBlobs*(
    m: ELManager,
    blck: electra.SignedBeaconBlock | fulu.SignedBeaconBlock,
): Future[Opt[seq[BlobAndProofV1]]] {.async: (raises: [CancelledError]).} =
  if m.elConnections.len == 0:
    return err()
  let
    deadline = sleepAsync(GETBLOBS_TIMEOUT)

  var bestResponse = Opt.none(int)

  while true:
    let
      requests = m.elConnections.mapIt(
        sendGetBlobsToSingleEL(it, mapIt(
          blck.message.body.blob_kzg_commitments,
          engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)))))
      timeoutExceeded =
        try:
          await allFutures(requests).wait(deadline)
          false
        except AsyncTimeoutError:
          true
        except CancelledError as exc:
          let pending =
            requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
          await noCancel allFutures(pending)
          raise exc

    for idx, req in requests:
      if not(req.finished()):
        warn "Timeout while getting blob and proof",
          url = m.elConnections[idx].engineUrl.url
      elif req.failed():
        warn "Failed to get blob and proof",
          url = m.elConnections[idx].engineUrl.url,
          reason = req.error.msg
      elif bestResponse.isNone:
        bestResponse = Opt.some(idx)

    let pending =
      requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
    await noCancel allFutures(pending)

    if bestResponse.isSome():
      return ok(requests[bestResponse.get()].value())

    if timeoutExceeded:
      break

  err()

proc sendNewPayload*(
    m: ELManager,
    blck: SomeForkyBeaconBlock,
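A hedged sketch of how a caller can drive the new API (names other than sendGetBlobs are assumptions; m is an initialized ELManager and signedBlock an electra.SignedBeaconBlock):

  let blobsOpt = await m.sendGetBlobs(signedBlock)
  if blobsOpt.isSome and blobsOpt.get().len ==
      signedBlock.message.body.blob_kzg_commitments.len:
    # The EL knew every blob in this block; gossip copies are redundant.
    discard
  else:
    # Fall back to collecting blob sidecars from gossip as usual.
    discard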
72 changes: 66 additions & 6 deletions beacon_chain/gossip_processing/eth2_processor.nim
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Copyright (c) 2018-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -8,10 +8,12 @@
{.push raises: [].}

import
  std/tables,
  std/[tables, sequtils],
  chronicles, chronos, metrics,
  taskpools,
  ssz_serialization/types,
  ../spec/[helpers, forks],
  ../el/el_manager,
  ../consensus_object_pools/[
    blob_quarantine, block_clearance, block_quarantine, blockchain_dag,
    attestation_pool, light_client_pool, sync_committee_msg_pool,
@@ -138,6 +140,10 @@ type
    # ----------------------------------------------------------------
    batchCrypto*: ref BatchCrypto

    # EL integration
    # ----------------------------------------------------------------
    elManager*: ELManager

    # Missing information
    # ----------------------------------------------------------------
    quarantine*: ref Quarantine
@@ -169,6 +175,7 @@ proc new*(T: type Eth2Processor,
          blobQuarantine: ref BlobQuarantine,
          rng: ref HmacDrbgContext,
          getBeaconTime: GetBeaconTimeFn,
          elManager: ELManager,
          taskpool: Taskpool
          ): ref Eth2Processor =
  (ref Eth2Processor)(
@@ -186,6 +193,7 @@
    quarantine: quarantine,
    blobQuarantine: blobQuarantine,
    getCurrentBeaconTime: getBeaconTime,
    elManager: elManager,
    batchCrypto: BatchCrypto.new(
      rng = rng,
      # Only run eager attestation signature verification if we're not
@@ -266,10 +274,63 @@ proc processSignedBeaconBlock*(

v

proc validateBlobSidecarFromEL(
    self: ref Eth2Processor,
    block_root: Eth2Digest):
    Future[Result[void, ValidationError]]
    {.async: (raises: [CancelledError]).} =

  if (let o = self.quarantine[].getBlobless(block_root); o.isSome):
    let blobless = o.get()
    withBlck(blobless):
      when consensusFork >= ConsensusFork.Electra:
        let blobsFromElOpt =
          await self.elManager.sendGetBlobs(forkyBlck)
        if blobsFromElOpt.isSome() and blobsFromElOpt.get().len > 0:
          let blobsEl = blobsFromElOpt.get()

          # Check that the EL returned one blob/proof pair per KZG
          # commitment in the signed block
          if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len:

            # We now have all blobs from the EL, so the blobless block
            # can be removed from quarantine
            discard self.quarantine[].popBlobless(block_root)

            let blob_sidecars_el =
              create_blob_sidecars(
                forkyBlck,
                deneb.KzgProofs.init(blobsEl.mapIt(kzg.KzgProof(
                  bytes: it.proof.data))),
                deneb.Blobs.init(blobsEl.mapIt(it.blob.data)))

            for blb_el in blob_sidecars_el:
              self.blobQuarantine[].put(newClone blb_el)

            if self.blobQuarantine[].hasBlobs(forkyBlck):
              self.blockProcessor[].enqueueBlock(
                MsgSource.gossip, blobless,
                Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)))

            debug "Pulled blobs from EL, bypassing blob gossip validation",
              blobs_from_el = blobsEl.len
            return ok()

  errIgnore("EL did not respond with blobs and proofs")

proc processBlobSidecar*(
    self: var Eth2Processor, src: MsgSource,
    blobSidecar: deneb.BlobSidecar, subnet_id: BlobId): ValidationRes =
    self: ref Eth2Processor, src: MsgSource,
    blobSidecar: deneb.BlobSidecar, subnet_id: BlobId):
    Future[ValidationRes] {.async: (raises: [CancelledError]).} =
  template block_header: untyped = blobSidecar.signed_block_header.message
  let block_root = hash_tree_root(block_header)

  let vEl =
    await self.validateBlobSidecarFromEL(block_root)
Reviewer comment (Contributor):
it seems like in the case of a pathological EL, this could delay things by 1 second?
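
If that trade-off turns out to matter, one mitigation would be to cap the EL detour with a deadline tighter than GETBLOBS_TIMEOUT; a sketch under the assumption that chronos' wait and this file's errIgnore helper compose as used elsewhere in the diff:

  let vEl =
    try:
      # A pathological EL now costs at most 250 ms on the gossip path.
      await self.validateBlobSidecarFromEL(block_root).wait(250.milliseconds)
    except AsyncTimeoutError:
      errIgnore("getBlobs from EL timed out")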


  if vEl.isOk():
    return vEl

  let
    wallTime = self.getCurrentBeaconTime()
@@ -295,9 +356,8 @@ proc processBlobSidecar*(
debug "Blob validated, putting in blob quarantine"
self.blobQuarantine[].put(newClone(blobSidecar))

let block_root = hash_tree_root(block_header)
if (let o = self.quarantine[].popBlobless(block_root); o.isSome):
let blobless = o.unsafeGet()
let blobless = o.get()
withBlck(blobless):
when consensusFork >= ConsensusFork.Deneb:
if self.blobQuarantine[].hasBlobs(forkyBlck):
17 changes: 10 additions & 7 deletions beacon_chain/nimbus_beacon_node.nim
@@ -11,6 +11,7 @@ import
  std/[os, random, terminal, times, exitprocs],
  chronos, chronicles,
  metrics, metrics/chronos_httpserver,
  ssz_serialization/types,
  stew/[byteutils, io2],
  eth/p2p/discoveryv5/[enr, random2],
  ./consensus_object_pools/[
@@ -471,8 +472,9 @@
      else:
        let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
        await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                        Opt.some(blobs),
                                        maybeFinalized = maybeFinalized)
          Opt.some(blobs),
          maybeFinalized = maybeFinalized)

      else:
        await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
          Opt.none(BlobSidecars),
@@ -499,7 +501,8 @@
      config.doppelgangerDetection,
      blockProcessor, node.validatorMonitor, dag, attestationPool,
      validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
      lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool)
      lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime,
      node.elManager, taskpool)
    syncManagerFlags =
      if node.config.longRangeSync != LongRangeSyncMode.Lenient:
        {SyncManagerFlag.NoGenesisSync}
@@ -2111,12 +2114,12 @@ proc installMessageValidators(node: BeaconNode) =
  for it in 0.BlobId ..< subnetCount.BlobId:
    closureScope: # Needed for inner `proc`; don't lift it out of loop.
      let subnet_id = it
      node.network.addValidator(
      node.network.addAsyncValidator(
        getBlobSidecarTopic(digest, subnet_id), proc (
          blobSidecar: deneb.BlobSidecar
        ): ValidationResult =
          toValidationResult(
            node.processor[].processBlobSidecar(
        ): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
          return toValidationResult(
            await node.processor.processBlobSidecar(
              MsgSource.gossip, blobSidecar, subnet_id)))

  node.installLightClientMessageValidators()