Skip to content

Commit

Permalink
Merge pull request #1688 from c9s/c9s/xdepthmaker/separate-hedge-symbol
Browse files Browse the repository at this point in the history
FEATURE: [xdepthmaker] separate hedge symbol
  • Loading branch information
c9s authored Aug 7, 2024
2 parents 9e28898 + 813684f commit eb6e5cd
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 31 deletions.
35 changes: 33 additions & 2 deletions pkg/core/tradecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/c9s/bbgo/pkg/types"
)

type TradeConverter interface {
Convert(trade types.Trade) (types.Trade, error)
}

//go:generate callbackgen -type TradeCollector
type TradeCollector struct {
Symbol string
Expand All @@ -25,6 +29,8 @@ type TradeCollector struct {

mu sync.Mutex

tradeConverters []TradeConverter

recoverCallbacks []func(trade types.Trade)

tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
Expand All @@ -49,6 +55,28 @@ func NewTradeCollector(symbol string, position *types.Position, orderStore *Orde
}
}

func (c *TradeCollector) AddTradeConverter(converter TradeConverter) {
c.tradeConverters = append(c.tradeConverters, converter)
}

func (c *TradeCollector) convertTrade(trade types.Trade) types.Trade {
if len(c.tradeConverters) == 0 {
return trade
}

for _, converter := range c.tradeConverters {
convTrade, err := converter.Convert(trade)
if err != nil {
logrus.WithError(err).Errorf("trade %+v converter error, trade: %s", converter, trade.String())
continue
}

trade = convTrade
}

return trade
}

// OrderStore returns the order store used by the trade collector
func (c *TradeCollector) OrderStore() *OrderStore {
return c.orderStore
Expand Down Expand Up @@ -116,6 +144,8 @@ func (c *TradeCollector) Recover(
}

func (c *TradeCollector) RecoverTrade(td types.Trade) bool {
td = c.convertTrade(td)

logrus.Debugf("checking trade: %s", td.String())
if c.processTrade(td) {
logrus.Infof("recovered trade: %s", td.String())
Expand Down Expand Up @@ -230,7 +260,7 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool {
// return true when the given trade is added
// return false when the given trade is not added
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
return c.processTrade(trade)
return c.processTrade(c.convertTrade(trade))
}

// Run is a goroutine executed in the background
Expand All @@ -249,7 +279,8 @@ func (c *TradeCollector) Run(ctx context.Context) {
c.Process()

case trade := <-c.tradeC:
c.processTrade(trade)
c.processTrade(c.convertTrade(trade))

}
}
}
2 changes: 1 addition & 1 deletion pkg/exchange/max/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func convertWithdrawStatusV3(status max.WithdrawStatus) types.WithdrawStatus {
return types.WithdrawStatus(status)
}

func convertWithdrawStatus(state max.WithdrawState) types.WithdrawStatus {
func convertWithdrawStatusV2(state max.WithdrawState) types.WithdrawStatus {
switch state {

case max.WithdrawStateSent, max.WithdrawStateSubmitting, max.WithdrawStatePending, "accepted", "approved":
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/max/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,7 @@ func (e *Exchange) QueryWithdrawHistory(
continue
}

// we can convert this later
status := convertWithdrawStatusV3(d.Status)
status := convertWithdrawStatusV2(d.State)

txIDs[d.TxID] = struct{}{}
withdraw := types.Withdraw{
Expand Down
2 changes: 1 addition & 1 deletion pkg/exchange/max/maxapi/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type Withdraw struct {
// "sygna_verifying"
State WithdrawState `json:"state"`

Status WithdrawStatus `json:"status,omitempty"`
// Status WithdrawStatus `json:"status,omitempty"`

CreatedAt types.MillisecondTimestamp `json:"created_at"`
UpdatedAt types.MillisecondTimestamp `json:"updated_at"`
Expand Down
58 changes: 33 additions & 25 deletions pkg/strategy/xdepthmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ type CrossExchangeMarketMakingStrategy struct {
func (s *CrossExchangeMarketMakingStrategy) Initialize(
ctx context.Context, environ *bbgo.Environment,
makerSession, hedgeSession *bbgo.ExchangeSession,
symbol, strategyID, instanceID string,
symbol, hedgeSymbol,
strategyID, instanceID string,
) error {
s.parent = ctx
s.ctx, s.cancel = context.WithCancel(ctx)
Expand All @@ -67,9 +68,9 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
s.hedgeSession = hedgeSession

var ok bool
s.hedgeMarket, ok = s.hedgeSession.Market(symbol)
s.hedgeMarket, ok = s.hedgeSession.Market(hedgeSymbol)
if !ok {
return fmt.Errorf("source session market %s is not defined", symbol)
return fmt.Errorf("hedge session market %s is not defined", hedgeSymbol)
}

s.makerMarket, ok = s.makerSession.Market(symbol)
Expand Down Expand Up @@ -150,14 +151,19 @@ type Strategy struct {

Symbol string `json:"symbol"`

// HedgeExchange session name
HedgeExchange string `json:"hedgeExchange"`
// HedgeSymbol is the symbol for the hedge exchange
// symbol could be different from the maker exchange
HedgeSymbol string `json:"hedgeSymbol"`

// MakerExchange session name
MakerExchange string `json:"makerExchange"`

// HedgeExchange session name
HedgeExchange string `json:"hedgeExchange"`

UpdateInterval types.Duration `json:"updateInterval"`
HedgeInterval types.Duration `json:"hedgeInterval"`

HedgeInterval types.Duration `json:"hedgeInterval"`

FullReplenishInterval types.Duration `json:"fullReplenishInterval"`

Expand Down Expand Up @@ -239,12 +245,12 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
panic(err)
}

hedgeSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{
hedgeSession.Subscribe(types.BookChannel, s.HedgeSymbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
Speed: types.SpeedLow,
})

hedgeSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
hedgeSession.Subscribe(types.KLineChannel, s.HedgeSymbol, types.SubscribeOptions{Interval: "1m"})
makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
}

Expand Down Expand Up @@ -281,6 +287,10 @@ func (s *Strategy) Defaults() error {
s.HedgeInterval = types.Duration(3 * time.Second)
}

if s.HedgeSymbol == "" {
s.HedgeSymbol = s.Symbol
}

if s.NumLayers == 0 {
s.NumLayers = 1
}
Expand Down Expand Up @@ -358,13 +368,13 @@ func (s *Strategy) CrossRun(

if err := s.CrossExchangeMarketMakingStrategy.Initialize(ctx,
s.Environment,
makerSession,
hedgeSession,
s.Symbol, ID, s.InstanceID()); err != nil {
makerSession, hedgeSession,
s.Symbol, s.HedgeSymbol,
ID, s.InstanceID()); err != nil {
return err
}

s.pricingBook = types.NewStreamBook(s.Symbol)
s.pricingBook = types.NewStreamBook(s.HedgeSymbol)
s.pricingBook.BindStream(s.hedgeSession.MarketDataStream)

s.stopC = make(chan struct{})
Expand Down Expand Up @@ -488,7 +498,7 @@ func (s *Strategy) CrossRun(
}

if err := s.HedgeOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("graceful cancel %s order error", s.Symbol)
log.WithError(err).Errorf("graceful cancel %s order error", s.HedgeSymbol)
}

bbgo.Sync(ctx, s)
Expand Down Expand Up @@ -576,12 +586,12 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
s.hedgeErrorRateReservation = nil
}

log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
log.Infof("submitting %s hedge order %s %v", s.HedgeSymbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.HedgeSymbol, side.String(), quantity)

_, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.hedgeMarket,
Symbol: s.Symbol,
Symbol: s.hedgeMarket.Symbol,
Type: types.OrderTypeMarket,
Side: side,
Quantity: quantity,
Expand Down Expand Up @@ -627,7 +637,7 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {

startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)

if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.HedgeSymbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}

Expand All @@ -639,7 +649,9 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {
}

func (s *Strategy) generateMakerOrders(
pricingBook *types.StreamOrderBook, maxLayer int, availableBase fixedpoint.Value, availableQuote fixedpoint.Value,
pricingBook *types.StreamOrderBook,
maxLayer int,
availableBase, availableQuote fixedpoint.Value,
) ([]types.SubmitOrder, error) {
_, _, hasPrice := pricingBook.BestBidAndAsk()
if !hasPrice {
Expand Down Expand Up @@ -776,7 +788,7 @@ func (s *Strategy) generateMakerOrders(
}

submitOrders = append(submitOrders, types.SubmitOrder{
Symbol: s.Symbol,
Symbol: s.makerMarket.Symbol,
Type: types.OrderTypeLimitMaker,
Market: s.makerMarket,
Side: side,
Expand Down Expand Up @@ -829,7 +841,7 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {

bestBidPrice := bestBid.Price
bestAskPrice := bestAsk.Price
log.Infof("%s book ticker: best ask / best bid = %v / %v", s.Symbol, bestAskPrice, bestBidPrice)
log.Infof("%s book ticker: best ask / best bid = %v / %v", s.HedgeSymbol, bestAskPrice, bestBidPrice)

s.lastPrice = bestBidPrice.Add(bestAskPrice).Div(Two)

Expand Down Expand Up @@ -898,11 +910,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.Exchange
log.Infof("found existing open orders:")
types.OrderSlice(openOrders).Print()

if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil {
return err
}

return nil
return session.Exchange.CancelOrders(ctx, openOrders...)
}

func selectSessions2(
Expand Down

0 comments on commit eb6e5cd

Please sign in to comment.