Skip to content

Commit

Permalink
apidata: store completed candles in db (#2443)
Browse files Browse the repository at this point in the history
* cache api candles
  • Loading branch information
buck54321 authored Oct 30, 2023
1 parent 0ff3943 commit 6a12553
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 26 deletions.
9 changes: 2 additions & 7 deletions client/asset/dcr/spv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"decred.org/dcrdex/client/asset"
"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/utils"
"decred.org/dcrwallet/v3/chain"
walleterrors "decred.org/dcrwallet/v3/errors"
"decred.org/dcrwallet/v3/p2p"
Expand Down Expand Up @@ -1050,7 +1051,7 @@ func (w *spvWallet) ticketsInRange(ctx context.Context, lowerHeight, upperHeight
// If this is a mempool scan, we cannot scan backwards, so reverse the
// result order.
if includeMempool {
reverseSlice(tickets)
utils.ReverseSlice(tickets)
}

return tickets, nil
Expand Down Expand Up @@ -1419,12 +1420,6 @@ func initLogging(netDir string) error {
return nil
}

func reverseSlice[T any](s []T) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}

func ticketSummaryToAssetTicket(ticketSummary *wallet.TicketSummary, hdr *wire.BlockHeader, log dex.Logger) *asset.Ticket {
spender := ""
if ticketSummary.Spender != nil {
Expand Down
5 changes: 2 additions & 3 deletions client/orderbook/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/dex/utils"
)

// ErrEmptyOrderbook is returned from MidGap when the order book is empty.
Expand Down Expand Up @@ -690,9 +691,7 @@ func (ob *OrderBook) AddRecentMatches(matches [][2]int64, ts uint64) []*MatchSum
}

// Put the newest first.
for i, j := 0, len(newMatches)-1; i < j; i, j = i+1, j-1 {
newMatches[i], newMatches[j] = newMatches[j], newMatches[i]
}
utils.ReverseSlice(newMatches)

ob.matchSummaryMtx.Lock()
defer ob.matchSummaryMtx.Unlock()
Expand Down
26 changes: 25 additions & 1 deletion dex/candles/candles.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/utils"
)

const (
Expand Down Expand Up @@ -68,7 +69,7 @@ func (c *Cache) Add(candle *Candle) {
return
}
c.Candles = append(c.Candles, *candle)
c.cursor = sz // len(c.candles) - 1
c.cursor = sz // len(c.Candles) - 1
}

func (c *Cache) Reset() {
Expand Down Expand Up @@ -181,6 +182,29 @@ func (c *Cache) Last() *Candle {
return &c.Candles[c.cursor]
}

// CompletedCandlesSince returns any candles that fall into an epoch after the
// epoch of the provided timestamp, and before the current epoch.
func (c *Cache) CompletedCandlesSince(lastStoredEndStamp uint64) (cs []*Candle) {
currentIdx := uint64(time.Now().UnixMilli()) / c.BinSize
lastStoredIdx := lastStoredEndStamp / c.BinSize

sz := len(c.Candles)
for i := 0; i < sz; i++ {
// iterate backwards
candle := &c.Candles[(c.cursor+sz-i)%sz]
epochIdx := candle.EndStamp / c.BinSize
if epochIdx >= currentIdx {
continue
}
if epochIdx <= lastStoredIdx {
break
}
cs = append(cs, candle)
}
utils.ReverseSlice(cs)
return
}

// combineCandles attempts to add the candidate candle to the target candle
// in-place, if they're in the same bin, otherwise returns false.
func (c *Cache) combineCandles(target, candidate *Candle) bool {
Expand Down
5 changes: 2 additions & 3 deletions dex/networks/zec/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"time"

"decred.org/dcrdex/dex/utils"
"github.com/btcsuite/btcd/wire"
)

Expand Down Expand Up @@ -119,8 +120,6 @@ func readInternalByteOrder(r io.Reader, b []byte) error {
return err
}
// Reverse the bytes
for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
b[i], b[j] = b[j], b[i]
}
utils.ReverseSlice(b)
return nil
}
10 changes: 10 additions & 0 deletions dex/utils/generics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package utils

func ReverseSlice[T any](s []T) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}
42 changes: 36 additions & 6 deletions server/apidata/apidata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
// caches at startup.
type DBSource interface {
LoadEpochStats(base, quote uint32, caches []*candles.Cache) error
LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error)
InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error
}

// MarketSource is a source of market information. Markets are added after
Expand All @@ -43,6 +45,11 @@ type BookSource interface {
Book(mktName string) (*msgjson.OrderBook, error)
}

type cacheWithStoredTime struct {
*candles.Cache
lastStoredEndStamp uint64 // protected by DataAPI.cacheMtx
}

// DataAPI is a data API backend.
type DataAPI struct {
db DBSource
Expand All @@ -53,7 +60,7 @@ type DataAPI struct {
spots map[string]json.RawMessage

cacheMtx sync.RWMutex
marketCaches map[string]map[uint64]*candles.Cache
marketCaches map[string]map[uint64]*cacheWithStoredTime
}

// NewDataAPI is the constructor for a new DataAPI.
Expand All @@ -62,7 +69,7 @@ func NewDataAPI(dbSrc DBSource) *DataAPI {
db: dbSrc,
epochDurations: make(map[string]uint64),
spots: make(map[string]json.RawMessage),
marketCaches: make(map[string]map[uint64]*candles.Cache),
marketCaches: make(map[string]map[uint64]*cacheWithStoredTime),
}

if atomic.CompareAndSwapUint32(&started, 0, 1) {
Expand All @@ -81,18 +88,25 @@ func (s *DataAPI) AddMarketSource(mkt MarketSource) error {
}
epochDur := mkt.EpochDuration()
s.epochDurations[mktName] = epochDur
binCaches := make(map[uint64]*candles.Cache, len(binSizes)+1)
s.marketCaches[mktName] = binCaches
binCaches := make(map[uint64]*cacheWithStoredTime, len(binSizes)+1)
cacheList := make([]*candles.Cache, 0, len(binSizes)+1)
for _, binSize := range append([]uint64{epochDur}, binSizes...) {
cache := candles.NewCache(candles.CacheSize, binSize)
lastCandleEndStamp, err := s.db.LastCandleEndStamp(mkt.Base(), mkt.Quote(), cache.BinSize)
if err != nil {
return fmt.Errorf("LastCandleEndStamp: %w", err)
}
c := &cacheWithStoredTime{cache, lastCandleEndStamp}
cacheList = append(cacheList, cache)
binCaches[binSize] = cache
binCaches[binSize] = c
}
err = s.db.LoadEpochStats(mkt.Base(), mkt.Quote(), cacheList)
if err != nil {
return err
}
s.cacheMtx.Lock()
s.marketCaches[mktName] = binCaches
s.cacheMtx.Unlock()
return nil
}

Expand Down Expand Up @@ -120,7 +134,7 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche
epochDur := s.epochDurations[mktName]
startStamp := epochIdx * epochDur
endStamp := startStamp + epochDur
var cache5min *candles.Cache
var cache5min *cacheWithStoredTime
const fiveMins = uint64(time.Minute * 5 / time.Millisecond)
candle := &candles.Candle{
StartStamp: startStamp,
Expand All @@ -137,6 +151,21 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche
cache5min = cache
}
cache.Add(candle)

// Check if any candles need to be inserted.
// Don't insert epoch candles.
if cache.BinSize == epochDur {
continue
}

newCandles := cache.CompletedCandlesSince(cache.lastStoredEndStamp)
if len(newCandles) == 0 {
continue
}
if err := s.db.InsertCandles(base, quote, cache.BinSize, newCandles); err != nil {
return 0, 0, 0, 0, fmt.Errorf("InsertCandles: %w", err)
}
cache.lastStoredEndStamp = newCandles[len(newCandles)-1].EndStamp
}
if cache5min == nil {
return 0, 0, 0, 0, fmt.Errorf("no 5 minute cache")
Expand All @@ -161,6 +190,7 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche
High24: high24,
Low24: low24,
}

s.spotsMtx.Lock()
s.spots[mktName], err = json.Marshal(spot)
s.spotsMtx.Unlock()
Expand Down
8 changes: 8 additions & 0 deletions server/apidata/apidata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func (db *TDBSource) LoadEpochStats(base, quote uint32, caches []*candles.Cache)
return db.loadEpochErr
}

func (db *TDBSource) LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) {
return 0, nil
}

func (db *TDBSource) InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error {
return nil
}

type TBookSource struct {
book *msgjson.OrderBook
}
Expand Down
60 changes: 60 additions & 0 deletions server/db/driver/pg/candles_online_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//go:build pgonline

package pg

import (
"testing"

"decred.org/dcrdex/dex/candles"
)

func TestCandles(t *testing.T) {
if err := cleanTables(archie.db); err != nil {
t.Fatalf("cleanTables: %v", err)
}

var baseID, quoteID uint32 = 42, 0
var candleDur uint64 = 5 * 60 * 1000

lastCandle, err := archie.LastCandleEndStamp(baseID, quoteID, candleDur)
if err != nil {
t.Fatalf("Initial LastCandleEndStamp error: %v", err)
}

cands := []*candles.Candle{
{EndStamp: candleDur},
{EndStamp: candleDur * 2},
}

if err = archie.InsertCandles(baseID, quoteID, candleDur, cands); err != nil {
t.Fatalf("InsertCandles error: %v", err)
}

lastCandle, err = archie.LastCandleEndStamp(baseID, quoteID, candleDur)
if err != nil {
t.Fatalf("LastCandleEndStamp error: %v", err)
}

if lastCandle != candleDur*2 {
t.Fatalf("Wrong last candle. Wanted 2, got %d", lastCandle)
}

// Updating is fine
cands[1].MatchVolume = 1
if err = archie.InsertCandles(baseID, quoteID, candleDur, []*candles.Candle{cands[1]}); err != nil {
t.Fatalf("InsertCandles (overwrite) error: %v", err)
}

cache := candles.NewCache(5, candleDur)
if err = archie.LoadEpochStats(baseID, quoteID, []*candles.Cache{cache}); err != nil {
t.Fatalf("LoadEpochStats error: %v", err)
}

if len(cache.Candles) != 2 {
t.Fatalf("Expected 2 candles, got %d", len(cache.Candles))
}

if cache.Last().MatchVolume != 1 {
t.Fatalf("Overwrite failed")
}
}
Loading

0 comments on commit 6a12553

Please sign in to comment.