diff --git a/stores/basestore/base_index.go b/stores/basestore/base_index.go index 79dd2096..e331166e 100644 --- a/stores/basestore/base_index.go +++ b/stores/basestore/base_index.go @@ -1,21 +1,28 @@ package basestore import ( + "sync" + ipfslog "berty.tech/go-ipfs-log" "berty.tech/go-orbit-db/iface" ) type baseIndex struct { + mu sync.Mutex id []byte index []ipfslog.Entry } func (b *baseIndex) Get(_ string) interface{} { + b.mu.Lock() + defer b.mu.Unlock() return b.index } func (b *baseIndex) UpdateIndex(log ipfslog.Log, entries []ipfslog.Entry) error { + b.mu.Lock() + defer b.mu.Unlock() b.index = log.Values().Slice() return nil } diff --git a/stores/eventlogstore/log.go b/stores/eventlogstore/log.go index a4c89060..dc008e59 100644 --- a/stores/eventlogstore/log.go +++ b/stores/eventlogstore/log.go @@ -65,9 +65,12 @@ func (o *orbitDBEventLogStore) Get(ctx context.Context, cid cid.Cid) (operation. }() select { - case value := <-stream: + case value, ok := <-stream: cancel() - return value, nil + if ok { + return value, nil + } + return nil, errors.New("channel read failed") case err := <-errChan: return nil, err @@ -78,6 +81,7 @@ func (o *orbitDBEventLogStore) Get(ctx context.Context, cid cid.Cid) (operation. } func (o *orbitDBEventLogStore) Stream(ctx context.Context, resultChan chan operation.Operation, options *iface.StreamOptions) error { + defer close(resultChan) messages, err := o.query(options) if err != nil { return errors.Wrap(err, "unable to fetch query results") @@ -92,8 +96,6 @@ func (o *orbitDBEventLogStore) Stream(ctx context.Context, resultChan chan opera resultChan <- op } - close(resultChan) - return nil }