Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: [depth] fix early snapshot id checking #1828

Merged
merged 1 commit into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package depth
import (
"fmt"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -41,24 +40,21 @@ type Buffer struct {
updateTimeout time.Duration

// bufferingPeriod is used to buffer the update message before we get the full depth
bufferingPeriod atomic.Value
bufferingPeriod time.Duration
}

func NewBuffer(fetcher SnapshotFetcher) *Buffer {
func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer {
return &Buffer{
fetcher: fetcher,
resetC: make(chan struct{}, 1),
fetcher: fetcher,
resetC: make(chan struct{}, 1),
bufferingPeriod: bufferingPeriod,
}
}

func (b *Buffer) SetUpdateTimeout(d time.Duration) {
b.updateTimeout = d
}

func (b *Buffer) SetBufferingPeriod(d time.Duration) {
b.bufferingPeriod.Store(d)
}

func (b *Buffer) resetSnapshot() {
b.snapshot = nil
b.finalUpdateID = 0
Expand Down Expand Up @@ -151,7 +147,7 @@ func (b *Buffer) fetchAndPush() error {

if len(b.buffer) > 0 {
// the snapshot is too early
if finalUpdateID < b.buffer[0].FirstUpdateID {
if finalUpdateID < b.buffer[0].FirstUpdateID-1 {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

☝️ main fix

b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
Expand Down Expand Up @@ -197,9 +193,7 @@ func (b *Buffer) fetchAndPush() error {

func (b *Buffer) tryFetch() {
for {
if period := b.bufferingPeriod.Load(); period != nil {
<-time.After(period.(time.Duration))
}
<-time.After(b.bufferingPeriod)

err := b.fetchAndPush()
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/depth/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
{Price: itov(99), Volume: itov(1)},
},
}, 33, nil
})
buf.SetBufferingPeriod(time.Millisecond * 5)
}, time.Millisecond*5)

readyC := make(chan struct{})
buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) {
Expand Down Expand Up @@ -55,7 +54,7 @@
// snapshot starts from 30,
// the first ready event should have a snapshot(30) and updates (31~50)
var snapshotFinalID int64 = 0
buf := NewBuffer(func() (types.SliceOrderBook, int64, error) {

Check failure on line 57 in pkg/depth/buffer_test.go

View workflow job for this annotation

GitHub Actions / lint

not enough arguments in call to NewBuffer
snapshotFinalID += 30
return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
Expand Down Expand Up @@ -94,7 +93,7 @@

func TestDepthBuffer_ConcurrentRun(t *testing.T) {
var snapshotFinalID int64 = 0
buf := NewBuffer(func() (types.SliceOrderBook, int64, error) {

Check failure on line 96 in pkg/depth/buffer_test.go

View workflow job for this annotation

GitHub Actions / lint

not enough arguments in call to NewBuffer
snapshotFinalID += 30
time.Sleep(10 * time.Millisecond)
return types.SliceOrderBook{
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/binance/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
log.Infof("fetching %s depth...", e.Symbol)
return ex.QueryDepth(context.Background(), e.Symbol)
})
f.SetBufferingPeriod(time.Second)
}, 3*time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
stream.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/kucoin/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {
} else {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return s.exchange.QueryDepth(context.Background(), e.Symbol)
})
}, 3*time.Second)
s.depthBuffers[e.Symbol] = f
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, error: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/max/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
log.Infof("fetching %s depth with depth = %d...", e.Market, bookDepth)
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, bookDepth)
})
f.SetBufferingPeriod(3 * time.Second)
}, 3*time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand Down
16 changes: 12 additions & 4 deletions pkg/strategy/xdepthmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ type Strategy struct {
StopHedgeQuoteBalance fixedpoint.Value `json:"stopHedgeQuoteBalance"`
StopHedgeBaseBalance fixedpoint.Value `json:"stopHedgeBaseBalance"`

SkipCleanUpOpenOrders bool `json:"skipCleanUpOpenOrders"`

// Quantity is used for fixed quantity of the first layer
Quantity fixedpoint.Value `json:"quantity"`

Expand Down Expand Up @@ -410,9 +412,11 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop()

// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Warnf("error cleaning up open orders")
// clean up the previous open orders before starting the quote worker
if !s.SkipCleanUpOpenOrders {
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Warnf("error cleaning up open orders")
}
}

s.updateQuote(ctx, 0)
Expand Down Expand Up @@ -966,10 +970,14 @@ func (s *Strategy) generateMakerOrders(
for _, side := range []types.SideType{types.SideTypeBuy, types.SideTypeSell} {
sideBook := dupPricingBook.SideBook(side)
if sideBook.Len() == 0 {
log.Warnf("orderbook %s side is empty", side)
s.logger.Warnf("orderbook %s side is empty", side)
continue
}

if sideBook.Len() < 5 {
s.logger.Warnf("order book %s side is too thin, size: %d, levels: %+v", side, sideBook.Len(), sideBook)
}

availableSideBalance, ok := availableBalances[side]
if !ok {
log.Warnf("no available balance for side %s side", side)
Expand Down
1 change: 1 addition & 0 deletions pkg/types/price_volume_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (slice PriceVolumeSlice) ElemOrLast(i int) (PriceVolume, bool) {
return slice[i], true
}

// IndexByQuoteVolumeDepth returns the index of the price volume slice by the required quote volume depth
func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int {
var totalQuoteVolume = fixedpoint.Zero
for x, pv := range slice {
Expand Down
Loading