diff --git a/cmd/index.go b/cmd/index.go
index a95c9114..6eb1cef2 100644
--- a/cmd/index.go
+++ b/cmd/index.go
@@ -33,6 +33,7 @@ type Indexer struct {
 	scheduler                  *gocron.Scheduler
 	blockEnqueueFunction       func(chan *core.EnqueueData) error
 	blockEventFilterRegistries blockEventFilterRegistries
+	messageTypeFilters         []filter.MessageTypeFilter
 }
 
 type blockEventFilterRegistries struct {
@@ -104,10 +105,10 @@ func setupIndex(cmd *cobra.Command, args []string) error {
 		endBlockEventFilterRegistry:   &filter.StaticBlockEventFilterRegistry{},
 	}
 
-	if indexer.cfg.Base.BlockEventFilterFile != "" {
-		f, err := os.Open(indexer.cfg.Base.BlockEventFilterFile)
+	if indexer.cfg.Base.FilterFile != "" {
+		f, err := os.Open(indexer.cfg.Base.FilterFile)
 		if err != nil {
-			config.Log.Fatalf("Failed to open block event filter file %s: %s", indexer.cfg.Base.BlockEventFilterFile, err)
+			config.Log.Fatalf("Failed to open block event filter file %s: %s", indexer.cfg.Base.FilterFile, err)
 		}
 
 		b, err := io.ReadAll(f)
@@ -115,7 +116,12 @@ func setupIndex(cmd *cobra.Command, args []string) error {
 			config.Log.Fatal("Failed to parse block event filter config", err)
 		}
 
-		indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.BlockEventFilters, indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.RollingWindowEventFilters, indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.BlockEventFilters, indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.RollingWindowEventFilters, err = config.ParseJSONFilterConfig(b)
+		indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.BlockEventFilters,
+			indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.RollingWindowEventFilters,
+			indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.BlockEventFilters,
+			indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.RollingWindowEventFilters,
+			indexer.messageTypeFilters,
+			err = config.ParseJSONFilterConfig(b)
 
 		if err != nil {
 			config.Log.Fatal("Failed to parse block event filter config", err)
@@ -386,9 +392,9 @@ func (idxr *Indexer) processBlocks(wg *sync.WaitGroup, failedBlockHandler core.F
 		var err error
 
 		if blockData.GetTxsResponse != nil {
-			txDBWrappers, _, err = core.ProcessRPCTXs(idxr.db, idxr.cl, blockData.GetTxsResponse)
+			txDBWrappers, _, err = core.ProcessRPCTXs(idxr.db, idxr.cl, idxr.messageTypeFilters, blockData.GetTxsResponse)
 		} else if blockData.BlockResultsData != nil {
-			txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(idxr.db, idxr.cl, blockData.BlockData, blockData.BlockResultsData)
+			txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(idxr.db, idxr.cl, idxr.messageTypeFilters, blockData.BlockData, blockData.BlockResultsData)
 		}
 
 		if err != nil {
diff --git a/config/filter_config.go b/config/filter_config.go
index 1da0b6b6..71b32bc8 100644
--- a/config/filter_config.go
+++ b/config/filter_config.go
@@ -13,6 +13,8 @@ const (
 	EventTypeAndAttributeValueKey = "event_type_and_attribute_value"
 	RegexEventTypeKey             = "regex_event_type"
 	RollingWindowKey              = "rolling_window"
+	MessageTypeKey                = "message_type"
+	MessageTypeRegex              = "message_type_regex"
 )
 
 var SingleBlockEventFilterKeys = []string{
@@ -21,6 +23,11 @@ var SingleBlockEventFilterKeys = []string{
 	RegexEventTypeKey,
 }
 
+var MessageTypeFilterKeys = []string{
+	MessageTypeKey,
+	MessageTypeRegex,
+}
+
 func SingleBlockEventFilterIncludes(val string) bool {
 	for _, key := range SingleBlockEventFilterKeys {
 		if key == val {
@@ -30,9 +37,10 @@ func SingleBlockEventFilterIncludes(val string) bool {
 	return false
 }
 
-type blockEventFilterConfigs struct {
-	BeginBlockFilters []json.RawMessage `json:"begin_block_filters"`
-	EndBlockFilters   []json.RawMessage `json:"end_block_filters"`
+type blockFilterConfigs struct {
+	BeginBlockFilters  []json.RawMessage `json:"begin_block_filters,omitempty"`
+	EndBlockFilters    []json.RawMessage `json:"end_block_filters,omitempty"`
+	MessageTypeFilters []json.RawMessage `json:"message_type_filters,omitempty"`
 }
 
 type BlockEventFilterConfig struct {
@@ -41,25 +49,36 @@ type BlockEventFilterConfig struct {
 	Inclusive bool   `json:"inclusive"`
 }
 
-func ParseJSONFilterConfig(configJSON []byte) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, error) {
-	config := blockEventFilterConfigs{}
+type MessageTypeFilterConfig struct {
+	Type    string `json:"type"`
+	Pattern string `json:"pattern"`
+}
+
+func ParseJSONFilterConfig(configJSON []byte) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.MessageTypeFilter, error) {
+	config := blockFilterConfigs{}
 	err := json.Unmarshal(configJSON, &config)
 	if err != nil {
-		return nil, nil, nil, nil, err
+		return nil, nil, nil, nil, nil, err
 	}
 
 	beginBlockSingleEventFilters, beginBlockRollingWindowFilters, err := ParseLifecycleConfig(config.BeginBlockFilters)
 	if err != nil {
 		newErr := fmt.Errorf("error parsing begin_block_filters: %s", err)
-		return nil, nil, nil, nil, newErr
+		return nil, nil, nil, nil, nil, newErr
 	}
 
 	endBlockSingleEventFilters, endBlockRollingWindowFilters, err := ParseLifecycleConfig(config.EndBlockFilters)
 	if err != nil {
 		newErr := fmt.Errorf("error parsing end_block_filters: %s", err)
-		return nil, nil, nil, nil, newErr
+		return nil, nil, nil, nil, nil, newErr
+	}
+
+	messageTypeFilters, err := ParseTXMessageTypeConfig(config.MessageTypeFilters)
+	if err != nil {
+		newErr := fmt.Errorf("error parsing message_type_filters: %s", err)
+		return nil, nil, nil, nil, nil, newErr
 	}
 
-	return beginBlockSingleEventFilters, beginBlockRollingWindowFilters, endBlockSingleEventFilters, endBlockRollingWindowFilters, nil
+	return beginBlockSingleEventFilters, beginBlockRollingWindowFilters, endBlockSingleEventFilters, endBlockRollingWindowFilters, messageTypeFilters, nil
 }
 
 func ParseLifecycleConfig(lifecycleConfig []json.RawMessage) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, error) {
@@ -168,9 +187,77 @@ func ParseJSONFilterConfigFromType(filterType string, configJSON []byte) (filter
 	}
 }
 
+func ParseTXMessageTypeConfig(messageTypeConfigs []json.RawMessage) ([]filter.MessageTypeFilter, error) {
+	messageTypeFilters := []filter.MessageTypeFilter{}
+	for index, messageTypeConfig := range messageTypeConfigs {
+		newFilter := MessageTypeFilterConfig{}
+
+		err := json.Unmarshal(messageTypeConfig, &newFilter)
+		if err != nil {
+			parserError := fmt.Errorf("error parsing message type filter at index %d: %s", index, err)
+			return nil, parserError
+		}
+
+		err = validateMessageTypeFilterConfig(newFilter)
+
+		if err != nil {
+			parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
+			return nil, parserError
+		}
+
+		switch {
+		case newFilter.Type == MessageTypeKey:
+			newFilter := filter.DefaultMessageTypeFilter{}
+			err := json.Unmarshal(messageTypeConfig, &newFilter)
+			if err != nil {
+				return nil, err
+			}
+			valid, err := newFilter.Valid()
+
+			if !valid || err != nil {
+				parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
+				return nil, parserError
+			}
+			messageTypeFilters = append(messageTypeFilters, newFilter)
+		case newFilter.Type == MessageTypeRegex:
+			newFilter := filter.MessageTypeRegexFilter{}
+			err := json.Unmarshal(messageTypeConfig, &newFilter)
+			if err != nil {
+				return nil, err
+			}
+
+			newFilter, err = filter.NewRegexMessageTypeFilter(newFilter.MessageTypeRegexPattern)
+
+			if err != nil {
+				parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
+				return nil, parserError
+			}
+
+			valid, err := newFilter.Valid()
+
+			if !valid || err != nil {
+				parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
+				return nil, parserError
+			}
+			messageTypeFilters = append(messageTypeFilters, newFilter)
+		default:
+			parserError := fmt.Errorf("error parsing filter at index %d: unknown filter type \"%s\"", index, newFilter.Type)
+			return nil, parserError
+		}
+	}
+	return messageTypeFilters, nil
+}
+
 func validateBlockEventFilterConfig(config BlockEventFilterConfig) error {
 	if config.Type == "" {
 		return errors.New("filter config must have a type field")
 	}
 	return nil
 }
+
+func validateMessageTypeFilterConfig(config MessageTypeFilterConfig) error {
+	if config.Type == "" {
+		return errors.New("filter config must have a type field")
+	}
+	return nil
+}
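As a usage sketch (the message type URLs are illustrative, not taken from this diff), a filter file combining the existing block event keys with the new `message_type_filters` key can be parsed like this:

```go
package main

import (
	"fmt"
	"os"

	"github.com/DefiantLabs/cosmos-indexer/config"
)

func main() {
	// Illustrative contents for a file passed via --base.filter-file.
	// begin_block_filters and end_block_filters may appear alongside this key.
	filterJSON := []byte(`{
		"message_type_filters": [
			{"type": "message_type", "message_type": "/cosmos.bank.v1beta1.MsgSend"},
			{"type": "message_type_regex", "message_type_regex": "^/cosmos\\.staking\\..*"}
		]
	}`)

	_, _, _, _, messageTypeFilters, err := config.ParseJSONFilterConfig(filterJSON)
	if err != nil {
		fmt.Println("failed to parse filter config:", err)
		os.Exit(1)
	}

	fmt.Printf("parsed %d message type filters\n", len(messageTypeFilters))
}
```

The exact-match entry uses the `message_type` key from `DefaultMessageTypeFilter`, while the regex entry is compiled through `NewRegexMessageTypeFilter` during parsing.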
diff --git a/config/index_config.go b/config/index_config.go
index e8a428fd..73f54745 100644
--- a/config/index_config.go
+++ b/config/index_config.go
@@ -35,7 +35,7 @@ type indexBase struct {
 	TransactionIndexingEnabled bool   `mapstructure:"index-transactions"`
 	ExitWhenCaughtUp           bool   `mapstructure:"exit-when-caught-up"`
 	BlockEventIndexingEnabled  bool   `mapstructure:"index-block-events"`
-	BlockEventFilterFile       string `mapstructure:"block-event-filter-file"`
+	FilterFile                 string `mapstructure:"filter-file"`
 	Dry                        bool   `mapstructure:"dry"`
 }
 
@@ -56,7 +56,7 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) {
 	cmd.PersistentFlags().BoolVar(&conf.Base.TransactionIndexingEnabled, "base.index-transactions", false, "enable transaction indexing?")
 	cmd.PersistentFlags().BoolVar(&conf.Base.BlockEventIndexingEnabled, "base.index-block-events", false, "enable block beginblocker and endblocker event indexing?")
 	// filter configs
-	cmd.PersistentFlags().StringVar(&conf.Base.BlockEventFilterFile, "base.block-event-filter-file", "", "path to a file containing a JSON list of block event filters to apply to beginblocker and endblocker events")
+	cmd.PersistentFlags().StringVar(&conf.Base.FilterFile, "base.filter-file", "", "path to a file containing a JSON config of block event and message type filters to apply to beginblocker events, endblocker events and TX messages")
 	// other base setting
 	cmd.PersistentFlags().BoolVar(&conf.Base.Dry, "base.dry", false, "index the chain but don't insert data in the DB.")
 	cmd.PersistentFlags().StringVar(&conf.Base.API, "base.api", "", "node api endpoint")
@@ -108,10 +108,10 @@ func (conf *IndexConfig) Validate() error {
 		}
 	}
 
-	if conf.Base.BlockEventIndexingEnabled && conf.Base.BlockEventFilterFile != "" {
+	if conf.Base.FilterFile != "" {
 		// check if file exists
-		if _, err := os.Stat(conf.Base.BlockEventFilterFile); os.IsNotExist(err) {
-			return fmt.Errorf("base.block-event-filter-file %s does not exist", conf.Base.BlockEventFilterFile)
+		if _, err := os.Stat(conf.Base.FilterFile); os.IsNotExist(err) {
+			return fmt.Errorf("base.filter-file %s does not exist", conf.Base.FilterFile)
 		}
 	}
 
@@ -149,6 +149,10 @@ func CheckSuperfluousIndexKeys(keys []string) []string {
 		validKeys[key] = struct{}{}
 	}
 
+	for _, key := range getValidConfigKeys(flags{}, "flags") {
+		validKeys[key] = struct{}{}
+	}
+
 	// Check keys
 	ignoredKeys := make([]string, 0)
 	for _, key := range keys {
diff --git a/core/decoding.go b/core/decoding.go
new file mode 100644
index 00000000..c6ab5c40
--- /dev/null
+++ b/core/decoding.go
@@ -0,0 +1,56 @@
+package core
+
+import (
+	"errors"
+
+	probeClient "github.com/DefiantLabs/probe/client"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/cosmos/cosmos-sdk/types/tx"
+)
+
+// Provides an in-app tx decoder.
+// The primary use-case for this function is to allow fallback decoding if a TX fails to decode after RPC requests.
+// This can happen in a number of scenarios, but mainly due to missing proto definitions.
+// We can attempt a personal decode of the TX, and see if we can continue indexing based on in-app conditions (such as message type filters).
+// This function skips a large chunk of decoding validations, and is not recommended for general use. Its main point is to skip errors that in
+// default Cosmos TX decoders would cause the entire decode to fail.
+func InAppTxDecoder(cdc probeClient.Codec) sdk.TxDecoder {
+	return func(txBytes []byte) (sdk.Tx, error) {
+		var raw tx.TxRaw
+		var err error
+
+		err = cdc.Marshaler.Unmarshal(txBytes, &raw)
+		if err != nil {
+			return nil, err
+		}
+
+		var body tx.TxBody
+
+		err = body.Unmarshal(raw.BodyBytes)
+		if err != nil {
+			return nil, errors.New("failed to unmarshal tx body")
+		}
+
+		for _, any := range body.Messages {
+			var msg sdk.Msg
+			// We deliberately ignore errors here to build up a
+			// list of properly decoded messages for later analysis.
+			cdc.Marshaler.UnpackAny(any, &msg) //nolint:errcheck
+		}
+
+		var authInfo tx.AuthInfo
+
+		err = cdc.Marshaler.Unmarshal(raw.AuthInfoBytes, &authInfo)
+		if err != nil {
+			return nil, errors.New("failed to unmarshal auth info")
+		}
+
+		theTx := &tx.Tx{
+			Body:       &body,
+			AuthInfo:   &authInfo,
+			Signatures: raw.Signatures,
+		}
+
+		return theTx, nil
+	}
+}
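A sketch of how the fallback decoder is intended to be used (the codec and raw bytes are assumed inputs; in this PR the call happens inside ProcessRPCBlockByHeightTXs when the standard decoder fails):

```go
package example

import (
	"fmt"

	"github.com/DefiantLabs/cosmos-indexer/core"
	probeClient "github.com/DefiantLabs/probe/client"
	"github.com/cosmos/cosmos-sdk/types/tx"
)

// printMessageTypes decodes raw tx bytes with the lenient in-app decoder and
// prints the type URL of every message, whether or not the concrete proto
// type could be unpacked.
func printMessageTypes(cdc probeClient.Codec, rawTxBytes []byte) error {
	decodedTx, err := core.InAppTxDecoder(cdc)(rawTxBytes)
	if err != nil {
		return err
	}

	// The in-app decoder always returns a *tx.Tx, so no reflection into the
	// SDK's private wrapper type is needed on this path.
	fullTx := decodedTx.(*tx.Tx)
	for _, anyMsg := range fullTx.Body.Messages {
		// TypeUrl survives even when UnpackAny failed for a message, which is
		// what lets the message type filters below run on partially decoded TXs.
		fmt.Println(anyMsg.TypeUrl)
	}

	return nil
}
```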
diff --git a/core/tx.go b/core/tx.go
index 28c17fd4..7388ade2 100644
--- a/core/tx.go
+++ b/core/tx.go
@@ -13,6 +13,7 @@ import (
 	txtypes "github.com/DefiantLabs/cosmos-indexer/cosmos/modules/tx"
 	dbTypes "github.com/DefiantLabs/cosmos-indexer/db"
 	"github.com/DefiantLabs/cosmos-indexer/db/models"
+	"github.com/DefiantLabs/cosmos-indexer/filter"
 	"github.com/DefiantLabs/cosmos-indexer/util"
 	"github.com/DefiantLabs/probe/client"
 	coretypes "github.com/cometbft/cometbft/rpc/core/types"
@@ -66,7 +67,7 @@ func getUnexportedField(field reflect.Value) interface{} {
 	return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
 }
 
-func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults) ([]dbTypes.TxDBWrapper, *time.Time, error) {
+func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults) ([]dbTypes.TxDBWrapper, *time.Time, error) {
 	if len(blockResults.Block.Txs) != len(resultBlockRes.TxsResults) {
 		config.Log.Fatalf("blockResults & resultBlockRes: different length")
 	}
@@ -86,15 +87,22 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult
 		var currLogMsgs []txtypes.LogMessage
 
 		txDecoder := cl.Codec.TxConfig.TxDecoder()
+
 		txBasic, err := txDecoder(tendermintTx)
+		var txFull *cosmosTx.Tx
 		if err != nil {
-			return nil, blockTime, fmt.Errorf("ProcessRPCBlockByHeightTXs: TX cannot be parsed from block %v. Err: %v", blockResults.Block.Height, err)
+			txBasic, err = InAppTxDecoder(cl.Codec)(tendermintTx)
+			if err != nil {
+				return nil, blockTime, fmt.Errorf("ProcessRPCBlockByHeightTXs: TX cannot be parsed from block %v. This is usually a proto definition error. Err: %v", blockResults.Block.Height, err)
+			}
+			txFull = txBasic.(*cosmosTx.Tx)
+		} else {
+			// This is a hack, but as far as I can tell necessary. "wrapper" struct is private in Cosmos SDK.
+			field := reflect.ValueOf(txBasic).Elem().FieldByName("tx")
+			iTx := getUnexportedField(field)
+			txFull = iTx.(*cosmosTx.Tx)
 		}
 
-		// This is a hack, but as far as I can tell necessary. "wrapper" struct is private in Cosmos SDK.
-		field := reflect.ValueOf(txBasic).Elem().FieldByName("tx")
-		iTx := getUnexportedField(field)
-		txFull := iTx.(*cosmosTx.Tx)
 		logs := types.ABCIMessageLogs{}
 
 		// Failed TXs do not have proper JSON in the .Log field, causing ParseABCILogs to fail to unmarshal the logs
@@ -112,10 +120,38 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult
 		var messagesRaw [][]byte
 
 		// Get the Messages and Message Logs
-		for msgIdx, currMsg := range txFull.GetMsgs() {
+		for msgIdx := range txFull.Body.Messages {
+			filterData := filter.MessageTypeData{
+				MessageType: txFull.Body.Messages[msgIdx].TypeUrl,
+			}
+
+			matches := false
+			for _, messageTypeFilter := range messageTypeFilters {
+				typeMatch, err := messageTypeFilter.MessageTypeMatches(filterData)
+				if err != nil {
+					return nil, blockTime, err
+				}
+				if typeMatch {
+					matches = true
+					break
+				}
+			}
+
+			if !matches {
+				config.Log.Debug(fmt.Sprintf("[Block: %v] Skipping msg of type '%v'.", blockResults.Block.Height, txFull.Body.Messages[msgIdx].TypeUrl))
+				// To maintain ordering, append nils
+				currMessages = append(currMessages, nil)
+				currLogMsgs = append(currLogMsgs, txtypes.LogMessage{
+					MessageIndex: msgIdx,
+				})
+				continue
+			}
+
+			currMsg := txFull.Body.Messages[msgIdx].GetCachedValue()
+
 			if currMsg != nil {
+				msg := currMsg.(types.Msg)
 				messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value)
-				currMessages = append(currMessages, currMsg)
+				currMessages = append(currMessages, msg)
 				msgEvents := types.StringEvents{}
 				if txResult.Code == 0 {
 					msgEvents = logs[msgIdx].Events
@@ -153,7 +189,14 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult
 			return currTxDbWrappers, blockTime, err
 		}
 
-		signers, err := ProcessSigners(cl, txFull.AuthInfo, txFull.GetSigners())
+		filteredSigners := []types.AccAddress{}
+		for _, filteredMessage := range txBody.Messages {
+			if filteredMessage != nil {
+				filteredSigners = append(filteredSigners, filteredMessage.GetSigners()...)
+			}
+		}
+
+		signers, err := ProcessSigners(cl, txFull.AuthInfo, filteredSigners)
 		if err != nil {
 			return currTxDbWrappers, blockTime, err
 		}
@@ -174,7 +217,7 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult
 }
 
 // ProcessRPCTXs - Given an RPC response, build out the more specific data used by the parser.
-func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.GetTxsEventResponse) ([]dbTypes.TxDBWrapper, *time.Time, error) {
+func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, txEventResp *cosmosTx.GetTxsEventResponse) ([]dbTypes.TxDBWrapper, *time.Time, error) {
 	currTxDbWrappers := make([]dbTypes.TxDBWrapper, len(txEventResp.Txs))
 	var blockTime *time.Time
 
@@ -192,8 +235,50 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge
 
 		// Get the Messages and Message Logs
 		for msgIdx := range currTx.Body.Messages {
+			filterData := filter.MessageTypeData{
+				MessageType: currTx.Body.Messages[msgIdx].TypeUrl,
+			}
+
+			matches := false
+			for _, messageTypeFilter := range messageTypeFilters {
+				typeMatch, err := messageTypeFilter.MessageTypeMatches(filterData)
+				if err != nil {
+					return nil, blockTime, err
+				}
+				if typeMatch {
+					matches = true
+					break
+				}
+			}
+
+			if !matches {
+				config.Log.Debug(fmt.Sprintf("[Block: %v] Skipping msg of type '%v'.", currTxResp.Height, currTx.Body.Messages[msgIdx].TypeUrl))
+				// To maintain ordering, append nils
+				currMessages = append(currMessages, nil)
+				currLogMsgs = append(currLogMsgs, txtypes.LogMessage{
+					MessageIndex: msgIdx,
+				})
+				continue
+			}
+
 			currMsg := currTx.Body.Messages[msgIdx].GetCachedValue()
 			messagesRaw = append(messagesRaw, currTx.Body.Messages[msgIdx].Value)
+
+			// If we reached here, unpacking the entire TX raw was not successful
+			// Attempt to unpack the message individually.
+			if currMsg == nil {
+				var currMsgUnpack types.Msg
+
+				err := cl.Codec.InterfaceRegistry.UnpackAny(currTx.Body.Messages[msgIdx], &currMsgUnpack)
+				if err != nil || currMsgUnpack == nil {
+					return nil, blockTime, fmt.Errorf("tx message could not be processed. Unpacking protos failed and CachedValue is not present. TX Hash: %s, Msg type: %s, Msg index: %d, Code: %d",
+						currTxResp.TxHash,
+						currTx.Body.Messages[msgIdx].TypeUrl,
+						msgIdx,
+						currTxResp.Code,
+					)
+				}
+				currMsg = currMsgUnpack
+			}
+
 			if currMsg != nil {
 				msg := currMsg.(types.Msg)
 				currMessages = append(currMessages, msg)
@@ -205,13 +290,6 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge
 					}
 					currLogMsgs = append(currLogMsgs, currTxLog)
 				}
-			} else {
-				return nil, blockTime, fmt.Errorf("tx message could not be processed. CachedValue is not present. TX Hash: %s, Msg type: %s, Msg index: %d, Code: %d",
-					currTxResp.TxHash,
-					currTx.Body.Messages[msgIdx].TypeUrl,
-					msgIdx,
-					currTxResp.Code,
-				)
 			}
 		}
 
@@ -241,7 +319,19 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge
 			blockTime = &txTime
 		}
 
-		signers, err := ProcessSigners(cl, currTx.AuthInfo, currTx.GetSigners())
+		filteredSigners := []types.AccAddress{}
+		for _, filteredMessage := range txBody.Messages {
+			if filteredMessage != nil {
+				filteredSigners = append(filteredSigners, filteredMessage.GetSigners()...)
+			}
+		}
+
+		err = currTx.AuthInfo.UnpackInterfaces(cl.Codec.InterfaceRegistry)
+		if err != nil {
+			return currTxDbWrappers, blockTime, err
+		}
+
+		signers, err := ProcessSigners(cl, currTx.AuthInfo, filteredSigners)
 		if err != nil {
 			return currTxDbWrappers, blockTime, err
 		}
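The filter match loop above appears verbatim in both ProcessRPCBlockByHeightTXs and ProcessRPCTXs; a hypothetical helper (not part of this diff) could factor it out along these lines:

```go
package core

import "github.com/DefiantLabs/cosmos-indexer/filter"

// messageTypeMatchesAny mirrors the inline match loops above: it reports
// whether any configured filter matches the given type URL. With an empty
// filter slice it returns false, just as the inline loops leave matches false.
func messageTypeMatchesAny(messageTypeFilters []filter.MessageTypeFilter, typeURL string) (bool, error) {
	filterData := filter.MessageTypeData{MessageType: typeURL}
	for _, messageTypeFilter := range messageTypeFilters {
		typeMatch, err := messageTypeFilter.MessageTypeMatches(filterData)
		if err != nil {
			return false, err
		}
		if typeMatch {
			return true, nil
		}
	}
	return false, nil
}
```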
@@ -277,11 +367,13 @@ func ProcessTx(db *gorm.DB, tx txtypes.MergedTx, messagesRaw [][]byte) (txDBWapp
 
 	// non-zero code means the Tx was unsuccessful. We will still need to account for fees in both cases though.
 	if code == 0 {
 		for messageIndex, message := range tx.Tx.Body.Messages {
-			messageType, currMessageDBWrapper := ProcessMessage(messageIndex, message, tx.TxResponse.Log, uniqueEventTypes, uniqueEventAttributeKeys)
-			currMessageDBWrapper.Message.MessageBytes = messagesRaw[messageIndex]
-			uniqueMessageTypes[messageType] = currMessageDBWrapper.Message.MessageType
-			config.Log.Debug(fmt.Sprintf("[Block: %v] Found msg of type '%v'.", tx.TxResponse.Height, messageType))
-			messages = append(messages, currMessageDBWrapper)
+			if message != nil {
+				messageType, currMessageDBWrapper := ProcessMessage(messageIndex, message, tx.TxResponse.Log, uniqueEventTypes, uniqueEventAttributeKeys)
+				currMessageDBWrapper.Message.MessageBytes = messagesRaw[messageIndex]
+				uniqueMessageTypes[messageType] = currMessageDBWrapper.Message.MessageType
+				config.Log.Debug(fmt.Sprintf("[Block: %v] Found msg of type '%v'.", tx.TxResponse.Height, messageType))
+				messages = append(messages, currMessageDBWrapper)
+			}
 		}
 	}
 
@@ -298,7 +390,7 @@ func ProcessTx(db *gorm.DB, tx txtypes.MergedTx, messagesRaw [][]byte) (txDBWapp
 // 1. Processes signers from the auth info
 // 2. Processes signers from the signers array
 // 3. Processes the fee payer
-func ProcessSigners(cl *client.ChainClient, authInfo *cosmosTx.AuthInfo, signers []types.AccAddress) ([]models.Address, error) {
+func ProcessSigners(cl *client.ChainClient, authInfo *cosmosTx.AuthInfo, messageSigners []types.AccAddress) ([]models.Address, error) {
 	// For unique checks
 	signerAddressMap := make(map[string]models.Address)
 	// For deterministic output of signer values
@@ -342,7 +434,7 @@ func ProcessSigners(cl *client.ChainClient, authInfo *cosmosTx.AuthInfo, signers
 		}
 	}
 
-	for _, signer := range signers {
+	for _, signer := range messageSigners {
 		addressStr := signer.String()
 		if _, ok := signerAddressMap[addressStr]; !ok {
 			signerAddressArray = append(signerAddressArray, models.Address{Address: addressStr})
diff --git a/filter/message_type_filters.go b/filter/message_type_filters.go
new file mode 100644
index 00000000..c1ddfce8
--- /dev/null
+++ b/filter/message_type_filters.go
@@ -0,0 +1,61 @@
+package filter
+
+import (
+	"errors"
+	"fmt"
+	"regexp"
+)
+
+type MessageTypeFilter interface {
+	MessageTypeMatches(MessageTypeData) (bool, error)
+	Valid() (bool, error)
+}
+
+type MessageTypeData struct {
+	MessageType string
+}
+
+type DefaultMessageTypeFilter struct {
+	MessageType string `json:"message_type"`
+}
+
+type MessageTypeRegexFilter struct {
+	MessageTypeRegexPattern string `json:"message_type_regex"`
+	messageTypeRegex        *regexp.Regexp
+}
+
+func (f DefaultMessageTypeFilter) MessageTypeMatches(messageTypeData MessageTypeData) (bool, error) {
+	return messageTypeData.MessageType == f.MessageType, nil
+}
+
+func (f MessageTypeRegexFilter) MessageTypeMatches(messageTypeData MessageTypeData) (bool, error) {
+	return f.messageTypeRegex.MatchString(messageTypeData.MessageType), nil
+}
+
+func (f DefaultMessageTypeFilter) Valid() (bool, error) {
+	if f.MessageType != "" {
+		return true, nil
+	}
+
+	return false, errors.New("MessageType must be set")
+}
+
+func (f MessageTypeRegexFilter) Valid() (bool, error) {
+	if f.messageTypeRegex != nil && f.MessageTypeRegexPattern != "" {
+		return true, nil
+	}
+
+	return false, errors.New("MessageTypeRegexPattern must be set")
+}
+
+func NewRegexMessageTypeFilter(messageTypeRegexPattern string) (MessageTypeRegexFilter, error) {
+	messageTypeRegex, err := regexp.Compile(messageTypeRegexPattern)
+	if err != nil {
+		return MessageTypeRegexFilter{}, fmt.Errorf("error compiling message type regex: %s", err)
+	}
+
+	return MessageTypeRegexFilter{
+		MessageTypeRegexPattern: messageTypeRegexPattern,
+		messageTypeRegex:        messageTypeRegex,
+	}, nil
+}
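A small sketch of the two new filter types used directly (the type URLs are illustrative):

```go
package example

import (
	"fmt"

	"github.com/DefiantLabs/cosmos-indexer/filter"
)

func demoMessageTypeFilters() error {
	// Exact-match filter: matches a single type URL.
	exact := filter.DefaultMessageTypeFilter{MessageType: "/cosmos.bank.v1beta1.MsgSend"}

	// Regex filter: must be built through the constructor so the pattern is
	// compiled and the filter passes Valid().
	regex, err := filter.NewRegexMessageTypeFilter(`^/cosmos\.gov\.`)
	if err != nil {
		return err
	}

	data := filter.MessageTypeData{MessageType: "/cosmos.gov.v1beta1.MsgVote"}
	for _, f := range []filter.MessageTypeFilter{exact, regex} {
		match, err := f.MessageTypeMatches(data)
		if err != nil {
			return err
		}
		fmt.Println(match) // false for the exact filter, true for the regex filter
	}

	return nil
}
```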
diff --git a/filter/static_block_event_filters.go b/filter/static_block_event_filters.go
index 060cc396..b16a9129 100644
--- a/filter/static_block_event_filters.go
+++ b/filter/static_block_event_filters.go
@@ -37,7 +37,7 @@ func (f DefaultBlockEventTypeFilter) Valid() (bool, error) {
 		return true, nil
 	}
 
-	return true, errors.New("EventType must be set")
+	return false, errors.New("EventType must be set")
 }
 
 type RegexBlockEventTypeFilter struct {
@@ -59,7 +59,7 @@ func (f RegexBlockEventTypeFilter) Valid() (bool, error) {
 		return true, nil
 	}
 
-	return true, errors.New("EventTypeRegexPattern must be set")
+	return false, errors.New("EventTypeRegexPattern must be set")
 }
 
 type DefaultBlockEventTypeAndAttributeValueFilter struct {
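For context on the Valid() fixes above, a short sketch of the behavior change: a filter with no event type now reports itself as invalid rather than returning true together with an error.

```go
package example

import (
	"fmt"

	"github.com/DefiantLabs/cosmos-indexer/filter"
)

func demoValid() {
	// EventType deliberately left empty.
	f := filter.DefaultBlockEventTypeFilter{}

	valid, err := f.Valid()
	// Before this change: valid == true with a non-nil error, so callers that
	// checked only the bool would accept a misconfigured filter.
	// After this change: valid == false and err reports "EventType must be set".
	fmt.Println(valid, err)
}
```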