Skip to content

Commit

Permalink
Merge pull request #6468 from The-K-R-O-K/UlyanaAndrukhiv/6302-store-…
Browse files Browse the repository at this point in the history
…transaction-result-error-messages

 [Access] Store Transaction Result error messages in db
  • Loading branch information
Guitarheroua authored and illia-malachyn committed Feb 12, 2025
1 parent 5be1172 commit edab5a2
Show file tree
Hide file tree
Showing 31 changed files with 3,090 additions and 914 deletions.
88 changes: 76 additions & 12 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/ingestion"
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/routes"
Expand All @@ -55,6 +56,7 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
"github.com/onflow/flow-go/engine/execution/computation/query"
Expand Down Expand Up @@ -161,7 +163,6 @@ type AccessNodeConfig struct {
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
Expand All @@ -173,6 +174,9 @@ type AccessNodeConfig struct {
programCacheSize uint
checkPayerBalance bool
versionControlEnabled bool
storeTxResultErrorMessages bool
stopControlEnabled bool
registerDBPruneThreshold uint64
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -244,7 +248,6 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
apiRatelimits: nil,
apiBurstlimits: nil,
TxResultCacheSize: 0,
TxErrorMessagesCacheSize: 1000,
PublicNetworkConfig: PublicNetworkConfig{
BindAddress: cmd.NotSet,
Metrics: metrics.NewNoopCollector(),
Expand Down Expand Up @@ -276,6 +279,9 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
programCacheSize: 0,
checkPayerBalance: false,
versionControlEnabled: true,
storeTxResultErrorMessages: false,
stopControlEnabled: false,
registerDBPruneThreshold: pruner.DefaultThreshold,
}
}

Expand Down Expand Up @@ -343,6 +349,9 @@ type FlowAccessNodeBuilder struct {
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend

TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -1229,7 +1238,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.BoolVar(&builder.retryEnabled, "retry-enabled", defaultConfig.retryEnabled, "whether to enable the retry mechanism at the access node level")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.UintVar(&builder.TxResultCacheSize, "transaction-result-cache-size", defaultConfig.TxResultCacheSize, "transaction result cache size.(Disabled by default i.e 0)")
flags.UintVar(&builder.TxErrorMessagesCacheSize, "transaction-error-messages-cache-size", defaultConfig.TxErrorMessagesCacheSize, "transaction error messages cache size.(By default 1000)")
flags.StringVarP(&builder.nodeInfoFile,
"node-info-file",
"",
Expand Down Expand Up @@ -1359,7 +1367,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"tx-result-query-mode",
defaultConfig.rpcConf.BackendConfig.TxResultQueryMode,
"mode to use when querying transaction results. one of [local-only, execution-nodes-only(default), failover]")

flags.BoolVar(&builder.storeTxResultErrorMessages,
"store-tx-result-error-messages",
defaultConfig.storeTxResultErrorMessages,
"whether to enable storing transaction error messages into the db")
// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1464,9 +1475,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
return errors.New("circuit-breaker-restore-timeout must be greater than 0")
}
}
if builder.TxErrorMessagesCacheSize == 0 {
return errors.New("transaction-error-messages-cache-size must be greater than 0")
}

if builder.checkPayerBalance && !builder.executionDataIndexingEnabled {
return errors.New("execution-data-indexing-enabled must be set if check-payer-balance is enabled")
Expand Down Expand Up @@ -1579,7 +1587,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedBlockHeight storage.ConsumerProgress
var processedFinalizedBlockHeight storage.ConsumerProgress
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1768,8 +1777,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.Storage.LightTransactionResults)
return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
Expand All @@ -1786,6 +1795,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return nil
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
if builder.storeTxResultErrorMessages {
builder.Storage.TransactionResultErrorMessages = bstorage.NewTransactionResultErrorMessages(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
}

return nil
}).
Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.versionControlEnabled {
noop := &module.NoopReadyDoneAware{}
Expand Down Expand Up @@ -1893,6 +1909,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
Expand All @@ -1904,7 +1921,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
TxErrorMessagesCacheSize: builder.TxErrorMessagesCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalance: builder.checkPayerBalance,
Expand Down Expand Up @@ -1980,6 +1996,28 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
node.State,
builder.nodeBackend,
node.Storage.Receipts,
node.Storage.TransactionResultErrorMessages,
preferredENIdentifiers,
fixedENIdentifiers,
)
}

builder.IngestEng, err = ingestion.New(
node.Logger,
node.EngineRegistry,
Expand All @@ -1993,8 +2031,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.Storage.Receipts,
builder.collectionExecutedMetric,
processedBlockHeight,
processedFinalizedBlockHeight,
lastFullBlockHeight,
builder.TxResultErrorMessagesCore,
)
if err != nil {
return nil, err
Expand All @@ -2012,6 +2051,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return builder.RequestEng, nil
})

if builder.storeTxResultErrorMessages {
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
builder.DB,
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)
return nil
})
builder.Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engine, err := tx_error_messages.New(
node.Logger,
node.State,
node.Storage.Headers,
processedTxErrorMessagesBlockHeight,
builder.TxResultErrorMessagesCore,
)
if err != nil {
return nil, err
}
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)

return engine, nil
})
}

if builder.supportsObserver {
builder.Component("public sync request handler", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
syncRequestHandler, err := synceng.NewRequestHandlerEngine(
Expand Down
Loading

0 comments on commit edab5a2

Please sign in to comment.