Skip to content

Commit

Permalink
Merge pull request #157 from kaleido-io/add-warning-logs
Browse files Browse the repository at this point in the history
adding warning logs for rebuiding in memory chain
  • Loading branch information
Chengxuan authored Nov 14, 2024
2 parents d6c1c57 + 7914431 commit b561f24
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
20 changes: 14 additions & 6 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,22 +381,21 @@ func (bl *blockListener) handleNewBlock(mbi *minimalBlockInfo, addAfter *list.El
// a recent block advertisement. So we need to work backwards to the last point of consistency with the current
// chain and re-query the chain state from there.
func (bl *blockListener) rebuildCanonicalChain() *list.Element {

log.L(bl.ctx).Debugf("Rebuilding in-memory canonical chain")

// If none of our blocks were valid, start from the first block number we've notified about previously
lastValidBlock := bl.trimToLastValidBlock()
var nextBlockNumber int64
var expectedParentHash string
if lastValidBlock != nil {
nextBlockNumber = lastValidBlock.number + 1
log.L(bl.ctx).Infof("Canonical chain partially rebuilding from block %d", nextBlockNumber)
expectedParentHash = lastValidBlock.hash
} else {
firstBlock := bl.canonicalChain.Front()
if firstBlock == nil || firstBlock.Value == nil {
return nil
}
nextBlockNumber = firstBlock.Value.(*minimalBlockInfo).number
log.L(bl.ctx).Warnf("Canonical chain re-initialized at block %d", nextBlockNumber)
// Clear out the whole chain
bl.canonicalChain = bl.canonicalChain.Init()
}
Expand All @@ -414,7 +413,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
}
}
if bi == nil {
log.L(bl.ctx).Debugf("Block listener canonical chain view rebuilt to head at block %d", nextBlockNumber-1)
log.L(bl.ctx).Infof("Canonical chain rebuilt the chain to the head block %d", nextBlockNumber-1)
break
}
mbi := &minimalBlockInfo{
Expand All @@ -426,7 +425,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
// It's possible the chain will change while we're doing this, and we fall back to the next block notification
// to sort that out.
if expectedParentHash != "" && mbi.parentHash != expectedParentHash {
log.L(bl.ctx).Debugf("Block listener canonical chain view rebuilt up to new re-org at block %d", nextBlockNumber)
log.L(bl.ctx).Infof("Canonical chain rebuilding stopped at block: %d due to mismatch hash for parent block (%d): %s (expected: %s)", nextBlockNumber, nextBlockNumber-1, mbi.parentHash, expectedParentHash)
break
}
expectedParentHash = mbi.hash
Expand All @@ -452,13 +451,19 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInfo) {
// First remove from the end until we get a block that matches the current un-cached query view from the chain
lastElem := bl.canonicalChain.Back()
var startingNumber *int64
for lastElem != nil && lastElem.Value != nil {

// Query the block that is no at this blockNumber
currentViewBlock := lastElem.Value.(*minimalBlockInfo)
if startingNumber == nil {
startingNumber = &currentViewBlock.number
log.L(bl.ctx).Debugf("Canonical chain checking from last block: %d", startingNumber)
}
var freshBlockInfo *blockInfoJSONRPC
var reason ffcapi.ErrorReason
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(_ int) (retry bool, err error) {
log.L(bl.ctx).Debugf("Canonical chain validating block: %d", currentViewBlock.number)
freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "")
return reason != ffcapi.ErrorReasonNotFound, err
})
Expand All @@ -469,7 +474,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
}

if freshBlockInfo != nil && freshBlockInfo.Hash.String() == currentViewBlock.hash {
log.L(bl.ctx).Debugf("Canonical chain matches current chain up to block %d", currentViewBlock.number)
log.L(bl.ctx).Debugf("Canonical chain found last valid block %d", currentViewBlock.number)
lastValidBlock = currentViewBlock
// Trim everything after this point, as it's invalidated
nextElem := lastElem.Next()
Expand All @@ -481,7 +486,10 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
break
}
lastElem = lastElem.Prev()
}

if startingNumber != nil && lastValidBlock != nil && *startingNumber != lastValidBlock.number {
log.L(bl.ctx).Debugf("Canonical chain trimmed from block %d to block %d (total number of in memory blocks: %d)", startingNumber, lastValidBlock.number, bl.unstableHeadLength)
}
return lastValidBlock
}
Expand Down
6 changes: 6 additions & 0 deletions internal/ethereum/event_enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"context"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-evmconnect/internal/msgs"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
Expand Down Expand Up @@ -84,6 +86,10 @@ func (ee *eventEnricher) filterEnrichEthLog(ctx context.Context, f *eventFilter,
}
}

if blockNumber < 0 || transactionIndex < 0 || logIndex < 0 {
log.L(ctx).Errorf("Invalid block number, transaction index or log index for event '%s'", protoID)
return nil, matched, decoded, i18n.NewError(ctx, msgs.MsgInvalidProtocolID, protoID)
}
signature := f.Signature
return &ffcapi.Event{
ID: ffcapi.EventID{
Expand Down
32 changes: 32 additions & 0 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,38 @@ func TestFilterEnrichEthLogMethodInputsOk(t *testing.T) {

}

func TestFilterEnrichEthLogInvalidNegativeID(t *testing.T) {

l, mRPC, _ := newTestListener(t, true)

var abiEvent *abi.Entry
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", mock.MatchedBy(func(bh string) bool {
return bh == "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"
}), false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1024),
}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getTransactionByHash", mock.MatchedBy(func(th ethtypes.HexBytes0xPrefix) bool {
return th.String() == "0x1a1f797ee000c529b6a2dd330cedd0d081417a30d16a4eecb3f863ab4657246f"
})).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**txInfoJSONRPC) = &txInfoJSONRPC{
From: ethtypes.MustNewAddress("0x3968ef051b422d3d1cdc182a88bba8dd922e6fa4"),
Input: ethtypes.MustNewHexBytes0xPrefix("0xa9059cbb000000000000000000000000d0f2f5103fd050739a9fb567251bc460cc24d09100000000000000000000000000000000000000000000000000000000000003e8"),
}
}).Once()

ethLogWithNegativeLogIndex := sampleTransferLog()
ethLogWithNegativeLogIndex.LogIndex = ethtypes.NewHexInteger64(-1)
_, ok, err := l.filterEnrichEthLog(context.Background(), l.config.filters[0], l.config.options.Methods, ethLogWithNegativeLogIndex) // cache miss
assert.False(t, ok)
assert.Regexp(t, "FF23055", err)

}

func TestFilterEnrichEthLogMethodInputsTxInfoWithErr(t *testing.T) {

l, mRPC, _ := newTestListener(t, true)
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ var (
MsgUnableToCallDebug = ffe("FF23052", "Failed to call debug_traceTransaction to get error detail: %s")
MsgReturnValueNotDecoded = ffe("FF23053", "Error return value for custom error: %s")
MsgReturnValueNotAvailable = ffe("FF23054", "Error return value unavailable")
MsgInvalidProtocolID = ffe("FF23055", "Invalid protocol ID in event log: %s")
)

0 comments on commit b561f24

Please sign in to comment.