return prevElement from CheckContinuity, attempt to parallelize continuity script
alecps committed Jan 15, 2025
1 parent de9e4f3 commit c5f14c5
Showing 5 changed files with 170 additions and 52 deletions.
2 changes: 1 addition & 1 deletion op-chain-ops/cmd/celo-migrate/ancients.go
@@ -117,7 +117,7 @@ func readAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, startBlock,
return fmt.Errorf("failed to load ancient block range: %w", err)
}

if err = blockRange.CheckContinuity(nil, count); err != nil {
if _, err = blockRange.CheckContinuity(nil, count); err != nil {
return err
}

41 changes: 28 additions & 13 deletions op-chain-ops/cmd/celo-migrate/continuity.go
@@ -58,30 +58,45 @@ func (r *RLPBlockRange) DropFirst() {
// CheckContinuity checks if the block data in the range is continuous
// by comparing the header number and parent hash of each block with the previous block,
// and by checking if the number of elements retrieved from each table is the same.
// It takes in a pointer to the last element in the preceding range, and re-assigns it to
// the last element in the current range so that continuity can be checked across ranges.
func (r *RLPBlockRange) CheckContinuity(prevElement *RLPBlockElement, expectedLength uint64) error {
log.Info("Checking data continuity for block range", "start", r.start, "end", r.start+expectedLength-1, "count", expectedLength)
// It takes in the last element in the preceding range, and returns the last element
// in the current range so that continuity can be checked across ranges.
func (r *RLPBlockRange) CheckContinuity(prevElement *RLPBlockElement, expectedLength uint64) (*RLPBlockElement, error) {
log.Info("Checking data continuity for block range",
"start", r.start,
"end", r.start+expectedLength-1,
"count", expectedLength,
"prevElement", func() interface{} {
if prevElement != nil {
return prevElement.Header().Number.Uint64()
}
return "nil"
}(),
)

if err := r.CheckLengths(expectedLength); err != nil {
return err
return nil, err
}

var _err error
for i := range r.hashes {
currElement, err := r.Element(uint64(i))
if err != nil {
return err
return nil, err
}
if currElement.Header().Number.Uint64() != r.start+uint64(i) {
return fmt.Errorf("decoded header number mismatch: expected %d, actual %d", r.start+uint64(i), currElement.Header().Number.Uint64())
err = fmt.Errorf("decoded header number mismatch: expected %d, actual %d", r.start+uint64(i), currElement.Header().Number.Uint64())
log.Error(err.Error())
_err = errors.Join(_err, err)
}
if prevElement != nil {
log.Debug("Checking continuity", "block", currElement.Header().Number.Uint64(), "prev", prevElement.Header().Number.Uint64())
if err := currElement.Follows(prevElement); err != nil {
return err
_err = errors.Join(_err, err)
}
}
prevElement = currElement
}
return nil
return prevElement, _err
}

// CheckLengths makes sure the number of elements retrieved from each table is the same
@@ -124,17 +139,17 @@ func (r *RLPBlockRange) Transform() error {

// Follows checks if the current block has a number one greater than the previous block
// and if the parent hash of the current block matches the hash of the previous block.
func (e *RLPBlockElement) Follows(prev *RLPBlockElement) error {
func (e *RLPBlockElement) Follows(prev *RLPBlockElement) (err error) {
if e.Header().Number.Uint64() != prev.Header().Number.Uint64()+1 {
return fmt.Errorf("header number mismatch: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64())
err = errors.Join(err, fmt.Errorf("header number mismatch: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64()))
}
// We compare the parent hash with the stored hash of the previous block because
// at this point the header object will not calculate the correct hash since it
// first needs to be transformed.
if e.Header().ParentHash != common.BytesToHash(prev.hash) {
return fmt.Errorf("parent hash mismatch between blocks %d and %d", e.Header().Number.Uint64(), prev.Header().Number.Uint64())
err = errors.Join(err, fmt.Errorf("parent hash mismatch between blocks %d and %d", e.Header().Number.Uint64(), prev.Header().Number.Uint64()))
}
return nil
return err
}

func (e *RLPBlockElement) Header() *types.Header {
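Note on the new signature: CheckContinuity now returns the last element it examined and aggregates mismatches with errors.Join instead of stopping at the first failure. Joining only nil values yields nil, so a clean range still returns a nil error, and individual failures stay reachable via errors.Is. The sketch below (not part of this commit) shows how a caller can thread the returned element across consecutive batches, with loadRange standing in for loadAncientRange or loadNonAncientRange; it is essentially what the sequential runDBCheck further down does.

// Sketch only: walk [start, end] in batches and let each batch's last
// element seed the continuity check of the next batch.
func checkRanges(start, end, batchSize uint64,
	loadRange func(start, count uint64) (*RLPBlockRange, error)) error {

	var prev *RLPBlockElement
	var allErrs error
	for i := start; i <= end; i += batchSize {
		count := min(batchSize, end-i+1) // Go 1.21+ builtin min
		blockRange, err := loadRange(i, count)
		if err != nil {
			allErrs = errors.Join(allErrs, err)
			prev = nil // cannot bridge across a range that failed to load
			continue
		}
		// prev becomes the last element of this range so the next iteration
		// can verify that its first block follows it.
		if prev, err = blockRange.CheckContinuity(prev, count); err != nil {
			allErrs = errors.Join(allErrs, err)
		}
	}
	return allErrs
}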
2 changes: 1 addition & 1 deletion op-chain-ops/cmd/celo-migrate/continuity_test.go
@@ -195,7 +195,7 @@ func TestCheckContinuity(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.blockRange.CheckContinuity(tt.prevElement, tt.expectedLength)
_, err := tt.blockRange.CheckContinuity(tt.prevElement, tt.expectedLength)
if tt.expectErrorMsg == "" {
require.NoError(t, err, "CheckContinuity() unexpected error")
} else {
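Because CheckContinuity now also returns the last element, the table-driven test could assert on it in the happy path. A hypothetical addition for the success branch only (field names taken from the test table as used above; the error branch, elided in this diff, stays unchanged); it assumes a non-empty range so start+expectedLength-1 is well defined:

// Hypothetical extra assertions, not part of this commit.
last, err := tt.blockRange.CheckContinuity(tt.prevElement, tt.expectedLength)
if tt.expectErrorMsg == "" {
	require.NoError(t, err, "CheckContinuity() unexpected error")
	require.NotNil(t, last)
	require.Equal(t, tt.blockRange.start+tt.expectedLength-1, last.Header().Number.Uint64())
}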
154 changes: 127 additions & 27 deletions op-chain-ops/cmd/celo-migrate/main.go
@@ -159,8 +159,9 @@ type fullMigrationOptions struct {
}

type dbCheckOptions struct {
dbPath string
batchSize uint64
dbPath string
batchSize uint64
bufferSize uint64
}

func parsePreMigrationOptions(ctx *cli.Context) preMigrationOptions {
@@ -196,8 +197,9 @@ func parseFullMigrationOptions(ctx *cli.Context) fullMigrationOptions {

func parseDBCheckOptions(ctx *cli.Context) dbCheckOptions {
return dbCheckOptions{
dbPath: ctx.String(dbCheckPathFlag.Name),
batchSize: ctx.Uint64(batchSizeFlag.Name),
dbPath: ctx.String(dbCheckPathFlag.Name),
batchSize: ctx.Uint64(batchSizeFlag.Name),
bufferSize: ctx.Uint64(bufferSizeFlag.Name),
}
}

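parseDBCheckOptions now also reads a buffer size for the parallel check's channel. bufferSizeFlag itself is not shown in this diff; if it is not a flag the tool already defines elsewhere, a urfave/cli definition might look roughly like the sketch below (name, usage string, and default are assumptions):

// Hypothetical flag definition; the real bufferSizeFlag may already exist in
// main.go with a different name, usage string, or default value.
var bufferSizeFlag = &cli.Uint64Flag{
	Name:  "buffer-size",
	Usage: "Number of loaded block ranges the continuity-check channel may buffer",
	Value: 10,
}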
@@ -507,6 +509,73 @@ func runDBCheck(opts dbCheckOptions) (_err error) {
_err = errors.Join(_err, nonAncientDB.Close())
}()

numAncients, err := ancientDB.Ancients()
if err != nil {
return fmt.Errorf("failed to get number of ancients in db: %w", err)
}
lastAncient := numAncients - 1
lastBlock := *rawdb.ReadHeaderNumber(nonAncientDB, rawdb.ReadHeadHeaderHash(nonAncientDB))

var prevBlockElement *RLPBlockElement

checkContinuity := func(start, end uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
var _err error
for i := start; i <= end; i += opts.batchSize {
count := min(opts.batchSize, end-i+1)
blockRange, err := loadRangeFunc(i, count)
if err != nil {
log.Error("failed to load block range", "err", err)
_err = errors.Join(_err, err)
prevBlockElement = nil
} else {
if prevBlockElement, err = blockRange.CheckContinuity(prevBlockElement, count); err != nil {
log.Error("failed continuity check", "err", err)
_err = errors.Join(_err, err)
}
}
}
return _err
}

log.Info("Checking continuity of ancient blocks", "start", 0, "end", lastAncient, "count", lastAncient+1)
if err := checkContinuity(0, lastAncient, func(start, count uint64) (*RLPBlockRange, error) {
return loadAncientRange(ancientDB, start, count)
}); err != nil {
_err = errors.Join(_err, err)
}

log.Info("Checking continuity of non-ancient blocks", "start", lastAncient+1, "end", lastBlock, "count", lastBlock-lastAncient)
if err := checkContinuity(lastAncient+1, lastBlock, func(start, count uint64) (*RLPBlockRange, error) {
return loadNonAncientRange(nonAncientDB, start, count)
}); err != nil {
_err = errors.Join(_err, err)
}

log.Info("DB Continuity Check Finished", "dbPath", opts.dbPath)

return _err
}

func runDBCheckParallel(opts dbCheckOptions) (_err error) {
defer timer("db continuity check")()

log.Info("DB Continuity Check Started", "dbPath", opts.dbPath)

ancientDB, err := NewChainFreezer(filepath.Join(opts.dbPath, "ancient"), "", true)
if err != nil {
return fmt.Errorf("failed to open ancient db: %w", err)
}
defer func() {
_err = errors.Join(_err, ancientDB.Close())
}()
nonAncientDB, err := openDBWithoutFreezer(opts.dbPath, true)
if err != nil {
return fmt.Errorf("failed to open non-ancient db: %w", err)
}
defer func() {
_err = errors.Join(_err, nonAncientDB.Close())
}()

// Get the number of ancient blocks
numAncients, err := ancientDB.Ancients()
if err != nil {
@@ -516,36 +585,67 @@ func runDBCheck(opts dbCheckOptions) (_err error) {
lastBlock := *rawdb.ReadHeaderNumber(nonAncientDB, rawdb.ReadHeadHeaderHash(nonAncientDB))
numBlocks := lastBlock + 1

var prevBlockElement *RLPBlockElement
// var prevBlockElement *RLPBlockElement // TODO(Alec)

log.Info("Checking continuity of ancient blocks", "start", 0, "end", numAncients-1, "count", numAncients)
for i := uint64(0); i < numAncients; i += opts.batchSize {
count := min(opts.batchSize, numAncients-i)
ancientRange, err := loadAncientRange(ancientDB, i, count)
// Use errgroup for concurrent processing
g, _ := errgroup.WithContext(context.Background())
blockChan := make(chan *RLPBlockRange, opts.bufferSize)

// Worker function to load blocks
loadBlocks := func(start, count uint64, loadRangeFunc func(uint64, uint64) (*RLPBlockRange, error)) error {
// If we are not at genesis include the last block of
// the previous range so we can check for continuity between ranges.
if start != 0 && start != numAncients {
start = start - 1
count = count + 1
}
blockRange, err := loadRangeFunc(start, count)
if err != nil {
log.Error("failed to load ancient range", "err", err)
_err = errors.Join(_err, err)
} else {
if err = ancientRange.CheckContinuity(prevBlockElement, count); err != nil {
log.Error("failed continuity check for ancient blocks", "err", err)
_err = errors.Join(_err, err)
}
return fmt.Errorf("failed to load block range: %w", err)
}
blockChan <- blockRange
return nil
}

// Launch goroutines to process ancient blocks in parallel
for i := uint64(0); i < numAncients; i += opts.batchSize {
start := i
count := min(opts.batchSize, numAncients-start)
g.Go(func() error {
return loadBlocks(start, count, func(start, count uint64) (*RLPBlockRange, error) {
return loadAncientRange(ancientDB, start, count)
})
})
}

log.Info("Checking continuity of non-ancient blocks", "start", numAncients, "end", numBlocks-1, "count", numBlocks-numAncients)
// Launch goroutines to process non-ancient blocks in parallel
for i := numAncients; i < numBlocks; i += opts.batchSize {
count := min(opts.batchSize, numBlocks-i)
nonAncientRange, err := loadNonAncientRange(nonAncientDB, i, count)
if err != nil {
log.Error("failed to load non-ancient range", "err", err)
_err = errors.Join(_err, err)
} else {
if err = nonAncientRange.CheckContinuity(prevBlockElement, count); err != nil {
log.Error("failed continuity check for non-ancient blocks", "err", err)
_err = errors.Join(_err, err)
}
start := i
count := min(opts.batchSize, numBlocks-start)
g.Go(func() error {
return loadBlocks(start, count, func(start, count uint64) (*RLPBlockRange, error) {
return loadNonAncientRange(nonAncientDB, start, count)
})
})
}

// Pipeline step to check continuity
g.Go(func() error {
for blockRange := range blockChan {
_blockRange := blockRange
g.Go(func() error {
if _, err := _blockRange.CheckContinuity(nil, uint64(len(_blockRange.hashes))); err != nil {
log.Error("failed continuity check for blocks", "err", err)
_err = errors.Join(_err, err)
}
return nil
})
}
return nil
})

if err := g.Wait(); err != nil {
return err
}

log.Info("DB Continuity Check Finished", "dbPath", opts.dbPath)
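In the parallel variant, ranges are loaded concurrently and each loaded range is checked in isolation; because loadBlocks extends every non-genesis range back by one block, consecutive ranges overlap and CheckContinuity(nil, count) still covers the seam between batches. For reference, one common way to shape an errgroup producer/consumer pipeline like this, sketched with the package's own types but hypothetical wiring rather than the commit's exact code, is to close the channel once all producers have finished so the consumer's range loop can terminate before the final Wait; keeping a single consumer also means the aggregated error is only touched from one goroutine.

// Sketch of an errgroup fan-out/fan-in pipeline (hypothetical helper, not
// this commit's code). Producers send loaded ranges, a separate goroutine
// closes the channel once they are done, and a single consumer drains it
// while aggregating continuity failures.
func checkRangeParallel(ctx context.Context, start, end, batchSize, bufferSize uint64,
	loadRange func(start, count uint64) (*RLPBlockRange, error)) error {

	producers, ctx := errgroup.WithContext(ctx)
	blockChan := make(chan *RLPBlockRange, bufferSize)

	for i := start; i <= end; i += batchSize {
		rangeStart := i
		count := min(batchSize, end-rangeStart+1)
		producers.Go(func() error {
			blockRange, err := loadRange(rangeStart, count)
			if err != nil {
				return fmt.Errorf("failed to load block range: %w", err)
			}
			select {
			case blockChan <- blockRange:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}

	// Close the channel once every producer has returned so the consumer's
	// range loop below terminates.
	go func() {
		producers.Wait()
		close(blockChan)
	}()

	// Single consumer: check each overlapping range in isolation and collect
	// all failures instead of stopping at the first one.
	var checkErr error
	for blockRange := range blockChan {
		if _, err := blockRange.CheckContinuity(nil, uint64(len(blockRange.hashes))); err != nil {
			checkErr = errors.Join(checkErr, err)
		}
	}
	return errors.Join(producers.Wait(), checkErr)
}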
23 changes: 13 additions & 10 deletions op-chain-ops/cmd/celo-migrate/non-ancients.go
@@ -74,15 +74,17 @@ func migrateNonAncientsDb(newDB ethdb.Database, lastBlock, batchSize uint64, las
if lastAncient != nil {
// The genesis block is the only block that should remain stored in the non-ancient db even after it is frozen.
log.Info("Migrating genesis block in non-ancient db", "process", "non-ancients")
if err := migrateNonAncientBlocks(newDB, 0, 1, nil); err != nil {
if _, err := migrateNonAncientBlocks(newDB, 0, 1, nil); err != nil {
return 0, err
}
}

lastAncientNumber := lastAncient.Header().Number.Uint64()
prevBlockElement := *lastAncient
prevBlock := lastAncient
var err error
for i := lastAncientNumber + 1; i <= lastBlock; i += batchSize {
if err := migrateNonAncientBlocks(newDB, i, batchSize, &prevBlockElement); err != nil {
prevBlock, err = migrateNonAncientBlocks(newDB, i, batchSize, prevBlock)
if err != nil {
return 0, err
}
}
Expand All @@ -91,24 +93,25 @@ func migrateNonAncientsDb(newDB ethdb.Database, lastBlock, batchSize uint64, las
return migratedCount, nil
}

func migrateNonAncientBlocks(newDB ethdb.Database, start, count uint64, prevBlockElement *RLPBlockElement) error {
func migrateNonAncientBlocks(newDB ethdb.Database, start, count uint64, prevBlockElement *RLPBlockElement) (*RLPBlockElement, error) {
log.Info("Processing Block Range", "process", "non-ancients", "from", start, "to(inclusve)", start+count-1, "count", count)

blockRange, err := loadNonAncientRange(newDB, start, count)
if err != nil {
return err
return nil, err
}
if err = blockRange.CheckContinuity(prevBlockElement, count); err != nil {
return fmt.Errorf("failed continuity check for non-ancient blocks: %w", err)
prevBlockElement, err = blockRange.CheckContinuity(prevBlockElement, count)
if err != nil {
return nil, fmt.Errorf("failed continuity check for non-ancient blocks: %w", err)
}
if err = blockRange.Transform(); err != nil {
return err
return nil, err
}
if err = writeNonAncientBlockRange(newDB, blockRange); err != nil {
return err
return nil, err
}

return nil
return prevBlockElement, nil
}

func loadNonAncientRange(db ethdb.Database, start, count uint64) (*RLPBlockRange, error) {
