diff --git a/config/sentinel.yaml b/config/sentinel.yaml index 2ead44dd1..b7c0dc1b3 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -4,18 +4,40 @@ sessions: envVarPrefix: max publicOnly: true -persistence: - json: - directory: var/data - redis: - host: 127.0.0.1 - port: 6379 - db: 0 - exchangeStrategies: -- on: *exchange - sentinel: - symbol: BTCUSDT - interval: 1m - scoreThreshold: 0.6 - klineLimit: 43200 + - on: *exchange + sentinel: &sentinel + symbol: BTCUSDT + interval: 1m + threshold: 0.6 + proportion: 0.05 + numSamples: 1440 + window: 60 + - on: *exchange + sentinel: + <<: *sentinel + symbol: ETHUSDT + - on: *exchange + sentinel: + <<: *sentinel + symbol: BCHUSDT + - on: *exchange + sentinel: + <<: *sentinel + symbol: LTCUSDT + - on: *exchange + sentinel: + <<: *sentinel + symbol: XRPUSDT + - on: *exchange + sentinel: + <<: *sentinel + symbol: MAXUSDT + - on: *exchange + sentinel: + <<: *sentinel + symbol: BCNTUSDT + - on: *exchange + sentinel: + <<: *sentinel + symbol: ARBUSDT \ No newline at end of file diff --git a/pkg/ensemble/iforest/forest.go b/pkg/ensemble/iforest/forest.go index d0add3174..faa9ae6ec 100644 --- a/pkg/ensemble/iforest/forest.go +++ b/pkg/ensemble/iforest/forest.go @@ -10,21 +10,12 @@ const ( defaultNumTrees = 100 defaultSampleSize = 256 defaultScoreThreshold = 0.6 - defaultDetectionType = DetectionTypeThreshold offset = 0.5 ) type DetectionType string -const ( - DetectionTypeThreshold DetectionType = "threshold" - DetectionTypeProportion DetectionType = "proportion" -) - type Options struct { - // The method used for anomaly detection - DetectionType DetectionType `json:"detectionType"` - // The anomaly score threshold Threshold float64 `json:"threshold"` @@ -43,10 +34,6 @@ type Options struct { // SetDefaultValues applies default settings to unspecified fields func (o *Options) SetDefaultValues() { - if o.DetectionType == "" { - o.DetectionType = defaultDetectionType - } - if o.Threshold == 0 { o.Threshold = defaultScoreThreshold } @@ -153,14 +140,9 @@ func (f *IsolationForest) Predict(samples [][]float64) []int { predictions := make([]int, len(samples)) scores := f.Score(samples) - var threshold float64 - switch f.DetectionType { - case DetectionTypeThreshold: - threshold = f.Threshold - case DetectionTypeProportion: + threshold := f.Threshold + if f.Proportion > 0 { threshold = Quantile(f.Score(samples), 1-f.Proportion) - default: - panic("Invalid detection type") } for i, score := range scores { diff --git a/pkg/strategy/sentinel/strategy.go b/pkg/strategy/sentinel/strategy.go index f71b9ccc2..07a3d1b0c 100644 --- a/pkg/strategy/sentinel/strategy.go +++ b/pkg/strategy/sentinel/strategy.go @@ -21,11 +21,12 @@ func init() { } type Strategy struct { - Symbol string `json:"symbol"` - Interval types.Interval `json:"interval"` - ScoreThreshold float64 `json:"scoreThreshold"` - KLineLimit int `json:"klineLimit"` - Window int `json:"window"` + Symbol string `json:"symbol"` + Interval types.Interval `json:"interval"` + Threshold float64 `json:"threshold"` + Proportion float64 `json:"proportion"` + NumSamples int `json:"numSamples"` + Window int `json:"window"` IsolationForest *iforest.IsolationForest `json:"isolationForest"` NotificationInterval time.Duration `json:"notificationInterval"` @@ -41,12 +42,16 @@ func (s *Strategy) ID() string { } func (s *Strategy) Defaults() error { - if s.ScoreThreshold == 0 { - s.ScoreThreshold = 0.6 + if s.Threshold == 0 { + s.Threshold = 0.6 } - if s.KLineLimit == 0 { - s.KLineLimit = 1440 + if s.Proportion == 0 { + s.Proportion = 0.05 + } + + if s.NumSamples == 0 { + s.NumSamples = 1440 } if s.Window == 0 { @@ -66,6 +71,28 @@ func (s *Strategy) Defaults() error { return nil } +func (s *Strategy) Validate() error { + if s.Symbol == "" { + return fmt.Errorf("symbol is required") + } + + if s.Interval == "" { + return fmt.Errorf("interval is required") + } + + if s.NumSamples < 0 { + return fmt.Errorf("num samples should be greater than 0") + } + + if s.Window < 0 { + return fmt.Errorf("window size should be greater than 0") + } + + if s.Window > s.NumSamples { + return fmt.Errorf("window size should be less than num samples") + } + return nil +} func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) } @@ -76,31 +103,42 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return err } - session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) { + session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) { if !s.isMarketAvailable(session, s.Symbol) { return } - err := s.appendKline(kline) - if err != nil { - log.Errorf("unable to append kline %s", kline.String()) - return - } - s.klines = s.klines[len(s.klines)-s.KLineLimit:] + if s.isNewKline(kline) { + s.klines = append(s.klines, kline) - log.Infof("kline length %d", len(s.klines)) + numKlines := len(s.klines) + klineLimit := s.NumSamples + s.Window - 1 + if numKlines > klineLimit { + s.klines = s.klines[numKlines-klineLimit:] + } - volumes := s.extractVolumes(s.klines) - samples := s.generateSamples(volumes) + } + + volumes := s.getVolumeFromKlines(s.klines) - if s.shouldSkipIsolationForest(volumes, samples) { - s.logSkipIsolationForest(samples, volumes, kline) + volume := volumes.Last(0) + mean := volumes.Mean() + std := volumes.Std() + // if the volume is not significantly above the mean, we don't need to calculate the isolation forest + if volume < mean+2*std { + log.Infof("Volume is not significantly above mean, skipping isolation forest calculation, symbol: %s, volume: %f, mean: %f, std: %f", s.Symbol, volume, mean, std) return } - s.fitIsolationForest(samples) + samples := s.generateSamples(volumes) + s.trainIsolationForest(samples) + scores := s.IsolationForest.Score(samples) - s.notifyOnIsolationForestScore(scores, kline) + score := scores[len(scores)-1] + quantile := iforest.Quantile(scores, 1-s.Proportion) + log.Infof("symbol: %s, volume: %f, mean: %f, std: %f, iforest score: %f, quantile: %f", s.Symbol, volume, mean, std, score, quantile) + + s.notifyOnScoreThresholdExceeded(score, quantile) })) return nil } @@ -113,7 +151,7 @@ func (s *Strategy) isMarketAvailable(session *bbgo.ExchangeSession, symbol strin func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.ExchangeSession) error { batchQuery := batch.KLineBatchQuery{Exchange: session.Exchange} endTime := time.Now() - startTime := endTime.Add(-time.Duration(s.KLineLimit) * s.Interval.Duration()) + startTime := endTime.Add(-time.Duration(s.NumSamples+s.Window) * s.Interval.Duration()) klineC, errC := batchQuery.Query(ctx, s.Symbol, s.Interval, startTime, endTime) for { select { @@ -132,7 +170,7 @@ func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.Exch } } -func (s *Strategy) extractVolumes(klines []types.KLine) floats.Slice { +func (s *Strategy) getVolumeFromKlines(klines []types.KLine) floats.Slice { volumes := floats.Slice{} for _, kline := range klines { volumes.Push(kline.Volume.Float64()) @@ -147,60 +185,50 @@ func (s *Strategy) generateSamples(volumes floats.Slice) [][]float64 { break } subset := volumes[i : i+s.Window] - + last := subset.Last(0) mean := subset.Mean() std := subset.Std() - samples = append(samples, []float64{mean, std}) + + sample := []float64{(last - mean) / std, mean, std} + samples = append(samples, sample) } return samples } -func (s *Strategy) shouldSkipIsolationForest(volumes floats.Slice, samples [][]float64) bool { - volumeMean := volumes.Mean() - lastMovingMean := samples[len(samples)-1][0] - return lastMovingMean < volumeMean -} - -func (s *Strategy) logSkipIsolationForest(samples [][]float64, volumes floats.Slice, kline types.KLine) { - log.Infof("Skipping isolation forest calculation for symbol: %s, last moving mean: %f, average volume: %f, kline: %s", s.Symbol, samples[len(samples)-1][0], volumes.Mean(), kline.String()) -} - -func (s *Strategy) fitIsolationForest(samples [][]float64) { - if s.retrainingRateLimiter.Allow() { - s.IsolationForest = iforest.New() - s.IsolationForest.Fit(samples) - log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees) +func (s *Strategy) trainIsolationForest(samples [][]float64) { + if !s.retrainingRateLimiter.Allow() { + return } -} -func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) { - lastScore := scores[len(scores)-1] - log.Warnf("Symbol: %s, isolation forest score: %f, threshold: %f, kline: %s", s.Symbol, lastScore, s.ScoreThreshold, kline.String()) - if lastScore > s.ScoreThreshold { - if s.notificationRateLimiter.Allow() { - bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore) - } - } + s.IsolationForest = iforest.New() + s.IsolationForest.Fit(samples) + log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees) } -func (s *Strategy) appendKline(kline types.KLine) error { - if len(s.klines) == 0 { - return fmt.Errorf("klines is empty") +func (s *Strategy) notifyOnScoreThresholdExceeded(score float64, quantile float64) { + // if the score is below the threshold, we don't need to notify + if score < s.Threshold { + return } - lastKline := s.klines[len(s.klines)-1] - if lastKline.Exchange != kline.Exchange { - return fmt.Errorf("last kline exchange %s is not equal to current kline exchange %s", lastKline.Exchange, kline.Exchange) + // if the score is below the quantile, we don't need to notify + if score < quantile { + return } - if lastKline.Symbol != kline.Symbol { - return fmt.Errorf("last kline symbol %s is not equal to current kline symbol %s", lastKline.Symbol, kline.Symbol) + // if the notification rate limiter is not allowed, we don't need to notify + if !s.notificationRateLimiter.Allow() { + return } - if lastKline.EndTime.After(kline.EndTime.Time()) { - return fmt.Errorf("last kline end time %s is after current kline end time %s", lastKline.EndTime, kline.EndTime) + bbgo.Notify("symbol: %s, iforest score: %f, threshold: %f, quantile: %f", s.Symbol, score, s.Threshold, quantile) +} + +func (s *Strategy) isNewKline(kline types.KLine) bool { + if len(s.klines) == 0 { + return true } - s.klines = append(s.klines, kline) - return nil + lastKline := s.klines[len(s.klines)-1] + return lastKline.EndTime.Before(kline.EndTime.Time()) }