Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: [exchange] Coinbase Websocket connection #1916

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 149 additions & 1 deletion pkg/exchange/coinbase/stream.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package coinbase

import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"strconv"
"time"

"github.com/c9s/bbgo/pkg/types"
)

const wsFeedURL = "wss://ws-feed.exchange.coinbase.com"

//go:generate callbackgen -type Stream
type Stream struct {
types.StandardStream
apiKey string
passphrase string
secretKey string

// callbacks
statusMessageCallbacks []func(m *StatusMessage)
Expand All @@ -21,10 +33,146 @@ type Stream struct {
activeMessageCallbacks []func(m *ActiveMessage)
}

func NewStream() *Stream {
func NewStream(
apiKey string,
passphrase string,
secretKey string,
) *Stream {
s := Stream{
StandardStream: types.NewStandardStream(),
apiKey: apiKey,
passphrase: passphrase,
secretKey: secretKey,
}
s.SetParser(s.parseMessage)
s.SetDispatcher(s.dispatchEvent)
s.SetEndpointCreator(createEndpoint)

// public handlers
s.OnConnect(s.handleConnect)

return &s
}

func (s *Stream) dispatchEvent(e interface{}) {
switch e := e.(type) {
case *StatusMessage:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you use pointer here, your parser should return pointer address

Copy link
Collaborator

@bailantaotao bailantaotao Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (s *Stream) parseMessage(data []byte) (msg interface{}, err error) {
	var baseMsg messageBaseType
	err = json.Unmarshal(data, &baseMsg)
	if err != nil {
		return
	}

	switch baseMsg.Type {
	case "heartbeat":
		var heartbeatMsg HeartbeatMessage
		err = json.Unmarshal(data, &heartbeatMsg)
                 return &heartbeatMsg, err

Copy link
Collaborator Author

@dboyliao dboyliao Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I notice that this morning.
It's fixed in PR #1913 already.

s.EmitStatusMessage(e)
case *AuctionMessage:
s.EmitAuctionMessage(e)
case *RfqMessage:
s.EmitRfqMessage(e)
case *TickerMessage:
s.EmitTickerMessage(e)
case *ReceivedMessage:
s.EmitReceivedMessage(e)
case *OpenMessage:
s.EmitOpenMessage(e)
case *DoneMessage:
s.EmitDoneMessage(e)
case *MatchMessage:
s.EmitMatchMessage(e)
case *ChangeMessage:
s.EmitChangeMessage(e)
case *ActiveMessage:
s.EmitActiveMessage(e)
default:
log.Warnf("skip dispatching msg due to unknown message type: %T", e)
}
}

func createEndpoint(ctx context.Context) (string, error) {
return wsFeedURL, nil
}

type channelType struct {
Name string `json:"name"`
ProductIDs []string `json:"product_ids,omitempty"`
}

type websocketCommand struct {
Type string `json:"type"`
Channels []channelType `json:"channels"`
Signature *string `json:"signature,omitempty"`
Key *string `json:"key,omitempty"`
Passphrase *string `json:"passphrase,omitempty"`
Timestamp *string `json:"timestamp,omitempty"`
}

func (s *Stream) handleConnect() {
// subscribe to channels
if len(s.Subscriptions) == 0 {
return
}

subProductsMap := make(map[string][]string)
for _, sub := range s.Subscriptions {
strChannel := string(sub.Channel)
// "rfq_matches" allow empty symbol
if sub.Channel != "rfq_matches" && len(sub.Symbol) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define rfq_matches as constant on global

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the meaning of rfa_matchers?

continue
}
subProductsMap[strChannel] = append(subProductsMap[strChannel], sub.Symbol)
}
subCmds := []websocketCommand{}
signature, ts := s.generateSignature()
for channel, productIDs := range subProductsMap {
var subType string
switch channel {
case "rfq_matches":
subType = "subscriptions"
default:
subType = "subscribe"
}
subCmd := websocketCommand{
Type: subType,
Channels: []channelType{
{
Name: channel,
ProductIDs: productIDs,
},
},
}
if s.authEnabled() {
subCmd.Signature = &signature
subCmd.Key = &s.apiKey
subCmd.Passphrase = &s.passphrase
subCmd.Timestamp = &ts
}
subCmds = append(subCmds, subCmd)
}
for _, subCmd := range subCmds {
err := s.Conn.WriteJSON(subCmd)
if err != nil {
log.WithError(err).Errorf("subscription error: %v", subCmd)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return on error?

}
}
Comment on lines +144 to +149
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The subscription may be rejected if we exceed the limits.
I think the program should keep on running so I simply add a warning log here.

}

func (s *Stream) authEnabled() bool {
return !s.PublicOnly && len(s.apiKey) > 0 && len(s.passphrase) > 0 && len(s.secretKey) > 0
}

func (s *Stream) generateSignature() (string, string) {
// Convert current time to string timestamp
ts := strconv.FormatInt(time.Now().Unix(), 10)

// Create message string
message := ts + "GET/users/self/verify"

// Decode base64 secret
secretBytes, err := base64.StdEncoding.DecodeString(s.secretKey)
if err != nil {
log.WithError(err).Error("failed to decode secret key")
return "", ""
}

// Create HMAC-SHA256
mac := hmac.New(sha256.New, secretBytes)
mac.Write([]byte(message))

// Get signature and encode to base64
signature := base64.StdEncoding.EncodeToString(mac.Sum(nil))

return signature, ts
}
Loading