From aff6166422f2a95fd5adf91ab32e81d22f53b5cb Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Mon, 10 Feb 2025 14:38:35 +0100 Subject: [PATCH] go/roothash: Batch writting runtime blocks during reindex --- .changelog/5738.feature.md | 4 + go/consensus/cometbft/roothash/roothash.go | 98 +++++++++++++++------- go/roothash/api/history.go | 7 ++ go/runtime/history/db.go | 53 ++++++++++++ go/runtime/history/history.go | 8 ++ 5 files changed, 140 insertions(+), 30 deletions(-) create mode 100644 .changelog/5738.feature.md diff --git a/.changelog/5738.feature.md b/.changelog/5738.feature.md new file mode 100644 index 00000000000..05953c0f711 --- /dev/null +++ b/.changelog/5738.feature.md @@ -0,0 +1,4 @@ +go/roothahs: Optimize runtime history reindex + +During runtime history reindex, we batch 1000 writes at the same time, +resulting in 2x speed-up of history reindex. diff --git a/go/consensus/cometbft/roothash/roothash.go b/go/consensus/cometbft/roothash/roothash.go index 3c35d58c709..4db66c54fc9 100644 --- a/go/consensus/cometbft/roothash/roothash.go +++ b/go/consensus/cometbft/roothash/roothash.go @@ -29,7 +29,10 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" ) -const crashPointBlockBeforeIndex = "roothash.before_index" +const ( + crashPointBlockBeforeIndex = "roothash.before_index" + batchSize = 1000 +) // ServiceClient is the roothash service client interface. type ServiceClient interface { @@ -436,13 +439,61 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, logging.LogEvent, api.LogEventHistoryReindexing, ) - for height := lastHeight; height <= currentHeight; height++ { + for height := lastHeight; height <= currentHeight; height += batchSize { + end := height + batchSize - 1 + if end > currentHeight { + end = currentHeight + } + last, err := sc.reindexBatch(ctx, runtimeID, bh, height, end) + if err != nil { + return 0, fmt.Errorf("failed to commit batch to history keeper: %w", err) + } + if last != api.RoundInvalid && last > lastRound { + lastRound = last + } + } + + if lastRound == api.RoundInvalid { + sc.logger.Debug("no new round reindexed, return latest known round") + switch blk, err := bh.GetCommittedBlock(sc.ctx, api.RoundLatest); err { + case api.ErrNotFound: + case nil: + lastRound = blk.Header.Round + default: + return lastRound, fmt.Errorf("failed to get latest block: %w", err) + } + } + + sc.logger.Debug("block reindex complete", + "last_round", lastRound, + ) + + return lastRound, nil +} + +func (sc *serviceClient) reindexBatch( + ctx context.Context, + runtimeID common.Namespace, + bh api.BlockHistory, + start int64, + end int64, +) (uint64, error) { + sc.logger.Debug("reindexing batch", + "runtime_id", runtimeID, + "batch_start", start, + "batch_end", end, + ) + + lastRound := api.RoundInvalid + var blocks []*api.AnnotatedBlock + var roundResults []*api.RoundResults + for height := start; height <= end; height++ { var results *cmtrpctypes.ResultBlockResults - results, err = sc.backend.GetBlockResults(sc.ctx, height) + results, err := sc.backend.GetBlockResults(sc.ctx, height) if err != nil { // XXX: could soft-fail first few heights in case more heights were // pruned right after the GetLastRetainedVersion query. - logger.Error("failed to get cometbft block results", + sc.logger.Error("failed to get cometbft block results", "err", err, "height", height, ) @@ -481,7 +532,7 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, case eventsAPI.IsAttributeKind(key, &api.FinalizedEvent{}): var e api.FinalizedEvent if err = eventsAPI.DecodeValue(val, &e); err != nil { - logger.Error("failed to unmarshal finalized event", + sc.logger.Error("failed to unmarshal finalized event", "err", err, "height", height, ) @@ -501,40 +552,27 @@ func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, continue } - annBlk, roundResults, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round) + annBlk, rr, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round) if err != nil { return 0, fmt.Errorf("failed to fetch roothash finalized round: %w", err) } - err = bh.Commit(annBlk, roundResults, false) - if err != nil { - sc.logger.Error("failed to commit block to history keeper", - "err", err, - "runtime_id", runtimeID, - "height", height, - "round", annBlk.Block.Header.Round, - ) - return 0, fmt.Errorf("failed to commit block to history keeper: %w", err) - } + blocks = append(blocks, annBlk) + roundResults = append(roundResults, rr) lastRound = ev.Round } } - if lastRound == api.RoundInvalid { - sc.logger.Debug("no new round reindexed, return latest known round") - switch blk, err := bh.GetCommittedBlock(sc.ctx, api.RoundLatest); err { - case api.ErrNotFound: - case nil: - lastRound = blk.Header.Round - default: - return lastRound, fmt.Errorf("failed to get latest block: %w", err) - } + err := bh.CommitBatch(blocks, roundResults) + if err != nil { + sc.logger.Error("failed to commit batch to history keeper", + "err", err, + "runtime_id", runtimeID, + "batch_start", start, + "batch_end", end, + ) + return 0, err } - - sc.logger.Debug("block reindex complete", - "last_round", lastRound, - ) - return lastRound, nil } diff --git a/go/roothash/api/history.go b/go/roothash/api/history.go index 8334e2c3a0e..747dff41ca9 100644 --- a/go/roothash/api/history.go +++ b/go/roothash/api/history.go @@ -25,6 +25,13 @@ type BlockHistory interface { // Must be called in order, sorted by round. Commit(blk *AnnotatedBlock, roundResults *RoundResults, notify bool) error + // CommitBatch commits annotated blocks and corresponding round results, + // into runtime history. + // + // Watcher will not be notified about new blocks since this method is only + // meant to be used during reindex. + CommitBatch(blks []*AnnotatedBlock, roundResults []*RoundResults) error + // ConsensusCheckpoint records the last consensus height which was processed // by the roothash backend. // diff --git a/go/runtime/history/db.go b/go/runtime/history/db.go index 23041255876..134fa60dede 100644 --- a/go/runtime/history/db.go +++ b/go/runtime/history/db.go @@ -214,6 +214,59 @@ func (d *DB) commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundRe }) } +func (d *DB) commitBatch(blks []*roothash.AnnotatedBlock, roundResults []*roothash.RoundResults) error { + if len(blks) != len(roundResults) { + return fmt.Errorf("runtime/history: got %d annotated blocks and %d roundResults, but should be equal", len(blks), len(roundResults)) + } + if len(blks) == 0 { + return nil + } + + return d.db.Update(func(tx *badger.Txn) error { + meta, err := d.queryGetMetadata(tx) + if err != nil { + return err + } + + for i, blk := range blks { + rtID := blk.Block.Header.Namespace + if !rtID.Equal(&meta.RuntimeID) { + return fmt.Errorf("runtime/history: runtime mismatch (expected: %s got: %s)", + meta.RuntimeID, + rtID, + ) + } + + if blk.Height < meta.LastConsensusHeight { + return fmt.Errorf("runtime/history: commit at lower consensus height (current: %d wanted: %d)", + meta.LastConsensusHeight, + blk.Height, + ) + } + + if blk.Block.Header.Round <= meta.LastRound && meta.LastConsensusHeight != 0 { + return fmt.Errorf("runtime/history: commit at lower round (current: %d wanted: %d)", + meta.LastRound, + blk.Block.Header.Round, + ) + } + + if err := tx.Set(blockKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(blk)); err != nil { + return err + } + + if err := tx.Set(roundResultsKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(roundResults[i])); err != nil { + return err + } + meta.LastRound = blk.Block.Header.Round + if blk.Height > meta.LastConsensusHeight { + meta.LastConsensusHeight = blk.Height + } + } + return tx.Set(metadataKeyFmt.Encode(), cbor.Marshal(meta)) + }) +} + func (d *DB) getBlock(round uint64) (*roothash.AnnotatedBlock, error) { var blk roothash.AnnotatedBlock txErr := d.db.View(func(tx *badger.Txn) error { diff --git a/go/runtime/history/history.go b/go/runtime/history/history.go index 076f6d1086a..2160e35cde0 100644 --- a/go/runtime/history/history.go +++ b/go/runtime/history/history.go @@ -54,6 +54,10 @@ func (h *nopHistory) Commit(*roothash.AnnotatedBlock, *roothash.RoundResults, bo return errNopHistory } +func (h *nopHistory) CommitBatch([]*roothash.AnnotatedBlock, []*roothash.RoundResults) error { + return errNopHistory +} + func (h *nopHistory) ConsensusCheckpoint(int64) error { return errNopHistory } @@ -157,6 +161,10 @@ func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *root return nil } +func (h *runtimeHistory) CommitBatch(blks []*roothash.AnnotatedBlock, roundResults []*roothash.RoundResults) error { + return h.db.commitBatch(blks, roundResults) +} + func (h *runtimeHistory) ConsensusCheckpoint(height int64) error { return h.db.consensusCheckpoint(height) }