Skip to content

Commit

Permalink
go/consensus/cometbft/roothash: Batch blocks during reindex
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Feb 27, 2025
1 parent 5f91ff3 commit f418cd1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 32 deletions.
4 changes: 4 additions & 0 deletions .changelog/6069.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/roothash: Optimize runtime history reindex

During runtime history reindex, we batch writes resulting in significant
speed-up of history reindex.
107 changes: 75 additions & 32 deletions go/consensus/cometbft/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
)

const crashPointBlockBeforeIndex = "roothash.before_index"
const (
crashPointBlockBeforeIndex = "roothash.before_index"
reindexWriteBatchSize = 1000
)

// ServiceClient is the roothash service client interface.
type ServiceClient interface {
Expand Down Expand Up @@ -365,7 +368,7 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64,
var err error
var lastHeight int64
if lastHeight, err = bh.LastConsensusHeight(); err != nil {
sc.logger.Error("failed to get last indexed height",
logger.Error("failed to get last indexed height",
"err", err,
)
return 0, fmt.Errorf("failed to get last indexed height: %w", err)
Expand Down Expand Up @@ -396,15 +399,64 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64,
}

// Scan all blocks between last indexed height and current height.
logger.Debug("reindexing blocks",
"last_indexed_height", lastHeight,
logger.Info("reindexing blocks",
"last_height", lastHeight,
"current_height", currentHeight,
logging.LogEvent, api.LogEventHistoryReindexing,
)

for height := lastHeight; height <= currentHeight; height++ {
var results *cmtrpctypes.ResultBlockResults
results, err = sc.backend.GetBlockResults(ctx, height)
for height := lastHeight; height <= currentHeight; height += reindexWriteBatchSize {
end := height + reindexWriteBatchSize - 1
if end > currentHeight {
end = currentHeight
}
last, err := sc.reindexBatch(ctx, runtimeID, bh, height, end)
if err != nil {
return 0, fmt.Errorf("failed to reindex batch: %w", err)
}
if last != api.RoundInvalid {
// New rounds indexed.
lastRound = last
}
}

if lastRound == api.RoundInvalid {
logger.Debug("no new round reindexed, return latest known round")
switch blk, err := bh.GetCommittedBlock(ctx, api.RoundLatest); err {
case api.ErrNotFound:
case nil:
lastRound = blk.Header.Round
default:
return lastRound, fmt.Errorf("failed to get latest block: %w", err)
}
}

logger.Info("block reindex complete",
"last_round", lastRound,
)

return lastRound, nil
}

func (sc *serviceClient) reindexBatch(
ctx context.Context,
runtimeID common.Namespace,
bh api.BlockHistory,
start int64,
end int64,
) (uint64, error) {
logger := sc.logger.With("runtime_id", runtimeID)

logger.Debug("reindexing batch",
"start", start,
"end", end,
)

lastRound := api.RoundInvalid
var blocks []*api.AnnotatedBlock
var roundResults []*api.RoundResults
for height := start; height <= end; height++ {
results, err := sc.backend.GetBlockResults(ctx, height)
if err != nil {
// XXX: could soft-fail first few heights in case more heights were
// pruned right after the GetLastRetainedVersion query.
Expand Down Expand Up @@ -467,40 +519,31 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64,
continue
}

annBlk, roundResults, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round)
annBlk, rr, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round)
if err != nil {
return 0, fmt.Errorf("failed to fetch roothash finalized round: %w", err)
}
err = bh.Commit(annBlk, roundResults, false)
if err != nil {
sc.logger.Error("failed to commit block to history keeper",
"err", err,
"runtime_id", runtimeID,
"height", height,
"round", annBlk.Block.Header.Round,
)
return 0, fmt.Errorf("failed to commit block to history keeper: %w", err)
}
blocks = append(blocks, annBlk)
roundResults = append(roundResults, rr)
logger.Debug("block added to batch",
"height", height,
"round", annBlk.Block.Header.Round,
)

lastRound = ev.Round
}
}

if lastRound == api.RoundInvalid {
sc.logger.Debug("no new round reindexed, return latest known round")
switch blk, err := bh.GetCommittedBlock(ctx, api.RoundLatest); err {
case api.ErrNotFound:
case nil:
lastRound = blk.Header.Round
default:
return lastRound, fmt.Errorf("failed to get latest block: %w", err)
}
// Do not notify watchers during history reindex.
err := bh.CommitBatch(blocks, roundResults, false)
if err != nil {
logger.Error("failed to commit batch",
"err", err,
"start", start,
"end", end,
)
return 0, fmt.Errorf("failed to commit batch: %w", err)
}

sc.logger.Debug("block reindex complete",
"last_round", lastRound,
)

return lastRound, nil
}

Expand Down

0 comments on commit f418cd1

Please sign in to comment.