Skip to content

Commit

Permalink
Merge pull request #1708 from c9s/c9s/xmaker/integrate-circuitbreaker
Browse files Browse the repository at this point in the history
FEATURE: [xmaker] integrate circuit breaker
  • Loading branch information
c9s authored Aug 26, 2024
2 parents 92ad80f + 6ef8aa6 commit e7fd90e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
13 changes: 10 additions & 3 deletions config/xmaker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ crossExchangeStrategies:
# 0.1 pip is 0.01, here we use 10, so we will get 18000.00, 18001.00 and
# 18002.00
pips: 10
persistence:
type: redis

circuitBreaker:
maximumConsecutiveTotalLoss: 36.0
maximumConsecutiveLossTimes: 10
maximumLossPerRound: 15.0
maximumTotalLoss: 80.0
ignoreConsecutiveDustLoss: true
consecutiveDustLossThreshold: 0.003
haltDuration: "30m"
maximumHaltTimes: 2
maximumHaltTimesExceededPanic: true
14 changes: 8 additions & 6 deletions pkg/risk/circuitbreaker/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,17 @@ type BasicCircuitBreaker struct {
}

func NewBasicCircuitBreaker(strategyID, strategyInstance string) *BasicCircuitBreaker {
return &BasicCircuitBreaker{
b := &BasicCircuitBreaker{
MaximumConsecutiveLossTimes: 8,
MaximumHaltTimes: 3,
MaximumHaltTimesExceededPanic: false,
HaltDuration: types.Duration(30 * time.Minute),
HaltDuration: types.Duration(1 * time.Hour),
strategyID: strategyID,
strategyInstance: strategyInstance,
metricsLabels: prometheus.Labels{"strategy": strategyID, "strategyInstance": strategyInstance},
}
b.updateMetrics()
return b
}

func (b *BasicCircuitBreaker) getMetricsLabels() prometheus.Labels {
Expand Down Expand Up @@ -221,21 +223,21 @@ func (b *BasicCircuitBreaker) reset() {
b.updateMetrics()
}

func (b *BasicCircuitBreaker) IsHalted(now time.Time) bool {
func (b *BasicCircuitBreaker) IsHalted(now time.Time) (string, bool) {
b.mu.Lock()
defer b.mu.Unlock()

if !b.halted {
return false
return "", false
}

// check if it's an expired halt
if now.After(b.haltTo) {
b.reset()
return false
return "", false
}

return true
return b.haltReason, true
}

func (b *BasicCircuitBreaker) halt(now time.Time, reason string) {
Expand Down
33 changes: 33 additions & 0 deletions pkg/strategy/xmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ import (
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/risk/circuitbreaker"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)

var defaultMargin = fixedpoint.NewFromFloat(0.003)
var Two = fixedpoint.NewFromInt(2)

// circuitBreakerAlertLimiter is for CircuitBreaker alerts
var circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)

const priceUpdateTimeout = 30 * time.Second

const ID = "xmaker"
Expand Down Expand Up @@ -98,6 +102,8 @@ type Strategy struct {

state *State

CircuitBreaker *circuitbreaker.BasicCircuitBreaker `json:"circuitBreaker"`

// persistence fields
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
Expand Down Expand Up @@ -187,6 +193,19 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
return
}

if s.CircuitBreaker != nil {
now := time.Now()
if reason, halted := s.CircuitBreaker.IsHalted(now); halted {
log.Warnf("[arbWorker] strategy is halted, reason: %s", reason)

if circuitBreakerAlertLimiter.AllowN(now, 1) {
bbgo.Notify("Strategy is halted, reason: %s", reason)
}

return
}
}

bestBid, bestAsk, hasPrice := s.book.BestBidAndAsk()
if !hasPrice {
return
Expand Down Expand Up @@ -570,6 +589,8 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {

log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)

// TODO: improve order executor
orderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.sourceSession}
returnOrders, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.sourceMarket,
Expand Down Expand Up @@ -630,6 +651,14 @@ func (s *Strategy) tradeRecover(ctx context.Context) {
}
}

func (s *Strategy) Defaults() error {
if s.CircuitBreaker == nil {
s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID())
}

return nil
}

func (s *Strategy) Validate() error {
if s.Quantity.IsZero() || s.QuantityScale == nil {
return errors.New("quantity or quantityScale can not be empty")
Expand Down Expand Up @@ -813,6 +842,10 @@ func (s *Strategy) CrossRun(
s.ProfitStats.AddProfit(p)

s.Environment.RecordPosition(s.Position, trade, &p)

if s.CircuitBreaker != nil {
s.CircuitBreaker.RecordProfit(profit, trade.Time.Time())
}
}
})

Expand Down

0 comments on commit e7fd90e

Please sign in to comment.