diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index a347a75644..dd3b275d57 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -87,8 +87,9 @@ type ExchangeSession struct { UserDataStream types.Stream `json:"-" yaml:"-"` MarketDataStream types.Stream `json:"-" yaml:"-"` - UserDataConnectivity *types.Connectivity `json:"-" yaml:"-"` - MarketDataConnectivity *types.Connectivity `json:"-" yaml:"-"` + UserDataConnectivity *types.Connectivity `json:"-" yaml:"-"` + MarketDataConnectivity *types.Connectivity `json:"-" yaml:"-"` + Connectivity *types.ConnectivityGroup `json:"-" yaml:"-"` // Subscriptions // this is a read-only field when running strategy @@ -142,6 +143,8 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession { marketDataConnectivity := types.NewConnectivity() marketDataConnectivity.Bind(marketDataStream) + connectivityGroup := types.NewConnectivityGroup(marketDataConnectivity, userDataConnectivity) + session := &ExchangeSession{ Name: name, Exchange: exchange, @@ -152,6 +155,8 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession { MarketDataStream: marketDataStream, MarketDataConnectivity: marketDataConnectivity, + Connectivity: connectivityGroup, + Subscriptions: make(map[types.Subscription]types.Subscription), Account: &types.Account{}, Trades: make(map[string]*types.TradeSlice), diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index be9106efa7..b2ef73f76f 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -234,7 +234,6 @@ type Strategy struct { metricsLabels prometheus.Labels sourceMarketDataConnectivity, sourceUserDataConnectivity *types.Connectivity - makerMarketDataConnectivity, makerUserDataConnectivity *types.Connectivity connectivityGroup *types.ConnectivityGroup // lastAggregatedSignal stores the last aggregated signal with mutex @@ -591,7 +590,13 @@ func (s *Strategy) updateQuote(ctx context.Context) error { return nil } - if !s.sourceMarketDataConnectivity.IsConnected() || !s.sourceUserDataConnectivity.IsConnected() { + if !s.sourceSession.Connectivity.IsConnected() { + s.logger.Warnf("source session is disconnected, skipping update quote") + return nil + } + + if !s.makerSession.Connectivity.IsConnected() { + s.logger.Warnf("maker session is disconnected, skipping update quote") return nil } @@ -1846,16 +1851,10 @@ func (s *Strategy) CrossRun( s.stopC = make(chan struct{}) - s.sourceUserDataConnectivity = types.NewConnectivity() - s.sourceUserDataConnectivity.Bind(s.sourceSession.UserDataStream) - - s.makerUserDataConnectivity = types.NewConnectivity() - s.makerUserDataConnectivity.Bind(s.makerSession.UserDataStream) - - s.sourceMarketDataConnectivity = types.NewConnectivity() - s.sourceMarketDataConnectivity.Bind(s.sourceSession.MarketDataStream) + s.sourceUserDataConnectivity = s.sourceSession.UserDataConnectivity + s.sourceMarketDataConnectivity = s.sourceSession.MarketDataConnectivity - s.connectivityGroup = types.NewConnectivityGroup(s.sourceUserDataConnectivity, s.makerUserDataConnectivity) + s.connectivityGroup = types.NewConnectivityGroup(s.sourceSession.UserDataConnectivity, s.makerSession.UserDataConnectivity) go func() { s.logger.Infof("waiting for authentication connections to be ready...")