diff --git a/config/index_config.go b/config/index_config.go index e2086ece..aebe8d81 100644 --- a/config/index_config.go +++ b/config/index_config.go @@ -19,21 +19,22 @@ type IndexConfig struct { type indexBase struct { throttlingBase retryBase - ReindexMessageType string `mapstructure:"reindex-message-type"` - ReattemptFailedBlocks bool `mapstructure:"reattempt-failed-blocks"` - StartBlock int64 `mapstructure:"start-block"` - EndBlock int64 `mapstructure:"end-block"` - BlockInputFile string `mapstructure:"block-input-file"` - ReIndex bool `mapstructure:"reindex"` - RPCWorkers int64 `mapstructure:"rpc-workers"` - BlockTimer int64 `mapstructure:"block-timer"` - WaitForChain bool `mapstructure:"wait-for-chain"` - WaitForChainDelay int64 `mapstructure:"wait-for-chain-delay"` - TransactionIndexingEnabled bool `mapstructure:"index-transactions"` - ExitWhenCaughtUp bool `mapstructure:"exit-when-caught-up"` - BlockEventIndexingEnabled bool `mapstructure:"index-block-events"` - FilterFile string `mapstructure:"filter-file"` - Dry bool `mapstructure:"dry"` + ReindexMessageType string `mapstructure:"reindex-message-type"` + ReattemptFailedBlocks bool `mapstructure:"reattempt-failed-blocks"` + StartBlock int64 `mapstructure:"start-block"` + EndBlock int64 `mapstructure:"end-block"` + BlockInputFile string `mapstructure:"block-input-file"` + ReIndex bool `mapstructure:"reindex"` + RPCWorkers int64 `mapstructure:"rpc-workers"` + SkipBlockByHeightRPCRequest bool `mapstructure:"skip-block-by-height-rpc-request"` + BlockTimer int64 `mapstructure:"block-timer"` + WaitForChain bool `mapstructure:"wait-for-chain"` + WaitForChainDelay int64 `mapstructure:"wait-for-chain-delay"` + TransactionIndexingEnabled bool `mapstructure:"index-transactions"` + ExitWhenCaughtUp bool `mapstructure:"exit-when-caught-up"` + BlockEventIndexingEnabled bool `mapstructure:"index-block-events"` + FilterFile string `mapstructure:"filter-file"` + Dry bool `mapstructure:"dry"` } // Flags for specific, deeper indexing behavior @@ -60,6 +61,7 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) { // other base setting cmd.PersistentFlags().BoolVar(&conf.Base.Dry, "base.dry", false, "index the chain but don't insert data in the DB.") cmd.PersistentFlags().Int64Var(&conf.Base.RPCWorkers, "base.rpc-workers", 1, "the number of concurrent RPC request workers to spin up.") + cmd.PersistentFlags().BoolVar(&conf.Base.SkipBlockByHeightRPCRequest, "base.skip-block-by-height-rpc-request", false, "skip the /block?height= RPC request and only attempt the /block_results RPC request. Sometimes pruned nodes will not have return results for the block RPC request, but still return results for the block_result request.") cmd.PersistentFlags().BoolVar(&conf.Base.WaitForChain, "base.wait-for-chain", false, "wait for chain to be in sync?") cmd.PersistentFlags().Int64Var(&conf.Base.WaitForChainDelay, "base.wait-for-chain-delay", 10, "seconds to wait between each check for node to catch up to the chain") cmd.PersistentFlags().Int64Var(&conf.Base.BlockTimer, "base.block-timer", 10000, "print out how long it takes to process this many blocks") diff --git a/core/rpc_worker.go b/core/rpc_worker.go index 7dffbf68..21173374 100644 --- a/core/rpc_worker.go +++ b/core/rpc_worker.go @@ -83,9 +83,13 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai } if block.IndexTransactions { - txsEventResp, err := rpc.GetTxsByBlockHeight(chainClient, block.Height) + var txsEventResp *txTypes.GetTxsEventResponse + var err error + if !cfg.Base.SkipBlockByHeightRPCRequest { + txsEventResp, err = rpc.GetTxsByBlockHeight(chainClient, block.Height) + } - if err != nil { + if err != nil || cfg.Base.SkipBlockByHeightRPCRequest { // Attempt to get block results to attempt an in-app codec decode of transactions. if currentHeightIndexerData.BlockResultsData == nil { diff --git a/core/tx.go b/core/tx.go index 9cab50e4..d9577a2e 100644 --- a/core/tx.go +++ b/core/tx.go @@ -31,7 +31,7 @@ func getUnexportedField(field reflect.Value) interface{} { return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() } -func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) { +func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) { if len(blockResults.Block.Txs) != len(resultBlockRes.TxsResults) { config.Log.Fatalf("blockResults & resultBlockRes: different length") } @@ -114,8 +114,6 @@ func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client if currMsg != nil { msg := currMsg.(types.Msg) - messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value) - currMessages = append(currMessages, msg) msgEvents := types.StringEvents{} if txResult.Code == 0 { msgEvents = logs[msgIdx].Events @@ -125,6 +123,29 @@ func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client MessageIndex: msgIdx, Events: indexerEvents.StringEventstoNormalizedEvents(msgEvents), } + + // Next filter on the message itself if there are any filters + if len(messageFilters) != 0 { + for _, filter := range messageFilters { + shouldIndex = filter.ShouldIndex(msg, currTxLog) + if shouldIndex { + break + } + } + } + + if !shouldIndex { + config.Log.Debug(fmt.Sprintf("[Block: %v] [TX: %v] Skipping msg of type '%v'.", blockResults.Block.Height, tendermintHashToHex(txHash), txFull.Body.Messages[msgIdx].TypeUrl)) + currMessages = append(currMessages, nil) + currLogMsgs = append(currLogMsgs, txtypes.LogMessage{ + MessageIndex: msgIdx, + }) + messagesRaw = append(messagesRaw, nil) + continue + } + + messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value) + currMessages = append(currMessages, msg) currLogMsgs = append(currLogMsgs, currTxLog) } else { return nil, blockTime, fmt.Errorf("tx message could not be processed") @@ -194,7 +215,7 @@ func tendermintHashToHex(hash []byte) string { } // ProcessRPCTXs - Given an RPC response, build out the more specific data used by the parser. -func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, txEventResp *cosmosTx.GetTxsEventResponse, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) { +func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, txEventResp *cosmosTx.GetTxsEventResponse, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) { var currTxDbWrappers []dbTypes.TxDBWrapper var blockTime *time.Time @@ -263,15 +284,33 @@ func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, if currMsg != nil { msg := currMsg.(types.Msg) - currMessages = append(currMessages, msg) - if len(currTxResp.Logs) >= msgIdx+1 { - msgEvents := currTxResp.Logs[msgIdx].Events - currTxLog := txtypes.LogMessage{ - MessageIndex: msgIdx, - Events: indexerEvents.StringEventstoNormalizedEvents(msgEvents), + msgEvents := currTxResp.Logs[msgIdx].Events + currTxLog := txtypes.LogMessage{ + MessageIndex: msgIdx, + Events: indexerEvents.StringEventstoNormalizedEvents(msgEvents), + } + + if len(messageFilters) != 0 { + for _, filter := range messageFilters { + shouldIndex = filter.ShouldIndex(msg, currTxLog) + if shouldIndex { + break + } } - currLogMsgs = append(currLogMsgs, currTxLog) } + + if !shouldIndex { + config.Log.Debug(fmt.Sprintf("[Block: %v] [TX: %v] Skipping msg of type '%v'.", currTxResp.Height, currTxResp.TxHash, currTx.Body.Messages[msgIdx].TypeUrl)) + currMessages = append(currMessages, nil) + currLogMsgs = append(currLogMsgs, txtypes.LogMessage{ + MessageIndex: msgIdx, + }) + messagesRaw = append(messagesRaw, nil) + continue + } + + currMessages = append(currMessages, msg) + currLogMsgs = append(currLogMsgs, currTxLog) } } @@ -334,7 +373,7 @@ func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, processedTx.Tx.Fees = fees processedTx.Tx.Memo = currTx.Body.Memo - currTxDbWrappers[txIdx] = processedTx + currTxDbWrappers = append(currTxDbWrappers, processedTx) } return currTxDbWrappers, blockTime, nil diff --git a/filter/message_filter.go b/filter/message_filter.go new file mode 100644 index 00000000..da8f5edf --- /dev/null +++ b/filter/message_filter.go @@ -0,0 +1,10 @@ +package filter + +import ( + "github.com/DefiantLabs/cosmos-indexer/cosmos/modules/tx" + "github.com/cosmos/cosmos-sdk/types" +) + +type MessageFilter interface { + ShouldIndex(types.Msg, tx.LogMessage) bool +} diff --git a/indexer/process.go b/indexer/process.go index bae60160..b1d071c4 100644 --- a/indexer/process.go +++ b/indexer/process.go @@ -75,10 +75,10 @@ func (indexer *Indexer) ProcessBlocks(wg *sync.WaitGroup, failedBlockHandler cor if blockData.GetTxsResponse != nil { config.Log.Debug("Processing TXs from RPC TX Search response") - txDBWrappers, _, err = core.ProcessRPCTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, blockData.GetTxsResponse, indexer.CustomMessageParserRegistry) + txDBWrappers, _, err = core.ProcessRPCTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, indexer.MessageFilters, blockData.GetTxsResponse, indexer.CustomMessageParserRegistry) } else if blockData.BlockResultsData != nil { config.Log.Debug("Processing TXs from BlockResults search response") - txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, blockData.BlockData, blockData.BlockResultsData, indexer.CustomMessageParserRegistry) + txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, indexer.MessageFilters, blockData.BlockData, blockData.BlockResultsData, indexer.CustomMessageParserRegistry) } if err != nil { diff --git a/indexer/registration.go b/indexer/registration.go index 31474d35..557fff0b 100644 --- a/indexer/registration.go +++ b/indexer/registration.go @@ -18,6 +18,10 @@ func (indexer *Indexer) RegisterMessageTypeFilter(filter filter.MessageTypeFilte indexer.MessageTypeFilters = append(indexer.MessageTypeFilters, filter) } +func (indexer *Indexer) RegisterMessageFilter(filter filter.MessageFilter) { + indexer.MessageFilters = append(indexer.MessageFilters, filter) +} + func (indexer *Indexer) RegisterCustomModels(models []any) { indexer.CustomModels = append(indexer.CustomModels, models...) } diff --git a/indexer/types.go b/indexer/types.go index fbb7e6e6..e2e69cf4 100644 --- a/indexer/types.go +++ b/indexer/types.go @@ -50,6 +50,7 @@ type Indexer struct { CustomModuleBasics []module.AppModuleBasic // Used for extending the AppModuleBasics registered in the probe ChainClientient BlockEventFilterRegistries BlockEventFilterRegistries MessageTypeFilters []filter.MessageTypeFilter + MessageFilters []filter.MessageFilter CustomBeginBlockEventParserRegistry map[string][]parsers.BlockEventParser // Used for associating parsers to block event types in BeginBlock events CustomEndBlockEventParserRegistry map[string][]parsers.BlockEventParser // Used for associating parsers to block event types in EndBlock events CustomBeginBlockParserTrackers map[string]models.BlockEventParser // Used for tracking block event parsers in the database