Skip to content

Commit

Permalink
Normalize sdk v0.4x and v0.5x block events by unmerging sdk v0.5x Fin…
Browse files Browse the repository at this point in the history
…alizeBlockEvents back into BeginBlock and EndBlock events
  • Loading branch information
pharr117 committed Sep 14, 2024
1 parent 4b15faa commit dea4d6d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 15 deletions.
4 changes: 2 additions & 2 deletions core/block_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/DefiantLabs/cosmos-indexer/db/models"
"github.com/DefiantLabs/cosmos-indexer/filter"
"github.com/DefiantLabs/cosmos-indexer/parsers"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/DefiantLabs/cosmos-indexer/rpc"
)

func ProcessRPCBlockResults(conf config.IndexConfig, block models.Block, blockResults *ctypes.ResultBlockResults, customBeginBlockParsers map[string][]parsers.BlockEventParser, customEndBlockParsers map[string][]parsers.BlockEventParser) (*db.BlockDBWrapper, error) {
func ProcessRPCBlockResults(conf config.IndexConfig, block models.Block, blockResults *rpc.CustomBlockResults, customBeginBlockParsers map[string][]parsers.BlockEventParser, customEndBlockParsers map[string][]parsers.BlockEventParser) (*db.BlockDBWrapper, error) {
var blockDBWrapper db.BlockDBWrapper

blockDBWrapper.Block = &block
Expand Down
3 changes: 2 additions & 1 deletion core/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/DefiantLabs/cosmos-indexer/config"
"github.com/DefiantLabs/cosmos-indexer/db/models"
"github.com/DefiantLabs/cosmos-indexer/rpc"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
sdkTypes "github.com/cosmos/cosmos-sdk/types"
)
Expand All @@ -24,7 +25,7 @@ const (
type FailedBlockHandler func(height int64, code BlockProcessingFailure, err error)

// Process RPC Block data into the model object used by the application.
func ProcessBlock(blockData *ctypes.ResultBlock, blockResultsData *ctypes.ResultBlockResults, chainID uint) (models.Block, error) {
func ProcessBlock(blockData *ctypes.ResultBlock, blockResultsData *rpc.CustomBlockResults, chainID uint) (models.Block, error) {
block := models.Block{
Height: blockData.Block.Height,
ChainID: chainID,
Expand Down
66 changes: 63 additions & 3 deletions core/rpc_worker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package core

import (
"fmt"
"net/http"
"sync"

"github.com/DefiantLabs/cosmos-indexer/config"
dbTypes "github.com/DefiantLabs/cosmos-indexer/db"
"github.com/DefiantLabs/cosmos-indexer/rpc"
"github.com/DefiantLabs/probe/client"
abci "github.com/cometbft/cometbft/abci/types"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
txTypes "github.com/cosmos/cosmos-sdk/types/tx"
"gorm.io/gorm"
Expand All @@ -16,7 +18,7 @@ import (
// Wrapper types for gathering full dataset.
type IndexerBlockEventData struct {
BlockData *ctypes.ResultBlock
BlockResultsData *ctypes.ResultBlockResults
BlockResultsData *rpc.CustomBlockResults
BlockEventRequestsFailed bool
GetTxsResponse *txTypes.GetTxsEventResponse
TxRequestsFailed bool
Expand Down Expand Up @@ -78,7 +80,16 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
currentHeightIndexerData.BlockResultsData = nil
currentHeightIndexerData.BlockEventRequestsFailed = true
} else {
currentHeightIndexerData.BlockResultsData = bresults
bresults, err = NormalizeCustomBlockResults(bresults)
if err != nil {
config.Log.Errorf("Error normalizing block results for block %v from RPC. Err: %v", block, err)
err := dbTypes.UpsertFailedEventBlock(db, block.Height, chainStringID, cfg.Probe.ChainName)
if err != nil {
config.Log.Fatal("Failed to insert failed block event", err)
}
} else {
currentHeightIndexerData.BlockResultsData = bresults
}
}
}

Expand Down Expand Up @@ -106,7 +117,16 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
// Only set failed when we can't get the block results either.
currentHeightIndexerData.TxRequestsFailed = true
} else {
currentHeightIndexerData.BlockResultsData = bresults
bresults, err = NormalizeCustomBlockResults(bresults)
if err != nil {
config.Log.Errorf("Error normalizing block results for block %v from RPC. Err: %v", block, err)
err := dbTypes.UpsertFailedBlock(db, block.Height, chainStringID, cfg.Probe.ChainName)
if err != nil {
config.Log.Fatal("Failed to insert failed block", err)
}
} else {
currentHeightIndexerData.BlockResultsData = bresults
}
}

}
Expand All @@ -118,3 +138,43 @@ func BlockRPCWorker(wg *sync.WaitGroup, blockEnqueueChan chan *EnqueueData, chai
outputChannel <- currentHeightIndexerData
}
}

func NormalizeCustomBlockResults(blockResults *rpc.CustomBlockResults) (*rpc.CustomBlockResults, error) {
if len(blockResults.FinalizeBlockEvents) != 0 {
beginBlockEvents := []abci.Event{}
endBlockEvents := []abci.Event{}

for _, event := range blockResults.FinalizeBlockEvents {
eventAttrs := []abci.EventAttribute{}
isBeginBlock := false
isEndBlock := false
for _, attr := range event.Attributes {
if attr.Key == "mode" {
if attr.Value == "BeginBlock" {
isBeginBlock = true
} else if attr.Value == "EndBlock" {
isEndBlock = true
}
} else {
eventAttrs = append(eventAttrs, attr)
}
}

switch {
case isBeginBlock && isEndBlock:
return nil, fmt.Errorf("finalize block event has both BeginBlock and EndBlock mode")
case !isBeginBlock && !isEndBlock:
return nil, fmt.Errorf("finalize block event has neither BeginBlock nor EndBlock mode")
case isBeginBlock:
beginBlockEvents = append(beginBlockEvents, abci.Event{Type: event.Type, Attributes: eventAttrs})
case isEndBlock:
endBlockEvents = append(endBlockEvents, abci.Event{Type: event.Type, Attributes: eventAttrs})
}
}

blockResults.BeginBlockEvents = append(blockResults.BeginBlockEvents, beginBlockEvents...)
blockResults.EndBlockEvents = append(blockResults.EndBlockEvents, endBlockEvents...)
}

return blockResults, nil
}
3 changes: 2 additions & 1 deletion core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/DefiantLabs/cosmos-indexer/db/models"
"github.com/DefiantLabs/cosmos-indexer/filter"
"github.com/DefiantLabs/cosmos-indexer/parsers"
"github.com/DefiantLabs/cosmos-indexer/rpc"
"github.com/DefiantLabs/cosmos-indexer/util"
"github.com/DefiantLabs/probe/client"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
Expand All @@ -31,7 +32,7 @@ func getUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}

func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *coretypes.ResultBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
func ProcessRPCBlockByHeightTXs(cfg *config.IndexConfig, db *gorm.DB, cl *client.ChainClient, messageTypeFilters []filter.MessageTypeFilter, messageFilters []filter.MessageFilter, blockResults *coretypes.ResultBlock, resultBlockRes *rpc.CustomBlockResults, customParsers map[string][]parsers.MessageParser) ([]dbTypes.TxDBWrapper, *time.Time, error) {
if len(blockResults.Block.Txs) != len(resultBlockRes.TxsResults) {
config.Log.Fatalf("blockResults & resultBlockRes: different length")
}
Expand Down
4 changes: 1 addition & 3 deletions indexer/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ func (indexer *Indexer) RegisterCustomModuleBasics(basics []module.AppModuleBasi
}

func (indexer *Indexer) RegisterCustomMsgTypesByTypeURLs(customMessageTypeURLSToTypes map[string]sdkTypes.Msg) error {

if indexer.CustomMsgTypeRegistry == nil {
indexer.CustomMsgTypeRegistry = make(map[string]sdkTypes.Msg)
}

for url, msg := range customMessageTypeURLSToTypes {
if _, ok := indexer.CustomMsgTypeRegistry[url]; ok {
return fmt.Errorf("found duplicate message type with URL \"%s\", message types must be uniquely identified", url)
} else {
indexer.CustomMsgTypeRegistry[url] = msg
}
indexer.CustomMsgTypeRegistry[url] = msg
}

return nil
Expand Down
22 changes: 17 additions & 5 deletions rpc/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"time"

"github.com/DefiantLabs/cosmos-indexer/config"
abci "github.com/cometbft/cometbft/abci/types"
tmjson "github.com/cometbft/cometbft/libs/json"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
jsonrpc "github.com/cometbft/cometbft/rpc/jsonrpc/client"
types "github.com/cometbft/cometbft/rpc/jsonrpc/types"
)
Expand Down Expand Up @@ -140,8 +141,19 @@ func validateResponseID(id interface{}) error {
return nil
}

func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
result := new(ctypes.ResultBlockResults)
// This type **should** cover SDK v0.4x and v0.50, but updates will need to be monitored
type CustomBlockResults struct {
Height int64 `json:"height"`
TxsResults []*abci.ResponseDeliverTx `json:"txs_results"`
BeginBlockEvents []abci.Event `json:"begin_block_events"`
EndBlockEvents []abci.Event `json:"end_block_events"`
ValidatorUpdates []abci.ValidatorUpdate `json:"validator_updates"`
ConsensusParamUpdates *cmtproto.ConsensusParams `json:"consensus_param_updates"`
FinalizeBlockEvents []abci.Event `json:"finalize_block_events"`
}

func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*CustomBlockResults, error) {
result := new(CustomBlockResults)
params := make(map[string]interface{})
if height != nil {
params["height"] = height
Expand All @@ -155,7 +167,7 @@ func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.
return result, nil
}

func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults, error) {
func GetBlockResult(client URIClient, height int64) (*CustomBlockResults, error) {
brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

Expand All @@ -167,7 +179,7 @@ func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults,
return bresults, nil
}

func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*ctypes.ResultBlockResults, error) {
func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*CustomBlockResults, error) {
if retryMaxAttempts == 0 {
return GetBlockResult(client, height)
}
Expand Down

0 comments on commit dea4d6d

Please sign in to comment.