Skip to content

Commit

Permalink
Add config to skip RPC block by height request if known pruning, add …
Browse files Browse the repository at this point in the history
…a MessageFilter interface for further filtering with custom function logic.
  • Loading branch information
pharr117 committed Aug 15, 2024
1 parent 389142e commit 8871370
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 31 deletions.
32 changes: 17 additions & 15 deletions config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@ type IndexConfig struct {
type indexBase struct {
throttlingBase
retryBase
ReindexMessageType string `mapstructure:"reindex-message-type"`
ReattemptFailedBlocks bool `mapstructure:"reattempt-failed-blocks"`
StartBlock int64 `mapstructure:"start-block"`
EndBlock int64 `mapstructure:"end-block"`
BlockInputFile string `mapstructure:"block-input-file"`
ReIndex bool `mapstructure:"reindex"`
RPCWorkers int64 `mapstructure:"rpc-workers"`
BlockTimer int64 `mapstructure:"block-timer"`
WaitForChain bool `mapstructure:"wait-for-chain"`
WaitForChainDelay int64 `mapstructure:"wait-for-chain-delay"`
TransactionIndexingEnabled bool `mapstructure:"index-transactions"`
ExitWhenCaughtUp bool `mapstructure:"exit-when-caught-up"`
BlockEventIndexingEnabled bool `mapstructure:"index-block-events"`
FilterFile string `mapstructure:"filter-file"`
Dry bool `mapstructure:"dry"`
ReindexMessageType string `mapstructure:"reindex-message-type"`
ReattemptFailedBlocks bool `mapstructure:"reattempt-failed-blocks"`
StartBlock int64 `mapstructure:"start-block"`
EndBlock int64 `mapstructure:"end-block"`
BlockInputFile string `mapstructure:"block-input-file"`
ReIndex bool `mapstructure:"reindex"`
RPCWorkers int64 `mapstructure:"rpc-workers"`
SkipBlockByHeightRPCRequest bool `mapstructure:"skip-block-by-height-rpc-request"`
BlockTimer int64 `mapstructure:"block-timer"`
WaitForChain bool `mapstructure:"wait-for-chain"`
WaitForChainDelay int64 `mapstructure:"wait-for-chain-delay"`
TransactionIndexingEnabled bool `mapstructure:"index-transactions"`
ExitWhenCaughtUp bool `mapstructure:"exit-when-caught-up"`
BlockEventIndexingEnabled bool `mapstructure:"index-block-events"`
FilterFile string `mapstructure:"filter-file"`
Dry bool `mapstructure:"dry"`
}

// Flags for specific, deeper indexing behavior
Expand All @@ -60,6 +61,7 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) {
// other base setting
cmd.PersistentFlags().BoolVar(&conf.Base.Dry, "base.dry", false, "index the chain but don't insert data in the DB.")
cmd.PersistentFlags().Int64Var(&conf.Base.RPCWorkers, "base.rpc-workers", 1, "the number of concurrent RPC request workers to spin up.")
cmd.PersistentFlags().BoolVar(&conf.Base.SkipBlockByHeightRPCRequest, "base.skip-block-by-height-rpc-request", false, "skip the /block?height=<height> RPC request and only attempt the /block_results RPC request. Sometimes pruned nodes will not have return results for the block RPC request, but still return results for the block_result request.")
cmd.PersistentFlags().BoolVar(&conf.Base.WaitForChain, "base.wait-for-chain", false, "wait for chain to be in sync?")
cmd.PersistentFlags().Int64Var(&conf.Base.WaitForChainDelay, "base.wait-for-chain-delay", 10, "seconds to wait between each check for node to catch up to the chain")
cmd.PersistentFlags().Int64Var(&conf.Base.BlockTimer, "base.block-timer", 10000, "print out how long it takes to process this many blocks")
Expand Down
8 changes: 6 additions & 2 deletions core/rpc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
}

if block.IndexTransactions {
txsEventResp, err := rpc.GetTxsByBlockHeight(chainClient, block.Height)
var txsEventResp *txTypes.GetTxsEventResponse
var err error
if !cfg.Base.SkipBlockByHeightRPCRequest {
txsEventResp, err = rpc.GetTxsByBlockHeight(chainClient, block.Height)
}

if err != nil {
if err != nil || cfg.Base.SkipBlockByHeightRPCRequest {
// Attempt to get block results to attempt an in-app codec decode of transactions.
if currentHeightIndexerData.BlockResultsData == nil {

Expand Down
63 changes: 51 additions & 12 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func getUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}

func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
if len(blockResults.Block.Txs) != len(resultBlockRes.TxsResults) {
config.Log.Fatalf("blockResults & resultBlockRes: different length")
}
Expand Down Expand Up @@ -114,8 +114,6 @@ func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client

if currMsg != nil {
msg := currMsg.(types.Msg)
messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value)
currMessages = append(currMessages, msg)
msgEvents := types.StringEvents{}
if txResult.Code == 0 {
msgEvents = logs[msgIdx].Events
Expand All @@ -125,6 +123,29 @@ func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client
MessageIndex: msgIdx,
Events: indexerEvents.StringEventstoNormalizedEvents(msgEvents),
}

// Next filter on the message itself if there are any filters
if len(messageFilters) != 0 {
for _, filter := range messageFilters {
shouldIndex = filter.ShouldIndex(msg, currTxLog)
if shouldIndex {
break
}
}
}

if !shouldIndex {
config.Log.Debug(fmt.Sprintf("[Block: %v] [TX: %v] Skipping msg of type '%v'.", blockResults.Block.Height, tendermintHashToHex(txHash), txFull.Body.Messages[msgIdx].TypeUrl))
currMessages = append(currMessages, nil)
currLogMsgs = append(currLogMsgs, txtypes.LogMessage{
MessageIndex: msgIdx,
})
messagesRaw = append(messagesRaw, nil)
continue
}

messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value)
currMessages = append(currMessages, msg)
currLogMsgs = append(currLogMsgs, currTxLog)
} else {
return nil, blockTime, fmt.Errorf("tx message could not be processed")
Expand Down Expand Up @@ -194,7 +215,7 @@ func tendermintHashToHex(hash []byte) string {
}

// ProcessRPCTXs - Given an RPC response, build out the more specific data used by the parser.
func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, txEventResp *cosmosTx.GetTxsEventResponse, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, txEventResp *cosmosTx.GetTxsEventResponse, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
var currTxDbWrappers []dbTypes.TxDBWrapper
var blockTime *time.Time

Expand Down Expand Up @@ -263,15 +284,33 @@ func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient,

if currMsg != nil {
msg := currMsg.(types.Msg)
currMessages = append(currMessages, msg)
if len(currTxResp.Logs) >= msgIdx+1 {
msgEvents := currTxResp.Logs[msgIdx].Events
currTxLog := txtypes.LogMessage{
MessageIndex: msgIdx,
Events: indexerEvents.StringEventstoNormalizedEvents(msgEvents),
msgEvents := currTxResp.Logs[msgIdx].Events
currTxLog := txtypes.LogMessage{
MessageIndex: msgIdx,
Events: indexerEvents.StringEventstoNormalizedEvents(msgEvents),
}

if len(messageFilters) != 0 {
for _, filter := range messageFilters {
shouldIndex = filter.ShouldIndex(msg, currTxLog)
if shouldIndex {
break
}
}
currLogMsgs = append(currLogMsgs, currTxLog)
}

if !shouldIndex {
config.Log.Debug(fmt.Sprintf("[Block: %v] [TX: %v] Skipping msg of type '%v'.", currTxResp.Height, currTxResp.TxHash, currTx.Body.Messages[msgIdx].TypeUrl))
currMessages = append(currMessages, nil)
currLogMsgs = append(currLogMsgs, txtypes.LogMessage{
MessageIndex: msgIdx,
})
messagesRaw = append(messagesRaw, nil)
continue
}

currMessages = append(currMessages, msg)
currLogMsgs = append(currLogMsgs, currTxLog)
}
}

Expand Down Expand Up @@ -334,7 +373,7 @@ func ProcessRPCTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient,
processedTx.Tx.Fees = fees
processedTx.Tx.Memo = currTx.Body.Memo

currTxDbWrappers[txIdx] = processedTx
currTxDbWrappers = append(currTxDbWrappers, processedTx)
}

return currTxDbWrappers, blockTime, nil
Expand Down
10 changes: 10 additions & 0 deletions filter/message_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package filter

import (
"github.com/DefiantLabs/cosmos-indexer/cosmos/modules/tx"
"github.com/cosmos/cosmos-sdk/types"
)

type MessageFilter interface {
ShouldIndex(types.Msg, tx.LogMessage) bool
}
4 changes: 2 additions & 2 deletions indexer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ func (indexer *Indexer) ProcessBlocks(wg *sync.WaitGroup, failedBlockHandler cor

if blockData.GetTxsResponse != nil {
config.Log.Debug("Processing TXs from RPC TX Search response")
txDBWrappers, _, err = core.ProcessRPCTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, blockData.GetTxsResponse, indexer.CustomMessageParserRegistry)
txDBWrappers, _, err = core.ProcessRPCTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, indexer.MessageFilters, blockData.GetTxsResponse, indexer.CustomMessageParserRegistry)
} else if blockData.BlockResultsData != nil {
config.Log.Debug("Processing TXs from BlockResults search response")
txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, blockData.BlockData, blockData.BlockResultsData, indexer.CustomMessageParserRegistry)
txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(indexer.Config, indexer.DB, indexer.ChainClient, indexer.MessageTypeFilters, indexer.MessageFilters, blockData.BlockData, blockData.BlockResultsData, indexer.CustomMessageParserRegistry)
}

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions indexer/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (indexer *Indexer) RegisterMessageTypeFilter(filter filter.MessageTypeFilte
indexer.MessageTypeFilters = append(indexer.MessageTypeFilters, filter)
}

func (indexer *Indexer) RegisterMessageFilter(filter filter.MessageFilter) {
indexer.MessageFilters = append(indexer.MessageFilters, filter)
}

func (indexer *Indexer) RegisterCustomModels(models []any) {
indexer.CustomModels = append(indexer.CustomModels, models...)
}
Expand Down
1 change: 1 addition & 0 deletions indexer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Indexer struct {
CustomModuleBasics []module.AppModuleBasic // Used for extending the AppModuleBasics registered in the probe ChainClientient
BlockEventFilterRegistries BlockEventFilterRegistries
MessageTypeFilters []filter.MessageTypeFilter
MessageFilters []filter.MessageFilter
CustomBeginBlockEventParserRegistry map[string][]parsers.BlockEventParser // Used for associating parsers to block event types in BeginBlock events
CustomEndBlockEventParserRegistry map[string][]parsers.BlockEventParser // Used for associating parsers to block event types in EndBlock events
CustomBeginBlockParserTrackers map[string]models.BlockEventParser // Used for tracking block event parsers in the database
Expand Down

0 comments on commit 8871370

Please sign in to comment.