Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/message type filters and improving message parsing #50

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Indexer struct {
scheduler *gocron.Scheduler
blockEnqueueFunction func(chan *core.EnqueueData) error
blockEventFilterRegistries blockEventFilterRegistries
messageTypeFilters []filter.MessageTypeFilter
}

type blockEventFilterRegistries struct {
Expand Down Expand Up @@ -104,18 +105,23 @@ func setupIndex(cmd *cobra.Command, args []string) error {
endBlockEventFilterRegistry: &filter.StaticBlockEventFilterRegistry{},
}

if indexer.cfg.Base.BlockEventFilterFile != "" {
f, err := os.Open(indexer.cfg.Base.BlockEventFilterFile)
if indexer.cfg.Base.FilterFile != "" {
f, err := os.Open(indexer.cfg.Base.FilterFile)
if err != nil {
config.Log.Fatalf("Failed to open block event filter file %s: %s", indexer.cfg.Base.BlockEventFilterFile, err)
config.Log.Fatalf("Failed to open block event filter file %s: %s", indexer.cfg.Base.FilterFile, err)
}

b, err := io.ReadAll(f)
if err != nil {
config.Log.Fatal("Failed to parse block event filter config", err)
}

indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.BlockEventFilters, indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.RollingWindowEventFilters, indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.BlockEventFilters, indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.RollingWindowEventFilters, err = config.ParseJSONFilterConfig(b)
indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.BlockEventFilters,
indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.RollingWindowEventFilters,
indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.BlockEventFilters,
indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.RollingWindowEventFilters,
indexer.messageTypeFilters,
err = config.ParseJSONFilterConfig(b)

if err != nil {
config.Log.Fatal("Failed to parse block event filter config", err)
Expand Down Expand Up @@ -386,9 +392,9 @@ func (idxr *Indexer) processBlocks(wg *sync.WaitGroup, failedBlockHandler core.F
var err error

if blockData.GetTxsResponse != nil {
txDBWrappers, _, err = core.ProcessRPCTXs(idxr.db, idxr.cl, blockData.GetTxsResponse)
txDBWrappers, _, err = core.ProcessRPCTXs(idxr.db, idxr.cl, idxr.messageTypeFilters, blockData.GetTxsResponse)
} else if blockData.BlockResultsData != nil {
txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(idxr.db, idxr.cl, blockData.BlockData, blockData.BlockResultsData)
txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(idxr.db, idxr.cl, idxr.messageTypeFilters, blockData.BlockData, blockData.BlockResultsData)
}

if err != nil {
Expand Down
105 changes: 96 additions & 9 deletions config/filter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
EventTypeAndAttributeValueKey = "event_type_and_attribute_value"
RegexEventTypeKey = "regex_event_type"
RollingWindowKey = "rolling_window"
MessageTypeKey = "message_type"
MessageTypeRegex = "message_type_regex"
)

var SingleBlockEventFilterKeys = []string{
Expand All @@ -21,6 +23,11 @@ var SingleBlockEventFilterKeys = []string{
RegexEventTypeKey,
}

var MessageTypeFilterKeys = []string{
MessageTypeKey,
MessageTypeRegex,
}

func SingleBlockEventFilterIncludes(val string) bool {
for _, key := range SingleBlockEventFilterKeys {
if key == val {
Expand All @@ -30,9 +37,10 @@ func SingleBlockEventFilterIncludes(val string) bool {
return false
}

type blockEventFilterConfigs struct {
BeginBlockFilters []json.RawMessage `json:"begin_block_filters"`
EndBlockFilters []json.RawMessage `json:"end_block_filters"`
type blockFilterConfigs struct {
BeginBlockFilters []json.RawMessage `json:"begin_block_filters,omitempty"`
EndBlockFilters []json.RawMessage `json:"end_block_filters,omitempty"`
MessageTypeFilters []json.RawMessage `json:"message_type_filters,omitempty"`
}

type BlockEventFilterConfig struct {
Expand All @@ -41,25 +49,36 @@ type BlockEventFilterConfig struct {
Inclusive bool `json:"inclusive"`
}

func ParseJSONFilterConfig(configJSON []byte) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, error) {
config := blockEventFilterConfigs{}
type MessageTypeFilterConfig struct {
Type string `json:"type"`
Pattern string `json:"pattern"`
}

func ParseJSONFilterConfig(configJSON []byte) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.MessageTypeFilter, error) {
config := blockFilterConfigs{}
err := json.Unmarshal(configJSON, &config)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

beginBlockSingleEventFilters, beginBlockRollingWindowFilters, err := ParseLifecycleConfig(config.BeginBlockFilters)
if err != nil {
newErr := fmt.Errorf("error parsing begin_block_filters: %s", err)
return nil, nil, nil, nil, newErr
return nil, nil, nil, nil, nil, newErr
}
endBlockSingleEventFilters, endBlockRollingWindowFilters, err := ParseLifecycleConfig(config.EndBlockFilters)
if err != nil {
newErr := fmt.Errorf("error parsing end_block_filters: %s", err)
return nil, nil, nil, nil, newErr
return nil, nil, nil, nil, nil, newErr
}

messageTypeFilters, err := ParseTXMessageTypeConfig(config.MessageTypeFilters)
if err != nil {
newErr := fmt.Errorf("error parsing message_type_filters: %s", err)
return nil, nil, nil, nil, nil, newErr
}

return beginBlockSingleEventFilters, beginBlockRollingWindowFilters, endBlockSingleEventFilters, endBlockRollingWindowFilters, nil
return beginBlockSingleEventFilters, beginBlockRollingWindowFilters, endBlockSingleEventFilters, endBlockRollingWindowFilters, messageTypeFilters, nil
}

func ParseLifecycleConfig(lifecycleConfig []json.RawMessage) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, error) {
Expand Down Expand Up @@ -168,9 +187,77 @@ func ParseJSONFilterConfigFromType(filterType string, configJSON []byte) (filter
}
}

func ParseTXMessageTypeConfig(messageTypeConfigs []json.RawMessage) ([]filter.MessageTypeFilter, error) {
messageTypeFilters := []filter.MessageTypeFilter{}
for index, messageTypeConfig := range messageTypeConfigs {
newFilter := MessageTypeFilterConfig{}

err := json.Unmarshal(messageTypeConfig, &newFilter)
if err != nil {
parserError := fmt.Errorf("error parsing message type filter at index %d: %s", index, err)
return nil, parserError
}

err = validateMessageTypeFilterConfig(newFilter)

if err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}

switch {
case newFilter.Type == MessageTypeKey:
newFilter := filter.DefaultMessageTypeFilter{}
err := json.Unmarshal(messageTypeConfig, &newFilter)
if err != nil {
return nil, err
}
valid, err := newFilter.Valid()

if !valid || err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}
messageTypeFilters = append(messageTypeFilters, newFilter)
case newFilter.Type == MessageTypeRegex:
newFilter := filter.MessageTypeRegexFilter{}
err := json.Unmarshal(messageTypeConfig, &newFilter)
if err != nil {
return nil, err
}

newFilter, err = filter.NewRegexMessageTypeFilter(newFilter.MessageTypeRegexPattern)

if err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}

valid, err := newFilter.Valid()

if !valid || err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}
messageTypeFilters = append(messageTypeFilters, newFilter)
default:
parserError := fmt.Errorf("error parsing filter at index %d: unknown filter type \"%s\"", index, newFilter.Type)
return nil, parserError
}
}
return messageTypeFilters, nil
}

func validateBlockEventFilterConfig(config BlockEventFilterConfig) error {
if config.Type == "" {
return errors.New("filter config must have a type field")
}
return nil
}

func validateMessageTypeFilterConfig(config MessageTypeFilterConfig) error {
if config.Type == "" {
return errors.New("filter config must have a type field")
}
return nil
}
14 changes: 9 additions & 5 deletions config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type indexBase struct {
TransactionIndexingEnabled bool `mapstructure:"index-transactions"`
ExitWhenCaughtUp bool `mapstructure:"exit-when-caught-up"`
BlockEventIndexingEnabled bool `mapstructure:"index-block-events"`
BlockEventFilterFile string `mapstructure:"block-event-filter-file"`
FilterFile string `mapstructure:"filter-file"`
Dry bool `mapstructure:"dry"`
}

Expand All @@ -56,7 +56,7 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) {
cmd.PersistentFlags().BoolVar(&conf.Base.TransactionIndexingEnabled, "base.index-transactions", false, "enable transaction indexing?")
cmd.PersistentFlags().BoolVar(&conf.Base.BlockEventIndexingEnabled, "base.index-block-events", false, "enable block beginblocker and endblocker event indexing?")
// filter configs
cmd.PersistentFlags().StringVar(&conf.Base.BlockEventFilterFile, "base.block-event-filter-file", "", "path to a file containing a JSON list of block event filters to apply to beginblocker and endblocker events")
cmd.PersistentFlags().StringVar(&conf.Base.FilterFile, "base.filter-file", "", "path to a file containing a JSON config of block event and message type filters to apply to beginblocker events, endblocker events and TX messages")
// other base setting
cmd.PersistentFlags().BoolVar(&conf.Base.Dry, "base.dry", false, "index the chain but don't insert data in the DB.")
cmd.PersistentFlags().StringVar(&conf.Base.API, "base.api", "", "node api endpoint")
Expand Down Expand Up @@ -108,10 +108,10 @@ func (conf *IndexConfig) Validate() error {
}
}

if conf.Base.BlockEventIndexingEnabled && conf.Base.BlockEventFilterFile != "" {
if conf.Base.FilterFile != "" {
// check if file exists
if _, err := os.Stat(conf.Base.BlockEventFilterFile); os.IsNotExist(err) {
return fmt.Errorf("base.block-event-filter-file %s does not exist", conf.Base.BlockEventFilterFile)
if _, err := os.Stat(conf.Base.FilterFile); os.IsNotExist(err) {
return fmt.Errorf("base.filter-file %s does not exist", conf.Base.FilterFile)
}
}

Expand Down Expand Up @@ -149,6 +149,10 @@ func CheckSuperfluousIndexKeys(keys []string) []string {
validKeys[key] = struct{}{}
}

for _, key := range getValidConfigKeys(flags{}, "flags") {
validKeys[key] = struct{}{}
}

// Check keys
ignoredKeys := make([]string, 0)
for _, key := range keys {
Expand Down
56 changes: 56 additions & 0 deletions core/decoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package core

import (
"errors"

probeClient "github.com/DefiantLabs/probe/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
)

// Provides an in-app tx decoder.
// The primary use-case for this function is to allow fallback decoding if a TX fails to decode after RPC requests.
// This can happen in a number of scenarios, but mainly due to missing proto definitions.
// We can attempt a personal decode of the TX, and see if we can continue indexing based on in-app conditions (such as message type filters).
// This function skips a large chunk of decoding validations, and is not recommended for general use. Its main point is to skip errors that in
// default Cosmos TX decoders would cause the entire decode to fail.
func InAppTxDecoder(cdc probeClient.Codec) sdk.TxDecoder {
return func(txBytes []byte) (sdk.Tx, error) {
var raw tx.TxRaw
var err error

err = cdc.Marshaler.Unmarshal(txBytes, &raw)
if err != nil {
return nil, err
}

var body tx.TxBody

err = body.Unmarshal(raw.BodyBytes)
if err != nil {
return nil, errors.New("failed to unmarshal tx body")
}

for _, any := range body.Messages {
var msg sdk.Msg
// We deliberately ignore errors here to build up a
// list of properly decoded messages for later analysis.
cdc.Marshaler.UnpackAny(any, &msg) //nolint:errcheck
}

var authInfo tx.AuthInfo

err = cdc.Marshaler.Unmarshal(raw.AuthInfoBytes, &authInfo)
if err != nil {
return nil, errors.New("failed to unmarshal auth info")
}

theTx := &tx.Tx{
Body: &body,
AuthInfo: &authInfo,
Signatures: raw.Signatures,
}

return theTx, nil
}
}
Loading
Loading