Skip to content

Commit

Permalink
Merge pull request #1870 from c9s/kbearXD/max/debug
Browse files Browse the repository at this point in the history
DEBUG: [max] add log to debug depth buffer
  • Loading branch information
kbearXD authored Dec 18, 2024
2 parents 49eb0fa + 6c29950 commit 6fb4729
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
27 changes: 20 additions & 7 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
Expand Down Expand Up @@ -41,36 +41,45 @@ type Buffer struct {

// bufferingPeriod is used to buffer the update message before we get the full depth
bufferingPeriod time.Duration

logger *logrus.Entry
}

func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer {
return &Buffer{
fetcher: fetcher,
fetchC: make(chan struct{}, 1),
bufferingPeriod: bufferingPeriod,
logger: logrus.NewEntry(logrus.StandardLogger()),
}
}

func (b *Buffer) SetLogger(logger *logrus.Entry) {
b.logger = logger
}

func (b *Buffer) SetUpdateTimeout(d time.Duration) {
b.updateTimeout = d
}

func (b *Buffer) resetSnapshot() {
log.Info("resetting the snapshot")
b.logger.Info("resetting the snapshot")
b.snapshot = nil
b.finalUpdateID = 0
}

// emitFetch emits the fetch signal, and in the next call of AddUpdate, the buffer will try to fetch the snapshot
// if the fetch signal is already emitted, it will be ignored
func (b *Buffer) emitFetch() {
b.logger.Info("emitting fetch signal")
select {
case b.fetchC <- struct{}{}:
default:
}
}

func (b *Buffer) Reset() {
b.logger.Info("resetting this buffer")
b.mu.Lock()
b.resetSnapshot()
b.emitFetch()
Expand All @@ -92,7 +101,7 @@ func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64,
return nil
}

log.Info("setting the snapshot")
b.logger.Info("setting the snapshot")
// set the final update ID so that we will know if there is an update missing
b.finalUpdateID = finalUpdateID

Expand Down Expand Up @@ -123,10 +132,12 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg

select {
case <-b.fetchC:
b.logger.Info("fetch signal received")
b.buffer = append(b.buffer, u)
b.resetSnapshot()
b.once.Reset()
b.once.Do(func() {
b.logger.Info("try fetching the snapshot due to fetch signal received")
go b.tryFetch()
})
b.mu.Unlock()
Expand All @@ -141,6 +152,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
if b.snapshot == nil {
b.buffer = append(b.buffer, u)
b.once.Do(func() {
b.logger.Info("try fetching the snapshot due to no snapshot")
go b.tryFetch()
})
b.mu.Unlock()
Expand All @@ -151,7 +163,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg

// skip older events
if u.FinalUpdateID <= b.finalUpdateID {
log.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID)
b.logger.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID)
b.mu.Unlock()
return nil
}
Expand All @@ -163,6 +175,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
b.resetSnapshot()
b.once.Reset()
b.once.Do(func() {
b.logger.Info("try fetching the snapshot due to missing update")
go b.tryFetch()
})

Expand All @@ -175,7 +188,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
u.FirstUpdateID-b.finalUpdateID)
}

log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
b.logger.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
b.finalUpdateID = u.FinalUpdateID
b.EmitPush(u)

Expand All @@ -191,7 +204,7 @@ func (b *Buffer) tryFetch() {

err := b.fetchAndPush()
if err != nil {
log.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod)
b.logger.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod)
continue
}

Expand All @@ -206,7 +219,7 @@ func (b *Buffer) fetchAndPush() error {
}

b.mu.Lock()
log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID)
b.logger.Infof("fetched depth snapshot, final update id %d", finalUpdateID)

if len(b.buffer) > 0 {
// the snapshot is too early, we should re-fetch the snapshot
Expand Down
2 changes: 2 additions & 0 deletions pkg/exchange/binance/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/util"
"github.com/sirupsen/logrus"

"github.com/adshao/go-binance/v2"
"github.com/adshao/go-binance/v2/futures"
Expand Down Expand Up @@ -91,6 +92,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
log.Infof("fetching %s depth...", e.Symbol)
return ex.QueryDepth(context.Background(), e.Symbol)
}, 3*time.Second)
f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "binance", "symbol": e.Symbol, "component": "depthBuffer"}))
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
stream.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand Down
2 changes: 2 additions & 0 deletions pkg/exchange/kucoin/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi"
Expand Down Expand Up @@ -81,6 +82,7 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return s.exchange.QueryDepth(context.Background(), e.Symbol)
}, 3*time.Second)
f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "kucoin", "symbol": e.Symbol, "component": "depthBuffer"}))
s.depthBuffers[e.Symbol] = f
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
Expand Down
5 changes: 4 additions & 1 deletion pkg/exchange/max/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/depth"
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (s *Stream) handleConnect() {
}

func (s *Stream) handleDisconnect() {
log.Debugf("resetting depth snapshots...")
log.Info("resetting depth snapshots...")
for _, f := range s.depthBuffers {
f.Reset()
}
Expand Down Expand Up @@ -272,6 +273,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, bookDepth)
}, 3*time.Second)
f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "max", "symbol": symbol, "component": "depthBuffer"}))
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand All @@ -287,6 +289,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
// if we receive orderbook event with both asks and bids are empty, it means we need to rebuild this orderbook
shouldReset := len(e.Asks) == 0 && len(e.Bids) == 0
if shouldReset {
log.Infof("resetting %s orderbook due to both empty asks/bids...", e.Market)
f.Reset()
return
}
Expand Down

0 comments on commit 6fb4729

Please sign in to comment.