From fdfd686cdc1da70c2c2980a1fa9c98ab2fd25dcf Mon Sep 17 00:00:00 2001 From: Tine Jozelj Date: Thu, 25 Jun 2020 16:50:29 +0200 Subject: [PATCH 1/4] feat(stores): add document store Signed-off-by: Tine Jozelj --- iface/interface.go | 29 ++++- orbitdb.go | 26 ++++ stores/documentstore/doc.go | 2 + stores/documentstore/document.go | 149 +++++++++++++++++++++++ stores/documentstore/index.go | 90 ++++++++++++++ tests/docs_test.go | 196 +++++++++++++++++++++++++++++++ 6 files changed, 491 insertions(+), 1 deletion(-) create mode 100644 stores/documentstore/doc.go create mode 100644 stores/documentstore/document.go create mode 100644 stores/documentstore/index.go create mode 100644 tests/docs_test.go diff --git a/iface/interface.go b/iface/interface.go index 177e2c4e..c15380bc 100644 --- a/iface/interface.go +++ b/iface/interface.go @@ -89,6 +89,18 @@ type BaseOrbitDB interface { Tracer() trace.Tracer } +// OrbitDBDocumentStore An OrbitDB instance providing a Document store +type OrbitDBDocumentStore interface { + BaseOrbitDB + OrbitDBDocumentStoreProvider +} + +// OrbitDBDocumentStoreProvider Exposes a method providing a document store +type OrbitDBDocumentStoreProvider interface { + // Docs Creates or opens an DocumentStore + Docs(ctx context.Context, address string, options *CreateDBOptions) (DocumentStore, error) +} + // OrbitDBKVStore An OrbitDB instance providing a KeyValue store type OrbitDBKVStore interface { BaseOrbitDB @@ -119,6 +131,7 @@ type OrbitDB interface { OrbitDBKVStoreProvider OrbitDBLogStoreProvider + OrbitDBDocumentStoreProvider } // StreamOptions Defines the parameters that can be given to the Stream function of an EventLogStore @@ -216,7 +229,7 @@ type EventLogStore interface { List(ctx context.Context, options *StreamOptions) ([]operation.Operation, error) } -// EventLogStore A type of store that provides a key value store +// KeyValueStore A type of store that provides a key value store type KeyValueStore interface { Store @@ -233,6 +246,20 @@ type KeyValueStore interface { Get(ctx context.Context, key string) ([]byte, error) } +// DocumentStore A type of store that provides a document store +type DocumentStore interface { + Store + + // Put Stores the document + Put(ctx context.Context, document map[string]interface{}) (operation.Operation, error) + + // Delete Clears the document for a key + Delete(ctx context.Context, key string) (operation.Operation, error) + + // Get Retrieves the document for a key + Get(ctx context.Context, key string, caseSensitive bool) ([]map[string]interface{}, error) +} + // StoreIndex Index contains the state of a datastore, // ie. what data we currently have. // diff --git a/orbitdb.go b/orbitdb.go index 473b0ca5..6fd02073 100644 --- a/orbitdb.go +++ b/orbitdb.go @@ -8,6 +8,7 @@ import ( "berty.tech/go-orbit-db/accesscontroller/simple" "berty.tech/go-orbit-db/baseorbitdb" "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/documentstore" "berty.tech/go-orbit-db/stores/eventlogstore" "berty.tech/go-orbit-db/stores/kvstore" coreapi "github.com/ipfs/interface-go-ipfs-core" @@ -30,6 +31,9 @@ type EventLogStore = iface.EventLogStore // KeyValueStore An alias of the type defined in the iface package type KeyValueStore = iface.KeyValueStore +// DocumentStore An alias of the type defined in the iface package +type DocumentStore = iface.DocumentStore + // StoreIndex An alias of the type defined in the iface package type StoreIndex = iface.StoreIndex @@ -64,6 +68,7 @@ func NewOrbitDB(ctx context.Context, i coreapi.CoreAPI, options *NewOrbitDBOptio odb.RegisterStoreType("eventlog", eventlogstore.NewOrbitDBEventLogStore) odb.RegisterStoreType("keyvalue", kvstore.NewOrbitDBKeyValue) + odb.RegisterStoreType("docstore", documentstore.NewOrbitDBDocumentStore) _ = odb.RegisterAccessControllerType(ipfs.NewIPFSAccessController) _ = odb.RegisterAccessControllerType(orbitdb.NewOrbitDBAccessController) @@ -123,4 +128,25 @@ func (o *orbitDB) KeyValue(ctx context.Context, address string, options *CreateD return kvStore, nil } +func (o *orbitDB) Docs(ctx context.Context, address string, options *CreateDBOptions) (DocumentStore, error) { + if options == nil { + options = &CreateDBOptions{} + } + + options.Create = boolPtr(true) + options.StoreType = stringPtr("docstore") + + store, err := o.Open(ctx, address, options) + if err != nil { + return nil, errors.Wrap(err, "unable to open database") + } + + documentStore, ok := store.(DocumentStore) + if !ok { + return nil, errors.New("unable to cast store to document") + } + + return documentStore, nil +} + var _ OrbitDB = (*orbitDB)(nil) diff --git a/stores/documentstore/doc.go b/stores/documentstore/doc.go new file mode 100644 index 00000000..3e348d6b --- /dev/null +++ b/stores/documentstore/doc.go @@ -0,0 +1,2 @@ +// documentstore a document store for OrbitDB +package documentstore // import "berty.tech/go-orbit-db/stores/documentstore" diff --git a/stores/documentstore/document.go b/stores/documentstore/document.go new file mode 100644 index 00000000..67c9a9ff --- /dev/null +++ b/stores/documentstore/document.go @@ -0,0 +1,149 @@ +package documentstore + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "berty.tech/go-ipfs-log/identityprovider" + "berty.tech/go-orbit-db/address" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/basestore" + "berty.tech/go-orbit-db/stores/operation" + coreapi "github.com/ipfs/interface-go-ipfs-core" + "github.com/pkg/errors" +) + +type orbitDBDocumentStore struct { + basestore.BaseStore + + indexBy string +} + +func (o *orbitDBDocumentStore) Get(ctx context.Context, key string, caseSensitive bool) ([]map[string]interface{}, error) { + numTerms := len(strings.Split(key, " ")) + if numTerms > 1 { + key = strings.ReplaceAll(key, ".", " ") + key = strings.ToLower(key) + } else { + key = strings.ToLower(key) + } + + docIndex, ok := o.Index().(*documentIndex) + if !ok { + return nil, errors.New("unable to cast index to documentIndex") + } + + documents := make([]map[string]interface{}, 0) + + for _, indexKey := range docIndex.Keys() { + if caseSensitive && strings.Contains(indexKey, key) { + op, ok := o.Index().Get(indexKey).(operation.Operation) + if !ok { + return nil, errors.New("unable to cast document to operation") + } + var valueJSON map[string]interface{} + err := json.Unmarshal(op.GetValue(), &valueJSON) + if err != nil { + return nil, errors.Wrap(err, "unable to unmarshal index content") + } + documents = append(documents, valueJSON) + } + if !caseSensitive { + if numTerms > 1 { + indexKey = strings.ReplaceAll(indexKey, ".", " ") + } + lower := strings.ToLower(indexKey) + + if strings.Contains(lower, key) { + op, ok := o.Index().Get(indexKey).(operation.Operation) + if !ok { + return nil, errors.New("unable to cast document to operation") + } + var valueJSON map[string]interface{} + err := json.Unmarshal(op.GetValue(), &valueJSON) + if err != nil { + return nil, errors.Wrap(err, "unable to unmarshal index content") + } + documents = append(documents, valueJSON) + } + } + } + + return documents, nil +} + +func (o *orbitDBDocumentStore) Put(ctx context.Context, document map[string]interface{}) (operation.Operation, error) { + + index, ok := document[o.indexBy] + if !ok { + return nil, fmt.Errorf("index '%s' not present in value", index) + } + + indexStr, ok := index.(string) + if !ok { + return nil, errors.New("unable to cast index to string") + } + + documentJSON, err := json.Marshal(document) + if err != nil { + return nil, errors.Wrapf(err, "failed marshaling value to json") + } + + op := operation.NewOperation(&indexStr, "PUT", documentJSON) + + e, err := o.AddOperation(ctx, op, nil) + if err != nil { + return nil, errors.Wrap(err, "error while adding operation") + } + + op, err = operation.ParseOperation(e) + if err != nil { + return nil, errors.Wrap(err, "unable to parse newly created entry") + } + + return op, nil +} + +func (o *orbitDBDocumentStore) Delete(ctx context.Context, key string) (operation.Operation, error) { + if e := o.Index().Get(key); e == nil { + return nil, fmt.Errorf("no entry with key '%s' in database", key) + } + + op := operation.NewOperation(&key, "DEL", nil) + + e, err := o.AddOperation(ctx, op, nil) + if err != nil { + return nil, errors.Wrap(err, "error while adding operation") + } + + op, err = operation.ParseOperation(e) + if err != nil { + return nil, errors.Wrap(err, "unable to parse newly created entry") + } + + return op, nil +} + +func (o *orbitDBDocumentStore) Type() string { + return "docstore" +} + +// NewOrbitDBDocumentStore Instantiates a new DocumentStore +func NewOrbitDBDocumentStore(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (iface.Store, error) { + store := &orbitDBDocumentStore{} + options.Index = NewDocumentIndex + + // TODO: How can we pass this via options? + store.indexBy = "_id" + + err := store.InitBaseStore(ctx, ipfs, identity, addr, options) + if err != nil { + return nil, errors.Wrap(err, "unable to initialize base store") + } + + return store, nil +} + +var _ iface.DocumentStore = &orbitDBDocumentStore{} diff --git a/stores/documentstore/index.go b/stores/documentstore/index.go new file mode 100644 index 00000000..febd9b2f --- /dev/null +++ b/stores/documentstore/index.go @@ -0,0 +1,90 @@ +package documentstore + +import ( + "sync" + + ipfslog "berty.tech/go-ipfs-log" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/operation" + "github.com/pkg/errors" +) + +type documentIndex struct { + iface.StoreIndex + index map[string]operation.Operation + muIndex sync.RWMutex +} + +func (i *documentIndex) Keys() []string { + i.muIndex.RLock() + defer i.muIndex.RUnlock() + + keys := make([]string, len(i.index)) + + for key := range i.index { + keys = append(keys, key) + } + + return keys +} + +func (i *documentIndex) Get(key string) interface{} { + i.muIndex.RLock() + defer i.muIndex.RUnlock() + + if i.index == nil { + return nil + } + + entry, ok := i.index[key] + if !ok { + return nil + } + + return entry +} + +func (i *documentIndex) UpdateIndex(oplog ipfslog.Log, _ []ipfslog.Entry) error { + entries := oplog.Values().Slice() + size := len(entries) + + handled := map[string]struct{}{} + + i.muIndex.Lock() + defer i.muIndex.Unlock() + + for idx := range entries { + item, err := operation.ParseOperation(entries[size-idx-1]) + if err != nil { + return errors.Wrap(err, "unable to parse log documentstore operation") + } + + key := item.GetKey() + if key == nil { + // ignoring entries with nil keys + continue + } + + if _, ok := handled[*item.GetKey()]; !ok { + handled[*item.GetKey()] = struct{}{} + + if item.GetOperation() == "PUT" { + i.index[*item.GetKey()] = item + } else if item.GetOperation() == "DEL" { + delete(i.index, *item.GetKey()) + } + } + } + + return nil +} + +// NewDocumentIndex Creates a new index for a Document Store +func NewDocumentIndex(_ []byte) iface.StoreIndex { + return &documentIndex{ + index: make(map[string]operation.Operation), + } +} + +var _ iface.IndexConstructor = NewDocumentIndex +var _ iface.StoreIndex = &documentIndex{} diff --git a/tests/docs_test.go b/tests/docs_test.go new file mode 100644 index 00000000..352f7cd1 --- /dev/null +++ b/tests/docs_test.go @@ -0,0 +1,196 @@ +package tests + +import ( + "context" + "testing" + + orbitdb2 "berty.tech/go-orbit-db" + . "github.com/smartystreets/goconvey/convey" +) + +func TestDocumentsStore(t *testing.T) { + tmpDir, clean := testingTempDir(t, "db-docsstore") + defer clean() + + cases := []struct{ Name, Directory string }{ + {Name: "in memory", Directory: ":memory:"}, + {Name: "persistent", Directory: tmpDir}, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + testingDocsStore(t, c.Directory) + }) + } +} + +func createDocument(id string, key string, value string) map[string]interface{} { + m := make(map[string]interface{}) + m["_id"] = id + m[key] = value + return m +} + +func testingDocsStore(t *testing.T, dir string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbname := "orbit-db-tests" + Convey("orbit-db - Documents Database", t, FailureHalts, func(c C) { + mocknet := testingMockNet(ctx) + + node, clean := testingIPFSNode(ctx, t, mocknet) + defer clean() + + db1IPFS := testingCoreAPI(t, node) + + orbitdb1, err := orbitdb2.NewOrbitDB(ctx, db1IPFS, &orbitdb2.NewOrbitDBOptions{ + Directory: &dir, + }) + defer orbitdb1.Close() + + c.So(err, ShouldBeNil) + + db, err := orbitdb1.Docs(ctx, dbname, nil) + c.So(err, ShouldBeNil) + + defer db.Close() + + c.Convey("creates and opens a database", FailureHalts, func(c C) { + db, err := orbitdb1.Docs(ctx, "first docs database", nil) + c.So(err, ShouldBeNil) + + if db == nil { + t.Fatalf("db should not be nil") + } + + defer db.Close() + + c.So(db.Type(), ShouldEqual, "docstore") + c.So(db.DBName(), ShouldEqual, "first docs database") + }) + + c.Convey("put", FailureHalts, func(c C) { + document := createDocument("doc1", "hello", "world") + _, err := db.Put(ctx, document) + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 1) + c.So(docs[0], ShouldResemble, document) + }) + + c.Convey("get", FailureHalts, func(c C) { + document := createDocument("doc1", "hello", "world") + _, err := db.Put(ctx, document) + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 1) + c.So(docs[0], ShouldResemble, document) + }) + + c.Convey("get case insensitive", FailureHalts, func(c C) { + document := createDocument("DOC1", "hello", "world") + _, err := db.Put(ctx, document) + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", false) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 1) + c.So(docs[0], ShouldResemble, document) + }) + + c.Convey("get case sensitive without match", FailureHalts, func(c C) { + document := createDocument("DOC1", "hello", "world") + _, err := db.Put(ctx, document) + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 0) + }) + + c.Convey("put updates a value", FailureHalts, func(c C) { + documentOne := createDocument("doc1", "hello", "world") + _, err := db.Put(ctx, documentOne) + c.So(err, ShouldBeNil) + + documentTwo := createDocument("doc1", "hello", "galaxy") + _, err = db.Put(ctx, documentTwo) + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 1) + c.So(docs[0], ShouldResemble, documentTwo) + }) + + c.Convey("put/get - multiple keys", FailureHalts, func(c C) { + documentOne := createDocument("doc1", "hello", "world") + _, err := db.Put(ctx, documentOne) + c.So(err, ShouldBeNil) + + documentTwo := createDocument("doc2", "hello", "galaxy") + _, err = db.Put(ctx, documentTwo) + c.So(err, ShouldBeNil) + + documentThree := createDocument("doc3", "hello", "universe") + _, err = db.Put(ctx, documentThree) + c.So(err, ShouldBeNil) + + docsOne, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + + docsTwo, err := db.Get(ctx, "doc2", true) + c.So(err, ShouldBeNil) + + docsThree, err := db.Get(ctx, "doc3", true) + c.So(err, ShouldBeNil) + + c.So(len(docsOne), ShouldEqual, 1) + c.So(docsOne[0], ShouldResemble, documentOne) + c.So(len(docsTwo), ShouldEqual, 1) + c.So(docsTwo[0], ShouldResemble, documentTwo) + c.So(len(docsThree), ShouldEqual, 1) + c.So(docsThree[0], ShouldResemble, documentThree) + }) + + c.Convey("deletes a key", FailureHalts, func(c C) { + document := createDocument("doc1", "hello", "world") + _, err := db.Put(ctx, document) + c.So(err, ShouldBeNil) + + _, err = db.Delete(ctx, "doc1") + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 0) + }) + + c.Convey("deletes a key after multiple updates", FailureHalts, func(c C) { + documentOne := createDocument("doc1", "hello", "world") + _, err := db.Put(ctx, documentOne) + c.So(err, ShouldBeNil) + + documentTwo := createDocument("doc1", "hello", "galaxy") + _, err = db.Put(ctx, documentTwo) + c.So(err, ShouldBeNil) + + documentThree := createDocument("doc1", "hello", "universe") + _, err = db.Put(ctx, documentThree) + c.So(err, ShouldBeNil) + + _, err = db.Delete(ctx, "doc1") + c.So(err, ShouldBeNil) + + docs, err := db.Get(ctx, "doc1", true) + c.So(err, ShouldBeNil) + c.So(len(docs), ShouldEqual, 0) + }) + + }) +} From 2880e8a11e4286b5f435a80ad277d247f049082b Mon Sep 17 00:00:00 2001 From: Tine Jozelj Date: Thu, 25 Jun 2020 18:21:30 +0200 Subject: [PATCH 2/4] style: gofmt code Signed-off-by: Tine Jozelj --- iface/interface.go | 4 ++-- stores/documentstore/index.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/iface/interface.go b/iface/interface.go index c15380bc..eb1175e4 100644 --- a/iface/interface.go +++ b/iface/interface.go @@ -97,8 +97,8 @@ type OrbitDBDocumentStore interface { // OrbitDBDocumentStoreProvider Exposes a method providing a document store type OrbitDBDocumentStoreProvider interface { - // Docs Creates or opens an DocumentStore - Docs(ctx context.Context, address string, options *CreateDBOptions) (DocumentStore, error) + // Docs Creates or opens an DocumentStore + Docs(ctx context.Context, address string, options *CreateDBOptions) (DocumentStore, error) } // OrbitDBKVStore An OrbitDB instance providing a KeyValue store diff --git a/stores/documentstore/index.go b/stores/documentstore/index.go index febd9b2f..251bd088 100644 --- a/stores/documentstore/index.go +++ b/stores/documentstore/index.go @@ -29,7 +29,7 @@ func (i *documentIndex) Keys() []string { } func (i *documentIndex) Get(key string) interface{} { - i.muIndex.RLock() + i.muIndex.RLock() defer i.muIndex.RUnlock() if i.index == nil { From fbb5d005c4260c2a86863685166ba8281f6d3a2f Mon Sep 17 00:00:00 2001 From: Guillaume Louvigny Date: Fri, 20 Aug 2021 11:06:15 +0200 Subject: [PATCH 3/4] feat(docstore): added PutAll/PutBatch + style Signed-off-by: Guillaume Louvigny --- baseorbitdb/orbitdb.go | 21 +-- iface/interface.go | 38 +++-- orbitdb.go | 8 +- stores/documentstore/document.go | 220 +++++++++++++++++++------- stores/documentstore/index.go | 53 +++++-- stores/operation/interface.go | 8 + stores/operation/operation.go | 51 ++++++ tests/docs_test.go | 264 +++++++++++++++++++++---------- 8 files changed, 482 insertions(+), 181 deletions(-) diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go index fa0238a2..2139269d 100644 --- a/baseorbitdb/orbitdb.go +++ b/baseorbitdb/orbitdb.go @@ -754,16 +754,17 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd } store, err := storeFunc(ctx, o.IPFS(), identity, parsedDBAddress, &iface.NewStoreOptions{ - AccessController: accessController, - Cache: options.Cache, - Replicate: options.Replicate, - Directory: *options.Directory, - SortFn: options.SortFn, - CacheDestroy: func() error { return o.cache.Destroy(o.directory, parsedDBAddress) }, - Logger: o.logger, - Tracer: o.tracer, - IO: options.IO, - SharedKey: options.SharedKey, + AccessController: accessController, + Cache: options.Cache, + Replicate: options.Replicate, + Directory: *options.Directory, + SortFn: options.SortFn, + CacheDestroy: func() error { return o.cache.Destroy(o.directory, parsedDBAddress) }, + Logger: o.logger, + Tracer: o.tracer, + IO: options.IO, + SharedKey: options.SharedKey, + StoreSpecificOpts: options.StoreSpecificOpts, }) if err != nil { return nil, errors.Wrap(err, "unable to instantiate store") diff --git a/iface/interface.go b/iface/interface.go index eb1175e4..1b7d8a13 100644 --- a/iface/interface.go +++ b/iface/interface.go @@ -38,6 +38,14 @@ type CreateDBOptions struct { SortFn ipfslog.SortFn IO ipfslog.IO SharedKey enc.SharedKey + StoreSpecificOpts interface{} +} + +type CreateDocumentDBOptions struct { + KeyExtractor func(interface{}) (string, error) + Marshal func(interface{}) ([]byte, error) + Unmarshal func(data []byte, v interface{}) error + ItemFactory func() interface{} } // DetermineAddressOptions Lists the arguments used to determine a store address @@ -70,7 +78,7 @@ type BaseOrbitDB interface { // RegisterStoreType Registers a new store type RegisterStoreType(storeType string, constructor StoreConstructor) - // RegisterStoreType Removes a store type + // UnregisterStoreType Removes a store type UnregisterStoreType(storeType string) // RegisterAccessControllerType Registers a new access controller type @@ -107,7 +115,7 @@ type OrbitDBKVStore interface { OrbitDBKVStoreProvider } -// OrbitDBLogStoreProvider Exposes a method providing a key value store +// OrbitDBKVStoreProvider Exposes a method providing a key value store type OrbitDBKVStoreProvider interface { // KeyValue Creates or opens an KeyValueStore KeyValue(ctx context.Context, address string, options *CreateDBOptions) (KeyValueStore, error) @@ -165,7 +173,7 @@ type Store interface { // Replicator Returns the Replicator object Replicator() replicator.Replicator - // Replicator Returns the Cache object + // Cache Returns the Cache object Cache() datastore.Datastore // Drop Removes all the local store content @@ -246,18 +254,29 @@ type KeyValueStore interface { Get(ctx context.Context, key string) ([]byte, error) } +type DocumentStoreGetOptions struct { + CaseInsensitive bool + PartialMatches bool +} + // DocumentStore A type of store that provides a document store type DocumentStore interface { Store // Put Stores the document - Put(ctx context.Context, document map[string]interface{}) (operation.Operation, error) + Put(ctx context.Context, document interface{}) (operation.Operation, error) // Delete Clears the document for a key Delete(ctx context.Context, key string) (operation.Operation, error) + // PutBatch Add values as multiple operations and returns the latest + PutBatch(ctx context.Context, values []interface{}) (operation.Operation, error) + + // PutAll Add values as a single operation and returns it + PutAll(ctx context.Context, values []interface{}) (operation.Operation, error) + // Get Retrieves the document for a key - Get(ctx context.Context, key string, caseSensitive bool) ([]map[string]interface{}, error) + Get(ctx context.Context, key string, opts *DocumentStoreGetOptions) ([]interface{}, error) } // StoreIndex Index contains the state of a datastore, @@ -299,6 +318,7 @@ type NewStoreOptions struct { Tracer trace.Tracer IO ipfslog.IO SharedKey enc.SharedKey + StoreSpecificOpts interface{} } type DirectChannelOptions struct { @@ -311,7 +331,7 @@ type DirectChannel interface { // Connect Waits for the other peer to be connected Connect(context.Context) error - // Sends Sends a message to the other peer + // Send Sends a message to the other peer Send(context.Context, []byte) error // Close Closes the connection @@ -343,15 +363,15 @@ type PubSubTopic interface { // WatchPeers subscribes to peers joining or leaving the topic WatchPeers(ctx context.Context) (<-chan events.Event, error) - // WatchMessages + // WatchMessages Subscribes to new messages WatchMessages(ctx context.Context) (<-chan *EventPubSubMessage, error) - // Returns the topic name + // Topic Returns the topic name Topic() string } type PubSubInterface interface { - // Subscribe Subscribes to a topic + // TopicSubscribe Subscribes to a topic TopicSubscribe(ctx context.Context, topic string) (PubSubTopic, error) } diff --git a/orbitdb.go b/orbitdb.go index 6fd02073..54b65650 100644 --- a/orbitdb.go +++ b/orbitdb.go @@ -3,6 +3,9 @@ package orbitdb import ( "context" + coreapi "github.com/ipfs/interface-go-ipfs-core" + "github.com/pkg/errors" + "berty.tech/go-orbit-db/accesscontroller/ipfs" "berty.tech/go-orbit-db/accesscontroller/orbitdb" "berty.tech/go-orbit-db/accesscontroller/simple" @@ -11,8 +14,6 @@ import ( "berty.tech/go-orbit-db/stores/documentstore" "berty.tech/go-orbit-db/stores/eventlogstore" "berty.tech/go-orbit-db/stores/kvstore" - coreapi "github.com/ipfs/interface-go-ipfs-core" - "github.com/pkg/errors" ) type orbitDB struct { @@ -52,6 +53,9 @@ type StreamOptions = iface.StreamOptions // CreateDBOptions An alias of the type defined in the iface package type CreateDBOptions = iface.CreateDBOptions +// CreateDocumentDBOptions An alias of the type defined in the iface package +type CreateDocumentDBOptions = iface.CreateDocumentDBOptions + // DetermineAddressOptions An alias of the type defined in the iface package type DetermineAddressOptions = iface.DetermineAddressOptions diff --git a/stores/documentstore/document.go b/stores/documentstore/document.go index 67c9a9ff..c65fe54a 100644 --- a/stores/documentstore/document.go +++ b/stores/documentstore/document.go @@ -11,96 +11,96 @@ import ( "berty.tech/go-orbit-db/iface" "berty.tech/go-orbit-db/stores/basestore" "berty.tech/go-orbit-db/stores/operation" + coreapi "github.com/ipfs/interface-go-ipfs-core" - "github.com/pkg/errors" ) type orbitDBDocumentStore struct { basestore.BaseStore - - indexBy string + docOpts *iface.CreateDocumentDBOptions } -func (o *orbitDBDocumentStore) Get(ctx context.Context, key string, caseSensitive bool) ([]map[string]interface{}, error) { - numTerms := len(strings.Split(key, " ")) - if numTerms > 1 { +func (o *orbitDBDocumentStore) Get(ctx context.Context, key string, opts *iface.DocumentStoreGetOptions) ([]interface{}, error) { + if opts == nil { + opts = &iface.DocumentStoreGetOptions{} + } + + hasMultipleTerms := strings.Contains(key, " ") + if hasMultipleTerms { key = strings.ReplaceAll(key, ".", " ") - key = strings.ToLower(key) - } else { + } + if opts.CaseInsensitive { key = strings.ToLower(key) } docIndex, ok := o.Index().(*documentIndex) if !ok { - return nil, errors.New("unable to cast index to documentIndex") + return nil, fmt.Errorf("unable to cast index to documentIndex") } - documents := make([]map[string]interface{}, 0) + documents := []interface{}(nil) for _, indexKey := range docIndex.Keys() { - if caseSensitive && strings.Contains(indexKey, key) { - op, ok := o.Index().Get(indexKey).(operation.Operation) - if !ok { - return nil, errors.New("unable to cast document to operation") - } - var valueJSON map[string]interface{} - err := json.Unmarshal(op.GetValue(), &valueJSON) - if err != nil { - return nil, errors.Wrap(err, "unable to unmarshal index content") + indexKeyForSearch := indexKey + + if opts.CaseInsensitive { + indexKeyForSearch = strings.ToLower(indexKeyForSearch) + if hasMultipleTerms { + indexKeyForSearch = strings.ReplaceAll(indexKeyForSearch, ".", " ") } - documents = append(documents, valueJSON) } - if !caseSensitive { - if numTerms > 1 { - indexKey = strings.ReplaceAll(indexKey, ".", " ") + + if !opts.PartialMatches { + if indexKeyForSearch != key { + continue } - lower := strings.ToLower(indexKey) - - if strings.Contains(lower, key) { - op, ok := o.Index().Get(indexKey).(operation.Operation) - if !ok { - return nil, errors.New("unable to cast document to operation") - } - var valueJSON map[string]interface{} - err := json.Unmarshal(op.GetValue(), &valueJSON) - if err != nil { - return nil, errors.Wrap(err, "unable to unmarshal index content") - } - documents = append(documents, valueJSON) + } else if opts.PartialMatches { + if !strings.Contains(indexKeyForSearch, key) { + continue } } - } - return documents, nil -} + value := o.Index().Get(indexKey) + if value == nil { + return nil, fmt.Errorf("value not found for key %s", indexKey) + } -func (o *orbitDBDocumentStore) Put(ctx context.Context, document map[string]interface{}) (operation.Operation, error) { + if _, ok := value.([]byte); !ok { + return nil, fmt.Errorf("invalid type for key %s", indexKey) + } - index, ok := document[o.indexBy] - if !ok { - return nil, fmt.Errorf("index '%s' not present in value", index) + out := o.docOpts.ItemFactory() + if err := o.docOpts.Unmarshal(value.([]byte), &out); err != nil { + return nil, fmt.Errorf("unable to unmarshal value for key %s: %w", indexKey, err) + } + + documents = append(documents, out) } - indexStr, ok := index.(string) - if !ok { - return nil, errors.New("unable to cast index to string") + return documents, nil +} + +func (o *orbitDBDocumentStore) Put(ctx context.Context, document interface{}) (operation.Operation, error) { + key, err := o.docOpts.KeyExtractor(document) + if err != nil { + return nil, fmt.Errorf("unable to extract key from value: %w", err) } - documentJSON, err := json.Marshal(document) + data, err := o.docOpts.Marshal(document) if err != nil { - return nil, errors.Wrapf(err, "failed marshaling value to json") + return nil, fmt.Errorf("unable to marshal value: %w", err) } - op := operation.NewOperation(&indexStr, "PUT", documentJSON) + op := operation.NewOperation(&key, "PUT", data) e, err := o.AddOperation(ctx, op, nil) if err != nil { - return nil, errors.Wrap(err, "error while adding operation") + return nil, fmt.Errorf("error while adding operation: %w", err) } op, err = operation.ParseOperation(e) if err != nil { - return nil, errors.Wrap(err, "unable to parse newly created entry") + return nil, fmt.Errorf("unable to parse newly created entry: %w", err) } return op, nil @@ -115,12 +115,64 @@ func (o *orbitDBDocumentStore) Delete(ctx context.Context, key string) (operatio e, err := o.AddOperation(ctx, op, nil) if err != nil { - return nil, errors.Wrap(err, "error while adding operation") + return nil, fmt.Errorf("error while adding operation: %w", err) } op, err = operation.ParseOperation(e) if err != nil { - return nil, errors.Wrap(err, "unable to parse newly created entry") + return nil, fmt.Errorf("unable to parse newly created entry: %w", err) + } + + return op, nil +} + +// PutBatch Add values as multiple operations and returns the latest +func (o *orbitDBDocumentStore) PutBatch(ctx context.Context, values []interface{}) (operation.Operation, error) { + if len(values) == 0 { + return nil, fmt.Errorf("nothing to add to the store") + } + + op := operation.Operation(nil) + var err error + for _, val := range values { + op, err = o.Put(ctx, val) + if err != nil { + return nil, fmt.Errorf("unable to add data to the store: %w", err) + } + } + + return op, nil +} + +// PutAll Add values as a single operation and returns it +func (o *orbitDBDocumentStore) PutAll(ctx context.Context, values []interface{}) (operation.Operation, error) { + toAdd := map[string][]byte{} + + for _, val := range values { + key, err := o.docOpts.KeyExtractor(val) + if err != nil { + return nil, fmt.Errorf("one of the provided documents has no index key") + } + + data, err := o.docOpts.Marshal(val) + if err != nil { + return nil, fmt.Errorf("unable to marshal one of the provided documents") + } + + toAdd[key] = data + } + + empty := "" + op := operation.NewOperationWithDocuments(&empty, "PUTALL", toAdd) + + e, err := o.AddOperation(ctx, op, nil) + if err != nil { + return nil, fmt.Errorf("error while adding operation: %w", err) + } + + op, err = operation.ParseOperation(e) + if err != nil { + return nil, fmt.Errorf("unable to parse newly created entry: %w", err) } return op, nil @@ -130,17 +182,69 @@ func (o *orbitDBDocumentStore) Type() string { return "docstore" } +func MapKeyExtractor(keyField string) func(obj interface{}) (string, error) { + return func(obj interface{}) (string, error) { + mapped, ok := obj.(map[string]interface{}) + if !ok { + return "", fmt.Errorf("can't extract key from something else than a `map[string]interface{}` entry") + } + + val, ok := mapped[keyField] + if !ok { + return "", fmt.Errorf("missing value for field `%s` in entry", keyField) + } + + key, ok := val.(string) + if !ok { + return "", fmt.Errorf("value for field `%s` is not a string", keyField) + } + + return key, nil + } +} + +func DefaultStoreOptsForMap(keyField string) *iface.CreateDocumentDBOptions { + return &iface.CreateDocumentDBOptions{ + Marshal: json.Marshal, + Unmarshal: json.Unmarshal, + KeyExtractor: MapKeyExtractor(keyField), + ItemFactory: func() interface{} { return map[string]interface{}{} }, + } +} + // NewOrbitDBDocumentStore Instantiates a new DocumentStore func NewOrbitDBDocumentStore(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (iface.Store, error) { - store := &orbitDBDocumentStore{} - options.Index = NewDocumentIndex + if options.StoreSpecificOpts == nil { + options.StoreSpecificOpts = DefaultStoreOptsForMap("_id") + } + + docOpts, ok := options.StoreSpecificOpts.(*iface.CreateDocumentDBOptions) + if !ok { + return nil, fmt.Errorf("invalid type supplied for opts.StoreSpecificOpts") + } + + if docOpts.Marshal == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.Marshal") + } + + if docOpts.Unmarshal == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.Unmarshal") + } + + if docOpts.ItemFactory == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.ItemFactory") + } + + if docOpts.KeyExtractor == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.ExtractKey") + } - // TODO: How can we pass this via options? - store.indexBy = "_id" + store := &orbitDBDocumentStore{docOpts: docOpts} + options.Index = func(_ []byte) iface.StoreIndex { return newDocumentIndex(docOpts) } err := store.InitBaseStore(ctx, ipfs, identity, addr, options) if err != nil { - return nil, errors.Wrap(err, "unable to initialize base store") + return nil, fmt.Errorf("unable to initialize document store: %w", err) } return store, nil diff --git a/stores/documentstore/index.go b/stores/documentstore/index.go index 251bd088..b7452737 100644 --- a/stores/documentstore/index.go +++ b/stores/documentstore/index.go @@ -1,18 +1,19 @@ package documentstore import ( + "fmt" "sync" ipfslog "berty.tech/go-ipfs-log" "berty.tech/go-orbit-db/iface" "berty.tech/go-orbit-db/stores/operation" - "github.com/pkg/errors" ) type documentIndex struct { iface.StoreIndex - index map[string]operation.Operation + index map[string][]byte muIndex sync.RWMutex + opts *iface.CreateDocumentDBOptions } func (i *documentIndex) Keys() []string { @@ -21,8 +22,10 @@ func (i *documentIndex) Keys() []string { keys := make([]string, len(i.index)) + idx := 0 for key := range i.index { - keys = append(keys, key) + keys[idx] = key + idx++ } return keys @@ -56,35 +59,51 @@ func (i *documentIndex) UpdateIndex(oplog ipfslog.Log, _ []ipfslog.Entry) error for idx := range entries { item, err := operation.ParseOperation(entries[size-idx-1]) if err != nil { - return errors.Wrap(err, "unable to parse log documentstore operation") + return fmt.Errorf("unable to parse log documentstore operation: %w", err) + } + + if item.GetOperation() == "PUTALL" { + for _, opDoc := range item.GetDocs() { + if _, ok := handled[opDoc.GetKey()]; ok { + continue + } + + handled[*item.GetKey()] = struct{}{} + i.index[opDoc.GetKey()] = opDoc.GetValue() + } + + continue } key := item.GetKey() - if key == nil { - // ignoring entries with nil keys + if key == nil || *key == "" { + // ignoring entries with nil or empty keys continue } - if _, ok := handled[*item.GetKey()]; !ok { - handled[*item.GetKey()] = struct{}{} + if _, ok := handled[*item.GetKey()]; ok { + continue + } - if item.GetOperation() == "PUT" { - i.index[*item.GetKey()] = item - } else if item.GetOperation() == "DEL" { - delete(i.index, *item.GetKey()) - } + handled[*item.GetKey()] = struct{}{} + switch item.GetOperation() { + case "PUT": + i.index[*item.GetKey()] = item.GetValue() + + case "DEL": + delete(i.index, *item.GetKey()) } } return nil } -// NewDocumentIndex Creates a new index for a Document Store -func NewDocumentIndex(_ []byte) iface.StoreIndex { +// newDocumentIndex Creates a new index for a Document Store +func newDocumentIndex(opts *iface.CreateDocumentDBOptions) iface.StoreIndex { return &documentIndex{ - index: make(map[string]operation.Operation), + index: map[string][]byte{}, + opts: opts, } } -var _ iface.IndexConstructor = NewDocumentIndex var _ iface.StoreIndex = &documentIndex{} diff --git a/stores/operation/interface.go b/stores/operation/interface.go index 739adc14..2c4ffde7 100644 --- a/stores/operation/interface.go +++ b/stores/operation/interface.go @@ -4,6 +4,11 @@ import ( ipfslog "berty.tech/go-ipfs-log" ) +type OpDoc interface { + GetKey() string + GetValue() []byte +} + // Operation Describe an CRDT operation type Operation interface { // GetKey Gets a key if applicable (ie. key value stores) @@ -18,6 +23,9 @@ type Operation interface { // GetEntry Gets the underlying IPFS log Entry GetEntry() ipfslog.Entry + // GetDocs Gets the list of documents + GetDocs() []OpDoc + // Marshal Serializes the operation Marshal() ([]byte, error) } diff --git a/stores/operation/operation.go b/stores/operation/operation.go index e8b9ae76..999a5795 100644 --- a/stores/operation/operation.go +++ b/stores/operation/operation.go @@ -8,10 +8,24 @@ import ( "github.com/pkg/errors" ) +type opDoc struct { + Key string `json:"key,omitempty"` + Value []byte `json:"value,omitempty"` +} + +func (b *opDoc) GetKey() string { + return b.Key +} + +func (b *opDoc) GetValue() []byte { + return b.Value +} + type operation struct { Key *string `json:"key,omitempty"` Op string `json:"op,omitempty"` Value []byte `json:"value,omitempty"` + Docs []*opDoc `json:"docs,omitempty"` Entry ipfslog.Entry `json:"-"` } @@ -35,6 +49,16 @@ func (o *operation) GetEntry() ipfslog.Entry { return o.Entry } +func (o *operation) GetDocs() []OpDoc { + ret := make([]OpDoc, len(o.Docs)) + + for i, val := range o.Docs { + ret[i] = val + } + + return ret +} + // ParseOperation Gets the operation from an entry func ParseOperation(e ipfslog.Entry) (Operation, error) { if e == nil { @@ -53,6 +77,13 @@ func ParseOperation(e ipfslog.Entry) (Operation, error) { return &op, nil } +func NewOpDoc(key string, value []byte) OpDoc { + return &opDoc{ + Key: key, + Value: value, + } +} + // NewOperation Creates a new operation func NewOperation(key *string, op string, value []byte) Operation { return &operation{ @@ -62,4 +93,24 @@ func NewOperation(key *string, op string, value []byte) Operation { } } +// NewOperationWithDocuments Creates a new operation from a map of batched documents +func NewOperationWithDocuments(key *string, op string, docs map[string][]byte) Operation { + _docs := make([]*opDoc, len(docs)) + + i := 0 + for k, v := range docs { + _docs[i] = &opDoc{ + Key: k, + Value: v, + } + i++ + } + + return &operation{ + Key: key, + Op: op, + Docs: _docs, + } +} + var _ Operation = &operation{} diff --git a/tests/docs_test.go b/tests/docs_test.go index 352f7cd1..04c19dbe 100644 --- a/tests/docs_test.go +++ b/tests/docs_test.go @@ -4,8 +4,11 @@ import ( "context" "testing" + "github.com/stretchr/testify/require" + orbitdb2 "berty.tech/go-orbit-db" - . "github.com/smartystreets/goconvey/convey" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/documentstore" ) func TestDocumentsStore(t *testing.T) { @@ -36,7 +39,8 @@ func testingDocsStore(t *testing.T, dir string) { defer cancel() dbname := "orbit-db-tests" - Convey("orbit-db - Documents Database", t, FailureHalts, func(c C) { + + t.Run("orbit-db - Documents Database", func(t *testing.T) { mocknet := testingMockNet(ctx) node, clean := testingIPFSNode(ctx, t, mocknet) @@ -47,150 +51,240 @@ func testingDocsStore(t *testing.T, dir string) { orbitdb1, err := orbitdb2.NewOrbitDB(ctx, db1IPFS, &orbitdb2.NewOrbitDBOptions{ Directory: &dir, }) - defer orbitdb1.Close() + defer func() { _ = orbitdb1.Close() }() - c.So(err, ShouldBeNil) + require.NoError(t, err) db, err := orbitdb1.Docs(ctx, dbname, nil) - c.So(err, ShouldBeNil) + require.NoError(t, err) - defer db.Close() + defer func() { _ = db.Close() }() - c.Convey("creates and opens a database", FailureHalts, func(c C) { + t.Run("creates and opens a database", func(t *testing.T) { db, err := orbitdb1.Docs(ctx, "first docs database", nil) - c.So(err, ShouldBeNil) + require.NoError(t, err) if db == nil { t.Fatalf("db should not be nil") } - defer db.Close() + defer func() { _ = db.Close() }() - c.So(db.Type(), ShouldEqual, "docstore") - c.So(db.DBName(), ShouldEqual, "first docs database") + require.Equal(t, "docstore", db.Type()) + require.Equal(t, "first docs database", db.DBName()) }) - c.Convey("put", FailureHalts, func(c C) { - document := createDocument("doc1", "hello", "world") - _, err := db.Put(ctx, document) - c.So(err, ShouldBeNil) - - docs, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 1) - c.So(docs[0], ShouldResemble, document) - }) + document := createDocument("doc1", "hello", "world") + documentUpdate1 := createDocument("doc1", "hello", "galaxy") + documentUppercase := createDocument("DOCUPPER1", "hello", "world") - c.Convey("get", FailureHalts, func(c C) { - document := createDocument("doc1", "hello", "world") + t.Run("put/get", func(t *testing.T) { _, err := db.Put(ctx, document) - c.So(err, ShouldBeNil) + require.NoError(t, err) - docs, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 1) - c.So(docs[0], ShouldResemble, document) + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, document, docs[0]) }) - c.Convey("get case insensitive", FailureHalts, func(c C) { - document := createDocument("DOC1", "hello", "world") - _, err := db.Put(ctx, document) - c.So(err, ShouldBeNil) + _, err = db.Put(ctx, documentUppercase) + require.NoError(t, err) + + t.Run("get case insensitive", func(t *testing.T) { + docs, err := db.Get(ctx, "DOC1", &iface.DocumentStoreGetOptions{CaseInsensitive: true}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, document, docs[0]) - docs, err := db.Get(ctx, "doc1", false) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 1) - c.So(docs[0], ShouldResemble, document) + docs, err = db.Get(ctx, "docupper1", &iface.DocumentStoreGetOptions{CaseInsensitive: true}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, documentUppercase, docs[0]) }) - c.Convey("get case sensitive without match", FailureHalts, func(c C) { - document := createDocument("DOC1", "hello", "world") - _, err := db.Put(ctx, document) - c.So(err, ShouldBeNil) + t.Run("get case sensitive without match", func(t *testing.T) { + docs, err := db.Get(ctx, "DOC1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) - docs, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 0) + docs, err = db.Get(ctx, "docupper1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) }) - c.Convey("put updates a value", FailureHalts, func(c C) { - documentOne := createDocument("doc1", "hello", "world") - _, err := db.Put(ctx, documentOne) - c.So(err, ShouldBeNil) + t.Run("put updates a value", func(t *testing.T) { + _, err = db.Put(ctx, documentUpdate1) + require.NoError(t, err) - documentTwo := createDocument("doc1", "hello", "galaxy") - _, err = db.Put(ctx, documentTwo) - c.So(err, ShouldBeNil) - - docs, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 1) - c.So(docs[0], ShouldResemble, documentTwo) + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, documentUpdate1, docs[0]) }) - c.Convey("put/get - multiple keys", FailureHalts, func(c C) { + t.Run("put/get - multiple keys", func(t *testing.T) { documentOne := createDocument("doc1", "hello", "world") _, err := db.Put(ctx, documentOne) - c.So(err, ShouldBeNil) + require.NoError(t, err) documentTwo := createDocument("doc2", "hello", "galaxy") _, err = db.Put(ctx, documentTwo) - c.So(err, ShouldBeNil) + require.NoError(t, err) documentThree := createDocument("doc3", "hello", "universe") _, err = db.Put(ctx, documentThree) - c.So(err, ShouldBeNil) + require.NoError(t, err) + + docsOne, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + + docsTwo, err := db.Get(ctx, "doc2", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + + docsThree, err := db.Get(ctx, "doc3", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + + require.Equal(t, 1, len(docsOne)) + require.Equal(t, documentOne, docsOne[0]) + require.Equal(t, 1, len(docsTwo)) + require.Equal(t, documentTwo, docsTwo[0]) + require.Equal(t, 1, len(docsThree)) + require.Equal(t, documentThree, docsThree[0]) + }) + + t.Run("get - partial term match - PartialMatches: true", func(t *testing.T) { + doc1 := createDocument("hello world", "doc", "some things") + doc2 := createDocument("hello universe", "doc", "all the things") + doc3 := createDocument("sup world", "doc", "other things") + + _, err := db.Put(ctx, doc1) + require.NoError(t, err) + + _, err = db.Put(ctx, doc2) + require.NoError(t, err) - docsOne, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) + _, err = db.Put(ctx, doc3) + require.NoError(t, err) - docsTwo, err := db.Get(ctx, "doc2", true) - c.So(err, ShouldBeNil) + fetchedDocs, err := db.Get(ctx, "hello", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.Equal(t, 2, len(fetchedDocs)) + require.Contains(t, fetchedDocs, doc1) + require.Contains(t, fetchedDocs, doc2) + }) + + t.Run("get - partial term match - PartialMatches: false", func(t *testing.T) { + doc1 := createDocument("hello world", "doc", "some things") + doc2 := createDocument("hello universe", "doc", "all the things") + doc3 := createDocument("sup world", "doc", "other things") + + _, err := db.Put(ctx, doc1) + require.NoError(t, err) - docsThree, err := db.Get(ctx, "doc3", true) - c.So(err, ShouldBeNil) + _, err = db.Put(ctx, doc2) + require.NoError(t, err) - c.So(len(docsOne), ShouldEqual, 1) - c.So(docsOne[0], ShouldResemble, documentOne) - c.So(len(docsTwo), ShouldEqual, 1) - c.So(docsTwo[0], ShouldResemble, documentTwo) - c.So(len(docsThree), ShouldEqual, 1) - c.So(docsThree[0], ShouldResemble, documentThree) + _, err = db.Put(ctx, doc3) + require.NoError(t, err) + + fetchedDocs, err := db.Get(ctx, "hello", &iface.DocumentStoreGetOptions{PartialMatches: false}) + require.Equal(t, 0, len(fetchedDocs)) }) - c.Convey("deletes a key", FailureHalts, func(c C) { + t.Run("deletes a key", func(t *testing.T) { document := createDocument("doc1", "hello", "world") _, err := db.Put(ctx, document) - c.So(err, ShouldBeNil) + require.NoError(t, err) _, err = db.Delete(ctx, "doc1") - c.So(err, ShouldBeNil) + require.NoError(t, err) - docs, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 0) + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) }) - c.Convey("deletes a key after multiple updates", FailureHalts, func(c C) { + t.Run("deletes a key after multiple updates", func(t *testing.T) { documentOne := createDocument("doc1", "hello", "world") _, err := db.Put(ctx, documentOne) - c.So(err, ShouldBeNil) + require.NoError(t, err) documentTwo := createDocument("doc1", "hello", "galaxy") _, err = db.Put(ctx, documentTwo) - c.So(err, ShouldBeNil) + require.NoError(t, err) documentThree := createDocument("doc1", "hello", "universe") _, err = db.Put(ctx, documentThree) - c.So(err, ShouldBeNil) + require.NoError(t, err) _, err = db.Delete(ctx, "doc1") - c.So(err, ShouldBeNil) + require.NoError(t, err) - docs, err := db.Get(ctx, "doc1", true) - c.So(err, ShouldBeNil) - c.So(len(docs), ShouldEqual, 0) + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) }) + t.Run("Specified index", func(t *testing.T) { + options := documentstore.DefaultStoreOptsForMap("doc") + + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-specified-index", &iface.CreateDBOptions{StoreSpecificOpts: options}) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := createDocument("hello world", "doc", "all the things") + doc2 := createDocument("hello world", "doc", "some things") + + t.Run("put", func(t *testing.T) { + _, err := db.Put(ctx, doc1) + require.NoError(t, err) + + value, err := db.Get(ctx, "all", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 1, len(value)) + require.Equal(t, doc1, value[0]) + }) + + t.Run("matches specified index", func(t *testing.T) { + _, err = db.Put(ctx, doc2) + require.NoError(t, err) + + value1, err := db.Get(ctx, "all", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 1, len(value1)) + require.Equal(t, doc1, value1[0]) + + value2, err := db.Get(ctx, "some", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 1, len(value2)) + require.Equal(t, doc2, value2[0]) + }) + }) + + t.Run("putAll", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-putall", nil) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := createDocument("id1", "doc", "all the things") + doc2 := createDocument("id2", "doc", "some things") + doc3 := createDocument("id3", "doc", "more things") + + _, err = db.PutAll(ctx, []interface{}{doc1, doc2, doc3}) + require.NoError(t, err) + + value, err := db.Get(ctx, "", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 3, len(value)) + require.Contains(t, value, doc1) + require.Contains(t, value, doc2) + require.Contains(t, value, doc3) + }) }) } From d425feabb6e410839ec63170a6bce1de33da31d2 Mon Sep 17 00:00:00 2001 From: Guillaume Louvigny Date: Sat, 21 Aug 2021 17:30:00 +0200 Subject: [PATCH 4/4] feat(docstore): added Query method Signed-off-by: Guillaume Louvigny --- iface/interface.go | 3 + stores/documentstore/document.go | 29 +++++++ tests/docs_test.go | 134 ++++++++++++++++++++++++------- 3 files changed, 138 insertions(+), 28 deletions(-) diff --git a/iface/interface.go b/iface/interface.go index 1b7d8a13..4360a39f 100644 --- a/iface/interface.go +++ b/iface/interface.go @@ -277,6 +277,9 @@ type DocumentStore interface { // Get Retrieves the document for a key Get(ctx context.Context, key string, opts *DocumentStoreGetOptions) ([]interface{}, error) + + // Query Finds documents using a filter function + Query(ctx context.Context, filter func(doc interface{}) (bool, error)) ([]interface{}, error) } // StoreIndex Index contains the state of a datastore, diff --git a/stores/documentstore/document.go b/stores/documentstore/document.go index c65fe54a..71ec69d5 100644 --- a/stores/documentstore/document.go +++ b/stores/documentstore/document.go @@ -178,6 +178,35 @@ func (o *orbitDBDocumentStore) PutAll(ctx context.Context, values []interface{}) return op, nil } +// Query Finds documents using a filter function +func (o *orbitDBDocumentStore) Query(ctx context.Context, filter func(doc interface{}) (bool, error)) ([]interface{}, error) { + docIndex, ok := o.Index().(*documentIndex) + if !ok { + return nil, fmt.Errorf("unable to cast index to documentIndex") + } + + documents := []interface{}(nil) + for _, indexKey := range docIndex.Keys() { + doc := docIndex.Get(indexKey) + if doc == nil { + continue + } + + value := o.docOpts.ItemFactory() + if err := o.docOpts.Unmarshal(doc.([]byte), &value); err != nil { + return nil, fmt.Errorf("unable to unmarshal document: %w", err) + } + + if ok, err := filter(value); err != nil { + return nil, fmt.Errorf("error while filtering value: %w", err) + } else if ok { + documents = append(documents, value) + } + } + + return documents, nil +} + func (o *orbitDBDocumentStore) Type() string { return "docstore" } diff --git a/tests/docs_test.go b/tests/docs_test.go index 04c19dbe..513c80cb 100644 --- a/tests/docs_test.go +++ b/tests/docs_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -27,13 +28,6 @@ func TestDocumentsStore(t *testing.T) { } } -func createDocument(id string, key string, value string) map[string]interface{} { - m := make(map[string]interface{}) - m["_id"] = id - m[key] = value - return m -} - func testingDocsStore(t *testing.T, dir string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -74,9 +68,9 @@ func testingDocsStore(t *testing.T, dir string) { require.Equal(t, "first docs database", db.DBName()) }) - document := createDocument("doc1", "hello", "world") - documentUpdate1 := createDocument("doc1", "hello", "galaxy") - documentUppercase := createDocument("DOCUPPER1", "hello", "world") + document := map[string]interface{}{"_id": "doc1", "hello": "world"} + documentUpdate1 := map[string]interface{}{"_id": "doc1", "hello": "galaxy"} + documentUppercase := map[string]interface{}{"_id": "DOCUPPER1", "hello": "world"} t.Run("put/get", func(t *testing.T) { _, err := db.Put(ctx, document) @@ -124,15 +118,15 @@ func testingDocsStore(t *testing.T, dir string) { }) t.Run("put/get - multiple keys", func(t *testing.T) { - documentOne := createDocument("doc1", "hello", "world") + documentOne := map[string]interface{}{"_id": "doc1", "hello": "world"} _, err := db.Put(ctx, documentOne) require.NoError(t, err) - documentTwo := createDocument("doc2", "hello", "galaxy") + documentTwo := map[string]interface{}{"_id": "doc2", "hello": "galaxy"} _, err = db.Put(ctx, documentTwo) require.NoError(t, err) - documentThree := createDocument("doc3", "hello", "universe") + documentThree := map[string]interface{}{"_id": "doc3", "hello": "universe"} _, err = db.Put(ctx, documentThree) require.NoError(t, err) @@ -154,9 +148,9 @@ func testingDocsStore(t *testing.T, dir string) { }) t.Run("get - partial term match - PartialMatches: true", func(t *testing.T) { - doc1 := createDocument("hello world", "doc", "some things") - doc2 := createDocument("hello universe", "doc", "all the things") - doc3 := createDocument("sup world", "doc", "other things") + doc1 := map[string]interface{}{"_id": "hello world", "doc": "some things"} + doc2 := map[string]interface{}{"_id": "hello universe", "doc": "all the things"} + doc3 := map[string]interface{}{"_id": "sup world", "doc": "other things"} _, err := db.Put(ctx, doc1) require.NoError(t, err) @@ -174,9 +168,9 @@ func testingDocsStore(t *testing.T, dir string) { }) t.Run("get - partial term match - PartialMatches: false", func(t *testing.T) { - doc1 := createDocument("hello world", "doc", "some things") - doc2 := createDocument("hello universe", "doc", "all the things") - doc3 := createDocument("sup world", "doc", "other things") + doc1 := map[string]interface{}{"_id": "hello world", "doc": "some things"} + doc2 := map[string]interface{}{"_id": "hello universe", "doc": "all the things"} + doc3 := map[string]interface{}{"_id": "sup world", "doc": "other things"} _, err := db.Put(ctx, doc1) require.NoError(t, err) @@ -192,7 +186,7 @@ func testingDocsStore(t *testing.T, dir string) { }) t.Run("deletes a key", func(t *testing.T) { - document := createDocument("doc1", "hello", "world") + document := map[string]interface{}{"_id": "doc1", "hello": "world"} _, err := db.Put(ctx, document) require.NoError(t, err) @@ -205,15 +199,15 @@ func testingDocsStore(t *testing.T, dir string) { }) t.Run("deletes a key after multiple updates", func(t *testing.T) { - documentOne := createDocument("doc1", "hello", "world") + documentOne := map[string]interface{}{"_id": "doc1", "hello": "world"} _, err := db.Put(ctx, documentOne) require.NoError(t, err) - documentTwo := createDocument("doc1", "hello", "galaxy") + documentTwo := map[string]interface{}{"_id": "doc1", "hello": "galaxy"} _, err = db.Put(ctx, documentTwo) require.NoError(t, err) - documentThree := createDocument("doc1", "hello", "universe") + documentThree := map[string]interface{}{"_id": "doc1", "hello": "universe"} _, err = db.Put(ctx, documentThree) require.NoError(t, err) @@ -233,8 +227,8 @@ func testingDocsStore(t *testing.T, dir string) { defer func() { _ = db.Drop() }() - doc1 := createDocument("hello world", "doc", "all the things") - doc2 := createDocument("hello world", "doc", "some things") + doc1 := map[string]interface{}{"_id": "hello world", "doc": "all the things"} + doc2 := map[string]interface{}{"_id": "hello world", "doc": "some things"} t.Run("put", func(t *testing.T) { _, err := db.Put(ctx, doc1) @@ -271,9 +265,9 @@ func testingDocsStore(t *testing.T, dir string) { defer func() { _ = db.Drop() }() - doc1 := createDocument("id1", "doc", "all the things") - doc2 := createDocument("id2", "doc", "some things") - doc3 := createDocument("id3", "doc", "more things") + doc1 := map[string]interface{}{"_id": "id1", "doc": "all the things"} + doc2 := map[string]interface{}{"_id": "id2", "doc": "some things"} + doc3 := map[string]interface{}{"_id": "id3", "doc": "more things"} _, err = db.PutAll(ctx, []interface{}{doc1, doc2, doc3}) require.NoError(t, err) @@ -286,5 +280,89 @@ func testingDocsStore(t *testing.T, dir string) { require.Contains(t, value, doc2) require.Contains(t, value, doc3) }) + + t.Run("query", func(t *testing.T) { + viewsFilter := func(expectedCount int) func(e interface{}) (bool, error) { + return func(e interface{}) (bool, error) { + entry, ok := e.(map[string]interface{}) + if !ok { + return false, fmt.Errorf("unable to cast entry") + } + + if _, ok := entry["views"]; !ok { + return false, nil + } + + views, ok := entry["views"].(float64) + if !ok { + return false, fmt.Errorf("unable to cast value for field views") + } + + return int(views) > expectedCount, nil + } + } + + t.Run("query - simple", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-putall", nil) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := map[string]interface{}{"_id": "hello world", "doc": "all the things", "views": 17} + doc2 := map[string]interface{}{"_id": "sup world', doc: 'some of the things", "views": 10} + doc3 := map[string]interface{}{"_id": "hello other world", "doc": "none of the things", "views": 5} + doc4 := map[string]interface{}{"_id": "hey universe", "doc": ""} + + for _, doc := range []map[string]interface{}{doc1, doc2, doc3, doc4} { + _, err := db.Put(ctx, doc) + require.NoError(t, err) + } + + value, err := db.Query(ctx, viewsFilter(5)) + require.NoError(t, err) + require.Len(t, value, 2) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc1)) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc2)) + + value, err = db.Query(ctx, viewsFilter(10)) + require.NoError(t, err) + require.Len(t, value, 1) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc1)) + + value, err = db.Query(ctx, viewsFilter(17)) + require.NoError(t, err) + require.Len(t, value, 0) + }) + + t.Run("query after delete", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-putall", nil) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := map[string]interface{}{"_id": "hello world", "doc": "all the things", "views": 17} + doc2 := map[string]interface{}{"_id": "sup world', doc: 'some of the things", "views": 10} + doc3 := map[string]interface{}{"_id": "hello other world", "doc": "none of the things", "views": 5} + doc4 := map[string]interface{}{"_id": "hey universe", "doc": ""} + + for _, doc := range []map[string]interface{}{doc1, doc2, doc3, doc4} { + _, err := db.Put(ctx, doc) + require.NoError(t, err) + } + _, err = db.Delete(ctx, "hello world") + require.NoError(t, err) + + value, err := db.Query(ctx, viewsFilter(4)) + require.NoError(t, err) + require.Len(t, value, 2) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc2)) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc3)) + + value, err = db.Query(ctx, viewsFilter(9)) + require.NoError(t, err) + require.Len(t, value, 1) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc2)) + }) + }) }) }