Skip to content

Commit

Permalink
Merge pull request #1486 from c9s/edwin/okx/add-market-trade-stream
Browse files Browse the repository at this point in the history
FEATURE: [okx] support market trade streaming
  • Loading branch information
bailantaotao authored Jan 9, 2024
2 parents c36975e + 2e34f78 commit a680df2
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 6 deletions.
17 changes: 11 additions & 6 deletions pkg/exchange/okex/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func toGlobalBalance(account *okexapi.Account) types.BalanceMap {
}

type WebsocketSubscription struct {
Channel string `json:"channel"`
InstrumentID string `json:"instId,omitempty"`
InstrumentType string `json:"instType,omitempty"`
Channel Channel `json:"channel"`
InstrumentID string `json:"instId,omitempty"`
InstrumentType string `json:"instType,omitempty"`
}

var CandleChannels = []string{
Expand Down Expand Up @@ -92,18 +92,23 @@ func convertSubscription(s types.Subscription) (WebsocketSubscription, error) {
case types.KLineChannel:
// Channel names are:
return WebsocketSubscription{
Channel: convertIntervalToCandle(s.Options.Interval),
Channel: Channel(convertIntervalToCandle(s.Options.Interval)),
InstrumentID: toLocalSymbol(s.Symbol),
}, nil

case types.BookChannel:
return WebsocketSubscription{
Channel: "books",
Channel: ChannelBooks,
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
case types.BookTickerChannel:
return WebsocketSubscription{
Channel: "books5",
Channel: ChannelBook5,
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
case types.MarketTradeChannel:
return WebsocketSubscription{
Channel: ChannelMarketTrades,
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/exchange/okex/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
ChannelBook5 Channel = "book5"
ChannelCandlePrefix Channel = "candle"
ChannelAccount Channel = "account"
ChannelMarketTrades Channel = "trades"
ChannelOrders Channel = "orders"
)

Expand Down Expand Up @@ -66,6 +67,14 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
bookEvent.Action = event.ActionType
return &bookEvent, nil

case ChannelMarketTrades:
var trade []MarketTradeEvent
err = json.Unmarshal(event.Data, &trade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into MarketTradeEvent: %+v, err: %w", string(event.Data), err)
}
return trade, nil

case ChannelOrders:
// TODO: remove fastjson
return parseOrder(v)
Expand Down Expand Up @@ -363,3 +372,54 @@ func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) {

return orderDetails, nil
}

func toGlobalSideType(side okexapi.SideType) (types.SideType, error) {
switch side {
case okexapi.SideTypeBuy:
return types.SideTypeBuy, nil

case okexapi.SideTypeSell:
return types.SideTypeSell, nil

default:
return types.SideType(side), fmt.Errorf("unexpected side: %s", side)
}
}

type MarketTradeEvent struct {
InstId string `json:"instId"`
TradeId types.StrInt64 `json:"tradeId"`
Px fixedpoint.Value `json:"px"`
Sz fixedpoint.Value `json:"sz"`
Side okexapi.SideType `json:"side"`
Timestamp types.MillisecondTimestamp `json:"ts"`
Count types.StrInt64 `json:"count"`
}

func (m *MarketTradeEvent) toGlobalTrade() (types.Trade, error) {
symbol := toGlobalSymbol(m.InstId)
if symbol == "" {
return types.Trade{}, fmt.Errorf("unexpected inst id: %s", m.InstId)
}

side, err := toGlobalSideType(m.Side)
if err != nil {
return types.Trade{}, err
}

return types.Trade{
ID: uint64(m.TradeId),
OrderID: 0, // not supported
Exchange: types.ExchangeOKEx,
Price: m.Px,
Quantity: m.Sz,
QuoteQuantity: m.Px.Mul(m.Sz),
Symbol: symbol,
Side: side,
IsBuyer: side == types.SideTypeBuy,
IsMaker: false, // not supported
Time: types.Time(m.Timestamp.Time()),
Fee: fixedpoint.Zero, // not supported
FeeCurrency: "", // not supported
}, nil
}
93 changes: 93 additions & 0 deletions pkg/exchange/okex/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
Expand Down Expand Up @@ -575,3 +576,95 @@ func TestKLine_ToGlobal(t *testing.T) {
})

}

func Test_parseWebSocketEvent(t *testing.T) {
in := `
{
"arg": {
"channel": "trades",
"instId": "BTC-USDT"
},
"data": [
{
"instId": "BTC-USDT",
"tradeId": "130639474",
"px": "42219.9",
"sz": "0.12060306",
"side": "buy",
"ts": "1630048897897",
"count": "3"
}
]
}
`
exp := []MarketTradeEvent{{
InstId: "BTC-USDT",
TradeId: 130639474,
Px: fixedpoint.NewFromFloat(42219.9),
Sz: fixedpoint.NewFromFloat(0.12060306),
Side: okexapi.SideTypeBuy,
Timestamp: types.NewMillisecondTimestampFromInt(1630048897897),
Count: 3,
}}

res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.([]MarketTradeEvent)
assert.True(t, ok)
assert.Len(t, event, 1)
assert.Equal(t, exp, event)

}

func Test_toGlobalTrade(t *testing.T) {
// {
// "instId": "BTC-USDT",
// "tradeId": "130639474",
// "px": "42219.9",
// "sz": "0.12060306",
// "side": "buy",
// "ts": "1630048897897",
// "count": "3"
// }
marketTrade := MarketTradeEvent{
InstId: "BTC-USDT",
TradeId: 130639474,
Px: fixedpoint.NewFromFloat(42219.9),
Sz: fixedpoint.NewFromFloat(0.12060306),
Side: okexapi.SideTypeBuy,
Timestamp: types.NewMillisecondTimestampFromInt(1630048897897),
Count: 3,
}
t.Run("succeeds", func(t *testing.T) {
trade, err := marketTrade.toGlobalTrade()
assert.NoError(t, err)
assert.Equal(t, types.Trade{
ID: uint64(130639474),
OrderID: uint64(0),
Exchange: types.ExchangeOKEx,
Price: fixedpoint.NewFromFloat(42219.9),
Quantity: fixedpoint.NewFromFloat(0.12060306),
QuoteQuantity: marketTrade.Px.Mul(marketTrade.Sz),
Symbol: "BTCUSDT",
Side: types.SideTypeBuy,
IsBuyer: true,
IsMaker: false,
Time: types.Time(types.NewMillisecondTimestampFromInt(1630048897897)),
Fee: fixedpoint.Zero,
FeeCurrency: "",
FeeDiscounted: false,
}, trade)
})
t.Run("unexpected side", func(t *testing.T) {
newTrade := marketTrade
newTrade.Side = "both"
_, err := newTrade.toGlobalTrade()
assert.ErrorContains(t, err, "both")
})
t.Run("unexpected symbol", func(t *testing.T) {
newTrade := marketTrade
newTrade.InstId = ""
_, err := newTrade.toGlobalTrade()
assert.ErrorContains(t, err, "unexpected inst id")
})
}
24 changes: 24 additions & 0 deletions pkg/exchange/okex/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package okex

import (
"context"
"golang.org/x/time/rate"
"strconv"
"time"

"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/types"
)

var (
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
)

type WebsocketOp struct {
Op string `json:"op"`
Args interface{} `json:"args"`
Expand All @@ -33,6 +38,7 @@ type Stream struct {
eventCallbacks []func(event WebSocketEvent)
accountEventCallbacks []func(account okexapi.Account)
orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails)
marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent)
}

func NewStream(client *okexapi.RestClient) *Stream {
Expand All @@ -48,6 +54,7 @@ func NewStream(client *okexapi.RestClient) *Stream {
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnAccountEvent(stream.handleAccountEvent)
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent)
stream.OnEvent(stream.handleEvent)
stream.OnConnect(stream.handleConnect)
Expand Down Expand Up @@ -166,6 +173,20 @@ func (s *Stream) handleBookEvent(data BookEvent) {
}
}

func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) {
for _, event := range data {
trade, err := event.toGlobalTrade()
if err != nil {
if tradeLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to market trade")
}
continue
}

s.EmitMarketTrade(trade)
}
}

func (s *Stream) handleKLineEvent(k KLineEvent) {
for _, event := range k.Events {
kline := event.ToGlobal(types.Interval(k.Interval), k.Symbol)
Expand Down Expand Up @@ -207,5 +228,8 @@ func (s *Stream) dispatchEvent(e interface{}) {
case []okexapi.OrderDetails:
s.EmitOrderDetailsEvent(et)

case []MarketTradeEvent:
s.EmitMarketTradeEvent(et)

}
}
12 changes: 12 additions & 0 deletions pkg/exchange/okex/stream_callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pkg/exchange/okex/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})

t.Run("market trade test", func(t *testing.T) {
s.Subscribe(types.MarketTradeChannel, "BTCUSDT", types.SubscribeOptions{})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)

s.OnMarketTrade(func(trade types.Trade) {
t.Log("got trade upgrade", trade)
})
c := make(chan struct{})
<-c
})

t.Run("kline test", func(t *testing.T) {
s.Subscribe(types.KLineChannel, "LTC-USD-200327", types.SubscribeOptions{
Interval: types.Interval1m,
Expand Down

0 comments on commit a680df2

Please sign in to comment.