From 7e6a14e0a1e7a6089b2523a6bc4e14ea2492c9fe Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Thu, 16 Jan 2025 14:50:08 +0100 Subject: [PATCH] update models, queries and storers --- pkg/blockchain/interface.go | 1 + pkg/config/options.go | 2 + pkg/db/queries.sql | 20 ++++++++ pkg/db/queries/models.go | 6 +-- pkg/db/queries/queries.sql.go | 50 +++++++++++++++++-- pkg/indexer/indexer.go | 18 ++++--- pkg/indexer/storer/groupMessage.go | 46 ++++++++++++++--- pkg/indexer/storer/identityUpdate.go | 36 ++++++++++++- pkg/indexer/storer/interface.go | 2 +- pkg/indexer/storer/logAppend.go | 49 ++++++++++++++++++ .../00004_add_blockchain_columns.down.sql | 12 +++++ .../00004_add_blockchain_columns.up.sql | 14 ++++++ .../00004_update-gateway-envelopes.sql | 6 --- 13 files changed, 233 insertions(+), 29 deletions(-) create mode 100644 pkg/indexer/storer/logAppend.go create mode 100644 pkg/migrations/00004_add_blockchain_columns.down.sql create mode 100644 pkg/migrations/00004_add_blockchain_columns.up.sql delete mode 100644 pkg/migrations/00004_update-gateway-envelopes.sql diff --git a/pkg/blockchain/interface.go b/pkg/blockchain/interface.go index 904e4f41..fc7e049e 100644 --- a/pkg/blockchain/interface.go +++ b/pkg/blockchain/interface.go @@ -30,6 +30,7 @@ type ChainClient interface { ethereum.BlockNumberReader ethereum.LogFilterer ethereum.ChainIDReader + ethereum.ChainReader } type TransactionSigner interface { diff --git a/pkg/config/options.go b/pkg/config/options.go index ea8988c1..9c7cebd7 100644 --- a/pkg/config/options.go +++ b/pkg/config/options.go @@ -16,6 +16,8 @@ type ContractsOptions struct { ChainID int `long:"chain-id" env:"XMTPD_CONTRACTS_CHAIN_ID" description:"Chain ID for the appchain" default:"31337"` RefreshInterval time.Duration `long:"refresh-interval" env:"XMTPD_CONTRACTS_REFRESH_INTERVAL" description:"Refresh interval for the nodes registry" default:"60s"` MaxChainDisconnectTime time.Duration `long:"max-chain-disconnect-time" env:"XMTPD_CONTRACTS_MAX_CHAIN_DISCONNECT_TIME" description:"Maximum time to allow the node to operate while disconnected" default:"300s"` + // TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume + SafeBlockDistance uint64 `long:"safe-block-distance" env:"XMTPD_CONTRACTS_SAFE_BLOCK_DISTANCE" description:"Safe block distance" default:"0"` } type DbOptions struct { diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index c1a4d684..9af95fec 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -122,3 +122,23 @@ FROM WHERE contract_address = @contract_address; +-- name: GetEnvelopeVersion :one +SELECT + version +FROM + gateway_envelopes +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE; + +-- name: InvalidateEnvelope :exec +UPDATE + gateway_envelopes +SET + is_canonical = FALSE +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE; + diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index 06f422cc..af3a6094 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -22,10 +22,10 @@ type GatewayEnvelope struct { OriginatorSequenceID int64 Topic []byte OriginatorEnvelope []byte - BlockNumber int64 + BlockNumber sql.NullInt64 BlockHash []byte - Version int32 - IsCanonical bool + Version sql.NullInt32 + IsCanonical sql.NullBool } type LatestBlock struct { diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index 018c3154..95e3ea67 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -75,6 +75,29 @@ func (q *Queries) GetAddressLogs(ctx context.Context, addresses []string) ([]Get return items, nil } +const getEnvelopeVersion = `-- name: GetEnvelopeVersion :one +SELECT + version +FROM + gateway_envelopes +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE +` + +type GetEnvelopeVersionParams struct { + OriginatorNodeID int32 + OriginatorSequenceID int64 +} + +func (q *Queries) GetEnvelopeVersion(ctx context.Context, arg GetEnvelopeVersionParams) (sql.NullInt32, error) { + row := q.db.QueryRowContext(ctx, getEnvelopeVersion, arg.OriginatorNodeID, arg.OriginatorSequenceID) + var version sql.NullInt32 + err := row.Scan(&version) + return version, err +} + const getLatestBlock = `-- name: GetLatestBlock :one SELECT block_number, @@ -150,10 +173,10 @@ type InsertGatewayEnvelopeParams struct { OriginatorSequenceID int64 Topic []byte OriginatorEnvelope []byte - BlockNumber int64 + BlockNumber sql.NullInt64 BlockHash []byte - Version int32 - IsCanonical bool + Version sql.NullInt32 + IsCanonical sql.NullBool } func (q *Queries) InsertGatewayEnvelope(ctx context.Context, arg InsertGatewayEnvelopeParams) (int64, error) { @@ -217,6 +240,27 @@ func (q *Queries) InsertStagedOriginatorEnvelope(ctx context.Context, arg Insert return i, err } +const invalidateEnvelope = `-- name: InvalidateEnvelope :exec +UPDATE + gateway_envelopes +SET + is_canonical = FALSE +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE +` + +type InvalidateEnvelopeParams struct { + OriginatorNodeID int32 + OriginatorSequenceID int64 +} + +func (q *Queries) InvalidateEnvelope(ctx context.Context, arg InvalidateEnvelopeParams) error { + _, err := q.db.ExecContext(ctx, invalidateEnvelope, arg.OriginatorNodeID, arg.OriginatorSequenceID) + return err +} + const revokeAddressFromLog = `-- name: RevokeAddressFromLog :execrows UPDATE address_log diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 82f2dfa5..877f4f3a 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -92,6 +92,7 @@ func (i *Indexer) StartIndexer( client, streamer.messagesChannel, streamer.messagesReorgChannel, + cfg.SafeBlockDistance, indexingLogger, storer.NewGroupMessageStorer(querier, indexingLogger, messagesContract), streamer.messagesBlockTracker, @@ -115,6 +116,7 @@ func (i *Indexer) StartIndexer( client, streamer.identityUpdatesChannel, streamer.identityUpdatesReorgChannel, + cfg.SafeBlockDistance, indexingLogger, storer.NewIdentityUpdateStorer( db, @@ -207,22 +209,24 @@ The only non-retriable errors should be things like malformed events or failed v */ func indexLogs( ctx context.Context, - client *ethclient.Client, + client blockchain.ChainClient, eventChannel <-chan types.Log, reorgChannel chan<- uint64, + safeBlockDistance uint64, logger *zap.Logger, logStorer storer.LogStorer, blockTracker IBlockTracker, ) { var errStorage storer.LogStorageError + // We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled for event := range eventChannel { storedBlockNumber, storedBlockHash := blockTracker.GetLatestBlock() // TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume - // idea - SafeBlockDistance uint64 `env:"SAFE_BLOCK_DISTANCE" envDefault:"100"` - if event.BlockNumber > storedBlockNumber && event.BlockNumber-storedBlockNumber < 100 { - latestBlockHash, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber))) + if event.BlockNumber > storedBlockNumber && + event.BlockNumber-storedBlockNumber < safeBlockDistance { + latestBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber))) if err != nil { logger.Error("error getting block", zap.Uint64("blockNumber", storedBlockNumber), @@ -232,11 +236,11 @@ func indexLogs( continue } - if !bytes.Equal(storedBlockHash, latestBlockHash.Hash().Bytes()) { + if !bytes.Equal(storedBlockHash, latestBlock.Hash().Bytes()) { logger.Warn("blockchain reorg detected", zap.Uint64("storedBlockNumber", storedBlockNumber), zap.String("storedBlockHash", hex.EncodeToString(storedBlockHash)), - zap.String("onchainBlockHash", latestBlockHash.Hash().String()), + zap.String("onchainBlockHash", latestBlock.Hash().String()), ) // TODO: Implement reorg handling: @@ -256,7 +260,7 @@ func indexLogs( Retry: for { - errStorage = logStorer.StoreLog(ctx, event) + errStorage = logStorer.StoreLog(ctx, event, false) if errStorage != nil { logger.Error("error storing log", zap.Error(errStorage)) if errStorage.ShouldRetry() { diff --git a/pkg/indexer/storer/groupMessage.go b/pkg/indexer/storer/groupMessage.go index afe24e09..a1613ed1 100644 --- a/pkg/indexer/storer/groupMessage.go +++ b/pkg/indexer/storer/groupMessage.go @@ -2,6 +2,7 @@ package storer import ( "context" + "database/sql" "errors" "github.com/ethereum/go-ethereum/core/types" @@ -13,6 +14,11 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + // We may not want to hardcode this to 0 and have an originator ID for each smart contract? + GROUP_MESSAGE_ORIGINATOR_ID = 0 +) + type GroupMessageStorer struct { contract *groupmessages.GroupMessages queries *queries.Queries @@ -32,7 +38,11 @@ func NewGroupMessageStorer( } // Validate and store a group message log event -func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { +func (s *GroupMessageStorer) StoreLog( + ctx context.Context, + event types.Log, + appendLog bool, +) LogStorageError { msgSent, err := s.contract.ParseMessageSent(event) if err != nil { return NewLogStorageError(err, false) @@ -75,15 +85,37 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS return NewLogStorageError(err, false) } + version := sql.NullInt32{Int32: 1, Valid: true} + + if appendLog { + version, err = GetVersionForAppend( + ctx, + s.queries, + s.logger, + GROUP_MESSAGE_ORIGINATOR_ID, + int64(msgSent.SequenceId), + ) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return NewLogStorageError(err, true) + } + if errors.Is(err, sql.ErrNoRows) { + s.logger.Debug("No rows found for envelope, inserting new", + zap.Int("originator_node_id", GROUP_MESSAGE_ORIGINATOR_ID), + zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)), + ) + } + } + } + s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String())) if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ - BlockNumber: int64(event.BlockNumber), - BlockHash: event.BlockHash.Bytes(), - Version: 1, // TODO: Make this dynamic - IsCanonical: true, // TODO: Make this dynamic - // We may not want to hardcode this to 0 and have an originator ID for each smart contract? - OriginatorNodeID: 0, + BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true}, + BlockHash: event.BlockHash.Bytes(), + Version: version, + IsCanonical: sql.NullBool{Bool: true, Valid: true}, + OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID, OriginatorSequenceID: int64(msgSent.SequenceId), Topic: topicStruct.Bytes(), OriginatorEnvelope: originatorEnvelopeBytes, diff --git a/pkg/indexer/storer/identityUpdate.go b/pkg/indexer/storer/identityUpdate.go index 737923fc..daf29fdf 100644 --- a/pkg/indexer/storer/identityUpdate.go +++ b/pkg/indexer/storer/identityUpdate.go @@ -3,6 +3,7 @@ package storer import ( "context" "database/sql" + "errors" "fmt" "time" @@ -23,6 +24,7 @@ import ( ) const ( + // We may not want to hardcode this to 1 and have an originator ID for each smart contract? IDENTITY_UPDATE_ORIGINATOR_ID = 1 ) @@ -48,7 +50,11 @@ func NewIdentityUpdateStorer( } // Validate and store an identity update log event -func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { +func (s *IdentityUpdateStorer) StoreLog( + ctx context.Context, + event types.Log, + appendLog bool, +) LogStorageError { msgSent, err := s.contract.ParseIdentityUpdateCreated(event) if err != nil { return NewLogStorageError(err, false) @@ -166,8 +172,34 @@ func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) Lo return NewLogStorageError(err, true) } + version := sql.NullInt32{Int32: 1, Valid: true} + + if appendLog { + version, err = GetVersionForAppend( + ctx, + querier, + s.logger, + IDENTITY_UPDATE_ORIGINATOR_ID, + int64(msgSent.SequenceId), + ) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return NewLogStorageError(err, true) + } + if errors.Is(err, sql.ErrNoRows) { + s.logger.Debug("No rows found for envelope, inserting new", + zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID), + zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)), + ) + } + } + } + if _, err = querier.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ - // We may not want to hardcode this to 1 and have an originator ID for each smart contract? + BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true}, + BlockHash: event.BlockHash.Bytes(), + Version: version, + IsCanonical: sql.NullBool{Bool: true, Valid: true}, OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID, OriginatorSequenceID: int64(msgSent.SequenceId), Topic: messageTopic.Bytes(), diff --git a/pkg/indexer/storer/interface.go b/pkg/indexer/storer/interface.go index 71a1651e..aedb569e 100644 --- a/pkg/indexer/storer/interface.go +++ b/pkg/indexer/storer/interface.go @@ -8,5 +8,5 @@ import ( // Takes a log event and stores it, returning either an error that may be retriable, non-retriable, or nil type LogStorer interface { - StoreLog(ctx context.Context, event types.Log) LogStorageError + StoreLog(ctx context.Context, event types.Log, appendLog bool) LogStorageError } diff --git a/pkg/indexer/storer/logAppend.go b/pkg/indexer/storer/logAppend.go new file mode 100644 index 00000000..73118913 --- /dev/null +++ b/pkg/indexer/storer/logAppend.go @@ -0,0 +1,49 @@ +package storer + +import ( + "context" + "database/sql" + "errors" + + "github.com/xmtp/xmtpd/pkg/db/queries" + "go.uber.org/zap" +) + +func GetVersionForAppend( + ctx context.Context, + querier *queries.Queries, + logger *zap.Logger, + originatorNodeID int32, + sequenceID int64, +) (sql.NullInt32, error) { + var version sql.NullInt32 + currentVersion, err := querier.GetEnvelopeVersion(ctx, queries.GetEnvelopeVersionParams{ + OriginatorNodeID: originatorNodeID, + OriginatorSequenceID: sequenceID, + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + logger.Error("Error getting current version", zap.Error(err)) + return version, err + } + + if errors.Is(err, sql.ErrNoRows) { + return version, err + } + + if err == nil { + if err = querier.InvalidateEnvelope(ctx, queries.InvalidateEnvelopeParams{ + OriginatorNodeID: originatorNodeID, + OriginatorSequenceID: sequenceID, + }); err != nil { + logger.Error("Error invalidating old envelope", zap.Error(err)) + return version, err + } + + version = sql.NullInt32{ + Int32: currentVersion.Int32 + 1, + Valid: true, + } + } + + return version, nil +} diff --git a/pkg/migrations/00004_add_blockchain_columns.down.sql b/pkg/migrations/00004_add_blockchain_columns.down.sql new file mode 100644 index 00000000..d395e683 --- /dev/null +++ b/pkg/migrations/00004_add_blockchain_columns.down.sql @@ -0,0 +1,12 @@ +-- Drop everything in reverse order +DROP INDEX IF EXISTS idx_gateway_envelopes_reorg; + +ALTER TABLE gateway_envelopes + DROP CONSTRAINT IF EXISTS blockchain_message_constraint; + +ALTER TABLE gateway_envelopes + DROP COLUMN IF EXISTS block_number, + DROP COLUMN IF EXISTS block_hash, + DROP COLUMN IF EXISTS version, + DROP COLUMN IF EXISTS is_canonical; + diff --git a/pkg/migrations/00004_add_blockchain_columns.up.sql b/pkg/migrations/00004_add_blockchain_columns.up.sql new file mode 100644 index 00000000..da18a2ee --- /dev/null +++ b/pkg/migrations/00004_add_blockchain_columns.up.sql @@ -0,0 +1,14 @@ +-- Add blockchain-related columns and constraint +ALTER TABLE gateway_envelopes + ADD COLUMN block_number BIGINT, + ADD COLUMN block_hash BYTEA, + ADD COLUMN version INT, + ADD COLUMN is_canonical BOOLEAN; + +ALTER TABLE gateway_envelopes + ADD CONSTRAINT blockchain_message_constraint CHECK ((block_number IS NULL AND block_hash IS NULL AND version IS NULL AND is_canonical IS NULL) OR (block_number IS NOT NULL AND block_hash IS NOT NULL AND version IS NOT NULL AND is_canonical IS NOT NULL)); + +CREATE INDEX idx_gateway_envelopes_reorg ON gateway_envelopes(block_number DESC, block_hash) +WHERE + block_number IS NOT NULL AND is_canonical = TRUE; + diff --git a/pkg/migrations/00004_update-gateway-envelopes.sql b/pkg/migrations/00004_update-gateway-envelopes.sql deleted file mode 100644 index 41b6162f..00000000 --- a/pkg/migrations/00004_update-gateway-envelopes.sql +++ /dev/null @@ -1,6 +0,0 @@ -ALTER TABLE gateway_envelopes - ADD COLUMN block_number BIGINT NOT NULL, - ADD COLUMN block_hash BYTEA NOT NULL, - ADD COLUMN version INT NOT NULL DEFAULT 1, - ADD COLUMN is_canonical BOOLEAN NOT NULL DEFAULT TRUE; -