Skip to content

Commit

Permalink
FIX: new SetSnapshot method for depthBuffer and use it on snapshot event
Browse files Browse the repository at this point in the history
  • Loading branch information
kbearXD committed Nov 20, 2024
1 parent 8641640 commit 93f68bc
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 9 deletions.
26 changes: 26 additions & 0 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,32 @@ func (b *Buffer) Reset() {
b.mu.Unlock()
}

func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID
if len(finalArgs) > 0 {
finalUpdateID = finalArgs[0]
}

b.mu.Lock()

if b.finalUpdateID >= finalUpdateID {
b.mu.Unlock()
return nil
}

// set the final update ID so that we will know if there is an update missing
b.finalUpdateID = finalUpdateID

// set the snapshot
b.snapshot = &snapshot

b.mu.Unlock()

// should unlock first then call ready
b.EmitReady(snapshot, nil)
return nil
}

// AddUpdate adds the update to the buffer or push the update to the subscriber
func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID
Expand Down
11 changes: 9 additions & 2 deletions pkg/exchange/max/maxapi/public_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ import (

var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")

const Buy = 1
const Sell = -1
const (
Buy = 1
Sell = -1
)

const (
BookEventSnapshot string = "snapshot"
BookEventUpdate string = "update"
)

var parserPool fastjson.ParserPool

Expand Down
27 changes: 20 additions & 7 deletions pkg/exchange/max/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,26 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
return
}

if err := f.AddUpdate(types.SliceOrderBook{
Symbol: symbol,
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
}, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Market)
if e.Event == max.BookEventSnapshot {
if err := f.SetSnapshot(types.SliceOrderBook{
Symbol: symbol,
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
LastUpdateId: e.LastUpdateID,
}, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("failed to set %s snapshot", e.Market)
}
} else {
if err := f.AddUpdate(types.SliceOrderBook{
Symbol: symbol,
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
LastUpdateId: e.LastUpdateID,
}, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Market)
}
}
}
}
Expand Down

0 comments on commit 93f68bc

Please sign in to comment.