diff --git a/cmd/index.go b/cmd/index.go index 2ee5f3e5..a95c9114 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -439,11 +439,11 @@ func (idxr *Indexer) doDBUpdates(wg *sync.WaitGroup, txDataChan chan *dbData, bl // Note that this does not turn off certain reads or DB connections. if !idxr.dryRun { config.Log.Info(fmt.Sprintf("Indexing %v TXs from block %d", len(data.txDBWrappers), data.block.Height)) - err := dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers) + err := dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers, *idxr.cfg) if err != nil { // Do a single reattempt on failure dbReattempts++ - err = dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers) + err = dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers, *idxr.cfg) if err != nil { config.Log.Fatal(fmt.Sprintf("Error indexing block %v.", data.block.Height), err) } diff --git a/config/index_config.go b/config/index_config.go index 8bc99812..e8a428fd 100644 --- a/config/index_config.go +++ b/config/index_config.go @@ -15,6 +15,7 @@ type IndexConfig struct { Base indexBase Log log Probe Probe + Flags flags } type indexBase struct { @@ -38,6 +39,11 @@ type indexBase struct { Dry bool `mapstructure:"dry"` } +// Flags for specific, deeper indexing behavior +type flags struct { + IndexTxMessageRaw bool `mapstructure:"index-tx-message-raw"` +} + func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) { // chain indexing cmd.PersistentFlags().Int64Var(&conf.Base.StartBlock, "base.start-block", 0, "block to start indexing at (use -1 to resume from highest block indexed)") @@ -61,6 +67,9 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) { cmd.PersistentFlags().BoolVar(&conf.Base.ExitWhenCaughtUp, "base.exit-when-caught-up", false, "mainly used for Osmosis rewards indexing") cmd.PersistentFlags().Int64Var(&conf.Base.RequestRetryAttempts, "base.request-retry-attempts", 0, "number of RPC query retries to make") cmd.PersistentFlags().Uint64Var(&conf.Base.RequestRetryMaxWait, "base.request-retry-max-wait", 30, "max retry incremental backoff wait time in seconds") + + // flags + cmd.PersistentFlags().BoolVar(&conf.Flags.IndexTxMessageRaw, "flags.index-tx-message-raw", false, "if true, this will index the raw message bytes. This will significantly increase the size of the database.") } func (conf *IndexConfig) Validate() error { diff --git a/core/tx.go b/core/tx.go index 8b5f14e8..28c17fd4 100644 --- a/core/tx.go +++ b/core/tx.go @@ -109,9 +109,12 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult return nil, blockTime, fmt.Errorf("logs could not be parsed") } + var messagesRaw [][]byte + // Get the Messages and Message Logs for msgIdx, currMsg := range txFull.GetMsgs() { if currMsg != nil { + messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value) currMessages = append(currMessages, currMsg) msgEvents := types.StringEvents{} if txResult.Code == 0 { @@ -145,7 +148,7 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult indexerMergedTx.Tx = indexerTx indexerMergedTx.Tx.AuthInfo = *txFull.AuthInfo - processedTx, _, err := ProcessTx(db, indexerMergedTx) + processedTx, _, err := ProcessTx(db, indexerMergedTx, messagesRaw) if err != nil { return currTxDbWrappers, blockTime, err } @@ -182,12 +185,15 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge var txBody txtypes.Body var currMessages []types.Msg var currLogMsgs []txtypes.LogMessage + var messagesRaw [][]byte + currTx := txEventResp.Txs[txIdx] currTxResp := txEventResp.TxResponses[txIdx] // Get the Messages and Message Logs for msgIdx := range currTx.Body.Messages { currMsg := currTx.Body.Messages[msgIdx].GetCachedValue() + messagesRaw = append(messagesRaw, currTx.Body.Messages[msgIdx].Value) if currMsg != nil { msg := currMsg.(types.Msg) currMessages = append(currMessages, msg) @@ -226,7 +232,7 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge indexerMergedTx.Tx = indexerTx indexerMergedTx.Tx.AuthInfo = *currTx.AuthInfo - processedTx, txTime, err := ProcessTx(db, indexerMergedTx) + processedTx, txTime, err := ProcessTx(db, indexerMergedTx, messagesRaw) if err != nil { return currTxDbWrappers, blockTime, err } @@ -254,7 +260,7 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge return currTxDbWrappers, blockTime, nil } -func ProcessTx(db *gorm.DB, tx txtypes.MergedTx) (txDBWapper dbTypes.TxDBWrapper, txTime time.Time, err error) { +func ProcessTx(db *gorm.DB, tx txtypes.MergedTx, messagesRaw [][]byte) (txDBWapper dbTypes.TxDBWrapper, txTime time.Time, err error) { txTime, err = time.Parse(time.RFC3339, tx.TxResponse.TimeStamp) if err != nil { config.Log.Error("Error parsing tx timestamp.", err) @@ -272,6 +278,7 @@ func ProcessTx(db *gorm.DB, tx txtypes.MergedTx) (txDBWapper dbTypes.TxDBWrapper if code == 0 { for messageIndex, message := range tx.Tx.Body.Messages { messageType, currMessageDBWrapper := ProcessMessage(messageIndex, message, tx.TxResponse.Log, uniqueEventTypes, uniqueEventAttributeKeys) + currMessageDBWrapper.Message.MessageBytes = messagesRaw[messageIndex] uniqueMessageTypes[messageType] = currMessageDBWrapper.Message.MessageType config.Log.Debug(fmt.Sprintf("[Block: %v] Found msg of type '%v'.", tx.TxResponse.Height, messageType)) messages = append(messages, currMessageDBWrapper) diff --git a/db/db.go b/db/db.go index 8912770e..39b659e6 100644 --- a/db/db.go +++ b/db/db.go @@ -217,7 +217,7 @@ func UpsertFailedEventBlock(db *gorm.DB, blockHeight int64, chainID string, chai }) } -func IndexNewBlock(db *gorm.DB, block models.Block, txs []TxDBWrapper) error { +func IndexNewBlock(db *gorm.DB, block models.Block, txs []TxDBWrapper, indexerConfig config.IndexConfig) error { // consider optimizing the transaction, but how? Ordering matters due to foreign key constraints // Order required: Block -> (For each Tx: Signer Address -> Tx -> (For each Message: Message -> Taxable Events)) // Also, foreign key relations are struct value based so create needs to be called first to get right foreign key ID @@ -374,13 +374,17 @@ func IndexNewBlock(db *gorm.DB, block models.Block, txs []TxDBWrapper) error { } } + if !indexerConfig.Flags.IndexTxMessageRaw { + tx.Messages[messageIndex].Message.MessageBytes = nil + } + messagesSlice = append(messagesSlice, &tx.Messages[messageIndex].Message) } if len(messagesSlice) != 0 { if err := dbTransaction.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "tx_id"}, {Name: "message_index"}}, - DoUpdates: clause.AssignmentColumns([]string{"message_type_id"}), + DoUpdates: clause.AssignmentColumns([]string{"message_type_id", "message_bytes"}), }).Create(messagesSlice).Error; err != nil { config.Log.Error("Error getting/creating messages.", err) return err diff --git a/db/models/tx.go b/db/models/tx.go index 81031f96..739498bd 100644 --- a/db/models/tx.go +++ b/db/models/tx.go @@ -54,6 +54,7 @@ type Message struct { MessageTypeID uint `gorm:"foreignKey:MessageTypeID,index:idx_txid_typeid"` MessageType MessageType MessageIndex int `gorm:"uniqueIndex:messageIndex,priority:2"` + MessageBytes []byte } type FailedMessage struct {