Skip to content

Commit

Permalink
go/consensus/cometbft/roothash: Decouple reindex from event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
martintomazic committed Feb 11, 2025
1 parent 15f2400 commit 7e9c7b8
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions go/consensus/cometbft/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (sc *serviceClient) getRuntimeNotifiers(id common.Namespace) *runtimeBroker
return notifiers
}

func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory) (uint64, error) {
func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, bh api.BlockHistory) (uint64, error) {
lastRound := api.RoundInvalid
if currentHeight <= 0 {
return lastRound, nil
Expand Down Expand Up @@ -500,9 +500,22 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
if !evRtID.Equal(&runtimeID) {
continue
}
if err = sc.processFinalizedEvent(sc.ctx, height, *evRtID, &ev.Round, true); err != nil {
return 0, fmt.Errorf("failed to process finalized event: %w", err)

annBlk, roundResults, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round)
if err != nil {
return 0, fmt.Errorf("failed to fetch roothash finalized round: %w", err)
}
err = bh.Commit(annBlk, roundResults, false)
if err != nil {
sc.logger.Error("failed to commit block to history keeper",
"err", err,
"runtime_id", runtimeID,
"height", height,
"round", annBlk.Block.Header.Round,
)
return 0, fmt.Errorf("failed to commit block to history keeper: %w", err)
}

lastRound = ev.Round
}
}
Expand Down Expand Up @@ -573,7 +586,7 @@ func (sc *serviceClient) DeliverCommand(ctx context.Context, height int64, cmd i
}

// Emit latest block.
if err := sc.processFinalizedEvent(ctx, rs.LastBlockHeight, tr.runtimeID, nil, false); err != nil {
if err := sc.processFinalizedEvent(ctx, rs.LastBlockHeight, tr.runtimeID, nil); err != nil {
sc.logger.Warn("failed to emit latest block",
"err", err,
"runtime_id", tr.runtimeID,
Expand Down Expand Up @@ -607,7 +620,7 @@ func (sc *serviceClient) DeliverEvent(ctx context.Context, height int64, tx cmtt
if sc.trackedRuntime[ev.RuntimeID] == nil {
continue
}
if err = sc.processFinalizedEvent(ctx, height, ev.RuntimeID, &ev.Finalized.Round, false); err != nil { //nolint:gosec
if err = sc.processFinalizedEvent(ctx, height, ev.RuntimeID, &ev.Finalized.Round); err != nil { //nolint:gosec
return fmt.Errorf("roothash: failed to process finalized event: %w", err)
}
}
Expand All @@ -626,7 +639,6 @@ func (sc *serviceClient) processFinalizedEvent(
height int64,
runtimeID common.Namespace,
round *uint64,
isReindex bool,
) (err error) {
tr := sc.trackedRuntime[runtimeID]
if tr == nil {
Expand Down Expand Up @@ -661,10 +673,10 @@ func (sc *serviceClient) processFinalizedEvent(

// Perform reindex if required.
lastRound := api.RoundInvalid
if !isReindex && !tr.reindexDone {
if !tr.reindexDone {
// Note that we need to reindex up to the previous height as the current height is
// already being processed right now.
if lastRound, err = sc.reindexBlocks(height-1, tr.blockHistory); err != nil {
if lastRound, err = sc.reindexBlocks(ctx, height-1, tr.blockHistory); err != nil {
sc.logger.Error("failed to reindex blocks",
"err", err,
"runtime_id", runtimeID,
Expand All @@ -684,7 +696,7 @@ func (sc *serviceClient) processFinalizedEvent(
"round", annBlk.Block.Header.Round,
)

err = tr.blockHistory.Commit(annBlk, roundResults, !isReindex)
err = tr.blockHistory.Commit(annBlk, roundResults, true)
if err != nil {
sc.logger.Error("failed to commit block to history keeper",
"err", err,
Expand All @@ -697,11 +709,6 @@ func (sc *serviceClient) processFinalizedEvent(
}
}

// Skip emitting events if we are reindexing.
if isReindex {
return nil
}

notifiers := sc.getRuntimeNotifiers(runtimeID)
// Ensure latest block is set.
notifiers.Lock()
Expand Down

0 comments on commit 7e9c7b8

Please sign in to comment.