Skip to content

Commit

Permalink
Merge pull request #6868 from onflow/leo/storage-refactor-approvals
Browse files Browse the repository at this point in the history
[Storage Refactor] Refactor Approvals
  • Loading branch information
zhangchiqing authored Feb 5, 2025
2 parents 914f353 + 5c6f00c commit 0aae073
Show file tree
Hide file tree
Showing 5 changed files with 483 additions and 1 deletion.
2 changes: 1 addition & 1 deletion engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ func VerificationNode(t testing.TB,

chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Log)

approvalStorage := storage.NewResultApprovals(node.Metrics, node.PublicDB)
approvalStorage := store.NewResultApprovals(node.Metrics, badgerimpl.ToDB(node.PublicDB))

node.VerifierEngine, err = verifier.New(node.Log,
collector,
Expand Down
48 changes: 48 additions & 0 deletions storage/operation/approvals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package operation

import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// InsertResultApproval persists the given ResultApproval under a key derived from its ID.
// Since the key is the cryptographic hash `approval.ID()`, two writes to the same key
// necessarily carry the same value (anything else would constitute a successful pre-image
// attack on our hash function). Consequently, concurrent calls to this function are safe.
func InsertResultApproval(w storage.Writer, approval *flow.ResultApproval) error {
	key := MakePrefix(codeResultApproval, approval.ID())
	return UpsertByKey(w, key, approval)
}

// RetrieveResultApproval loads the ResultApproval with the given ID into `approval`.
// Returns `storage.ErrNotFound` if no Approval with the given ID has been stored.
func RetrieveResultApproval(r storage.Reader, approvalID flow.Identifier, approval *flow.ResultApproval) error {
	key := MakePrefix(codeResultApproval, approvalID)
	return RetrieveByKey(r, key, approval)
}

// UnsafeIndexResultApproval writes the index entry (ExecutionResult ID, chunk index) ➜ ResultApproval ID.
// It is "unsafe" in the sense that it performs no check whether a different approval has
// already been indexed for the same chunk: an existing index entry is silently overwritten.
// CAUTION:
//   - In general, the Flow protocol requires multiple approvals for the same chunk from different
//     verification nodes, i.e. several different approvals exist for one and the same chunk.
//     Hence, the index Executed Chunk ➜ ResultApproval ID is *only safe* to be used by
//     Verification Nodes for tracking their own approvals (for the same ExecutionResult, a
//     Verifier will always produce the same approval).
//   - To guarantee that only one approval is ever indexed per chunk, _all calls_ to
//     `UnsafeIndexResultApproval` must be synchronized by the higher-level logic. By convention,
//     `store.ResultApprovals` is currently the only place that is allowed to call this function.
func UnsafeIndexResultApproval(w storage.Writer, resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
	indexKey := MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex)
	return UpsertByKey(w, indexKey, approvalID)
}

// LookupResultApproval reads the ID of the ResultApproval indexed for the given result ID and
// chunk index, and writes it into `approvalID`.
// Returns `storage.ErrNotFound` if no Approval for the given key (resultID, chunkIndex) has been stored.
//
// NOTE: the Flow protocol requires multiple approvals for the same chunk from different verification
// nodes, i.e. several different approvals exist for one and the same chunk. Hence, the index
// Executed Chunk ➜ ResultApproval ID (queried here) is *only safe* to be used by Verification Nodes
// for tracking their own approvals (for the same ExecutionResult, a Verifier will always produce
// the same approval).
func LookupResultApproval(r storage.Reader, resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) error {
	indexKey := MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex)
	return RetrieveByKey(r, indexKey, approvalID)
}
129 changes: 129 additions & 0 deletions storage/store/approvals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package store

import (
"errors"
"fmt"
"sync"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
)

// ResultApprovals implements persistent storage for result approvals. Approvals are stored
// by their ID and can additionally be indexed by chunk, i.e. by (ExecutionResult ID, chunk index).
//
// CAUTION suitable only for _Verification Nodes_ for persisting their _own_ approvals!
//   - In general, the Flow protocol requires multiple approvals for the same chunk from different
//     verification nodes. In other words, there are multiple different approvals for the same chunk.
//   - Internally, ResultApprovals populates an index from Executed Chunk ➜ ResultApproval. This is
//     *only safe* for Verification Nodes when tracking their own approvals (for the same ExecutionResult,
//     a Verifier will always produce the same approval)
type ResultApprovals struct {
// db is the database used for reads and for batched writes.
db storage.DB
// cache holds approvals keyed by approval ID; its store and retrieve functions
// (wired up in NewResultApprovals) read from and write to the database.
cache *Cache[flow.Identifier, *flow.ResultApproval]
// indexing is held for the entire duration of Index, so that checking for an existing
// index entry and writing a new one cannot interleave with another Index call.
indexing *sync.Mutex // preventing concurrent indexing of approvals
}

// Compile-time check that ResultApprovals implements the storage.ResultApprovals interface.
var _ storage.ResultApprovals = (*ResultApprovals)(nil)

// NewResultApprovals creates a ResultApprovals store on top of the given database.
// Approvals are cached by their ID; the cache is configured with the limit
// `flow.DefaultTransactionExpiry+100` and receives `collector` together with
// `metrics.ResourceResultApprovals` for its metrics.
func NewResultApprovals(collector module.CacheMetrics, db storage.DB) *ResultApprovals {
	// store persists an approval through the batch writer. The key argument is unused,
	// because the database key is derived from the approval's own ID.
	store := func(rw storage.ReaderBatchWriter, _ flow.Identifier, val *flow.ResultApproval) error {
		return operation.InsertResultApproval(rw.Writer(), val)
	}

	// retrieve loads an approval from the database. On failure it returns nil instead of a
	// pointer to a zero-valued (or partially decoded) approval, so that a caller can never
	// mistake the placeholder for a valid result.
	retrieve := func(r storage.Reader, approvalID flow.Identifier) (*flow.ResultApproval, error) {
		var approval flow.ResultApproval
		if err := operation.RetrieveResultApproval(r, approvalID, &approval); err != nil {
			return nil, err
		}
		return &approval, nil
	}

	return &ResultApprovals{
		db: db,
		cache: newCache(collector, metrics.ResourceResultApprovals,
			withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
			withStore(store),
			withRetrieve(retrieve)),
		indexing: new(sync.Mutex),
	}
}

// Store persists the given ResultApproval, keyed by its ID. The write goes through the
// cache's PutTx within a single reader-batch-writer of the underlying database.
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
	persist := func(rw storage.ReaderBatchWriter) error {
		approvalID := approval.ID()
		return r.cache.PutTx(rw, approvalID, approval)
	}
	return r.db.WithReaderBatchWriter(persist)
}

// Index records the approval with the given ID as _the_ approval for the chunk identified by
// (resultID, chunkIndex). The operation is idempotent: indexing the same approval ID for the
// same chunk repeatedly succeeds every time and is equivalent to a single call.
// An attempt to index a _different_ approval ID for an already indexed chunk is rejected with
// an error wrapping `storage.ErrDataMismatch`.
//
// CAUTION: the Flow protocol requires multiple approvals for the same chunk from different verification
// nodes, i.e. several different approvals exist for one and the same chunk. Hence, the index
// Executed Chunk ➜ ResultApproval ID (populated here) is *only safe* to be used by Verification Nodes
// for tracking their own approvals.
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
	// A correct Verifier always produces the same approval for the same ExecutionResult. Hence, once
	// an approval is indexed for (resultID, chunkIndex), it must never be replaced by a different one;
	// we enforce this explicitly to prevent state corruption.
	// Holding the lock for the whole call excludes concurrent index updates, which turns the sequence
	// "check that nothing conflicting is indexed, then write (or abort)" into one atomic operation.
	r.indexing.Lock()
	defer r.indexing.Unlock()

	indexErr := r.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		var existingID flow.Identifier
		lookupErr := operation.LookupResultApproval(rw.GlobalReader(), resultID, chunkIndex, &existingID)

		switch {
		case lookupErr == nil:
			// An approval is already indexed for this chunk. Indexing multiple approvals per chunk
			// is not allowed, because this store is only used within Verification nodes, which
			// cannot compute different approvals for the same chunk. Re-indexing the identical
			// approval is a no-op.
			if existingID == approvalID {
				return nil
			}
			return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
				resultID, chunkIndex, approvalID, existingID, storage.ErrDataMismatch)

		case errors.Is(lookupErr, storage.ErrNotFound):
			// Nothing indexed for this chunk yet: write the index entry.
			return operation.UnsafeIndexResultApproval(rw.Writer(), resultID, chunkIndex, approvalID)

		default:
			return fmt.Errorf("could not lookup result approval ID: %w", lookupErr)
		}
	})
	if indexErr != nil {
		return fmt.Errorf("could not index result approval: %w", indexErr)
	}

	return nil
}

// ByID returns the ResultApproval with the given ID, as provided by the cache's Get using
// the database reader. On failure, the cache's error is returned as is, together with a
// nil approval.
func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
	reader := r.db.Reader()
	approval, err := r.cache.Get(reader, approvalID)
	if err == nil {
		return approval, nil
	}
	return nil, err
}

// ByChunk returns the ResultApproval indexed for the given result ID and chunk index.
// It first resolves the approval ID through the chunk index and then delegates to ByID.
// The ResultApprovals store is only used within a verification node, where it is
// assumed that there is never more than one approval per chunk.
func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
	var indexedID flow.Identifier
	if err := operation.LookupResultApproval(r.db.Reader(), resultID, chunkIndex, &indexedID); err != nil {
		return nil, fmt.Errorf("could not lookup result approval ID: %w", err)
	}
	return r.ByID(indexedID)
}
131 changes: 131 additions & 0 deletions storage/store/approvals_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package store_test

import (
"errors"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation/dbtest"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/unittest"
)

// TestApprovalStoreAndRetrieve verifies that an approval which has been stored and indexed
// can be read back both by its ID and by its (result ID, chunk index) pair.
func TestApprovalStoreAndRetrieve(t *testing.T) {
	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
		approvals := store.NewResultApprovals(metrics.NewNoopCollector(), db)

		approval := unittest.ResultApprovalFixture()
		resultID, chunkIndex := approval.Body.ExecutionResultID, approval.Body.ChunkIndex

		require.NoError(t, approvals.Store(approval))
		require.NoError(t, approvals.Index(resultID, chunkIndex, approval.ID()))

		retrievedByID, err := approvals.ByID(approval.ID())
		require.NoError(t, err)
		require.Equal(t, approval, retrievedByID)

		retrievedByChunk, err := approvals.ByChunk(resultID, chunkIndex)
		require.NoError(t, err)
		require.Equal(t, approval, retrievedByChunk)
	})
}

// TestApprovalStoreTwice verifies that storing and indexing the very same approval a second
// time succeeds, i.e. both operations are idempotent.
func TestApprovalStoreTwice(t *testing.T) {
	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
		approvals := store.NewResultApprovals(metrics.NewNoopCollector(), db)

		approval := unittest.ResultApprovalFixture()
		resultID, chunkIndex := approval.Body.ExecutionResultID, approval.Body.ChunkIndex

		// first round
		require.NoError(t, approvals.Store(approval))
		require.NoError(t, approvals.Index(resultID, chunkIndex, approval.ID()))

		// second round with identical data must succeed as well
		require.NoError(t, approvals.Store(approval))
		require.NoError(t, approvals.Index(resultID, chunkIndex, approval.ID()))
	})
}

// TestApprovalStoreTwoDifferentApprovalsShouldFail verifies that, once an approval has been
// indexed for a chunk, indexing a different approval for the same chunk is rejected with
// storage.ErrDataMismatch.
func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
		approvals := store.NewResultApprovals(metrics.NewNoopCollector(), db)

		indexed := unittest.ResultApprovalFixture()
		conflicting := unittest.ResultApprovalFixture()
		resultID, chunkIndex := indexed.Body.ExecutionResultID, indexed.Body.ChunkIndex

		require.NoError(t, approvals.Store(indexed))
		require.NoError(t, approvals.Index(resultID, chunkIndex, indexed.ID()))

		// Storing a different approval is fine ...
		require.NoError(t, approvals.Store(conflicting))

		// ... but indexing it for the chunk that already has an approval must fail.
		err := approvals.Index(resultID, chunkIndex, conflicting.ID())
		require.Error(t, err)
		require.True(t, errors.Is(err, storage.ErrDataMismatch))
	})
}

// TestApprovalStoreTwoDifferentApprovalsConcurrently verifies that storing and indexing two
// conflicting approvals concurrently is impossible: we expect that one Index operation
// succeeds, while the other one fails with storage.ErrDataMismatch.
func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
		metrics := metrics.NewNoopCollector()
		store := store.NewResultApprovals(metrics, db)

		approval1 := unittest.ResultApprovalFixture()
		approval2 := unittest.ResultApprovalFixture()

		var wg sync.WaitGroup
		wg.Add(2)

		// The goroutines only record their errors; all assertions happen after wg.Wait().
		// `require` aborts a test via t.FailNow, which must be called from the goroutine
		// running the test function, not from goroutines spawned by it.
		var firstStoreErr, secondStoreErr error
		var firstIndexErr, secondIndexErr error

		// First goroutine stores and indexes the first approval.
		go func() {
			defer wg.Done()

			firstStoreErr = store.Store(approval1)
			if firstStoreErr != nil {
				return
			}

			firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
		}()

		// Second goroutine stores and tries to index the second approval for the same chunk.
		go func() {
			defer wg.Done()

			secondStoreErr = store.Store(approval2)
			if secondStoreErr != nil {
				return
			}

			secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
		}()

		// Wait for both goroutines to finish
		wg.Wait()

		// Storing must succeed for both approvals, since they have different IDs.
		require.NoError(t, firstStoreErr)
		require.NoError(t, secondStoreErr)

		// Check that one of the Index operations succeeded and the other failed
		if firstIndexErr == nil {
			require.Error(t, secondIndexErr)
			require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
		} else {
			require.NoError(t, secondIndexErr)
			require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
		}
	})
}
Loading

0 comments on commit 0aae073

Please sign in to comment.