Skip to content

Commit

Permalink
go/roothash: Batch writting runtime blocks during reindex
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Feb 11, 2025
1 parent 7e9c7b8 commit aff6166
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 30 deletions.
4 changes: 4 additions & 0 deletions .changelog/5738.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/roothahs: Optimize runtime history reindex

During runtime history reindex, we batch 1000 writes at the same time,
resulting in 2x speed-up of history reindex.
98 changes: 68 additions & 30 deletions go/consensus/cometbft/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
)

const crashPointBlockBeforeIndex = "roothash.before_index"
const (
crashPointBlockBeforeIndex = "roothash.before_index"
batchSize = 1000
)

// ServiceClient is the roothash service client interface.
type ServiceClient interface {
Expand Down Expand Up @@ -436,13 +439,61 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64,
logging.LogEvent, api.LogEventHistoryReindexing,
)

for height := lastHeight; height <= currentHeight; height++ {
for height := lastHeight; height <= currentHeight; height += batchSize {
end := height + batchSize - 1
if end > currentHeight {
end = currentHeight
}
last, err := sc.reindexBatch(ctx, runtimeID, bh, height, end)
if err != nil {
return 0, fmt.Errorf("failed to commit batch to history keeper: %w", err)

Check warning on line 449 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L449

Added line #L449 was not covered by tests
}
if last != api.RoundInvalid && last > lastRound {
lastRound = last

Check warning on line 452 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L452

Added line #L452 was not covered by tests
}
}

if lastRound == api.RoundInvalid {
sc.logger.Debug("no new round reindexed, return latest known round")
switch blk, err := bh.GetCommittedBlock(sc.ctx, api.RoundLatest); err {
case api.ErrNotFound:
case nil:
lastRound = blk.Header.Round
default:
return lastRound, fmt.Errorf("failed to get latest block: %w", err)

Check warning on line 463 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L462-L463

Added lines #L462 - L463 were not covered by tests
}
}

sc.logger.Debug("block reindex complete",
"last_round", lastRound,
)

return lastRound, nil
}

func (sc *serviceClient) reindexBatch(
ctx context.Context,
runtimeID common.Namespace,
bh api.BlockHistory,
start int64,
end int64,
) (uint64, error) {
sc.logger.Debug("reindexing batch",
"runtime_id", runtimeID,
"batch_start", start,
"batch_end", end,
)

lastRound := api.RoundInvalid
var blocks []*api.AnnotatedBlock
var roundResults []*api.RoundResults
for height := start; height <= end; height++ {
var results *cmtrpctypes.ResultBlockResults
results, err = sc.backend.GetBlockResults(sc.ctx, height)
results, err := sc.backend.GetBlockResults(sc.ctx, height)
if err != nil {
// XXX: could soft-fail first few heights in case more heights were
// pruned right after the GetLastRetainedVersion query.
logger.Error("failed to get cometbft block results",
sc.logger.Error("failed to get cometbft block results",

Check warning on line 496 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L496

Added line #L496 was not covered by tests
"err", err,
"height", height,
)
Expand Down Expand Up @@ -481,7 +532,7 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64,
case eventsAPI.IsAttributeKind(key, &api.FinalizedEvent{}):
var e api.FinalizedEvent
if err = eventsAPI.DecodeValue(val, &e); err != nil {
logger.Error("failed to unmarshal finalized event",
sc.logger.Error("failed to unmarshal finalized event",

Check warning on line 535 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L535

Added line #L535 was not covered by tests
"err", err,
"height", height,
)
Expand All @@ -501,40 +552,27 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64,
continue
}

annBlk, roundResults, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round)
annBlk, rr, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round)
if err != nil {
return 0, fmt.Errorf("failed to fetch roothash finalized round: %w", err)

Check warning on line 557 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L555-L557

Added lines #L555 - L557 were not covered by tests
}
err = bh.Commit(annBlk, roundResults, false)
if err != nil {
sc.logger.Error("failed to commit block to history keeper",
"err", err,
"runtime_id", runtimeID,
"height", height,
"round", annBlk.Block.Header.Round,
)
return 0, fmt.Errorf("failed to commit block to history keeper: %w", err)
}
blocks = append(blocks, annBlk)
roundResults = append(roundResults, rr)

Check warning on line 560 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L559-L560

Added lines #L559 - L560 were not covered by tests

lastRound = ev.Round
}
}

if lastRound == api.RoundInvalid {
sc.logger.Debug("no new round reindexed, return latest known round")
switch blk, err := bh.GetCommittedBlock(sc.ctx, api.RoundLatest); err {
case api.ErrNotFound:
case nil:
lastRound = blk.Header.Round
default:
return lastRound, fmt.Errorf("failed to get latest block: %w", err)
}
err := bh.CommitBatch(blocks, roundResults)
if err != nil {
sc.logger.Error("failed to commit batch to history keeper",
"err", err,
"runtime_id", runtimeID,
"batch_start", start,
"batch_end", end,
)
return 0, err

Check warning on line 574 in go/consensus/cometbft/roothash/roothash.go

View check run for this annotation

Codecov / codecov/patch

go/consensus/cometbft/roothash/roothash.go#L568-L574

Added lines #L568 - L574 were not covered by tests
}

sc.logger.Debug("block reindex complete",
"last_round", lastRound,
)

return lastRound, nil
}

Expand Down
7 changes: 7 additions & 0 deletions go/roothash/api/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ type BlockHistory interface {
// Must be called in order, sorted by round.
Commit(blk *AnnotatedBlock, roundResults *RoundResults, notify bool) error

// CommitBatch commits annotated blocks and corresponding round results,
// into runtime history.
//
// Watcher will not be notified about new blocks since this method is only
// meant to be used during reindex.
CommitBatch(blks []*AnnotatedBlock, roundResults []*RoundResults) error

// ConsensusCheckpoint records the last consensus height which was processed
// by the roothash backend.
//
Expand Down
53 changes: 53 additions & 0 deletions go/runtime/history/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,59 @@ func (d *DB) commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundRe
})
}

func (d *DB) commitBatch(blks []*roothash.AnnotatedBlock, roundResults []*roothash.RoundResults) error {
if len(blks) != len(roundResults) {
return fmt.Errorf("runtime/history: got %d annotated blocks and %d roundResults, but should be equal", len(blks), len(roundResults))

Check warning on line 219 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L219

Added line #L219 was not covered by tests
}
if len(blks) == 0 {
return nil
}

return d.db.Update(func(tx *badger.Txn) error {
meta, err := d.queryGetMetadata(tx)
if err != nil {
return err

Check warning on line 228 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L225-L228

Added lines #L225 - L228 were not covered by tests
}

for i, blk := range blks {
rtID := blk.Block.Header.Namespace
if !rtID.Equal(&meta.RuntimeID) {
return fmt.Errorf("runtime/history: runtime mismatch (expected: %s got: %s)",
meta.RuntimeID,
rtID,
)

Check warning on line 237 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L231-L237

Added lines #L231 - L237 were not covered by tests
}

if blk.Height < meta.LastConsensusHeight {
return fmt.Errorf("runtime/history: commit at lower consensus height (current: %d wanted: %d)",
meta.LastConsensusHeight,
blk.Height,
)

Check warning on line 244 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L240-L244

Added lines #L240 - L244 were not covered by tests
}

if blk.Block.Header.Round <= meta.LastRound && meta.LastConsensusHeight != 0 {
return fmt.Errorf("runtime/history: commit at lower round (current: %d wanted: %d)",
meta.LastRound,
blk.Block.Header.Round,
)

Check warning on line 251 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L247-L251

Added lines #L247 - L251 were not covered by tests
}

if err := tx.Set(blockKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(blk)); err != nil {
return err

Check warning on line 255 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L254-L255

Added lines #L254 - L255 were not covered by tests
}

if err := tx.Set(roundResultsKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(roundResults[i])); err != nil {
return err

Check warning on line 259 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L258-L259

Added lines #L258 - L259 were not covered by tests
}
meta.LastRound = blk.Block.Header.Round
if blk.Height > meta.LastConsensusHeight {
meta.LastConsensusHeight = blk.Height

Check warning on line 263 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L261-L263

Added lines #L261 - L263 were not covered by tests
}
}
return tx.Set(metadataKeyFmt.Encode(), cbor.Marshal(meta))

Check warning on line 266 in go/runtime/history/db.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/db.go#L266

Added line #L266 was not covered by tests
})
}

func (d *DB) getBlock(round uint64) (*roothash.AnnotatedBlock, error) {
var blk roothash.AnnotatedBlock
txErr := d.db.View(func(tx *badger.Txn) error {
Expand Down
8 changes: 8 additions & 0 deletions go/runtime/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (h *nopHistory) Commit(*roothash.AnnotatedBlock, *roothash.RoundResults, bo
return errNopHistory
}

func (h *nopHistory) CommitBatch([]*roothash.AnnotatedBlock, []*roothash.RoundResults) error {
return errNopHistory

Check warning on line 58 in go/runtime/history/history.go

View check run for this annotation

Codecov / codecov/patch

go/runtime/history/history.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

func (h *nopHistory) ConsensusCheckpoint(int64) error {
return errNopHistory
}
Expand Down Expand Up @@ -157,6 +161,10 @@ func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *root
return nil
}

func (h *runtimeHistory) CommitBatch(blks []*roothash.AnnotatedBlock, roundResults []*roothash.RoundResults) error {
return h.db.commitBatch(blks, roundResults)
}

func (h *runtimeHistory) ConsensusCheckpoint(height int64) error {
return h.db.consensusCheckpoint(height)
}
Expand Down

0 comments on commit aff6166

Please sign in to comment.