Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR: [okx] refactor account info #1489

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/exchange/bitget/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (s *Stream) dispatchEvent(event interface{}) {
case *WsEvent:
if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
return
}
if e.IsAuthenticated() {
s.EmitAuth()
Expand Down
49 changes: 37 additions & 12 deletions pkg/exchange/okex/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
return nil, err
}
if event.Event != "" {
// TODO: remove fastjson
return event, nil
return &event, nil
}

switch event.Arg.Channel {
case ChannelAccount:
// TODO: remove fastjson
return parseAccount(v)
return parseAccount(event.Data)

case ChannelBooks, ChannelBook5:
var bookEvent BookEvent
Expand Down Expand Up @@ -100,10 +98,17 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
return nil, nil
}

type WsEventType string

const (
WsEventTypeLogin = "login"
WsEventTypeError = "error"
)

type WebSocketEvent struct {
Event string `json:"event"`
Code string `json:"code,omitempty"`
Message string `json:"msg,omitempty"`
Event WsEventType `json:"event"`
Code string `json:"code,omitempty"`
Message string `json:"msg,omitempty"`
Arg struct {
Channel Channel `json:"channel"`
InstId string `json:"instId"`
Expand All @@ -112,6 +117,28 @@ type WebSocketEvent struct {
ActionType ActionType `json:"action"`
}

func (w *WebSocketEvent) IsValid() error {
switch w.Event {
case WsEventTypeError:
return fmt.Errorf("websocket request error, code: %s, msg: %s", w.Code, w.Message)

case WsEventTypeLogin:
// Actually, this code is unnecessary because the events are either `Subscribe` or `Unsubscribe`, But to avoid bugs
// in the exchange, we still check.
if w.Code != "0" || len(w.Message) != 0 {
return fmt.Errorf("websocket request error, code: %s, msg: %s", w.Code, w.Message)
}
return nil

default:
return fmt.Errorf("unexpected event type: %+v", w)
}
}

func (w *WebSocketEvent) IsAuthenticated() bool {
return w.Event == WsEventTypeLogin && w.Code == "0"
}

type BookEvent struct {
InstrumentID string
Symbol string
Expand Down Expand Up @@ -345,17 +372,15 @@ type KLineEvent struct {
Channel Channel
}

func parseAccount(v *fastjson.Value) (*okexapi.Account, error) {
data := v.Get("data").MarshalTo(nil)

func parseAccount(v []byte) (*okexapi.Account, error) {
var accounts []okexapi.Account
err := json.Unmarshal(data, &accounts)
err := json.Unmarshal(v, &accounts)
if err != nil {
return nil, err
}

if len(accounts) == 0 {
return nil, errors.New("empty account data")
return &okexapi.Account{}, nil
}

return &accounts[0], nil
Expand Down
182 changes: 182 additions & 0 deletions pkg/exchange/okex/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,126 @@ import (
"github.com/c9s/bbgo/pkg/types"
)

func Test_parseWebSocketEvent_accountEvent(t *testing.T) {
t.Run("succeeds", func(t *testing.T) {
in := `
{
"arg": {
"channel": "account",
"uid": "77982378738415879"
},
"data": [
{
"uTime": "1614846244194",
"totalEq": "91884",
"adjEq": "91884.8502560037982063",
"isoEq": "0",
"ordFroz": "0",
"imr": "0",
"mmr": "0",
"borrowFroz": "",
"notionalUsd": "",
"mgnRatio": "100000",
"details": [{
"availBal": "",
"availEq": "1",
"ccy": "BTC",
"cashBal": "1",
"uTime": "1617279471503",
"disEq": "50559.01",
"eq": "1",
"eqUsd": "45078",
"fixedBal": "0",
"frozenBal": "0",
"interest": "0",
"isoEq": "0",
"liab": "0",
"maxLoan": "",
"mgnRatio": "",
"notionalLever": "0",
"ordFrozen": "0",
"upl": "0",
"uplLiab": "0",
"crossLiab": "0",
"isoLiab": "0",
"coinUsdPrice": "60000",
"stgyEq":"0",
"spotInUseAmt":"",
"isoUpl":"",
"borrowFroz": ""
},
{
"availBal": "",
"availEq": "41307",
"ccy": "USDT",
"cashBal": "41307",
"uTime": "1617279471503",
"disEq": "41325",
"eq": "41307",
"eqUsd": "45078",
"fixedBal": "0",
"frozenBal": "0",
"interest": "0",
"isoEq": "0",
"liab": "0",
"maxLoan": "",
"mgnRatio": "",
"notionalLever": "0",
"ordFrozen": "0",
"upl": "0",
"uplLiab": "0",
"crossLiab": "0",
"isoLiab": "0",
"coinUsdPrice": "1.00007",
"stgyEq":"0",
"spotInUseAmt":"",
"isoUpl":"",
"borrowFroz": ""
}
]
}
]
}
`

exp := &okexapi.Account{
TotalEquityInUSD: fixedpoint.NewFromFloat(91884),
UpdateTime: "1614846244194",
Details: []okexapi.BalanceDetail{
{
Currency: "BTC",
Available: fixedpoint.NewFromFloat(1),
CashBalance: fixedpoint.NewFromFloat(1),
OrderFrozen: fixedpoint.Zero,
Frozen: fixedpoint.Zero,
Equity: fixedpoint.One,
EquityInUSD: fixedpoint.NewFromFloat(45078),
UpdateTime: types.NewMillisecondTimestampFromInt(1617279471503),
UnrealizedProfitAndLoss: fixedpoint.Zero,
},
{
Currency: "USDT",
Available: fixedpoint.NewFromFloat(41307),
CashBalance: fixedpoint.NewFromFloat(41307),
OrderFrozen: fixedpoint.Zero,
Frozen: fixedpoint.Zero,
Equity: fixedpoint.NewFromFloat(41307),
EquityInUSD: fixedpoint.NewFromFloat(45078),
UpdateTime: types.NewMillisecondTimestampFromInt(1617279471503),
UnrealizedProfitAndLoss: fixedpoint.Zero,
},
},
}

res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.(*okexapi.Account)
assert.True(t, ok)
assert.Equal(t, exp, event)
})

}

func TestParsePriceVolumeOrderSliceJSON(t *testing.T) {
t.Run("snapshot", func(t *testing.T) {
in := `
Expand Down Expand Up @@ -668,3 +788,65 @@ func Test_toGlobalTrade(t *testing.T) {
assert.ErrorContains(t, err, "unexpected inst id")
})
}

func TestWebSocketEvent_IsValid(t *testing.T) {
t.Run("op login event", func(t *testing.T) {
input := `{
"event": "login",
"code": "0",
"msg": "",
"connId": "a4d3ae55"
}`
res, err := parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, WebSocketEvent{
Event: WsEventTypeLogin,
Code: "0",
Message: "",
}, *opEvent)

assert.NoError(t, opEvent.IsValid())
})

t.Run("op error event", func(t *testing.T) {
input := `{
"event": "error",
"code": "60009",
"msg": "Login failed.",
"connId": "a4d3ae55"
}`
res, err := parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, WebSocketEvent{
Event: WsEventTypeError,
Code: "60009",
Message: "Login failed.",
}, *opEvent)

assert.ErrorContains(t, opEvent.IsValid(), "request error")
})

t.Run("unexpected event", func(t *testing.T) {
input := `{
"event": "test gg",
"code": "60009",
"msg": "unexpected",
"connId": "a4d3ae55"
}`
res, err := parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, WebSocketEvent{
Event: "test gg",
Code: "60009",
Message: "unexpected",
}, *opEvent)

assert.ErrorContains(t, opEvent.IsValid(), "unexpected event type")
})
}
44 changes: 21 additions & 23 deletions pkg/exchange/okex/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type Stream struct {
// public callbacks
kLineEventCallbacks []func(candle KLineEvent)
bookEventCallbacks []func(book BookEvent)
eventCallbacks []func(event WebSocketEvent)
accountEventCallbacks []func(account okexapi.Account)
orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails)
marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent)
Expand All @@ -56,8 +55,8 @@ func NewStream(client *okexapi.RestClient) *Stream {
stream.OnAccountEvent(stream.handleAccountEvent)
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent)
stream.OnEvent(stream.handleEvent)
stream.OnConnect(stream.handleConnect)
stream.OnAuth(stream.handleAuth)
return stream
}

Expand Down Expand Up @@ -113,26 +112,19 @@ func (s *Stream) handleConnect() {
}
}

func (s *Stream) handleEvent(event WebSocketEvent) {
switch event.Event {
case "login":
if event.Code == "0" {
s.EmitAuth()
var subs = []WebsocketSubscription{
{Channel: "account"},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},
}

log.Infof("subscribing private channels: %+v", subs)
err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
func (s *Stream) handleAuth() {
var subs = []WebsocketSubscription{
{Channel: ChannelAccount},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},
}

if err != nil {
log.WithError(err).Error("private channel subscribe error")
}
}
log.Infof("subscribing private channels: %+v", subs)
err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
if err != nil {
log.WithError(err).Error("private channel subscribe error")
}
}

Expand Down Expand Up @@ -160,7 +152,7 @@ func (s *Stream) handleOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {

func (s *Stream) handleAccountEvent(account okexapi.Account) {
balances := toGlobalBalance(&account)
s.EmitBalanceSnapshot(balances)
s.EmitBalanceUpdate(balances)
}

func (s *Stream) handleBookEvent(data BookEvent) {
Expand Down Expand Up @@ -211,7 +203,13 @@ func (s *Stream) createEndpoint(ctx context.Context) (string, error) {
func (s *Stream) dispatchEvent(e interface{}) {
switch et := e.(type) {
case *WebSocketEvent:
s.EmitEvent(*et)
if err := et.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is early return missing here?

return
}
if et.IsAuthenticated() {
s.EmitAuth()
}

case *BookEvent:
// there's "books" for 400 depth and books5 for 5 depth
Expand Down
12 changes: 0 additions & 12 deletions pkg/exchange/okex/stream_callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading