diff --git a/.changelog/6079.bugfix.md b/.changelog/6079.bugfix.md new file mode 100644 index 00000000000..de3b096ba72 --- /dev/null +++ b/.changelog/6079.bugfix.md @@ -0,0 +1,9 @@ +go/runtime/history: Ensure `WatchBlocks` emits sequential round sequence + +Previously, if `WatchBlocks` was called on a node without local storage, +blocks were not emitted during reindexing, causing gaps in the round sequence +for clients. + +The behaviour has been improved: during the initial reindex, +subscribers are still not notified, but during any subsequent reindex +all block rounds are emitted. diff --git a/go/consensus/cometbft/roothash/roothash.go b/go/consensus/cometbft/roothash/roothash.go index 823d8dcf58f..c7340e1ab3b 100644 --- a/go/consensus/cometbft/roothash/roothash.go +++ b/go/consensus/cometbft/roothash/roothash.go @@ -51,9 +51,10 @@ type runtimeBrokers struct { type trackedRuntime struct { runtimeID common.Namespace - height int64 - blockHistory api.BlockHistory - reindexDone bool + height int64 + blockHistory api.BlockHistory + reindexDone bool + initialReindexDone bool } type cmdTrackRuntime struct { @@ -702,6 +703,10 @@ func (sc *serviceClient) processFinalizedEvent( return fmt.Errorf("failed to reindex blocks: %w", err) } tr.reindexDone = true + if !tr.initialReindexDone { + tr.initialReindexDone = true + tr.blockHistory.ReindexFinished() + } } // Only commit the block in case it was not already committed during reindex. 
Note that even @@ -714,7 +719,7 @@ func (sc *serviceClient) processFinalizedEvent( "round", blk.Header.Round, ) - err = tr.blockHistory.Commit(annBlk, roundResults, !isReindex) + err = tr.blockHistory.Commit(annBlk, roundResults) if err != nil { sc.logger.Error("failed to commit block to history keeper", "err", err, diff --git a/go/roothash/api/history.go b/go/roothash/api/history.go index 8334e2c3a0e..b1b27c0811c 100644 --- a/go/roothash/api/history.go +++ b/go/roothash/api/history.go @@ -4,7 +4,6 @@ import ( "context" "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/common/pubsub" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" ) @@ -18,33 +17,10 @@ type BlockHistory interface { // RuntimeID returns the runtime ID of the runtime this block history is for. RuntimeID() common.Namespace - // Commit commits an annotated block into history. If notify is set to true, - // the watchers will be notified about the new block. Disable notify when - // doing reindexing. + // Commit commits an annotated block into history. // // Must be called in order, sorted by round. - Commit(blk *AnnotatedBlock, roundResults *RoundResults, notify bool) error - - // ConsensusCheckpoint records the last consensus height which was processed - // by the roothash backend. - // - // This method can only be called once all roothash blocks for consensus - // heights <= height have been committed using Commit. - ConsensusCheckpoint(height int64) error - - // StorageSyncCheckpoint records the last storage round which was synced - // to runtime storage. - StorageSyncCheckpoint(round uint64) error - - // LastStorageSyncedRound returns the last runtime round which was synced to storage. - LastStorageSyncedRound() (uint64, error) - - // WatchBlocks returns a channel watching block rounds as they are committed. - // If node has local storage this includes waiting for the round to be synced into storage. 
- WatchBlocks() (<-chan *AnnotatedBlock, pubsub.ClosableSubscription, error) - - // WaitRoundSynced waits for the specified round to be synced to storage. - WaitRoundSynced(ctx context.Context, round uint64) (uint64, error) + Commit(blk *AnnotatedBlock, roundResults *RoundResults) error // LastConsensusHeight returns the last consensus height which was seen // by block history. @@ -56,22 +32,8 @@ type BlockHistory interface { // This method can return blocks not yet synced to storage. GetCommittedBlock(ctx context.Context, round uint64) (*block.Block, error) - // GetBlock returns the block at a specific round. - // Passing the special value `RoundLatest` will return the latest block. - // - // This method returns blocks that are both committed and synced to storage. - GetBlock(ctx context.Context, round uint64) (*block.Block, error) - - // GetAnnotatedBlock returns the annotated block at a specific round. - // - // Passing the special value `RoundLatest` will return the latest annotated block. - GetAnnotatedBlock(ctx context.Context, round uint64) (*AnnotatedBlock, error) - - // GetEarliestBlock returns the earliest known block. - GetEarliestBlock(ctx context.Context) (*block.Block, error) - - // GetRoundResults returns the round results for the given round. + // ReindexFinished marks that the initial history reindex has finished. // - // Passing the special value `RoundLatest` will return results for the latest round. - GetRoundResults(ctx context.Context, round uint64) (*RoundResults, error) + // Calling this method more than once has no additional side effects. 
+ ReindexFinished() } diff --git a/go/runtime/history/db.go b/go/runtime/history/db.go index 23041255876..135d01bfd6e 100644 --- a/go/runtime/history/db.go +++ b/go/runtime/history/db.go @@ -149,25 +149,6 @@ func (d *DB) metadata() (*dbMetadata, error) { return meta, nil } -func (d *DB) consensusCheckpoint(height int64) error { - return d.db.Update(func(tx *badger.Txn) error { - meta, err := d.queryGetMetadata(tx) - if err != nil { - return err - } - - if height < meta.LastConsensusHeight { - return fmt.Errorf("runtime/history: consensus checkpoint at lower height (current: %d wanted: %d)", - meta.LastConsensusHeight, - height, - ) - } - - meta.LastConsensusHeight = height - return tx.Set(metadataKeyFmt.Encode(), cbor.Marshal(meta)) - }) -} - func (d *DB) commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults) error { return d.db.Update(func(tx *badger.Txn) error { meta, err := d.queryGetMetadata(tx) diff --git a/go/runtime/history/history.go b/go/runtime/history/history.go index 076f6d1086a..e2f73a67be1 100644 --- a/go/runtime/history/history.go +++ b/go/runtime/history/history.go @@ -3,7 +3,6 @@ package history import ( "context" - "errors" "fmt" "path/filepath" "sync" @@ -22,11 +21,7 @@ import ( // DbFilename is the filename of the history database. const DbFilename = "history.db" -var ( - errNopHistory = errors.New("runtime/history: not supported") - - _ History = (*runtimeHistory)(nil) -) +var _ History = (*runtimeHistory)(nil) // Factory is the runtime history factory interface. type Factory func(runtimeID common.Namespace, dataDir string) (History, error) @@ -35,80 +30,48 @@ type Factory func(runtimeID common.Namespace, dataDir string) (History, error) type History interface { roothash.BlockHistory - // Pruner returns the history pruner. - Pruner() Pruner - - // Close closes the history keeper. 
- Close() -} - -type nopHistory struct { - runtimeID common.Namespace -} + // StorageSyncCheckpoint records the last storage round which was synced + // to runtime storage. + StorageSyncCheckpoint(round uint64) error -func (h *nopHistory) RuntimeID() common.Namespace { - return h.runtimeID -} + // LastStorageSyncedRound returns the last runtime round which was synced to storage. + LastStorageSyncedRound() (uint64, error) -func (h *nopHistory) Commit(*roothash.AnnotatedBlock, *roothash.RoundResults, bool) error { - return errNopHistory -} + // WatchBlocks returns a channel watching block rounds as they are committed. + // + // If node has local storage this includes waiting for the round to be synced into storage. + // + // If node has no local storage, we only notify blocks that were committed + // after ReindexFinished was called. + WatchBlocks() (<-chan *roothash.AnnotatedBlock, pubsub.ClosableSubscription, error) -func (h *nopHistory) ConsensusCheckpoint(int64) error { - return errNopHistory -} + // WaitRoundSynced waits for the specified round to be synced to storage. + WaitRoundSynced(ctx context.Context, round uint64) (uint64, error) -func (h *nopHistory) StorageSyncCheckpoint(uint64) error { - return errNopHistory -} + // GetBlock returns the block at a specific round. + // Passing the special value `RoundLatest` will return the latest block. + // + // This method returns blocks that are both committed and synced to storage. + GetBlock(ctx context.Context, round uint64) (*block.Block, error) -func (h *nopHistory) LastStorageSyncedRound() (uint64, error) { - return 0, errNopHistory -} + // GetAnnotatedBlock returns the annotated block at a specific round. + // + // Passing the special value `RoundLatest` will return the latest annotated block. 
+ GetAnnotatedBlock(ctx context.Context, round uint64) (*roothash.AnnotatedBlock, error) -func (h *nopHistory) WatchBlocks() (<-chan *roothash.AnnotatedBlock, pubsub.ClosableSubscription, error) { - return nil, nil, errNopHistory -} + // GetEarliestBlock returns the earliest known block. + GetEarliestBlock(ctx context.Context) (*block.Block, error) -func (h *nopHistory) WaitRoundSynced(context.Context, uint64) (uint64, error) { - return 0, errNopHistory -} + // GetRoundResults returns the round results for the given round. + // + // Passing the special value `RoundLatest` will return results for the latest round. + GetRoundResults(ctx context.Context, round uint64) (*roothash.RoundResults, error) -func (h *nopHistory) LastConsensusHeight() (int64, error) { - return 0, errNopHistory -} - -func (h *nopHistory) GetCommittedBlock(context.Context, uint64) (*block.Block, error) { - return nil, errNopHistory -} - -func (h *nopHistory) GetBlock(context.Context, uint64) (*block.Block, error) { - return nil, errNopHistory -} - -func (h *nopHistory) GetAnnotatedBlock(context.Context, uint64) (*roothash.AnnotatedBlock, error) { - return nil, errNopHistory -} - -func (h *nopHistory) GetEarliestBlock(context.Context) (*block.Block, error) { - return nil, errNopHistory -} - -func (h *nopHistory) GetRoundResults(context.Context, uint64) (*roothash.RoundResults, error) { - return nil, errNopHistory -} - -func (h *nopHistory) Pruner() Pruner { - pruner, _ := NewNonePrunerFactory()(h.runtimeID, nil) - return pruner -} - -func (h *nopHistory) Close() { -} + // Pruner returns the history pruner. + Pruner() Pruner -// NewNop creates a new no-op runtime history keeper. -func NewNop(runtimeID common.Namespace) History { - return &nopHistory{runtimeID: runtimeID} + // Close closes the history keeper. 
+ Close() } type runtimeHistory struct { @@ -122,6 +85,9 @@ type runtimeHistory struct { db *DB blocksNotifier *pubsub.Broker + syncReindexDone sync.RWMutex + reindexDone bool + // Last storage synced round as reported by the storage backend (if enabled). syncRoundLock sync.RWMutex lastStorageSyncedRound uint64 @@ -138,7 +104,7 @@ func (h *runtimeHistory) RuntimeID() common.Namespace { return h.runtimeID } -func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults, notify bool) error { +func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults) error { err := h.db.commit(blk, roundResults) if err != nil { return err @@ -147,9 +113,10 @@ func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *root // Notify the pruner what the new round is. h.pruneCh.In() <- blk.Block.Header.Round - // If no local storage worker, notify the block watcher that new block is committed, - // otherwise the storage-sync-checkpoint will do the notification. - if h.hasLocalStorage || !notify { + // If no local storage worker, and not during initial history reindex, + // notify the block watcher that new block is committed. + // Otherwise the storage-sync-checkpoint will do the notification. 
+ if h.hasLocalStorage || !h.reindexDone { return nil } h.blocksNotifier.Broadcast(blk) @@ -157,8 +124,10 @@ func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *root return nil } -func (h *runtimeHistory) ConsensusCheckpoint(height int64) error { - return h.db.consensusCheckpoint(height) +func (h *runtimeHistory) ReindexFinished() { + h.syncReindexDone.Lock() + defer h.syncReindexDone.Unlock() + h.reindexDone = true } func (h *runtimeHistory) StorageSyncCheckpoint(round uint64) error { diff --git a/go/runtime/history/history_test.go b/go/runtime/history/history_test.go index 34a25a5c1e0..875a1875455 100644 --- a/go/runtime/history/history_test.go +++ b/go/runtime/history/history_test.go @@ -55,17 +55,8 @@ func TestHistory(t *testing.T) { require.Error(err, "GetBlock(RoundLatest) should fail for no indexed block") require.Equal(roothash.ErrNotFound, err) - err = history.ConsensusCheckpoint(42) - require.NoError(err, "ConsensusCheckpoint") - err = history.ConsensusCheckpoint(40) - require.Error(err, "ConsensusCheckpoint should fail for lower height") - - lastHeight, err = history.LastConsensusHeight() - require.NoError(err, "LastConsensusHeight") - require.EqualValues(42, lastHeight) - blk := roothash.AnnotatedBlock{ - Height: 40, + Height: 50, Block: block.NewGenesisBlock(runtimeID, 0), } blk.Block.Header.Round = 10 @@ -77,22 +68,26 @@ func TestHistory(t *testing.T) { }, } - err = history.Commit(&blk, roundResults, true) - require.Error(err, "Commit should fail for lower consensus height") - - blk.Height = 50 copy(blk.Block.Header.Namespace[:], runtimeID2[:]) - err = history.Commit(&blk, roundResults, true) + err = history.Commit(&blk, roundResults) require.Error(err, "Commit should fail for different runtime") copy(blk.Block.Header.Namespace[:], runtimeID[:]) - err = history.Commit(&blk, roundResults, true) + err = history.Commit(&blk, roundResults) require.NoError(err, "Commit") + + blk2 := roothash.AnnotatedBlock{ + Height: 40, + Block: 
block.NewGenesisBlock(runtimeID, 0), + } + err = history.Commit(&blk2, roundResults) + require.Error(err, "Commit should fail for lower consensus height") + putBlk := *blk.Block - err = history.Commit(&blk, roundResults, true) + err = history.Commit(&blk, roundResults) require.Error(err, "Commit should fail for the same round") blk.Block.Header.Round = 5 - err = history.Commit(&blk, roundResults, true) + err = history.Commit(&blk, roundResults) require.Error(err, "Commit should fail for a lower round") blk.Block.Header.Round = 10 @@ -176,6 +171,8 @@ func TestHistory(t *testing.T) { } func testWatchBlocks(t *testing.T, history History, expectedRound uint64) { + t.Helper() + require := require.New(t) ch, sub, err := history.WatchBlocks() @@ -210,39 +207,56 @@ func TestWatchBlocks(t *testing.T) { runtimeID := common.NewTestNamespaceFromSeed([]byte("history test ns 1"), 0) - // Test history with local storage. - prunerFactory := NewNonePrunerFactory() - history, err := New(runtimeID, dataDir, prunerFactory, true) - require.NoError(err, "New") - // No blocks should be received. - testWatchBlocks(t, history, 0) - - // Commit a block. - err = history.ConsensusCheckpoint(40) - require.NoError(err, "ConsensusCheckpoint") - blk := roothash.AnnotatedBlock{ + // Sample data + blk1 := roothash.AnnotatedBlock{ Height: 40, Block: block.NewGenesisBlock(runtimeID, 0), } - blk.Block.Header.Round = 10 - roundResults := &roothash.RoundResults{ + blk1.Block.Header.Round = 10 + results1 := &roothash.RoundResults{ Messages: []*roothash.MessageEvent{ {Module: "", Code: 0, Index: 0}, {Module: "", Code: 0, Index: 1}, }, } - err = history.Commit(&blk, roundResults, true) - require.NoError(err, "Commit") + blk2 := roothash.AnnotatedBlock{ + Height: 41, + Block: block.NewGenesisBlock(runtimeID, 0), + } + blk2.Block.Header.Round = 11 + results2 := &roothash.RoundResults{ + Messages: []*roothash.MessageEvent{ + {Module: "", Code: 1, Index: 0}, + }, + } + // Test history with local storage. 
+ prunerFactory := NewNonePrunerFactory() + history, err := New(runtimeID, dataDir, prunerFactory, true) + require.NoError(err, "New") + // No blocks should be received. + testWatchBlocks(t, history, 0) + // Commit first block. + err = history.Commit(&blk1, results1) + require.NoError(err, "Commit") // No blocks should be received. testWatchBlocks(t, history, 0) - // Commit storage checkpoint. err = history.StorageSyncCheckpoint(10) require.NoError(err, "StorageSyncCheckpoint") - // Block should be received. testWatchBlocks(t, history, 10) + // Commit second block. + err = history.Commit(&blk2, results2) + require.NoError(err, "Commit") + // Commit storage checkpoint. + err = history.StorageSyncCheckpoint(11) + require.NoError(err, "StorageSyncCheckpoint") + // Wait synced round so that notifier processes it. + _, err = history.WaitRoundSynced(ctx, 11) + require.NoError(err, "WaitRoundSynced") + // Block should be received. + testWatchBlocks(t, history, 11) // Test history without local storage. dataDir2, err := os.MkdirTemp("", "oasis-runtime-history-test_") @@ -252,21 +266,20 @@ func TestWatchBlocks(t *testing.T) { require.NoError(err, "New") // No blocks should be received. testWatchBlocks(t, history, 0) - - // Commit a block. - err = history.ConsensusCheckpoint(40) - require.NoError(err, "ConsensusCheckpoint") - err = history.Commit(&blk, roundResults, true) + // Commit first block. + err = history.Commit(&blk1, results1) require.NoError(err, "Commit should work") - - // Block should be received. - testWatchBlocks(t, history, 10) - + // Block should not be received. + testWatchBlocks(t, history, 0) + // Mark reindex as finished. + history.ReindexFinished() + // Commit a second block. + err = history.Commit(&blk2, results2) + require.NoError(err, "Commit") // Wait round sync should return correct round. 
- r, err := history.WaitRoundSynced(ctx, 10) + r, err := history.WaitRoundSynced(ctx, 11) require.NoError(err, "WaitRoundSynced") - require.EqualValues(10, r, "WaitRoundSynced") - + require.EqualValues(11, r, "WaitRoundSynced") // Committing storage checkpoint should panic. assert.Panics(t, func() { _ = history.StorageSyncCheckpoint(10) }, "StorageSyncCheckpoint should panic") } @@ -338,7 +351,7 @@ func TestHistoryPrune(t *testing.T) { Messages: msgResults, } - err = history.Commit(&blk, roundResults, true) + err = history.Commit(&blk, roundResults) require.NoError(err, "Commit") err = history.StorageSyncCheckpoint(blk.Block.Header.Round) @@ -430,7 +443,7 @@ func TestHistoryPruneError(t *testing.T) { } blk.Block.Header.Round = uint64(i) - err = history.Commit(&blk, nil, true) + err = history.Commit(&blk, nil) require.NoError(err, "Commit") err = history.StorageSyncCheckpoint(blk.Block.Header.Round)