diff --git a/nimbus/config.nim b/nimbus/config.nim index a8548574a..a9fb09298 100644 --- a/nimbus/config.nim +++ b/nimbus/config.nim @@ -550,12 +550,19 @@ type defaultValue: false name: "debug-store-slot-hashes".}: bool + usePortal* {. + hidden + desc: "Use portal network instead of era files" + defaultValue: false + name: "debug-use-portal".}: bool + of `import-rlp`: blocksFile* {. argument desc: "One or more RLP encoded block(s) files" name: "blocks-file" }: seq[InputFile] + func parseCmdArg(T: type NetworkId, p: string): T {.gcsafe, raises: [ValueError].} = parseBiggestUInt(p).T diff --git a/nimbus/nimbus_execution_client.nim b/nimbus/nimbus_execution_client.nim index 5a9ab9443..e1f94d08f 100644 --- a/nimbus/nimbus_execution_client.nim +++ b/nimbus/nimbus_execution_client.nim @@ -257,7 +257,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) = case conf.cmd of NimbusCmd.`import`: - importBlocks(conf, com) + importBlocksPortal(conf, com) of NimbusCmd.`import-rlp`: importRlpBlocks(conf, com) else: diff --git a/nimbus/nimbus_import.nim b/nimbus/nimbus_import.nim index 4de9fa377..54ddee058 100644 --- a/nimbus/nimbus_import.nim +++ b/nimbus/nimbus_import.nim @@ -13,7 +13,8 @@ import chronicles, metrics, chronos/timer, - std/[strformat, strutils], + chronos, + std/[strformat, strutils, os], stew/io2, beacon_chain/era_db, beacon_chain/networking/network_metadata, @@ -21,7 +22,16 @@ import ./common/common, ./core/chain, ./db/era1_db, - ./utils/era_helpers + ./utils/era_helpers, + eth/common/keys, # rng + eth/net/nat, # setupAddress + eth/p2p/discoveryv5/protocol as discv5_protocol, + eth/p2p/discoveryv5/routing_table, + eth/p2p/discoveryv5/enr, + ../fluffy/portal_node, + ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr + ../fluffy/network_metadata, + ../fluffy/version declareGauge nec_import_block_number, "Latest imported block number" @@ -87,7 +97,164 @@ template boolFlag(flags, b): PersistBlockFlags = else: {} -proc importBlocks*(conf: NimbusConf, com: CommonRef) = +proc run(config: NimbusConf): PortalNode {. + raises: [CatchableError] +.} = + let rng = newRng() + + ## Network configuration + let + bindIp = config.listenAddress + udpPort = Port(config.udpPort) + # TODO: allow for no TCP port mapping! + (extIp, _, extUdpPort) = + try: + setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal") + except CatchableError as exc: + raiseAssert exc.msg + # raise exc # TODO: Ideally we don't have the Exception here + except Exception as exc: + raiseAssert exc.msg + (netkey, newNetKey) = + # if config.netKey.isSome(): + # (config.netKey.get(), true) + # else: + getPersistentNetKey(rng[], config.dataDir / "netkey") + + enrFilePath = config.dataDir / "nimbus_portal_node.enr" + previousEnr = + if not newNetKey: + getPersistentEnr(enrFilePath) + else: + Opt.none(enr.Record) + + var bootstrapRecords: seq[Record] + # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords) + # bootstrapRecords.add(config.bootstrapNodes) + + # case config.network + # of PortalNetwork.none: + # discard # don't connect to any network bootstrap nodes + # of PortalNetwork.mainnet: + # for enrURI in mainnetBootstrapNodes: + # let res = enr.Record.fromURI(enrURI) + # if res.isOk(): + # bootstrapRecords.add(res.value) + # of PortalNetwork.angelfood: + # for enrURI in angelfoodBootstrapNodes: + # let res = enr.Record.fromURI(enrURI) + # if res.isOk(): + # bootstrapRecords.add(res.value) + + # Only mainnet + for enrURI in mainnetBootstrapNodes: + let res = enr.Record.fromURI(enrURI) + if res.isOk(): + bootstrapRecords.add(res.value) + + ## Discovery v5 protocol setup + let + discoveryConfig = + DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop) + d = newProtocol( + netkey, + extIp, + Opt.none(Port), + extUdpPort, + # Note: The addition of default clientInfo to the ENR is a temporary + # measure to easily identify & debug the clients used in the testnet. + # Might make this into a, default off, cli option. + localEnrFields = {"c": enrClientInfoShort}, + bootstrapRecords = bootstrapRecords, + previousRecord = previousEnr, + bindIp = bindIp, + bindPort = udpPort, + enrAutoUpdate = true, + config = discoveryConfig, + rng = rng, + ) + + d.open() + + ## Portal node setup + let + portalProtocolConfig = PortalProtocolConfig.init( + DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249), + defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize, + defaultDisableContentCache, defaultMaxConcurrentOffers + ) + + portalNodeConfig = PortalNodeConfig( + accumulatorFile: Opt.none(string), + disableStateRootValidation: true, + trustedBlockRoot: Opt.none(Digest), + portalConfig: portalProtocolConfig, + dataDir: string config.dataDir, + storageCapacity: 0, + contentRequestRetries: 1 + ) + + node = PortalNode.new( + PortalNetwork.mainnet, + portalNodeConfig, + d, + {PortalSubnetwork.history}, + bootstrapRecords = bootstrapRecords, + rng = rng, + ) + + let enrFile = config.dataDir / "nimbus_portal_node.enr" + if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr: + fatal "Failed to write the enr file", file = enrFile + quit 1 + + ## Start the Portal node. + node.start() + + node + +proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64): Future[void] {.async.} = + let historyNetwork = node.historyNetwork.value() + var blockNumber = startBlock + + let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048) + var blocks: seq[EthBlock] = newSeq[EthBlock](8192) + var count = 0 + + proc blockWorker(node: PortalNode): Future[void] {.async.} = + while true: + let (blockNumber, i) = await blockNumberQueue.popFirst() + while true: + let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr: + error "Failed to get block", blockNumber = blockNumber + i + # Note: loop will get stuck here if a block is not available + continue + + blocks[i] = init(EthBlock, header, body) + count.inc() + + break + + var workers: seq[Future[void]] = @[] + for i in 0 ..< 512: + workers.add node.blockWorker() + + while true: + blocks = newSeq[EthBlock](8192) + count = 0 + info "Downloading 8192 blocks", startBlock = blockNumber + for i in 0..8191'u64: + await blockNumberQueue.addLast((blockNumber, i)) + + # Not great :/ + while count != 8192: + await sleepAsync(10.milliseconds) + info "Finished downloading 8192 blocks", startBlock = blockNumber + await blockQueue.addLast(blocks) + + blockNumber += 8192 + +proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} = proc controlCHandler() {.noconv.} = when defined(windows): # workaround for https://github.com/nim-lang/Nim/issues/4057 @@ -119,7 +286,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = boolFlag(NoPersistBodies, not conf.storeBodies) + boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) + boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes) - blk: Block + blk: blocks.Block persister = Persister.init(chain, flags) cstats: PersistStats # stats at start of chunk @@ -293,11 +460,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = while running and persister.stats.blocks.uint64 < conf.maxBlocks and blockNumber <= lastEra1Block: - if not loadEraBlock(blockNumber): - notice "No more `era1` blocks to import", blockNumber, slot - break - persistBlock() - checkpoint() + if not conf.usePortal: + if not loadEraBlock(blockNumber): + notice "No more `era1` blocks to import", blockNumber, slot + break + persistBlock() + checkpoint() + else: + let blockSeq = await blockQueue.popFirst() + for blck in blockSeq: + blk = blck + persistBlock() + checkpoint() + # debugEcho "blck:" & $blck.header.number block era1Import: if blockNumber > lastEra1Block: @@ -366,3 +541,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = blocks = persister.stats.blocks, txs = persister.stats.txs, mgas = f(persister.stats.gas.float / 1000000) + +proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {. + raises: [CatchableError] +.} = + let + portalNode = run(conf) + blockQueue = newAsyncQueue[seq[EthBlock]](4) + start = com.db.getSavedStateBlockNumber() + 1 + + if conf.usePortal: + asyncSpawn portalNode.getBlockLoop(blockQueue, start) + + asyncSpawn importBlocks(conf, com, portalNode, blockQueue) + + while running: + try: + poll() + except CatchableError as e: + warn "Exception in poll()", exc = e.name, err = e.msg