Skip to content

Commit

Permalink
Merge pull request #1790 from c9s/c9s/xmaker/check-connectivity
Browse files Browse the repository at this point in the history
FIX: [xmaker] check connectivity before calling updateQuote
  • Loading branch information
c9s authored Oct 24, 2024
2 parents 738cb24 + 9f7521b commit 95ad164
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/exchange/binance/binanceapi/transfer_asset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
TransferAssetTypeIsolatedMarginToMain TransferAssetType = "ISOLATED_MARGIN_MAIN"
)

// User Universal Transfer (USER_DATA)
//
//go:generate requestgen -method POST -url "/sapi/v1/asset/transfer" -type TransferAssetRequest -responseType .TransferResponse
type TransferAssetRequest struct {
client requestgen.AuthenticatedAPIClient
Expand Down
17 changes: 13 additions & 4 deletions pkg/strategy/xmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ type Strategy struct {

metricsLabels prometheus.Labels

connectivityGroup *types.ConnectivityGroup
sourceMarketDataConnectivity, sourceUserDataConnectivity *types.Connectivity
connectivityGroup *types.ConnectivityGroup

// lastAggregatedSignal stores the last aggregated signal with mutex
// TODO: use float64 series instead, so that we can store history signal values
Expand Down Expand Up @@ -599,6 +600,10 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
return nil
}

if !s.sourceMarketDataConnectivity.IsConnected() || !s.sourceUserDataConnectivity.IsConnected() {
return nil
}

signal, err := s.aggregateSignal(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -1442,6 +1447,7 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
return

case <-ticker.C:

if err := s.updateQuote(ctx); err != nil {
s.logger.WithError(err).Errorf("unable to place maker orders")
}
Expand Down Expand Up @@ -1810,10 +1816,13 @@ func (s *Strategy) CrossRun(

s.stopC = make(chan struct{})

sourceConnectivity := types.NewConnectivity()
sourceConnectivity.Bind(s.sourceSession.UserDataStream)
s.sourceUserDataConnectivity = types.NewConnectivity()
s.sourceUserDataConnectivity.Bind(s.sourceSession.UserDataStream)

s.sourceMarketDataConnectivity = types.NewConnectivity()
s.sourceMarketDataConnectivity.Bind(s.sourceSession.MarketDataStream)

s.connectivityGroup = types.NewConnectivityGroup(sourceConnectivity)
s.connectivityGroup = types.NewConnectivityGroup(s.sourceUserDataConnectivity)

go func() {
s.logger.Infof("waiting for authentication connections to be ready...")
Expand Down
14 changes: 14 additions & 0 deletions pkg/types/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ func NewConnectivity() *Connectivity {
}
}

func (c *Connectivity) IsConnected() (conn bool) {
c.mu.Lock()
conn = c.connected
c.mu.Unlock()
return conn
}

func (c *Connectivity) IsAuthed() (authed bool) {
c.mu.Lock()
authed = c.authed
c.mu.Unlock()
return authed
}

func (c *Connectivity) handleConnect() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down

0 comments on commit 95ad164

Please sign in to comment.