Skip to content

Commit

Permalink
[Config Change] rpc is for execution-client. can have multiple
Browse files Browse the repository at this point in the history
Multiple rpc-URLs are just for multiple execution client.
Every validation is only run on one validation client per wasmModuleRoot.
Fa
  • Loading branch information
tsahee committed Apr 30, 2024
1 parent 07c0e29 commit 7328f6a
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 171 deletions.
9 changes: 4 additions & 5 deletions arbnode/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package arbnode

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -40,11 +39,11 @@ func (a *BlockValidatorDebugAPI) ValidateMessageNumber(
if moduleRootOptional != nil {
moduleRoot = *moduleRootOptional
} else {
moduleRoots := a.val.GetModuleRootsToValidate()
if len(moduleRoots) == 0 {
return result, errors.New("no current WasmModuleRoot configured, must provide parameter")
var err error
moduleRoot, err = a.val.GetLatestWasmModuleRoot(ctx)
if err != nil {
return result, fmt.Errorf("no latest WasmModuleRoot configured, must provide parameter: %w", err)
}
moduleRoot = moduleRoots[0]
}
start_time := time.Now()
valid, gs, err := a.val.ValidateResult(ctx, arbutil.MessageIndex(msgNum), full, moduleRoot)
Expand Down
6 changes: 3 additions & 3 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func ConfigDefaultL1NonSequencerTest() *Config {
config.SyncMonitor = TestSyncMonitorConfig
config.Staker = staker.TestL1ValidatorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.BlockValidator.ExecutionServerConfigs = []rpcclient.ClientConfig{{URL: ""}}

return &config
}
Expand All @@ -217,7 +217,7 @@ func ConfigDefaultL2Test() *Config {
config.Staker = staker.TestL1ValidatorConfig
config.SyncMonitor = TestSyncMonitorConfig
config.Staker.Enable = false
config.BlockValidator.ValidationServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.BlockValidator.ExecutionServerConfigs = []rpcclient.ClientConfig{{URL: ""}}
config.TransactionStreamer = DefaultTransactionStreamerConfig

return &config
Expand Down Expand Up @@ -540,7 +540,7 @@ func createNodeImpl(
txStreamer.SetInboxReaders(inboxReader, delayedBridge)

var statelessBlockValidator *staker.StatelessBlockValidator
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ValidationServerConfigs[0].URL != "" {
if config.BlockValidator.RedisValidationClientConfig.Enabled() || config.BlockValidator.ExecutionServerConfigs[0].URL != "" {
statelessBlockValidator, err = staker.NewStatelessBlockValidator(
inboxReader,
inboxTracker,
Expand Down
2 changes: 1 addition & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func mainImpl() int {
}

var sameProcessValidationNodeEnabled bool
if nodeConfig.Node.BlockValidator.Enable && (nodeConfig.Node.BlockValidator.ValidationServerConfigs[0].URL == "self" || nodeConfig.Node.BlockValidator.ValidationServerConfigs[0].URL == "self-auth") {
if nodeConfig.Node.BlockValidator.Enable && (nodeConfig.Node.BlockValidator.ExecutionServerConfigs[0].URL == "self" || nodeConfig.Node.BlockValidator.ExecutionServerConfigs[0].URL == "self-auth") {
sameProcessValidationNodeEnabled = true
valnode.EnsureValidationExposedViaAuthRPC(&stackConf)
}
Expand Down
151 changes: 92 additions & 59 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -74,6 +75,13 @@ type BlockValidator struct {
sendRecordChan chan struct{}
progressValidationsChan chan struct{}

chosenValidator map[common.Hash]validator.ValidationSpawner

// wasmModuleRoot
moduleMutex sync.Mutex
currentWasmModuleRoot common.Hash
pendingWasmModuleRoot common.Hash

// for testing only
testingProgressMadeChan chan struct{}

Expand All @@ -84,10 +92,9 @@ type BlockValidator struct {

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs" reload:"hot"`
ExecutionServerConfig rpcclient.ClientConfig `koanf:"execution-server-config" reload:"hot"`
ExecutionServer rpcclient.ClientConfig `koanf:"execution-server" reload:"hot"`
ExecutionServerConfigs []rpcclient.ClientConfig `koanf:"execution-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
Expand All @@ -96,7 +103,7 @@ type BlockValidatorConfig struct {
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list" reload:"hot"`
ExecutionServerConfigsList string `koanf:"execution-server-configs-list"`

memoryFreeLimit int
}
Expand All @@ -112,27 +119,21 @@ func (c *BlockValidatorConfig) Validate() error {
c.memoryFreeLimit = limit
}
streamsEnabled := c.RedisValidationClientConfig.Enabled()
if c.ValidationServerConfigs == nil {
c.ValidationServerConfigs = []rpcclient.ClientConfig{c.ValidationServer}
if c.ValidationServerConfigsList != "default" {
var validationServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ValidationServerConfigsList), &validationServersConfigs); err != nil && !streamsEnabled {
if c.ExecutionServerConfigs == nil {
c.ExecutionServerConfigs = []rpcclient.ClientConfig{c.ExecutionServer}
if c.ExecutionServerConfigsList != "default" {
var executionServersConfigs []rpcclient.ClientConfig
if err := json.Unmarshal([]byte(c.ExecutionServerConfigsList), &executionServersConfigs); err != nil && !streamsEnabled {
return fmt.Errorf("failed to parse block-validator validation-server-configs-list string: %w", err)
}
c.ValidationServerConfigs = validationServersConfigs
c.ExecutionServerConfigs = executionServersConfigs
}
}
if len(c.ValidationServerConfigs) == 0 && !streamsEnabled {
return fmt.Errorf("block-validator validation-server-configs is empty, need at least one validation server config")
}
for _, serverConfig := range c.ValidationServerConfigs {
if err := serverConfig.Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator validation-server-configs. url: %s, err: %w", serverConfig.URL, err)
for i := range c.ExecutionServerConfigs {
if err := c.ExecutionServerConfigs[i].Validate(); err != nil {
return fmt.Errorf("failed to validate one of the block-validator execution-server-configs. url: %s, err: %w", c.ExecutionServerConfigs[i].URL, err)
}
}
if err := c.ExecutionServerConfig.Validate(); err != nil {
return fmt.Errorf("validating execution server config: %w", err)
}
return nil
}

Expand All @@ -144,10 +145,9 @@ type BlockValidatorConfigFetcher func() *BlockValidatorConfig

func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
rpcclient.RPCClientAddOptions(prefix+".execution-server-config", f, &DefaultBlockValidatorConfig.ExecutionServerConfig)
rpcclient.RPCClientAddOptions(prefix+".execution-server", f, &DefaultBlockValidatorConfig.ExecutionServer)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of validation rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.String(prefix+".execution-server-configs-list", DefaultBlockValidatorConfig.ExecutionServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
Expand All @@ -164,9 +164,8 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) {

var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
ExecutionServerConfig: rpcclient.DefaultClientConfig,
ExecutionServerConfigsList: "default",
ExecutionServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 1024,
Expand All @@ -180,10 +179,9 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{

var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
ExecutionServer: rpcclient.TestClientConfig,
ExecutionServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ExecutionServerConfig: rpcclient.TestClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
Expand Down Expand Up @@ -332,6 +330,17 @@ func nonBlockingTrigger(channel chan struct{}) {
}
}

func (v *BlockValidator) GetModuleRootsToValidate() []common.Hash {
v.moduleMutex.Lock()
defer v.moduleMutex.Unlock()

validatingModuleRoots := []common.Hash{v.currentWasmModuleRoot}
if (v.currentWasmModuleRoot != v.pendingWasmModuleRoot && v.pendingWasmModuleRoot != common.Hash{}) {
validatingModuleRoots = append(validatingModuleRoots, v.pendingWasmModuleRoot)
}
return validatingModuleRoots
}

// called from NewBlockValidator, doesn't need to catch locks
func ReadLastValidatedInfo(db ethdb.Database) (*GlobalStateValidatedInfo, error) {
exists, err := db.Has(lastGlobalStateValidatedInfoKey)
Expand Down Expand Up @@ -460,8 +469,13 @@ func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoo
if err != nil {
return err
}
_, err = v.execSpawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
_, err = spawner.WriteToFile(input, validationEntry.End, moduleRoot).Await(v.GetContext())
return err
}
}
return errors.New("did not find exec spawner for wasmModuleRoot")
}

func (v *BlockValidator) SetCurrentWasmModuleRoot(hash common.Hash) error {
Expand Down Expand Up @@ -704,14 +718,6 @@ func (v *BlockValidator) advanceValidations(ctx context.Context) (*arbutil.Messa
defer v.reorgMutex.RUnlock()

wasmRoots := v.GetModuleRootsToValidate()
rooms := make([]int, len(v.validationSpawners))
currentSpawnerIndex := 0
for i, spawner := range v.validationSpawners {
here := spawner.Room() / len(wasmRoots)
if here > 0 {
rooms[i] = here
}
}
pos := v.validated() - 1 // to reverse the first +1 in the loop
validationsLoop:
for {
Expand Down Expand Up @@ -780,15 +786,15 @@ validationsLoop:
log.Trace("result validated", "count", v.validated(), "blockHash", v.lastValidGS.BlockHash)
continue
}
for currentSpawnerIndex < len(rooms) {
if rooms[currentSpawnerIndex] > 0 {
break
for _, moduleRoot := range wasmRoots {
if v.chosenValidator[moduleRoot] == nil {
v.possiblyFatal(fmt.Errorf("did not find spawner for moduleRoot :%v", moduleRoot))
continue
}
if v.chosenValidator[moduleRoot].Room() == 0 {
log.Trace("advanceValidations: no more room", "moduleRoot", moduleRoot)
return nil, nil
}
currentSpawnerIndex++
}
if currentSpawnerIndex == len(rooms) {
log.Trace("advanceValidations: no more room", "pos", pos)
return nil, nil
}
if v.isMemoryLimitExceeded() {
log.Warn("advanceValidations: aborting due to running low on memory")
Expand All @@ -808,8 +814,8 @@ validationsLoop:
defer validatorPendingValidationsGauge.Dec(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
run := v.validationSpawners[currentSpawnerIndex].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot, "spawner", currentSpawnerIndex)
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
}
validationCtx, cancel := context.WithCancel(ctx)
Expand All @@ -832,10 +838,6 @@ validationsLoop:
}
nonBlockingTrigger(v.progressValidationsChan)
})
rooms[currentSpawnerIndex]--
if rooms[currentSpawnerIndex] == 0 {
currentSpawnerIndex++
}
}
}
}
Expand Down Expand Up @@ -1045,10 +1047,7 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
currentModuleRoot := config.CurrentModuleRoot
switch currentModuleRoot {
case "latest":
if v.execSpawner == nil {
return fmt.Errorf(`execution spawner is nil while current module root is "latest"`)
}
latest, err := v.execSpawner.LatestWasmModuleRoot().Await(ctx)
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
Expand All @@ -1063,13 +1062,47 @@ func (v *BlockValidator) Initialize(ctx context.Context) error {
return errors.New("current-module-root config value illegal")
}
}
pendingModuleRoot := config.PendingUpgradeModuleRoot
if pendingModuleRoot != "" {
if pendingModuleRoot == "latest" {
latest, err := v.GetLatestWasmModuleRoot(ctx)
if err != nil {
return err
}
v.pendingWasmModuleRoot = latest
} else {
valid, _ := regexp.MatchString("(0x)?[0-9a-fA-F]{64}", pendingModuleRoot)
v.pendingWasmModuleRoot = common.HexToHash(pendingModuleRoot)
if (!valid || v.pendingWasmModuleRoot == common.Hash{}) {
return errors.New("pending-upgrade-module-root config value illegal")
}
}
}
log.Info("BlockValidator initialized", "current", v.currentWasmModuleRoot, "pending", v.pendingWasmModuleRoot)
moduleRoots := []common.Hash{v.currentWasmModuleRoot}
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot {
if v.pendingWasmModuleRoot != v.currentWasmModuleRoot && v.pendingWasmModuleRoot != (common.Hash{}) {
moduleRoots = append(moduleRoots, v.pendingWasmModuleRoot)
}
if err := v.StatelessBlockValidator.Initialize(moduleRoots); err != nil {
return fmt.Errorf("initializing block validator with module roots: %w", err)
// First spawner is always RedisValidationClient if RedisStreams are enabled.
if v.redisValidator != nil {
err := v.redisValidator.Initialize(moduleRoots)
if err != nil {
return err
}
}
v.chosenValidator = make(map[common.Hash]validator.ValidationSpawner)
for _, root := range moduleRoots {
if v.redisValidator != nil && validator.SpawnerSupportsModule(v.redisValidator, root) {
v.chosenValidator[root] = v.redisValidator
}
if v.chosenValidator[root] == nil {
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, root) {
v.chosenValidator[root] = spawner
break
}
}
}
}
return nil
}
Expand Down
15 changes: 12 additions & 3 deletions staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,18 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
}
}
input.BatchInfo = prunedBatches
execRun, err := m.validator.execSpawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
var execRun validator.ExecutionRun
for _, spawner := range m.validator.execSpawners {
if validator.SpawnerSupportsModule(spawner, m.wasmModuleRoot) {
execRun, err = spawner.CreateExecutionRun(m.wasmModuleRoot, input).Await(ctx)
if err != nil {
return fmt.Errorf("error creating execution backend for msg %v: %w", initialCount, err)
}
break
}
}
if execRun == nil {
return fmt.Errorf("did not find valid execution backend")
}
backend, err := NewExecutionChallengeBackend(execRun)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions staker/challenge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func runChallengeTest(

for i := 0; i < 100; i++ {
if testTimeout {
backend.Commit()
err = backend.AdjustTime(time.Second * 40)
}
Require(t, err)
Expand Down
Loading

0 comments on commit 7328f6a

Please sign in to comment.