Skip to content

Commit

Permalink
Merge pull request #6872 from onflow/leo/storage-refactor
Browse files Browse the repository at this point in the history
[Storage Refactor] Refactor ConsumerProgress
  • Loading branch information
zhangchiqing authored Feb 7, 2025
2 parents 4ce667d + 58abccb commit 983941b
Show file tree
Hide file tree
Showing 42 changed files with 437 additions and 398 deletions.
51 changes: 32 additions & 19 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -552,8 +555,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -607,21 +610,30 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
var db storage.DB
edmdb := builder.ExecutionDatastoreManager.DB()

if bdb, ok := edmdb.(*badger.DB); ok {
db = badgerimpl.ToDB(bdb)
} else if pdb, ok := edmdb.(*pebble.DB); ok {
db = pebbleimpl.ToDB(pdb)
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
return fmt.Errorf("unsupported execution data DB type: %T", edmdb)
}

processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -848,15 +860,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgress
var indexedBlockHeight storage.ConsumerProgressInitializer

builder.
AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
return stateSyncCommands.NewExecuteScriptCommand(builder.ScriptExecutor)
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1633,8 +1645,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedFinalizedBlockHeight storage.ConsumerProgress
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress
var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1838,17 +1850,18 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
processedFinalizedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
rootBlockHeight := node.State.Params().FinalizedRoot().Height

var err error
lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight),
rootBlockHeight,
)
progress, err := store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressLastFullBlockHeight).Initialize(rootBlockHeight)
if err != nil {
return err
}

lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(progress)
if err != nil {
return fmt.Errorf("failed to initialize monotonic consumer progress: %w", err)
}
Expand Down Expand Up @@ -2149,8 +2162,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

if builder.storeTxResultErrorMessages {
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
builder.DB,
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
badgerimpl.ToDB(builder.DB),
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)
return nil
Expand Down
24 changes: 16 additions & 8 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ import (
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/grpcutils"
"github.com/onflow/flow-go/utils/io"
)
Expand Down Expand Up @@ -1057,8 +1060,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var publicBsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -1112,21 +1115,26 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}

processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1311,11 +1319,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder.ExecutionDataPruner, nil
})
if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgress
var indexedBlockHeight storage.ConsumerProgressInitializer

builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
Expand Down
17 changes: 10 additions & 7 deletions cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import (
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
)

type VerificationConfig struct {
Expand Down Expand Up @@ -88,11 +91,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
var (
followerState protocol.FollowerState

chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex *badger.ConsumerProgress // used in chunk consumer
processedBlockHeight *badger.ConsumerProgress // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer
chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex storage.ConsumerProgressInitializer // used in chunk consumer
processedBlockHeight storage.ConsumerProgressInitializer // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer

syncCore *chainsync.Core // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
Expand Down Expand Up @@ -155,11 +158,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return nil
}).
Module("processed chunk index consumer progress", func(node *NodeConfig) error {
processedChunkIndex = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationChunkIndex)
processedChunkIndex = store.NewConsumerProgress(badgerimpl.ToDB(node.DB), module.ConsumeProgressVerificationChunkIndex)
return nil
}).
Module("processed block height consumer progress", func(node *NodeConfig) error {
processedBlockHeight = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationBlockHeight)
processedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(node.DB), module.ConsumeProgressVerificationBlockHeight)
return nil
}).
Module("chunks queue", func(node *NodeConfig) error {
Expand Down
37 changes: 20 additions & 17 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/storage/util"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
Expand Down Expand Up @@ -684,14 +686,13 @@ func (suite *Suite) TestGetSealedTransaction() {
)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
progress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).Initialize(suite.rootBlock.Height)
require.NoError(suite.T(), err)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress)
require.NoError(suite.T(), err)

// create the ingest engine
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight)

ingestEng, err := ingestion.New(
suite.log,
Expand Down Expand Up @@ -874,12 +875,13 @@ func (suite *Suite) TestGetTransactionResult() {
)
require.NoError(suite.T(), err)

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
lastFullBlockHeightProgress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).
Initialize(suite.rootBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
require.NoError(suite.T(), err)

// create the ingest engine
Expand All @@ -896,7 +898,7 @@ func (suite *Suite) TestGetTransactionResult() {
results,
receipts,
collectionExecutedMetric,
processedHeight,
processedHeightInitializer,
lastFullBlockHeight,
nil,
)
Expand Down Expand Up @@ -1130,12 +1132,13 @@ func (suite *Suite) TestExecuteScript() {
suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil).
Once()

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
lastFullBlockHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight)
lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.rootBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
require.NoError(suite.T(), err)

// create the ingest engine
Expand All @@ -1152,7 +1155,7 @@ func (suite *Suite) TestExecuteScript() {
results,
receipts,
collectionExecutedMetric,
processedHeight,
processedHeightInitializer,
lastFullBlockHeight,
nil,
)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func New(
executionResults storage.ExecutionResults,
executionReceipts storage.ExecutionReceipts,
collectionExecutedMetric module.CollectionExecutedMetric,
finalizedProcessedHeight storage.ConsumerProgress,
finalizedProcessedHeight storage.ConsumerProgressInitializer,
lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
) (*Engine, error) {
Expand Down
16 changes: 8 additions & 8 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
"github.com/onflow/flow-go/network/mocknetwork"
protocol "github.com/onflow/flow-go/state/protocol/mock"
storerr "github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
)
Expand Down Expand Up @@ -186,13 +187,12 @@ func (s *Suite) SetupTest() {

// initIngestionEngine create new instance of ingestion engine and waits when it starts
func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
processedHeight := bstorage.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressIngestionEngineBlockHeight)

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
s.finalizedBlock.Height,
)
lastFullBlockHeight, err := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressLastFullBlockHeight).Initialize(s.finalizedBlock.Height)
require.NoError(s.T(), err)

s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeight)
require.NoError(s.T(), err)

eng, err := New(
Expand All @@ -208,7 +208,7 @@ func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
s.results,
s.receipts,
s.collectionExecutedMetric,
processedHeight,
processedHeightInitializer,
s.lastFullBlockHeight,
nil,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(
log zerolog.Logger,
state protocol.State,
headers storage.Headers,
txErrorMessagesProcessedHeight storage.ConsumerProgress,
txErrorMessagesProcessedHeight storage.ConsumerProgressInitializer,
txErrorMessagesCore *TxErrorMessagesCore,
) (*Engine, error) {
e := &Engine{
Expand Down
Loading

0 comments on commit 983941b

Please sign in to comment.