diff --git a/go/roothash/api/history.go b/go/roothash/api/history.go index d2ba9620833..d957839e34f 100644 --- a/go/roothash/api/history.go +++ b/go/roothash/api/history.go @@ -18,13 +18,28 @@ type BlockHistory interface { // RuntimeID returns the runtime ID of the runtime this block history is for. RuntimeID() common.Namespace - // Commit commits an annotated block into history. If notify is set to true, - // the watchers will be notified about the new block. Disable notify when - // doing reindexing. + // Commit commits an annotated block with corresponding round results. // - // Must be called in order, sorted by round. + // If notify is set to true, the watchers will be notified about the new block. + // + // Any sequence of Commit and CommitBatch calls is valid as long as as blocks + // are sorted by round. + // + // Returns an error if a block at higher or equal round was already committed. Commit(blk *AnnotatedBlock, roundResults *RoundResults, notify bool) error + // CommitBatch commits annotated blocks with corresponding round results. + // + // If notify is set to true, the watchers will be notified about all + // blocks in batch. + // + // Within a batch, blocks should be sorted by round. Any sequence of Commit + // and CommitBatch calls is valid as long as blocks are sorted by round. + // + // Returns an error if a block at higher or equal round than the first item + // in a batch was already committed. + CommitBatch(blks []*AnnotatedBlock, roundResults []*RoundResults, notify bool) error + // StorageSyncCheckpoint records the last storage round which was synced // to runtime storage. StorageSyncCheckpoint(round uint64) error diff --git a/go/runtime/history/db.go b/go/runtime/history/db.go index 135d01bfd6e..5bdac10e75d 100644 --- a/go/runtime/history/db.go +++ b/go/runtime/history/db.go @@ -149,48 +149,59 @@ func (d *DB) metadata() (*dbMetadata, error) { return meta, nil } -func (d *DB) commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults) error { +func (d *DB) commitBatch(blks []*roothash.AnnotatedBlock, results []*roothash.RoundResults) error { + if len(blks) != len(results) { + return fmt.Errorf("slices size mismatch: %d blocks not equal to %d round results", + len(blks), + len(results), + ) + } + if len(blks) == 0 { + return nil + } + return d.db.Update(func(tx *badger.Txn) error { meta, err := d.queryGetMetadata(tx) if err != nil { return err } - rtID := blk.Block.Header.Namespace - if !rtID.Equal(&meta.RuntimeID) { - return fmt.Errorf("runtime/history: runtime mismatch (expected: %s got: %s)", - meta.RuntimeID, - rtID, - ) - } + for i, blk := range blks { + rtID := blk.Block.Header.Namespace + if !rtID.Equal(&meta.RuntimeID) { + return fmt.Errorf("runtime mismatch (expected: %s got: %s)", + meta.RuntimeID, + rtID, + ) + } - if blk.Height < meta.LastConsensusHeight { - return fmt.Errorf("runtime/history: commit at lower consensus height (current: %d wanted: %d)", - meta.LastConsensusHeight, - blk.Height, - ) - } + if blk.Height < meta.LastConsensusHeight { + return fmt.Errorf("commit at lower or equal consensus height (current: %d wanted: %d)", + meta.LastConsensusHeight, + blk.Height, + ) + } - if blk.Block.Header.Round <= meta.LastRound && meta.LastConsensusHeight != 0 { - return fmt.Errorf("runtime/history: commit at lower round (current: %d wanted: %d)", - meta.LastRound, - blk.Block.Header.Round, - ) - } + if blk.Block.Header.Round <= meta.LastRound && meta.LastConsensusHeight != 0 { + return fmt.Errorf("commit at lower or equal round (current: %d wanted: %d)", + meta.LastRound, + blk.Block.Header.Round, + ) + } - if err = tx.Set(blockKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(blk)); err != nil { - return err - } + if err := tx.Set(blockKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(blk)); err != nil { + return err + } - if err = tx.Set(roundResultsKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(roundResults)); err != nil { - return err - } + if err := tx.Set(roundResultsKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(results[i])); err != nil { + return err + } - meta.LastRound = blk.Block.Header.Round - if blk.Height > meta.LastConsensusHeight { - meta.LastConsensusHeight = blk.Height + meta.LastRound = blk.Block.Header.Round + if blk.Height > meta.LastConsensusHeight { + meta.LastConsensusHeight = blk.Height + } } - return tx.Set(metadataKeyFmt.Encode(), cbor.Marshal(meta)) }) } diff --git a/go/runtime/history/history.go b/go/runtime/history/history.go index 85da61f5532..72ee14646ac 100644 --- a/go/runtime/history/history.go +++ b/go/runtime/history/history.go @@ -54,6 +54,10 @@ func (h *nopHistory) Commit(*roothash.AnnotatedBlock, *roothash.RoundResults, bo return errNopHistory } +func (h *nopHistory) CommitBatch([]*roothash.AnnotatedBlock, []*roothash.RoundResults, bool) error { + return errNopHistory +} + func (h *nopHistory) StorageSyncCheckpoint(uint64) error { return errNopHistory } @@ -134,22 +138,41 @@ func (h *runtimeHistory) RuntimeID() common.Namespace { return h.runtimeID } -func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults, notify bool) error { - err := h.db.commit(blk, roundResults) +func (h *runtimeHistory) CommitBatch(blks []*roothash.AnnotatedBlock, results []*roothash.RoundResults, notify bool) error { + if len(blks) == 0 && len(results) == 0 { + return nil + } + + err := h.db.commitBatch(blks, results) if err != nil { - return err + return fmt.Errorf("failed to commit batch: %w", err) } // Notify the pruner what the new round is. - h.pruneCh.In() <- blk.Block.Header.Round + lastBlk := blks[len(blks)-1] + h.pruneCh.In() <- lastBlk.Block.Header.Round - // If no local storage worker, notify the block watcher that new block is committed, + // If no local storage worker, notify the block watcher about new blocks, // otherwise the storage-sync-checkpoint will do the notification. if h.hasLocalStorage || !notify { return nil } - h.blocksNotifier.Broadcast(blk) + for _, blk := range blks { + h.blocksNotifier.Broadcast(blk) + } + + return nil +} +func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults, notify bool) error { + err := h.CommitBatch([]*roothash.AnnotatedBlock{blk}, []*roothash.RoundResults{roundResults}, notify) + if err != nil { + return fmt.Errorf("failed to commit at height %d, and round %d: %w", + blk.Height, + blk.Block.Header.Round, + err, + ) + } return nil } diff --git a/go/runtime/history/history_test.go b/go/runtime/history/history_test.go index 28f993956e1..da4a571ce91 100644 --- a/go/runtime/history/history_test.go +++ b/go/runtime/history/history_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/roothash/api" roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" ) @@ -170,6 +171,121 @@ func TestHistory(t *testing.T) { require.Equal(roundResults, gotResults, "GetRoundResults should return the correct results") } +func TestCommitBatch(t *testing.T) { + require := require.New(t) + ctx := context.Background() + + dataDir, err := os.MkdirTemp("", "oasis-runtime-history-test_") + require.NoError(err, "TempDir") + defer os.RemoveAll(dataDir) + + runtimeID1 := common.NewTestNamespaceFromSeed([]byte("history test ns 1"), 0) + runtimeID2 := common.NewTestNamespaceFromSeed([]byte("history test ns 2"), 0) + + prunerFactory := NewNonePrunerFactory() + history, err := New(runtimeID1, dataDir, prunerFactory, true) + require.NoError(err, "New") + + require.Equal(runtimeID1, history.RuntimeID()) + + // Sample data. + blk1 := roothash.AnnotatedBlock{ + Height: 1, + Block: block.NewGenesisBlock(runtimeID1, 0), + } + blk2 := roothash.AnnotatedBlock{ + Height: 3, + Block: block.NewGenesisBlock(runtimeID2, 0), + } + blk2.Block.Header.Round = 1 + results1 := &roothash.RoundResults{ + Messages: []*roothash.MessageEvent{ + {Module: "", Code: 0, Index: 0}, + {Module: "", Code: 0, Index: 1}, + }, + } + results2 := &roothash.RoundResults{ + Messages: []*roothash.MessageEvent{ + {Module: "", Code: 1, Index: 0}, + }, + } + + err = history.CommitBatch(nil, nil, true) + require.NoError(err, "CommitBatch should succeed for empty batch") + + err = history.CommitBatch([]*roothash.AnnotatedBlock{&blk1, &blk2}, + []*roothash.RoundResults{results1}, + true, + ) + require.Error(err, "CommitBatch should fail when slices don't have equal size") + + err = history.CommitBatch([]*roothash.AnnotatedBlock{&blk1, &blk2}, + []*roothash.RoundResults{results1, results2}, + true, + ) + require.Error(err, "CommitBatch should fail when different runtimes IDs") + + copy(blk2.Block.Header.Namespace[:], blk1.Block.Header.Namespace[:]) + + // Commit batch in wrong order: round 1 and 0 at consenus height 3 and 1. + err = history.CommitBatch([]*roothash.AnnotatedBlock{&blk2, &blk1}, + []*roothash.RoundResults{results2, results1}, + true, + ) + require.Error(err, "CommitBatch should fail for unordered batch") + + // Commit batch round 0 and 1 at consenus height 1 and 3. + err = history.CommitBatch([]*roothash.AnnotatedBlock{&blk1, &blk2}, + []*roothash.RoundResults{results1, results2}, + true, + ) + require.NoError(err, "CommitBatch") + + lastHeight, err := history.LastConsensusHeight() + require.NoError(err, "LastConsensusHeight") + require.EqualValues(3, lastHeight) + + gotBlock, err := history.GetCommittedBlock(ctx, 0) + require.NoError(err, "GetCommittedBlock(0)") + require.Equal(blk1.Block, gotBlock, "GetCommittedBlock should return the correct block") + + gotBlock, err = history.GetCommittedBlock(ctx, api.RoundLatest) + require.NoError(err, "GetCommittedBlock(RoundLatest)") + require.Equal(blk2.Block, gotBlock, "GetCommittedBlock should return the correct block") + + // Commit for the latest height and round should fail + err = history.Commit(&blk2, nil, true) + require.Error(err, "Commit should fail for same consensus height") + + // Commit for the latest round should fail. + blk2.Height = 4 + err = history.Commit(&blk2, nil, true) + require.Error(err, "Commit should fail for same round") + + // Commit after batch commit should succeed when round and height increases. + blk2.Block.Header.Round = 2 + err = history.Commit(&blk2, nil, true) + require.NoError(err, "Commit") + + err = history.StorageSyncCheckpoint(2) + require.NoError(err, "StorageSyncCheckpoint should work") + + gotAnnBlk, err := history.GetAnnotatedBlock(ctx, 2) + require.NoError(err, "GetAnnotatedBlock") + require.Equal(&blk2, gotAnnBlk, "GetAnnotatedBlock should return the correct block") + + // Try committing another batch after a single commit. + blk1.Height = 5 + blk2.Height = 7 + blk1.Block.Header.Round = 5 + blk2.Block.Header.Round = 6 + err = history.CommitBatch([]*roothash.AnnotatedBlock{&blk1, &blk2}, + []*roothash.RoundResults{nil, nil}, + false, + ) + require.NoError(err, "CommitBatch") +} + func testWatchBlocks(t *testing.T, history History, expectedRound uint64) { t.Helper() require := require.New(t) @@ -228,6 +344,8 @@ func TestWatchBlocks(t *testing.T) { {Module: "", Code: 1, Index: 0}, }, } + blocks := []*roothash.AnnotatedBlock{&blk1, &blk2} + results := []*roothash.RoundResults{results1, nil} // Test history with local storage. prunerFactory := NewNonePrunerFactory() @@ -280,7 +398,41 @@ func TestWatchBlocks(t *testing.T) { require.NoError(err, "WaitRoundSynced") require.EqualValues(11, r, "WaitRoundSynced") // Committing storage checkpoint should panic. - assert.Panics(t, func() { _ = history.StorageSyncCheckpoint(11) }, "StorageSyncCheckpoint should panic") + assert.Panics(t, func() { _ = history.StorageSyncCheckpoint(10) }, "StorageSyncCheckpoint should panic") + + // Test history with local storage and batching. + dataDir3, err := os.MkdirTemp("", "oasis-runtime-history-test_") + require.NoError(err, "TempDir") + defer os.RemoveAll(dataDir3) + history, err = New(runtimeID, dataDir3, prunerFactory, true) + require.NoError(err, "New") + testWatchBlocks(t, history, 0) + err = history.CommitBatch(blocks, results, false) // notify=false + require.NoError(err, "CommitBatch") + // In case of a local storage, we broadcast blocks when + // StorageSyncCheckpoint is called, regardless of notify flag. + testWatchBlocks(t, history, 0) + err = history.StorageSyncCheckpoint(10) + require.NoError(err, "StorageSyncCheckpoint") + testWatchBlocks(t, history, 10) + err = history.StorageSyncCheckpoint(11) + require.NoError(err, "StorageSyncCheckpoint") + // Wait synced round so that notifier processes it. + _, err = history.WaitRoundSynced(ctx, 11) + require.NoError(err, "WaitRoundSynced") + testWatchBlocks(t, history, 11) + + // Test history without local storage and with batching. + dataDir4, err := os.MkdirTemp("", "oasis-runtime-history-test_") + require.NoError(err, "TempDir") + defer os.RemoveAll(dataDir4) + history, err = New(runtimeID, dataDir4, prunerFactory, false) + require.NoError(err, "New") + testWatchBlocks(t, history, 0) + err = history.CommitBatch(blocks, results, false) // set notify to false + require.NoError(err, "CommitBatch") + // No block should be received since we set notify to false. + testWatchBlocks(t, history, 0) } type testPruneHandler struct { @@ -330,8 +482,11 @@ func TestHistoryPrune(t *testing.T) { } history.Pruner().RegisterHandler(&ph) - // Create some blocks. - for i := 0; i <= 50; i++ { + const n = 51 + + blks := make([]*roothash.AnnotatedBlock, n) + results := make([]*roothash.RoundResults, n) + for i := 0; i < n; i++ { blk := roothash.AnnotatedBlock{ Height: int64(i), Block: block.NewGenesisBlock(runtimeID, 0), @@ -345,18 +500,31 @@ func TestHistoryPrune(t *testing.T) { {Module: "", Code: 0, Index: 1}, } } - roundResults := &roothash.RoundResults{ Messages: msgResults, } + blks[i] = &blk + results[i] = roundResults + } - err = history.Commit(&blk, roundResults, true) - require.NoError(err, "Commit") + // Commit first 30 blocks in a batch of 10. + err = history.CommitBatch(blks[:10], results[:10], false) + require.NoError(err, "Commit") + err = history.CommitBatch(blks[10:20], results[10:20], false) + require.NoError(err, "Commit") + err = history.CommitBatch(blks[20:30], results[20:30], false) + require.NoError(err, "Commit") - err = history.StorageSyncCheckpoint(blk.Block.Header.Round) + // Commit remaining 20 blocks one by one. + for i := 30; i < n; i++ { + err = history.Commit(blks[i], results[i], true) + require.NoError(err, "Commit") + } + // Simulate storage syncing. + for i := 0; i < n; i++ { + err = history.StorageSyncCheckpoint(blks[i].Block.Header.Round) require.NoError(err, "StorageSyncCheckpoint") } - // No more blocks after this point. // Wait for pruning to complete. @@ -383,7 +551,7 @@ func TestHistoryPrune(t *testing.T) { } // Ensure we can only lookup the last 10 blocks. - for i := 0; i <= 50; i++ { + for i := 0; i < n; i++ { _, err = history.GetBlock(ctx, uint64(i)) if i <= 40 { require.Error(err, "GetBlock should fail for pruned block %d", i)