Skip to content

Commit

Permalink
Merge pull request #50 from DefiantLabs/feat/message-type-filters-and…
Browse files Browse the repository at this point in the history
…-improving-message-parsing

Feat/message type filters and improving message parsing
  • Loading branch information
pharr117 authored Jan 2, 2024
2 parents 433ab58 + bd37a84 commit 7b44d8e
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 47 deletions.
18 changes: 12 additions & 6 deletions cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Indexer struct {
scheduler *gocron.Scheduler
blockEnqueueFunction func(chan *core.EnqueueData) error
blockEventFilterRegistries blockEventFilterRegistries
messageTypeFilters []filter.MessageTypeFilter
}

type blockEventFilterRegistries struct {
Expand Down Expand Up @@ -104,18 +105,23 @@ func setupIndex(cmd *cobra.Command, args []string) error {
endBlockEventFilterRegistry: &filter.StaticBlockEventFilterRegistry{},
}

if indexer.cfg.Base.BlockEventFilterFile != "" {
f, err := os.Open(indexer.cfg.Base.BlockEventFilterFile)
if indexer.cfg.Base.FilterFile != "" {
f, err := os.Open(indexer.cfg.Base.FilterFile)
if err != nil {
config.Log.Fatalf("Failed to open block event filter file %s: %s", indexer.cfg.Base.BlockEventFilterFile, err)
config.Log.Fatalf("Failed to open block event filter file %s: %s", indexer.cfg.Base.FilterFile, err)
}

b, err := io.ReadAll(f)
if err != nil {
config.Log.Fatal("Failed to parse block event filter config", err)
}

indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.BlockEventFilters, indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.RollingWindowEventFilters, indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.BlockEventFilters, indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.RollingWindowEventFilters, err = config.ParseJSONFilterConfig(b)
indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.BlockEventFilters,
indexer.blockEventFilterRegistries.beginBlockEventFilterRegistry.RollingWindowEventFilters,
indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.BlockEventFilters,
indexer.blockEventFilterRegistries.endBlockEventFilterRegistry.RollingWindowEventFilters,
indexer.messageTypeFilters,
err = config.ParseJSONFilterConfig(b)

if err != nil {
config.Log.Fatal("Failed to parse block event filter config", err)
Expand Down Expand Up @@ -386,9 +392,9 @@ func (idxr *Indexer) processBlocks(wg *sync.WaitGroup, failedBlockHandler core.F
var err error

if blockData.GetTxsResponse != nil {
txDBWrappers, _, err = core.ProcessRPCTXs(idxr.db, idxr.cl, blockData.GetTxsResponse)
txDBWrappers, _, err = core.ProcessRPCTXs(idxr.db, idxr.cl, idxr.messageTypeFilters, blockData.GetTxsResponse)
} else if blockData.BlockResultsData != nil {
txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(idxr.db, idxr.cl, blockData.BlockData, blockData.BlockResultsData)
txDBWrappers, _, err = core.ProcessRPCBlockByHeightTXs(idxr.db, idxr.cl, idxr.messageTypeFilters, blockData.BlockData, blockData.BlockResultsData)
}

if err != nil {
Expand Down
105 changes: 96 additions & 9 deletions config/filter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
EventTypeAndAttributeValueKey = "event_type_and_attribute_value"
RegexEventTypeKey = "regex_event_type"
RollingWindowKey = "rolling_window"
MessageTypeKey = "message_type"
MessageTypeRegex = "message_type_regex"
)

var SingleBlockEventFilterKeys = []string{
Expand All @@ -21,6 +23,11 @@ var SingleBlockEventFilterKeys = []string{
RegexEventTypeKey,
}

var MessageTypeFilterKeys = []string{
MessageTypeKey,
MessageTypeRegex,
}

func SingleBlockEventFilterIncludes(val string) bool {
for _, key := range SingleBlockEventFilterKeys {
if key == val {
Expand All @@ -30,9 +37,10 @@ func SingleBlockEventFilterIncludes(val string) bool {
return false
}

type blockEventFilterConfigs struct {
BeginBlockFilters []json.RawMessage `json:"begin_block_filters"`
EndBlockFilters []json.RawMessage `json:"end_block_filters"`
type blockFilterConfigs struct {
BeginBlockFilters []json.RawMessage `json:"begin_block_filters,omitempty"`
EndBlockFilters []json.RawMessage `json:"end_block_filters,omitempty"`
MessageTypeFilters []json.RawMessage `json:"message_type_filters,omitempty"`
}

type BlockEventFilterConfig struct {
Expand All @@ -41,25 +49,36 @@ type BlockEventFilterConfig struct {
Inclusive bool `json:"inclusive"`
}

func ParseJSONFilterConfig(configJSON []byte) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, error) {
config := blockEventFilterConfigs{}
type MessageTypeFilterConfig struct {
Type string `json:"type"`
Pattern string `json:"pattern"`
}

func ParseJSONFilterConfig(configJSON []byte) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, []filter.MessageTypeFilter, error) {
config := blockFilterConfigs{}
err := json.Unmarshal(configJSON, &config)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

beginBlockSingleEventFilters, beginBlockRollingWindowFilters, err := ParseLifecycleConfig(config.BeginBlockFilters)
if err != nil {
newErr := fmt.Errorf("error parsing begin_block_filters: %s", err)
return nil, nil, nil, nil, newErr
return nil, nil, nil, nil, nil, newErr
}
endBlockSingleEventFilters, endBlockRollingWindowFilters, err := ParseLifecycleConfig(config.EndBlockFilters)
if err != nil {
newErr := fmt.Errorf("error parsing end_block_filters: %s", err)
return nil, nil, nil, nil, newErr
return nil, nil, nil, nil, nil, newErr
}

messageTypeFilters, err := ParseTXMessageTypeConfig(config.MessageTypeFilters)
if err != nil {
newErr := fmt.Errorf("error parsing message_type_filters: %s", err)
return nil, nil, nil, nil, nil, newErr
}

return beginBlockSingleEventFilters, beginBlockRollingWindowFilters, endBlockSingleEventFilters, endBlockRollingWindowFilters, nil
return beginBlockSingleEventFilters, beginBlockRollingWindowFilters, endBlockSingleEventFilters, endBlockRollingWindowFilters, messageTypeFilters, nil
}

func ParseLifecycleConfig(lifecycleConfig []json.RawMessage) ([]filter.BlockEventFilter, []filter.RollingWindowBlockEventFilter, error) {
Expand Down Expand Up @@ -168,9 +187,77 @@ func ParseJSONFilterConfigFromType(filterType string, configJSON []byte) (filter
}
}

func ParseTXMessageTypeConfig(messageTypeConfigs []json.RawMessage) ([]filter.MessageTypeFilter, error) {
messageTypeFilters := []filter.MessageTypeFilter{}
for index, messageTypeConfig := range messageTypeConfigs {
newFilter := MessageTypeFilterConfig{}

err := json.Unmarshal(messageTypeConfig, &newFilter)
if err != nil {
parserError := fmt.Errorf("error parsing message type filter at index %d: %s", index, err)
return nil, parserError
}

err = validateMessageTypeFilterConfig(newFilter)

if err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}

switch {
case newFilter.Type == MessageTypeKey:
newFilter := filter.DefaultMessageTypeFilter{}
err := json.Unmarshal(messageTypeConfig, &newFilter)
if err != nil {
return nil, err
}
valid, err := newFilter.Valid()

if !valid || err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}
messageTypeFilters = append(messageTypeFilters, newFilter)
case newFilter.Type == MessageTypeRegex:
newFilter := filter.MessageTypeRegexFilter{}
err := json.Unmarshal(messageTypeConfig, &newFilter)
if err != nil {
return nil, err
}

newFilter, err = filter.NewRegexMessageTypeFilter(newFilter.MessageTypeRegexPattern)

if err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}

valid, err := newFilter.Valid()

if !valid || err != nil {
parserError := fmt.Errorf("error parsing filter at index %d: %s", index, err)
return nil, parserError
}
messageTypeFilters = append(messageTypeFilters, newFilter)
default:
parserError := fmt.Errorf("error parsing filter at index %d: unknown filter type \"%s\"", index, newFilter.Type)
return nil, parserError
}
}
return messageTypeFilters, nil
}

func validateBlockEventFilterConfig(config BlockEventFilterConfig) error {
if config.Type == "" {
return errors.New("filter config must have a type field")
}
return nil
}

func validateMessageTypeFilterConfig(config MessageTypeFilterConfig) error {
if config.Type == "" {
return errors.New("filter config must have a type field")
}
return nil
}
14 changes: 9 additions & 5 deletions config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type indexBase struct {
TransactionIndexingEnabled bool `mapstructure:"index-transactions"`
ExitWhenCaughtUp bool `mapstructure:"exit-when-caught-up"`
BlockEventIndexingEnabled bool `mapstructure:"index-block-events"`
BlockEventFilterFile string `mapstructure:"block-event-filter-file"`
FilterFile string `mapstructure:"filter-file"`
Dry bool `mapstructure:"dry"`
}

Expand All @@ -56,7 +56,7 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) {
cmd.PersistentFlags().BoolVar(&conf.Base.TransactionIndexingEnabled, "base.index-transactions", false, "enable transaction indexing?")
cmd.PersistentFlags().BoolVar(&conf.Base.BlockEventIndexingEnabled, "base.index-block-events", false, "enable block beginblocker and endblocker event indexing?")
// filter configs
cmd.PersistentFlags().StringVar(&conf.Base.BlockEventFilterFile, "base.block-event-filter-file", "", "path to a file containing a JSON list of block event filters to apply to beginblocker and endblocker events")
cmd.PersistentFlags().StringVar(&conf.Base.FilterFile, "base.filter-file", "", "path to a file containing a JSON config of block event and message type filters to apply to beginblocker events, endblocker events and TX messages")
// other base setting
cmd.PersistentFlags().BoolVar(&conf.Base.Dry, "base.dry", false, "index the chain but don't insert data in the DB.")
cmd.PersistentFlags().StringVar(&conf.Base.API, "base.api", "", "node api endpoint")
Expand Down Expand Up @@ -108,10 +108,10 @@ func (conf *IndexConfig) Validate() error {
}
}

if conf.Base.BlockEventIndexingEnabled && conf.Base.BlockEventFilterFile != "" {
if conf.Base.FilterFile != "" {
// check if file exists
if _, err := os.Stat(conf.Base.BlockEventFilterFile); os.IsNotExist(err) {
return fmt.Errorf("base.block-event-filter-file %s does not exist", conf.Base.BlockEventFilterFile)
if _, err := os.Stat(conf.Base.FilterFile); os.IsNotExist(err) {
return fmt.Errorf("base.filter-file %s does not exist", conf.Base.FilterFile)
}
}

Expand Down Expand Up @@ -149,6 +149,10 @@ func CheckSuperfluousIndexKeys(keys []string) []string {
validKeys[key] = struct{}{}
}

for _, key := range getValidConfigKeys(flags{}, "flags") {
validKeys[key] = struct{}{}
}

// Check keys
ignoredKeys := make([]string, 0)
for _, key := range keys {
Expand Down
56 changes: 56 additions & 0 deletions core/decoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package core

import (
"errors"

probeClient "github.com/DefiantLabs/probe/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
)

// Provides an in-app tx decoder.
// The primary use-case for this function is to allow fallback decoding if a TX fails to decode after RPC requests.
// This can happen in a number of scenarios, but mainly due to missing proto definitions.
// We can attempt a personal decode of the TX, and see if we can continue indexing based on in-app conditions (such as message type filters).
// This function skips a large chunk of decoding validations, and is not recommended for general use. Its main point is to skip errors that in
// default Cosmos TX decoders would cause the entire decode to fail.
func InAppTxDecoder(cdc probeClient.Codec) sdk.TxDecoder {
return func(txBytes []byte) (sdk.Tx, error) {
var raw tx.TxRaw
var err error

err = cdc.Marshaler.Unmarshal(txBytes, &raw)
if err != nil {
return nil, err
}

var body tx.TxBody

err = body.Unmarshal(raw.BodyBytes)
if err != nil {
return nil, errors.New("failed to unmarshal tx body")
}

for _, any := range body.Messages {
var msg sdk.Msg
// We deliberately ignore errors here to build up a
// list of properly decoded messages for later analysis.
cdc.Marshaler.UnpackAny(any, &msg) //nolint:errcheck
}

var authInfo tx.AuthInfo

err = cdc.Marshaler.Unmarshal(raw.AuthInfoBytes, &authInfo)
if err != nil {
return nil, errors.New("failed to unmarshal auth info")
}

theTx := &tx.Tx{
Body: &body,
AuthInfo: &authInfo,
Signatures: raw.Signatures,
}

return theTx, nil
}
}
Loading

0 comments on commit 7b44d8e

Please sign in to comment.