Skip to content

Commit

Permalink
Merge pull request #1841 from c9s/c9s/add-max-channels
Browse files Browse the repository at this point in the history
FEATURE: [max] define channels
  • Loading branch information
c9s authored Nov 28, 2024
2 parents 1a8820d + 80f4535 commit ed2f1d2
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/exchange/max/maxapi/userdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func ParseUserEvent(v *fastjson.Value) (interface{}, error) {
case "trade_snapshot", "mwallet_trade_snapshot":
return parseTradeSnapshotEvent(v)

case "trade_update", "trade_fast_update", "mwallet_trade_update":
case "trade_update", "trade_fast_update", "mwallet_trade_update", "mwallet_trade_fast_update":
return parseTradeUpdateEvent(v)

case "ad_ratio_snapshot", "ad_ratio_update":
Expand Down
130 changes: 116 additions & 14 deletions pkg/exchange/max/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,90 @@ import (
"github.com/c9s/bbgo/pkg/types"
)

type PrivateChannel string

const (
PrivateChannelOrder PrivateChannel = "order"
PrivateChannelOrderUpdate PrivateChannel = "order_update"
PrivateChannelTrade PrivateChannel = "trade"
PrivateChannelTradeUpdate PrivateChannel = "trade_update"
PrivateChannelTradeFastUpdate PrivateChannel = "trade_fast_update"
PrivateChannelAccount PrivateChannel = "account"
PrivateChannelAccountUpdate PrivateChannel = "account_update"

PrivateChannelAveragePrice PrivateChannel = "average_price"
PrivateChannelFavoriteMarket PrivateChannel = "favorite_market"

// @group Margin
PrivateChannelMWalletOrder PrivateChannel = "mwallet_order"
PrivateChannelMWalletTrade PrivateChannel = "mwallet_trade"
PrivateChannelMWalletTradeFastUpdate PrivateChannel = "mwallet_trade_fast_update"
PrivateChannelMWalletAccount PrivateChannel = "mwallet_account"
PrivateChannelMWalletAveragePrice PrivateChannel = "mwallet_average_price"
PrivateChannelBorrowing PrivateChannel = "borrowing"
PrivateChannelAdRatio PrivateChannel = "ad_ratio"
PrivateChannelPoolQuota PrivateChannel = "borrowing_pool_quota"
)

// PrivateChannelStrings converts a slice of PrivateChannel to a slice of string
func PrivateChannelStrings(slice []PrivateChannel) (out []string) {
for _, el := range slice {
out = append(out, string(el))
}

return out
}

// PrivateChannelKeys converts a map of PrivateChannel to a slice of PrivateChannel
func PrivateChannelKeys(values map[PrivateChannel]struct{}) (slice []PrivateChannel) {
for k := range values {
slice = append(slice, k)
}

return slice
}

// ValidatePrivateChannel validates the private channel
func ValidatePrivateChannel(ch PrivateChannel) bool {
_, ok := AllPrivateChannels[ch]
return ok
}

var defaultSpotPrivateChannels = []PrivateChannel{
PrivateChannelOrder,
PrivateChannelTrade,
PrivateChannelAccount,
}

var AllMarginPrivateChannels = map[PrivateChannel]struct{}{
PrivateChannelMWalletOrder: {},
PrivateChannelMWalletTrade: {},
PrivateChannelMWalletAccount: {},
PrivateChannelMWalletAveragePrice: {},
PrivateChannelBorrowing: {},
PrivateChannelAdRatio: {},
PrivateChannelPoolQuota: {},
}

var AllPrivateChannels = map[PrivateChannel]struct{}{
PrivateChannelOrder: {},
PrivateChannelOrderUpdate: {},
PrivateChannelTrade: {},
PrivateChannelTradeUpdate: {},
PrivateChannelTradeFastUpdate: {},
PrivateChannelAccount: {},
PrivateChannelAccountUpdate: {},
PrivateChannelAveragePrice: {},
PrivateChannelFavoriteMarket: {},
PrivateChannelMWalletOrder: {},
PrivateChannelMWalletTrade: {},
PrivateChannelMWalletAccount: {},
PrivateChannelMWalletAveragePrice: {},
PrivateChannelBorrowing: {},
PrivateChannelAdRatio: {},
PrivateChannelPoolQuota: {},
}

//go:generate callbackgen -type Stream
type Stream struct {
types.StandardStream
Expand All @@ -42,6 +126,8 @@ type Stream struct {
accountSnapshotEventCallbacks []func(e max.AccountSnapshotEvent)
accountUpdateEventCallbacks []func(e max.AccountUpdateEvent)

fastTradeEnabled bool

// depthBuffers is used for storing the depth info
depthBuffers map[string]*depth.Buffer
}
Expand Down Expand Up @@ -83,6 +169,33 @@ func (s *Stream) getEndpoint(ctx context.Context) (string, error) {
}

func (s *Stream) SetPrivateChannels(channels []string) {
// validate channels
tradeUpdate := 0
fastTrade := false
for _, chstr := range channels {
ch := PrivateChannel(chstr)
if _, ok := AllPrivateChannels[ch]; !ok {
log.Errorf("invalid user data stream channel: %s", ch)
}

switch ch {
case PrivateChannelTradeFastUpdate, PrivateChannelTradeUpdate:
tradeUpdate++
if ch == PrivateChannelTradeFastUpdate {
fastTrade = true
}
}
}

if tradeUpdate > 1 {
log.Errorf("you can only subscribe to one trade update channel, there are %d trade update channels", tradeUpdate)
}

if fastTrade {
log.Infof("fast trade update is enabled")
}

s.fastTradeEnabled = fastTrade
s.privateChannels = channels
}

Expand Down Expand Up @@ -130,27 +243,16 @@ func (s *Stream) handleConnect() {
var filters []string

if len(s.privateChannels) > 0 {
// TODO: maybe check the valid private channels
filters = s.privateChannels
} else {
if s.MarginSettings.IsMargin {
filters = []string{
"mwallet_order",
"mwallet_trade",
"mwallet_account",
"ad_ratio",
"borrowing",
}
filters = PrivateChannelStrings(PrivateChannelKeys(AllMarginPrivateChannels))
} else {
filters = []string{
"order",
"trade",
"account",
}
filters = PrivateChannelStrings(defaultSpotPrivateChannels)
}
}

log.Debugf("user data websocket filters: %v", filters)
log.Debugf("user data websocket channels: %v", filters)

nonce := time.Now().UnixNano() / int64(time.Millisecond)
auth := &max.AuthMessage{
Expand Down

0 comments on commit ed2f1d2

Please sign in to comment.