Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: [okx] support Unsubscription and Resubscription #1496

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/exchange/okex/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func convertSubscription(s types.Subscription) (WebsocketSubscription, error) {
}, nil

case types.BookChannel:
if s.Options.Depth != types.DepthLevel400 {
return WebsocketSubscription{}, fmt.Errorf("%s depth not supported", s.Options.Depth)
}

return WebsocketSubscription{
Channel: ChannelBooks,
InstrumentID: toLocalSymbol(s.Symbol),
Expand Down
9 changes: 7 additions & 2 deletions pkg/exchange/okex/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
type WsEventType string

const (
WsEventTypeLogin = "login"
WsEventTypeError = "error"
WsEventTypeLogin = "login"
WsEventTypeError = "error"
WsEventTypeSubscribe = "subscribe"
WsEventTypeUnsubscribe = "unsubscribe"
)

type WebSocketEvent struct {
Expand All @@ -122,6 +124,9 @@ func (w *WebSocketEvent) IsValid() error {
case WsEventTypeError:
return fmt.Errorf("websocket request error, code: %s, msg: %s", w.Code, w.Message)

case WsEventTypeSubscribe, WsEventTypeUnsubscribe:
return nil

case WsEventTypeLogin:
// Actually, this code is unnecessary because the events are either `Subscribe` or `Unsubscribe`, But to avoid bugs
// in the exchange, we still check.
Expand Down
41 changes: 40 additions & 1 deletion pkg/exchange/okex/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package okex

import (
"context"
"fmt"
"golang.org/x/time/rate"
"strconv"
"time"
Expand All @@ -15,7 +16,7 @@ var (
)

type WebsocketOp struct {
Op string `json:"op"`
Op WsEventType `json:"op"`
Args interface{} `json:"args"`
}

Expand Down Expand Up @@ -60,6 +61,44 @@ func NewStream(client *okexapi.RestClient) *Stream {
return stream
}

func (s *Stream) syncSubscriptions(opType WsEventType) error {
if opType != WsEventTypeUnsubscribe && opType != WsEventTypeSubscribe {
return fmt.Errorf("unexpected subscription type: %v", opType)
}

logger := log.WithField("opType", opType)
var topics []WebsocketSubscription
for _, subscription := range s.Subscriptions {
topic, err := convertSubscription(subscription)
if err != nil {
logger.WithError(err).Errorf("convert error, subscription: %+v", subscription)
return err
}

topics = append(topics, topic)
}

logger.Infof("%s channels: %+v", opType, topics)
if err := s.Conn.WriteJSON(WebsocketOp{
Op: opType,
Args: topics,
}); err != nil {
logger.WithError(err).Error("failed to send request")
return err
}

return nil
}

func (s *Stream) Unsubscribe() {
// errors are handled in the syncSubscriptions, so they are skipped here.
_ = s.syncSubscriptions(WsEventTypeUnsubscribe)
s.Resubscribe(func(old []types.Subscription) (new []types.Subscription, err error) {
// clear the subscriptions
return []types.Subscription{}, nil
})
}

func (s *Stream) handleConnect() {
if s.PublicOnly {
var subs []WebsocketSubscription
Expand Down
49 changes: 48 additions & 1 deletion pkg/exchange/okex/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -47,7 +48,7 @@ func TestStream(t *testing.T) {

t.Run("book test", func(t *testing.T) {
s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{
Depth: types.DepthLevel50,
Depth: types.DepthLevel400,
})
s.SetPublicOnly()
err := s.Connect(context.Background())
Expand Down Expand Up @@ -93,4 +94,50 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})

t.Run("Subscribe/Unsubscribe test", func(t *testing.T) {
s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{
Depth: types.DepthLevel400,
})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)

s.OnBookSnapshot(func(book types.SliceOrderBook) {
t.Log("got snapshot", book)
})
s.OnBookUpdate(func(book types.SliceOrderBook) {
t.Log("got update", book)
})

<-time.After(5 * time.Second)

s.Unsubscribe()
c := make(chan struct{})
<-c
})

t.Run("Resubscribe test", func(t *testing.T) {
s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{
Depth: types.DepthLevel400,
})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)

s.OnBookSnapshot(func(book types.SliceOrderBook) {
t.Log("got snapshot", book)
})
s.OnBookUpdate(func(book types.SliceOrderBook) {
t.Log("got update", book)
})

<-time.After(5 * time.Second)

s.Resubscribe(func(old []types.Subscription) (new []types.Subscription, err error) {
return old, nil
})
c := make(chan struct{})
<-c
})
}
1 change: 1 addition & 0 deletions pkg/types/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ const (
DepthLevel20 Depth = "20"
DepthLevel50 Depth = "50"
DepthLevel200 Depth = "200"
DepthLevel400 Depth = "400"
)

type Speed string
Expand Down