diff --git a/pkg/exchange/okex/convert.go b/pkg/exchange/okex/convert.go index 742ad56f3a..f1889bb0af 100644 --- a/pkg/exchange/okex/convert.go +++ b/pkg/exchange/okex/convert.go @@ -55,9 +55,9 @@ func toGlobalBalance(account *okexapi.Account) types.BalanceMap { } type WebsocketSubscription struct { - Channel string `json:"channel"` - InstrumentID string `json:"instId,omitempty"` - InstrumentType string `json:"instType,omitempty"` + Channel Channel `json:"channel"` + InstrumentID string `json:"instId,omitempty"` + InstrumentType string `json:"instType,omitempty"` } var CandleChannels = []string{ @@ -92,18 +92,23 @@ func convertSubscription(s types.Subscription) (WebsocketSubscription, error) { case types.KLineChannel: // Channel names are: return WebsocketSubscription{ - Channel: convertIntervalToCandle(s.Options.Interval), + Channel: Channel(convertIntervalToCandle(s.Options.Interval)), InstrumentID: toLocalSymbol(s.Symbol), }, nil case types.BookChannel: return WebsocketSubscription{ - Channel: "books", + Channel: ChannelBooks, InstrumentID: toLocalSymbol(s.Symbol), }, nil case types.BookTickerChannel: return WebsocketSubscription{ - Channel: "books5", + Channel: ChannelBook5, + InstrumentID: toLocalSymbol(s.Symbol), + }, nil + case types.MarketTradeChannel: + return WebsocketSubscription{ + Channel: ChannelMarketTrades, InstrumentID: toLocalSymbol(s.Symbol), }, nil } diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index cd2cbffe18..df538b373e 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -21,6 +21,7 @@ const ( ChannelBook5 Channel = "book5" ChannelCandlePrefix Channel = "candle" ChannelAccount Channel = "account" + ChannelMarketTrades Channel = "trades" ChannelOrders Channel = "orders" ) @@ -66,6 +67,14 @@ func parseWebSocketEvent(in []byte) (interface{}, error) { bookEvent.Action = event.ActionType return &bookEvent, nil + case ChannelMarketTrades: + var trade []MarketTradeEvent + err = json.Unmarshal(event.Data, &trade) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into MarketTradeEvent: %+v, err: %w", string(event.Data), err) + } + return trade, nil + case ChannelOrders: // TODO: remove fastjson return parseOrder(v) @@ -363,3 +372,54 @@ func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) { return orderDetails, nil } + +func toGlobalSideType(side okexapi.SideType) (types.SideType, error) { + switch side { + case okexapi.SideTypeBuy: + return types.SideTypeBuy, nil + + case okexapi.SideTypeSell: + return types.SideTypeSell, nil + + default: + return types.SideType(side), fmt.Errorf("unexpected side: %s", side) + } +} + +type MarketTradeEvent struct { + InstId string `json:"instId"` + TradeId types.StrInt64 `json:"tradeId"` + Px fixedpoint.Value `json:"px"` + Sz fixedpoint.Value `json:"sz"` + Side okexapi.SideType `json:"side"` + Timestamp types.MillisecondTimestamp `json:"ts"` + Count types.StrInt64 `json:"count"` +} + +func (m *MarketTradeEvent) toGlobalTrade() (types.Trade, error) { + symbol := toGlobalSymbol(m.InstId) + if symbol == "" { + return types.Trade{}, fmt.Errorf("unexpected inst id: %s", m.InstId) + } + + side, err := toGlobalSideType(m.Side) + if err != nil { + return types.Trade{}, err + } + + return types.Trade{ + ID: uint64(m.TradeId), + OrderID: 0, // not supported + Exchange: types.ExchangeOKEx, + Price: m.Px, + Quantity: m.Sz, + QuoteQuantity: m.Px.Mul(m.Sz), + Symbol: symbol, + Side: side, + IsBuyer: side == types.SideTypeBuy, + IsMaker: false, // not supported + Time: types.Time(m.Timestamp.Time()), + Fee: fixedpoint.Zero, // not supported + FeeCurrency: "", // not supported + }, nil +} diff --git a/pkg/exchange/okex/parse_test.go b/pkg/exchange/okex/parse_test.go index 42c41cf2d8..5ebcf19893 100644 --- a/pkg/exchange/okex/parse_test.go +++ b/pkg/exchange/okex/parse_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) @@ -575,3 +576,95 @@ func TestKLine_ToGlobal(t *testing.T) { }) } + +func Test_parseWebSocketEvent(t *testing.T) { + in := ` +{ + "arg": { + "channel": "trades", + "instId": "BTC-USDT" + }, + "data": [ + { + "instId": "BTC-USDT", + "tradeId": "130639474", + "px": "42219.9", + "sz": "0.12060306", + "side": "buy", + "ts": "1630048897897", + "count": "3" + } + ] +} +` + exp := []MarketTradeEvent{{ + InstId: "BTC-USDT", + TradeId: 130639474, + Px: fixedpoint.NewFromFloat(42219.9), + Sz: fixedpoint.NewFromFloat(0.12060306), + Side: okexapi.SideTypeBuy, + Timestamp: types.NewMillisecondTimestampFromInt(1630048897897), + Count: 3, + }} + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.([]MarketTradeEvent) + assert.True(t, ok) + assert.Len(t, event, 1) + assert.Equal(t, exp, event) + +} + +func Test_toGlobalTrade(t *testing.T) { + // { + // "instId": "BTC-USDT", + // "tradeId": "130639474", + // "px": "42219.9", + // "sz": "0.12060306", + // "side": "buy", + // "ts": "1630048897897", + // "count": "3" + // } + marketTrade := MarketTradeEvent{ + InstId: "BTC-USDT", + TradeId: 130639474, + Px: fixedpoint.NewFromFloat(42219.9), + Sz: fixedpoint.NewFromFloat(0.12060306), + Side: okexapi.SideTypeBuy, + Timestamp: types.NewMillisecondTimestampFromInt(1630048897897), + Count: 3, + } + t.Run("succeeds", func(t *testing.T) { + trade, err := marketTrade.toGlobalTrade() + assert.NoError(t, err) + assert.Equal(t, types.Trade{ + ID: uint64(130639474), + OrderID: uint64(0), + Exchange: types.ExchangeOKEx, + Price: fixedpoint.NewFromFloat(42219.9), + Quantity: fixedpoint.NewFromFloat(0.12060306), + QuoteQuantity: marketTrade.Px.Mul(marketTrade.Sz), + Symbol: "BTCUSDT", + Side: types.SideTypeBuy, + IsBuyer: true, + IsMaker: false, + Time: types.Time(types.NewMillisecondTimestampFromInt(1630048897897)), + Fee: fixedpoint.Zero, + FeeCurrency: "", + FeeDiscounted: false, + }, trade) + }) + t.Run("unexpected side", func(t *testing.T) { + newTrade := marketTrade + newTrade.Side = "both" + _, err := newTrade.toGlobalTrade() + assert.ErrorContains(t, err, "both") + }) + t.Run("unexpected symbol", func(t *testing.T) { + newTrade := marketTrade + newTrade.InstId = "" + _, err := newTrade.toGlobalTrade() + assert.ErrorContains(t, err, "unexpected inst id") + }) +} diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 9d45ae3b27..34d6c853ce 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -2,6 +2,7 @@ package okex import ( "context" + "golang.org/x/time/rate" "strconv" "time" @@ -9,6 +10,10 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +var ( + tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) +) + type WebsocketOp struct { Op string `json:"op"` Args interface{} `json:"args"` @@ -33,6 +38,7 @@ type Stream struct { eventCallbacks []func(event WebSocketEvent) accountEventCallbacks []func(account okexapi.Account) orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails) + marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent) } func NewStream(client *okexapi.RestClient) *Stream { @@ -48,6 +54,7 @@ func NewStream(client *okexapi.RestClient) *Stream { stream.OnKLineEvent(stream.handleKLineEvent) stream.OnBookEvent(stream.handleBookEvent) stream.OnAccountEvent(stream.handleAccountEvent) + stream.OnMarketTradeEvent(stream.handleMarketTradeEvent) stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent) stream.OnEvent(stream.handleEvent) stream.OnConnect(stream.handleConnect) @@ -166,6 +173,20 @@ func (s *Stream) handleBookEvent(data BookEvent) { } } +func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) { + for _, event := range data { + trade, err := event.toGlobalTrade() + if err != nil { + if tradeLogLimiter.Allow() { + log.WithError(err).Error("failed to convert to market trade") + } + continue + } + + s.EmitMarketTrade(trade) + } +} + func (s *Stream) handleKLineEvent(k KLineEvent) { for _, event := range k.Events { kline := event.ToGlobal(types.Interval(k.Interval), k.Symbol) @@ -207,5 +228,8 @@ func (s *Stream) dispatchEvent(e interface{}) { case []okexapi.OrderDetails: s.EmitOrderDetailsEvent(et) + case []MarketTradeEvent: + s.EmitMarketTradeEvent(et) + } } diff --git a/pkg/exchange/okex/stream_callbacks.go b/pkg/exchange/okex/stream_callbacks.go index 750614b7c6..b735d09850 100644 --- a/pkg/exchange/okex/stream_callbacks.go +++ b/pkg/exchange/okex/stream_callbacks.go @@ -56,6 +56,16 @@ func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) { } } +func (s *Stream) OnMarketTradeEvent(cb func(tradeDetail []MarketTradeEvent)) { + s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb) +} + +func (s *Stream) EmitMarketTradeEvent(tradeDetail []MarketTradeEvent) { + for _, cb := range s.marketTradeEventCallbacks { + cb(tradeDetail) + } +} + type StreamEventHub interface { OnKLineEvent(cb func(candle KLineEvent)) @@ -66,4 +76,6 @@ type StreamEventHub interface { OnAccountEvent(cb func(account okexapi.Account)) OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) + + OnMarketTradeEvent(cb func(tradeDetail []MarketTradeEvent)) } diff --git a/pkg/exchange/okex/stream_test.go b/pkg/exchange/okex/stream_test.go index 1cc4e5e5da..b9b758a6aa 100644 --- a/pkg/exchange/okex/stream_test.go +++ b/pkg/exchange/okex/stream_test.go @@ -48,6 +48,20 @@ func TestStream(t *testing.T) { c := make(chan struct{}) <-c }) + + t.Run("market trade test", func(t *testing.T) { + s.Subscribe(types.MarketTradeChannel, "BTCUSDT", types.SubscribeOptions{}) + s.SetPublicOnly() + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnMarketTrade(func(trade types.Trade) { + t.Log("got trade upgrade", trade) + }) + c := make(chan struct{}) + <-c + }) + t.Run("kline test", func(t *testing.T) { s.Subscribe(types.KLineChannel, "LTC-USD-200327", types.SubscribeOptions{ Interval: types.Interval1m,