Beacon sync revisiting fc automove base #2961

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion nimbus/beacon/api_handler/api_forkchoice.nim
@@ -114,7 +114,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,

# Update sync header (if any)
com.syncReqNewHead(header)
com.reqBeaconSyncTargetCB(header, update.finalizedBlockHash)
com.reqBeaconSyncTargetCB(header)

return simpleFCU(PayloadExecutionStatus.syncing)

6 changes: 3 additions & 3 deletions nimbus/common/common.nim
@@ -43,7 +43,7 @@ type
SyncReqNewHeadCB* = proc(header: Header) {.gcsafe, raises: [].}
## Update head for syncing

ReqBeaconSyncTargetCB* = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].}
ReqBeaconSyncTargetCB* = proc(header: Header) {.gcsafe, raises: [].}
## Ditto (for beacon sync)

NotifyBadBlockCB* = proc(invalid, origin: Header) {.gcsafe, raises: [].}
@@ -344,10 +344,10 @@ proc syncReqNewHead*(com: CommonRef; header: Header)
if not com.syncReqNewHead.isNil:
com.syncReqNewHead(header)

proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header; finHash: Hash32) =
proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header) =
## Used by RPC updater
if not com.reqBeaconSyncTargetCB.isNil:
com.reqBeaconSyncTargetCB(header, finHash)
com.reqBeaconSyncTargetCB(header)

proc notifyBadBlock*(com: CommonRef; invalid, origin: Header)
{.gcsafe, raises: [].} =
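With the narrowed `ReqBeaconSyncTargetCB` signature above, a consumer-side callback reduces to a single-argument closure. A minimal sketch with a placeholder body (the name `onSyncTarget` and the handler logic are illustrative assumptions, not code from this PR):

# Sketch only: the callback no longer receives a finalised-block hash.
let onSyncTarget: ReqBeaconSyncTargetCB =
  proc(header: Header) {.gcsafe, raises: [].} =
    discard # e.g. record `header` as the new sync target
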
62 changes: 59 additions & 3 deletions nimbus/core/chain/forked_chain.nim
@@ -462,6 +462,50 @@ proc updateHeadIfNecessary(c: ForkedChainRef, pvarc: PivotArc) =

c.setHead(pvarc)

proc autoUpdateBase(c: ForkedChainRef): Result[void, string] =
## To be called after `importBlock()` for an implied `base` update so that
## there is no need to know about a finalised block. Here, the `base` is
## kept at a fixed distance behind the current `latest` cursor head.
##
# This function code is a tweaked version of `importBlockBlindly()`
# from draft PR #2845.
#
let
distanceFromBase = c.cursorHeader.number - c.baseHeader.number
hysteresis = max(1'u64, min(c.baseDistance div 4'u64, 32'u64))
# The trigger threshold is baseDistance plus 25% of baseDistance, capped at 32 (and at least 1).
if distanceFromBase < c.baseDistance + hysteresis:
return ok()

# Move the base forward, keeping it `baseDistance` blocks behind
# the top block.
let
target = c.cursorHeader.number - c.baseDistance
pvarc = ?c.findCursorArc(c.cursorHash)
newBase = c.calculateNewBase(target, pvarc)

doAssert newBase.pvHash != c.baseHash

# Write segment from base+1 to newBase into database
c.stagingTx.rollback()
c.stagingTx = c.db.ctx.txFrameBegin()
c.replaySegment(newBase.pvHash)
c.writeBaggage(newBase.pvHash)
c.stagingTx.commit()
c.stagingTx = nil

# Update base forward to newBase
c.updateBase(newBase)
c.db.persistent(newBase.pvNumber).isOkOr:
return err("Failed to save state: " & $$error)

# Move chain state forward to current head
c.stagingTx = c.db.ctx.txFrameBegin()
c.replaySegment(pvarc.pvHash)
c.setHead(pvarc)

ok()
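
For orientation, the hysteresis term keeps `base` from chasing every imported block: the base only moves once `latest` has run ahead by `baseDistance` plus the hysteresis. A minimal sketch of the arithmetic, using assumed example values:

# Sketch only: reproduces the threshold arithmetic of autoUpdateBase()
# with assumed example values (not taken from this PR).
let
  baseDistance = 128'u64                      # assumed configuration value
  latest       = 1_000_000'u64                # assumed cursor head number
  base         = 999_840'u64                  # assumed current base number
  hysteresis   = max(1'u64, min(baseDistance div 4'u64, 32'u64))  # = 32
  distance     = latest - base                # = 160

if distance >= baseDistance + hysteresis:     # 160 >= 160, so the base moves
  echo "base would advance to block ", latest - baseDistance  # 999872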

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@@ -523,7 +567,11 @@ proc newForkedChain*(com: CommonRef,
com.syncStart = baseHeader.number
chain

proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
proc importBlock*(
c: ForkedChainRef;
blk: Block;
autoRebase = false;
): Result[void, string] =
# Try to import block to canonical or side chain.
# return error if the block is invalid
if c.stagingTx.isNil:
@@ -533,7 +581,10 @@ proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
blk.header

if header.parentHash == c.cursorHash:
return c.validateBlock(c.cursorHeader, blk)
?c.validateBlock(c.cursorHeader, blk)
if autoRebase:
return c.autoUpdateBase()
return ok()

if header.parentHash == c.baseHash:
c.stagingTx.rollback()
@@ -555,7 +606,12 @@ proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
# `base` is the point of no return, we only update it on finality.

c.replaySegment(header.parentHash)
c.validateBlock(c.cursorHeader, blk)
?c.validateBlock(c.cursorHeader, blk)
if autoRebase:
return c.autoUpdateBase()

ok()
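
Callers opt in to the implied base update via the new flag; the beacon sync worker below does exactly this. A minimal usage sketch, assuming `chain` is a `ForkedChainRef`, `blk` a `Block`, and an enclosing proc that returns `Result[void, string]`:

# Sketch only: import with the implied base update enabled.
chain.importBlock(blk, autoRebase = true).isOkOr:
  return err("import failed: " & error)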


proc forkChoice*(c: ForkedChainRef,
headHash: Hash32,
4 changes: 2 additions & 2 deletions nimbus/sync/beacon/README.md
@@ -219,7 +219,7 @@ Running the sync process for *MainNet*
--------------------------------------

For syncing, a beacon node is needed that regularly informs via *RPC* of a
recently finalised block header.
recent target block header.

The beacon node program used here is the *nimbus_beacon_node* binary from the
*nimbus-eth2* project (any other, e.g. the *light client*, will do.)
@@ -230,7 +230,7 @@ The beacon node program used here is the *nimbus_beacon_node* binary from the
--jwt-secret=/tmp/jwtsecret

where *http://127.0.0.1:8551* is the URL of the sync process that receives the
finalised block header (here on the same physical machine) and `/tmp/jwtsecret`
target block headers (here on the same physical machine) and `/tmp/jwtsecret`
is the shared secret file needed for mutual communication authentication.

It will take a while for *nimbus_beacon_node* to catch up (see the
4 changes: 0 additions & 4 deletions nimbus/sync/beacon/worker.nim
@@ -170,10 +170,6 @@ proc runPeer*(
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
buddy.only.nMultiLoop.inc # statistics/debugging

# Update consensus header target when needed. It comes with a finalised
# header hash where we need to complete the block number.
await buddy.headerStagedUpdateTarget info

if not await buddy.napUnlessSomethingToFetch():
#
# Layout of a triple of linked header chains (see `README.md`)
31 changes: 3 additions & 28 deletions nimbus/sync/beacon/worker/blocks_staged.nim
@@ -291,8 +291,7 @@ proc blocksStagedImport*(
iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)

info "Importing blocks", iv, nBlocks,
base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr,
target=ctx.layout.final.bnStr
base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr

var maxImport = iv.maxPt
block importLoop:
@@ -312,7 +311,7 @@ proc blocksStagedImport*(
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short
continue
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
ctx.pool.chain.importBlock(qItem.data.blocks[n], autoRebase=true).isOkOr:
warn info & ": import block error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short, `error`=error
@@ -331,30 +330,6 @@ proc blocksStagedImport*(
maxImport = ctx.chain.latestNumber()
break importLoop

# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nthHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nthHash
else: ctx.layout.finalHash

doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr:
warn info & ": fork choice error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash=nthHash.short,
finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break importLoop

# Allow pseudo/async thread switch.
try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard
if not ctx.daemon:
maxImport = ctx.chain.latestNumber()
break importLoop

# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt:
ctx.blocksUnprocCommit(0, maxImport+1, qItem.data.blocks[^1].header.number)
@@ -367,7 +342,7 @@ proc blocksStagedImport*(
ctx.updateMetrics()

info "Import done", iv, nBlocks, base=ctx.chain.baseNumber.bnStr,
head=ctx.chain.latestNumber.bnStr, target=ctx.layout.final.bnStr
head=ctx.chain.latestNumber.bnStr
return true

# ------------------------------------------------------------------------------
4 changes: 0 additions & 4 deletions nimbus/sync/beacon/worker/db.nim
@@ -92,10 +92,6 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
# If there was a manual import after a previous sync, then saved state
# might be outdated.
if rc.isOk and
# The base number is the least record of the FCU chains/tree. So the
# finalised entry must not be smaller.
ctx.chain.baseNumber() <= rc.value.final and

# If the latest FCU number is not larger than the head, there is nothing
# to do (might also happen after a manual import.)
latest < rc.value.head and
31 changes: 1 addition & 30 deletions nimbus/sync/beacon/worker/headers_staged.nim
@@ -19,7 +19,7 @@ import
../worker_desc,
./update/metrics,
./headers_staged/[headers, linked_hchain],
"."/[headers_unproc, update]
./headers_unproc

# ------------------------------------------------------------------------------
# Private functions
@@ -53,35 +53,6 @@ proc fetchAndCheck(
# Public functions
# ------------------------------------------------------------------------------

proc headerStagedUpdateTarget*(
buddy: BeaconBuddyRef;
info: static[string];
) {.async: (raises: []).} =
## Fetch finalised beacon header if there is an update available
let
ctx = buddy.ctx
peer = buddy.peer
if ctx.layout.lastState == idleSyncState and
ctx.target.final == 0 and
ctx.target.finalHash != zeroHash32 and
not ctx.target.locked:
const iv = BnRange.new(1u,1u) # dummy interval

ctx.target.locked = true
let rc = await buddy.headersFetchReversed(iv, ctx.target.finalHash, info)
ctx.target.locked = false

if rc.isOk:
let hash = rlp.encode(rc.value[0]).keccak256
if hash != ctx.target.finalHash:
# Oops
buddy.ctrl.zombie = true
debug info & ": finalised header hash mismatch", peer, hash,
expected=ctx.target.finalHash
else:
ctx.updateFinalBlockHeader(rc.value[0], ctx.target.finalHash, info)


proc headersStagedCollect*(
buddy: BeaconBuddyRef;
info: static[string];
28 changes: 5 additions & 23 deletions nimbus/sync/beacon/worker/start_stop.nim
@@ -38,7 +38,7 @@ when enableTicker:
head: ctx.layout.head,
headOk: ctx.layout.lastState != idleSyncState,
target: ctx.target.consHead.number,
targetOk: ctx.target.final != 0,
targetOk: ctx.target.changed,

nHdrStaged: ctx.headersStagedQueueLen(),
hdrStagedTop: ctx.headersStagedQueueTopKey(),
@@ -62,31 +62,13 @@ proc updateBeaconHeaderCB(
): ReqBeaconSyncTargetCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =

# Check whether there is an update running (otherwise take next upate)
if not ctx.target.locked and # ignore if currently updating
ctx.target.final == 0 and # ignore if complete already
f != zeroHash32 and # finalised hash is set
return proc(h: Header) {.gcsafe, raises: [].} =
if ctx.chain.baseNumber() < h.number and # sanity check
ctx.layout.head < h.number and # update is advancing
ctx.target.consHead.number < h.number: # .. ditto

ctx.target.consHead = h
ctx.target.finalHash = f
ctx.target.changed = true

# Check whether `FC` knows about the finalised block already.
#
# On a full node, all blocks before the current state are stored on the
# database which is also accessed by `FC`. So one can already decude here
# whether `FC` id capable of handling that finalised block (the number of
# must be at least the `base` from `FC`.)
#
# Otherwise the block header will need to be fetched from a peer when
# available and checked there (see `headerStagedUpdateTarget()`.)
#
let finHdr = ctx.chain.headerByHash(f).valueOr: return
ctx.updateFinalBlockHeader(finHdr, f, info)
ctx.target.changed = true # enable this dataset
ctx.updateFromHibernating info # wake up if sleeping

# ------------------------------------------------------------------------------
# Public functions
44 changes: 11 additions & 33 deletions nimbus/sync/beacon/worker/update.nim
@@ -156,8 +156,6 @@ proc setupCollectingHeaders(ctx: BeaconCtxRef; info: static[string]) =
ctx.sst.layout = SyncStateLayout(
coupler: c,
dangling: h,
final: ctx.target.final,
finalHash: ctx.target.finalHash,
head: h,
lastState: collectingHeaders) # state transition

@@ -282,17 +280,16 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =
# Check whether the system has been idle and a new header download
# session can be set up
if prevState == idleSyncState and
ctx.target.changed and # and there is a new target from CL
ctx.target.final != 0: # .. ditto
ctx.target.changed: # and there is a new target from CL
ctx.setupCollectingHeaders info # set up new header sync
return
# Notreached

info "Sync state changed", prevState, thisState,
head=ctx.chain.latestNumber.bnStr,
oldBase=(if ctx.layout.coupler == ctx.layout.dangling: "downloaded"
coupler=(if ctx.layout.coupler == ctx.layout.dangling: "dangling"
else: ctx.layout.coupler.bnStr),
downloaded=(if ctx.layout.dangling == ctx.layout.head: "target"
dangling=(if ctx.layout.dangling == ctx.layout.head: "target"
else: ctx.layout.dangling.bnStr),
target=ctx.layout.head.bnStr

@@ -313,33 +310,14 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =
ctx.startHibernating info


proc updateFinalBlockHeader*(
ctx: BeaconCtxRef;
finHdr: Header;
finHash: Hash32;
info: static[string];
) =
## Update the finalised header cache. If the finalised header is acceptable,
## the syncer will be activated from hibernation if necessary.
##
let
b = ctx.chain.baseNumber()
f = finHdr.number
if f < b:
trace info & ": finalised block # too low",
B=b.bnStr, finalised=f.bnStr, delta=(b - f)

ctx.target.reset

else:
ctx.target.final = f
ctx.target.finalHash = finHash

# Activate running (unless done yet)
if ctx.hibernate:
ctx.hibernate = false
info "Activating syncer", base=b.bnStr, head=ctx.chain.latestNumber.bnStr,
finalised=f.bnStr, target=ctx.target.consHead.bnStr
proc updateFromHibernating*(ctx: BeaconCtxRef; info: static[string]) =
## Activate syncer if hibernating.
if ctx.hibernate:
ctx.hibernate = false # activates syncer
debug info & ": activating syncer", T=ctx.target.consHead.bnStr

# Re-calculate sync state
ctx.updateSyncState info

# Update, so it can be followed nicely
ctx.updateMetrics()
4 changes: 0 additions & 4 deletions nimbus/sync/beacon/worker_config.nim
@@ -128,10 +128,6 @@ const
## entry block number is too high and so leaves a gap to the ledger state
## block number.)

finaliserChainLengthMax* = 32
## When importing with `importBlock()`, finalise after at most this many
## invocations of `importBlock()`.

# ----------------------

static: