Skip to content

Commit

Permalink
Prevent the build up of parallel 'waitELToSync' operations
Browse files Browse the repository at this point in the history
  • Loading branch information
zah committed Mar 10, 2023
1 parent 57b2151 commit 89dcced
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions beacon_chain/eth1/eth1_monitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,7 @@ proc waitELToSyncDeposits(connection: ELConnection,
url = connection.engineUrl.url,
blk = minimalRequiredBlock,
err = err.msg
inc attempt
await sleepAsync(seconds(30))
rpcClient = await connection.connectedRpcClient()

Expand All @@ -998,39 +999,21 @@ proc selectConnectionForChainSyncing(m: ELManager): Future[ELConnection] {.async
else:
FutureBase connectedRpcClient(it))

let firstConnected = await firstCompletedFuture(connectionsFuts)

# TODO: Ideally, the cancellation will be handled automatically
# by a helper like `firstCompletedFuture`
let firstConnected = try:
await firstCompletedFuture(connectionsFuts)
except CancelledError as err:
for future in connectionsFuts:
future.cancel()
raise err

for future in connectionsFuts:
if future != firstConnected:
future.cancel()

return m.elConnections[find(connectionsFuts, firstConnected)]

proc getBlobsBundleFromASyncedEL(
m: ELManager,
payloadId: bellatrix.PayloadID): Future[BlobsBundleV1] {.async.} =
let
connection = await m.selectConnectionForChainSyncing()
rpcClient = await connection.connectedRpcClient()

return connection.trackedRequestWithTimeout(
"getBlobsBundle",
rpcClient.engine_getBlobsBundleV1(FixedBytes[8] payloadId),
GETBLOBS_TIMEOUT)

proc getBlobsBundleV1*(
m: ELManager, payloadId: bellatrix.PayloadID):
Future[Opt[BlobsBundleV1]] {.async.} =
if m.elConnections.len == 0:
return Opt.none BlobsBundleV1

result = try:
Opt.some(await m.getBlobsBundleFromASyncedEL(payload_id))
except CatchableError:
Opt.none BlobsBundleV1

proc sendNewPayloadToSingleEL(connection: ELConnection,
payload: engine_api.ExecutionPayloadV1):
Future[PayloadStatusV1] {.async.} =
Expand Down Expand Up @@ -2087,20 +2070,27 @@ proc startChainSyncingLoop(m: ELManager) {.async.} =
info "Starting execution layer deposits syncing",
contract = $m.depositContractAddress

var syncedConnectionFut = m.selectConnectionForChainSyncing()
info "Connection attempt started"

while true:
let connection = awaitWithTimeout(
m.selectConnectionForChainSyncing(),
chronos.seconds(60)):
try:
await syncedConnectionFut or sleepAsync(60.seconds)
if not syncedConnectionFut.finished:
warn "No suitable EL connection for deposit syncing"
await sleepAsync(chronos.seconds(30))
continue

try:
await syncEth1Chain(m, connection)
await syncEth1Chain(m, syncedConnectionFut.read)
except CatchableError as err:
await sleepAsync(10.seconds)

# A more detailed error is already logged by trackEngineApiRequest
debug "Restarting the deposit syncing loop"
await sleepAsync(5.seconds)

# To be extra safe, we will make a fresh connection attempt
await syncedConnectionFut.cancelAndWait()
syncedConnectionFut = m.selectConnectionForChainSyncing()

proc start*(m: ELManager) {.gcsafe.} =
if m.elConnections.len == 0:
Expand Down

0 comments on commit 89dcced

Please sign in to comment.