Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dca2: add more log and retry #1576

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions pkg/exchange/retry/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/cenkalti/backoff/v4"

Expand Down Expand Up @@ -119,6 +120,54 @@ func QueryOpenOrdersUntilSuccessfulLite(
return openOrders, err
}

func QueryClosedOrdersUntilSuccessful(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64,
) (closedOrders []types.Order, err error) {
var op = func() (err2 error) {
closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID)
return err2
}

err = GeneralBackoff(ctx, op)
return closedOrders, err
}

func QueryClosedOrdersUntilSuccessfulLite(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64,
) (closedOrders []types.Order, err error) {
var op = func() (err2 error) {
closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID)
return err2
}

err = GeneralLiteBackoff(ctx, op)
return closedOrders, err
}

func QueryOrderTradesUntilSuccessful(
ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery,
) (trades []types.Trade, err error) {
var op = func() (err2 error) {
trades, err2 = ex.QueryOrderTrades(ctx, q)
return err2
}

err = GeneralBackoff(ctx, op)
return trades, err
}

func QueryOrderTradesUntilSuccessfulLite(
ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery,
) (trades []types.Trade, err error) {
var op = func() (err2 error) {
trades, err2 = ex.QueryOrderTrades(ctx, q)
return err2
}

err = GeneralLiteBackoff(ctx, op)
return trades, err
}

func QueryAccountUntilSuccessful(
ctx context.Context, ex types.ExchangeAccountService,
) (account *types.Account, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/active_order_recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func (s *Strategy) recoverPeriodically(ctx context.Context) {
s.logger.Info("[DCA] monitor and recover periodically")
s.logger.Info("monitor and recover periodically")
interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000)
ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/open_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type cancelOrdersByGroupIDApi interface {
}

func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error {
s.logger.Infof("[DCA] start placing open position orders")
s.logger.Infof("start placing open position orders")
price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
return err
Expand Down
55 changes: 36 additions & 19 deletions pkg/strategy/dca2/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.

// order executor
s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
s.logger.Infof("[DCA] POSITION UPDATE: %s", s.Position.String())
s.logger.Infof("POSITION UPDATE: %s", s.Position.String())
bbgo.Sync(ctx, s)

// update take profit price here
s.updateTakeProfitPrice()
})

s.OrderExecutor.ActiveMakerOrders().OnFilled(func(o types.Order) {
s.logger.Infof("[DCA] FILLED ORDER: %s", o.String())
s.logger.Infof("FILLED ORDER: %s", o.String())
openPositionSide := types.SideTypeBuy
takeProfitSide := types.SideTypeSell

Expand All @@ -221,7 +221,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
case takeProfitSide:
s.emitNextState(WaitToOpenPosition)
default:
s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o)
s.logger.Infof("unsupported side (%s) of order: %s", o.Side, o)
}

// update metrics when filled
Expand All @@ -244,7 +244,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
})

session.UserDataStream.OnAuth(func() {
s.logger.Info("[DCA] user data stream authenticated")
s.logger.Info("user data stream authenticated")
time.AfterFunc(3*time.Second, func() {
if isInitialize := s.initializeNextStateC(); !isInitialize {

Expand All @@ -255,16 +255,29 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.updateState(WaitToOpenPosition)
} else {
// recover
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("[DCA] something wrong when state recovering")
return
maxTry := 3
for try := 1; try <= maxTry; try++ {
s.logger.Infof("try #%d recover", try)

err := s.recover(ctx)
if err == nil {
s.logger.Infof("recover successfully at #%d", try)
break
}

s.logger.WithError(err).Warnf("failed to recover at #%d", try)

if try == 3 {
s.logger.Errorf("failed to recover after %d trying, please check it", maxTry)
return
}
}
}

s.logger.Infof("[DCA] state: %d", s.state)
s.logger.Infof("[DCA] position %s", s.Position.String())
s.logger.Infof("[DCA] profit stats %s", s.ProfitStats.String())
s.logger.Infof("[DCA] startTimeOfNextRound %s", s.startTimeOfNextRound)
s.logger.Infof("state: %d", s.state)
s.logger.Infof("position %s", s.Position.String())
s.logger.Infof("profit stats %s", s.ProfitStats.String())
s.logger.Infof("startTimeOfNextRound %s", s.startTimeOfNextRound)

s.updateTakeProfitPrice()

Expand Down Expand Up @@ -299,17 +312,17 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
func (s *Strategy) updateTakeProfitPrice() {
takeProfitRatio := s.TakeProfitRatio
s.takeProfitPrice = s.Market.TruncatePrice(s.Position.AverageCost.Mul(fixedpoint.One.Add(takeProfitRatio)))
s.logger.Infof("[DCA] cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice)
s.logger.Infof("cost: %s, ratio: %s, price: %s", s.Position.AverageCost.String(), takeProfitRatio.String(), s.takeProfitPrice.String())
}

func (s *Strategy) Close(ctx context.Context) error {
s.logger.Infof("[DCA] closing %s dca2", s.Symbol)
s.logger.Infof("closing %s dca2", s.Symbol)

defer s.EmitClosed()

err := s.OrderExecutor.GracefulCancel(ctx)
if err != nil {
s.logger.WithError(err).Errorf("[DCA] there are errors when cancelling orders at close")
s.logger.WithError(err).Errorf("there are errors when cancelling orders at close")
}

bbgo.Sync(ctx, s)
Expand Down Expand Up @@ -370,10 +383,12 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {

// TODO: pagination for it
// query the orders
orders, err := historyService.QueryClosedOrders(ctx, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
if err != nil {
return err
}
s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID)

var rounds []Round
var round Round
Expand All @@ -398,18 +413,20 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
}
}

s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID)
for _, round := range rounds {
debugRoundOrders(s.logger, "calculate", round)
var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)
for _, order := range roundOrders {
s.logger.Infof("[DCA] calculate profit stats from order: %s", order.String())
s.logger.Infof("calculate profit stats from order: %s", order.String())

// skip no trade orders
if order.ExecutedQuantity.Sign() == 0 {
continue
}

trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{
trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})
Expand All @@ -419,7 +436,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
}

for _, trade := range trades {
s.logger.Infof("[DCA] calculate profit stats from trade: %s", trade.String())
s.logger.Infof("calculate profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}
}
Expand All @@ -430,7 +447,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
// store into persistence
bbgo.Sync(ctx, s)

s.logger.Infof("[DCA] profit stats:\n%s", s.ProfitStats.String())
s.logger.Infof("profit stats:\n%s", s.ProfitStats.String())

// emit profit
s.EmitProfit(s.ProfitStats)
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/take_profit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error {
s.logger.Info("[DCA] start placing take profit orders")
s.logger.Info("start placing take profit orders")
order := generateTakeProfitOrder(s.Market, s.TakeProfitRatio, s.Position, s.OrderGroupID)
createdOrders, err := s.OrderExecutor.SubmitOrders(ctx, order)
if err != nil {
Expand Down
Loading