Commit

fix: optimized store replication perf (#76)

glouvigny authored Jun 14, 2021
1 parent 3d609c4 commit bceaaf5

Showing 9 changed files with 489 additions and 72 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module berty.tech/go-orbit-db
go 1.15

require (
berty.tech/go-ipfs-log v1.3.0
berty.tech/go-ipfs-log v1.4.0
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/golang/snappy v0.0.1 // indirect
6 changes: 2 additions & 4 deletions go.sum
@@ -1,7 +1,7 @@
bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc h1:utDghgcjE8u+EBjHOgYT+dJPcnDF05KqWMBcjuJy510=
bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc/go.mod h1:FbcW6z/2VytnFDhZfumh8Ss8zxHE6qpMP5sHTRe0EaM=
berty.tech/go-ipfs-log v1.3.0 h1:IyzN3LUTOklvCM/GtlpsIKWd6mFXjOG6hUcOxBWSZXc=
berty.tech/go-ipfs-log v1.3.0/go.mod h1:MyZWHZNqdnbvs1J70jsEqzjGeCvQPAWj/yGZykDnL0c=
berty.tech/go-ipfs-log v1.4.0 h1:JXD/kT7m6qdDTOpOm07lE/S3qqeNukbJN/I2QPoeIAA=
berty.tech/go-ipfs-log v1.4.0/go.mod h1:9DC6qUUc9RRHqGBCitZT8rQJfAChYmIovZJdyDLUWs8=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
@@ -252,8 +252,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/iancoleman/orderedmap v0.2.0 h1:sq1N/TFpYH++aViPcaKjys3bDClUEU7s5B+z6jq8pNA=
github.com/iancoleman/orderedmap v0.2.0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
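The dependency bump above (berty.tech/go-ipfs-log v1.3.0 to v1.4.0, with the indirect github.com/iancoleman/orderedmap entry dropped from go.sum, presumably because the new release no longer pulls it in) is what the Go changes below build on: the oplog is now queried with Get(cid.Cid) and Len() directly instead of reaching through GetEntries() with string keys. A minimal sketch of that lookup style, using a local interface as a stand-in for the upstream Log type rather than the real go-ipfs-log API:

package main

import (
    "fmt"

    cid "github.com/ipfs/go-cid"
)

// oplog models only the two accessors this commit switches to; the real
// interface lives in berty.tech/go-ipfs-log and returns concrete entry
// types rather than interface{}.
type oplog interface {
    Get(c cid.Cid) (interface{}, bool)
    Len() int
}

// memLog is a toy, map-backed implementation, just enough to exercise the calls.
type memLog map[cid.Cid]interface{}

func (m memLog) Get(c cid.Cid) (interface{}, bool) { e, ok := m[c]; return e, ok }
func (m memLog) Len() int                          { return len(m) }

func main() {
    var l oplog = memLog{}
    if _, ok := l.Get(cid.Undef); !ok { // direct CID lookup, no string keys
        fmt.Println("entry not replicated yet; log length:", l.Len())
    }
}
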
28 changes: 11 additions & 17 deletions stores/basestore/base_store.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

ipfslog "berty.tech/go-ipfs-log"
@@ -49,9 +50,9 @@ type BaseStore struct {
replicationStatus replicator.ReplicationInfo
stats struct {
snapshot struct {
bytesLoaded int
bytesLoaded int64
}
syncRequestsReceived int
syncRequestsReceived int64
}
referenceCount int
replicate bool
@@ -61,7 +62,6 @@ type BaseStore struct {

muCache sync.RWMutex
muIndex sync.RWMutex
muStats sync.RWMutex
muJoining sync.Mutex
sortFn ipfslog.SortFn
logger *zap.Logger
@@ -184,9 +184,7 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
b.index = options.Index(b.Identity().PublicKey)
b.muIndex.Unlock()

b.muStats.Lock()
b.stats.snapshot.bytesLoaded = -1
b.muStats.Unlock()
atomic.StoreInt64(&b.stats.snapshot.bytesLoaded, -1)

b.replicator = replicator.NewReplicator(ctx, b, options.ReplicationConcurrency, &replicator.Options{
Logger: b.logger,
@@ -234,11 +232,11 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
if b.ReplicationStatus().GetBuffered() > evt.BufferLength {
b.recalculateReplicationProgress(b.ReplicationStatus().GetProgress() + evt.BufferLength)
} else {
if _, ok := b.OpLog().GetEntries().Get(evt.Hash.String()); ok {
if _, ok := b.OpLog().Get(evt.Hash); ok {
continue
}

b.recalculateReplicationProgress(b.OpLog().GetEntries().Len() + evt.BufferLength)
b.recalculateReplicationProgress(b.OpLog().Len() + evt.BufferLength)
}

b.ReplicationStatus().SetBuffered(evt.BufferLength)
@@ -259,11 +257,9 @@ func (b *BaseStore) Close() error {
// Reset replication statistics
b.ReplicationStatus().Reset()

b.muStats.Lock()
// Reset database statistics
b.stats.snapshot.bytesLoaded = -1
b.stats.syncRequestsReceived = 0
b.muStats.Unlock()
atomic.StoreInt64(&b.stats.snapshot.bytesLoaded, -1)
atomic.StoreInt64(&b.stats.syncRequestsReceived, 0)

b.UnsubscribeAll()

@@ -456,9 +452,7 @@ func (b *BaseStore) Sync(ctx context.Context, heads []ipfslog.Entry) error {
ctx, span := b.tracer.Start(ctx, "store-sync")
defer span.End()

b.muStats.Lock()
b.stats.syncRequestsReceived++
b.muStats.Unlock()
atomic.AddInt64(&b.stats.syncRequestsReceived, 1)

if len(heads) == 0 {
return nil
@@ -703,7 +697,7 @@ func (b *BaseStore) AddOperation(ctx context.Context, op operation.Operation, on
}

func (b *BaseStore) recalculateReplicationProgress(max int) {
if opLogLen := b.OpLog().GetEntries().Len(); opLogLen > max {
if opLogLen := b.OpLog().Len(); opLogLen > max {
max = opLogLen

} else if replMax := b.ReplicationStatus().GetMax(); replMax > max {
@@ -716,7 +710,7 @@ func (b *BaseStore) recalculateReplicationProgress(max int) {
}

func (b *BaseStore) recalculateReplicationMax(max int) {
if opLogLen := b.OpLog().GetEntries().Len(); opLogLen > max {
if opLogLen := b.OpLog().Len(); opLogLen > max {
max = opLogLen

} else if replMax := b.ReplicationStatus().GetMax(); replMax > max {
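The base_store.go hunks above drop the dedicated muStats mutex and turn the stats fields into int64 values touched only through sync/atomic, which removes lock contention on the hot Sync path. A small, self-contained sketch of that pattern (the struct and field names here are illustrative, not the store's real layout):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// stats mirrors the shape BaseStore uses after this commit: counters that
// several goroutines read and write are plain int64 fields, and every access
// goes through sync/atomic instead of a mutex.
type stats struct {
    bytesLoaded          int64
    syncRequestsReceived int64
}

func main() {
    s := &stats{}
    atomic.StoreInt64(&s.bytesLoaded, -1) // sentinel: no snapshot loaded yet

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&s.syncRequestsReceived, 1) // one per incoming sync request
        }()
    }
    wg.Wait()

    fmt.Println(atomic.LoadInt64(&s.bytesLoaded), atomic.LoadInt64(&s.syncRequestsReceived)) // -1 8
}
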
4 changes: 2 additions & 2 deletions stores/basestore/utils.go
@@ -36,7 +36,7 @@ func SaveSnapshot(ctx context.Context, b iface.Store) (cid.Cid, error) {
header, err := json.Marshal(&storeSnapshot{
ID: oplog.GetID(),
Heads: entries,
Size: oplog.Values().Len(),
Size: oplog.Len(),
Type: b.Type(),
})

@@ -50,7 +50,7 @@ func SaveSnapshot(ctx context.Context, b iface.Store) (cid.Cid, error) {
binary.BigEndian.PutUint16(size, uint16(headerSize))
rs := append(size, header...)

for _, e := range oplog.Values().Slice() {
for _, e := range oplog.GetEntries().Slice() {
entryJSON, err := json.Marshal(e)

if err != nil {
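The two utils.go changes only swap oplog accessors, but they sit inside SaveSnapshot's framing logic, visible above: a 2-byte big-endian length prefix written with binary.BigEndian.PutUint16, followed by the JSON header, then each entry serialized in turn. A hedged sketch of that length-prefix framing, with an illustrative header struct rather than the store's real storeSnapshot type:

package main

import (
    "encoding/binary"
    "encoding/json"
    "fmt"
)

// snapshotHeader is illustrative only; the real header carries the log ID,
// heads, entry count and store type, as the diff above shows.
type snapshotHeader struct {
    ID   string `json:"id"`
    Size int    `json:"size"`
    Type string `json:"type"`
}

// frameHeader prefixes the JSON-encoded header with its length as a
// big-endian uint16, the same framing SaveSnapshot uses before appending
// the serialized entries.
func frameHeader(h snapshotHeader) ([]byte, error) {
    payload, err := json.Marshal(h)
    if err != nil {
        return nil, err
    }
    size := make([]byte, 2)
    binary.BigEndian.PutUint16(size, uint16(len(payload)))
    return append(size, payload...), nil
}

func main() {
    framed, err := frameHeader(snapshotHeader{ID: "log-id", Size: 3, Type: "eventlog"})
    if err != nil {
        panic(err)
    }
    fmt.Printf("frame: %d bytes, header length %d\n", len(framed), binary.BigEndian.Uint16(framed[:2]))
}
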
8 changes: 4 additions & 4 deletions stores/events.go
@@ -38,7 +38,7 @@ func NewEventReplicateProgress(addr address.Address, h cid.Cid, e ipfslog.Entry,
}
}

// EventNewPeer An event sent when data has been replicated
// EventReplicated An event sent when data has been replicated
type EventReplicated struct {
Address address.Address
LogLength int
@@ -52,7 +52,7 @@ func NewEventReplicated(addr address.Address, logLength int) *EventReplicated {
}
}

// EventNewPeer An event sent when data has been loaded
// EventLoad An event sent when data has been loaded
type EventLoad struct {
Address address.Address
Heads []ipfslog.Entry
@@ -82,7 +82,7 @@ func NewEventLoad(addr address.Address, heads []ipfslog.Entry) *EventLoad {
// }
//}

// EventNewPeer An event sent when the store is ready
// EventReady An event sent when the store is ready
type EventReady struct {
Address address.Address
Heads []ipfslog.Entry
@@ -96,7 +96,7 @@ func NewEventReady(addr address.Address, heads []ipfslog.Entry) *EventReady {
}
}

// EventNewPeer An event sent when something has been written
// EventWrite An event sent when something has been written
type EventWrite struct {
Address address.Address
Entry ipfslog.Entry
87 changes: 49 additions & 38 deletions stores/replicator/replicator.go
@@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

ipfslog "berty.tech/go-ipfs-log"
@@ -25,11 +26,11 @@ type replicator struct {
cancelFunc context.CancelFunc
store storeInterface
fetching map[string]cid.Cid
statsTasksRequested uint
statsTasksStarted uint
statsTasksProcessed uint
statsTasksRequested int64
statsTasksStarted int64
statsTasksProcessed int64
buffer []ipfslog.Log
concurrency uint
concurrency int64
queue map[string]cid.Cid
lock sync.RWMutex
logger *zap.Logger
@@ -72,7 +73,7 @@ func (r *replicator) Load(ctx context.Context, cids []cid.Cid) {
defer span.End()

for _, h := range cids {
inLog := r.store.OpLog().GetEntries().UnsafeGet(h.String()) != nil
_, inLog := r.store.OpLog().Get(h)
r.lock.RLock()
_, fetching := r.fetching[h.String()]
_, queued := r.queue[h.String()]
@@ -115,7 +116,7 @@ func NewReplicator(ctx context.Context, store storeInterface, concurrency uint,

r := replicator{
cancelFunc: cancelFunc,
concurrency: concurrency,
concurrency: int64(concurrency),
store: store,
queue: map[string]cid.Cid{},
fetching: map[string]cid.Cid{},
@@ -133,7 +134,7 @@

if r.tasksRunning() == 0 && qLen > 0 {
r.logger.Debug(fmt.Sprintf("Had to flush the queue! %d items in the queue, %d %d tasks requested/finished", qLen, r.tasksRequested(), r.tasksFinished()))
r.processQueue(ctx)
go r.processQueue(ctx)
}
case <-ctx.Done():
return
@@ -144,25 +145,16 @@
return &r
}

func (r *replicator) tasksRunning() uint {
r.lock.RLock()
defer r.lock.RUnlock()

return r.statsTasksStarted - r.statsTasksProcessed
func (r *replicator) tasksRunning() int64 {
return atomic.LoadInt64(&r.statsTasksStarted) - atomic.LoadInt64(&r.statsTasksProcessed)
}

func (r *replicator) tasksRequested() uint {
r.lock.RLock()
defer r.lock.RUnlock()

return r.statsTasksRequested
func (r *replicator) tasksRequested() int64 {
return atomic.LoadInt64(&r.statsTasksRequested)
}

func (r *replicator) tasksFinished() uint {
r.lock.RLock()
defer r.lock.RUnlock()

return r.statsTasksProcessed
func (r *replicator) tasksFinished() int64 {
return atomic.LoadInt64(&r.statsTasksProcessed)
}

func (r *replicator) processOne(ctx context.Context, h cid.Cid) ([]cid.Cid, error) {
@@ -173,7 +165,7 @@
defer r.lock.Unlock()

_, isFetching := r.fetching[h.String()]
_, hasEntry := r.store.OpLog().Values().Get(h.String())
_, hasEntry := r.store.OpLog().Get(h)

if hasEntry || isFetching {
return nil, nil
Expand All @@ -183,7 +175,7 @@ func (r *replicator) processOne(ctx context.Context, h cid.Cid) ([]cid.Cid, erro

r.Emit(ctx, NewEventLoadAdded(h))

r.statsTasksStarted++
atomic.AddInt64(&r.statsTasksStarted, 1)

l, err := ipfslog.NewFromEntryHash(ctx, r.store.IPFS(), r.store.Identity(), h, &ipfslog.LogOptions{
ID: r.store.OpLog().GetID(),
@@ -207,7 +199,7 @@
delete(r.queue, h.String())

// Mark this task as processed
r.statsTasksProcessed++
//r.statsTasksProcessed++

// Notify subscribers that we made progress
r.Emit(ctx, NewEventLoadProgress("", h, latest, len(r.buffer))) // TODO JS: this._id should be undefined
@@ -216,6 +208,7 @@

for _, e := range l.Values().Slice() {
nextValues = append(nextValues, e.GetNext()...)
nextValues = append(nextValues, e.GetRefs()...)
}

// Return all next pointers
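The single added append in this hunk widens the traversal: after fetching a log fragment, the replicator queues not only each entry's direct parents (GetNext) but also its refs, the extra back-references go-ipfs-log stores alongside them, so deep histories need fewer sequential fetch rounds. A minimal sketch of that collection step, with a local interface standing in for the real entry type:

package main

import (
    "fmt"

    cid "github.com/ipfs/go-cid"
)

// logEntry models only the two accessors used by the nextValues loop above;
// the real type comes from go-ipfs-log.
type logEntry interface {
    GetNext() []cid.Cid
    GetRefs() []cid.Cid
}

// collectChildren gathers every hash an entry points at, direct parents and
// refs alike, mirroring the loop in processOne.
func collectChildren(entries []logEntry) []cid.Cid {
    var next []cid.Cid
    for _, e := range entries {
        next = append(next, e.GetNext()...)
        next = append(next, e.GetRefs()...)
    }
    return next
}

func main() {
    fmt.Println(len(collectChildren(nil))) // 0 for an empty fragment
}
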
@@ -230,39 +223,57 @@
ctx, span := r.tracer.Start(ctx, "replicator-process-queue")
defer span.End()

var hashesList [][]cid.Cid
capacity := r.concurrency - r.tasksRunning()
slicedQueue := r.GetQueue()
if uint(len(slicedQueue)) < capacity {
capacity = uint(len(slicedQueue))
if int64(len(slicedQueue)) < capacity {
capacity = int64(len(slicedQueue))
}

items := map[string]cid.Cid{}
for _, h := range slicedQueue[:capacity] {
items[h.String()] = h
}

var hashesList = make([][]cid.Cid, len(items))
hashesListIdx := 0
wg := sync.WaitGroup{}

r.lock.Lock()
for _, e := range items {
r.lock.Lock()
delete(r.queue, e.String())
r.lock.Unlock()
}
r.lock.Unlock()

hashes, err := r.processOne(ctx, e)
if err != nil {
r.logger.Error("unable to get data to process %v", zap.Error(err))
return
}
for _, e := range items {
wg.Add(1)

go func(hashesListIdx int, e cid.Cid) {
defer wg.Done()

hashes, err := r.processOne(ctx, e)
if err != nil {
r.logger.Error("unable to get data to process %v", zap.Error(err))
return
}

hashesList[hashesListIdx] = hashes
}(hashesListIdx, e)

hashesList = append(hashesList, hashes)
hashesListIdx++
}

wg.Wait()

for _, hashes := range hashesList {
r.lock.RLock()
b := r.buffer
bLen := len(b)
r.lock.RUnlock()

if (len(items) > 0 && bLen > 0) || (r.tasksRunning() == 0 && bLen > 0) {
// Mark this task as processed
atomic.AddInt64(&r.statsTasksProcessed, 1)

if bLen > 0 && r.tasksRunning() == 0 {
r.lock.Lock()
r.buffer = []ipfslog.Log{}
r.lock.Unlock()
@@ -284,7 +295,7 @@ func (r *replicator) addToQueue(ctx context.Context, span trace.Span, h cid.Cid)
r.lock.Lock()
defer r.lock.Unlock()

r.statsTasksRequested++
atomic.AddInt64(&r.statsTasksRequested, 1)
r.queue[h.String()] = h
}

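The processQueue rewrite above carries most of the perf win: queued heads are removed from the queue up front under a single lock, each fetch then runs in its own goroutine and writes into a pre-sized result slot (so the slice needs no extra locking), and a sync.WaitGroup gates the buffer drain; the processed counter now ticks there instead of inside processOne. A self-contained sketch of that fan-out shape, with a stand-in fetch function rather than the replicator's processOne:

package main

import (
    "fmt"
    "sync"
)

// fetch stands in for processOne: resolve one item, return follow-up work.
func fetch(item string) []string {
    return []string{item + "/parent"}
}

func main() {
    items := []string{"a", "b", "c", "d"}

    // One result slot per item; each goroutine writes only its own index.
    results := make([][]string, len(items))

    var wg sync.WaitGroup
    idx := 0
    for _, it := range items {
        wg.Add(1)
        // Pass idx and it as arguments so each goroutine gets its own copy,
        // the same trick the commit uses with hashesListIdx and e.
        go func(idx int, it string) {
            defer wg.Done()
            results[idx] = fetch(it)
        }(idx, it)
        idx++
    }
    wg.Wait()

    for _, r := range results {
        fmt.Println(r)
    }
}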