Skip to content

Commit

Permalink
fix: reconnect sc event loop (#266)
Browse files Browse the repository at this point in the history
* fix: reconnect sc event loop
  • Loading branch information
hunjixin authored Aug 5, 2024
1 parent 79c0272 commit 436363c
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 69 deletions.
5 changes: 2 additions & 3 deletions pkg/http/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/lilypad-tech/lilypad/pkg/web3"
Expand Down Expand Up @@ -324,8 +323,8 @@ func GetRequestBuffer(

func GenericJSONPostClient(url string, json string) (*http.Response, error) {
data := []byte(json)
client := &http.Client{Timeout: time.Second * 1}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
client := newRetryClient()
req, err := retryablehttp.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
fmt.Printf("error setting up the request: %s", err)
return nil, err
Expand Down
11 changes: 8 additions & 3 deletions pkg/web3/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package web3

import (
"context"
"time"

"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -51,9 +52,13 @@ func (eventChannels *EventChannels) Start(
for _, collection := range eventChannels.collections {
c := collection
go func() {
err := c.Start(sdk, ctx, cm)
if err != nil {
log.Error().Msgf("error starting listeners: %s", err.Error())
for {
err := c.Start(sdk, ctx, cm)
if err != nil {
log.Error().Msgf("error starting listeners: %s reconnect in 2 seconds", err.Error())
}

time.Sleep(time.Second * 2)
}
}()
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/web3/events_jobcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package web3

import (
"context"
"fmt"

"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/jobcreator"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/jobcreator"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -49,26 +50,25 @@ func (s *JobCreatorEventChannels) Start(
return err
}

go func() {
<-ctx.Done()
jobAddedSub.Unsubscribe()
defer func() {
if jobAddedSub != nil {
jobAddedSub.Unsubscribe()
}
}()

for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
case event := <-s.jobAddedChan:
log.Debug().
Str("storage->event", "DealStateChange").
Str("storage->event", "JobAdded").
Msgf("%+v", event)
for _, handler := range s.jobAddedSubs {
go handler(*event)
}
case err := <-jobAddedSub.Err():
jobAddedSub.Unsubscribe()
jobAddedSub, err = connectJobAddedSub()
if err != nil {
return err
}
return fmt.Errorf("cancel by job JobAdded event subscribe error %w", err)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/web3/events_mediation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package web3

import (
"context"
"fmt"

"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/mediation"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/mediation"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -49,13 +50,16 @@ func (m *MediationEventChannels) Start(
return err
}

go func() {
<-ctx.Done()
mediationRequestedSub.Unsubscribe()
defer func() {
if mediationRequestedSub != nil {
mediationRequestedSub.Unsubscribe()
}
}()

for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
case event := <-m.mediationRequestedChan:
log.Debug().
Str("mediation->event", "MediationRequested").
Expand All @@ -64,11 +68,7 @@ func (m *MediationEventChannels) Start(
go handler(*event)
}
case err := <-mediationRequestedSub.Err():
mediationRequestedSub.Unsubscribe()
mediationRequestedSub, err = connectMediationRequestedSub()
if err != nil {
return err
}
return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err)
}
}
}
Expand Down
21 changes: 10 additions & 11 deletions pkg/web3/events_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package web3

import (
"context"
"fmt"

"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/payments"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/payments"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -49,13 +50,15 @@ func (p *PaymentEventChannels) Start(
return err
}

go func() {
<-ctx.Done()
paymentSub.Unsubscribe()
defer func() {
if paymentSub != nil {
paymentSub.Unsubscribe()
}
}()

for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
case event := <-p.paymentChan:
log.Debug().
Str("payments->event", "Payment").
Expand All @@ -64,11 +67,7 @@ func (p *PaymentEventChannels) Start(
go handler(*event)
}
case err := <-paymentSub.Err():
paymentSub.Unsubscribe()
paymentSub, err = connectPaymentSub()
if err != nil {
return err
}
return fmt.Errorf("cancel by mediation MediationRequested event subscribe error %w", err)
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/web3/events_pow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package web3

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -36,8 +37,8 @@ func (s *PowEventChannels) Start(

connectnewPowRoundSub := func() (event.Subscription, error) {
log.Debug().
Str("jobcreator->connect", "newPowRound").
Msgf("")
Str("pow->connect", "newPowRound").
Msgf("start to watch new pow round")
return sdk.Contracts.Pow.WatchNewPowRound(
&bind.WatchOpts{Start: &blockNumber, Context: ctx},
s.newPowRoundChan,
Expand All @@ -49,26 +50,25 @@ func (s *PowEventChannels) Start(
return err
}

go func() {
<-ctx.Done()
newPowRoundSub.Unsubscribe()
defer func() {
if newPowRoundSub != nil {
newPowRoundSub.Unsubscribe()
}
}()

for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
case event := <-s.newPowRoundChan:
log.Debug().
Str("pow->event", "PowNewPowRound").
Msgf("%+v", event)
for _, handler := range s.newPowRoundSubs {
go handler(*event)
}
case <-newPowRoundSub.Err():
newPowRoundSub.Unsubscribe()
newPowRoundSub, err = connectnewPowRoundSub()
if err != nil {
return err
}
case err := <-newPowRoundSub.Err():
return fmt.Errorf("cancel by pow newPowRound event subscribe error %w", err)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/web3/events_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package web3

import (
"context"
"fmt"

"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/storage"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/storage"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -49,13 +50,16 @@ func (s *StorageEventChannels) Start(
return err
}

go func() {
<-ctx.Done()
dealStateChangeSub.Unsubscribe()
defer func() {
if dealStateChangeSub != nil {
dealStateChangeSub.Unsubscribe()
}
}()

for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
case event := <-s.dealStateChangeChan:
log.Debug().
Str("storage->event", "DealStateChange").
Expand All @@ -64,11 +68,7 @@ func (s *StorageEventChannels) Start(
go handler(*event)
}
case err := <-dealStateChangeSub.Err():
dealStateChangeSub.Unsubscribe()
dealStateChangeSub, err = connectDealStateChangeSub()
if err != nil {
return err
}
return fmt.Errorf("cancel by storage DealStateChange event subscribe error %w", err)
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/web3/events_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package web3

import (
"context"
"fmt"

"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/token"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/token"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -52,13 +53,16 @@ func (t *TokenEventChannels) Start(
return err
}

go func() {
<-ctx.Done()
transferSub.Unsubscribe()
defer func() {
if transferSub != nil {
transferSub.Unsubscribe()
}
}()

for {
select {
case <-ctx.Done():
return fmt.Errorf("cancel by context")
case event := <-t.transferChan:
log.Debug().
Str("token->event", "Transfer").
Expand All @@ -67,11 +71,7 @@ func (t *TokenEventChannels) Start(
go handler(*event)
}
case err := <-transferSub.Err():
transferSub.Unsubscribe()
transferSub, err = connectTransferSub()
if err != nil {
return err
}
return fmt.Errorf("cancel by token Transfer event subscribe error %w", err)
}
}
}
Expand Down

0 comments on commit 436363c

Please sign in to comment.