diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim index eb327b33f9..4259141694 100644 --- a/nimbus/sync/beacon/worker.nim +++ b/nimbus/sync/beacon/worker.nim @@ -46,6 +46,7 @@ proc napUnlessSomethingToFetch( buddy.ctrl.zombie = true return true else: + # Returning `false` => no need to check for shutdown return false # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index 209dcfd1e8..7c663fd768 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -18,7 +18,7 @@ import ../worker_desc, ./blocks_staged/bodies, ./update/metrics, - "."/[blocks_unproc, db, helpers] + "."/[blocks_unproc, db, helpers, update] # ------------------------------------------------------------------------------ # Private debugging & logging helpers @@ -324,10 +324,7 @@ proc blocksStagedImport*( ctx.updateMetrics() # Allow pseudo/async thread switch. - try: await sleepAsync asyncThreadSwitchTimeSlot - except CancelledError: discard - if not ctx.daemon: - # Shutdown? + (await ctx.updateAsyncTasks()).isOkOr: maxImport = ctx.chain.latestNumber() break importLoop @@ -349,9 +346,7 @@ proc blocksStagedImport*( break importLoop # Allow pseudo/async thread switch. - try: await sleepAsync asyncThreadSwitchTimeSlot - except CancelledError: discard - if not ctx.daemon: + (await ctx.updateAsyncTasks()).isOkOr: maxImport = ctx.chain.latestNumber() break importLoop diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index f5518fed86..84a9413725 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -166,7 +166,7 @@ proc headersStagedCollect*( isOpportunistic, ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors ctx.headersUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try - # to work on the currently returned interval. + # to continue work on the currently returned interval. try: await sleepAsync asyncThreadSwitchTimeSlot except CancelledError: discard return false diff --git a/nimbus/sync/beacon/worker/start_stop.nim b/nimbus/sync/beacon/worker/start_stop.nim index 43fd9b8a1c..38438399ac 100644 --- a/nimbus/sync/beacon/worker/start_stop.nim +++ b/nimbus/sync/beacon/worker/start_stop.nim @@ -17,6 +17,7 @@ import ../worker_desc, ./blocks_staged/staged_queue, ./headers_staged/staged_queue, + ./update/metrics, "."/[blocks_unproc, db, headers_unproc, update] when enableTicker: @@ -155,11 +156,14 @@ proc startBuddy*(buddy: BeaconBuddyRef): bool = ctx = buddy.ctx peer = buddy.peer if peer.supports(protocol.eth) and peer.state(protocol.eth).initialized: - ctx.pool.nBuddies.inc # for metrics + ctx.pool.nBuddies.inc + ctx.updateMetrics() return true proc stopBuddy*(buddy: BeaconBuddyRef) = - buddy.ctx.pool.nBuddies.dec # for metrics + let ctx = buddy.ctx + ctx.pool.nBuddies.dec + ctx.updateMetrics(force=(ctx.pool.nBuddies == 0)) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/beacon/worker/update.nim b/nimbus/sync/beacon/worker/update.nim index 065da9148b..7dab39d423 100644 --- a/nimbus/sync/beacon/worker/update.nim +++ b/nimbus/sync/beacon/worker/update.nim @@ -344,6 +344,26 @@ proc updateFinalBlockHeader*( # Update, so it can be followed nicely ctx.updateMetrics() + +proc updateAsyncTasks*( + ctx: BeaconCtxRef; + ): Future[Opt[void]] {.async: (raises: []).} = + ## Allow task switch by issuing a short sleep request. The `due` argument + ## allows to maintain a minimum time gap when invoking this function. + let start = Moment.now() + if ctx.pool.nextAsyncNanoSleep < start: + + try: await sleepAsync asyncThreadSwitchTimeSlot + except CancelledError: discard + + if ctx.daemon: + ctx.pool.nextAsyncNanoSleep = Moment.now() + asyncThreadSwitchGap + return ok() + # Shutdown? + return err() + + return ok() + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/update/metrics.nim b/nimbus/sync/beacon/worker/update/metrics.nim index 93d02dc9db..5adb860575 100644 --- a/nimbus/sync/beacon/worker/update/metrics.nim +++ b/nimbus/sync/beacon/worker/update/metrics.nim @@ -77,10 +77,10 @@ template updateMetricsImpl(ctx: BeaconCtxRef) = # --------------- -proc updateMetrics*(ctx: BeaconCtxRef) = +proc updateMetrics*(ctx: BeaconCtxRef; force = false) = let now = Moment.now() - if ctx.pool.nextUpdate < now: + if ctx.pool.nextMetricsUpdate < now or force: ctx.updateMetricsImpl() - ctx.pool.nextUpdate = now + metricsUpdateInterval + ctx.pool.nextMetricsUpdate = now + metricsUpdateInterval # End diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index ba75723004..0165d0f5fd 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -1,5 +1,5 @@ # Nimbus -# Copyright (c) 2021-2024 Status Research & Development GmbH +# Copyright (c) 2021-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at # https://opensource.org/licenses/MIT). @@ -46,6 +46,11 @@ const asyncThreadSwitchTimeSlot* = chronos.nanoseconds(1) ## Nano-sleep to allows pseudo/async thread switch + asyncThreadSwitchGap* = chronos.milliseconds(300) + ## Controls nano-sleep tart switch density when using this in a loop (e.g. + ## for processing lists.) The constant requires a minimum time gap when + ## invoking a nano-sleep utility. + # ---------------------- nFetchHeadersRequest* = 1_024 diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index ff191ec014..0fe3166bdd 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -1,5 +1,5 @@ # Nimbus -# Copyright (c) 2021-2024 Status Research & Development GmbH +# Copyright (c) 2021-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at # https://opensource.org/licenses/MIT). @@ -142,7 +142,8 @@ type syncState*: SyncState ## Save/resume state descriptor hdrSync*: HeaderImportSync ## Syncing by linked header chains blkSync*: BlocksImportSync ## For importing/executing blocks - nextUpdate*: Moment ## For updating metrics + nextMetricsUpdate*: Moment ## For updating metrics + nextAsyncNanoSleep*: Moment ## Use nano-sleeps for task switch # Blocks import/execution settings for importing with # `nBodiesBatch` blocks in each round (minimum value is