diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index 33dfc6e5d..f542ac0fe 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -259,34 +259,23 @@ func (b *Buffer) fetchAndPush() error { } var pushUpdates []Update - if b.isFutures { - for _, u := range b.buffer { - if u.FinalUpdateID < finalUpdateID { - continue - } - if u.FirstUpdateID <= finalUpdateID && u.FinalUpdateID >= finalUpdateID { - pushUpdates = append(pushUpdates, u) - } + for idx, u := range b.buffer { + // skip old events + if u.FinalUpdateID <= finalUpdateID { + continue } - } else { - for idx, u := range b.buffer { - // skip old events - if u.FinalUpdateID <= finalUpdateID { - continue - } - - if u.FirstUpdateID > finalUpdateID+1 { - // drop prior updates in the buffer since it's corrupted - b.buffer = b.buffer[idx:] - b.mu.Unlock() - return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) - } - - pushUpdates = append(pushUpdates, u) - - // update the final update id to the correct final update id - finalUpdateID = u.FinalUpdateID + + if u.FirstUpdateID > finalUpdateID+1 || (b.isFutures && idx > 0 && u.PreviousUpdateID != b.buffer[idx-1].FinalUpdateID) { + // drop prior updates in the buffer since it's corrupted + b.buffer = b.buffer[idx:] + b.mu.Unlock() + return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) } + + pushUpdates = append(pushUpdates, u) + + // update the final update id to the correct final update id + finalUpdateID = u.FinalUpdateID } // clean the buffer since we have filtered out the buffer we want @@ -294,6 +283,7 @@ func (b *Buffer) fetchAndPush() error { // set the final update ID so that we will know if there is an update missing if b.isFutures && len(pushUpdates) > 0 { + // reset finalUpdateId to first update event finalUpdateID = pushUpdates[0].FinalUpdateID } b.finalUpdateID = finalUpdateID diff --git a/pkg/depth/buffer_test.go b/pkg/depth/buffer_test.go index 480f9d99a..cdcf56b4e 100644 --- a/pkg/depth/buffer_test.go +++ b/pkg/depth/buffer_test.go @@ -190,7 +190,7 @@ func TestDepthBuffer_FuturesReadyState(t *testing.T) { readyC := make(chan struct{}) buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) { - assert.Equal(t, len(updates), 33) + assert.Greater(t, len(updates), 33) close(readyC) }) @@ -204,7 +204,7 @@ func TestDepthBuffer_FuturesReadyState(t *testing.T) { Asks: types.PriceVolumeSlice{ {Price: itov(99), Volume: itov(updateID)}, }, - }, updateID, updateID+33) + }, updateID, updateID, updateID-1) assert.NoError(t, err) } @@ -215,3 +215,60 @@ func TestDepthBuffer_FuturesReadyState(t *testing.T) { t.Fail() } } + +func TestDepthBuffer_FuturesCorruptedUpdateAtTheBeginning(t *testing.T) { + // snapshot starts from 30, + // the first ready event should have a snapshot(30) and updates (31~50) + var snapshotFinalID int64 = 0 + buf := NewBuffer(func() (types.SliceOrderBook, int64, error) { + snapshotFinalID += 30 + return types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: itov(100), Volume: itov(1)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: itov(99), Volume: itov(1)}, + }, + }, snapshotFinalID, nil + }, time.Millisecond) + + buf.UseInFutures() + + resetC := make(chan struct{}, 2) + + buf.OnReset(func() { + t.Logf("reset triggered") + resetC <- struct{}{} + }) + + var updateID int64 = 10 + for ; updateID < 100; updateID++ { + time.Sleep(time.Millisecond) + + // send corrupt update when updateID = 50 + previousUpdateId := updateID - 1 + if updateID == 50 { + previousUpdateId = updateID + 1 + } + + err := buf.AddUpdate(types.SliceOrderBook{ + Bids: types.PriceVolumeSlice{ + {Price: itov(100), Volume: itov(updateID)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: itov(99), Volume: itov(updateID)}, + }, + }, updateID, updateID, previousUpdateId) + + if err != nil { + t.Log("emit reset") + buf.Reset() + } + } + + select { + case <-resetC: + case <-time.After(10 * time.Second): + t.Fail() + } +}