Skip to content

Commit

Permalink
✨ feat(sentinel): add historical kline querying
Browse files Browse the repository at this point in the history
  • Loading branch information
narumiruna committed Jan 8, 2025
1 parent 00aa355 commit b5a25ad
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
1 change: 1 addition & 0 deletions config/sentinel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ exchangeStrategies:
symbol: BTCUSDT
interval: 1m
scoreThreshold: 0.6
klineLimit: 43200
58 changes: 47 additions & 11 deletions pkg/strategy/sentinel/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package sentinel

import (
"context"
"fmt"
"time"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
"github.com/narumiruna/go-iforest/pkg/iforest"
log "github.com/sirupsen/logrus"
Expand All @@ -31,6 +33,7 @@ type Strategy struct {

notificationRateLimiter *rate.Limiter
retrainingRateLimiter *rate.Limiter
klines []types.KLine
}

func (s *Strategy) ID() string {
Expand Down Expand Up @@ -68,18 +71,22 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
}

func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
err := s.queryHistoricalKlines(ctx, session)
if err != nil {
return err
}

session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
if !s.isMarketAvailable(session, s.Symbol) {
return
}

klines, err := s.queryKLines(ctx, session)
if err != nil {
log.Errorf("Unable to query klines: %v", err)
return
}
s.appendKline(kline)
s.klines = s.klines[len(s.klines)-s.KLineLimit:]

volumes := s.extractVolumes(klines)
log.Infof("kline length %d", len(s.klines))

volumes := s.extractVolumes(s.klines)
samples := s.generateSamples(volumes)

if s.shouldSkipIsolationForest(volumes, samples) {
Expand All @@ -99,13 +106,23 @@ func (s *Strategy) isMarketAvailable(session *bbgo.ExchangeSession, symbol strin
return ok
}

func (s *Strategy) queryKLines(ctx context.Context, session *bbgo.ExchangeSession) ([]types.KLine, error) {
func (s *Strategy) queryHistoricalKlines(ctx context.Context, session *bbgo.ExchangeSession) error {
batchQuery := batch.KLineBatchQuery{Exchange: session.Exchange}
endTime := time.Now()
options := types.KLineQueryOptions{
Limit: s.KLineLimit,
EndTime: &endTime,
startTime := endTime.Add(-time.Duration(s.KLineLimit) * s.Interval.Duration())
klineC, errC := batchQuery.Query(ctx, s.Symbol, s.Interval, startTime, endTime)
for {
select {
case <-ctx.Done():
return ctx.Err()

case kline, ok := <-klineC:
if !ok {
return <-errC
}
s.klines = append(s.klines, kline)
}
}
return session.Exchange.QueryKLines(ctx, s.Symbol, s.Interval, options)
}

func (s *Strategy) extractVolumes(klines []types.KLine) floats.Slice {
Expand Down Expand Up @@ -159,3 +176,22 @@ func (s *Strategy) notifyOnIsolationForestScore(scores []float64, kline types.KL
}
}
}

func (s *Strategy) appendKline(kline types.KLine) error {
if len(s.klines) == 0 {
return fmt.Errorf("klines is empty")
}

lastKline := s.klines[len(s.klines)-1]
if lastKline.Symbol != kline.Symbol {
return fmt.Errorf("last kline symbol %s is not equal to current kline symbol %s", lastKline.Symbol, kline.Symbol)
}

if lastKline.EndTime.After(kline.EndTime.Time()) {
return fmt.Errorf("last kline end time %s is after current kline end time %s", lastKline.EndTime, kline.EndTime)
}

s.klines = append(s.klines, kline)

return nil
}

0 comments on commit b5a25ad

Please sign in to comment.