diff --git a/pkg/exchange/max/maxapi/userdata.go b/pkg/exchange/max/maxapi/userdata.go index bb804449c4..644b93f668 100644 --- a/pkg/exchange/max/maxapi/userdata.go +++ b/pkg/exchange/max/maxapi/userdata.go @@ -227,7 +227,7 @@ func ParseUserEvent(v *fastjson.Value) (interface{}, error) { case "trade_snapshot", "mwallet_trade_snapshot": return parseTradeSnapshotEvent(v) - case "trade_update", "trade_fast_update", "mwallet_trade_update": + case "trade_update", "trade_fast_update", "mwallet_trade_update", "mwallet_trade_fast_update": return parseTradeUpdateEvent(v) case "ad_ratio_snapshot", "ad_ratio_update": diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 2dc10bfb2c..141d551c2b 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -16,6 +16,90 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +type PrivateChannel string + +const ( + PrivateChannelOrder PrivateChannel = "order" + PrivateChannelOrderUpdate PrivateChannel = "order_update" + PrivateChannelTrade PrivateChannel = "trade" + PrivateChannelTradeUpdate PrivateChannel = "trade_update" + PrivateChannelTradeFastUpdate PrivateChannel = "trade_fast_update" + PrivateChannelAccount PrivateChannel = "account" + PrivateChannelAccountUpdate PrivateChannel = "account_update" + + PrivateChannelAveragePrice PrivateChannel = "average_price" + PrivateChannelFavoriteMarket PrivateChannel = "favorite_market" + + // @group Margin + PrivateChannelMWalletOrder PrivateChannel = "mwallet_order" + PrivateChannelMWalletTrade PrivateChannel = "mwallet_trade" + PrivateChannelMWalletTradeFastUpdate PrivateChannel = "mwallet_trade_fast_update" + PrivateChannelMWalletAccount PrivateChannel = "mwallet_account" + PrivateChannelMWalletAveragePrice PrivateChannel = "mwallet_average_price" + PrivateChannelBorrowing PrivateChannel = "borrowing" + PrivateChannelAdRatio PrivateChannel = "ad_ratio" + PrivateChannelPoolQuota PrivateChannel = "borrowing_pool_quota" +) + +// PrivateChannelStrings converts a slice of PrivateChannel to a slice of string +func PrivateChannelStrings(slice []PrivateChannel) (out []string) { + for _, el := range slice { + out = append(out, string(el)) + } + + return out +} + +// PrivateChannelKeys converts a map of PrivateChannel to a slice of PrivateChannel +func PrivateChannelKeys(values map[PrivateChannel]struct{}) (slice []PrivateChannel) { + for k := range values { + slice = append(slice, k) + } + + return slice +} + +// ValidatePrivateChannel validates the private channel +func ValidatePrivateChannel(ch PrivateChannel) bool { + _, ok := AllPrivateChannels[ch] + return ok +} + +var defaultSpotPrivateChannels = []PrivateChannel{ + PrivateChannelOrder, + PrivateChannelTrade, + PrivateChannelAccount, +} + +var AllMarginPrivateChannels = map[PrivateChannel]struct{}{ + PrivateChannelMWalletOrder: {}, + PrivateChannelMWalletTrade: {}, + PrivateChannelMWalletAccount: {}, + PrivateChannelMWalletAveragePrice: {}, + PrivateChannelBorrowing: {}, + PrivateChannelAdRatio: {}, + PrivateChannelPoolQuota: {}, +} + +var AllPrivateChannels = map[PrivateChannel]struct{}{ + PrivateChannelOrder: {}, + PrivateChannelOrderUpdate: {}, + PrivateChannelTrade: {}, + PrivateChannelTradeUpdate: {}, + PrivateChannelTradeFastUpdate: {}, + PrivateChannelAccount: {}, + PrivateChannelAccountUpdate: {}, + PrivateChannelAveragePrice: {}, + PrivateChannelFavoriteMarket: {}, + PrivateChannelMWalletOrder: {}, + PrivateChannelMWalletTrade: {}, + PrivateChannelMWalletAccount: {}, + PrivateChannelMWalletAveragePrice: {}, + PrivateChannelBorrowing: {}, + PrivateChannelAdRatio: {}, + PrivateChannelPoolQuota: {}, +} + //go:generate callbackgen -type Stream type Stream struct { types.StandardStream @@ -42,6 +126,8 @@ type Stream struct { accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent) accountUpdateEventCallbacks []func(e max.AccountUpdateEvent) + fastTradeEnabled bool + // depthBuffers is used for storing the depth info depthBuffers map[string]*depth.Buffer } @@ -83,6 +169,33 @@ func (s *Stream) getEndpoint(ctx context.Context) (string, error) { } func (s *Stream) SetPrivateChannels(channels []string) { + // validate channels + tradeUpdate := 0 + fastTrade := false + for _, chstr := range channels { + ch := PrivateChannel(chstr) + if _, ok := AllPrivateChannels[ch]; !ok { + log.Errorf("invalid user data stream channel: %s", ch) + } + + switch ch { + case PrivateChannelTradeFastUpdate, PrivateChannelTradeUpdate: + tradeUpdate++ + if ch == PrivateChannelTradeFastUpdate { + fastTrade = true + } + } + } + + if tradeUpdate > 1 { + log.Errorf("you can only subscribe to one trade update channel, there are %d trade update channels", tradeUpdate) + } + + if fastTrade { + log.Infof("fast trade update is enabled") + } + + s.fastTradeEnabled = fastTrade s.privateChannels = channels } @@ -130,27 +243,16 @@ func (s *Stream) handleConnect() { var filters []string if len(s.privateChannels) > 0 { - // TODO: maybe check the valid private channels filters = s.privateChannels } else { if s.MarginSettings.IsMargin { - filters = []string{ - "mwallet_order", - "mwallet_trade", - "mwallet_account", - "ad_ratio", - "borrowing", - } + filters = PrivateChannelStrings(PrivateChannelKeys(AllMarginPrivateChannels)) } else { - filters = []string{ - "order", - "trade", - "account", - } + filters = PrivateChannelStrings(defaultSpotPrivateChannels) } } - log.Debugf("user data websocket filters: %v", filters) + log.Debugf("user data websocket channels: %v", filters) nonce := time.Now().UnixNano() / int64(time.Millisecond) auth := &max.AuthMessage{