From 670dfc6f4415238e727e1124906b514711696067 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 3 Dec 2024 14:53:53 +0800 Subject: [PATCH] depth: fix depth buffer and tests --- pkg/depth/buffer.go | 93 ++++++++++++++++++++++++---------------- pkg/depth/buffer_test.go | 31 +++++++++++--- 2 files changed, 82 insertions(+), 42 deletions(-) diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index c6d36c1979..80d1c8f268 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -32,7 +32,7 @@ type Buffer struct { readyCallbacks []func(snapshot types.SliceOrderBook, updates []Update) pushCallbacks []func(update Update) - resetC chan struct{} + fetchC chan struct{} mu sync.Mutex once util.Reonce @@ -46,7 +46,7 @@ type Buffer struct { func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer { return &Buffer{ fetcher: fetcher, - resetC: make(chan struct{}, 1), + fetchC: make(chan struct{}, 1), bufferingPeriod: bufferingPeriod, } } @@ -58,12 +58,13 @@ func (b *Buffer) SetUpdateTimeout(d time.Duration) { func (b *Buffer) resetSnapshot() { b.snapshot = nil b.finalUpdateID = 0 - b.EmitReset() } -func (b *Buffer) emitReset() { +// emitFetch emits the fetch signal, and in the next call of AddUpdate, the buffer will try to fetch the snapshot +// if the fetch signal is already emitted, it will be ignored +func (b *Buffer) emitFetch() { select { - case b.resetC <- struct{}{}: + case b.fetchC <- struct{}{}: default: } } @@ -71,8 +72,10 @@ func (b *Buffer) emitReset() { func (b *Buffer) Reset() { b.mu.Lock() b.resetSnapshot() - b.emitReset() + b.emitFetch() b.mu.Unlock() + + b.EmitReset() } func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { @@ -114,17 +117,26 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg Object: o, } - select { - case <-b.resetC: - log.Warnf("received depth reset signal, resetting...") + // if the snapshot is set to nil, we need to buffer the message + b.mu.Lock() - // if the once goroutine is still running, overwriting this once might cause "unlock of unlocked mutex" panic. + select { + case <-b.fetchC: + b.buffer = append(b.buffer, u) + b.resetSnapshot() b.once.Reset() + b.once.Do(func() { + go b.tryFetch() + }) + b.mu.Unlock() + return nil + default: + } - // if the snapshot is set to nil, we need to buffer the message - b.mu.Lock() + // snapshot is nil means we haven't fetched the snapshot yet + // we need to buffer the message if b.snapshot == nil { b.buffer = append(b.buffer, u) b.once.Do(func() { @@ -134,6 +146,9 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg return nil } + // if it's ready, then we have the snapshot, we can push the update + + // skip older events if u.FinalUpdateID <= b.finalUpdateID { log.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID) b.mu.Unlock() @@ -142,16 +157,21 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg // if there is a missing update, we should reset the snapshot and re-fetch the snapshot if u.FirstUpdateID > b.finalUpdateID+1 { - // emitReset will reset the once outside the mutex lock section + // drop the prior updates in the buffer since it's corrupted b.buffer = []Update{u} - finalUpdateID = b.finalUpdateID b.resetSnapshot() - b.emitReset() + b.once.Reset() + b.once.Do(func() { + go b.tryFetch() + }) + b.mu.Unlock() + b.EmitReset() + return fmt.Errorf("found missing update between finalUpdateID %d and firstUpdateID %d, diff: %d", - finalUpdateID+1, + b.finalUpdateID+1, u.FirstUpdateID, - u.FirstUpdateID-finalUpdateID) + u.FirstUpdateID-b.finalUpdateID) } log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID) @@ -159,9 +179,25 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg b.mu.Unlock() b.EmitPush(u) + return nil } +// tryFetch tries to fetch the snapshot and push the updates +func (b *Buffer) tryFetch() { + for { + <-time.After(b.bufferingPeriod) + + err := b.fetchAndPush() + if err != nil { + log.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod) + continue + } + + break + } +} + func (b *Buffer) fetchAndPush() error { book, finalUpdateID, err := b.fetcher() if err != nil { @@ -172,25 +208,23 @@ func (b *Buffer) fetchAndPush() error { log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID) if len(b.buffer) > 0 { - // the snapshot is too early + // the snapshot is too early, we should re-fetch the snapshot if finalUpdateID < b.buffer[0].FirstUpdateID-1 { - // reset buffer - b.buffer = nil b.mu.Unlock() return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID) } } var pushUpdates []Update - for _, u := range b.buffer { + for idx, u := range b.buffer { // skip old events if u.FinalUpdateID <= finalUpdateID { continue } if u.FirstUpdateID > finalUpdateID+1 { - // reset buffer - b.buffer = nil + // drop prior updates in the buffer since it's corrupted + b.buffer = b.buffer[idx:] b.mu.Unlock() return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) } @@ -216,16 +250,3 @@ func (b *Buffer) fetchAndPush() error { b.EmitReady(book, pushUpdates) return nil } - -func (b *Buffer) tryFetch() { - for { - <-time.After(b.bufferingPeriod) - - err := b.fetchAndPush() - if err != nil { - log.WithError(err).Errorf("snapshot fetch failed") - continue - } - break - } -} diff --git a/pkg/depth/buffer_test.go b/pkg/depth/buffer_test.go index 8bb05cd563..b4c88b2576 100644 --- a/pkg/depth/buffer_test.go +++ b/pkg/depth/buffer_test.go @@ -36,7 +36,7 @@ func TestDepthBuffer_ReadyState(t *testing.T) { var updateID int64 = 1 for ; updateID < 100; updateID++ { - buf.AddUpdate( + err := buf.AddUpdate( types.SliceOrderBook{ Bids: types.PriceVolumeSlice{ {Price: itov(100), Volume: itov(updateID)}, @@ -45,9 +45,15 @@ func TestDepthBuffer_ReadyState(t *testing.T) { {Price: itov(99), Volume: itov(updateID)}, }, }, updateID) + + assert.NoError(t, err) } - <-readyC + select { + case <-readyC: + case <-time.After(time.Minute): + t.Fail() + } } func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) { @@ -64,21 +70,25 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) { {Price: itov(99), Volume: itov(1)}, }, }, snapshotFinalID, nil - }, time.Millisecond*5) + }, time.Millisecond) - resetC := make(chan struct{}, 1) + resetC := make(chan struct{}, 2) buf.OnReset(func() { + t.Logf("reset triggered") resetC <- struct{}{} }) var updateID int64 = 10 for ; updateID < 100; updateID++ { + time.Sleep(time.Millisecond) + + // send corrupt update when updateID = 50 if updateID == 50 { updateID += 5 } - buf.AddUpdate(types.SliceOrderBook{ + err := buf.AddUpdate(types.SliceOrderBook{ Bids: types.PriceVolumeSlice{ {Price: itov(100), Volume: itov(updateID)}, }, @@ -86,9 +96,18 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) { {Price: itov(99), Volume: itov(updateID)}, }, }, updateID) + + if err != nil { + t.Log("emit reset") + buf.Reset() + } } - <-resetC + select { + case <-resetC: + case <-time.After(10 * time.Second): + t.Fail() + } } func TestDepthBuffer_ConcurrentRun(t *testing.T) {