Skip to content

Commit

Permalink
update fetch sanpshot logic and add more test case
Browse files Browse the repository at this point in the history
  • Loading branch information
anywhy committed Feb 15, 2025
1 parent 2a79bfc commit 6b709e5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 31 deletions.
44 changes: 17 additions & 27 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (b *Buffer) SetUpdateTimeout(d time.Duration) {
b.updateTimeout = d
}

func (b *Buffer) UseInFutures() {
func (b *Buffer) UseFutures() {
b.isFutures = true
}

Expand Down Expand Up @@ -259,41 +259,31 @@ func (b *Buffer) fetchAndPush() error {
}

var pushUpdates []Update
if b.isFutures {
for _, u := range b.buffer {
if u.FinalUpdateID < finalUpdateID {
continue
}
if u.FirstUpdateID <= finalUpdateID && u.FinalUpdateID >= finalUpdateID {
pushUpdates = append(pushUpdates, u)
}
for idx, u := range b.buffer {
// skip old events
if u.FinalUpdateID <= finalUpdateID {
continue
}
} else {
for idx, u := range b.buffer {
// skip old events
if u.FinalUpdateID <= finalUpdateID {
continue
}

if u.FirstUpdateID > finalUpdateID+1 {
// drop prior updates in the buffer since it's corrupted
b.buffer = b.buffer[idx:]
b.mu.Unlock()
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
}

pushUpdates = append(pushUpdates, u)

// update the final update id to the correct final update id
finalUpdateID = u.FinalUpdateID

if u.FirstUpdateID > finalUpdateID+1 || (b.isFutures && idx > 0 && u.PreviousUpdateID != b.buffer[idx-1].FinalUpdateID) {
// drop prior updates in the buffer since it's corrupted
b.buffer = b.buffer[idx:]
b.mu.Unlock()
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
}

pushUpdates = append(pushUpdates, u)

// update the final update id to the correct final update id
finalUpdateID = u.FinalUpdateID
}

// clean the buffer since we have filtered out the buffer we want
b.buffer = nil

// set the final update ID so that we will know if there is an update missing
if b.isFutures && len(pushUpdates) > 0 {
// reset finalUpdateId to first update event
finalUpdateID = pushUpdates[0].FinalUpdateID
}
b.finalUpdateID = finalUpdateID
Expand Down
63 changes: 60 additions & 3 deletions pkg/depth/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func TestDepthBuffer_FuturesReadyState(t *testing.T) {
}, 33, nil
}, time.Millisecond*5)

buf.UseInFutures()
buf.UseFutures()

readyC := make(chan struct{})
buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) {
assert.Equal(t, len(updates), 33)
assert.Greater(t, len(updates), 33)
close(readyC)
})

Expand All @@ -204,7 +204,7 @@ func TestDepthBuffer_FuturesReadyState(t *testing.T) {
Asks: types.PriceVolumeSlice{
{Price: itov(99), Volume: itov(updateID)},
},
}, updateID, updateID+33)
}, updateID, updateID, updateID-1)

assert.NoError(t, err)
}
Expand All @@ -215,3 +215,60 @@ func TestDepthBuffer_FuturesReadyState(t *testing.T) {
t.Fail()
}
}

func TestDepthBuffer_FuturesCorruptedUpdateAtTheBeginning(t *testing.T) {
// snapshot starts from 30,
// the first ready event should have a snapshot(30) and updates (31~50)
var snapshotFinalID int64 = 0
buf := NewBuffer(func() (types.SliceOrderBook, int64, error) {
snapshotFinalID += 30
return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: itov(100), Volume: itov(1)},
},
Asks: types.PriceVolumeSlice{
{Price: itov(99), Volume: itov(1)},
},
}, snapshotFinalID, nil
}, time.Millisecond)

buf.UseFutures()

resetC := make(chan struct{}, 2)

buf.OnReset(func() {
t.Logf("reset triggered")
resetC <- struct{}{}
})

var updateID int64 = 10
for ; updateID < 100; updateID++ {
time.Sleep(time.Millisecond)

// send corrupt update when updateID = 50
previousUpdateId := updateID - 1
if updateID == 50 {
previousUpdateId = updateID + 1
}

err := buf.AddUpdate(types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: itov(100), Volume: itov(updateID)},
},
Asks: types.PriceVolumeSlice{
{Price: itov(99), Volume: itov(updateID)},
},
}, updateID, updateID, previousUpdateId)

if err != nil {
t.Log("emit reset")
buf.Reset()
}
}

select {
case <-resetC:
case <-time.After(10 * time.Second):
t.Fail()
}
}
2 changes: 1 addition & 1 deletion pkg/exchange/binance/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
}, 3*time.Second)

if ex.IsFutures {
f.UseInFutures()
f.UseFutures()
}

f.SetLogger(logrus.WithFields(logrus.Fields{"exchange": "binance", "symbol": e.Symbol, "component": "depthBuffer"}))
Expand Down

0 comments on commit 6b709e5

Please sign in to comment.