Skip to content

Commit

Permalink
depth: fix depth buffer and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Dec 3, 2024
1 parent 17a80f2 commit 670dfc6
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 42 deletions.
93 changes: 57 additions & 36 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Buffer struct {
readyCallbacks []func(snapshot types.SliceOrderBook, updates []Update)
pushCallbacks []func(update Update)

resetC chan struct{}
fetchC chan struct{}
mu sync.Mutex
once util.Reonce

Expand All @@ -46,7 +46,7 @@ type Buffer struct {
func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer {
return &Buffer{
fetcher: fetcher,
resetC: make(chan struct{}, 1),
fetchC: make(chan struct{}, 1),
bufferingPeriod: bufferingPeriod,
}
}
Expand All @@ -58,21 +58,24 @@ func (b *Buffer) SetUpdateTimeout(d time.Duration) {
func (b *Buffer) resetSnapshot() {
b.snapshot = nil
b.finalUpdateID = 0
b.EmitReset()
}

func (b *Buffer) emitReset() {
// emitFetch emits the fetch signal, and in the next call of AddUpdate, the buffer will try to fetch the snapshot
// if the fetch signal is already emitted, it will be ignored
func (b *Buffer) emitFetch() {
select {
case b.resetC <- struct{}{}:
case b.fetchC <- struct{}{}:
default:
}
}

func (b *Buffer) Reset() {
b.mu.Lock()
b.resetSnapshot()
b.emitReset()
b.emitFetch()
b.mu.Unlock()

b.EmitReset()
}

func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
Expand Down Expand Up @@ -114,17 +117,26 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
Object: o,
}

select {
case <-b.resetC:
log.Warnf("received depth reset signal, resetting...")
// if the snapshot is set to nil, we need to buffer the message
b.mu.Lock()

// if the once goroutine is still running, overwriting this once might cause "unlock of unlocked mutex" panic.
select {
case <-b.fetchC:
b.buffer = append(b.buffer, u)
b.resetSnapshot()
b.once.Reset()
b.once.Do(func() {
go b.tryFetch()
})
b.mu.Unlock()
return nil

default:

}

// if the snapshot is set to nil, we need to buffer the message
b.mu.Lock()
// snapshot is nil means we haven't fetched the snapshot yet
// we need to buffer the message
if b.snapshot == nil {
b.buffer = append(b.buffer, u)
b.once.Do(func() {
Expand All @@ -134,6 +146,9 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
return nil
}

// if it's ready, then we have the snapshot, we can push the update

// skip older events
if u.FinalUpdateID <= b.finalUpdateID {
log.Infof("the final update id %d of event is less than equal to the final update id %d of the snapshot, skip", u.FinalUpdateID, b.finalUpdateID)
b.mu.Unlock()
Expand All @@ -142,26 +157,47 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg

// if there is a missing update, we should reset the snapshot and re-fetch the snapshot
if u.FirstUpdateID > b.finalUpdateID+1 {
// emitReset will reset the once outside the mutex lock section
// drop the prior updates in the buffer since it's corrupted
b.buffer = []Update{u}
finalUpdateID = b.finalUpdateID
b.resetSnapshot()
b.emitReset()
b.once.Reset()
b.once.Do(func() {
go b.tryFetch()
})

b.mu.Unlock()
b.EmitReset()

return fmt.Errorf("found missing update between finalUpdateID %d and firstUpdateID %d, diff: %d",
finalUpdateID+1,
b.finalUpdateID+1,
u.FirstUpdateID,
u.FirstUpdateID-finalUpdateID)
u.FirstUpdateID-b.finalUpdateID)
}

log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID)
b.finalUpdateID = u.FinalUpdateID
b.mu.Unlock()

b.EmitPush(u)

return nil
}

// tryFetch tries to fetch the snapshot and push the updates
func (b *Buffer) tryFetch() {
for {
<-time.After(b.bufferingPeriod)

err := b.fetchAndPush()
if err != nil {
log.WithError(err).Errorf("snapshot fetch failed, retry in %s", b.bufferingPeriod)
continue
}

break
}
}

func (b *Buffer) fetchAndPush() error {
book, finalUpdateID, err := b.fetcher()
if err != nil {
Expand All @@ -172,25 +208,23 @@ func (b *Buffer) fetchAndPush() error {
log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID)

if len(b.buffer) > 0 {
// the snapshot is too early
// the snapshot is too early, we should re-fetch the snapshot
if finalUpdateID < b.buffer[0].FirstUpdateID-1 {
// reset buffer
b.buffer = nil
b.mu.Unlock()
return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID)
}
}

var pushUpdates []Update
for _, u := range b.buffer {
for idx, u := range b.buffer {
// skip old events
if u.FinalUpdateID <= finalUpdateID {
continue
}

if u.FirstUpdateID > finalUpdateID+1 {
// reset buffer
b.buffer = nil
// drop prior updates in the buffer since it's corrupted
b.buffer = b.buffer[idx:]
b.mu.Unlock()
return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID)
}
Expand All @@ -216,16 +250,3 @@ func (b *Buffer) fetchAndPush() error {
b.EmitReady(book, pushUpdates)
return nil
}

func (b *Buffer) tryFetch() {
for {
<-time.After(b.bufferingPeriod)

err := b.fetchAndPush()
if err != nil {
log.WithError(err).Errorf("snapshot fetch failed")
continue
}
break
}
}
31 changes: 25 additions & 6 deletions pkg/depth/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDepthBuffer_ReadyState(t *testing.T) {

var updateID int64 = 1
for ; updateID < 100; updateID++ {
buf.AddUpdate(
err := buf.AddUpdate(
types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: itov(100), Volume: itov(updateID)},
Expand All @@ -45,9 +45,15 @@ func TestDepthBuffer_ReadyState(t *testing.T) {
{Price: itov(99), Volume: itov(updateID)},
},
}, updateID)

assert.NoError(t, err)
}

<-readyC
select {
case <-readyC:
case <-time.After(time.Minute):
t.Fail()
}
}

func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) {
Expand All @@ -64,31 +70,44 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) {
{Price: itov(99), Volume: itov(1)},
},
}, snapshotFinalID, nil
}, time.Millisecond*5)
}, time.Millisecond)

resetC := make(chan struct{}, 1)
resetC := make(chan struct{}, 2)

buf.OnReset(func() {
t.Logf("reset triggered")
resetC <- struct{}{}
})

var updateID int64 = 10
for ; updateID < 100; updateID++ {
time.Sleep(time.Millisecond)

// send corrupt update when updateID = 50
if updateID == 50 {
updateID += 5
}

buf.AddUpdate(types.SliceOrderBook{
err := buf.AddUpdate(types.SliceOrderBook{
Bids: types.PriceVolumeSlice{
{Price: itov(100), Volume: itov(updateID)},
},
Asks: types.PriceVolumeSlice{
{Price: itov(99), Volume: itov(updateID)},
},
}, updateID)

if err != nil {
t.Log("emit reset")
buf.Reset()
}
}

<-resetC
select {
case <-resetC:
case <-time.After(10 * time.Second):
t.Fail()
}
}

func TestDepthBuffer_ConcurrentRun(t *testing.T) {
Expand Down

0 comments on commit 670dfc6

Please sign in to comment.