Skip to content

Commit

Permalink
Beacon sync maintenance update (#3012)
Browse files Browse the repository at this point in the history
* Force metrics update when peers vanish

why:
  After that there might be reduced activity so that the next metrics
  update is delayed.

* Update comments (code cosmetics)

* Tidy up nano-sleep wait directives to an `update.nim`-function

* Fix copyright year
  • Loading branch information
mjfh authored Jan 20, 2025
1 parent 5b74335 commit 3aae33d
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 17 deletions.
1 change: 1 addition & 0 deletions nimbus/sync/beacon/worker.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ proc napUnlessSomethingToFetch(
buddy.ctrl.zombie = true
return true
else:
# Returning `false` => no need to check for shutdown
return false

# ------------------------------------------------------------------------------
Expand Down
11 changes: 3 additions & 8 deletions nimbus/sync/beacon/worker/blocks_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import
../worker_desc,
./blocks_staged/bodies,
./update/metrics,
"."/[blocks_unproc, db, helpers]
"."/[blocks_unproc, db, helpers, update]

# ------------------------------------------------------------------------------
# Private debugging & logging helpers
Expand Down Expand Up @@ -324,10 +324,7 @@ proc blocksStagedImport*(
ctx.updateMetrics()

# Allow pseudo/async thread switch.
try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard
if not ctx.daemon:
# Shutdown?
(await ctx.updateAsyncTasks()).isOkOr:
maxImport = ctx.chain.latestNumber()
break importLoop

Expand All @@ -349,9 +346,7 @@ proc blocksStagedImport*(
break importLoop

# Allow pseudo/async thread switch.
try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard
if not ctx.daemon:
(await ctx.updateAsyncTasks()).isOkOr:
maxImport = ctx.chain.latestNumber()
break importLoop

Expand Down
2 changes: 1 addition & 1 deletion nimbus/sync/beacon/worker/headers_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ proc headersStagedCollect*(
isOpportunistic, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors
ctx.headersUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
# to work on the currently returned interval.
# to continue work on the currently returned interval.
try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard
return false
Expand Down
8 changes: 6 additions & 2 deletions nimbus/sync/beacon/worker/start_stop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import
../worker_desc,
./blocks_staged/staged_queue,
./headers_staged/staged_queue,
./update/metrics,
"."/[blocks_unproc, db, headers_unproc, update]

when enableTicker:
Expand Down Expand Up @@ -155,11 +156,14 @@ proc startBuddy*(buddy: BeaconBuddyRef): bool =
ctx = buddy.ctx
peer = buddy.peer
if peer.supports(protocol.eth) and peer.state(protocol.eth).initialized:
ctx.pool.nBuddies.inc # for metrics
ctx.pool.nBuddies.inc
ctx.updateMetrics()
return true

proc stopBuddy*(buddy: BeaconBuddyRef) =
buddy.ctx.pool.nBuddies.dec # for metrics
let ctx = buddy.ctx
ctx.pool.nBuddies.dec
ctx.updateMetrics(force=(ctx.pool.nBuddies == 0))

# ------------------------------------------------------------------------------
# End
Expand Down
20 changes: 20 additions & 0 deletions nimbus/sync/beacon/worker/update.nim
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,26 @@ proc updateFinalBlockHeader*(
# Update, so it can be followed nicely
ctx.updateMetrics()


proc updateAsyncTasks*(
ctx: BeaconCtxRef;
): Future[Opt[void]] {.async: (raises: []).} =
## Allow task switch by issuing a short sleep request. The `due` argument
## allows to maintain a minimum time gap when invoking this function.
let start = Moment.now()
if ctx.pool.nextAsyncNanoSleep < start:

try: await sleepAsync asyncThreadSwitchTimeSlot
except CancelledError: discard

if ctx.daemon:
ctx.pool.nextAsyncNanoSleep = Moment.now() + asyncThreadSwitchGap
return ok()
# Shutdown?
return err()

return ok()

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
6 changes: 3 additions & 3 deletions nimbus/sync/beacon/worker/update/metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ template updateMetricsImpl(ctx: BeaconCtxRef) =

# ---------------

proc updateMetrics*(ctx: BeaconCtxRef) =
proc updateMetrics*(ctx: BeaconCtxRef; force = false) =
let now = Moment.now()
if ctx.pool.nextUpdate < now:
if ctx.pool.nextMetricsUpdate < now or force:
ctx.updateMetricsImpl()
ctx.pool.nextUpdate = now + metricsUpdateInterval
ctx.pool.nextMetricsUpdate = now + metricsUpdateInterval

# End
7 changes: 6 additions & 1 deletion nimbus/sync/beacon/worker_config.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
Expand Down Expand Up @@ -46,6 +46,11 @@ const
asyncThreadSwitchTimeSlot* = chronos.nanoseconds(1)
## Nano-sleep to allows pseudo/async thread switch

asyncThreadSwitchGap* = chronos.milliseconds(300)
## Controls nano-sleep tart switch density when using this in a loop (e.g.
## for processing lists.) The constant requires a minimum time gap when
## invoking a nano-sleep utility.

# ----------------------

nFetchHeadersRequest* = 1_024
Expand Down
5 changes: 3 additions & 2 deletions nimbus/sync/beacon/worker_desc.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
Expand Down Expand Up @@ -142,7 +142,8 @@ type
syncState*: SyncState ## Save/resume state descriptor
hdrSync*: HeaderImportSync ## Syncing by linked header chains
blkSync*: BlocksImportSync ## For importing/executing blocks
nextUpdate*: Moment ## For updating metrics
nextMetricsUpdate*: Moment ## For updating metrics
nextAsyncNanoSleep*: Moment ## Use nano-sleeps for task switch

# Blocks import/execution settings for importing with
# `nBodiesBatch` blocks in each round (minimum value is
Expand Down

0 comments on commit 3aae33d

Please sign in to comment.