Skip to content

Commit

Permalink
sync active orders and send metrics of order nums
Browse files Browse the repository at this point in the history
  • Loading branch information
kbearXD committed Mar 4, 2024
1 parent 5936cf3 commit 8e22473
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 166 deletions.
10 changes: 9 additions & 1 deletion pkg/exchange/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package exchange

import "github.com/c9s/bbgo/pkg/types"
import (
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/types"
)

func GetSessionAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) {
if marginExchange, ok := exchange.(types.MarginExchange); ok {
Expand All @@ -27,3 +30,8 @@ func GetSessionAttributes(exchange types.Exchange) (isMargin, isFutures, isIsola

return isMargin, isFutures, isIsolated, isolatedSymbol
}

func IsMaxExchange(exchange interface{}) bool {
_, res := exchange.(*max.Exchange)
return res
}
100 changes: 100 additions & 0 deletions pkg/strategy/common/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package common

import (
"context"
"strconv"
"time"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
)

func SyncActiveOrder(ctx context.Context, ex types.Exchange, orderQueryService types.ExchangeOrderQueryService, activeOrderBook *bbgo.ActiveOrderBook, orderID uint64, syncBefore time.Time) (isOrderUpdated bool, err error) {
isMax := exchange.IsMaxExchange(ex)

updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: activeOrderBook.Symbol,
OrderID: strconv.FormatUint(orderID, 10),
})

if err != nil {
return isOrderUpdated, err
}

// maxapi.OrderStateFinalizing does not mean the fee is calculated
// we should only consider order state done for MAX
if isMax && updatedOrder.OriginalStatus != string(maxapi.OrderStateDone) {
return isOrderUpdated, nil
}

// should only trigger order update when the updated time is old enough
isOrderUpdated = updatedOrder.UpdateTime.Before(syncBefore)
if isOrderUpdated {
activeOrderBook.Update(*updatedOrder)
}

return isOrderUpdated, nil
}

type SyncActiveOrdersOpts struct {
Logger *logrus.Entry
Exchange types.Exchange
OrderQueryService types.ExchangeOrderQueryService
ActiveOrderBook *bbgo.ActiveOrderBook
OpenOrders []types.Order
}

func SyncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
opts.Logger.Infof("[ActiveOrderRecover] syncActiveOrders")

// only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice
syncBefore := time.Now().Add(-3 * time.Minute)

activeOrders := opts.ActiveOrderBook.Orders()

openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range opts.OpenOrders {
openOrdersMap[openOrder.OrderID] = openOrder
}

var errs error
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not.
delete(openOrdersMap, activeOrder.OrderID)
} else {
opts.Logger.Infof("[ActiveOrderRecover] found active order #%d is not in the open orders, updating...", activeOrder.OrderID)

isActiveOrderBookUpdated, err := SyncActiveOrder(ctx, opts.Exchange, opts.OrderQueryService, opts.ActiveOrderBook, activeOrder.OrderID, syncBefore)
if err != nil {
opts.Logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
continue
}

if !isActiveOrderBookUpdated {
opts.Logger.Infof("[ActiveOrderRecover] active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID)
}
}
}

// update open orders not in active orders
for _, openOrder := range openOrdersMap {
opts.Logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID)
// we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice.
if openOrder.UpdateTime.After(syncBefore) {
opts.Logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID)
continue
}

opts.ActiveOrderBook.Add(openOrder)
}

return errs
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package grid2
package common

import (
"context"
Expand All @@ -10,7 +10,7 @@ import (
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand All @@ -23,34 +23,30 @@ func TestSyncActiveOrders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log := logrus.WithField("strategy", "test")
symbol := "ETHUSDT"
labels := prometheus.Labels{
"exchange": "default",
"symbol": symbol,
}
t.Run("all open orders are match with active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
}
order.Symbol = symbol

opts := SyncActiveOrdersOpts{
Logger: log,
Exchange: mockExchange,
OrderQueryService: mockOrderQueryService,
ActiveOrderBook: activeOrderbook,
OpenOrders: []types.Order{order},
}

activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)

assert.NoError(syncActiveOrders(ctx, opts))
assert.NoError(SyncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
Expand All @@ -64,14 +60,6 @@ func TestSyncActiveOrders(t *testing.T) {
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
Expand All @@ -82,14 +70,21 @@ func TestSyncActiveOrders(t *testing.T) {
updatedOrder := order
updatedOrder.Status = types.OrderStatusFilled

opts := SyncActiveOrdersOpts{
Logger: log,
ActiveOrderBook: activeOrderbook,
OrderQueryService: mockOrderQueryService,
Exchange: mockExchange,
OpenOrders: nil,
}

activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
}).Return(&updatedOrder, nil)

assert.NoError(syncActiveOrders(ctx, opts))
assert.NoError(SyncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
Expand All @@ -101,14 +96,6 @@ func TestSyncActiveOrders(t *testing.T) {
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
Expand All @@ -118,8 +105,14 @@ func TestSyncActiveOrders(t *testing.T) {
CreationTime: types.Time(time.Now()),
}

mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))
opts := SyncActiveOrdersOpts{
Logger: log,
ActiveOrderBook: activeOrderbook,
OrderQueryService: mockOrderQueryService,
Exchange: mockExchange,
OpenOrders: []types.Order{order},
}
assert.NoError(SyncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
Expand All @@ -133,14 +126,6 @@ func TestSyncActiveOrders(t *testing.T) {
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)

opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}

order1 := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
Expand All @@ -158,14 +143,21 @@ func TestSyncActiveOrders(t *testing.T) {
},
}

opts := SyncActiveOrdersOpts{
Logger: log,
ActiveOrderBook: activeOrderbook,
OrderQueryService: mockOrderQueryService,
Exchange: mockExchange,
OpenOrders: []types.Order{order2},
}

activeOrderbook.Add(order1)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order1.OrderID, 10),
}).Return(&updatedOrder1, nil)

assert.NoError(syncActiveOrders(ctx, opts))
assert.NoError(SyncActiveOrders(ctx, opts))

// verify active orderbook
activeOrders := activeOrderbook.Orders()
Expand Down
57 changes: 57 additions & 0 deletions pkg/strategy/dca2/active_order_recover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package dca2

import (
"context"
"time"

"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/util"
)

func (s *Strategy) recoverPeriodically(ctx context.Context) {
s.logger.Info("[DCA] monitor and recover periodically")
interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000)
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := s.recoverActiveOrders(ctx); err != nil {
s.logger.WithError(err).Warn(err, "failed to recover active orders")
}
}
}
}

func (s *Strategy) recoverActiveOrders(ctx context.Context) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
s.logger.WithError(err).Warn("failed to query open orders")
return err
}

activeOrders := s.OrderExecutor.ActiveMakerOrders().Orders()

// update num of open orders metrics
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders)))
}

// update num of active orders metrics
if metricsNumOfActiveOrders != nil {
metricsNumOfActiveOrders.With(baseLabels).Set(float64(len(activeOrders)))
}

opts := common.SyncActiveOrdersOpts{
Logger: s.logger,
Exchange: s.ExchangeSession.Exchange,
ActiveOrderBook: s.OrderExecutor.ActiveMakerOrders(),
OpenOrders: openOrders,
}

return common.SyncActiveOrders(ctx, opts)
}
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/open_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type cancelOrdersByGroupIDApi interface {

func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error {
s.logger.Infof("[DCA] start placing open position orders")
price, err := getBestPriceUntilSuccess(ctx, s.Session.Exchange, s.Symbol)
price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/strategy/dca2/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)

Expand All @@ -25,9 +24,9 @@ type RecoverApiQueryService interface {

func (s *Strategy) recover(ctx context.Context) error {
s.logger.Info("[DCA] recover")
queryService, ok := s.Session.Exchange.(RecoverApiQueryService)
queryService, ok := s.ExchangeSession.Exchange.(RecoverApiQueryService)
if !ok {
return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.Session.ExchangeName)
return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.ExchangeSession.ExchangeName)
}

openOrders, err := queryService.QueryOpenOrders(ctx, s.Symbol)
Expand Down Expand Up @@ -63,7 +62,7 @@ func (s *Strategy) recover(ctx context.Context) error {
s.startTimeOfNextRound = startTimeOfNextRound

// recover state
state, err := recoverState(ctx, s.ProfitStats.QuoteInvestment, int(s.MaxOrderCount), currentRound, s.OrderExecutor)
state, err := recoverState(ctx, int(s.MaxOrderCount), currentRound, s.OrderExecutor)
if err != nil {
return err
}
Expand All @@ -74,7 +73,7 @@ func (s *Strategy) recover(ctx context.Context) error {
}

// recover state
func recoverState(ctx context.Context, quoteInvestment fixedpoint.Value, maxOrderCount int, currentRound Round, orderExecutor *bbgo.GeneralOrderExecutor) (State, error) {
func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, orderExecutor *bbgo.GeneralOrderExecutor) (State, error) {
activeOrderBook := orderExecutor.ActiveMakerOrders()
orderStore := orderExecutor.OrderStore()

Expand Down
Loading

0 comments on commit 8e22473

Please sign in to comment.