diff --git a/config/sentinel.yaml b/config/sentinel.yaml index 137689bb5..2abc398ef 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -9,55 +9,63 @@ exchangeStrategies: sentinel: symbol: BTCUSDT interval: &intervel 1m - scoreThreshold: &threshold 0.7 - klineLimit: &limit 43200 - window: &window 1440 + threshold: &threshold 0.7 + proportion: &proportion 0.05 + numSamples: &numSamples 1440 + window: &window 60 - on: *exchange sentinel: symbol: ETHUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: BCHUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: LTCUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: XRPUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: MAXUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: BCNTUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: ARBUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window \ No newline at end of file diff --git a/pkg/ensemble/iforest/forest.go b/pkg/ensemble/iforest/forest.go index d0add3174..faa9ae6ec 100644 --- a/pkg/ensemble/iforest/forest.go +++ b/pkg/ensemble/iforest/forest.go @@ -10,21 +10,12 @@ const ( defaultNumTrees = 100 defaultSampleSize = 256 defaultScoreThreshold = 0.6 - defaultDetectionType = DetectionTypeThreshold offset = 0.5 ) type DetectionType string -const ( - DetectionTypeThreshold DetectionType = "threshold" - DetectionTypeProportion DetectionType = "proportion" -) - type Options struct { - // The method used for anomaly detection - DetectionType DetectionType `json:"detectionType"` - // The anomaly score threshold Threshold float64 `json:"threshold"` @@ -43,10 +34,6 @@ type Options struct { // SetDefaultValues applies default settings to unspecified fields func (o *Options) SetDefaultValues() { - if o.DetectionType == "" { - o.DetectionType = defaultDetectionType - } - if o.Threshold == 0 { o.Threshold = defaultScoreThreshold } @@ -153,14 +140,9 @@ func (f *IsolationForest) Predict(samples [][]float64) []int { predictions := make([]int, len(samples)) scores := f.Score(samples) - var threshold float64 - switch f.DetectionType { - case DetectionTypeThreshold: - threshold = f.Threshold - case DetectionTypeProportion: + threshold := f.Threshold + if f.Proportion > 0 { threshold = Quantile(f.Score(samples), 1-f.Proportion) - default: - panic("Invalid detection type") } for i, score := range scores { diff --git a/pkg/strategy/sentinel/strategy.go b/pkg/strategy/sentinel/strategy.go index a8b4115e7..9fbda9858 100644 --- a/pkg/strategy/sentinel/strategy.go +++ b/pkg/strategy/sentinel/strategy.go @@ -2,6 +2,7 @@ package sentinel import ( "context" + "fmt" "time" "github.com/c9s/bbgo/pkg/bbgo" @@ -20,11 +21,12 @@ func init() { } type Strategy struct { - Symbol string `json:"symbol"` - Interval types.Interval `json:"interval"` - ScoreThreshold float64 `json:"scoreThreshold"` - KLineLimit int `json:"klineLimit"` - Window int `json:"window"` + Symbol string `json:"symbol"` + Interval types.Interval `json:"interval"` + Threshold float64 `json:"threshold"` + Proportion float64 `json:"proportion"` + NumSamples int `json:"numSamples"` + Window int `json:"window"` IsolationForest *iforest.IsolationForest `json:"isolationForest"` NotificationInterval time.Duration `json:"notificationInterval"` @@ -40,12 +42,16 @@ func (s *Strategy) ID() string { } func (s *Strategy) Defaults() error { - if s.ScoreThreshold == 0 { - s.ScoreThreshold = 0.6 + if s.Threshold == 0 { + s.Threshold = 0.6 } - if s.KLineLimit == 0 { - s.KLineLimit = 1440 + if s.Proportion == 0 { + s.Proportion = 0.05 + } + + if s.NumSamples == 0 { + s.NumSamples = 1440 } if s.Window == 0 { @@ -65,6 +71,20 @@ func (s *Strategy) Defaults() error { return nil } +func (s *Strategy) Validate() error { + if s.NumSamples < 0 { + return fmt.Errorf("num samples should be greater than 0") + } + + if s.Window < 0 { + return fmt.Errorf("window size should be greater than 0") + } + + if s.Window > s.NumSamples { + return fmt.Errorf("window size should be less than num samples") + } + return nil +} func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) } @@ -82,20 +102,35 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se if s.isNewKline(kline) { s.klines = append(s.klines, kline) - s.klines = s.klines[len(s.klines)-s.KLineLimit:] + + numKlines := len(s.klines) + klineLimit := s.NumSamples + s.Window - 1 + if numKlines > klineLimit { + s.klines = s.klines[numKlines-klineLimit:] + } + } - volumes := s.extractVolumes(s.klines) - samples := s.generateSamples(volumes) + volumes := s.getVolumeFromKlines(s.klines) - if s.shouldSkipIsolationForest(volumes, samples) { - s.logSkipIsolationForest(samples, volumes, kline) + volume := volumes.Last(0) + mean := volumes.Mean() + std := volumes.Std() + // if the volume is not significantly above the mean, we don't need to calculate the isolation forest + if volume < mean+2*std { + log.Infof("Volume is not significantly above mean, skipping isolation forest calculation, symbol: %s, volume: %f, mean: %f, std: %f", s.Symbol, volume, mean, std) return } - s.fitIsolationForest(samples) + samples := s.generateSamples(volumes) + s.trainIsolationForest(samples) + scores := s.IsolationForest.Score(samples) - s.notifyOnIsolationForestScore(scores, kline) + score := scores[len(scores)-1] + quantile := iforest.Quantile(scores, 1-s.Proportion) + log.Infof("symbol: %s, volume: %f, mean: %f, std: %f, iforest score: %f, quantile: %f", s.Symbol, volume, mean, std, score, quantile) + + s.notifyOnScoreThresholdExceeded(score, quantile) })) return nil } @@ -108,7 +143,7 @@ func (s *Strategy) isMarketAvailable(session *bbgo.ExchangeSession, symbol strin func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.ExchangeSession) error { batchQuery := batch.KLineBatchQuery{Exchange: session.Exchange} endTime := time.Now() - startTime := endTime.Add(-time.Duration(s.KLineLimit) * s.Interval.Duration()) + startTime := endTime.Add(-time.Duration(s.NumSamples+s.Window) * s.Interval.Duration()) klineC, errC := batchQuery.Query(ctx, s.Symbol, s.Interval, startTime, endTime) for { select { @@ -127,7 +162,7 @@ func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.Exch } } -func (s *Strategy) extractVolumes(klines []types.KLine) floats.Slice { +func (s *Strategy) getVolumeFromKlines(klines []types.KLine) floats.Slice { volumes := floats.Slice{} for _, kline := range klines { volumes.Push(kline.Volume.Float64()) @@ -142,40 +177,43 @@ func (s *Strategy) generateSamples(volumes floats.Slice) [][]float64 { break } subset := volumes[i : i+s.Window] - + last := subset.Last(0) mean := subset.Mean() std := subset.Std() - samples = append(samples, []float64{mean, std}) + + sample := []float64{(last - mean) / std, mean, std} + samples = append(samples, sample) } return samples } -func (s *Strategy) shouldSkipIsolationForest(volumes floats.Slice, samples [][]float64) bool { - volumeMean := volumes.Mean() - lastMovingMean := samples[len(samples)-1][0] - return lastMovingMean < volumeMean -} +func (s *Strategy) trainIsolationForest(samples [][]float64) { + if !s.retrainingRateLimiter.Allow() { + return + } -func (s *Strategy) logSkipIsolationForest(samples [][]float64, volumes floats.Slice, kline types.KLine) { - log.Infof("Skipping isolation forest calculation for symbol: %s, last moving mean: %f, average volume: %f, kline: %s", s.Symbol, samples[len(samples)-1][0], volumes.Mean(), kline.String()) + s.IsolationForest = iforest.New() + s.IsolationForest.Fit(samples) + log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees) } -func (s *Strategy) fitIsolationForest(samples [][]float64) { - if s.retrainingRateLimiter.Allow() { - s.IsolationForest = iforest.New() - s.IsolationForest.Fit(samples) - log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees) +func (s *Strategy) notifyOnScoreThresholdExceeded(score float64, quantile float64) { + // if the score is below the threshold, we don't need to notify + if score < s.Threshold { + return } -} -func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) { - lastScore := scores[len(scores)-1] - log.Infof("Symbol: %s, iforest score: %f, threshold: %f, volume: %f", s.Symbol, lastScore, s.ScoreThreshold, kline.Volume.Float64()) - if lastScore > s.ScoreThreshold { - if s.notificationRateLimiter.Allow() { - bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore) - } + // if the score is below the quantile, we don't need to notify + if score < quantile { + return } + + // if the notification rate limiter is not allowed, we don't need to notify + if !s.notificationRateLimiter.Allow() { + return + } + + bbgo.Notify("symbol: %s, iforest score: %f, threshold: %f, quantile: %f", s.Symbol, score, s.Threshold, quantile) } func (s *Strategy) isNewKline(kline types.KLine) bool {