diff --git a/op-chain-ops/cmd/celo-migrate/ancients.go b/op-chain-ops/cmd/celo-migrate/ancients.go
index 088dc7e22a2c..e347797bfc22 100644
--- a/op-chain-ops/cmd/celo-migrate/ancients.go
+++ b/op-chain-ops/cmd/celo-migrate/ancients.go
@@ -117,7 +117,7 @@ func readAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, startBlock,
 			return fmt.Errorf("failed to load ancient block range: %w", err)
 		}
 
-		if err = blockRange.CheckContinuity(nil, count); err != nil {
+		if _, err = blockRange.CheckContinuity(nil, count); err != nil {
 			return err
 		}
 
diff --git a/op-chain-ops/cmd/celo-migrate/continuity.go b/op-chain-ops/cmd/celo-migrate/continuity.go
index d7749ebb482f..c006fdc8cc5d 100644
--- a/op-chain-ops/cmd/celo-migrate/continuity.go
+++ b/op-chain-ops/cmd/celo-migrate/continuity.go
@@ -58,30 +58,45 @@ func (r *RLPBlockRange) DropFirst() {
 // CheckContinuity checks if the block data in the range is continuous
 // by comparing the header number and parent hash of each block with the previous block,
 // and by checking if the number of elements retrieved from each table is the same.
-// It takes in a pointer to the last element in the preceding range, and re-assigns it to
-// the last element in the current range so that continuity can be checked across ranges.
-func (r *RLPBlockRange) CheckContinuity(prevElement *RLPBlockElement, expectedLength uint64) error {
-	log.Info("Checking data continuity for block range", "start", r.start, "end", r.start+expectedLength-1, "count", expectedLength)
+// It takes in the last element in the preceding range, and returns the last element
+// in the current range so that continuity can be checked across ranges.
+func (r *RLPBlockRange) CheckContinuity(prevElement *RLPBlockElement, expectedLength uint64) (*RLPBlockElement, error) {
+	log.Info("Checking data continuity for block range",
+		"start", r.start,
+		"end", r.start+expectedLength-1,
+		"count", expectedLength,
+		"prevElement", func() interface{} {
+			if prevElement != nil {
+				return prevElement.Header().Number.Uint64()
+			}
+			return "nil"
+		}(),
+	)
 
 	if err := r.CheckLengths(expectedLength); err != nil {
-		return err
+		return nil, err
 	}
+
+	var _err error
 	for i := range r.hashes {
 		currElement, err := r.Element(uint64(i))
 		if err != nil {
-			return err
+			return nil, err
 		}
 		if currElement.Header().Number.Uint64() != r.start+uint64(i) {
-			return fmt.Errorf("decoded header number mismatch: expected %d, actual %d", r.start+uint64(i), currElement.Header().Number.Uint64())
+			err = fmt.Errorf("decoded header number mismatch: expected %d, actual %d", r.start+uint64(i), currElement.Header().Number.Uint64())
+			log.Error(err.Error())
+			_err = errors.Join(_err, err)
 		}
 		if prevElement != nil {
+			log.Debug("Checking continuity", "block", currElement.Header().Number.Uint64(), "prev", prevElement.Header().Number.Uint64())
 			if err := currElement.Follows(prevElement); err != nil {
-				return err
+				_err = errors.Join(_err, err)
 			}
 		}
 		prevElement = currElement
 	}
-	return nil
+	return prevElement, _err
 }
 
 // CheckLengths makes sure the number of elements retrieved from each table is the same
@@ -124,17 +139,17 @@ func (r *RLPBlockRange) Transform() error {
 
 // Follows checks if the current block has a number one greater than the previous block
 // and if the parent hash of the current block matches the hash of the previous block.
-func (e *RLPBlockElement) Follows(prev *RLPBlockElement) error {
+func (e *RLPBlockElement) Follows(prev *RLPBlockElement) (err error) {
 	if e.Header().Number.Uint64() != prev.Header().Number.Uint64()+1 {
-		return fmt.Errorf("header number mismatch: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64())
+		err = errors.Join(err, fmt.Errorf("header number mismatch: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64()))
 	}
 	// We compare the parent hash with the stored hash of the previous block because
 	// at this point the header object will not calculate the correct hash since it
 	// first needs to be transformed.
 	if e.Header().ParentHash != common.BytesToHash(prev.hash) {
-		return fmt.Errorf("parent hash mismatch between blocks %d and %d", e.Header().Number.Uint64(), prev.Header().Number.Uint64())
+		err = errors.Join(err, fmt.Errorf("parent hash mismatch between blocks %d and %d", e.Header().Number.Uint64(), prev.Header().Number.Uint64()))
 	}
-	return nil
+	return err
 }
 
 func (e *RLPBlockElement) Header() *types.Header {
diff --git a/op-chain-ops/cmd/celo-migrate/continuity_test.go b/op-chain-ops/cmd/celo-migrate/continuity_test.go
index 86862f4ca5f6..c50e06a67df2 100644
--- a/op-chain-ops/cmd/celo-migrate/continuity_test.go
+++ b/op-chain-ops/cmd/celo-migrate/continuity_test.go
@@ -195,7 +195,7 @@ func TestCheckContinuity(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			err := tt.blockRange.CheckContinuity(tt.prevElement, tt.expectedLength)
+			_, err := tt.blockRange.CheckContinuity(tt.prevElement, tt.expectedLength)
 			if tt.expectErrorMsg == "" {
 				require.NoError(t, err, "CheckContinuity() unexpected error")
 			} else {
diff --git a/op-chain-ops/cmd/celo-migrate/main.go b/op-chain-ops/cmd/celo-migrate/main.go
index 75b58895b5a2..3db1f363ca01 100644
--- a/op-chain-ops/cmd/celo-migrate/main.go
+++ b/op-chain-ops/cmd/celo-migrate/main.go
@@ -159,8 +159,9 @@ type fullMigrationOptions struct {
 }
 
 type dbCheckOptions struct {
-	dbPath    string
-	batchSize uint64
+	dbPath     string
+	batchSize  uint64
+	bufferSize uint64
 }
 
 func parsePreMigrationOptions(ctx *cli.Context) preMigrationOptions {
@@ -196,8 +197,9 @@ func parseFullMigrationOptions(ctx *cli.Context) fullMigrationOptions {
 
 func parseDBCheckOptions(ctx *cli.Context) dbCheckOptions {
 	return dbCheckOptions{
-		dbPath:    ctx.String(dbCheckPathFlag.Name),
-		batchSize: ctx.Uint64(batchSizeFlag.Name),
+		dbPath:     ctx.String(dbCheckPathFlag.Name),
+		batchSize:  ctx.Uint64(batchSizeFlag.Name),
+		bufferSize: ctx.Uint64(bufferSizeFlag.Name),
 	}
 }
 
@@ -507,6 +509,73 @@ func runDBCheck(opts dbCheckOptions) (_err error) {
 		_err = errors.Join(_err, nonAncientDB.Close())
 	}()
 
+	numAncients, err := ancientDB.Ancients()
+	if err != nil {
+		return fmt.Errorf("failed to get number of ancients in db: %w", err)
+	}
+	lastAncient := numAncients - 1
+	lastBlock := *rawdb.ReadHeaderNumber(nonAncientDB, rawdb.ReadHeadHeaderHash(nonAncientDB))
+
+	var prevBlockElement *RLPBlockElement
+
+	checkContinuity := func(start, end uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
+		var _err error
+		for i := start; i <= end; i += opts.batchSize {
+			count := min(opts.batchSize, end-i+1)
+			blockRange, err := loadRangeFunc(i, count)
+			if err != nil {
+				log.Error("failed to load block range", "err", err)
+				_err = errors.Join(_err, err)
+				prevBlockElement = nil
+			} else {
+				if prevBlockElement, err = blockRange.CheckContinuity(prevBlockElement, count); err != nil {
+					log.Error("failed continuity check", "err", err)
+					_err = errors.Join(_err, err)
+				}
+			}
+		}
+		return _err
+	}
+
+	log.Info("Checking continuity of ancient blocks", "start", 0, "end", lastAncient, "count", lastAncient+1)
+	if err := checkContinuity(0, lastAncient, func(start, count uint64) (*RLPBlockRange, error) {
+		return loadAncientRange(ancientDB, start, count)
+	}); err != nil {
+		_err = errors.Join(_err, err)
+	}
+
+	log.Info("Checking continuity of non-ancient blocks", "start", lastAncient+1, "end", lastBlock, "count", lastBlock-lastAncient)
+	if err := checkContinuity(lastAncient+1, lastBlock, func(start, count uint64) (*RLPBlockRange, error) {
+		return loadNonAncientRange(nonAncientDB, start, count)
+	}); err != nil {
+		_err = errors.Join(_err, err)
+	}
+
+	log.Info("DB Continuity Check Finished", "dbPath", opts.dbPath)
+
+	return _err
+}
+
+func runDBCheckParallel(opts dbCheckOptions) (_err error) {
+	defer timer("db continuity check")()
+
+	log.Info("DB Continuity Check Started", "dbPath", opts.dbPath)
+
+	ancientDB, err := NewChainFreezer(filepath.Join(opts.dbPath, "ancient"), "", true)
+	if err != nil {
+		return fmt.Errorf("failed to open ancient db: %w", err)
+	}
+	defer func() {
+		_err = errors.Join(_err, ancientDB.Close())
+	}()
+	nonAncientDB, err := openDBWithoutFreezer(opts.dbPath, true)
+	if err != nil {
+		return fmt.Errorf("failed to open non-ancient db: %w", err)
+	}
+	defer func() {
+		_err = errors.Join(_err, nonAncientDB.Close())
+	}()
+
 	// Get the number of ancient blocks
 	numAncients, err := ancientDB.Ancients()
 	if err != nil {
@@ -516,36 +585,73 @@ func runDBCheck(opts dbCheckOptions) (_err error) {
 	lastBlock := *rawdb.ReadHeaderNumber(nonAncientDB, rawdb.ReadHeadHeaderHash(nonAncientDB))
 	numBlocks := lastBlock + 1
 
-	var prevBlockElement *RLPBlockElement
+	// var prevBlockElement *RLPBlockElement // TODO(Alec)
 
-	log.Info("Checking continuity of ancient blocks", "start", 0, "end", numAncients-1, "count", numAncients)
-	for i := uint64(0); i < numAncients; i += opts.batchSize {
-		count := min(opts.batchSize, numAncients-i)
-		ancientRange, err := loadAncientRange(ancientDB, i, count)
+	// Use errgroup for concurrent processing. A separate group tracks the
+	// loader goroutines so blockChan can be closed once they have all finished.
+	g, _ := errgroup.WithContext(context.Background())
+	loaders, _ := errgroup.WithContext(context.Background())
+	blockChan := make(chan *RLPBlockRange, opts.bufferSize)
+
+	// Worker function to load blocks
+	loadBlocks := func(start, count uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
+		// If we are not at genesis include the last block of
+		// the previous range so we can check for continuity between ranges.
+		if start != 0 && start != numAncients {
+			start = start - 1
+			count = count + 1
+		}
+		blockRange, err := loadRangeFunc(start, count)
 		if err != nil {
-			log.Error("failed to load ancient range", "err", err)
-			_err = errors.Join(_err, err)
-		} else {
-			if err = ancientRange.CheckContinuity(prevBlockElement, count); err != nil {
-				log.Error("failed continuity check for ancient blocks", "err", err)
-				_err = errors.Join(_err, err)
-			}
+			return fmt.Errorf("failed to load block range: %w", err)
 		}
+		blockChan <- blockRange
+		return nil
+	}
+
+	// Launch goroutines to process ancient blocks in parallel
+	for i := uint64(0); i < numAncients; i += opts.batchSize {
+		start := i
+		count := min(opts.batchSize, numAncients-start)
+		loaders.Go(func() error {
+			return loadBlocks(start, count, func(start, count uint64) (*RLPBlockRange, error) {
+				return loadAncientRange(ancientDB, start, count)
+			})
+		})
 	}
 
-	log.Info("Checking continuity of non-ancient blocks", "start", numAncients, "end", numBlocks-1, "count", numBlocks-numAncients)
+	// Launch goroutines to process non-ancient blocks in parallel
 	for i := numAncients; i < numBlocks; i += opts.batchSize {
-		count := min(opts.batchSize, numBlocks-i)
-		nonAncientRange, err := loadNonAncientRange(nonAncientDB, i, count)
-		if err != nil {
-			log.Error("failed to load non-ancient range", "err", err)
-			_err = errors.Join(_err, err)
-		} else {
-			if err = nonAncientRange.CheckContinuity(prevBlockElement, count); err != nil {
-				log.Error("failed continuity check for non-ancient blocks", "err", err)
-				_err = errors.Join(_err, err)
-			}
+		start := i
+		count := min(opts.batchSize, numBlocks-start)
+		loaders.Go(func() error {
+			return loadBlocks(start, count, func(start, count uint64) (*RLPBlockRange, error) {
+				return loadNonAncientRange(nonAncientDB, start, count)
+			})
+		})
+	}
+
+	// Close the channel once all loaders have finished so the
+	// continuity-check goroutine below can terminate.
+	g.Go(func() error {
+		defer close(blockChan)
+		return loaders.Wait()
+	})
+
+	// Pipeline step to check continuity. Running the checks in a single
+	// goroutine keeps the writes to _err race-free; the parallelism is in the loading.
+	g.Go(func() error {
+		for blockRange := range blockChan {
+			if _, err := blockRange.CheckContinuity(nil, uint64(len(blockRange.hashes))); err != nil {
+				log.Error("failed continuity check for blocks", "err", err)
+				_err = errors.Join(_err, err)
+			}
 		}
+		return nil
+	})
+
+	if err := g.Wait(); err != nil {
+		return err
 	}
 
 	log.Info("DB Continuity Check Finished", "dbPath", opts.dbPath)
diff --git a/op-chain-ops/cmd/celo-migrate/non-ancients.go b/op-chain-ops/cmd/celo-migrate/non-ancients.go
index a0f1f11ee018..e0a2bd55688d 100644
--- a/op-chain-ops/cmd/celo-migrate/non-ancients.go
+++ b/op-chain-ops/cmd/celo-migrate/non-ancients.go
@@ -74,15 +74,17 @@ func migrateNonAncientsDb(newDB ethdb.Database, lastBlock, batchSize uint64, las
 	if lastAncient != nil {
 		// The genesis block is the only block that should remain stored in the non-ancient db even after it is frozen.
log.Info("Migrating genesis block in non-ancient db", "process", "non-ancients") - if err := migrateNonAncientBlocks(newDB, 0, 1, nil); err != nil { + if _, err := migrateNonAncientBlocks(newDB, 0, 1, nil); err != nil { return 0, err } } lastAncientNumber := lastAncient.Header().Number.Uint64() - prevBlockElement := *lastAncient + prevBlock := lastAncient + var err error for i := lastAncientNumber + 1; i <= lastBlock; i += batchSize { - if err := migrateNonAncientBlocks(newDB, i, batchSize, &prevBlockElement); err != nil { + prevBlock, err = migrateNonAncientBlocks(newDB, i, batchSize, prevBlock) + if err != nil { return 0, err } } @@ -91,24 +93,25 @@ func migrateNonAncientsDb(newDB ethdb.Database, lastBlock, batchSize uint64, las return migratedCount, nil } -func migrateNonAncientBlocks(newDB ethdb.Database, start, count uint64, prevBlockElement *RLPBlockElement) error { +func migrateNonAncientBlocks(newDB ethdb.Database, start, count uint64, prevBlockElement *RLPBlockElement) (*RLPBlockElement, error) { log.Info("Processing Block Range", "process", "non-ancients", "from", start, "to(inclusve)", start+count-1, "count", count) blockRange, err := loadNonAncientRange(newDB, start, count) if err != nil { - return err + return nil, err } - if err = blockRange.CheckContinuity(prevBlockElement, count); err != nil { - return fmt.Errorf("failed continuity check for non-ancient blocks: %w", err) + prevBlockElement, err = blockRange.CheckContinuity(prevBlockElement, count) + if err != nil { + return nil, fmt.Errorf("failed continuity check for non-ancient blocks: %w", err) } if err = blockRange.Transform(); err != nil { - return err + return nil, err } if err = writeNonAncientBlockRange(newDB, blockRange); err != nil { - return err + return nil, err } - return nil + return prevBlockElement, nil } func loadNonAncientRange(db ethdb.Database, start, count uint64) (*RLPBlockRange, error) {