From d27f7b74b8cf987f3120f0700cba73bb95391a21 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 10 Jan 2025 14:16:24 -0800 Subject: [PATCH 01/10] refactor consumer progress --- .../node_builder/access_node_builder.go | 45 ++++++---- cmd/observer/node_builder/observer_builder.go | 24 +++-- cmd/verification_builder.go | 17 ++-- engine/access/access_test.go | 34 ++++---- engine/access/ingestion/engine.go | 2 +- engine/access/ingestion/engine_test.go | 16 ++-- .../tx_error_messages_engine.go | 2 +- .../tx_error_messages_engine_test.go | 7 +- .../backend_stream_transactions_test.go | 11 ++- engine/access/rpc/backend/backend_test.go | 10 +-- engine/testutil/mock/nodes.go | 4 +- engine/testutil/nodes.go | 4 +- .../assigner/blockconsumer/consumer.go | 2 +- .../assigner/blockconsumer/consumer_test.go | 9 +- .../fetcher/chunkconsumer/consumer.go | 4 +- .../fetcher/chunkconsumer/consumer_test.go | 4 +- .../persistent_strict_monotonic_counter.go | 14 +-- ...ersistent_strict_monotonic_counter_test.go | 17 ++-- module/jobqueue/component_consumer.go | 4 +- module/jobqueue/component_consumer_test.go | 5 +- module/jobqueue/consumer.go | 36 ++------ module/jobqueue/consumer_behavior_test.go | 20 +++-- module/jobqueue/consumer_test.go | 40 ++++----- .../state_synchronization/indexer/indexer.go | 4 +- .../indexer/indexer_test.go | 16 +++- .../requester/execution_data_requester.go | 4 +- .../execution_data_requester_test.go | 7 +- storage/badger/consumer_progress.go | 50 ----------- storage/badger/operation/jobs.go | 14 --- storage/consumer_progress.go | 17 +++- storage/mock/consumer_progress.go | 18 ---- storage/mock/consumer_progress_initializer.go | 57 ++++++++++++ storage/operation/consume_progress.go | 15 ++++ storage/pebble/consumer_progress.go | 50 ----------- storage/store/consumer_progress.go | 87 +++++++++++++++++++ 35 files changed, 359 insertions(+), 311 deletions(-) delete mode 100644 storage/badger/consumer_progress.go create mode 100644 storage/mock/consumer_progress_initializer.go create mode 100644 storage/operation/consume_progress.go delete mode 100644 storage/pebble/consumer_progress.go create mode 100644 storage/store/consumer_progress.go diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 4c209b15693..514250e7cde 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -117,7 +117,10 @@ import ( "github.com/onflow/flow-go/state/protocol/blocktimer" "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" pstorage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/grpcutils" ) @@ -552,8 +555,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder { var ds datastore.Batching var bs network.BlobService - var processedBlockHeight storage.ConsumerProgress - var processedNotifications storage.ConsumerProgress + var processedBlockHeight storage.ConsumerProgressInitializer + var processedNotifications storage.ConsumerProgressInitializer var bsDependable *module.ProxiedReadyDoneAware var execDataDistributor *edrequester.ExecutionDataDistributor var execDataCacheBackend *herocache.BlockExecutionData @@ -607,21 +610,26 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. + var db storage.DB if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB)) } else { - processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB)) } + + processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight) return nil }). Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. + var db storage.DB if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) + db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB)) } else { - processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) + db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB)) } + processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification) return nil }). Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error { @@ -848,7 +856,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } if builder.executionDataIndexingEnabled { - var indexedBlockHeight storage.ConsumerProgress + var indexedBlockHeight storage.ConsumerProgressInitializer builder. AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand { @@ -856,7 +864,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess }). Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the MAIN db since that is where indexed execution data is stored. - indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight) + indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight) return nil }). Module("transaction results storage", func(node *cmd.NodeConfig) error { @@ -1633,8 +1641,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() { } func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { - var processedFinalizedBlockHeight storage.ConsumerProgress - var processedTxErrorMessagesBlockHeight storage.ConsumerProgress + var processedFinalizedBlockHeight storage.ConsumerProgressInitializer + var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer if builder.executionDataSyncEnabled { builder.BuildExecutionSyncComponents() @@ -1838,17 +1846,18 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil }). Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error { - processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight) + processedFinalizedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressIngestionEngineBlockHeight) return nil }). Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error { rootBlockHeight := node.State.Params().FinalizedRoot().Height - var err error - lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight), - rootBlockHeight, - ) + progress, err := store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressLastFullBlockHeight).Initialize(rootBlockHeight) + if err != nil { + return err + } + + lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(progress) if err != nil { return fmt.Errorf("failed to initialize monotonic consumer progress: %w", err) } @@ -2149,8 +2158,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { if builder.storeTxResultErrorMessages { builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error { - processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress( - builder.DB, + processedTxErrorMessagesBlockHeight = store.NewConsumerProgress( + badgerimpl.ToDB(builder.DB), module.ConsumeProgressEngineTxErrorMessagesBlockHeight, ) return nil diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 6829b115060..0ced9001d4b 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -108,7 +108,10 @@ import ( "github.com/onflow/flow-go/state/protocol/events/gadgets" "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" pstorage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/io" ) @@ -1057,8 +1060,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) { func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder { var ds datastore.Batching var bs network.BlobService - var processedBlockHeight storage.ConsumerProgress - var processedNotifications storage.ConsumerProgress + var processedBlockHeight storage.ConsumerProgressInitializer + var processedNotifications storage.ConsumerProgressInitializer var publicBsDependable *module.ProxiedReadyDoneAware var execDataDistributor *edrequester.ExecutionDataDistributor var execDataCacheBackend *herocache.BlockExecutionData @@ -1112,21 +1115,26 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. + var db storage.DB if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB)) } else { - processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB)) } + + processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight) return nil }). Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. + var db storage.DB if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) + db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB)) } else { - processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) + db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB)) } + processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification) return nil }). Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error { @@ -1311,11 +1319,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return builder.ExecutionDataPruner, nil }) if builder.executionDataIndexingEnabled { - var indexedBlockHeight storage.ConsumerProgress + var indexedBlockHeight storage.ConsumerProgressInitializer builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the MAIN db since that is where indexed execution data is stored. - indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight) + indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight) return nil }).Module("transaction results storage", func(node *cmd.NodeConfig) error { builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize) diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index 70c84617f02..f7c27b6455a 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -37,7 +37,10 @@ import ( "github.com/onflow/flow-go/state/protocol" badgerState "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/state/protocol/blocktimer" + "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" ) type VerificationConfig struct { @@ -88,11 +91,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { var ( followerState protocol.FollowerState - chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine - chunkRequests *stdmap.ChunkRequests // used in requester engine - processedChunkIndex *badger.ConsumerProgress // used in chunk consumer - processedBlockHeight *badger.ConsumerProgress // used in block consumer - chunkQueue *badger.ChunksQueue // used in chunk consumer + chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine + chunkRequests *stdmap.ChunkRequests // used in requester engine + processedChunkIndex storage.ConsumerProgressInitializer // used in chunk consumer + processedBlockHeight storage.ConsumerProgressInitializer // used in block consumer + chunkQueue *badger.ChunksQueue // used in chunk consumer syncCore *chainsync.Core // used in follower engine assignerEngine *assigner.Engine // the assigner engine @@ -155,11 +158,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { return nil }). Module("processed chunk index consumer progress", func(node *NodeConfig) error { - processedChunkIndex = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationChunkIndex) + processedChunkIndex = store.NewConsumerProgress(badgerimpl.ToDB(node.DB), module.ConsumeProgressVerificationChunkIndex) return nil }). Module("processed block height consumer progress", func(node *NodeConfig) error { - processedBlockHeight = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationBlockHeight) + processedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(node.DB), module.ConsumeProgressVerificationBlockHeight) return nil }). Module("chunks queue", func(node *NodeConfig) error { diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 7d12c70cdcf..fc7bfda6f79 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -45,6 +45,8 @@ import ( "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" "github.com/onflow/flow-go/storage/badger/operation" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/storage/util" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/mocks" @@ -684,14 +686,13 @@ func (suite *Suite) TestGetSealedTransaction() { ) require.NoError(suite.T(), err) - lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), - suite.rootBlock.Height, - ) + progress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).Initialize(suite.rootBlock.Height) + require.NoError(suite.T(), err) + lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress) require.NoError(suite.T(), err) // create the ingest engine - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeight := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight) ingestEng, err := ingestion.New( suite.log, @@ -874,12 +875,12 @@ func (suite *Suite) TestGetTransactionResult() { ) require.NoError(suite.T(), err) - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight) - lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), - suite.rootBlock.Height, - ) + processedHeight, err := processedHeightInitializer.Initialize(suite.rootBlock.Height) + require.NoError(suite.T(), err) + + lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(processedHeight) require.NoError(suite.T(), err) // create the ingest engine @@ -896,7 +897,7 @@ func (suite *Suite) TestGetTransactionResult() { results, receipts, collectionExecutedMetric, - processedHeight, + processedHeightInitializer, lastFullBlockHeight, nil, ) @@ -1130,12 +1131,11 @@ func (suite *Suite) TestExecuteScript() { suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil). Once() - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight) + processedHeight, err := processedHeightInitializer.Initialize(suite.rootBlock.Height) + require.NoError(suite.T(), err) - lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), - suite.rootBlock.Height, - ) + lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(processedHeight) require.NoError(suite.T(), err) // create the ingest engine @@ -1152,7 +1152,7 @@ func (suite *Suite) TestExecuteScript() { results, receipts, collectionExecutedMetric, - processedHeight, + processedHeightInitializer, lastFullBlockHeight, nil, ) diff --git a/engine/access/ingestion/engine.go b/engine/access/ingestion/engine.go index 0e4ac42367e..ed7c01fdbbb 100644 --- a/engine/access/ingestion/engine.go +++ b/engine/access/ingestion/engine.go @@ -125,7 +125,7 @@ func New( executionResults storage.ExecutionResults, executionReceipts storage.ExecutionReceipts, collectionExecutedMetric module.CollectionExecutedMetric, - finalizedProcessedHeight storage.ConsumerProgress, + finalizedProcessedHeight storage.ConsumerProgressInitializer, lastFullBlockHeight *counters.PersistentStrictMonotonicCounter, txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore, ) (*Engine, error) { diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index 2f5b0169b34..abeb6c1caf1 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -31,8 +31,9 @@ import ( "github.com/onflow/flow-go/network/mocknetwork" protocol "github.com/onflow/flow-go/state/protocol/mock" storerr "github.com/onflow/flow-go/storage" - bstorage "github.com/onflow/flow-go/storage/badger" storage "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/mocks" ) @@ -186,13 +187,12 @@ func (s *Suite) SetupTest() { // initIngestionEngine create new instance of ingestion engine and waits when it starts func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine { - processedHeight := bstorage.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressIngestionEngineBlockHeight) - var err error - s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight), - s.finalizedBlock.Height, - ) + lastFullBlockHeight, err := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressLastFullBlockHeight).Initialize(s.finalizedBlock.Height) + require.NoError(s.T(), err) + + s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeight) require.NoError(s.T(), err) eng, err := New( @@ -208,7 +208,7 @@ func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine { s.results, s.receipts, s.collectionExecutedMetric, - processedHeight, + processedHeightInitializer, s.lastFullBlockHeight, nil, ) diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go index cdd65bdc0b3..036cd70c0df 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go @@ -69,7 +69,7 @@ func New( log zerolog.Logger, state protocol.State, headers storage.Headers, - txErrorMessagesProcessedHeight storage.ConsumerProgress, + txErrorMessagesProcessedHeight storage.ConsumerProgressInitializer, txErrorMessagesCore *TxErrorMessagesCore, ) (*Engine, error) { e := &Engine{ diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go index b1edcffb9a5..a86e1943bb1 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go @@ -23,8 +23,9 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/irrecoverable" protocol "github.com/onflow/flow-go/state/protocol/mock" - bstorage "github.com/onflow/flow-go/storage/badger" storage "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/mocks" ) @@ -131,8 +132,8 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() { // initEngine creates a new instance of the transaction error messages engine // and waits for it to start. It initializes the engine with mocked components and state. func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContext) *Engine { - processedTxErrorMessagesBlockHeight := bstorage.NewConsumerProgress( - s.db, + processedTxErrorMessagesBlockHeight := store.NewConsumerProgress( + badgerimpl.ToDB(s.db), module.ConsumeProgressEngineTxErrorMessagesBlockHeight, ) diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 52049a4b0ef..aae96fa61c0 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -32,8 +32,9 @@ import ( syncmock "github.com/onflow/flow-go/module/state_synchronization/mock" protocolint "github.com/onflow/flow-go/state/protocol" protocol "github.com/onflow/flow-go/state/protocol/mock" - bstorage "github.com/onflow/flow-go/storage/badger" storagemock "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/mocks" @@ -136,11 +137,9 @@ func (s *TransactionStatusSuite) SetupTest() { rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock)) s.resultsMap[s.rootBlock.ID()] = rootResult - var err error - s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight), - s.rootBlock.Header.Height, - ) + progress, err := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressLastFullBlockHeight).Initialize(s.rootBlock.Header.Height) + require.NoError(s.T(), err) + s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(progress) require.NoError(s.T(), err) s.sealedBlock = &s.rootBlock diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index a3068e32645..f6459bca0d4 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -45,8 +45,9 @@ import ( protocol "github.com/onflow/flow-go/state/protocol/mock" "github.com/onflow/flow-go/state/protocol/util" "github.com/onflow/flow-go/storage" - bstorage "github.com/onflow/flow-go/storage/badger" storagemock "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" "github.com/onflow/flow-go/utils/unittest/generator" "github.com/onflow/flow-go/utils/unittest/mocks" @@ -134,10 +135,9 @@ func (suite *Suite) SetupTest() { suite.Require().NoError(err) suite.db, suite.dbDir = unittest.TempBadgerDB(suite.T()) - suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressLastFullBlockHeight), - 0, - ) + progress, err := store.NewConsumerProgress(badgerimpl.ToDB(suite.db), module.ConsumeProgressLastFullBlockHeight).Initialize(0) + require.NoError(suite.T(), err) + suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(progress) suite.Require().NoError(err) } diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index ff9654eacef..71f991d0334 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -282,12 +282,12 @@ type VerificationNode struct { Receipts storage.ExecutionReceipts // chunk consumer and processor for fetcher engine - ProcessedChunkIndex storage.ConsumerProgress + ProcessedChunkIndex storage.ConsumerProgressInitializer ChunksQueue *bstorage.ChunksQueue ChunkConsumer *chunkconsumer.ChunkConsumer // block consumer for chunk consumer - ProcessedBlockHeight storage.ConsumerProgress + ProcessedBlockHeight storage.ConsumerProgressInitializer BlockConsumer *blockconsumer.BlockConsumer VerifierEngine *verifier.Engine diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index 137d1c94207..db1c7be4085 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -989,7 +989,7 @@ func VerificationNode(t testing.TB, } if node.ProcessedChunkIndex == nil { - node.ProcessedChunkIndex = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationChunkIndex) + node.ProcessedChunkIndex = store.NewConsumerProgress(badgerimpl.ToDB(node.PublicDB), module.ConsumeProgressVerificationChunkIndex) } if node.ChunksQueue == nil { @@ -1000,7 +1000,7 @@ func VerificationNode(t testing.TB, } if node.ProcessedBlockHeight == nil { - node.ProcessedBlockHeight = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationBlockHeight) + node.ProcessedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(node.PublicDB), module.ConsumeProgressVerificationBlockHeight) } if node.VerifierEngine == nil { diff --git a/engine/verification/assigner/blockconsumer/consumer.go b/engine/verification/assigner/blockconsumer/consumer.go index 7b6341be000..89871fecb65 100644 --- a/engine/verification/assigner/blockconsumer/consumer.go +++ b/engine/verification/assigner/blockconsumer/consumer.go @@ -42,7 +42,7 @@ func defaultProcessedIndex(state protocol.State) (uint64, error) { // index for initializing the processed index in storage. func NewBlockConsumer(log zerolog.Logger, metrics module.VerificationMetrics, - processedHeight storage.ConsumerProgress, + processedHeight storage.ConsumerProgressInitializer, blocks storage.Blocks, state protocol.State, blockProcessor assigner.FinalizedBlockProcessor, diff --git a/engine/verification/assigner/blockconsumer/consumer_test.go b/engine/verification/assigner/blockconsumer/consumer_test.go index 26b784b03db..d5ea7d82c7f 100644 --- a/engine/verification/assigner/blockconsumer/consumer_test.go +++ b/engine/verification/assigner/blockconsumer/consumer_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/dgraph-io/badger/v2" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/consensus/hotstuff/model" @@ -18,7 +17,9 @@ import ( "github.com/onflow/flow-go/module/jobqueue" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/module/trace" - bstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -117,10 +118,10 @@ func withConsumer( process func(notifier module.ProcessingNotifier, block *flow.Block), withBlockConsumer func(*blockconsumer.BlockConsumer, []*flow.Block), ) { - unittest.RunWithBadgerDB(t, func(db *badger.DB) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { maxProcessing := uint64(workerCount) - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight) + processedHeight := store.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight) collector := &metrics.NoopCollector{} tracer := trace.NewNoopTracer() log := unittest.Logger() diff --git a/engine/verification/fetcher/chunkconsumer/consumer.go b/engine/verification/fetcher/chunkconsumer/consumer.go index 703b51d5d7e..66a71aa3dca 100644 --- a/engine/verification/fetcher/chunkconsumer/consumer.go +++ b/engine/verification/fetcher/chunkconsumer/consumer.go @@ -29,7 +29,7 @@ type ChunkConsumer struct { func NewChunkConsumer( log zerolog.Logger, metrics module.VerificationMetrics, - processedIndex storage.ConsumerProgress, // to persist the processed index + processedIndexInitializer storage.ConsumerProgressInitializer, // to persist the processed index chunksQueue storage.ChunksQueue, // to read jobs (chunks) from chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks) maxProcessing uint64, // max number of jobs to be processed in parallel @@ -40,7 +40,7 @@ func NewChunkConsumer( jobs := &ChunkJobs{locators: chunksQueue} lg := log.With().Str("module", "chunk_consumer").Logger() - consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0, DefaultJobIndex) + consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndexInitializer, worker, maxProcessing, 0, DefaultJobIndex) if err != nil { return nil, err } diff --git a/engine/verification/fetcher/chunkconsumer/consumer_test.go b/engine/verification/fetcher/chunkconsumer/consumer_test.go index 1aabce2bd14..91f4ec23dbf 100644 --- a/engine/verification/fetcher/chunkconsumer/consumer_test.go +++ b/engine/verification/fetcher/chunkconsumer/consumer_test.go @@ -15,6 +15,8 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" storage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -144,7 +146,7 @@ func WithConsumer( unittest.RunWithBadgerDB(t, func(db *badger.DB) { maxProcessing := uint64(3) - processedIndex := storage.NewConsumerProgress(db, module.ConsumeProgressVerificationChunkIndex) + processedIndex := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressVerificationChunkIndex) chunksQueue := storage.NewChunkQueue(db) ok, err := chunksQueue.Init(chunkconsumer.DefaultJobIndex) require.NoError(t, err) diff --git a/module/counters/persistent_strict_monotonic_counter.go b/module/counters/persistent_strict_monotonic_counter.go index caaf7b45919..ae5875f19cd 100644 --- a/module/counters/persistent_strict_monotonic_counter.go +++ b/module/counters/persistent_strict_monotonic_counter.go @@ -20,13 +20,12 @@ type PersistentStrictMonotonicCounter struct { counter StrictMonotonousCounter } -// NewPersistentStrictMonotonicCounter creates a new PersistentStrictMonotonicCounter which inserts the default -// processed index to the storage layer and creates new counter with defaultIndex value. +// NewPersistentStrictMonotonicCounter creates a new PersistentStrictMonotonicCounter. // The consumer progress and associated db entry must not be accessed outside of calls to the returned object, // otherwise the state may become inconsistent. // // No errors are expected during normal operation. -func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgress, defaultIndex uint64) (*PersistentStrictMonotonicCounter, error) { +func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgress) (*PersistentStrictMonotonicCounter, error) { m := &PersistentStrictMonotonicCounter{ consumerProgress: consumerProgress, } @@ -34,14 +33,7 @@ func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgre // sync with storage for the processed index to ensure the consistency value, err := m.consumerProgress.ProcessedIndex() if err != nil { - if !errors.Is(err, storage.ErrNotFound) { - return nil, fmt.Errorf("could not read consumer progress: %w", err) - } - err := m.consumerProgress.InitProcessedIndex(defaultIndex) - if err != nil { - return nil, fmt.Errorf("could not init consumer progress: %w", err) - } - value = defaultIndex + return nil, fmt.Errorf("failed to get processed index: %w", err) } m.counter = NewMonotonousCounter(value) diff --git a/module/counters/persistent_strict_monotonic_counter_test.go b/module/counters/persistent_strict_monotonic_counter_test.go index 62a1adedf22..caf9c9c6642 100644 --- a/module/counters/persistent_strict_monotonic_counter_test.go +++ b/module/counters/persistent_strict_monotonic_counter_test.go @@ -8,17 +8,17 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/counters" - bstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) func TestMonotonicConsumer(t *testing.T) { unittest.RunWithBadgerDB(t, func(db *badger.DB) { var height1 = uint64(1234) - persistentStrictMonotonicCounter, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), - height1, - ) + progress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).Initialize(height1) + require.NoError(t, err) + persistentStrictMonotonicCounter, err := counters.NewPersistentStrictMonotonicCounter(progress) require.NoError(t, err) // check value can be retrieved @@ -40,11 +40,10 @@ func TestMonotonicConsumer(t *testing.T) { actual = persistentStrictMonotonicCounter.Value() require.Equal(t, height2, actual) + progress2, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).Initialize(height1) + require.NoError(t, err) // check that new persistent strict monotonic counter has the same value - persistentStrictMonotonicCounter2, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), - height1, - ) + persistentStrictMonotonicCounter2, err := counters.NewPersistentStrictMonotonicCounter(progress2) require.NoError(t, err) // check that the value still the same diff --git a/module/jobqueue/component_consumer.go b/module/jobqueue/component_consumer.go index 1b174e712ad..457aed3f804 100644 --- a/module/jobqueue/component_consumer.go +++ b/module/jobqueue/component_consumer.go @@ -27,7 +27,7 @@ type ComponentConsumer struct { func NewComponentConsumer( log zerolog.Logger, workSignal <-chan struct{}, - progress storage.ConsumerProgress, + progressInitializer storage.ConsumerProgressInitializer, jobs module.Jobs, defaultIndex uint64, processor JobProcessor, // method used to process jobs @@ -48,7 +48,7 @@ func NewComponentConsumer( maxProcessing, ) - consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead, defaultIndex) + consumer, err := NewConsumer(log, jobs, progressInitializer, worker, maxProcessing, maxSearchAhead, defaultIndex) if err != nil { return nil, err } diff --git a/module/jobqueue/component_consumer_test.go b/module/jobqueue/component_consumer_test.go index de9d13b5981..44d210961c6 100644 --- a/module/jobqueue/component_consumer_test.go +++ b/module/jobqueue/component_consumer_test.go @@ -89,10 +89,13 @@ func (suite *ComponentConsumerSuite) prepareTest( progress.On("ProcessedIndex").Return(suite.defaultIndex, nil) progress.On("SetProcessedIndex", mock.AnythingOfType("uint64")).Return(nil) + progressInitializer := new(storagemock.ConsumerProgressInitializer) + progressInitializer.On("Initialize", mock.AnythingOfType("uint64")).Return(progress, nil) + consumer, err := NewComponentConsumer( zerolog.New(os.Stdout).With().Timestamp().Logger(), workSignal, - progress, + progressInitializer, jobs, suite.defaultIndex, processor, diff --git a/module/jobqueue/consumer.go b/module/jobqueue/consumer.go index 17e309a929c..c20e05c01fa 100644 --- a/module/jobqueue/consumer.go +++ b/module/jobqueue/consumer.go @@ -50,14 +50,19 @@ type Consumer struct { func NewConsumer( log zerolog.Logger, jobs module.Jobs, - progress storage.ConsumerProgress, + progressInitializer storage.ConsumerProgressInitializer, worker Worker, maxProcessing uint64, maxSearchAhead uint64, defaultIndex uint64, ) (*Consumer, error) { - processedIndex, err := readProcessedIndex(log, progress, defaultIndex) + progress, err := progressInitializer.Initialize(defaultIndex) + if err != nil { + return nil, fmt.Errorf("could not initialize processed index: %w", err) + } + + processedIndex, err := progress.ProcessedIndex() if err != nil { return nil, fmt.Errorf("could not read processed index: %w", err) } @@ -84,33 +89,6 @@ func NewConsumer( }, nil } -func readProcessedIndex(log zerolog.Logger, progress storage.ConsumerProgress, defaultIndex uint64) (uint64, error) { - // on startup, sync with storage for the processed index - // to ensure the consistency - processedIndex, err := progress.ProcessedIndex() - if errors.Is(err, storage.ErrNotFound) { - err := progress.InitProcessedIndex(defaultIndex) - if errors.Is(err, storage.ErrAlreadyExists) { - return 0, fmt.Errorf("processed index has already been inited, no effect for the second time. default index: %v", - defaultIndex) - } - - if err != nil { - return 0, fmt.Errorf("could not init processed index: %w", err) - } - - log.Warn().Uint64("processed index", processedIndex). - Msg("processed index not found, initialized.") - return defaultIndex, nil - } - - if err != nil { - return 0, fmt.Errorf("could not read processed index: %w", err) - } - - return processedIndex, nil -} - // Start starts consuming the jobs from the job queue. func (c *Consumer) Start() error { c.mu.Lock() diff --git a/module/jobqueue/consumer_behavior_test.go b/module/jobqueue/consumer_behavior_test.go index 98fc7395377..3f90e2bd623 100644 --- a/module/jobqueue/consumer_behavior_test.go +++ b/module/jobqueue/consumer_behavior_test.go @@ -14,7 +14,8 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/jobqueue" "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -469,16 +470,19 @@ func testWorkOnNextAfterFastforward(t *testing.T) { // rebuild a consumer with the dependencies to simulate a restart // jobs need to be reused, since it stores all the jobs reWorker := newMockWorker() - reProgress := badger.NewConsumerProgress(db, ConsumerTag) + reProgress := store.NewConsumerProgress(badgerimpl.ToDB(db), ConsumerTag) reConsumer := newTestConsumer(t, reProgress, j, reWorker, 0, DefaultIndex) - err := reConsumer.Start() + progress, err := reProgress.Initialize(DefaultIndex) + require.NoError(t, err) + + err = reConsumer.Start() require.NoError(t, err) time.Sleep(1 * time.Millisecond) reWorker.AssertCalled(t, []int64{4, 5, 6}) - assertProcessed(t, reProgress, 3) + assertProcessed(t, progress, 3) }) } @@ -560,8 +564,10 @@ func runWithSeatchAhead(t testing.TB, maxSearchAhead uint64, defaultIndex uint64 unittest.RunWithBadgerDB(t, func(db *badgerdb.DB) { jobs := jobqueue.NewMockJobs() worker := newMockWorker() - progress := badger.NewConsumerProgress(db, ConsumerTag) - consumer := newTestConsumer(t, progress, jobs, worker, maxSearchAhead, defaultIndex) + progressInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), ConsumerTag) + consumer := newTestConsumer(t, progressInitializer, jobs, worker, maxSearchAhead, defaultIndex) + progress, err := progressInitializer.Initialize(defaultIndex) + require.NoError(t, err) runTestWith(consumer, progress, worker, jobs, db) }) } @@ -572,7 +578,7 @@ func assertProcessed(t testing.TB, cp storage.ConsumerProgress, expectProcessed require.Equal(t, expectProcessed, processed) } -func newTestConsumer(t testing.TB, cp storage.ConsumerProgress, jobs module.Jobs, worker jobqueue.Worker, maxSearchAhead uint64, defaultIndex uint64) module.JobConsumer { +func newTestConsumer(t testing.TB, cp storage.ConsumerProgressInitializer, jobs module.Jobs, worker jobqueue.Worker, maxSearchAhead uint64, defaultIndex uint64) module.JobConsumer { log := unittest.Logger().With().Str("module", "consumer").Logger() maxProcessing := uint64(3) c, err := jobqueue.NewConsumer(log, jobs, cp, worker, maxProcessing, maxSearchAhead, defaultIndex) diff --git a/module/jobqueue/consumer_test.go b/module/jobqueue/consumer_test.go index 90db5332f79..4c672d73876 100644 --- a/module/jobqueue/consumer_test.go +++ b/module/jobqueue/consumer_test.go @@ -7,14 +7,14 @@ import ( "testing" "time" - badgerdb "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -157,22 +157,16 @@ func TestProcessableJobs(t *testing.T) { // Test after jobs have been processed, the job status are removed to prevent from memory-leak func TestProcessedIndexDeletion(t *testing.T) { - setup := func(t *testing.T, f func(c *Consumer, jobs *MockJobs)) { - unittest.RunWithBadgerDB(t, func(db *badgerdb.DB) { - log := unittest.Logger().With().Str("module", "consumer").Logger() - jobs := NewMockJobs() - progress := badger.NewConsumerProgress(db, "consumer") - worker := newMockWorker() - maxProcessing := uint64(3) - c, err := NewConsumer(log, jobs, progress, worker, maxProcessing, 0, 0) - require.NoError(t, err) - worker.WithConsumer(c) - - f(c, jobs) - }) - } + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + log := unittest.Logger().With().Str("module", "consumer").Logger() + jobs := NewMockJobs() + progressInitializer := store.NewConsumerProgress(db, "consumer") + worker := newMockWorker() + maxProcessing := uint64(3) + c, err := NewConsumer(log, jobs, progressInitializer, worker, maxProcessing, 0, 0) + require.NoError(t, err) + worker.WithConsumer(c) - setup(t, func(c *Consumer, jobs *MockJobs) { require.NoError(t, jobs.PushN(10)) require.NoError(t, c.Start()) @@ -193,22 +187,24 @@ func TestProcessedIndexDeletion(t *testing.T) { func TestCheckBeforeStartIsNoop(t *testing.T) { t.Parallel() - unittest.RunWithBadgerDB(t, func(db *badgerdb.DB) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { storedProcessedIndex := uint64(100) worker := newMockWorker() - progress := badger.NewConsumerProgress(db, "consumer") - err := progress.InitProcessedIndex(storedProcessedIndex) + progressInitializer := store.NewConsumerProgress(db, "consumer") + progress, err := progressInitializer.Initialize(10) require.NoError(t, err) + // set the processedIndex to a value + require.NoError(t, progress.SetProcessedIndex(storedProcessedIndex)) c, err := NewConsumer( unittest.Logger(), NewMockJobs(), - progress, + progressInitializer, worker, uint64(3), 0, - 10, + 10, // default index is before the stored processedIndex ) require.NoError(t, err) worker.WithConsumer(c) diff --git a/module/state_synchronization/indexer/indexer.go b/module/state_synchronization/indexer/indexer.go index 2de7650a872..db164a2c12d 100644 --- a/module/state_synchronization/indexer/indexer.go +++ b/module/state_synchronization/indexer/indexer.go @@ -70,7 +70,7 @@ func NewIndexer( indexer *IndexerCore, executionCache *cache.ExecutionDataCache, executionDataLatestHeight func() (uint64, error), - processedHeight storage.ConsumerProgress, + processedHeightInitializer storage.ConsumerProgressInitializer, ) (*Indexer, error) { r := &Indexer{ log: log.With().Str("module", "execution_indexer").Logger(), @@ -89,7 +89,7 @@ func NewIndexer( jobConsumer, err := jobqueue.NewComponentConsumer( r.log, r.exeDataNotifier.Channel(), - processedHeight, + processedHeightInitializer, r.exeDataReader, initHeight, r.processExecutionData, diff --git a/module/state_synchronization/indexer/indexer_test.go b/module/state_synchronization/indexer/indexer_test.go index e7ec3bc5055..41cbd4f885d 100644 --- a/module/state_synchronization/indexer/indexer_test.go +++ b/module/state_synchronization/indexer/indexer_test.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock" "github.com/onflow/flow-go/module/irrecoverable" mempool "github.com/onflow/flow-go/module/mempool/mock" + "github.com/onflow/flow-go/storage" storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" ) @@ -82,7 +83,7 @@ func newIndexerTest(t *testing.T, availableBlocks int, lastIndexedIndex int) *in indexerCoreTest.indexer, exeCache, test.latestHeight, - progress, + &mockProgressInitializer{progress: progress}, ) require.NoError(t, err) @@ -121,6 +122,19 @@ func (w *indexerTest) run(ctx irrecoverable.SignalerContext, reachHeight uint64, unittest.RequireCloseBefore(w.t, w.worker.Done(), testTimeout, "timeout waiting for the consumer to be done") } +type mockProgressInitializer struct { + progress *mockProgress +} + +func (m *mockProgressInitializer) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) { + err := m.progress.InitProcessedIndex(defaultIndex) + if err != nil { + return nil, err + } + + return m.progress, nil +} + type mockProgress struct { index *atomic.Uint64 doneIndex *atomic.Uint64 diff --git a/module/state_synchronization/requester/execution_data_requester.go b/module/state_synchronization/requester/execution_data_requester.go index 4ed489371dd..e32d2e10bb9 100644 --- a/module/state_synchronization/requester/execution_data_requester.go +++ b/module/state_synchronization/requester/execution_data_requester.go @@ -145,8 +145,8 @@ func New( edrMetrics module.ExecutionDataRequesterMetrics, downloader execution_data.Downloader, execDataCache *cache.ExecutionDataCache, - processedHeight storage.ConsumerProgress, - processedNotifications storage.ConsumerProgress, + processedHeight storage.ConsumerProgressInitializer, + processedNotifications storage.ConsumerProgressInitializer, state protocol.State, headers storage.Headers, cfg ExecutionDataConfig, diff --git a/module/state_synchronization/requester/execution_data_requester_test.go b/module/state_synchronization/requester/execution_data_requester_test.go index deff90cb240..8b15d1f45ce 100644 --- a/module/state_synchronization/requester/execution_data_requester_test.go +++ b/module/state_synchronization/requester/execution_data_requester_test.go @@ -33,7 +33,8 @@ import ( synctest "github.com/onflow/flow-go/module/state_synchronization/requester/unittest" "github.com/onflow/flow-go/state/protocol" statemock "github.com/onflow/flow-go/state/protocol/mock" - bstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -412,8 +413,8 @@ func (suite *ExecutionDataRequesterSuite) prepareRequesterTest(cfg *fetchTestRun cache := cache.NewExecutionDataCache(suite.downloader, headers, seals, results, heroCache) followerDistributor := pubsub.NewFollowerDistributor() - processedHeight := bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressExecutionDataRequesterBlockHeight) - processedNotification := bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressExecutionDataRequesterNotification) + processedHeight := store.NewConsumerProgress(badgerimpl.ToDB(suite.db), module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedNotification := store.NewConsumerProgress(badgerimpl.ToDB(suite.db), module.ConsumeProgressExecutionDataRequesterNotification) edr, err := requester.New( logger, diff --git a/storage/badger/consumer_progress.go b/storage/badger/consumer_progress.go deleted file mode 100644 index 52855dd60b1..00000000000 --- a/storage/badger/consumer_progress.go +++ /dev/null @@ -1,50 +0,0 @@ -package badger - -import ( - "fmt" - - "github.com/dgraph-io/badger/v2" - - "github.com/onflow/flow-go/storage/badger/operation" -) - -type ConsumerProgress struct { - db *badger.DB - consumer string // to distinguish the consume progress between different consumers -} - -func NewConsumerProgress(db *badger.DB, consumer string) *ConsumerProgress { - return &ConsumerProgress{ - db: db, - consumer: consumer, - } -} - -func (cp *ConsumerProgress) ProcessedIndex() (uint64, error) { - var processed uint64 - err := cp.db.View(operation.RetrieveProcessedIndex(cp.consumer, &processed)) - if err != nil { - return 0, fmt.Errorf("failed to retrieve processed index: %w", err) - } - return processed, nil -} - -// InitProcessedIndex insert the default processed index to the storage layer, can only be done once. -// initialize for the second time will return storage.ErrAlreadyExists -func (cp *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { - err := operation.RetryOnConflict(cp.db.Update, operation.InsertProcessedIndex(cp.consumer, defaultIndex)) - if err != nil { - return fmt.Errorf("could not update processed index: %w", err) - } - - return nil -} - -func (cp *ConsumerProgress) SetProcessedIndex(processed uint64) error { - err := operation.RetryOnConflict(cp.db.Update, operation.SetProcessedIndex(cp.consumer, processed)) - if err != nil { - return fmt.Errorf("could not update processed index: %w", err) - } - - return nil -} diff --git a/storage/badger/operation/jobs.go b/storage/badger/operation/jobs.go index 0f9eb3166ad..1bad48f752f 100644 --- a/storage/badger/operation/jobs.go +++ b/storage/badger/operation/jobs.go @@ -27,17 +27,3 @@ func RetrieveJobAtIndex(queue string, index uint64, entity *flow.Identifier) fun func InsertJobAtIndex(queue string, index uint64, entity flow.Identifier) func(*badger.Txn) error { return insert(makePrefix(codeJobQueue, queue, index), entity) } - -// RetrieveProcessedIndex returns the processed index for a job consumer -func RetrieveProcessedIndex(jobName string, processed *uint64) func(*badger.Txn) error { - return retrieve(makePrefix(codeJobConsumerProcessed, jobName), processed) -} - -func InsertProcessedIndex(jobName string, processed uint64) func(*badger.Txn) error { - return insert(makePrefix(codeJobConsumerProcessed, jobName), processed) -} - -// SetProcessedIndex updates the processed index for a job consumer with given index -func SetProcessedIndex(jobName string, processed uint64) func(*badger.Txn) error { - return update(makePrefix(codeJobConsumerProcessed, jobName), processed) -} diff --git a/storage/consumer_progress.go b/storage/consumer_progress.go index bd99926ba32..9655bd8a95c 100644 --- a/storage/consumer_progress.go +++ b/storage/consumer_progress.go @@ -1,13 +1,22 @@ package storage +// ConsumerProgressInitializer is a helper to initialize the consumer progress index in storage +// It prevents the consumer from being used before initialization +type ConsumerProgressInitializer interface { + // Initialize takes a default index and initializes the consumer progress index in storage + // Initialize must be concurrent safe, meaning if called by different modules, should only + // initialize once. + Initialize(defaultIndex uint64) (ConsumerProgress, error) +} + // ConsumerProgress reads and writes the last processed index of the job in the job queue +// It must be created by the ConsumerProgressInitializer, so that it can guarantee +// the ProcessedIndex and SetProcessedIndex methods are safe to use. type ConsumerProgress interface { // read the current processed index + // any error returned are exceptions ProcessedIndex() (uint64, error) - // insert the default processed index to the storage layer, can only be done once. - // initialize for the second time will return storage.ErrAlreadyExists - InitProcessedIndex(defaultIndex uint64) error // update the processed index in the storage layer. - // it will fail if InitProcessedIndex was never called. + // any error returned are exceptions SetProcessedIndex(processed uint64) error } diff --git a/storage/mock/consumer_progress.go b/storage/mock/consumer_progress.go index 591fbd39af7..6a865c61ab2 100644 --- a/storage/mock/consumer_progress.go +++ b/storage/mock/consumer_progress.go @@ -9,24 +9,6 @@ type ConsumerProgress struct { mock.Mock } -// InitProcessedIndex provides a mock function with given fields: defaultIndex -func (_m *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { - ret := _m.Called(defaultIndex) - - if len(ret) == 0 { - panic("no return value specified for InitProcessedIndex") - } - - var r0 error - if rf, ok := ret.Get(0).(func(uint64) error); ok { - r0 = rf(defaultIndex) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // ProcessedIndex provides a mock function with given fields: func (_m *ConsumerProgress) ProcessedIndex() (uint64, error) { ret := _m.Called() diff --git a/storage/mock/consumer_progress_initializer.go b/storage/mock/consumer_progress_initializer.go new file mode 100644 index 00000000000..7e88ba01b68 --- /dev/null +++ b/storage/mock/consumer_progress_initializer.go @@ -0,0 +1,57 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" +) + +// ConsumerProgressInitializer is an autogenerated mock type for the ConsumerProgressInitializer type +type ConsumerProgressInitializer struct { + mock.Mock +} + +// Initialize provides a mock function with given fields: defaultIndex +func (_m *ConsumerProgressInitializer) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) { + ret := _m.Called(defaultIndex) + + if len(ret) == 0 { + panic("no return value specified for Initialize") + } + + var r0 storage.ConsumerProgress + var r1 error + if rf, ok := ret.Get(0).(func(uint64) (storage.ConsumerProgress, error)); ok { + return rf(defaultIndex) + } + if rf, ok := ret.Get(0).(func(uint64) storage.ConsumerProgress); ok { + r0 = rf(defaultIndex) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.ConsumerProgress) + } + } + + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(defaultIndex) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewConsumerProgressInitializer creates a new instance of ConsumerProgressInitializer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewConsumerProgressInitializer(t interface { + mock.TestingT + Cleanup(func()) +}) *ConsumerProgressInitializer { + mock := &ConsumerProgressInitializer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/operation/consume_progress.go b/storage/operation/consume_progress.go new file mode 100644 index 00000000000..177f9a79f30 --- /dev/null +++ b/storage/operation/consume_progress.go @@ -0,0 +1,15 @@ +package operation + +import ( + "github.com/onflow/flow-go/storage" +) + +// RetrieveProcessedIndex returns the processed index for a job consumer +func RetrieveProcessedIndex(r storage.Reader, jobName string, processed *uint64) error { + return RetrieveByKey(r, MakePrefix(codeJobConsumerProcessed, jobName), processed) +} + +// SetProcessedIndex updates the processed index for a job consumer with given index +func SetProcessedIndex(w storage.Writer, jobName string, processed uint64) error { + return UpsertByKey(w, MakePrefix(codeJobConsumerProcessed, jobName), processed) +} diff --git a/storage/pebble/consumer_progress.go b/storage/pebble/consumer_progress.go deleted file mode 100644 index 37448bb4b5f..00000000000 --- a/storage/pebble/consumer_progress.go +++ /dev/null @@ -1,50 +0,0 @@ -package pebble - -import ( - "fmt" - - "github.com/cockroachdb/pebble" - - "github.com/onflow/flow-go/storage/pebble/operation" -) - -type ConsumerProgress struct { - db *pebble.DB - consumer string // to distinguish the consume progress between different consumers -} - -func NewConsumerProgress(db *pebble.DB, consumer string) *ConsumerProgress { - return &ConsumerProgress{ - db: db, - consumer: consumer, - } -} - -func (cp *ConsumerProgress) ProcessedIndex() (uint64, error) { - var processed uint64 - err := operation.RetrieveProcessedIndex(cp.consumer, &processed)(cp.db) - if err != nil { - return 0, fmt.Errorf("failed to retrieve processed index: %w", err) - } - return processed, nil -} - -// InitProcessedIndex insert the default processed index to the storage layer, can only be done once. -// initialize for the second time will return storage.ErrAlreadyExists -func (cp *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { - err := operation.InsertProcessedIndex(cp.consumer, defaultIndex)(cp.db) - if err != nil { - return fmt.Errorf("could not update processed index: %w", err) - } - - return nil -} - -func (cp *ConsumerProgress) SetProcessedIndex(processed uint64) error { - err := operation.SetProcessedIndex(cp.consumer, processed)(cp.db) - if err != nil { - return fmt.Errorf("could not update processed index: %w", err) - } - - return nil -} diff --git a/storage/store/consumer_progress.go b/storage/store/consumer_progress.go new file mode 100644 index 00000000000..aaa331b6173 --- /dev/null +++ b/storage/store/consumer_progress.go @@ -0,0 +1,87 @@ +package store + +import ( + "errors" + "fmt" + "sync" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" +) + +// ConsumerProgressInitializer is a helper to initialize the consumer progress index in storage +// It prevents the consumer from being used before initialization +type ConsumerProgressInitializer struct { + initing sync.Mutex + progress *consumerProgress +} + +var _ storage.ConsumerProgressInitializer = (*ConsumerProgressInitializer)(nil) + +func NewConsumerProgress(db storage.DB, consumer string) *ConsumerProgressInitializer { + progress := newConsumerProgress(db, consumer) + return &ConsumerProgressInitializer{ + progress: progress, + } +} + +func (cpi *ConsumerProgressInitializer) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) { + // making sure only one process is initializing at any time. + cpi.initing.Lock() + defer cpi.initing.Unlock() + + _, err := cpi.progress.ProcessedIndex() + if err != nil { + + // if not initialized, then initialize with default index + if !errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("could not retrieve processed index: %w", err) + } + + err = cpi.progress.SetProcessedIndex(defaultIndex) + if err != nil { + return nil, fmt.Errorf("could not set processed index: %w", err) + } + } + + return cpi.progress, nil +} + +type consumerProgress struct { + db storage.DB + consumer string // to distinguish the consume progress between different consumers +} + +var _ storage.ConsumerProgress = (*consumerProgress)(nil) + +func newConsumerProgress(db storage.DB, consumer string) *consumerProgress { + return &consumerProgress{ + db: db, + consumer: consumer, + } +} + +// ProcessedIndex returns the processed index for the consumer +// any error would be exception +func (cp *consumerProgress) ProcessedIndex() (uint64, error) { + var processed uint64 + err := operation.RetrieveProcessedIndex(cp.db.Reader(), cp.consumer, &processed) + if err != nil { + return 0, fmt.Errorf("failed to retrieve processed index: %w", err) + } + return processed, nil +} + +// SetProcessedIndex updates the processed index for the consumer +// any error would be exception +// The caller must use ConsumerProgressInitializer to initialize the progress index in storage +func (cp *consumerProgress) SetProcessedIndex(processed uint64) error { + err := cp.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.SetProcessedIndex(rw.Writer(), cp.consumer, processed) + }) + if err != nil { + return fmt.Errorf("could not update processed index: %w", err) + } + + return nil +} From ee957f6fd41609874563140a99b72cfafdda8e2e Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 10 Jan 2025 14:38:20 -0800 Subject: [PATCH 02/10] add consumer_progress_test --- storage/store/consumer_progress_test.go | 58 +++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 storage/store/consumer_progress_test.go diff --git a/storage/store/consumer_progress_test.go b/storage/store/consumer_progress_test.go new file mode 100644 index 00000000000..1fc70335bf2 --- /dev/null +++ b/storage/store/consumer_progress_test.go @@ -0,0 +1,58 @@ +package store + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func TestConsumerProgressInitializer(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + const testConsumer = "test_consumer" + + t.Run("Initialize with default index", func(t *testing.T) { + cpi := NewConsumerProgress(db, testConsumer) + progress, err := cpi.Initialize(100) + require.NoError(t, err) + + index, err := progress.ProcessedIndex() + require.NoError(t, err) + assert.Equal(t, uint64(100), index) + }) + + t.Run("Initialize when already initialized", func(t *testing.T) { + cpi := NewConsumerProgress(db, testConsumer) + + // First initialization + _, err := cpi.Initialize(100) + require.NoError(t, err) + + // Second initialization with different index + progress, err := cpi.Initialize(200) + require.NoError(t, err) + + // Should still return the original index + index, err := progress.ProcessedIndex() + require.NoError(t, err) + assert.Equal(t, uint64(100), index) + }) + + t.Run("SetProcessedIndex and ProcessedIndex", func(t *testing.T) { + cpi := NewConsumerProgress(db, testConsumer) + progress, err := cpi.Initialize(100) + require.NoError(t, err) + + err = progress.SetProcessedIndex(150) + require.NoError(t, err) + + index, err := progress.ProcessedIndex() + require.NoError(t, err) + assert.Equal(t, uint64(150), index) + }) + + }) +} From 903d85769b1e1af40291e61ba32ceb2c4927362e Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 10 Jan 2025 17:04:18 -0800 Subject: [PATCH 03/10] fix tests --- module/state_synchronization/indexer/indexer_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/module/state_synchronization/indexer/indexer_test.go b/module/state_synchronization/indexer/indexer_test.go index 41cbd4f885d..bcf7e42feb6 100644 --- a/module/state_synchronization/indexer/indexer_test.go +++ b/module/state_synchronization/indexer/indexer_test.go @@ -127,11 +127,6 @@ type mockProgressInitializer struct { } func (m *mockProgressInitializer) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) { - err := m.progress.InitProcessedIndex(defaultIndex) - if err != nil { - return nil, err - } - return m.progress, nil } From 2fad0800d60c9f92bc031df5cc97f21a81e77f2b Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 10 Jan 2025 17:34:08 -0800 Subject: [PATCH 04/10] remove pebble opeations jobs --- .../assigner/blockconsumer/consumer_test.go | 9 +++++---- storage/pebble/operation/jobs.go | 19 ------------------- 2 files changed, 5 insertions(+), 23 deletions(-) delete mode 100644 storage/pebble/operation/jobs.go diff --git a/engine/verification/assigner/blockconsumer/consumer_test.go b/engine/verification/assigner/blockconsumer/consumer_test.go index d5ea7d82c7f..afbe1ccc054 100644 --- a/engine/verification/assigner/blockconsumer/consumer_test.go +++ b/engine/verification/assigner/blockconsumer/consumer_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/dgraph-io/badger/v2" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/consensus/hotstuff/model" @@ -17,8 +18,7 @@ import ( "github.com/onflow/flow-go/module/jobqueue" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/module/trace" - "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/storage/operation/badgerimpl" "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -118,10 +118,11 @@ func withConsumer( process func(notifier module.ProcessingNotifier, block *flow.Block), withBlockConsumer func(*blockconsumer.BlockConsumer, []*flow.Block), ) { - dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + + unittest.RunWithBadgerDB(t, func(db *badger.DB) { maxProcessing := uint64(workerCount) - processedHeight := store.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight) + processedHeight := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressVerificationBlockHeight) collector := &metrics.NoopCollector{} tracer := trace.NewNoopTracer() log := unittest.Logger() diff --git a/storage/pebble/operation/jobs.go b/storage/pebble/operation/jobs.go deleted file mode 100644 index d18d3f39446..00000000000 --- a/storage/pebble/operation/jobs.go +++ /dev/null @@ -1,19 +0,0 @@ -package operation - -import ( - "github.com/cockroachdb/pebble" -) - -// RetrieveProcessedIndex returns the processed index for a job consumer -func RetrieveProcessedIndex(jobName string, processed *uint64) func(pebble.Reader) error { - return retrieve(makePrefix(codeJobConsumerProcessed, jobName), processed) -} - -func InsertProcessedIndex(jobName string, processed uint64) func(pebble.Writer) error { - return insert(makePrefix(codeJobConsumerProcessed, jobName), processed) -} - -// SetProcessedIndex updates the processed index for a job consumer with given index -func SetProcessedIndex(jobName string, processed uint64) func(pebble.Writer) error { - return insert(makePrefix(codeJobConsumerProcessed, jobName), processed) -} From 4d325c289d38dd72d81224d38ca5e65f81dd2136 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 10 Jan 2025 17:51:15 -0800 Subject: [PATCH 05/10] fix access tests --- engine/access/access_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/engine/access/access_test.go b/engine/access/access_test.go index fc7bfda6f79..032c3a0e1eb 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -877,10 +877,11 @@ func (suite *Suite) TestGetTransactionResult() { processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight) - processedHeight, err := processedHeightInitializer.Initialize(suite.rootBlock.Height) + lastFullBlockHeightProgress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight). + Initialize(suite.rootBlock.Height) require.NoError(suite.T(), err) - lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(processedHeight) + lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress) require.NoError(suite.T(), err) // create the ingest engine @@ -1132,10 +1133,12 @@ func (suite *Suite) TestExecuteScript() { Once() processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight) - processedHeight, err := processedHeightInitializer.Initialize(suite.rootBlock.Height) + + lastFullBlockHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight) + lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.rootBlock.Height) require.NoError(suite.T(), err) - lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(processedHeight) + lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress) require.NoError(suite.T(), err) // create the ingest engine From d3503489d1b9d2af6c24080630ca3dfe5dbe8c65 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 15 Jan 2025 12:27:00 -0800 Subject: [PATCH 06/10] update comments for monotonic counter --- module/counters/persistent_strict_monotonic_counter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/counters/persistent_strict_monotonic_counter.go b/module/counters/persistent_strict_monotonic_counter.go index ae5875f19cd..1ca881d04b6 100644 --- a/module/counters/persistent_strict_monotonic_counter.go +++ b/module/counters/persistent_strict_monotonic_counter.go @@ -44,7 +44,7 @@ func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgre // Set sets the processed index, ensuring it is strictly monotonically increasing. // // Expected errors during normal operation: -// - codes.ErrIncorrectValue - if stored value is larger than processed. +// - codes.ErrIncorrectValue - if stored value is >= processed (requirement of strict monotonous increase is violated). // - generic error in case of unexpected failure from the database layer or // encoding failure. func (m *PersistentStrictMonotonicCounter) Set(processed uint64) error { From 562a64bae18666c594d9f54067d8d5d28bd67434 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 27 Jan 2025 12:35:56 -0800 Subject: [PATCH 07/10] remove unused file --- storage/pebble/operation/common.go | 50 ------------------------------ 1 file changed, 50 deletions(-) delete mode 100644 storage/pebble/operation/common.go diff --git a/storage/pebble/operation/common.go b/storage/pebble/operation/common.go deleted file mode 100644 index ad9e96c2c8b..00000000000 --- a/storage/pebble/operation/common.go +++ /dev/null @@ -1,50 +0,0 @@ -package operation - -import ( - "errors" - - "github.com/cockroachdb/pebble" - "github.com/vmihailenco/msgpack" - - "github.com/onflow/flow-go/module/irrecoverable" - "github.com/onflow/flow-go/storage" -) - -func insert(key []byte, val interface{}) func(pebble.Writer) error { - return func(w pebble.Writer) error { - value, err := msgpack.Marshal(val) - if err != nil { - return irrecoverable.NewExceptionf("failed to encode value: %w", err) - } - - err = w.Set(key, value, nil) - if err != nil { - return irrecoverable.NewExceptionf("failed to store data: %w", err) - } - - return nil - } -} - -func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error { - return func(r pebble.Reader) error { - val, closer, err := r.Get(key) - if err != nil { - return convertNotFoundError(err) - } - defer closer.Close() - - err = msgpack.Unmarshal(val, &sc) - if err != nil { - return irrecoverable.NewExceptionf("failed to decode value: %w", err) - } - return nil - } -} - -func convertNotFoundError(err error) error { - if errors.Is(err, pebble.ErrNotFound) { - return storage.ErrNotFound - } - return err -} From 77fdd63ee4c2ca84535373608a9533571abe028e Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 6 Feb 2025 13:59:23 -0800 Subject: [PATCH 08/10] address review comments --- cmd/access/node_builder/access_node_builder.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 514250e7cde..62b8c388d2a 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -611,10 +611,14 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. var db storage.DB - if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB)) + edmdb := builder.ExecutionDatastoreManager.DB() + + if bdb, ok := edmdb.(*badger.DB); ok { + db = badgerimpl.ToDB(bdb) + } else if pdb, ok := edmdb.(*pebble.DB); ok { + db = pebbleimpl.ToDB(pdb) } else { - db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB)) + return fmt.Errorf("unsupported execution data DB type: %T", edmdb) } processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight) From 30fcf92ec0dd8d02d4d7efb6e5f01f31b4d946f6 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Feb 2025 09:20:12 -0800 Subject: [PATCH 09/10] refactor block iterator --- module/block_iterator/creator.go | 6 +++--- module/block_iterator/creator_test.go | 9 ++++++--- module/block_iterator/state.go | 13 ++++--------- module/block_iterator/state_test.go | 7 ++++--- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/module/block_iterator/creator.go b/module/block_iterator/creator.go index 09278373cf7..05550831e07 100644 --- a/module/block_iterator/creator.go +++ b/module/block_iterator/creator.go @@ -26,7 +26,7 @@ var _ module.IteratorCreator = (*Creator)(nil) // after another to iterate from the root to the latest, and from last iterated to the new latest. func NewCreator( getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error), - progressStorage storage.ConsumerProgress, + progressStorage storage.ConsumerProgressInitializer, root uint64, latest func() (uint64, error), ) (*Creator, error) { @@ -65,7 +65,7 @@ func (c *Creator) Create() (iter module.BlockIterator, hasNext bool, exception e // from root to the latest (either finalized or sealed) by height. func NewHeightBasedCreator( getBlockIDByHeight func(height uint64) (flow.Identifier, error), - progress storage.ConsumerProgress, + progress storage.ConsumerProgressInitializer, root *flow.Header, latest func() (*flow.Header, error), ) (*Creator, error) { @@ -98,7 +98,7 @@ func NewHeightBasedCreator( // since view has gaps, the iterator will skip views that have no blocks. func NewViewBasedCreator( getBlockIDByView func(view uint64) (blockID flow.Identifier, viewIndexed bool, exception error), - progress storage.ConsumerProgress, + progress storage.ConsumerProgressInitializer, root *flow.Header, latest func() (*flow.Header, error), ) (*Creator, error) { diff --git a/module/block_iterator/creator_test.go b/module/block_iterator/creator_test.go index 61aa58a17a5..01d2373e813 100644 --- a/module/block_iterator/creator_test.go +++ b/module/block_iterator/creator_test.go @@ -351,6 +351,9 @@ type mockProgress struct { initialized bool } +var _ storage.ConsumerProgress = (*mockProgress)(nil) +var _ storage.ConsumerProgressInitializer = (*mockProgress)(nil) + func (m *mockProgress) ProcessedIndex() (uint64, error) { if !m.initialized { return 0, fmt.Errorf("processed index not initialized: %w", storage.ErrNotFound) @@ -358,13 +361,13 @@ func (m *mockProgress) ProcessedIndex() (uint64, error) { return m.index, nil } -func (m *mockProgress) InitProcessedIndex(defaultIndex uint64) error { +func (m *mockProgress) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) { if m.initialized { - return fmt.Errorf("processed index already initialized") + return nil, fmt.Errorf("processed index already initialized") } m.index = defaultIndex m.initialized = true - return nil + return m, nil } func (m *mockProgress) SetProcessedIndex(processed uint64) error { diff --git a/module/block_iterator/state.go b/module/block_iterator/state.go index 457169e4fc3..d0bf5337daa 100644 --- a/module/block_iterator/state.go +++ b/module/block_iterator/state.go @@ -1,7 +1,6 @@ package block_iterator import ( - "errors" "fmt" "github.com/onflow/flow-go/module" @@ -16,14 +15,10 @@ type PersistentIteratorState struct { var _ module.IteratorState = (*PersistentIteratorState)(nil) -func NewPersistentIteratorState(store storage.ConsumerProgress, root uint64, latest func() (uint64, error)) (*PersistentIteratorState, error) { - _, err := store.ProcessedIndex() - if errors.Is(err, storage.ErrNotFound) { - next := root + 1 - err = store.InitProcessedIndex(next) - if err != nil { - return nil, fmt.Errorf("failed to init processed index: %w", err) - } +func NewPersistentIteratorState(initializer storage.ConsumerProgressInitializer, root uint64, latest func() (uint64, error)) (*PersistentIteratorState, error) { + store, err := initializer.Initialize(root + 1) + if err != nil { + return nil, fmt.Errorf("failed to init processed index: %w", err) } return &PersistentIteratorState{ diff --git a/module/block_iterator/state_test.go b/module/block_iterator/state_test.go index f60ad325e61..4c9efd0010b 100644 --- a/module/block_iterator/state_test.go +++ b/module/block_iterator/state_test.go @@ -6,7 +6,8 @@ import ( "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" - storagepebble "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -19,9 +20,9 @@ func TestProgress(t *testing.T) { return latest, nil } - store := storagepebble.NewConsumerProgress(db, "test") + initializer := store.NewConsumerProgress(pebbleimpl.ToDB(db), "test") - progress, err := NewPersistentIteratorState(store, root, getLatest) + progress, err := NewPersistentIteratorState(initializer, root, getLatest) require.NoError(t, err) // 1. verify initial state should be the next of root From 58abccba5f1219895d0ff30717113444d3f147d2 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Feb 2025 10:06:11 -0800 Subject: [PATCH 10/10] fix tests --- module/block_iterator/creator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/block_iterator/creator_test.go b/module/block_iterator/creator_test.go index 01d2373e813..f9cc17b0490 100644 --- a/module/block_iterator/creator_test.go +++ b/module/block_iterator/creator_test.go @@ -363,7 +363,7 @@ func (m *mockProgress) ProcessedIndex() (uint64, error) { func (m *mockProgress) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error) { if m.initialized { - return nil, fmt.Errorf("processed index already initialized") + return m, nil } m.index = defaultIndex m.initialized = true