From 5c3a79f7530b1a1cc2df5032d7a7f7589ba4dd80 Mon Sep 17 00:00:00 2001 From: dboyliao Date: Wed, 26 Feb 2025 17:14:16 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=94=A7=20feat(coinbase):=20Websocket?= =?UTF-8?q?=20stream=20with=20authentication=20and=20subscription=20handli?= =?UTF-8?q?ng?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/exchange/coinbase/stream.go | 169 +++++++++++++++++++++++++++++++- 1 file changed, 168 insertions(+), 1 deletion(-) diff --git a/pkg/exchange/coinbase/stream.go b/pkg/exchange/coinbase/stream.go index ce2660d4f..e032321b0 100644 --- a/pkg/exchange/coinbase/stream.go +++ b/pkg/exchange/coinbase/stream.go @@ -1,12 +1,25 @@ package coinbase import ( + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "strconv" + "time" + "github.com/c9s/bbgo/pkg/types" ) +const wsFeedURL = "wss://ws-feed.exchange.coinbase.com" + //go:generate callbackgen -type Stream type Stream struct { types.StandardStream + apiKey string + passphrase string + secretKey string // callbacks statusMessageCallbacks []func(m *StatusMessage) @@ -21,10 +34,164 @@ type Stream struct { activeMessageCallbacks []func(m *ActiveMessage) } -func NewStream() *Stream { +func NewStream( + apiKey string, + passphrase string, + secretKey string, +) *Stream { s := Stream{ StandardStream: types.NewStandardStream(), + apiKey: apiKey, + passphrase: passphrase, + secretKey: secretKey, } + s.PublicOnly = (apiKey == "" || passphrase == "" || secretKey == "") s.SetParser(s.parseMessage) + s.SetDispatcher(s.dispatchEvent) + s.SetEndpointCreator(createEndpoint) + + // public handlers + s.OnConnect(s.handleConnect) + return &s } + +func (s *Stream) dispatchEvent(e interface{}) { + switch e := e.(type) { + case *StatusMessage: + s.EmitStatusMessage(e) + case *AuctionMessage: + s.EmitAuctionMessage(e) + case *RfqMessage: + s.EmitRfqMessage(e) + case *TickerMessage: + s.EmitTickerMessage(e) + case *ReceivedLimitOrderMessage: + s.EmitReceivedLimitOrderMessage(e) + case *ReceivedMarketOrderMessage: + s.EmitReceivedMarketOrderMessage(e) + case *OpenMessage: + s.EmitOpenMessage(e) + case *DoneMessage: + s.EmitDoneMessage(e) + case *MatchMessage: + s.EmitMatchMessage(e) + case *AuthMakerMatchMessage: + s.EmitAuthMakerMatchMessage(e) + case *AuthTakerMatchMessage: + s.EmitAuthTakerMatchMessage(e) + case *StpChangeMessage: + s.EmitStpChangeMessage(e) + case *ModifyOrderChangeMessage: + s.EmitModifyOrderChangeMessage(e) + case *ActiveMessage: + s.EmitActiveMessage(e) + default: + log.Warnf("skip dispatching msg due to unknown message type: %T", e) + } +} + +func createEndpoint(ctx context.Context) (string, error) { + return wsFeedURL, nil +} + +type channelType struct { + Name string `json:"name"` + ProductIDs []string `json:"product_ids,omitempty"` +} + +type websocketCommand struct { + Type string `json:"type"` + Channels []channelType `json:"channels"` + Signature *string `json:"signature,omitempty"` + Key *string `json:"key,omitempty"` + Passphrase *string `json:"passphrase,omitempty"` + Timestamp *string `json:"timestamp,omitempty"` +} + +func (s *Stream) handleConnect() { + // subscribe to channels + if len(s.Subscriptions) == 0 { + return + } + + subProductsMap := make(map[string][]string) + for _, sub := range s.Subscriptions { + strChannel := string(sub.Channel) + if _, ok := subProductsMap[strChannel]; !ok { + subProductsMap[strChannel] = []string{} + } + // "rfq_matches" allow empty symbol + if sub.Channel != "rfq_matches" && len(sub.Symbol) == 0 { + continue + } + products := subProductsMap[strChannel] + products = append(products, sub.Symbol) + subProductsMap[strChannel] = products + } + subCmds := []websocketCommand{} + signature, ts := s.generateSignature() + for channel, productIDs := range subProductsMap { + var subType string + switch channel { + case "rfq_matches": + subType = "subscriptions" + default: + subType = "subscribe" + } + subCmd := websocketCommand{ + Type: subType, + Channels: []channelType{ + { + Name: channel, + ProductIDs: productIDs, + }, + }, + } + if !s.PublicOnly { + subCmd.Signature = &signature + subCmd.Key = &s.apiKey + subCmd.Passphrase = &s.passphrase + subCmd.Timestamp = &ts + } + subCmds = append(subCmds, subCmd) + } + for _, subCmd := range subCmds { + err := s.Conn.WriteJSON(subCmd) + if err != nil { + log.WithError(err).Errorf("subscription error: %v", subCmd) + } + } +} + +func (subCmd *websocketCommand) String() string { + jsonData, err := json.Marshal(subCmd) + if err != nil { + return "" + } + return string(jsonData) +} + +func (s *Stream) generateSignature() (string, string) { + // Convert current time to string timestamp + ts := strconv.FormatInt(time.Now().Unix(), 10) + + // Create message string + message := ts + "GET/users/self/verify" + + // Decode base64 secret + secretBytes, err := base64.StdEncoding.DecodeString(s.secretKey) + if err != nil { + log.WithError(err).Error("failed to decode secret key") + return "", "" + } + + // Create HMAC-SHA256 + mac := hmac.New(sha256.New, secretBytes) + mac.Write([]byte(message)) + + // Get signature and encode to base64 + signature := base64.StdEncoding.EncodeToString(mac.Sum(nil)) + + return signature, ts +} From 959c9b8cc18ea39b9bc334ab69ef6cb434ca182e Mon Sep 17 00:00:00 2001 From: dboyliao Date: Wed, 26 Feb 2025 17:31:57 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=94=A7=20fix(coinbase):=20Update=20au?= =?UTF-8?q?thentication=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/exchange/coinbase/stream.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/pkg/exchange/coinbase/stream.go b/pkg/exchange/coinbase/stream.go index e032321b0..6cabfa836 100644 --- a/pkg/exchange/coinbase/stream.go +++ b/pkg/exchange/coinbase/stream.go @@ -5,7 +5,6 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/base64" - "encoding/json" "strconv" "time" @@ -45,7 +44,6 @@ func NewStream( passphrase: passphrase, secretKey: secretKey, } - s.PublicOnly = (apiKey == "" || passphrase == "" || secretKey == "") s.SetParser(s.parseMessage) s.SetDispatcher(s.dispatchEvent) s.SetEndpointCreator(createEndpoint) @@ -131,6 +129,7 @@ func (s *Stream) handleConnect() { } subCmds := []websocketCommand{} signature, ts := s.generateSignature() + authEnabled := !s.PublicOnly && len(s.apiKey) > 0 && len(s.passphrase) > 0 && len(s.secretKey) > 0 for channel, productIDs := range subProductsMap { var subType string switch channel { @@ -148,7 +147,7 @@ func (s *Stream) handleConnect() { }, }, } - if !s.PublicOnly { + if authEnabled { subCmd.Signature = &signature subCmd.Key = &s.apiKey subCmd.Passphrase = &s.passphrase @@ -164,14 +163,6 @@ func (s *Stream) handleConnect() { } } -func (subCmd *websocketCommand) String() string { - jsonData, err := json.Marshal(subCmd) - if err != nil { - return "" - } - return string(jsonData) -} - func (s *Stream) generateSignature() (string, string) { // Convert current time to string timestamp ts := strconv.FormatInt(time.Now().Unix(), 10) From 58bfa74618fb7e522307a1c7edbb23f8ec332e5a Mon Sep 17 00:00:00 2001 From: dboyliao Date: Thu, 27 Feb 2025 09:35:30 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=94=A7=20refactor(coinbase):=20remove?= =?UTF-8?q?=20redundent=20key=20check=20and=20refactor=20auth=20check=20in?= =?UTF-8?q?to=20a=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/exchange/coinbase/stream.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/exchange/coinbase/stream.go b/pkg/exchange/coinbase/stream.go index 6cabfa836..9a97d3af2 100644 --- a/pkg/exchange/coinbase/stream.go +++ b/pkg/exchange/coinbase/stream.go @@ -116,20 +116,15 @@ func (s *Stream) handleConnect() { subProductsMap := make(map[string][]string) for _, sub := range s.Subscriptions { strChannel := string(sub.Channel) - if _, ok := subProductsMap[strChannel]; !ok { - subProductsMap[strChannel] = []string{} - } // "rfq_matches" allow empty symbol if sub.Channel != "rfq_matches" && len(sub.Symbol) == 0 { continue } - products := subProductsMap[strChannel] - products = append(products, sub.Symbol) + products := append(subProductsMap[strChannel], sub.Symbol) subProductsMap[strChannel] = products } subCmds := []websocketCommand{} signature, ts := s.generateSignature() - authEnabled := !s.PublicOnly && len(s.apiKey) > 0 && len(s.passphrase) > 0 && len(s.secretKey) > 0 for channel, productIDs := range subProductsMap { var subType string switch channel { @@ -147,7 +142,7 @@ func (s *Stream) handleConnect() { }, }, } - if authEnabled { + if s.authEnabled() { subCmd.Signature = &signature subCmd.Key = &s.apiKey subCmd.Passphrase = &s.passphrase @@ -163,6 +158,10 @@ func (s *Stream) handleConnect() { } } +func (s *Stream) authEnabled() bool { + return !s.PublicOnly && len(s.apiKey) > 0 && len(s.passphrase) > 0 && len(s.secretKey) > 0 +} + func (s *Stream) generateSignature() (string, string) { // Convert current time to string timestamp ts := strconv.FormatInt(time.Now().Unix(), 10) From 471e81b862b1a1bb9789cff4e928f6102b3e57d1 Mon Sep 17 00:00:00 2001 From: dboyliao Date: Thu, 27 Feb 2025 17:15:19 +0800 Subject: [PATCH 4/4] fix(coinbase): bugs fix and resolve review issues --- pkg/exchange/coinbase/stream.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/pkg/exchange/coinbase/stream.go b/pkg/exchange/coinbase/stream.go index 9a97d3af2..75e9218de 100644 --- a/pkg/exchange/coinbase/stream.go +++ b/pkg/exchange/coinbase/stream.go @@ -64,24 +64,16 @@ func (s *Stream) dispatchEvent(e interface{}) { s.EmitRfqMessage(e) case *TickerMessage: s.EmitTickerMessage(e) - case *ReceivedLimitOrderMessage: - s.EmitReceivedLimitOrderMessage(e) - case *ReceivedMarketOrderMessage: - s.EmitReceivedMarketOrderMessage(e) + case *ReceivedMessage: + s.EmitReceivedMessage(e) case *OpenMessage: s.EmitOpenMessage(e) case *DoneMessage: s.EmitDoneMessage(e) case *MatchMessage: s.EmitMatchMessage(e) - case *AuthMakerMatchMessage: - s.EmitAuthMakerMatchMessage(e) - case *AuthTakerMatchMessage: - s.EmitAuthTakerMatchMessage(e) - case *StpChangeMessage: - s.EmitStpChangeMessage(e) - case *ModifyOrderChangeMessage: - s.EmitModifyOrderChangeMessage(e) + case *ChangeMessage: + s.EmitChangeMessage(e) case *ActiveMessage: s.EmitActiveMessage(e) default: @@ -120,8 +112,7 @@ func (s *Stream) handleConnect() { if sub.Channel != "rfq_matches" && len(sub.Symbol) == 0 { continue } - products := append(subProductsMap[strChannel], sub.Symbol) - subProductsMap[strChannel] = products + subProductsMap[strChannel] = append(subProductsMap[strChannel], sub.Symbol) } subCmds := []websocketCommand{} signature, ts := s.generateSignature()