fix: allow relaying based on first block (#18)
danwt authored Apr 24, 2024
1 parent ee2d9b1 commit 642d476
Showing 6 changed files with 63 additions and 38 deletions.
18 changes: 14 additions & 4 deletions relayer/chains/cosmos/cosmos_chain_processor.go
@@ -82,7 +82,7 @@ const (

defaultMinQueryLoopDuration = 1 * time.Second
defaultBalanceUpdateWaitDuration = 60 * time.Second
inSyncNumBlocksThreshold = 2
defaultInSyncNumBlocksThreshold = 2
blockMaxRetries = 5
)

@@ -201,6 +201,7 @@ func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID strin

// queryCyclePersistence hold the variables that should be retained across queryCycles.
type queryCyclePersistence struct {
// the latest known height of the chain
latestHeight int64
latestQueriedBlock int64
retriesAtLatestQueriedBlock int
@@ -366,7 +367,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
firstTimeInSync := false

if !ccp.inSync {
if (persistence.latestHeight - persistence.latestQueriedBlock) < inSyncNumBlocksThreshold {
if (persistence.latestHeight - persistence.latestQueriedBlock) < int64(defaultInSyncNumBlocksThreshold) {
ccp.inSync = true
firstTimeInSync = true
ccp.log.Info("Chain is in sync")
@@ -390,7 +391,14 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu

chainID := ccp.chainProvider.ChainId()

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
firstHeightToQuery := persistence.latestQueriedBlock
// On the first ever update, we want to make sure we propagate the block info to the path processor
// Afterward, we only want to query new blocks
if ccp.inSync && !firstTimeInSync {
firstHeightToQuery++
}

for i := firstHeightToQuery; i <= persistence.latestHeight; i++ {
var (
eg errgroup.Group
blockRes *coretypes.ResultBlockResults
@@ -491,7 +499,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
}
}

if newLatestQueriedBlock == persistence.latestQueriedBlock {
if (ccp.inSync && !firstTimeInSync) && newLatestQueriedBlock == persistence.latestQueriedBlock {
return nil
}

@@ -516,6 +524,8 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
continue
}

ccp.log.Debug("sending new data to the path processor", zap.Bool("inSync", ccp.inSync))

pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
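The chain-processor hunks above are the core of the fix: instead of always starting at latestQueriedBlock+1, the first query cycle after the chain comes into sync starts at latestQueriedBlock itself, and the early return on an unchanged newLatestQueriedBlock is bypassed on that first pass, so the path processor receives data for the very first block. A minimal sketch of that height-range selection, using simplified stand-in variables rather than the relayer's types:

```go
package main

import "fmt"

// Sketch of the logic above (simplified): on the first cycle after coming
// into sync the processor re-includes latestQueriedBlock so that block reaches
// the path processor; on later cycles it only queries heights it has not seen.
func heightsToQuery(latestQueriedBlock, latestHeight int64, inSync, firstTimeInSync bool) []int64 {
	first := latestQueriedBlock
	if inSync && !firstTimeInSync {
		first++ // steady state: skip the block that was already handled
	}
	var heights []int64
	for h := first; h <= latestHeight; h++ {
		heights = append(heights, h)
	}
	return heights
}

func main() {
	// First cycle in sync: height 100 is re-included even though it was already queried.
	fmt.Println(heightsToQuery(100, 101, true, true)) // [100 101]
	// Later cycles: only new heights.
	fmt.Println(heightsToQuery(101, 102, true, false)) // [102]
}
```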
11 changes: 6 additions & 5 deletions relayer/client.go
@@ -24,7 +24,8 @@ func (c *Chain) CreateClients(ctx context.Context,
customClientTrustingPeriod,
maxClockDrift time.Duration,
customClientTrustingPeriodPercentage int64,
memo string) (string, string, error) {
memo string,
) (string, string, error) {
// Query the latest heights on src and dst and retry if the query fails
var srch, dsth int64
if err := retry.Do(func() error {
@@ -64,7 +65,7 @@ func (c *Chain) CreateClients(ctx context.Context,
}

// overriding the unbonding period should only be possible when creating single clients at a time (CreateClient)
var overrideUnbondingPeriod = time.Duration(0)
overrideUnbondingPeriod := time.Duration(0)

var clientSrc, clientDst string
eg, egCtx := errgroup.WithContext(ctx)
@@ -126,7 +127,8 @@ func CreateClient(
overrideUnbondingPeriod,
maxClockDrift time.Duration,
customClientTrustingPeriodPercentage int64,
memo string) (string, error) {
memo string,
) (string, error) {
// If a client ID was specified in the path and override is not set, ensure the client exists.
if !override && src.PathEnd.ClientID != "" {
// TODO: check client is not expired
@@ -316,7 +318,7 @@ func MsgUpdateClient(
eg.Go(func() error {
return retry.Do(func() error {
var err error
dstTrustedHeader, err = src.ChainProvider.QueryIBCHeader(egCtx, int64(dstClientState.GetLatestHeight().GetRevisionHeight()))
dstTrustedHeader, err = src.ChainProvider.QueryIBCHeader(egCtx, int64(dstClientState.GetLatestHeight().GetRevisionHeight())+1)
return err
}, retry.Context(egCtx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
src.log.Info(
@@ -518,7 +520,6 @@ func findMatchingClient(ctx context.Context, src, dst *Chain, newClientState ibc

for _, existingClientState := range clientsResp {
clientID, err := provider.ClientsMatch(ctx, src.ChainProvider, dst.ChainProvider, existingClientState, newClientState)

// If there is an error parsing/type asserting the client state in ClientsMatch this is going
// to make the entire find matching client logic fail.
// We should really never be encountering an error here and if we do it is probably a sign of a
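The MsgUpdateClient hunk above now fetches the destination's trusted header at the client's latest height plus one. This lines up with how Tendermint light-client updates work: the consensus state stored at a trusted height H records NextValidatorsHash, the hash of the validator set that signs block H+1, so the header at H+1 is the one whose validators can serve as the trusted validator set. A sketch with hypothetical, simplified types (the relayer itself uses ibc-go/CometBFT types):

```go
package main

import (
	"bytes"
	"fmt"
)

// Hypothetical, simplified types for illustration only.
type ConsensusState struct {
	Height             uint64
	NextValidatorsHash []byte // hash of the validator set that signs block Height+1
}

type Header struct {
	Height         uint64
	ValidatorsHash []byte
}

// checkTrustedHeader shows why the trusted header is queried at trustedHeight+1:
// only that header's validator set can be matched against the NextValidatorsHash
// recorded in the consensus state at the trusted height.
func checkTrustedHeader(cs ConsensusState, h Header) error {
	if h.Height != cs.Height+1 {
		return fmt.Errorf("expected header at height %d, got %d", cs.Height+1, h.Height)
	}
	if !bytes.Equal(h.ValidatorsHash, cs.NextValidatorsHash) {
		return fmt.Errorf("validator set hash does not match NextValidatorsHash")
	}
	return nil
}

func main() {
	cs := ConsensusState{Height: 42, NextValidatorsHash: []byte{0xAB}}
	fmt.Println(checkTrustedHeader(cs, Header{Height: 43, ValidatorsHash: []byte{0xAB}})) // <nil>
	fmt.Println(checkTrustedHeader(cs, Header{Height: 42, ValidatorsHash: []byte{0xAB}})) // height error
}
```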
47 changes: 24 additions & 23 deletions relayer/processor/message_processor.go
@@ -104,17 +104,21 @@ func (mp *messageProcessor) processMessages(
var err error
needsClientUpdate, err = mp.shouldUpdateClientNow(ctx, src, dst)
if err != nil {
return err
return fmt.Errorf("should update client now: %w", err)
}

if err := mp.assembleMsgUpdateClient(ctx, src, dst); err != nil {
return err
return fmt.Errorf("assemble message update client: %w", err)
}
}

mp.assembleMessages(ctx, messages, src, dst)

return mp.trackAndSendMessages(ctx, src, dst, needsClientUpdate)
if err := mp.trackAndSendMessages(ctx, src, dst, needsClientUpdate); err != nil {
return fmt.Errorf("track and send messages: %w", err)
}

return nil
}

func isLocalhostClient(srcClientID, dstClientID string) bool {
@@ -136,7 +140,7 @@ func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst
if dst.clientState.ConsensusTime.IsZero() {
h, err := src.chainProvider.QueryIBCHeader(ctx, int64(dst.clientState.ConsensusHeight.RevisionHeight))
if err != nil {
return false, fmt.Errorf("failed to get header height: %w", err)
return false, fmt.Errorf("query ibc header: %w", err)
}
consensusHeightTime = time.Unix(0, int64(h.ConsensusState().GetTimestamp()))
} else {
@@ -166,16 +170,18 @@ func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst
mp.metrics.SetClientTrustingPeriod(src.info.PathName, dst.info.ChainID, dst.info.ClientID, time.Duration(dst.clientState.TrustingPeriod))
}

if shouldUpdateClientNow {
mp.log.Info("Client update threshold condition met",
zap.String("path_name", src.info.PathName),
zap.String("chain_id", dst.info.ChainID),
zap.String("client_id", dst.info.ClientID),
zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()),
zap.Int64("time_since_client_update", time.Since(consensusHeightTime).Milliseconds()),
zap.Int64("client_threshold_time", mp.clientUpdateThresholdTime.Milliseconds()),
)
}
mp.log.Debug("should update client now?",
zap.String("path_name", src.info.PathName),
zap.String("chain_id", dst.info.ChainID),
zap.String("client_id", dst.info.ClientID),
zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()),
zap.Int64("time_since_client_update", time.Since(consensusHeightTime).Milliseconds()),
zap.Int64("client_threshold_time", mp.clientUpdateThresholdTime.Milliseconds()),
zap.Bool("enough_blocks_passed", enoughBlocksPassed),
zap.Bool("past_two_thirds_trusting_period", pastTwoThirdsTrustingPeriod),
zap.Bool("past_configured_client_update_threshold", pastConfiguredClientUpdateThreshold),
zap.Bool("should_update_client_now", shouldUpdateClientNow),
)

return shouldUpdateClientNow, nil
}
@@ -295,11 +301,6 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds
trustedNextValidatorsHash = header.NextValidatorsHash()
}

// As we only require one chain to be in sync the src.latestHeader may be nil. In that case
// we want to skip it
if src.latestHeader == nil {
return fmt.Errorf("latest header is nil for chain_id: %s. Waiting for catching up", src.info.ChainID)
}
if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight &&
!bytes.Equal(src.latestHeader.NextValidatorsHash(), trustedNextValidatorsHash) {
return fmt.Errorf("latest header height is equal to the client trusted height: %d, "+
@@ -313,12 +314,12 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds
dst.clientTrustedState.IBCHeader,
)
if err != nil {
return fmt.Errorf("error assembling new client header: %w", err)
return fmt.Errorf("msg update client header: %w", err)
}

msgUpdateClient, err := dst.chainProvider.MsgUpdateClient(clientID, msgUpdateClientHeader)
if err != nil {
return fmt.Errorf("error assembling MsgUpdateClient: %w", err)
return fmt.Errorf("msg update client: %w", err)
}

mp.msgUpdateClient = msgUpdateClient
@@ -475,7 +476,7 @@ func (mp *messageProcessor) sendBatchMessages(
}
callbacks := []func(rtr *provider.RelayerTxResponse, err error){callback}

//During testing, this adds a callback so our test case can inspect the TX results
// During testing, this adds a callback so our test case can inspect the TX results
if PathProcMessageCollector != nil {
testCallback := func(rtr *provider.RelayerTxResponse, err error) {
msgResult := &PathProcessorMessageResp{
@@ -562,7 +563,7 @@ func (mp *messageProcessor) sendSingleMessage(

callbacks = append(callbacks, callback)

//During testing, this adds a callback so our test case can inspect the TX results
// During testing, this adds a callback so our test case can inspect the TX results
if PathProcMessageCollector != nil {
testCallback := func(rtr *provider.RelayerTxResponse, err error) {
msgResult := &PathProcessorMessageResp{
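The new debug line in shouldUpdateClientNow logs the three conditions that feed the decision. In the upstream relayer they are combined roughly as below — enough new blocks must have passed, and either two thirds of the trusting period or the operator-configured threshold must have elapsed — but treat the exact rule as an assumption and refer to the full function:

```go
package main

import "fmt"

// Hedged sketch of how the logged booleans are combined (assumption — the
// authoritative rule is the surrounding shouldUpdateClientNow implementation):
// force a client update only once enough blocks have passed AND either 2/3 of
// the trusting period or the configured update threshold has elapsed.
func shouldUpdateClientNow(enoughBlocksPassed, pastTwoThirdsTrustingPeriod, pastConfiguredThreshold bool) bool {
	return enoughBlocksPassed && (pastTwoThirdsTrustingPeriod || pastConfiguredThreshold)
}

func main() {
	fmt.Println(shouldUpdateClientNow(true, false, true))  // true: configured threshold elapsed
	fmt.Println(shouldUpdateClientNow(false, true, false)) // false: not enough new blocks yet
}
```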
7 changes: 7 additions & 0 deletions relayer/processor/path_end_runtime.go
@@ -472,10 +472,17 @@ func (pathEnd *pathEndRuntime) mergeCacheData(
memoLimit, maxReceiverSize int,
) {
pathEnd.lastClientUpdateHeightMu.Lock()
var zeroType provider.LatestBlock
if d.LatestBlock == zeroType {
// sanity check
panic("received zero type latest block")
}
pathEnd.latestBlock = d.LatestBlock
pathEnd.lastClientUpdateHeightMu.Unlock()

pathEnd.inSync = d.InSync
pathEnd.log.Debug("set in sync", zap.Bool("in_sync", pathEnd.inSync), zap.String("chain_id", pathEnd.info.ChainID))

pathEnd.latestHeader = d.LatestHeader
pathEnd.clientState = d.ClientState

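The new sanity check above relies on Go's comparison of a comparable struct against its zero value: a LatestBlock that was never populated equals provider.LatestBlock{}. A small stand-alone illustration with a hypothetical Block type standing in for the relayer's:

```go
package main

import "fmt"

// Hypothetical stand-in for provider.LatestBlock; any struct whose fields are
// all comparable can be checked against its zero value this way.
type Block struct {
	Height uint64
	Time   int64
}

func main() {
	var zero Block
	received := Block{} // a caller forgot to populate the block
	if received == zero {
		fmt.Println("received zero-value latest block")
	}
}
```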
6 changes: 4 additions & 2 deletions relayer/processor/path_processor.go
@@ -391,7 +391,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
var retryTimer *time.Timer

pp.flushTimer = time.NewTimer(time.Hour)
pp.flushTimer = time.NewTimer(pp.flushInterval)

for {
// block until we have any signals to process
@@ -406,7 +406,8 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
}
}

if !pp.pathEnd1.inSync && !pp.pathEnd2.inSync {
pp.log.Debug("path processor run: are the chains in sync? ", zap.Bool("pathEnd1", pp.pathEnd1.inSync), zap.Bool("pathEnd2", pp.pathEnd2.inSync))
if !pp.pathEnd1.inSync || !pp.pathEnd2.inSync {
continue
}

@@ -420,6 +421,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {

// process latest message cache state from both pathEnds
if err := pp.processLatestMessages(ctx, cancel); err != nil {
pp.log.Debug("error process latest messages", zap.Error(err))
// in case of IBC message send errors, schedule retry after durationErrorRetry
if retryTimer != nil {
retryTimer.Stop()
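The gating change above is easy to misread: with the old `&&`, the run loop skipped an iteration only when neither path end was in sync, so processing could start while one chain was still catching up (hence the nil-latestHeader guard the message processor previously needed); with the new `||`, it waits until both path ends are in sync. A tiny sketch of the two predicates:

```go
package main

import "fmt"

// skipOld reproduces the previous condition: skip only if neither end is in sync.
func skipOld(end1InSync, end2InSync bool) bool { return !end1InSync && !end2InSync }

// skipNew reproduces the new condition: skip unless both ends are in sync.
func skipNew(end1InSync, end2InSync bool) bool { return !end1InSync || !end2InSync }

func main() {
	// One chain in sync, one still catching up.
	fmt.Println(skipOld(true, false)) // false — old code would proceed
	fmt.Println(skipNew(true, false)) // true  — new code keeps waiting
}
```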
12 changes: 8 additions & 4 deletions relayer/processor/path_processor_internal.go
@@ -1069,11 +1069,17 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func(
var eg errgroup.Group
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime, pp.isLocalhost)
return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1)
if err := mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1); err != nil {
return fmt.Errorf("process path end 1 messages: %w", err)
}
return nil
})
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime, pp.isLocalhost)
return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2)
if err := mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2); err != nil {
return fmt.Errorf("process path end 2 messages: %w", err)
}
return nil
})
return eg.Wait()
}
@@ -1207,7 +1213,6 @@ func (pp *PathProcessor) queuePendingRecvAndAcks(
srcMu sync.Locker,
dstMu sync.Locker,
) (*skippedPackets, error) {

if len(seqs) == 0 {
src.log.Debug("Nothing to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID))
if pp.metrics != nil {
@@ -1502,7 +1507,6 @@ func (pp *PathProcessor) flush(ctx context.Context) error {
&pathEnd2CacheMu,
&pathEnd1CacheMu,
)

if err != nil {
return err
}
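Several hunks in this commit replace bare `return err` with `fmt.Errorf("...: %w", err)`. Wrapping with `%w` records which step failed while keeping the original error reachable through `errors.Is`/`errors.As`; a self-contained example with hypothetical names rather than the relayer's functions:

```go
package main

import (
	"errors"
	"fmt"
)

var errSend = errors.New("send failed")

func send() error { return errSend }

// processPathEnd1Messages mirrors the wrapping style used above: the caller
// sees which path end failed, and errors.Is still finds the root cause.
func processPathEnd1Messages() error {
	if err := send(); err != nil {
		return fmt.Errorf("process path end 1 messages: %w", err)
	}
	return nil
}

func main() {
	err := processPathEnd1Messages()
	fmt.Println(err)                     // process path end 1 messages: send failed
	fmt.Println(errors.Is(err, errSend)) // true
}
```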
