From a735598f82500a6f896787bce7a9e5350f00a8db Mon Sep 17 00:00:00 2001 From: narumi Date: Thu, 16 Jan 2025 20:59:47 +0800 Subject: [PATCH] detect when the kline is closed --- config/sentinel.yaml | 11 ++-------- pkg/strategy/sentinel/strategy.go | 34 ++++++++----------------------- 2 files changed, 10 insertions(+), 35 deletions(-) diff --git a/config/sentinel.yaml b/config/sentinel.yaml index 2ead44dd1..fe03612b4 100644 --- a/config/sentinel.yaml +++ b/config/sentinel.yaml @@ -4,18 +4,11 @@ sessions: envVarPrefix: max publicOnly: true -persistence: - json: - directory: var/data - redis: - host: 127.0.0.1 - port: 6379 - db: 0 - exchangeStrategies: - on: *exchange sentinel: symbol: BTCUSDT interval: 1m - scoreThreshold: 0.6 + scoreThreshold: 0.7 klineLimit: 43200 + window: 1440 diff --git a/pkg/strategy/sentinel/strategy.go b/pkg/strategy/sentinel/strategy.go index f71b9ccc2..a8b4115e7 100644 --- a/pkg/strategy/sentinel/strategy.go +++ b/pkg/strategy/sentinel/strategy.go @@ -2,7 +2,6 @@ package sentinel import ( "context" - "fmt" "time" "github.com/c9s/bbgo/pkg/bbgo" @@ -76,19 +75,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return err } - session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) { + session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) { if !s.isMarketAvailable(session, s.Symbol) { return } - err := s.appendKline(kline) - if err != nil { - log.Errorf("unable to append kline %s", kline.String()) - return + if s.isNewKline(kline) { + s.klines = append(s.klines, kline) + s.klines = s.klines[len(s.klines)-s.KLineLimit:] } - s.klines = s.klines[len(s.klines)-s.KLineLimit:] - - log.Infof("kline length %d", len(s.klines)) volumes := s.extractVolumes(s.klines) samples := s.generateSamples(volumes) @@ -175,7 +170,7 @@ func (s *Strategy) fitIsolationForest(samples [][]float64) { func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KLine) { lastScore := scores[len(scores)-1] - log.Warnf("Symbol: %s, isolation forest score: %f, threshold: %f, kline: %s", s.Symbol, lastScore, s.ScoreThreshold, kline.String()) + log.Infof("Symbol: %s, iforest score: %f, threshold: %f, volume: %f", s.Symbol, lastScore, s.ScoreThreshold, kline.Volume.Float64()) if lastScore > s.ScoreThreshold { if s.notificationRateLimiter.Allow() { bbgo.Notify("symbol: %s isolation forest score: %f", s.Symbol, lastScore) @@ -183,24 +178,11 @@ func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KL } } -func (s *Strategy) appendKline(kline types.KLine) error { +func (s *Strategy) isNewKline(kline types.KLine) bool { if len(s.klines) == 0 { - return fmt.Errorf("klines is empty") + return true } lastKline := s.klines[len(s.klines)-1] - if lastKline.Exchange != kline.Exchange { - return fmt.Errorf("last kline exchange %s is not equal to current kline exchange %s", lastKline.Exchange, kline.Exchange) - } - - if lastKline.Symbol != kline.Symbol { - return fmt.Errorf("last kline symbol %s is not equal to current kline symbol %s", lastKline.Symbol, kline.Symbol) - } - - if lastKline.EndTime.After(kline.EndTime.Time()) { - return fmt.Errorf("last kline end time %s is after current kline end time %s", lastKline.EndTime, kline.EndTime) - } - - s.klines = append(s.klines, kline) - return nil + return lastKline.EndTime.Before(kline.EndTime.Time()) }