From 1240a32202ec339c97b20aec300a7197f29b10eb Mon Sep 17 00:00:00 2001 From: dboyliao Date: Wed, 26 Feb 2025 15:45:24 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20feat(coinbase):=20implement=20me?= =?UTF-8?q?ssage=20parsing=20for=20websocket=20messages?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/exchange/coinbase/parse.go | 101 ++++++++++++++++++++++++++++++++ pkg/exchange/coinbase/stream.go | 5 +- 2 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 pkg/exchange/coinbase/parse.go diff --git a/pkg/exchange/coinbase/parse.go b/pkg/exchange/coinbase/parse.go new file mode 100644 index 000000000..d6031f554 --- /dev/null +++ b/pkg/exchange/coinbase/parse.go @@ -0,0 +1,101 @@ +package coinbase + +import ( + "encoding/json" + "errors" +) + +// See https://docs.cdp.coinbase.com/exchange/docs/websocket-channels for message types +func (s *Stream) parseMessage(data []byte) (interface{}, error) { + var msgType string + { + var e messageBaseType + json.Unmarshal(data, &e) + msgType = e.Type + } + + switch msgType { + case "heartbeat": + var msg HeartbeatMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "status": + var msg StatusMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "auction": + var msg AuctionMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "rfq_match": + var msg RfqMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "ticker": + var msg TickerMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "received": + // try market order first + { + var msg ReceivedMarketOrderMessage + json.Unmarshal(data, &msg) + if !msg.Funds.IsZero() { + return &msg, nil + } + } + var msg ReceivedLimitOrderMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "open": + var msg OpenMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "done": + var msg DoneMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "match", "last_match": + // authenticated stream + if !s.PublicOnly { + // try maker order first + { + var msg AuthMakerMatchMessage + json.Unmarshal(data, &msg) + if len(msg.MakerUserID) > 0 { + return &msg, nil + } + } + // should be taker order + var msg AuthTakerMatchMessage + json.Unmarshal(data, &msg) + return &msg, nil + } + // public stream + var msg MatchMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "change": + var reason string + { + var e changeMessageType + json.Unmarshal(data, &e) + reason = e.Reason + } + switch reason { + case "stp": + var msg StpChangeMessage + json.Unmarshal(data, &msg) + return &msg, nil + case "modify_order": + var msg ModifyOrderChangeMessage + json.Unmarshal(data, &msg) + return &msg, nil + } + case "active": + var msg ActiveMessage + json.Unmarshal(data, &msg) + return &msg, nil + } + return nil, errors.New("unknown message type") +} diff --git a/pkg/exchange/coinbase/stream.go b/pkg/exchange/coinbase/stream.go index cff0853bb..70ba0c25b 100644 --- a/pkg/exchange/coinbase/stream.go +++ b/pkg/exchange/coinbase/stream.go @@ -29,9 +29,6 @@ func NewStream() *Stream { s := Stream{ StandardStream: types.NewStandardStream(), } + s.SetParser(s.parseMessage) return &s } - -// func (s *Stream) handleAuth() { -// return -// }