Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quick and dirty PoC for syncing from Portal history network #2910

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions nimbus/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,19 @@ type
defaultValue: false
name: "debug-store-slot-hashes".}: bool

usePortal* {.
hidden
desc: "Use portal network instead of era files"
defaultValue: false
name: "debug-use-portal".}: bool

of `import-rlp`:
blocksFile* {.
argument
desc: "One or more RLP encoded block(s) files"
name: "blocks-file" }: seq[InputFile]


func parseCmdArg(T: type NetworkId, p: string): T
{.gcsafe, raises: [ValueError].} =
parseBiggestUInt(p).T
Expand Down
2 changes: 1 addition & 1 deletion nimbus/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =

case conf.cmd
of NimbusCmd.`import`:
importBlocks(conf, com)
importBlocksPortal(conf, com)
of NimbusCmd.`import-rlp`:
importRlpBlocks(conf, com)
else:
Expand Down
212 changes: 203 additions & 9 deletions nimbus/nimbus_import.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,25 @@ import
chronicles,
metrics,
chronos/timer,
std/[strformat, strutils],
chronos,
std/[strformat, strutils, os],
stew/io2,
beacon_chain/era_db,
beacon_chain/networking/network_metadata,
./config,
./common/common,
./core/chain,
./db/era1_db,
./utils/era_helpers
./utils/era_helpers,
eth/common/keys, # rng
eth/net/nat, # setupAddress
eth/p2p/discoveryv5/protocol as discv5_protocol,
eth/p2p/discoveryv5/routing_table,
eth/p2p/discoveryv5/enr,
../fluffy/portal_node,
../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
../fluffy/network_metadata,
../fluffy/version

declareGauge nec_import_block_number, "Latest imported block number"

Expand Down Expand Up @@ -87,7 +97,164 @@ template boolFlag(flags, b): PersistBlockFlags =
else:
{}

proc importBlocks*(conf: NimbusConf, com: CommonRef) =
proc run(config: NimbusConf): PortalNode {.
raises: [CatchableError]
.} =
let rng = newRng()

## Network configuration
let
bindIp = config.listenAddress
udpPort = Port(config.udpPort)
# TODO: allow for no TCP port mapping!
(extIp, _, extUdpPort) =
try:
setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
except CatchableError as exc:
raiseAssert exc.msg
# raise exc # TODO: Ideally we don't have the Exception here
except Exception as exc:
raiseAssert exc.msg
(netkey, newNetKey) =
# if config.netKey.isSome():
# (config.netKey.get(), true)
# else:
getPersistentNetKey(rng[], config.dataDir / "netkey")

enrFilePath = config.dataDir / "nimbus_portal_node.enr"
previousEnr =
if not newNetKey:
getPersistentEnr(enrFilePath)
else:
Opt.none(enr.Record)

var bootstrapRecords: seq[Record]
# loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
# bootstrapRecords.add(config.bootstrapNodes)

# case config.network
# of PortalNetwork.none:
# discard # don't connect to any network bootstrap nodes
# of PortalNetwork.mainnet:
# for enrURI in mainnetBootstrapNodes:
# let res = enr.Record.fromURI(enrURI)
# if res.isOk():
# bootstrapRecords.add(res.value)
# of PortalNetwork.angelfood:
# for enrURI in angelfoodBootstrapNodes:
# let res = enr.Record.fromURI(enrURI)
# if res.isOk():
# bootstrapRecords.add(res.value)

# Only mainnet
for enrURI in mainnetBootstrapNodes:
let res = enr.Record.fromURI(enrURI)
if res.isOk():
bootstrapRecords.add(res.value)

## Discovery v5 protocol setup
let
discoveryConfig =
DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
d = newProtocol(
netkey,
extIp,
Opt.none(Port),
extUdpPort,
# Note: The addition of default clientInfo to the ENR is a temporary
# measure to easily identify & debug the clients used in the testnet.
# Might make this into a, default off, cli option.
localEnrFields = {"c": enrClientInfoShort},
bootstrapRecords = bootstrapRecords,
previousRecord = previousEnr,
bindIp = bindIp,
bindPort = udpPort,
enrAutoUpdate = true,
config = discoveryConfig,
rng = rng,
)

d.open()

## Portal node setup
let
portalProtocolConfig = PortalProtocolConfig.init(
DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
defaultDisableContentCache, defaultMaxConcurrentOffers
)

portalNodeConfig = PortalNodeConfig(
accumulatorFile: Opt.none(string),
disableStateRootValidation: true,
trustedBlockRoot: Opt.none(Digest),
portalConfig: portalProtocolConfig,
dataDir: string config.dataDir,
storageCapacity: 0,
contentRequestRetries: 1
)

node = PortalNode.new(
PortalNetwork.mainnet,
portalNodeConfig,
d,
{PortalSubnetwork.history},
bootstrapRecords = bootstrapRecords,
rng = rng,
)

let enrFile = config.dataDir / "nimbus_portal_node.enr"
if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
fatal "Failed to write the enr file", file = enrFile
quit 1

## Start the Portal node.
node.start()

node

proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64): Future[void] {.async.} =
let historyNetwork = node.historyNetwork.value()
var blockNumber = startBlock

let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
var count = 0

proc blockWorker(node: PortalNode): Future[void] {.async.} =
while true:
let (blockNumber, i) = await blockNumberQueue.popFirst()
while true:
let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
error "Failed to get block", blockNumber = blockNumber + i
# Note: loop will get stuck here if a block is not available
continue

blocks[i] = init(EthBlock, header, body)
count.inc()

break

var workers: seq[Future[void]] = @[]
for i in 0 ..< 512:
workers.add node.blockWorker()

while true:
blocks = newSeq[EthBlock](8192)
count = 0
info "Downloading 8192 blocks", startBlock = blockNumber
for i in 0..8191'u64:
await blockNumberQueue.addLast((blockNumber, i))

# Not great :/
while count != 8192:
await sleepAsync(10.milliseconds)
info "Finished downloading 8192 blocks", startBlock = blockNumber
await blockQueue.addLast(blocks)

blockNumber += 8192

proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} =
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
Expand Down Expand Up @@ -119,7 +286,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
boolFlag(NoPersistBodies, not conf.storeBodies) +
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
blk: Block
blk: blocks.Block
persister = Persister.init(chain, flags)
cstats: PersistStats # stats at start of chunk

Expand Down Expand Up @@ -293,11 +460,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =

while running and persister.stats.blocks.uint64 < conf.maxBlocks and
blockNumber <= lastEra1Block:
if not loadEraBlock(blockNumber):
notice "No more `era1` blocks to import", blockNumber, slot
break
persistBlock()
checkpoint()
if not conf.usePortal:
if not loadEraBlock(blockNumber):
notice "No more `era1` blocks to import", blockNumber, slot
break
persistBlock()
checkpoint()
else:
let blockSeq = await blockQueue.popFirst()
for blck in blockSeq:
blk = blck
persistBlock()
checkpoint()
# debugEcho "blck:" & $blck.header.number

block era1Import:
if blockNumber > lastEra1Block:
Expand Down Expand Up @@ -366,3 +541,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
blocks = persister.stats.blocks,
txs = persister.stats.txs,
mgas = f(persister.stats.gas.float / 1000000)

proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
raises: [CatchableError]
.} =
let
portalNode = run(conf)
blockQueue = newAsyncQueue[seq[EthBlock]](4)
start = com.db.getSavedStateBlockNumber() + 1

if conf.usePortal:
asyncSpawn portalNode.getBlockLoop(blockQueue, start)

asyncSpawn importBlocks(conf, com, portalNode, blockQueue)

while running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg
Loading