From b104dc3479451bd0c6af01eedb6a24fc0b7b9082 Mon Sep 17 00:00:00 2001 From: narumi Date: Thu, 16 Jan 2025 20:59:47 +0800 Subject: [PATCH 1/4] detect when the kline is closed --- config/sentinel.yaml | 11 ++-------- pkg/strategy/sentinel/strategy.go | 34 ++++++++----------------------- 2 files changed, 10 insertions(+), 35 deletions(-) diff --git a/config/sentinel.yaml b/config/sentinel.yaml index 2ead44dd1..fe03612b4 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -4,18 +4,11 @@ sessions: envVarPrefix: max publicOnly: true -persistence: - json: - directory: var/data - redis: - host: 127.0.0.1 - port: 6379 - db: 0 - exchangeStrategies: - on: *exchange sentinel: symbol: BTCUSDT interval: 1m - scoreThreshold: 0.6 + scoreThreshold: 0.7 klineLimit: 43200 + window: 1440 diff --git a/pkg/strategy/sentinel/strategy.go b/pkg/strategy/sentinel/strategy.go index f71b9ccc2..a8b4115e7 100644 --- a/pkg/strategy/sentinel/strategy.go +++ b/pkg/strategy/sentinel/strategy.go @@ -2,7 +2,6 @@ package sentinel import ( "context" - "fmt" "time" "github.com/c9s/bbgo/pkg/bbgo" @@ -76,19 +75,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return err } - session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) { + session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) { if !s.isMarketAvailable(session, s.Symbol) { return } - err := s.appendKline(kline) - if err != nil { - log.Errorf("unable to append kline %s", kline.String()) - return + if s.isNewKline(kline) { + s.klines = append(s.klines, kline) + s.klines = s.klines[len(s.klines)-s.KLineLimit:] } - s.klines = s.klines[len(s.klines)-s.KLineLimit:] - - log.Infof("kline length %d", len(s.klines)) volumes := s.extractVolumes(s.klines) samples := s.generateSamples(volumes) @@ -175,7 +170,7 @@ func (s *Strategy) fitIsolationForest(samples [][]float64) { func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) { lastScore := scores[len(scores)-1] - log.Warnf("Symbol: %s, isolation forest score: %f, threshold: %f, kline: %s", s.Symbol, lastScore, s.ScoreThreshold, kline.String()) + log.Infof("Symbol: %s, iforest score: %f, threshold: %f, volume: %f", s.Symbol, lastScore, s.ScoreThreshold, kline.Volume.Float64()) if lastScore > s.ScoreThreshold { if s.notificationRateLimiter.Allow() { bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore) @@ -183,24 +178,11 @@ func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KL } } -func (s *Strategy) appendKline(kline types.KLine) error { +func (s *Strategy) isNewKline(kline types.KLine) bool { if len(s.klines) == 0 { - return fmt.Errorf("klines is empty") + return true } lastKline := s.klines[len(s.klines)-1] - if lastKline.Exchange != kline.Exchange { - return fmt.Errorf("last kline exchange %s is not equal to current kline exchange %s", lastKline.Exchange, kline.Exchange) - } - - if lastKline.Symbol != kline.Symbol { - return fmt.Errorf("last kline symbol %s is not equal to current kline symbol %s", lastKline.Symbol, kline.Symbol) - } - - if lastKline.EndTime.After(kline.EndTime.Time()) { - return fmt.Errorf("last kline end time %s is after current kline end time %s", lastKline.EndTime, kline.EndTime) - } - - s.klines = append(s.klines, kline) - return nil + return lastKline.EndTime.Before(kline.EndTime.Time()) } From a8d6f1bbf01fc44a6847712007364769482897ac Mon Sep 17 00:00:00 2001 From: narumi Date: Thu, 16 Jan 2025 23:11:47 +0800 Subject: [PATCH 2/4] update config --- config/sentinel.yaml | 63 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/config/sentinel.yaml b/config/sentinel.yaml index fe03612b4..137689bb5 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -5,10 +5,59 @@ sessions: publicOnly: true exchangeStrategies: -- on: *exchange - sentinel: - symbol: BTCUSDT - interval: 1m - scoreThreshold: 0.7 - klineLimit: 43200 - window: 1440 + - on: *exchange + sentinel: + symbol: BTCUSDT + interval: &intervel 1m + scoreThreshold: &threshold 0.7 + klineLimit: &limit 43200 + window: &window 1440 + - on: *exchange + sentinel: + symbol: ETHUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window + - on: *exchange + sentinel: + symbol: BCHUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window + - on: *exchange + sentinel: + symbol: LTCUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window + - on: *exchange + sentinel: + symbol: XRPUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window + - on: *exchange + sentinel: + symbol: MAXUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window + - on: *exchange + sentinel: + symbol: BCNTUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window + - on: *exchange + sentinel: + symbol: ARBUSDT + interval: *intervel + scoreThreshold: *threshold + klineLimit: *limit + window: *window \ No newline at end of file From 3d743bc712284b0770acc981bde1e4ef50d0117f Mon Sep 17 00:00:00 2001 From: narumi Date: Thu, 16 Jan 2025 23:13:45 +0800 Subject: [PATCH 3/4] use volume > mean + 2*std for volume check --- config/sentinel.yaml | 42 ++++++----- pkg/ensemble/iforest/forest.go | 22 +----- pkg/strategy/sentinel/strategy.go | 118 ++++++++++++++++++++---------- 3 files changed, 105 insertions(+), 77 deletions(-) diff --git a/config/sentinel.yaml b/config/sentinel.yaml index 137689bb5..2abc398ef 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -9,55 +9,63 @@ exchangeStrategies: sentinel: symbol: BTCUSDT interval: &intervel 1m - scoreThreshold: &threshold 0.7 - klineLimit: &limit 43200 - window: &window 1440 + threshold: &threshold 0.7 + proportion: &proportion 0.05 + numSamples: &numSamples 1440 + window: &window 60 - on: *exchange sentinel: symbol: ETHUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: BCHUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: LTCUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: XRPUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: MAXUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: BCNTUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window - on: *exchange sentinel: symbol: ARBUSDT interval: *intervel - scoreThreshold: *threshold - klineLimit: *limit + threshold: *threshold + proportion: *proportion + numSamples: *numSamples window: *window \ No newline at end of file diff --git a/pkg/ensemble/iforest/forest.go b/pkg/ensemble/iforest/forest.go index d0add3174..faa9ae6ec 100644 --- a/pkg/ensemble/iforest/forest.go +++ b/pkg/ensemble/iforest/forest.go @@ -10,21 +10,12 @@ const ( defaultNumTrees = 100 defaultSampleSize = 256 defaultScoreThreshold = 0.6 - defaultDetectionType = DetectionTypeThreshold offset = 0.5 ) type DetectionType string -const ( - DetectionTypeThreshold DetectionType = "threshold" - DetectionTypeProportion DetectionType = "proportion" -) - type Options struct { - // The method used for anomaly detection - DetectionType DetectionType `json:"detectionType"` - // The anomaly score threshold Threshold float64 `json:"threshold"` @@ -43,10 +34,6 @@ type Options struct { // SetDefaultValues applies default settings to unspecified fields func (o *Options) SetDefaultValues() { - if o.DetectionType == "" { - o.DetectionType = defaultDetectionType - } - if o.Threshold == 0 { o.Threshold = defaultScoreThreshold } @@ -153,14 +140,9 @@ func (f *IsolationForest) Predict(samples [][]float64) []int { predictions := make([]int, len(samples)) scores := f.Score(samples) - var threshold float64 - switch f.DetectionType { - case DetectionTypeThreshold: - threshold = f.Threshold - case DetectionTypeProportion: + threshold := f.Threshold + if f.Proportion > 0 { threshold = Quantile(f.Score(samples), 1-f.Proportion) - default: - panic("Invalid detection type") } for i, score := range scores { diff --git a/pkg/strategy/sentinel/strategy.go b/pkg/strategy/sentinel/strategy.go index a8b4115e7..9fbda9858 100644 --- a/pkg/strategy/sentinel/strategy.go +++ b/pkg/strategy/sentinel/strategy.go @@ -2,6 +2,7 @@ package sentinel import ( "context" + "fmt" "time" "github.com/c9s/bbgo/pkg/bbgo" @@ -20,11 +21,12 @@ func init() { } type Strategy struct { - Symbol string `json:"symbol"` - Interval types.Interval `json:"interval"` - ScoreThreshold float64 `json:"scoreThreshold"` - KLineLimit int `json:"klineLimit"` - Window int `json:"window"` + Symbol string `json:"symbol"` + Interval types.Interval `json:"interval"` + Threshold float64 `json:"threshold"` + Proportion float64 `json:"proportion"` + NumSamples int `json:"numSamples"` + Window int `json:"window"` IsolationForest *iforest.IsolationForest `json:"isolationForest"` NotificationInterval time.Duration `json:"notificationInterval"` @@ -40,12 +42,16 @@ func (s *Strategy) ID() string { } func (s *Strategy) Defaults() error { - if s.ScoreThreshold == 0 { - s.ScoreThreshold = 0.6 + if s.Threshold == 0 { + s.Threshold = 0.6 } - if s.KLineLimit == 0 { - s.KLineLimit = 1440 + if s.Proportion == 0 { + s.Proportion = 0.05 + } + + if s.NumSamples == 0 { + s.NumSamples = 1440 } if s.Window == 0 { @@ -65,6 +71,20 @@ func (s *Strategy) Defaults() error { return nil } +func (s *Strategy) Validate() error { + if s.NumSamples < 0 { + return fmt.Errorf("num samples should be greater than 0") + } + + if s.Window < 0 { + return fmt.Errorf("window size should be greater than 0") + } + + if s.Window > s.NumSamples { + return fmt.Errorf("window size should be less than num samples") + } + return nil +} func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) } @@ -82,20 +102,35 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se if s.isNewKline(kline) { s.klines = append(s.klines, kline) - s.klines = s.klines[len(s.klines)-s.KLineLimit:] + + numKlines := len(s.klines) + klineLimit := s.NumSamples + s.Window - 1 + if numKlines > klineLimit { + s.klines = s.klines[numKlines-klineLimit:] + } + } - volumes := s.extractVolumes(s.klines) - samples := s.generateSamples(volumes) + volumes := s.getVolumeFromKlines(s.klines) - if s.shouldSkipIsolationForest(volumes, samples) { - s.logSkipIsolationForest(samples, volumes, kline) + volume := volumes.Last(0) + mean := volumes.Mean() + std := volumes.Std() + // if the volume is not significantly above the mean, we don't need to calculate the isolation forest + if volume < mean+2*std { + log.Infof("Volume is not significantly above mean, skipping isolation forest calculation, symbol: %s, volume: %f, mean: %f, std: %f", s.Symbol, volume, mean, std) return } - s.fitIsolationForest(samples) + samples := s.generateSamples(volumes) + s.trainIsolationForest(samples) + scores := s.IsolationForest.Score(samples) - s.notifyOnIsolationForestScore(scores, kline) + score := scores[len(scores)-1] + quantile := iforest.Quantile(scores, 1-s.Proportion) + log.Infof("symbol: %s, volume: %f, mean: %f, std: %f, iforest score: %f, quantile: %f", s.Symbol, volume, mean, std, score, quantile) + + s.notifyOnScoreThresholdExceeded(score, quantile) })) return nil } @@ -108,7 +143,7 @@ func (s *Strategy) isMarketAvailable(session *bbgo.ExchangeSession, symbol strin func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.ExchangeSession) error { batchQuery := batch.KLineBatchQuery{Exchange: session.Exchange} endTime := time.Now() - startTime := endTime.Add(-time.Duration(s.KLineLimit) * s.Interval.Duration()) + startTime := endTime.Add(-time.Duration(s.NumSamples+s.Window) * s.Interval.Duration()) klineC, errC := batchQuery.Query(ctx, s.Symbol, s.Interval, startTime, endTime) for { select { @@ -127,7 +162,7 @@ func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.Exch } } -func (s *Strategy) extractVolumes(klines []types.KLine) floats.Slice { +func (s *Strategy) getVolumeFromKlines(klines []types.KLine) floats.Slice { volumes := floats.Slice{} for _, kline := range klines { volumes.Push(kline.Volume.Float64()) @@ -142,40 +177,43 @@ func (s *Strategy) generateSamples(volumes floats.Slice) [][]float64 { break } subset := volumes[i : i+s.Window] - + last := subset.Last(0) mean := subset.Mean() std := subset.Std() - samples = append(samples, []float64{mean, std}) + + sample := []float64{(last - mean) / std, mean, std} + samples = append(samples, sample) } return samples } -func (s *Strategy) shouldSkipIsolationForest(volumes floats.Slice, samples [][]float64) bool { - volumeMean := volumes.Mean() - lastMovingMean := samples[len(samples)-1][0] - return lastMovingMean < volumeMean -} +func (s *Strategy) trainIsolationForest(samples [][]float64) { + if !s.retrainingRateLimiter.Allow() { + return + } -func (s *Strategy) logSkipIsolationForest(samples [][]float64, volumes floats.Slice, kline types.KLine) { - log.Infof("Skipping isolation forest calculation for symbol: %s, last moving mean: %f, average volume: %f, kline: %s", s.Symbol, samples[len(samples)-1][0], volumes.Mean(), kline.String()) + s.IsolationForest = iforest.New() + s.IsolationForest.Fit(samples) + log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees) } -func (s *Strategy) fitIsolationForest(samples [][]float64) { - if s.retrainingRateLimiter.Allow() { - s.IsolationForest = iforest.New() - s.IsolationForest.Fit(samples) - log.Infof("Isolation forest fitted with %d samples and %d/%d trees", len(samples), len(s.IsolationForest.Trees), s.IsolationForest.NumTrees) +func (s *Strategy) notifyOnScoreThresholdExceeded(score float64, quantile float64) { + // if the score is below the threshold, we don't need to notify + if score < s.Threshold { + return } -} -func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) { - lastScore := scores[len(scores)-1] - log.Infof("Symbol: %s, iforest score: %f, threshold: %f, volume: %f", s.Symbol, lastScore, s.ScoreThreshold, kline.Volume.Float64()) - if lastScore > s.ScoreThreshold { - if s.notificationRateLimiter.Allow() { - bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore) - } + // if the score is below the quantile, we don't need to notify + if score < quantile { + return } + + // if the notification rate limiter is not allowed, we don't need to notify + if !s.notificationRateLimiter.Allow() { + return + } + + bbgo.Notify("symbol: %s, iforest score: %f, threshold: %f, quantile: %f", s.Symbol, score, s.Threshold, quantile) } func (s *Strategy) isNewKline(kline types.KLine) bool { From 651d8293b9db3443fbd9b6580b14341ad7cd18b4 Mon Sep 17 00:00:00 2001 From: narumi Date: Tue, 21 Jan 2025 14:46:30 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(config):=20si?= =?UTF-8?q?mplify=20sentinel=20configuration=20and=20add=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/sentinel.yaml | 56 ++++++++----------------------- pkg/strategy/sentinel/strategy.go | 8 +++++ 2 files changed, 22 insertions(+), 42 deletions(-) diff --git a/config/sentinel.yaml b/config/sentinel.yaml index 2abc398ef..b7c0dc1b3 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -6,66 +6,38 @@ sessions: exchangeStrategies: - on: *exchange - sentinel: + sentinel: &sentinel symbol: BTCUSDT - interval: &intervel 1m - threshold: &threshold 0.7 - proportion: &proportion 0.05 - numSamples: &numSamples 1440 - window: &window 60 + interval: 1m + threshold: 0.6 + proportion: 0.05 + numSamples: 1440 + window: 60 - on: *exchange sentinel: + <<: *sentinel symbol: ETHUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window - on: *exchange sentinel: + <<: *sentinel symbol: BCHUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window - on: *exchange sentinel: + <<: *sentinel symbol: LTCUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window - on: *exchange sentinel: + <<: *sentinel symbol: XRPUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window - on: *exchange sentinel: + <<: *sentinel symbol: MAXUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window - on: *exchange sentinel: + <<: *sentinel symbol: BCNTUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window - on: *exchange sentinel: - symbol: ARBUSDT - interval: *intervel - threshold: *threshold - proportion: *proportion - numSamples: *numSamples - window: *window \ No newline at end of file + <<: *sentinel + symbol: ARBUSDT \ No newline at end of file diff --git a/pkg/strategy/sentinel/strategy.go b/pkg/strategy/sentinel/strategy.go index 9fbda9858..07a3d1b0c 100644 --- a/pkg/strategy/sentinel/strategy.go +++ b/pkg/strategy/sentinel/strategy.go @@ -72,6 +72,14 @@ func (s *Strategy) Defaults() error { } func (s *Strategy) Validate() error { + if s.Symbol == "" { + return fmt.Errorf("symbol is required") + } + + if s.Interval == "" { + return fmt.Errorf("interval is required") + } + if s.NumSamples < 0 { return fmt.Errorf("num samples should be greater than 0") }