Skip to content

Commit

Permalink
all: refactor for slack alert config
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Dec 2, 2024
1 parent 6292118 commit 17a80f2
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 73 deletions.
18 changes: 11 additions & 7 deletions config/xalign.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ sessions:
binance:
exchange: binance
envVarPrefix: binance
margin: true

persistence:
json:
Expand All @@ -28,10 +29,13 @@ persistence:

crossExchangeStrategies:
- xalign:
interval: 1m
# interval: 1m
interval: 3s
for: 1m
sessions:
- max
- binance
dryRun: true

## quoteCurrencies config specifies which quote currency should be used for BUY order or SELL order.
## when specifying [USDC,TWD] for "BUY", then it will consider BTCUSDC first then BTCTWD second.
Expand All @@ -41,15 +45,15 @@ crossExchangeStrategies:
expectedBalances:
BTC: 0.0440
useTakerOrder: false
dryRun: true
balanceToleranceRange: 10%
balanceToleranceRange: 1%
maxAmounts:
USDT: 100
USDC: 100
TWD: 3000
largeAmountAlert:
quote: USDT
quoteCurrency: USDT
amount: 200
slackMentions:
- '<@USER_ID>'
- '<!subteam^TEAM_ID>'
slack:
mentions:
- '<@USER_ID>'
- '<!subteam^TEAM_ID>'
7 changes: 4 additions & 3 deletions pkg/strategy/autoborrow/alert_margin_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
)

type MarginLevelAlertConfig struct {
slackalert.SlackAlert
Interval types.Duration `json:"interval"`
MinMargin fixedpoint.Value `json:"minMargin"`
Slack *slackalert.SlackAlert `json:"slack"`
Interval types.Duration `json:"interval"`
MinMargin fixedpoint.Value `json:"minMargin"`
}

// MarginLevelAlert is used to send the slack mention alerts when the current margin is less than the required margin level
type MarginLevelAlert struct {
SlackAlert *slackalert.SlackAlert
CurrentMarginLevel fixedpoint.Value
MinimalMarginLevel fixedpoint.Value
SlackMentions []string
Expand Down
3 changes: 2 additions & 1 deletion pkg/strategy/autoborrow/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,10 @@ func (s *Strategy) marginAlertWorker(ctx context.Context, alertInterval time.Dur
account := s.ExchangeSession.GetAccount()
if s.MarginLevelAlert != nil && account.MarginLevel.Compare(s.MarginLevelAlert.MinMargin) <= 0 {
bbgo.Notify(&MarginLevelAlert{
SlackAlert: s.MarginLevelAlert.Slack,
CurrentMarginLevel: account.MarginLevel,
MinimalMarginLevel: s.MarginLevelAlert.MinMargin,
SlackMentions: s.MarginLevelAlert.Mentions,
SlackMentions: s.MarginLevelAlert.Slack.Mentions,
SessionName: s.ExchangeSession.Name,
})
bbgo.Notify(account.Balances().Debts())
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/deposit2transfer/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Strategy struct {

AutoRepay bool `json:"autoRepay"`

SlackAlert *envvar.SlackAlert `json:"slackAlert"`
SlackAlert *slackalert.SlackAlert `json:"slackAlert"`

marginTransferService marginTransferService
marginBorrowRepayService types.MarginBorrowRepayService
Expand Down
37 changes: 31 additions & 6 deletions pkg/strategy/xalign/detector/deviation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"math"
"sync"
"time"

"github.com/sirupsen/logrus"
)

type Record[T any] struct {
Expand All @@ -18,6 +20,8 @@ type DeviationDetector[T any] struct {
duration time.Duration // Time limit for sustained deviation
toFloat64 func(T) float64 // Function to convert T to float64
records []Record[T] // Tracks deviation records

logger logrus.FieldLogger
}

// NewDeviationDetector creates a new instance of DeviationDetector
Expand All @@ -34,43 +38,64 @@ func NewDeviationDetector[T any](
}
}

logger := logrus.New()
// logger.SetLevel(logrus.ErrorLevel)
logger.SetLevel(logrus.DebugLevel)
return &DeviationDetector[T]{
expectedValue: expectedValue,
tolerance: tolerance,
duration: duration,
toFloat64: toFloat64,
records: nil,
logger: logger,
}
}

func (d *DeviationDetector[T]) AddRecord(value T, at time.Time) (bool, time.Duration) {
d.mu.Lock()
defer d.mu.Unlock()
func (d *DeviationDetector[T]) SetLogger(logger logrus.FieldLogger) {
d.logger = logger
}

func (d *DeviationDetector[T]) AddRecord(at time.Time, value T) (bool, time.Duration) {
// Calculate deviation percentage
expected := d.toFloat64(d.expectedValue)
current := d.toFloat64(value)
deviationPercentage := math.Abs((current - expected) / expected)

d.logger.Infof("deviation detection: expected=%f, current=%f, deviation=%f", expected, current, deviationPercentage)

d.mu.Lock()
defer d.mu.Unlock()

// Reset records if deviation is within tolerance
if deviationPercentage <= d.tolerance {
d.records = nil
return false, 0
}

record := Record[T]{Value: value, Time: at}

// If deviation exceeds tolerance, track the record
if len(d.records) == 0 {
// No prior deviation, start tracking
d.records = []Record[T]{{Value: value, Time: at}}
d.records = []Record[T]{record}
return false, 0
}

// Append new record
d.records = append(d.records, Record[T]{Value: value, Time: at})
d.records = append(d.records, record)

// Calculate the sustained duration
return d.ShouldFix()
}

func (d *DeviationDetector[T]) ShouldFix() (bool, time.Duration) {
if len(d.records) == 0 {
return false, 0
}

last := d.records[len(d.records)-1]
firstRecord := d.records[0]
sustainedDuration := at.Sub(firstRecord.Time)
sustainedDuration := last.Time.Sub(firstRecord.Time)
return sustainedDuration >= d.duration, sustainedDuration
}

Expand Down
30 changes: 6 additions & 24 deletions pkg/strategy/xalign/detector/deviation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ func TestBalanceDeviationDetector(t *testing.T) {
now := time.Now()

// Add a balance record within tolerance
reset, sustainedDuration := detector.AddRecord(
types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(10.05)},
now,
)
reset, sustainedDuration := detector.AddRecord(now, types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(10.05)})
if reset {
t.Errorf("Expected no sustained deviation for value within tolerance")
}
Expand All @@ -34,10 +31,7 @@ func TestBalanceDeviationDetector(t *testing.T) {
}

// Add a balance record outside tolerance
reset, sustainedDuration = detector.AddRecord(
types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.0)},
now.Add(2*time.Minute),
)
reset, sustainedDuration = detector.AddRecord(now.Add(2*time.Minute), types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.0)})
if reset {
t.Errorf("Expected no sustained deviation initially")
}
Expand All @@ -46,10 +40,7 @@ func TestBalanceDeviationDetector(t *testing.T) {
}

// Add another record exceeding duration
reset, sustainedDuration = detector.AddRecord(
types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.5)},
now.Add(6*time.Minute),
)
reset, sustainedDuration = detector.AddRecord(now.Add(6*time.Minute), types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.5)})
if !reset {
t.Errorf("Expected reset to be true")
}
Expand All @@ -72,10 +63,7 @@ func TestBalanceRecordTracking(t *testing.T) {
now := time.Now()

// Add a balance record outside tolerance
_, _ = detector.AddRecord(
types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.0)},
now,
)
_, _ = detector.AddRecord(now, types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.0)})

// Check if record is being tracked
records := detector.GetRecords()
Expand All @@ -84,20 +72,14 @@ func TestBalanceRecordTracking(t *testing.T) {
}

// Add another record
_, _ = detector.AddRecord(
types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.5)},
now.Add(2*time.Minute),
)
_, _ = detector.AddRecord(now.Add(2*time.Minute), types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(11.5)})
records = detector.GetRecords()
if len(records) != 2 {
t.Errorf("Expected 2 records, got %d", len(records))
}

// Add a balance record within tolerance to reset
_, _ = detector.AddRecord(
types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(10.05)},
now.Add(4*time.Minute),
)
_, _ = detector.AddRecord(now.Add(4*time.Minute), types.Balance{Currency: "BTC", NetAsset: fixedpoint.NewFromFloat(10.05)})
records = detector.GetRecords()
if len(records) != 0 {
t.Errorf("Expected records to be cleared, got %d", len(records))
Expand Down
61 changes: 30 additions & 31 deletions pkg/strategy/xalign/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/pricesolver"
"github.com/c9s/bbgo/pkg/slack/slackalert"
"github.com/c9s/bbgo/pkg/strategy/xalign/detector"
"github.com/c9s/bbgo/pkg/types"
)
Expand All @@ -39,16 +40,18 @@ type QuoteCurrencyPreference struct {
Sell []string `json:"sell"`
}

type AmountAlertConfig struct {
type LargeAmountAlertConfig struct {
Slack *slackalert.SlackAlert `json:"slack"`

QuoteCurrency string `json:"quoteCurrency"`
Amount fixedpoint.Value `json:"amount"`
SlackMentions []string `json:"slackMentions"`
}

type LargeAmountAlert struct {
SlackAlert *slackalert.SlackAlert

QuoteCurrency string
AlertAmount fixedpoint.Value
SlackMentions []string

BaseCurrency string
Side types.SideType
Expand All @@ -62,7 +65,7 @@ func (m *LargeAmountAlert) SlackAttachment() slack.Attachment {
Color: "red",
Title: fmt.Sprintf("xalign amount alert - try to align %s with quote %s amount %f > %f",
m.BaseCurrency, m.QuoteCurrency, m.Amount.Float64(), m.AlertAmount.Float64()),
Text: strings.Join(m.SlackMentions, " "),
Text: strings.Join(m.SlackAlert.Mentions, " "),
Fields: []slack.AttachmentField{
{
Title: "Base Currency",
Expand Down Expand Up @@ -99,14 +102,12 @@ type Strategy struct {
BalanceToleranceRange fixedpoint.Value `json:"balanceToleranceRange"`
Duration types.Duration `json:"for"`
MaxAmounts map[string]fixedpoint.Value `json:"maxAmounts"`
LargeAmountAlert *AmountAlertConfig `json:"largeAmountAlert"`
LargeAmountAlert *LargeAmountAlertConfig `json:"largeAmountAlert"`

SlackNotify bool `json:"slackNotify"`
SlackNotifyMentions []string `json:"slackNotifyMentions"`
SlackNotifyThresholdAmount fixedpoint.Value `json:"slackNotifyThresholdAmount,omitempty"`

faultBalanceRecords map[string][]TimeBalance

deviationDetectors map[string]*detector.DeviationDetector[types.Balance]

priceResolver *pricesolver.SimplePriceSolver
Expand Down Expand Up @@ -446,10 +447,8 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
instanceID := s.InstanceID()
_ = instanceID

s.faultBalanceRecords = make(map[string][]TimeBalance)
s.sessions = make(map[string]*bbgo.ExchangeSession)
s.orderBooks = make(map[string]*bbgo.ActiveOrderBook)

s.orderStore = core.NewOrderStore("")

for currency, expectedValue := range s.ExpectedBalances {
Expand Down Expand Up @@ -525,24 +524,22 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
}

func (s *Strategy) resetFaultBalanceRecords(currency string) {
s.faultBalanceRecords[currency] = nil
if d, ok := s.deviationDetectors[currency]; ok {
d.ClearRecords()
}
}

func (s *Strategy) recordBalance(totalBalances types.BalanceMap) {
now := time.Now()
for currency, expectedBalance := range s.ExpectedBalances {
q := s.calculateRefillQuantity(totalBalances, currency, expectedBalance)
rf := q.Div(expectedBalance).Abs().Float64()
tr := s.BalanceToleranceRange.Float64()
if rf > tr {
balance := totalBalances[currency]
s.faultBalanceRecords[currency] = append(s.faultBalanceRecords[currency], TimeBalance{
Time: now,
Balance: balance,
})
} else {
// reset counter
s.resetFaultBalanceRecords(currency)

for currency := range s.ExpectedBalances {
balance, hasBal := totalBalances[currency]
if d, ok := s.deviationDetectors[currency]; ok {
if hasBal {
d.AddRecord(now, balance)
} else {
d.AddRecord(now, types.NewZeroBalance(currency))
}
}
}
}
Expand Down Expand Up @@ -603,17 +600,19 @@ func (s *Strategy) align(ctx context.Context, sessions map[string]*bbgo.Exchange

s.recordBalance(totalBalances)

log.Debugf("checking all fault balance records...")
for currency, expectedBalance := range s.ExpectedBalances {
q := s.calculateRefillQuantity(totalBalances, currency, expectedBalance)

if s.Duration > 0 {
log.Infof("checking %s fault balance records...", currency)
if faultBalance, ok := s.faultBalanceRecords[currency]; ok && len(faultBalance) > 0 {
if time.Since(faultBalance[0].Time) < s.Duration.Duration() {
log.Infof("%s fault record since: %s < persistence period %s", currency, faultBalance[0].Time, s.Duration.Duration())
continue
}
log.Debugf("checking %s fault balance records...", currency)

if d, ok := s.deviationDetectors[currency]; ok {
should, sustainedDuration := d.ShouldFix()
if !should {
continue
}

log.Infof("%s sustained deviation for %s...", currency, sustainedDuration)
}

if s.LargeAmountAlert != nil {
Expand All @@ -623,9 +622,9 @@ func (s *Strategy) align(ctx context.Context, sessions map[string]*bbgo.Exchange
log.Infof("resolved price for currency: %s, price: %f, quantity: %f, amount: %f", currency, price.Float64(), quantity.Float64(), amount.Float64())
if amount.Compare(s.LargeAmountAlert.Amount) > 0 {
alert := &LargeAmountAlert{
SlackAlert: s.LargeAmountAlert.Slack,
QuoteCurrency: s.LargeAmountAlert.QuoteCurrency,
AlertAmount: s.LargeAmountAlert.Amount,
SlackMentions: s.LargeAmountAlert.SlackMentions,
BaseCurrency: currency,
Price: price,
Quantity: quantity,
Expand Down
Loading

0 comments on commit 17a80f2

Please sign in to comment.