Skip to content

Commit

Permalink
add multierror and fail-fast flag
Browse files Browse the repository at this point in the history
  • Loading branch information
alecps committed Jan 17, 2025
1 parent acb0d45 commit 8f6de48
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 96 deletions.
4 changes: 2 additions & 2 deletions op-chain-ops/cmd/celo-migrate/ancients.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ func readAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, startBlock,

blockRange, err := loadAncientRange(freezer, start, count)
if err != nil {
return fmt.Errorf("failed to load ancient block range: %w", err)
return fmt.Errorf("Failed to load ancient block range: %w. This is likely due to a corrupted source directory. Please delete the target directory and repeat the migration with an uncorrupted source directory", err)
}

if _, err = blockRange.CheckContinuity(nil, count); err != nil {
return err
return fmt.Errorf("Failed continuity check for ancient block range: %w. This is likely due to a corrupted source directory. Please delete the target directory and repeat the migration with an uncorrupted source directory", err)
}

if start > 0 {
Expand Down
4 changes: 2 additions & 2 deletions op-chain-ops/cmd/celo-migrate/continuity.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *RLPBlockRange) CheckContinuity(prevElement *RLPBlockElement, expectedLe
return nil, err
}
if currElement.Header().Number.Uint64() != r.start+uint64(i) {
return nil, fmt.Errorf("decoded header number mismatch: expected %d, actual %d", r.start+uint64(i), currElement.Header().Number.Uint64())
return nil, fmt.Errorf("decoded header number mismatch indicating a gap in block numbers: expected %d, actual %d", r.start+uint64(i), currElement.Header().Number.Uint64())
}
if prevElement != nil {
log.Debug("Checking continuity", "block", currElement.Header().Number.Uint64(), "prev", prevElement.Header().Number.Uint64())
Expand Down Expand Up @@ -137,7 +137,7 @@ func (r *RLPBlockRange) Transform() error {
// and if the parent hash of the current block matches the hash of the previous block.
func (e *RLPBlockElement) Follows(prev *RLPBlockElement) (err error) {
if e.Header().Number.Uint64() != prev.Header().Number.Uint64()+1 {
err = errors.Join(err, fmt.Errorf("header number mismatch: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64()))
err = errors.Join(err, fmt.Errorf("header number mismatch indicating a gap in block numbers: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64()))
}
// We compare the parent hash with the stored hash of the previous block because
// at this point the header object will not calculate the correct hash since it
Expand Down
4 changes: 2 additions & 2 deletions op-chain-ops/cmd/celo-migrate/continuity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestCheckContinuity(t *testing.T) {
},
prevElement: &RLPBlockElement{decodedHeader: decodedHeaders[0], hash: []byte("hash0")},
expectedLength: 3,
expectErrorMsg: "decoded header number mismatch: expected 2, actual 1",
expectErrorMsg: "decoded header number mismatch indicating a gap in block numbers: expected 2, actual 1",
},
{
name: "Header number mismatch from prevElement number",
Expand All @@ -174,7 +174,7 @@ func TestCheckContinuity(t *testing.T) {
},
prevElement: &RLPBlockElement{decodedHeader: decodedHeaders[1], hash: []byte("hash1")},
expectedLength: 3,
expectErrorMsg: "header number mismatch: expected 2, actual 1\nparent hash mismatch between blocks 1 and 1",
expectErrorMsg: "header number mismatch indicating a gap in block numbers: expected 2, actual 1\nparent hash mismatch between blocks 1 and 1",
},
// Parent hash mismatch tests
{
Expand Down
137 changes: 54 additions & 83 deletions op-chain-ops/cmd/celo-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"
"runtime/debug"
"sync"
"time"

"log/slog"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/ioutil"
"github.com/ethereum-optimism/optimism/op-service/jsonutil"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/hashicorp/go-multierror"
"github.com/mattn/go-isatty"

"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -107,6 +109,11 @@ var (
Usage: "Path to the db to perform a continuity check on",
Required: true,
}
// dbCheckFailFastFlag is the boolean "fail-fast" CLI flag. When set, the db check
// stops at the first error; when unset (the default), the check keeps going over
// all blocks and reports every error at the end.
dbCheckFailFastFlag = &cli.BoolFlag{
Name: "fail-fast",
Usage: "Fail fast on the first error encountered. If set, the db check will stop on the first error encountered, otherwise it will continue to check all blocks and print out all errors at the end.",
Value: false,
}

preMigrationFlags = []cli.Flag{
oldDBPathFlag,
Expand All @@ -130,6 +137,7 @@ var (
// dbCheckFlags groups the flags for the db continuity check: the db path, the
// batch size and the fail-fast toggle.
// NOTE(review): presumably registered on the db check command — confirm at the
// command definition, which is not visible here.
dbCheckFlags = []cli.Flag{
dbCheckPathFlag,
batchSizeFlag,
dbCheckFailFastFlag,
}
)

Expand Down Expand Up @@ -161,6 +169,7 @@ type fullMigrationOptions struct {
// dbCheckOptions holds the parsed command-line options for the db continuity check.
type dbCheckOptions struct {
// dbPath is the path to the db to perform the continuity check on.
dbPath string
// batchSize is the number of blocks loaded and checked per batch.
batchSize uint64
// failFast, when true, makes the check stop at the first error encountered
// instead of collecting all errors and reporting them at the end.
failFast bool
}

func parsePreMigrationOptions(ctx *cli.Context) preMigrationOptions {
Expand Down Expand Up @@ -198,6 +207,7 @@ func parseDBCheckOptions(ctx *cli.Context) dbCheckOptions {
return dbCheckOptions{
dbPath: ctx.String(dbCheckPathFlag.Name),
batchSize: ctx.Uint64(batchSizeFlag.Name),
failFast: ctx.Bool(dbCheckFailFastFlag.Name),
}
}

Expand Down Expand Up @@ -487,74 +497,6 @@ func runStateMigration(newDBPath string, opts stateMigrationOptions) error {
return nil
}

// runDBCheckSequential performs a single-threaded continuity check over the blocks
// stored under opts.dbPath.
//
// It first walks the ancient (freezer) blocks and then the non-ancient blocks up to
// the current head, loading opts.batchSize blocks at a time and verifying that each
// batch is continuous with the last block of the previous batch. The check does not
// stop at the first problem: every load or continuity failure is logged and joined
// into the returned error so that a single run reports all gaps.
//
// It returns an error if opts.batchSize is zero, if either database cannot be
// opened, if the ancient count or the head block number cannot be read, or if any
// batch fails to load or fails its continuity check. Errors from closing the
// databases are joined into the result as well.
func runDBCheckSequential(opts dbCheckOptions) (_err error) {
	defer timer("db continuity check")()

	// A zero batch size would never advance the batch loop below.
	if opts.batchSize == 0 {
		return errors.New("batch size must be greater than zero")
	}

	log.Info("DB Continuity Check Started", "dbPath", opts.dbPath)

	ancientDB, err := NewChainFreezer(filepath.Join(opts.dbPath, "ancient"), "", true)
	if err != nil {
		return fmt.Errorf("failed to open ancient db: %w", err)
	}
	defer func() {
		_err = errors.Join(_err, ancientDB.Close())
	}()
	nonAncientDB, err := openDBWithoutFreezer(opts.dbPath, true)
	if err != nil {
		return fmt.Errorf("failed to open non-ancient db: %w", err)
	}
	defer func() {
		_err = errors.Join(_err, nonAncientDB.Close())
	}()

	numAncients, err := ancientDB.Ancients()
	if err != nil {
		return fmt.Errorf("failed to get number of ancients in db: %w", err)
	}

	// Use the real head of the chain rather than a fixed block number, so the whole
	// database is covered and no blocks past the head are requested.
	headNumber := rawdb.ReadHeaderNumber(nonAncientDB, rawdb.ReadHeadHeaderHash(nonAncientDB))
	if headNumber == nil {
		return errors.New("failed to read head block number from non-ancient db")
	}
	lastBlock := *headNumber

	// prevBlockElement carries the last block of the previous batch across batches
	// (and across the ancient/non-ancient boundary) so continuity between batches
	// is checked too. It is reset to nil whenever a batch cannot be loaded.
	var prevBlockElement *RLPBlockElement

	checkContinuity := func(start, end uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
		var rangeErrs error
		for i := start; i <= end; i += opts.batchSize {
			count := min(opts.batchSize, end-i+1)
			blockRange, err := loadRangeFunc(i, count)
			if err != nil {
				log.Error("failed to load block range", "err", err)
				rangeErrs = errors.Join(rangeErrs, err)
				prevBlockElement = nil
				continue
			}
			if prevBlockElement, err = blockRange.CheckContinuity(prevBlockElement, count); err != nil {
				log.Error("failed continuity check", "err", err)
				rangeErrs = errors.Join(rangeErrs, err)
			}
		}
		return rangeErrs
	}

	// Guard against an empty freezer: numAncients-1 would wrap around to MaxUint64.
	if numAncients > 0 {
		lastAncient := numAncients - 1
		log.Info("Checking continuity of ancient blocks", "start", 0, "end", lastAncient, "count", numAncients)
		if err := checkContinuity(0, lastAncient, func(start, count uint64) (*RLPBlockRange, error) {
			return loadAncientRange(ancientDB, start, count)
		}); err != nil {
			_err = errors.Join(_err, err)
		}
	}

	// Ancients are numbered 0..numAncients-1, so the first non-ancient block is numAncients.
	firstNonAncient := numAncients
	if lastBlock >= firstNonAncient {
		log.Info("Checking continuity of non-ancient blocks", "start", firstNonAncient, "end", lastBlock, "count", lastBlock-firstNonAncient+1)
		if err := checkContinuity(firstNonAncient, lastBlock, func(start, count uint64) (*RLPBlockRange, error) {
			return loadNonAncientRange(nonAncientDB, start, count)
		}); err != nil {
			_err = errors.Join(_err, err)
		}
	}

	log.Info("DB Continuity Check Finished", "dbPath", opts.dbPath)

	return _err
}

func runDBCheck(opts dbCheckOptions) (err error) {
defer timer("db continuity check")()

Expand All @@ -581,25 +523,36 @@ func runDBCheck(opts dbCheckOptions) (err error) {
}
lastAncientNumber := lastAncient.Header().Number.Uint64()
lastBlockNumber := *rawdb.ReadHeaderNumber(nonAncientDB, rawdb.ReadHeadHeaderHash(nonAncientDB))
// lastBlockNumber = uint64(6000000) // TODO(Alec) remove this line

lastBlockNumber = uint64(10000000) // TODO(Alec) remove this line

var errResult *multierror.Error

// First, check continuity between ancients and non-ancients.
// Gaps in data will often halt the freezing process, so attempting to load the first non-ancient block
// will most likely fail if there is a gap.
firstNonAncientRange, err := loadNonAncientRange(nonAncientDB, lastAncientNumber+1, 1)
if err != nil {
return fmt.Errorf("failed to load first non-ancient block: %w", err)
}
if _, err := firstNonAncientRange.CheckContinuity(lastAncient, 1); err != nil {
return fmt.Errorf("failed continuity check between ancients and non-ancients: %w", err)
if opts.failFast {
return fmt.Errorf("failed to load first non-ancient block: %w", err)
}
// We don't need to add the error to errResult here because it will be added below when we call checkContinuity on non-ancients
} else {
if _, err := firstNonAncientRange.CheckContinuity(lastAncient, 1); err != nil {
err = fmt.Errorf("failed continuity check between ancients and non-ancients: %w", err)
if opts.failFast {
return err
}
errResult = multierror.Append(errResult, err)
}
}

// TODO(Alec) Add fail fast flag?

g, _ := errgroup.WithContext(context.Background()) // TODO(Alec) use ctx?
g.SetLimit(16) // TODO(Alec) try other multiples of 4
g.SetLimit(8)

checkRange := func(start, count uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
var mu sync.Mutex

checkRange := func(start, count uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) {
// If we are not at genesis or the first non-ancient block, include the last block of
// the previous range so we can check for continuity between ranges.
if start != 0 && start != lastAncientNumber+1 {
Expand All @@ -609,22 +562,36 @@ func runDBCheck(opts dbCheckOptions) (err error) {
g.Go(func() error {
blockRange, err := loadRangeFunc(start, count)
if err != nil {
return fmt.Errorf("failed to load block range: %w", err)
err = fmt.Errorf("failed to load block range: %w", err)
if opts.failFast {
return err
}
mu.Lock()
errResult = multierror.Append(errResult, err)
mu.Unlock()
return nil
}
if _, err := blockRange.CheckContinuity(nil, count); err != nil {
return fmt.Errorf("failed continuity check: %w", err)
err = fmt.Errorf("failed continuity check: %w", err)
if opts.failFast {
return err
}
mu.Lock()
errResult = multierror.Append(errResult, err)
mu.Unlock()
return nil
}
log.Info("Successfully checked block range continuity", "start", start, "end", start+count-1, "count", count)
return nil
})
return nil
}
checkContinuity := func(start, end uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
if (start <= lastAncientNumber && end > lastAncientNumber) || (end > lastBlockNumber) || (end < start) {
return fmt.Errorf("invalid range for continuity check: start=%d, end=%d, lastAncientNumber=%d, lastBlockNumber=%d", start, end, lastAncientNumber, lastBlockNumber)
}
for i := start; i <= end; i += opts.batchSize {
count := min(opts.batchSize, end-i+1)
if err := checkRange(i, count, loadRangeFunc); err != nil {
return err
}
checkRange(i, count, loadRangeFunc)
}
return nil
}
Expand All @@ -646,6 +613,10 @@ func runDBCheck(opts dbCheckOptions) (err error) {
return err
}

if errResult.ErrorOrNil() != nil {
return errResult
}

log.Info("DB Continuity Check Finished", "dbPath", opts.dbPath)

return nil
Expand Down
19 changes: 12 additions & 7 deletions op-chain-ops/cmd/celo-migrate/non-ancients.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"errors"
"fmt"
"os"
"os/exec"
Expand Down Expand Up @@ -98,11 +99,11 @@ func migrateNonAncientBlocks(newDB ethdb.Database, start, count uint64, prevBloc

blockRange, err := loadNonAncientRange(newDB, start, count)
if err != nil {
return nil, err
return nil, fmt.Errorf("Failed to load non-ancient block range: %w. This is likely due to a corrupted source directory. Please delete the target directory and repeat the migration with an uncorrupted source directory", err)
}
prevBlockElement, err = blockRange.CheckContinuity(prevBlockElement, count)
if err != nil {
return nil, fmt.Errorf("failed continuity check for non-ancient blocks: %w", err)
return nil, fmt.Errorf("Failed continuity check for non-ancient blocks: %w. This is likely due to a corrupted source directory. Please delete the target directory and repeat the migration with an uncorrupted source directory", err)
}
if err = blockRange.Transform(); err != nil {
return nil, err
Expand Down Expand Up @@ -130,29 +131,33 @@ func loadNonAncientRange(db ethdb.Database, start, count uint64) (*RLPBlockRange
if err != nil {
return nil, err
}

var combinedErr error

for i, numberHash := range numberHashes {
number := numberHash.Number
hash := numberHash.Hash

blockRange.hashes[i] = hash[:]
blockRange.headers[i], err = db.Get(headerKey(number, hash))
if err != nil {
return nil, fmt.Errorf("failed to find header in db for non-ancient block %d - %x: %w", number, hash, err)
combinedErr = errors.Join(combinedErr, fmt.Errorf("failed to find header in db for non-ancient block %d - %x: %w", number, hash, err))
}
blockRange.bodies[i], err = db.Get(blockBodyKey(number, hash))
if err != nil {
return nil, fmt.Errorf("failed to find body in db for non-ancient block %d - %x: %w", number, hash, err)
combinedErr = errors.Join(combinedErr, fmt.Errorf("failed to find body in db for non-ancient block %d - %x: %w", number, hash, err))
}
blockRange.receipts[i], err = db.Get(blockReceiptsKey(number, hash))
if err != nil {
return nil, fmt.Errorf("failed to find receipts in db for non-ancient block %d - %x: %w", number, hash, err)
combinedErr = errors.Join(combinedErr, fmt.Errorf("failed to find receipts in db for non-ancient block %d - %x: %w", number, hash, err))
}
blockRange.tds[i], err = db.Get(headerTDKey(number, hash))
if err != nil {
return nil, fmt.Errorf("failed to find total difficulty in db for non-ancient block %d - %x: %w", number, hash, err)
combinedErr = errors.Join(combinedErr, fmt.Errorf("failed to find total difficulty in db for non-ancient block %d - %x: %w", number, hash, err))
}
}
return blockRange, nil

return blockRange, combinedErr
}

// checkNumberHashes checks that the contents of a NumberHash slice match the contents in the headerNumber and headerHash db tables.
Expand Down

0 comments on commit 8f6de48

Please sign in to comment.