Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: [sentinel] Modify detection logic #1884

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions config/sentinel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,68 @@ sessions:
envVarPrefix: max
publicOnly: true

persistence:
json:
directory: var/data
redis:
host: 127.0.0.1
port: 6379
db: 0

exchangeStrategies:
- on: *exchange
sentinel:
symbol: BTCUSDT
interval: 1m
scoreThreshold: 0.6
klineLimit: 43200
- on: *exchange
sentinel:
symbol: BTCUSDT
interval: &intervel 1m
threshold: &threshold 0.7
proportion: &proportion 0.05
numSamples: &numSamples 1440
window: &window 60
- on: *exchange
sentinel:
symbol: ETHUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
- on: *exchange
sentinel:
symbol: BCHUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
- on: *exchange
sentinel:
symbol: LTCUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
- on: *exchange
sentinel:
symbol: XRPUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
- on: *exchange
sentinel:
symbol: MAXUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
- on: *exchange
sentinel:
symbol: BCNTUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
- on: *exchange
sentinel:
symbol: ARBUSDT
interval: *intervel
threshold: *threshold
proportion: *proportion
numSamples: *numSamples
window: *window
22 changes: 2 additions & 20 deletions pkg/ensemble/iforest/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,12 @@ const (
defaultNumTrees = 100
defaultSampleSize = 256
defaultScoreThreshold = 0.6
defaultDetectionType = DetectionTypeThreshold
offset = 0.5
)

type DetectionType string

const (
DetectionTypeThreshold DetectionType = "threshold"
DetectionTypeProportion DetectionType = "proportion"
)

type Options struct {
// The method used for anomaly detection
DetectionType DetectionType `json:"detectionType"`

// The anomaly score threshold
Threshold float64 `json:"threshold"`

Expand All @@ -43,10 +34,6 @@ type Options struct {

// SetDefaultValues applies default settings to unspecified fields
func (o *Options) SetDefaultValues() {
if o.DetectionType == "" {
o.DetectionType = defaultDetectionType
}

if o.Threshold == 0 {
o.Threshold = defaultScoreThreshold
}
Expand Down Expand Up @@ -153,14 +140,9 @@ func (f *IsolationForest) Predict(samples [][]float64) []int {
predictions := make([]int, len(samples))
scores := f.Score(samples)

var threshold float64
switch f.DetectionType {
case DetectionTypeThreshold:
threshold = f.Threshold
case DetectionTypeProportion:
threshold := f.Threshold
if f.Proportion > 0 {
threshold = Quantile(f.Score(samples), 1-f.Proportion)
default:
panic("Invalid detection type")
}

for i, score := range scores {
Expand Down
146 changes: 83 additions & 63 deletions pkg/strategy/sentinel/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ func init() {
}

type Strategy struct {
Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
ScoreThreshold float64 `json:"scoreThreshold"`
KLineLimit int `json:"klineLimit"`
Window int `json:"window"`
Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
Threshold float64 `json:"threshold"`
Proportion float64 `json:"proportion"`
NumSamples int `json:"numSamples"`
Window int `json:"window"`

IsolationForest *iforest.IsolationForest `json:"isolationForest"`
NotificationInterval time.Duration `json:"notificationInterval"`
Expand All @@ -41,12 +42,16 @@ func (s *Strategy) ID() string {
}

func (s *Strategy) Defaults() error {
if s.ScoreThreshold == 0 {
s.ScoreThreshold = 0.6
if s.Threshold == 0 {
s.Threshold = 0.6
}

if s.KLineLimit == 0 {
s.KLineLimit = 1440
if s.Proportion == 0 {
s.Proportion = 0.05
}

if s.NumSamples == 0 {
s.NumSamples = 1440
}

if s.Window == 0 {
Expand All @@ -66,6 +71,20 @@ func (s *Strategy) Defaults() error {
return nil
}

func (s *Strategy) Validate() error {
if s.NumSamples < 0 {
return fmt.Errorf("num samples should be greater than 0")
}

if s.Window < 0 {
return fmt.Errorf("window size should be greater than 0")
}

if s.Window > s.NumSamples {
return fmt.Errorf("window size should be less than num samples")
}
return nil
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}
Expand All @@ -76,31 +95,42 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return err
}

session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
if !s.isMarketAvailable(session, s.Symbol) {
return
}

err := s.appendKline(kline)
if err != nil {
log.Errorf("unable to append kline %s", kline.String())
return
}
s.klines = s.klines[len(s.klines)-s.KLineLimit:]
if s.isNewKline(kline) {
s.klines = append(s.klines, kline)

log.Infof("kline length %d", len(s.klines))
numKlines := len(s.klines)
klineLimit := s.NumSamples + s.Window - 1
if numKlines > klineLimit {
s.klines = s.klines[numKlines-klineLimit:]
}

volumes := s.extractVolumes(s.klines)
samples := s.generateSamples(volumes)
}

volumes := s.getVolumeFromKlines(s.klines)

if s.shouldSkipIsolationForest(volumes, samples) {
s.logSkipIsolationForest(samples, volumes, kline)
volume := volumes.Last(0)
mean := volumes.Mean()
std := volumes.Std()
// if the volume is not significantly above the mean, we don't need to calculate the isolation forest
if volume < mean+2*std {
log.Infof("Volume is not significantly above mean, skipping isolation forest calculation, symbol: %s, volume: %f, mean: %f, std: %f", s.Symbol, volume, mean, std)
return
}

s.fitIsolationForest(samples)
samples := s.generateSamples(volumes)
s.trainIsolationForest(samples)

scores := s.IsolationForest.Score(samples)
s.notifyOnIsolationForestScore(scores, kline)
score := scores[len(scores)-1]
quantile := iforest.Quantile(scores, 1-s.Proportion)
log.Infof("symbol: %s, volume: %f, mean: %f, std: %f, iforest score: %f, quantile: %f", s.Symbol, volume, mean, std, score, quantile)

s.notifyOnScoreThresholdExceeded(score, quantile)
}))
return nil
}
Expand All @@ -113,7 +143,7 @@ func (s *Strategy) isMarketAvailable(session *bbgo.ExchangeSession, symbol strin
func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.ExchangeSession) error {
batchQuery := batch.KLineBatchQuery{Exchange: session.Exchange}
endTime := time.Now()
startTime := endTime.Add(-time.Duration(s.KLineLimit) * s.Interval.Duration())
startTime := endTime.Add(-time.Duration(s.NumSamples+s.Window) * s.Interval.Duration())
klineC, errC := batchQuery.Query(ctx, s.Symbol, s.Interval, startTime, endTime)
for {
select {
Expand All @@ -132,7 +162,7 @@ func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.Exch
}
}

func (s *Strategy) extractVolumes(klines []types.KLine) floats.Slice {
func (s *Strategy) getVolumeFromKlines(klines []types.KLine) floats.Slice {
volumes := floats.Slice{}
for _, kline := range klines {
volumes.Push(kline.Volume.Float64())
Expand All @@ -147,60 +177,50 @@ func (s *Strategy) generateSamples(volumes floats.Slice) [][]float64 {
break
}
subset := volumes[i : i+s.Window]

last := subset.Last(0)
mean := subset.Mean()
std := subset.Std()
samples = append(samples, []float64{mean, std})

sample := []float64{(last - mean) / std, mean, std}
samples = append(samples, sample)
}
return samples
}

func (s *Strategy) shouldSkipIsolationForest(volumes floats.Slice, samples [][]float64) bool {
volumeMean := volumes.Mean()
lastMovingMean := samples[len(samples)-1][0]
return lastMovingMean < volumeMean
}

func (s *Strategy) logSkipIsolationForest(samples [][]float64, volumes floats.Slice, kline types.KLine) {
log.Infof("Skipping isolation forest calculation for symbol: %s, last moving mean: %f, average volume: %f, kline: %s", s.Symbol, samples[len(samples)-1][0], volumes.Mean(), kline.String())
}

func (s *Strategy) fitIsolationForest(samples [][]float64) {
if s.retrainingRateLimiter.Allow() {
s.IsolationForest = iforest.New()
s.IsolationForest.Fit(samples)
log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees)
func (s *Strategy) trainIsolationForest(samples [][]float64) {
if !s.retrainingRateLimiter.Allow() {
return
}
}

func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) {
lastScore := scores[len(scores)-1]
log.Warnf("Symbol: %s, isolation forest score: %f, threshold: %f, kline: %s", s.Symbol, lastScore, s.ScoreThreshold, kline.String())
if lastScore > s.ScoreThreshold {
if s.notificationRateLimiter.Allow() {
bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore)
}
}
s.IsolationForest = iforest.New()
s.IsolationForest.Fit(samples)
log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees)
}

func (s *Strategy) appendKline(kline types.KLine) error {
if len(s.klines) == 0 {
return fmt.Errorf("klines is empty")
func (s *Strategy) notifyOnScoreThresholdExceeded(score float64, quantile float64) {
// if the score is below the threshold, we don't need to notify
if score < s.Threshold {
return
}

lastKline := s.klines[len(s.klines)-1]
if lastKline.Exchange != kline.Exchange {
return fmt.Errorf("last kline exchange %s is not equal to current kline exchange %s", lastKline.Exchange, kline.Exchange)
// if the score is below the quantile, we don't need to notify
if score < quantile {
return
}

if lastKline.Symbol != kline.Symbol {
return fmt.Errorf("last kline symbol %s is not equal to current kline symbol %s", lastKline.Symbol, kline.Symbol)
// if the notification rate limiter is not allowed, we don't need to notify
if !s.notificationRateLimiter.Allow() {
return
}

if lastKline.EndTime.After(kline.EndTime.Time()) {
return fmt.Errorf("last kline end time %s is after current kline end time %s", lastKline.EndTime, kline.EndTime)
bbgo.Notify("symbol: %s, iforest score: %f, threshold: %f, quantile: %f", s.Symbol, score, s.Threshold, quantile)
}

func (s *Strategy) isNewKline(kline types.KLine) bool {
if len(s.klines) == 0 {
return true
}

s.klines = append(s.klines, kline)
return nil
lastKline := s.klines[len(s.klines)-1]
return lastKline.EndTime.Before(kline.EndTime.Time())
}
Loading