Skip to content

Commit

Permalink
go/runtime/history: Add CommitBatch method
Browse files Browse the repository at this point in the history
Commit is now only a special case of CommitBatch.
  • Loading branch information
martintomazic committed Feb 25, 2025
1 parent b7b82d9 commit a0a289e
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 49 deletions.
23 changes: 19 additions & 4 deletions go/roothash/api/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,28 @@ type BlockHistory interface {
// RuntimeID returns the runtime ID of the runtime this block history is for.
RuntimeID() common.Namespace

// Commit commits an annotated block into history. If notify is set to true,
// the watchers will be notified about the new block. Disable notify when
// doing reindexing.
// Commit commits an annotated block with corresponding round results.
//
// Must be called in order, sorted by round.
// If notify is set to true, the watchers will be notified about the new block.
//
// Any sequence of Commit and CommitBatch calls is valid as long as as blocks
// are sorted by round.
//
// Returns an error if a block at higher or equal round was already committed.
Commit(blk *AnnotatedBlock, roundResults *RoundResults, notify bool) error

// CommitBatch commits annotated blocks with corresponding round results.
//
// If notify is set to true, the watchers will be notified about all
// blocks in batch.
//
// Within a batch, blocks should be sorted by round. Any sequence of Commit
// and CommitBatch calls is valid as long as blocks are sorted by round.
//
// Returns an error if a block at higher or equal round than the first item
// in a batch was already committed.
CommitBatch(blks []*AnnotatedBlock, roundResults []*RoundResults, notify bool) error

// StorageSyncCheckpoint records the last storage round which was synced
// to runtime storage.
StorageSyncCheckpoint(round uint64) error
Expand Down
71 changes: 41 additions & 30 deletions go/runtime/history/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,48 +149,59 @@ func (d *DB) metadata() (*dbMetadata, error) {
return meta, nil
}

func (d *DB) commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults) error {
func (d *DB) commitBatch(blks []*roothash.AnnotatedBlock, results []*roothash.RoundResults) error {
if len(blks) != len(results) {
return fmt.Errorf("slices size mismatch: %d blocks not equal to %d round results",
len(blks),
len(results),
)
}
if len(blks) == 0 {
return nil
}

return d.db.Update(func(tx *badger.Txn) error {
meta, err := d.queryGetMetadata(tx)
if err != nil {
return err
}

rtID := blk.Block.Header.Namespace
if !rtID.Equal(&meta.RuntimeID) {
return fmt.Errorf("runtime/history: runtime mismatch (expected: %s got: %s)",
meta.RuntimeID,
rtID,
)
}
for i, blk := range blks {
rtID := blk.Block.Header.Namespace
if !rtID.Equal(&meta.RuntimeID) {
return fmt.Errorf("runtime mismatch (expected: %s got: %s)",
meta.RuntimeID,
rtID,
)
}

if blk.Height < meta.LastConsensusHeight {
return fmt.Errorf("runtime/history: commit at lower consensus height (current: %d wanted: %d)",
meta.LastConsensusHeight,
blk.Height,
)
}
if blk.Height < meta.LastConsensusHeight {
return fmt.Errorf("commit at lower or equal consensus height (current: %d wanted: %d)",
meta.LastConsensusHeight,
blk.Height,
)
}

if blk.Block.Header.Round <= meta.LastRound && meta.LastConsensusHeight != 0 {
return fmt.Errorf("runtime/history: commit at lower round (current: %d wanted: %d)",
meta.LastRound,
blk.Block.Header.Round,
)
}
if blk.Block.Header.Round <= meta.LastRound && meta.LastConsensusHeight != 0 {
return fmt.Errorf("commit at lower or equal round (current: %d wanted: %d)",
meta.LastRound,
blk.Block.Header.Round,
)
}

if err = tx.Set(blockKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(blk)); err != nil {
return err
}
if err := tx.Set(blockKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(blk)); err != nil {
return err
}

if err = tx.Set(roundResultsKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(roundResults)); err != nil {
return err
}
if err := tx.Set(roundResultsKeyFmt.Encode(blk.Block.Header.Round), cbor.Marshal(results[i])); err != nil {
return err
}

meta.LastRound = blk.Block.Header.Round
if blk.Height > meta.LastConsensusHeight {
meta.LastConsensusHeight = blk.Height
meta.LastRound = blk.Block.Header.Round
if blk.Height > meta.LastConsensusHeight {
meta.LastConsensusHeight = blk.Height
}
}

return tx.Set(metadataKeyFmt.Encode(), cbor.Marshal(meta))
})
}
Expand Down
35 changes: 29 additions & 6 deletions go/runtime/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (h *nopHistory) Commit(*roothash.AnnotatedBlock, *roothash.RoundResults, bo
return errNopHistory
}

func (h *nopHistory) CommitBatch([]*roothash.AnnotatedBlock, []*roothash.RoundResults, bool) error {
return errNopHistory
}

func (h *nopHistory) StorageSyncCheckpoint(uint64) error {
return errNopHistory
}
Expand Down Expand Up @@ -134,22 +138,41 @@ func (h *runtimeHistory) RuntimeID() common.Namespace {
return h.runtimeID
}

func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults, notify bool) error {
err := h.db.commit(blk, roundResults)
func (h *runtimeHistory) CommitBatch(blks []*roothash.AnnotatedBlock, results []*roothash.RoundResults, notify bool) error {
if len(blks) == 0 && len(results) == 0 {
return nil
}

err := h.db.commitBatch(blks, results)
if err != nil {
return err
return fmt.Errorf("failed to commit batch: %w", err)
}

// Notify the pruner what the new round is.
h.pruneCh.In() <- blk.Block.Header.Round
lastBlk := blks[len(blks)-1]
h.pruneCh.In() <- lastBlk.Block.Header.Round

// If no local storage worker, notify the block watcher that new block is committed,
// If no local storage worker, notify the block watcher about new blocks,
// otherwise the storage-sync-checkpoint will do the notification.
if h.hasLocalStorage || !notify {
return nil
}
h.blocksNotifier.Broadcast(blk)
for _, blk := range blks {
h.blocksNotifier.Broadcast(blk)
}

return nil
}

func (h *runtimeHistory) Commit(blk *roothash.AnnotatedBlock, roundResults *roothash.RoundResults, notify bool) error {
err := h.CommitBatch([]*roothash.AnnotatedBlock{blk}, []*roothash.RoundResults{roundResults}, notify)
if err != nil {
return fmt.Errorf("failed to commit at height %d, and round %d: %w",
blk.Height,
blk.Block.Header.Round,
err,
)
}
return nil
}

Expand Down
Loading

0 comments on commit a0a289e

Please sign in to comment.