From c5909d168f6e4d034bcf71da76cababbc6b60d90 Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Fri, 31 Jan 2025 19:00:18 +0100 Subject: [PATCH] update canonicality with a lock --- pkg/db/queries.sql | 20 ++++++++++++++------ pkg/db/queries/queries.sql.go | 31 ++++++++++++++++++++++--------- pkg/indexer/indexer.go | 2 +- pkg/indexer/reorgHandler.go | 13 ++++++++----- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index 71113788..7311d76e 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -124,12 +124,12 @@ WHERE -- name: GetLatestCursor :many SELECT - originator_node_id, - MAX(originator_sequence_id)::BIGINT AS max_sequence_id + originator_node_id, + MAX(originator_sequence_id)::BIGINT AS max_sequence_id FROM - gateway_envelopes + gateway_envelopes GROUP BY - originator_node_id; + originator_node_id; -- name: InsertBlockchainMessage :exec INSERT INTO blockchain_messages(block_number, block_hash, originator_node_id, originator_sequence_id, is_canonical) @@ -157,9 +157,17 @@ ORDER BY -- name: UpdateBlocksCanonicalityInRange :exec UPDATE - blockchain_messages + blockchain_messages AS bm SET is_canonical = FALSE +FROM ( + SELECT + block_number + FROM + blockchain_messages + WHERE + bm.block_number BETWEEN @start_block_number AND @end_block_number + FOR UPDATE) AS locked_rows WHERE - block_number >= @reorg_block_number; + bm.block_number = locked_rows.block_number; diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index abe292e4..71f0fcca 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -100,7 +100,7 @@ type GetBlocksInRangeRow struct { BlockHash []byte } -// Returns blocks in descending order (newest to oldest) +// Returns blocks in ascending order (oldest to newest) // StartBlock should be the lower bound (older block) // EndBlock should be the upper bound (newer block) // Example: GetBlocksInRange(1000, 2000), returns 1000, 1001, 1002, ..., 2000 @@ -151,12 +151,12 @@ func (q *Queries) GetLatestBlock(ctx context.Context, contractAddress string) (G const getLatestCursor = `-- name: GetLatestCursor :many SELECT - originator_node_id, - MAX(originator_sequence_id)::BIGINT AS max_sequence_id + originator_node_id, + MAX(originator_sequence_id)::BIGINT AS max_sequence_id FROM - gateway_envelopes + gateway_envelopes GROUP BY - originator_node_id + originator_node_id ` type GetLatestCursorRow struct { @@ -525,14 +525,27 @@ func (q *Queries) SetLatestBlock(ctx context.Context, arg SetLatestBlockParams) const updateBlocksCanonicalityInRange = `-- name: UpdateBlocksCanonicalityInRange :exec UPDATE - blockchain_messages + blockchain_messages AS bm SET is_canonical = FALSE +FROM ( + SELECT + block_number + FROM + blockchain_messages + WHERE + bm.block_number BETWEEN $1 AND $2 + FOR UPDATE) AS locked_rows WHERE - block_number >= $1 + bm.block_number = locked_rows.block_number ` -func (q *Queries) UpdateBlocksCanonicalityInRange(ctx context.Context, reorgBlockNumber uint64) error { - _, err := q.db.ExecContext(ctx, updateBlocksCanonicalityInRange, reorgBlockNumber) +type UpdateBlocksCanonicalityInRangeParams struct { + StartBlockNumber uint64 + EndBlockNumber uint64 +} + +func (q *Queries) UpdateBlocksCanonicalityInRange(ctx context.Context, arg UpdateBlocksCanonicalityInRangeParams) error { + _, err := q.db.ExecContext(ctx, updateBlocksCanonicalityInRange, arg.StartBlockNumber, arg.EndBlockNumber) return err } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index df63a37b..5155b27b 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -233,7 +233,7 @@ func indexLogs( storedBlockNumber uint64 storedBlockHash []byte lastBlockSeen uint64 - reorgCheckInterval uint64 = 10 // TODO: Adapt based on blocks per batch + reorgCheckInterval uint64 = 10 // TODO(borja): Adapt based on blocks per batch reorgCheckAt uint64 reorgDetectedAt uint64 reorgBeginsAt uint64 diff --git a/pkg/indexer/reorgHandler.go b/pkg/indexer/reorgHandler.go index 2af9fe54..6eee9242 100644 --- a/pkg/indexer/reorgHandler.go +++ b/pkg/indexer/reorgHandler.go @@ -27,7 +27,7 @@ var ( ErrGetBlock = errors.New("failed to get block") ) -// TODO: Make this configurable? +// TODO(borja): Make this configurable? const BLOCK_RANGE_SIZE uint64 = 1000 func NewChainReorgHandler( @@ -42,7 +42,7 @@ func NewChainReorgHandler( } } -// TODO: When reorg range has been calculated, alert clients (TBD) +// TODO(borja): When reorg range has been calculated, alert clients (TBD) func (r *ReorgHandler) FindReorgPoint(detectedAt uint64) (uint64, []byte, error) { startBlock, endBlock := blockRange(detectedAt) @@ -84,16 +84,19 @@ func (r *ReorgHandler) FindReorgPoint(detectedAt uint64) (uint64, []byte, error) } // Oldest block matches, reorg happened in this range - blockNumber, blockHash, err := r.searchInRange(storedBlocks) + startedAt, blockHash, err := r.searchInRange(storedBlocks) if err != nil { return 0, nil, fmt.Errorf("failed to search reorg block in range: %w", err) } - if err := r.queries.UpdateBlocksCanonicalityInRange(r.ctx, blockNumber); err != nil { + if err := r.queries.UpdateBlocksCanonicalityInRange(r.ctx, queries.UpdateBlocksCanonicalityInRangeParams{ + StartBlockNumber: startedAt, + EndBlockNumber: detectedAt, + }); err != nil { return 0, nil, fmt.Errorf("failed to update block range canonicality: %w", err) } - return blockNumber, blockHash, nil + return startedAt, blockHash, nil } }