Skip to content

Commit

Permalink
Merge pull request #49 from DefiantLabs/feat/optional-message-bytes-i…
Browse files Browse the repository at this point in the history
…ndexing

feat/optional message bytes indexing
  • Loading branch information
pharr117 authored Jan 1, 2024
2 parents 1006157 + e953b08 commit 433ab58
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 7 deletions.
4 changes: 2 additions & 2 deletions cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,11 @@ func (idxr *Indexer) doDBUpdates(wg *sync.WaitGroup, txDataChan chan *dbData, bl
// Note that this does not turn off certain reads or DB connections.
if !idxr.dryRun {
config.Log.Info(fmt.Sprintf("Indexing %v TXs from block %d", len(data.txDBWrappers), data.block.Height))
err := dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers)
err := dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers, *idxr.cfg)
if err != nil {
// Do a single reattempt on failure
dbReattempts++
err = dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers)
err = dbTypes.IndexNewBlock(idxr.db, data.block, data.txDBWrappers, *idxr.cfg)
if err != nil {
config.Log.Fatal(fmt.Sprintf("Error indexing block %v.", data.block.Height), err)
}
Expand Down
9 changes: 9 additions & 0 deletions config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type IndexConfig struct {
Base indexBase
Log log
Probe Probe
Flags flags
}

type indexBase struct {
Expand All @@ -38,6 +39,11 @@ type indexBase struct {
Dry bool `mapstructure:"dry"`
}

// Flags for specific, deeper indexing behavior
type flags struct {
IndexTxMessageRaw bool `mapstructure:"index-tx-message-raw"`
}

func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) {
// chain indexing
cmd.PersistentFlags().Int64Var(&conf.Base.StartBlock, "base.start-block", 0, "block to start indexing at (use -1 to resume from highest block indexed)")
Expand All @@ -61,6 +67,9 @@ func SetupIndexSpecificFlags(conf *IndexConfig, cmd *cobra.Command) {
cmd.PersistentFlags().BoolVar(&conf.Base.ExitWhenCaughtUp, "base.exit-when-caught-up", false, "mainly used for Osmosis rewards indexing")
cmd.PersistentFlags().Int64Var(&conf.Base.RequestRetryAttempts, "base.request-retry-attempts", 0, "number of RPC query retries to make")
cmd.PersistentFlags().Uint64Var(&conf.Base.RequestRetryMaxWait, "base.request-retry-max-wait", 30, "max retry incremental backoff wait time in seconds")

// flags
cmd.PersistentFlags().BoolVar(&conf.Flags.IndexTxMessageRaw, "flags.index-tx-message-raw", false, "if true, this will index the raw message bytes. This will significantly increase the size of the database.")
}

func (conf *IndexConfig) Validate() error {
Expand Down
13 changes: 10 additions & 3 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,12 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult
return nil, blockTime, fmt.Errorf("logs could not be parsed")
}

var messagesRaw [][]byte

// Get the Messages and Message Logs
for msgIdx, currMsg := range txFull.GetMsgs() {
if currMsg != nil {
messagesRaw = append(messagesRaw, txFull.Body.Messages[msgIdx].Value)
currMessages = append(currMessages, currMsg)
msgEvents := types.StringEvents{}
if txResult.Code == 0 {
Expand Down Expand Up @@ -145,7 +148,7 @@ func ProcessRPCBlockByHeightTXs(db *gorm.DB, cl *client.ChainClient, blockResult
indexerMergedTx.Tx = indexerTx
indexerMergedTx.Tx.AuthInfo = *txFull.AuthInfo

processedTx, _, err := ProcessTx(db, indexerMergedTx)
processedTx, _, err := ProcessTx(db, indexerMergedTx, messagesRaw)
if err != nil {
return currTxDbWrappers, blockTime, err
}
Expand Down Expand Up @@ -182,12 +185,15 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge
var txBody txtypes.Body
var currMessages []types.Msg
var currLogMsgs []txtypes.LogMessage
var messagesRaw [][]byte

currTx := txEventResp.Txs[txIdx]
currTxResp := txEventResp.TxResponses[txIdx]

// Get the Messages and Message Logs
for msgIdx := range currTx.Body.Messages {
currMsg := currTx.Body.Messages[msgIdx].GetCachedValue()
messagesRaw = append(messagesRaw, currTx.Body.Messages[msgIdx].Value)
if currMsg != nil {
msg := currMsg.(types.Msg)
currMessages = append(currMessages, msg)
Expand Down Expand Up @@ -226,7 +232,7 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge
indexerMergedTx.Tx = indexerTx
indexerMergedTx.Tx.AuthInfo = *currTx.AuthInfo

processedTx, txTime, err := ProcessTx(db, indexerMergedTx)
processedTx, txTime, err := ProcessTx(db, indexerMergedTx, messagesRaw)
if err != nil {
return currTxDbWrappers, blockTime, err
}
Expand Down Expand Up @@ -254,7 +260,7 @@ func ProcessRPCTXs(db *gorm.DB, cl *client.ChainClient, txEventResp *cosmosTx.Ge
return currTxDbWrappers, blockTime, nil
}

func ProcessTx(db *gorm.DB, tx txtypes.MergedTx) (txDBWapper dbTypes.TxDBWrapper, txTime time.Time, err error) {
func ProcessTx(db *gorm.DB, tx txtypes.MergedTx, messagesRaw [][]byte) (txDBWapper dbTypes.TxDBWrapper, txTime time.Time, err error) {
txTime, err = time.Parse(time.RFC3339, tx.TxResponse.TimeStamp)
if err != nil {
config.Log.Error("Error parsing tx timestamp.", err)
Expand All @@ -272,6 +278,7 @@ func ProcessTx(db *gorm.DB, tx txtypes.MergedTx) (txDBWapper dbTypes.TxDBWrapper
if code == 0 {
for messageIndex, message := range tx.Tx.Body.Messages {
messageType, currMessageDBWrapper := ProcessMessage(messageIndex, message, tx.TxResponse.Log, uniqueEventTypes, uniqueEventAttributeKeys)
currMessageDBWrapper.Message.MessageBytes = messagesRaw[messageIndex]
uniqueMessageTypes[messageType] = currMessageDBWrapper.Message.MessageType
config.Log.Debug(fmt.Sprintf("[Block: %v] Found msg of type '%v'.", tx.TxResponse.Height, messageType))
messages = append(messages, currMessageDBWrapper)
Expand Down
8 changes: 6 additions & 2 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func UpsertFailedEventBlock(db *gorm.DB, blockHeight int64, chainID string, chai
})
}

func IndexNewBlock(db *gorm.DB, block models.Block, txs []TxDBWrapper) error {
func IndexNewBlock(db *gorm.DB, block models.Block, txs []TxDBWrapper, indexerConfig config.IndexConfig) error {
// consider optimizing the transaction, but how? Ordering matters due to foreign key constraints
// Order required: Block -> (For each Tx: Signer Address -> Tx -> (For each Message: Message -> Taxable Events))
// Also, foreign key relations are struct value based so create needs to be called first to get right foreign key ID
Expand Down Expand Up @@ -374,13 +374,17 @@ func IndexNewBlock(db *gorm.DB, block models.Block, txs []TxDBWrapper) error {
}
}

if !indexerConfig.Flags.IndexTxMessageRaw {
tx.Messages[messageIndex].Message.MessageBytes = nil
}

messagesSlice = append(messagesSlice, &tx.Messages[messageIndex].Message)
}

if len(messagesSlice) != 0 {
if err := dbTransaction.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "tx_id"}, {Name: "message_index"}},
DoUpdates: clause.AssignmentColumns([]string{"message_type_id"}),
DoUpdates: clause.AssignmentColumns([]string{"message_type_id", "message_bytes"}),
}).Create(messagesSlice).Error; err != nil {
config.Log.Error("Error getting/creating messages.", err)
return err
Expand Down
1 change: 1 addition & 0 deletions db/models/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Message struct {
MessageTypeID uint `gorm:"foreignKey:MessageTypeID,index:idx_txid_typeid"`
MessageType MessageType
MessageIndex int `gorm:"uniqueIndex:messageIndex,priority:2"`
MessageBytes []byte
}

type FailedMessage struct {
Expand Down

0 comments on commit 433ab58

Please sign in to comment.