diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go index fa0238a2..2139269d 100644 --- a/baseorbitdb/orbitdb.go +++ b/baseorbitdb/orbitdb.go @@ -754,16 +754,17 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd } store, err := storeFunc(ctx, o.IPFS(), identity, parsedDBAddress, &iface.NewStoreOptions{ - AccessController: accessController, - Cache: options.Cache, - Replicate: options.Replicate, - Directory: *options.Directory, - SortFn: options.SortFn, - CacheDestroy: func() error { return o.cache.Destroy(o.directory, parsedDBAddress) }, - Logger: o.logger, - Tracer: o.tracer, - IO: options.IO, - SharedKey: options.SharedKey, + AccessController: accessController, + Cache: options.Cache, + Replicate: options.Replicate, + Directory: *options.Directory, + SortFn: options.SortFn, + CacheDestroy: func() error { return o.cache.Destroy(o.directory, parsedDBAddress) }, + Logger: o.logger, + Tracer: o.tracer, + IO: options.IO, + SharedKey: options.SharedKey, + StoreSpecificOpts: options.StoreSpecificOpts, }) if err != nil { return nil, errors.Wrap(err, "unable to instantiate store") diff --git a/iface/interface.go b/iface/interface.go index 177e2c4e..4360a39f 100644 --- a/iface/interface.go +++ b/iface/interface.go @@ -38,6 +38,14 @@ type CreateDBOptions struct { SortFn ipfslog.SortFn IO ipfslog.IO SharedKey enc.SharedKey + StoreSpecificOpts interface{} +} + +type CreateDocumentDBOptions struct { + KeyExtractor func(interface{}) (string, error) + Marshal func(interface{}) ([]byte, error) + Unmarshal func(data []byte, v interface{}) error + ItemFactory func() interface{} } // DetermineAddressOptions Lists the arguments used to determine a store address @@ -70,7 +78,7 @@ type BaseOrbitDB interface { // RegisterStoreType Registers a new store type RegisterStoreType(storeType string, constructor StoreConstructor) - // RegisterStoreType Removes a store type + // UnregisterStoreType Removes a store type UnregisterStoreType(storeType string) // RegisterAccessControllerType Registers a new access controller type @@ -89,13 +97,25 @@ type BaseOrbitDB interface { Tracer() trace.Tracer } +// OrbitDBDocumentStore An OrbitDB instance providing a Document store +type OrbitDBDocumentStore interface { + BaseOrbitDB + OrbitDBDocumentStoreProvider +} + +// OrbitDBDocumentStoreProvider Exposes a method providing a document store +type OrbitDBDocumentStoreProvider interface { + // Docs Creates or opens an DocumentStore + Docs(ctx context.Context, address string, options *CreateDBOptions) (DocumentStore, error) +} + // OrbitDBKVStore An OrbitDB instance providing a KeyValue store type OrbitDBKVStore interface { BaseOrbitDB OrbitDBKVStoreProvider } -// OrbitDBLogStoreProvider Exposes a method providing a key value store +// OrbitDBKVStoreProvider Exposes a method providing a key value store type OrbitDBKVStoreProvider interface { // KeyValue Creates or opens an KeyValueStore KeyValue(ctx context.Context, address string, options *CreateDBOptions) (KeyValueStore, error) @@ -119,6 +139,7 @@ type OrbitDB interface { OrbitDBKVStoreProvider OrbitDBLogStoreProvider + OrbitDBDocumentStoreProvider } // StreamOptions Defines the parameters that can be given to the Stream function of an EventLogStore @@ -152,7 +173,7 @@ type Store interface { // Replicator Returns the Replicator object Replicator() replicator.Replicator - // Replicator Returns the Cache object + // Cache Returns the Cache object Cache() datastore.Datastore // Drop Removes all the local store content @@ -216,7 +237,7 @@ type EventLogStore interface { List(ctx context.Context, options *StreamOptions) ([]operation.Operation, error) } -// EventLogStore A type of store that provides a key value store +// KeyValueStore A type of store that provides a key value store type KeyValueStore interface { Store @@ -233,6 +254,34 @@ type KeyValueStore interface { Get(ctx context.Context, key string) ([]byte, error) } +type DocumentStoreGetOptions struct { + CaseInsensitive bool + PartialMatches bool +} + +// DocumentStore A type of store that provides a document store +type DocumentStore interface { + Store + + // Put Stores the document + Put(ctx context.Context, document interface{}) (operation.Operation, error) + + // Delete Clears the document for a key + Delete(ctx context.Context, key string) (operation.Operation, error) + + // PutBatch Add values as multiple operations and returns the latest + PutBatch(ctx context.Context, values []interface{}) (operation.Operation, error) + + // PutAll Add values as a single operation and returns it + PutAll(ctx context.Context, values []interface{}) (operation.Operation, error) + + // Get Retrieves the document for a key + Get(ctx context.Context, key string, opts *DocumentStoreGetOptions) ([]interface{}, error) + + // Query Finds documents using a filter function + Query(ctx context.Context, filter func(doc interface{}) (bool, error)) ([]interface{}, error) +} + // StoreIndex Index contains the state of a datastore, // ie. what data we currently have. // @@ -272,6 +321,7 @@ type NewStoreOptions struct { Tracer trace.Tracer IO ipfslog.IO SharedKey enc.SharedKey + StoreSpecificOpts interface{} } type DirectChannelOptions struct { @@ -284,7 +334,7 @@ type DirectChannel interface { // Connect Waits for the other peer to be connected Connect(context.Context) error - // Sends Sends a message to the other peer + // Send Sends a message to the other peer Send(context.Context, []byte) error // Close Closes the connection @@ -316,15 +366,15 @@ type PubSubTopic interface { // WatchPeers subscribes to peers joining or leaving the topic WatchPeers(ctx context.Context) (<-chan events.Event, error) - // WatchMessages + // WatchMessages Subscribes to new messages WatchMessages(ctx context.Context) (<-chan *EventPubSubMessage, error) - // Returns the topic name + // Topic Returns the topic name Topic() string } type PubSubInterface interface { - // Subscribe Subscribes to a topic + // TopicSubscribe Subscribes to a topic TopicSubscribe(ctx context.Context, topic string) (PubSubTopic, error) } diff --git a/orbitdb.go b/orbitdb.go index 473b0ca5..54b65650 100644 --- a/orbitdb.go +++ b/orbitdb.go @@ -3,15 +3,17 @@ package orbitdb import ( "context" + coreapi "github.com/ipfs/interface-go-ipfs-core" + "github.com/pkg/errors" + "berty.tech/go-orbit-db/accesscontroller/ipfs" "berty.tech/go-orbit-db/accesscontroller/orbitdb" "berty.tech/go-orbit-db/accesscontroller/simple" "berty.tech/go-orbit-db/baseorbitdb" "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/documentstore" "berty.tech/go-orbit-db/stores/eventlogstore" "berty.tech/go-orbit-db/stores/kvstore" - coreapi "github.com/ipfs/interface-go-ipfs-core" - "github.com/pkg/errors" ) type orbitDB struct { @@ -30,6 +32,9 @@ type EventLogStore = iface.EventLogStore // KeyValueStore An alias of the type defined in the iface package type KeyValueStore = iface.KeyValueStore +// DocumentStore An alias of the type defined in the iface package +type DocumentStore = iface.DocumentStore + // StoreIndex An alias of the type defined in the iface package type StoreIndex = iface.StoreIndex @@ -48,6 +53,9 @@ type StreamOptions = iface.StreamOptions // CreateDBOptions An alias of the type defined in the iface package type CreateDBOptions = iface.CreateDBOptions +// CreateDocumentDBOptions An alias of the type defined in the iface package +type CreateDocumentDBOptions = iface.CreateDocumentDBOptions + // DetermineAddressOptions An alias of the type defined in the iface package type DetermineAddressOptions = iface.DetermineAddressOptions @@ -64,6 +72,7 @@ func NewOrbitDB(ctx context.Context, i coreapi.CoreAPI, options *NewOrbitDBOptio odb.RegisterStoreType("eventlog", eventlogstore.NewOrbitDBEventLogStore) odb.RegisterStoreType("keyvalue", kvstore.NewOrbitDBKeyValue) + odb.RegisterStoreType("docstore", documentstore.NewOrbitDBDocumentStore) _ = odb.RegisterAccessControllerType(ipfs.NewIPFSAccessController) _ = odb.RegisterAccessControllerType(orbitdb.NewOrbitDBAccessController) @@ -123,4 +132,25 @@ func (o *orbitDB) KeyValue(ctx context.Context, address string, options *CreateD return kvStore, nil } +func (o *orbitDB) Docs(ctx context.Context, address string, options *CreateDBOptions) (DocumentStore, error) { + if options == nil { + options = &CreateDBOptions{} + } + + options.Create = boolPtr(true) + options.StoreType = stringPtr("docstore") + + store, err := o.Open(ctx, address, options) + if err != nil { + return nil, errors.Wrap(err, "unable to open database") + } + + documentStore, ok := store.(DocumentStore) + if !ok { + return nil, errors.New("unable to cast store to document") + } + + return documentStore, nil +} + var _ OrbitDB = (*orbitDB)(nil) diff --git a/stores/documentstore/doc.go b/stores/documentstore/doc.go new file mode 100644 index 00000000..3e348d6b --- /dev/null +++ b/stores/documentstore/doc.go @@ -0,0 +1,2 @@ +// documentstore a document store for OrbitDB +package documentstore // import "berty.tech/go-orbit-db/stores/documentstore" diff --git a/stores/documentstore/document.go b/stores/documentstore/document.go new file mode 100644 index 00000000..71ec69d5 --- /dev/null +++ b/stores/documentstore/document.go @@ -0,0 +1,282 @@ +package documentstore + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "berty.tech/go-ipfs-log/identityprovider" + "berty.tech/go-orbit-db/address" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/basestore" + "berty.tech/go-orbit-db/stores/operation" + + coreapi "github.com/ipfs/interface-go-ipfs-core" +) + +type orbitDBDocumentStore struct { + basestore.BaseStore + docOpts *iface.CreateDocumentDBOptions +} + +func (o *orbitDBDocumentStore) Get(ctx context.Context, key string, opts *iface.DocumentStoreGetOptions) ([]interface{}, error) { + if opts == nil { + opts = &iface.DocumentStoreGetOptions{} + } + + hasMultipleTerms := strings.Contains(key, " ") + if hasMultipleTerms { + key = strings.ReplaceAll(key, ".", " ") + } + if opts.CaseInsensitive { + key = strings.ToLower(key) + } + + docIndex, ok := o.Index().(*documentIndex) + if !ok { + return nil, fmt.Errorf("unable to cast index to documentIndex") + } + + documents := []interface{}(nil) + + for _, indexKey := range docIndex.Keys() { + indexKeyForSearch := indexKey + + if opts.CaseInsensitive { + indexKeyForSearch = strings.ToLower(indexKeyForSearch) + if hasMultipleTerms { + indexKeyForSearch = strings.ReplaceAll(indexKeyForSearch, ".", " ") + } + } + + if !opts.PartialMatches { + if indexKeyForSearch != key { + continue + } + } else if opts.PartialMatches { + if !strings.Contains(indexKeyForSearch, key) { + continue + } + } + + value := o.Index().Get(indexKey) + if value == nil { + return nil, fmt.Errorf("value not found for key %s", indexKey) + } + + if _, ok := value.([]byte); !ok { + return nil, fmt.Errorf("invalid type for key %s", indexKey) + } + + out := o.docOpts.ItemFactory() + if err := o.docOpts.Unmarshal(value.([]byte), &out); err != nil { + return nil, fmt.Errorf("unable to unmarshal value for key %s: %w", indexKey, err) + } + + documents = append(documents, out) + } + + return documents, nil +} + +func (o *orbitDBDocumentStore) Put(ctx context.Context, document interface{}) (operation.Operation, error) { + key, err := o.docOpts.KeyExtractor(document) + if err != nil { + return nil, fmt.Errorf("unable to extract key from value: %w", err) + } + + data, err := o.docOpts.Marshal(document) + if err != nil { + return nil, fmt.Errorf("unable to marshal value: %w", err) + } + + op := operation.NewOperation(&key, "PUT", data) + + e, err := o.AddOperation(ctx, op, nil) + if err != nil { + return nil, fmt.Errorf("error while adding operation: %w", err) + } + + op, err = operation.ParseOperation(e) + if err != nil { + return nil, fmt.Errorf("unable to parse newly created entry: %w", err) + } + + return op, nil +} + +func (o *orbitDBDocumentStore) Delete(ctx context.Context, key string) (operation.Operation, error) { + if e := o.Index().Get(key); e == nil { + return nil, fmt.Errorf("no entry with key '%s' in database", key) + } + + op := operation.NewOperation(&key, "DEL", nil) + + e, err := o.AddOperation(ctx, op, nil) + if err != nil { + return nil, fmt.Errorf("error while adding operation: %w", err) + } + + op, err = operation.ParseOperation(e) + if err != nil { + return nil, fmt.Errorf("unable to parse newly created entry: %w", err) + } + + return op, nil +} + +// PutBatch Add values as multiple operations and returns the latest +func (o *orbitDBDocumentStore) PutBatch(ctx context.Context, values []interface{}) (operation.Operation, error) { + if len(values) == 0 { + return nil, fmt.Errorf("nothing to add to the store") + } + + op := operation.Operation(nil) + var err error + for _, val := range values { + op, err = o.Put(ctx, val) + if err != nil { + return nil, fmt.Errorf("unable to add data to the store: %w", err) + } + } + + return op, nil +} + +// PutAll Add values as a single operation and returns it +func (o *orbitDBDocumentStore) PutAll(ctx context.Context, values []interface{}) (operation.Operation, error) { + toAdd := map[string][]byte{} + + for _, val := range values { + key, err := o.docOpts.KeyExtractor(val) + if err != nil { + return nil, fmt.Errorf("one of the provided documents has no index key") + } + + data, err := o.docOpts.Marshal(val) + if err != nil { + return nil, fmt.Errorf("unable to marshal one of the provided documents") + } + + toAdd[key] = data + } + + empty := "" + op := operation.NewOperationWithDocuments(&empty, "PUTALL", toAdd) + + e, err := o.AddOperation(ctx, op, nil) + if err != nil { + return nil, fmt.Errorf("error while adding operation: %w", err) + } + + op, err = operation.ParseOperation(e) + if err != nil { + return nil, fmt.Errorf("unable to parse newly created entry: %w", err) + } + + return op, nil +} + +// Query Finds documents using a filter function +func (o *orbitDBDocumentStore) Query(ctx context.Context, filter func(doc interface{}) (bool, error)) ([]interface{}, error) { + docIndex, ok := o.Index().(*documentIndex) + if !ok { + return nil, fmt.Errorf("unable to cast index to documentIndex") + } + + documents := []interface{}(nil) + for _, indexKey := range docIndex.Keys() { + doc := docIndex.Get(indexKey) + if doc == nil { + continue + } + + value := o.docOpts.ItemFactory() + if err := o.docOpts.Unmarshal(doc.([]byte), &value); err != nil { + return nil, fmt.Errorf("unable to unmarshal document: %w", err) + } + + if ok, err := filter(value); err != nil { + return nil, fmt.Errorf("error while filtering value: %w", err) + } else if ok { + documents = append(documents, value) + } + } + + return documents, nil +} + +func (o *orbitDBDocumentStore) Type() string { + return "docstore" +} + +func MapKeyExtractor(keyField string) func(obj interface{}) (string, error) { + return func(obj interface{}) (string, error) { + mapped, ok := obj.(map[string]interface{}) + if !ok { + return "", fmt.Errorf("can't extract key from something else than a `map[string]interface{}` entry") + } + + val, ok := mapped[keyField] + if !ok { + return "", fmt.Errorf("missing value for field `%s` in entry", keyField) + } + + key, ok := val.(string) + if !ok { + return "", fmt.Errorf("value for field `%s` is not a string", keyField) + } + + return key, nil + } +} + +func DefaultStoreOptsForMap(keyField string) *iface.CreateDocumentDBOptions { + return &iface.CreateDocumentDBOptions{ + Marshal: json.Marshal, + Unmarshal: json.Unmarshal, + KeyExtractor: MapKeyExtractor(keyField), + ItemFactory: func() interface{} { return map[string]interface{}{} }, + } +} + +// NewOrbitDBDocumentStore Instantiates a new DocumentStore +func NewOrbitDBDocumentStore(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) (iface.Store, error) { + if options.StoreSpecificOpts == nil { + options.StoreSpecificOpts = DefaultStoreOptsForMap("_id") + } + + docOpts, ok := options.StoreSpecificOpts.(*iface.CreateDocumentDBOptions) + if !ok { + return nil, fmt.Errorf("invalid type supplied for opts.StoreSpecificOpts") + } + + if docOpts.Marshal == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.Marshal") + } + + if docOpts.Unmarshal == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.Unmarshal") + } + + if docOpts.ItemFactory == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.ItemFactory") + } + + if docOpts.KeyExtractor == nil { + return nil, fmt.Errorf("missing value for option opts.StoreSpecificOpts.ExtractKey") + } + + store := &orbitDBDocumentStore{docOpts: docOpts} + options.Index = func(_ []byte) iface.StoreIndex { return newDocumentIndex(docOpts) } + + err := store.InitBaseStore(ctx, ipfs, identity, addr, options) + if err != nil { + return nil, fmt.Errorf("unable to initialize document store: %w", err) + } + + return store, nil +} + +var _ iface.DocumentStore = &orbitDBDocumentStore{} diff --git a/stores/documentstore/index.go b/stores/documentstore/index.go new file mode 100644 index 00000000..b7452737 --- /dev/null +++ b/stores/documentstore/index.go @@ -0,0 +1,109 @@ +package documentstore + +import ( + "fmt" + "sync" + + ipfslog "berty.tech/go-ipfs-log" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/operation" +) + +type documentIndex struct { + iface.StoreIndex + index map[string][]byte + muIndex sync.RWMutex + opts *iface.CreateDocumentDBOptions +} + +func (i *documentIndex) Keys() []string { + i.muIndex.RLock() + defer i.muIndex.RUnlock() + + keys := make([]string, len(i.index)) + + idx := 0 + for key := range i.index { + keys[idx] = key + idx++ + } + + return keys +} + +func (i *documentIndex) Get(key string) interface{} { + i.muIndex.RLock() + defer i.muIndex.RUnlock() + + if i.index == nil { + return nil + } + + entry, ok := i.index[key] + if !ok { + return nil + } + + return entry +} + +func (i *documentIndex) UpdateIndex(oplog ipfslog.Log, _ []ipfslog.Entry) error { + entries := oplog.Values().Slice() + size := len(entries) + + handled := map[string]struct{}{} + + i.muIndex.Lock() + defer i.muIndex.Unlock() + + for idx := range entries { + item, err := operation.ParseOperation(entries[size-idx-1]) + if err != nil { + return fmt.Errorf("unable to parse log documentstore operation: %w", err) + } + + if item.GetOperation() == "PUTALL" { + for _, opDoc := range item.GetDocs() { + if _, ok := handled[opDoc.GetKey()]; ok { + continue + } + + handled[*item.GetKey()] = struct{}{} + i.index[opDoc.GetKey()] = opDoc.GetValue() + } + + continue + } + + key := item.GetKey() + if key == nil || *key == "" { + // ignoring entries with nil or empty keys + continue + } + + if _, ok := handled[*item.GetKey()]; ok { + continue + } + + handled[*item.GetKey()] = struct{}{} + switch item.GetOperation() { + case "PUT": + i.index[*item.GetKey()] = item.GetValue() + + case "DEL": + delete(i.index, *item.GetKey()) + } + } + + return nil +} + +// newDocumentIndex Creates a new index for a Document Store +func newDocumentIndex(opts *iface.CreateDocumentDBOptions) iface.StoreIndex { + return &documentIndex{ + index: map[string][]byte{}, + opts: opts, + } +} + +var _ iface.StoreIndex = &documentIndex{} diff --git a/stores/operation/interface.go b/stores/operation/interface.go index 739adc14..2c4ffde7 100644 --- a/stores/operation/interface.go +++ b/stores/operation/interface.go @@ -4,6 +4,11 @@ import ( ipfslog "berty.tech/go-ipfs-log" ) +type OpDoc interface { + GetKey() string + GetValue() []byte +} + // Operation Describe an CRDT operation type Operation interface { // GetKey Gets a key if applicable (ie. key value stores) @@ -18,6 +23,9 @@ type Operation interface { // GetEntry Gets the underlying IPFS log Entry GetEntry() ipfslog.Entry + // GetDocs Gets the list of documents + GetDocs() []OpDoc + // Marshal Serializes the operation Marshal() ([]byte, error) } diff --git a/stores/operation/operation.go b/stores/operation/operation.go index e8b9ae76..999a5795 100644 --- a/stores/operation/operation.go +++ b/stores/operation/operation.go @@ -8,10 +8,24 @@ import ( "github.com/pkg/errors" ) +type opDoc struct { + Key string `json:"key,omitempty"` + Value []byte `json:"value,omitempty"` +} + +func (b *opDoc) GetKey() string { + return b.Key +} + +func (b *opDoc) GetValue() []byte { + return b.Value +} + type operation struct { Key *string `json:"key,omitempty"` Op string `json:"op,omitempty"` Value []byte `json:"value,omitempty"` + Docs []*opDoc `json:"docs,omitempty"` Entry ipfslog.Entry `json:"-"` } @@ -35,6 +49,16 @@ func (o *operation) GetEntry() ipfslog.Entry { return o.Entry } +func (o *operation) GetDocs() []OpDoc { + ret := make([]OpDoc, len(o.Docs)) + + for i, val := range o.Docs { + ret[i] = val + } + + return ret +} + // ParseOperation Gets the operation from an entry func ParseOperation(e ipfslog.Entry) (Operation, error) { if e == nil { @@ -53,6 +77,13 @@ func ParseOperation(e ipfslog.Entry) (Operation, error) { return &op, nil } +func NewOpDoc(key string, value []byte) OpDoc { + return &opDoc{ + Key: key, + Value: value, + } +} + // NewOperation Creates a new operation func NewOperation(key *string, op string, value []byte) Operation { return &operation{ @@ -62,4 +93,24 @@ func NewOperation(key *string, op string, value []byte) Operation { } } +// NewOperationWithDocuments Creates a new operation from a map of batched documents +func NewOperationWithDocuments(key *string, op string, docs map[string][]byte) Operation { + _docs := make([]*opDoc, len(docs)) + + i := 0 + for k, v := range docs { + _docs[i] = &opDoc{ + Key: k, + Value: v, + } + i++ + } + + return &operation{ + Key: key, + Op: op, + Docs: _docs, + } +} + var _ Operation = &operation{} diff --git a/tests/docs_test.go b/tests/docs_test.go new file mode 100644 index 00000000..513c80cb --- /dev/null +++ b/tests/docs_test.go @@ -0,0 +1,368 @@ +package tests + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + orbitdb2 "berty.tech/go-orbit-db" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/stores/documentstore" +) + +func TestDocumentsStore(t *testing.T) { + tmpDir, clean := testingTempDir(t, "db-docsstore") + defer clean() + + cases := []struct{ Name, Directory string }{ + {Name: "in memory", Directory: ":memory:"}, + {Name: "persistent", Directory: tmpDir}, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + testingDocsStore(t, c.Directory) + }) + } +} + +func testingDocsStore(t *testing.T, dir string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbname := "orbit-db-tests" + + t.Run("orbit-db - Documents Database", func(t *testing.T) { + mocknet := testingMockNet(ctx) + + node, clean := testingIPFSNode(ctx, t, mocknet) + defer clean() + + db1IPFS := testingCoreAPI(t, node) + + orbitdb1, err := orbitdb2.NewOrbitDB(ctx, db1IPFS, &orbitdb2.NewOrbitDBOptions{ + Directory: &dir, + }) + defer func() { _ = orbitdb1.Close() }() + + require.NoError(t, err) + + db, err := orbitdb1.Docs(ctx, dbname, nil) + require.NoError(t, err) + + defer func() { _ = db.Close() }() + + t.Run("creates and opens a database", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "first docs database", nil) + require.NoError(t, err) + + if db == nil { + t.Fatalf("db should not be nil") + } + + defer func() { _ = db.Close() }() + + require.Equal(t, "docstore", db.Type()) + require.Equal(t, "first docs database", db.DBName()) + }) + + document := map[string]interface{}{"_id": "doc1", "hello": "world"} + documentUpdate1 := map[string]interface{}{"_id": "doc1", "hello": "galaxy"} + documentUppercase := map[string]interface{}{"_id": "DOCUPPER1", "hello": "world"} + + t.Run("put/get", func(t *testing.T) { + _, err := db.Put(ctx, document) + require.NoError(t, err) + + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, document, docs[0]) + }) + + _, err = db.Put(ctx, documentUppercase) + require.NoError(t, err) + + t.Run("get case insensitive", func(t *testing.T) { + docs, err := db.Get(ctx, "DOC1", &iface.DocumentStoreGetOptions{CaseInsensitive: true}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, document, docs[0]) + + docs, err = db.Get(ctx, "docupper1", &iface.DocumentStoreGetOptions{CaseInsensitive: true}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, documentUppercase, docs[0]) + }) + + t.Run("get case sensitive without match", func(t *testing.T) { + docs, err := db.Get(ctx, "DOC1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) + + docs, err = db.Get(ctx, "docupper1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) + }) + + t.Run("put updates a value", func(t *testing.T) { + _, err = db.Put(ctx, documentUpdate1) + require.NoError(t, err) + + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 1, len(docs)) + require.Equal(t, documentUpdate1, docs[0]) + }) + + t.Run("put/get - multiple keys", func(t *testing.T) { + documentOne := map[string]interface{}{"_id": "doc1", "hello": "world"} + _, err := db.Put(ctx, documentOne) + require.NoError(t, err) + + documentTwo := map[string]interface{}{"_id": "doc2", "hello": "galaxy"} + _, err = db.Put(ctx, documentTwo) + require.NoError(t, err) + + documentThree := map[string]interface{}{"_id": "doc3", "hello": "universe"} + _, err = db.Put(ctx, documentThree) + require.NoError(t, err) + + docsOne, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + + docsTwo, err := db.Get(ctx, "doc2", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + + docsThree, err := db.Get(ctx, "doc3", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + + require.Equal(t, 1, len(docsOne)) + require.Equal(t, documentOne, docsOne[0]) + require.Equal(t, 1, len(docsTwo)) + require.Equal(t, documentTwo, docsTwo[0]) + require.Equal(t, 1, len(docsThree)) + require.Equal(t, documentThree, docsThree[0]) + }) + + t.Run("get - partial term match - PartialMatches: true", func(t *testing.T) { + doc1 := map[string]interface{}{"_id": "hello world", "doc": "some things"} + doc2 := map[string]interface{}{"_id": "hello universe", "doc": "all the things"} + doc3 := map[string]interface{}{"_id": "sup world", "doc": "other things"} + + _, err := db.Put(ctx, doc1) + require.NoError(t, err) + + _, err = db.Put(ctx, doc2) + require.NoError(t, err) + + _, err = db.Put(ctx, doc3) + require.NoError(t, err) + + fetchedDocs, err := db.Get(ctx, "hello", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.Equal(t, 2, len(fetchedDocs)) + require.Contains(t, fetchedDocs, doc1) + require.Contains(t, fetchedDocs, doc2) + }) + + t.Run("get - partial term match - PartialMatches: false", func(t *testing.T) { + doc1 := map[string]interface{}{"_id": "hello world", "doc": "some things"} + doc2 := map[string]interface{}{"_id": "hello universe", "doc": "all the things"} + doc3 := map[string]interface{}{"_id": "sup world", "doc": "other things"} + + _, err := db.Put(ctx, doc1) + require.NoError(t, err) + + _, err = db.Put(ctx, doc2) + require.NoError(t, err) + + _, err = db.Put(ctx, doc3) + require.NoError(t, err) + + fetchedDocs, err := db.Get(ctx, "hello", &iface.DocumentStoreGetOptions{PartialMatches: false}) + require.Equal(t, 0, len(fetchedDocs)) + }) + + t.Run("deletes a key", func(t *testing.T) { + document := map[string]interface{}{"_id": "doc1", "hello": "world"} + _, err := db.Put(ctx, document) + require.NoError(t, err) + + _, err = db.Delete(ctx, "doc1") + require.NoError(t, err) + + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) + }) + + t.Run("deletes a key after multiple updates", func(t *testing.T) { + documentOne := map[string]interface{}{"_id": "doc1", "hello": "world"} + _, err := db.Put(ctx, documentOne) + require.NoError(t, err) + + documentTwo := map[string]interface{}{"_id": "doc1", "hello": "galaxy"} + _, err = db.Put(ctx, documentTwo) + require.NoError(t, err) + + documentThree := map[string]interface{}{"_id": "doc1", "hello": "universe"} + _, err = db.Put(ctx, documentThree) + require.NoError(t, err) + + _, err = db.Delete(ctx, "doc1") + require.NoError(t, err) + + docs, err := db.Get(ctx, "doc1", &iface.DocumentStoreGetOptions{CaseInsensitive: false}) + require.NoError(t, err) + require.Equal(t, 0, len(docs)) + }) + + t.Run("Specified index", func(t *testing.T) { + options := documentstore.DefaultStoreOptsForMap("doc") + + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-specified-index", &iface.CreateDBOptions{StoreSpecificOpts: options}) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := map[string]interface{}{"_id": "hello world", "doc": "all the things"} + doc2 := map[string]interface{}{"_id": "hello world", "doc": "some things"} + + t.Run("put", func(t *testing.T) { + _, err := db.Put(ctx, doc1) + require.NoError(t, err) + + value, err := db.Get(ctx, "all", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 1, len(value)) + require.Equal(t, doc1, value[0]) + }) + + t.Run("matches specified index", func(t *testing.T) { + _, err = db.Put(ctx, doc2) + require.NoError(t, err) + + value1, err := db.Get(ctx, "all", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 1, len(value1)) + require.Equal(t, doc1, value1[0]) + + value2, err := db.Get(ctx, "some", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 1, len(value2)) + require.Equal(t, doc2, value2[0]) + }) + }) + + t.Run("putAll", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-putall", nil) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := map[string]interface{}{"_id": "id1", "doc": "all the things"} + doc2 := map[string]interface{}{"_id": "id2", "doc": "some things"} + doc3 := map[string]interface{}{"_id": "id3", "doc": "more things"} + + _, err = db.PutAll(ctx, []interface{}{doc1, doc2, doc3}) + require.NoError(t, err) + + value, err := db.Get(ctx, "", &iface.DocumentStoreGetOptions{PartialMatches: true}) + require.NoError(t, err) + + require.Equal(t, 3, len(value)) + require.Contains(t, value, doc1) + require.Contains(t, value, doc2) + require.Contains(t, value, doc3) + }) + + t.Run("query", func(t *testing.T) { + viewsFilter := func(expectedCount int) func(e interface{}) (bool, error) { + return func(e interface{}) (bool, error) { + entry, ok := e.(map[string]interface{}) + if !ok { + return false, fmt.Errorf("unable to cast entry") + } + + if _, ok := entry["views"]; !ok { + return false, nil + } + + views, ok := entry["views"].(float64) + if !ok { + return false, fmt.Errorf("unable to cast value for field views") + } + + return int(views) > expectedCount, nil + } + } + + t.Run("query - simple", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-putall", nil) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := map[string]interface{}{"_id": "hello world", "doc": "all the things", "views": 17} + doc2 := map[string]interface{}{"_id": "sup world', doc: 'some of the things", "views": 10} + doc3 := map[string]interface{}{"_id": "hello other world", "doc": "none of the things", "views": 5} + doc4 := map[string]interface{}{"_id": "hey universe", "doc": ""} + + for _, doc := range []map[string]interface{}{doc1, doc2, doc3, doc4} { + _, err := db.Put(ctx, doc) + require.NoError(t, err) + } + + value, err := db.Query(ctx, viewsFilter(5)) + require.NoError(t, err) + require.Len(t, value, 2) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc1)) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc2)) + + value, err = db.Query(ctx, viewsFilter(10)) + require.NoError(t, err) + require.Len(t, value, 1) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc1)) + + value, err = db.Query(ctx, viewsFilter(17)) + require.NoError(t, err) + require.Len(t, value, 0) + }) + + t.Run("query after delete", func(t *testing.T) { + db, err := orbitdb1.Docs(ctx, "orbit-db-tests-putall", nil) + require.NoError(t, err) + + defer func() { _ = db.Drop() }() + + doc1 := map[string]interface{}{"_id": "hello world", "doc": "all the things", "views": 17} + doc2 := map[string]interface{}{"_id": "sup world', doc: 'some of the things", "views": 10} + doc3 := map[string]interface{}{"_id": "hello other world", "doc": "none of the things", "views": 5} + doc4 := map[string]interface{}{"_id": "hey universe", "doc": ""} + + for _, doc := range []map[string]interface{}{doc1, doc2, doc3, doc4} { + _, err := db.Put(ctx, doc) + require.NoError(t, err) + } + _, err = db.Delete(ctx, "hello world") + require.NoError(t, err) + + value, err := db.Query(ctx, viewsFilter(4)) + require.NoError(t, err) + require.Len(t, value, 2) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc2)) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc3)) + + value, err = db.Query(ctx, viewsFilter(9)) + require.NoError(t, err) + require.Len(t, value, 1) + require.Contains(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", doc2)) + }) + }) + }) +}