From f023e084e1c39eb745c2f00967449942a655b1aa Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Mon, 10 Feb 2025 14:38:35 +0100 Subject: [PATCH] go/consensus/cometbft/roothash: Batch blocks during reindex --- .changelog/6069.feature.md | 4 + go/consensus/cometbft/roothash/roothash.go | 112 ++++++++++++++------- 2 files changed, 82 insertions(+), 34 deletions(-) create mode 100644 .changelog/6069.feature.md diff --git a/.changelog/6069.feature.md b/.changelog/6069.feature.md new file mode 100644 index 00000000000..06c9bc2bad2 --- /dev/null +++ b/.changelog/6069.feature.md @@ -0,0 +1,4 @@ +go/roothash: Optimize runtime history reindex + +During runtime history reindex, writes to the block history are now batched, +which significantly speeds up the reindex. diff --git a/go/consensus/cometbft/roothash/roothash.go b/go/consensus/cometbft/roothash/roothash.go index 1c289721225..c69804c53e8 100644 --- a/go/consensus/cometbft/roothash/roothash.go +++ b/go/consensus/cometbft/roothash/roothash.go @@ -27,7 +27,10 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" ) -const crashPointBlockBeforeIndex = "roothash.before_index" +const ( + crashPointBlockBeforeIndex = "roothash.before_index" + reindexWriteBatchSize = 1000 +) // ServiceClient is the roothash service client interface. type ServiceClient interface { @@ -365,7 +368,7 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, var err error var lastHeight int64 if lastHeight, err = bh.LastConsensusHeight(); err != nil { - sc.logger.Error("failed to get last indexed height", + logger.Error("failed to get last indexed height", "err", err, ) return 0, fmt.Errorf("failed to get last indexed height: %w", err) @@ -396,19 +399,67 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, } // Scan all blocks between last indexed height and current height. 
- logger.Debug("reindexing blocks", - "last_indexed_height", lastHeight, + logger.Info("reindexing blocks", + "last_height", lastHeight, "current_height", currentHeight, logging.LogEvent, api.LogEventHistoryReindexing, ) - for height := lastHeight; height <= currentHeight; height++ { - var results *cmtrpctypes.ResultBlockResults - results, err = sc.backend.GetBlockResults(ctx, height) + for height := lastHeight; height <= currentHeight; height += reindexWriteBatchSize { + end := height + reindexWriteBatchSize - 1 + if end > currentHeight { + end = currentHeight + } + last, err := sc.reindexBatch(ctx, runtimeID, bh, height, end) + if err != nil { + return 0, fmt.Errorf("failed to commit batch to history keeper: %w", err) + } + if last != api.RoundInvalid { + // New rounds indexed. + lastRound = last + } + } + + if lastRound == api.RoundInvalid { + logger.Debug("no new round reindexed, return latest known round") + switch blk, err := bh.GetCommittedBlock(ctx, api.RoundLatest); err { + case api.ErrNotFound: + case nil: + lastRound = blk.Header.Round + default: + return lastRound, fmt.Errorf("failed to get latest block: %w", err) + } + } + + logger.Info("block reindex complete", + "last_round", lastRound, + ) + + return lastRound, nil +} + +func (sc *serviceClient) reindexBatch( + ctx context.Context, + runtimeID common.Namespace, + bh api.BlockHistory, + start int64, + end int64, +) (uint64, error) { + sc.logger.Debug("reindexing batch", + "runtime_id", runtimeID, + "start", start, + "end", end, + ) + + lastRound := api.RoundInvalid + var blocks []*api.AnnotatedBlock + var roundResults []*api.RoundResults + for height := start; height <= end; height++ { + results, err := sc.backend.GetBlockResults(ctx, height) if err != nil { // XXX: could soft-fail first few heights in case more heights were // pruned right after the GetLastRetainedVersion query. 
- logger.Error("failed to get cometbft block results", + sc.logger.Error("failed to get cometbft block results", "err", err, "height", height, ) @@ -447,7 +498,7 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, case eventsAPI.IsAttributeKind(key, &api.FinalizedEvent{}): var e api.FinalizedEvent if err = eventsAPI.DecodeValue(val, &e); err != nil { - logger.Error("failed to unmarshal finalized event", + sc.logger.Error("failed to unmarshal finalized event", "err", err, "height", height, ) @@ -467,40 +518,33 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, continue } - annBlk, roundResults, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round) + annBlk, rr, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round) if err != nil { return 0, fmt.Errorf("failed to fetch roothash finalized round: %w", err) } - err = bh.Commit(annBlk, roundResults, false) - if err != nil { - sc.logger.Error("failed to commit block to history keeper", - "err", err, - "runtime_id", runtimeID, - "height", height, - "round", annBlk.Block.Header.Round, - ) - return 0, fmt.Errorf("failed to commit block to history keeper: %w", err) - } + blocks = append(blocks, annBlk) + roundResults = append(roundResults, rr) + sc.logger.Debug("block added to batch", + "runtime_id", runtimeID, + "height", height, + "round", annBlk.Block.Header.Round, + ) lastRound = ev.Round } } - if lastRound == api.RoundInvalid { - sc.logger.Debug("no new round reindexed, return latest known round") - switch blk, err := bh.GetCommittedBlock(ctx, api.RoundLatest); err { - case api.ErrNotFound: - case nil: - lastRound = blk.Header.Round - default: - return lastRound, fmt.Errorf("failed to get latest block: %w", err) - } + // Do not notify watchers during history reindex. 
+ err := bh.CommitBatch(blocks, roundResults, false) + if err != nil { + sc.logger.Error("failed to commit batch to history keeper", + "runtime_id", runtimeID, + "err", err, + "start", start, + "end", end, + ) + return 0, err } - - sc.logger.Debug("block reindex complete", - "last_round", lastRound, - ) - return lastRound, nil }