diff --git a/go.mod b/go.mod index 270b1758..7d71f788 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.16 require ( github.com/dustin/go-humanize v1.0.0 github.com/google/go-cmp v0.5.7 - github.com/klauspost/compress v1.14.2 - github.com/nats-io/nats-server/v2 v2.7.3-0.20220211012451-4aaf76e20d0f - github.com/nats-io/nats.go v1.13.1-0.20220210170715-3ead80931bdf + github.com/klauspost/compress v1.14.3 + github.com/nats-io/nats-server/v2 v2.7.3-0.20220217204130-58806c1290b3 + github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d github.com/nats-io/nuid v1.0.1 ) diff --git a/go.sum b/go.sum index b11632d0..b3faf154 100644 --- a/go.sum +++ b/go.sum @@ -14,17 +14,17 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= -github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.14.3 h1:DQv1WP+iS4srNjibdnHtqu8JNWCDMluj5NzPnFJsnvk= +github.com/klauspost/compress v1.14.3/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.3-0.20220211012451-4aaf76e20d0f h1:TtPUkhbgqPSZo3XxoljTPq6O0F341h9FVs6484SEZzA= -github.com/nats-io/nats-server/v2 v2.7.3-0.20220211012451-4aaf76e20d0f/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= +github.com/nats-io/nats-server/v2 v2.7.3-0.20220217204130-58806c1290b3 h1:/EbgHNWe/Co8f9pA0E17LpkJ+iq2GsouXO9vWEJjLLM= +github.com/nats-io/nats-server/v2 v2.7.3-0.20220217204130-58806c1290b3/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.13.1-0.20220210170715-3ead80931bdf h1:cQObjeyBfURbZ7h2g3a037Ce/Xq9ZWvjDoMPgsatTsE= -github.com/nats-io/nats.go v1.13.1-0.20220210170715-3ead80931bdf/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d h1:0aVg1U2lBfksxhhnff5DOWwJix9D2n4uBHGaHBroI6I= +github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/kv/README.md b/kv/README.md deleted file mode 100644 index 6942f94e..00000000 --- a/kv/README.md +++ /dev/null @@ -1,83 +0,0 @@ -# Proposed design for JetStream based KV services - -This is a proposal to design KV services ontop of JetStream, this requires a KV client to be written, the basic -features are: - - * Multiple named buckets full of keys with n historical values kept per value - * Put and Get `string(k)=string(v)` values - * Deleting a key by adding a delete operation to the key, preserving history - * Per key TTL - * Watching a specific key or the entire bucket for live updates - * Encoders and Decoders that transforms both keys and values - * A read-cache that builds an in-memory cache for fast reads - * Ability to read from regional read replicas maintained by JS Mirroring (planned) - * read-after-write safety unless read replicas used - * Valid keys are `\A[-/_a-zA-Z0-9]+\z` after encoding - * Valid buckets are `^[a-zA-Z0-9_-]+$` - * Custom Stream Names and Stream ingest subjects to cater for different domains, mirrors and imports - * Key starting with `_kv` is reserved for internal use - -This is an implementation of this concept focussed on the CLI, nats.go and others will have to build language -specific interfaces focussed on performance and end user. - -## Design - -### Storage - -Given a bucket `CONFIGURATION` we will have: - - * A stream called `KV_CONFIGURATION` with subjects `$KV.CONFIGURATION.*` - * The stream has Max Messages Per Subject limit set to history with optional placement, R and max age for TTL - * Getting a value uses the new `JSApiMsgGetRequest` feature to get the last message for a subject - * We store headers as per the table below - -### Headers - -|Header|Description| -|------|-----------| -|KV-Origin-Cluster|The cluster where the client was connected to that created the value| -|KV-Operation|If this is a delete operation then `DEL` else blank meaning `PUT`| - -### Watchers - -Watchers can either be per key or per the entire bucket. - -For watching a key we simply send key updates over the watch, starting with the latest value or nothing. We will only -send the last result for a subject - `NumPending==0`. - -For watching the bucket we will send a nil if the bucket is empty, else every result even historical ones, deletes should -be handled via the `Operation()` value. - -### Read Replicas - -A read replicas a mirror stream from the primary stream. The KV client is configured to do its reads against the -named bucket but all writes go to the main bucket for the KV based on above naming. - -This will inevitably result in breaking the read-after-write promises and should be made clear to clients. - -Local read caches to be build from the primary bucket not the replica unless this is a RO KV client then from the -given stream. - -To assist with configuring and discovery of replicas I think once we implement them we should add some values -into the bucket: - -``` -_kv/replicas: [bob_sfo, bob_lon] -_kv/replicas/cluster/sfo/read: bob_sfo -_kv/replicas/cluster/lon/read: bob_lon -``` - -So a kv client in `sfo` can quickly discover if there is a Read replica in his cluster by doing a single lookup. - -The `_kv/replicas` is there to help tooling also manage all replicas when acting on the main. Maintaining this list -is tricky as there can be a race, but we just have to confirm read after write. Eventually there will be a per subject -aware `Nats-Expected-Last-Sequence` which we can use to update the list only if the list has not changed since reading it. - -### Local Cache - -A local cache wraps the JetStream storage with one that passes most calls to the JetStream one but will store results -from `Get()` in a local cache to serve later. - -It will also start a Bucket Watch that pro-actively builds and maintains a cache of the entire bucket locally in memory. -The cache will only be used when `ready` - meaning the watch has delivered the last message in the watched bucket to -avoid serving stale data. Later we might detect stale cache states and go out of `ready` state proactively diff --git a/kv/jetstream_backend.go b/kv/jetstream_backend.go deleted file mode 100644 index fe677fa9..00000000 --- a/kv/jetstream_backend.go +++ /dev/null @@ -1,447 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "context" - "fmt" - "strconv" - "strings" - "sync" - "time" - - "github.com/nats-io/jsm.go" - "github.com/nats-io/jsm.go/api" - "github.com/nats-io/nats.go" -) - -const ( - kvOperationHeader = "KV-Operation" -) - -type jetStreamStorage struct { - name string - streamName string - subjectPrefix string - bucketSubject string - stream *jsm.Stream - nc *nats.Conn - mgr *jsm.Manager - mu sync.Mutex - opts *options - log Logger -} - -func newJetStreamStorage(name string, nc *nats.Conn, opts *options) (Storage, error) { - if !IsValidBucket(name) { - return nil, ErrInvalidBucketName - } - - mgr, err := jsm.New(nc, jsm.WithTimeout(opts.timeout)) - if err != nil { - return nil, err - } - - js := &jetStreamStorage{ - name: name, - nc: nc, - mgr: mgr, - subjectPrefix: "$KV", - log: opts.log, - opts: opts, - } - - if opts.overrideSubjectPrefix != "" { - js.subjectPrefix = opts.overrideSubjectPrefix - } - - if opts.overrideStreamName != "" { - js.streamName = opts.overrideStreamName - } else { - js.streamName = js.streamForBucket(name) - } - - js.bucketSubject = js.subjectForBucket(name) - - return js, nil -} - -func (j *jetStreamStorage) Status() (Status, error) { - stream, err := j.getOrLoadStream() - if err != nil { - return nil, err - } - - state, err := stream.State() - if err != nil { - return nil, err - } - - info, err := stream.Information() - if err != nil { - return nil, err - } - - return &jsStatus{ - name: j.name, - state: state, - info: *info, - }, nil -} - -func (j *jetStreamStorage) Close() error { - j.mu.Lock() - j.stream = nil - j.mu.Unlock() - - return nil -} - -func (j *jetStreamStorage) encode(val []byte) ([]byte, error) { - if j.opts.enc == nil { - return val, nil - } - - return j.opts.enc.Encode(val) -} - -func (j *jetStreamStorage) decode(val []byte) ([]byte, error) { - if j.opts.dec == nil { - return val, nil - } - - return j.opts.dec.Decode(val) -} - -func (j *jetStreamStorage) encodeKey(key string) (string, error) { - res := []string{} - for _, t := range strings.Split(key, ".") { - if t == ">" || t == "*" { - res = append(res, t) - continue - } - - et, err := j.encode([]byte(t)) - if err != nil { - return "", err - } - - res = append(res, string(et)) - } - - return strings.Join(res, "."), nil -} - -func (j *jetStreamStorage) Put(key string, val []byte, opts ...PutOption) (seq uint64, err error) { - ek, err := j.encodeKey(key) - if err != nil { - return 0, err - } - - if !IsValidKey(ek) { - return 0, ErrInvalidKey - } - - popts, err := newPutOpts(opts...) - if err != nil { - return 0, err - } - - msg := nats.NewMsg(j.subjectForKey(ek)) - msg.Data, err = j.encode(val) - if err != nil { - return 0, err - } - - if popts.jsPreviousSeq != 0 { - msg.Header.Add(api.JSExpectedLastSubjSeq, strconv.Itoa(int(popts.jsPreviousSeq))) - } - - res, err := j.nc.RequestMsg(msg, j.opts.timeout) - if err != nil { - return 0, err - } - pa, err := jsm.ParsePubAck(res) - if err != nil { - return 0, err - } - - return pa.Sequence, nil -} - -func (j *jetStreamStorage) History(ctx context.Context, key string) ([]Entry, error) { - ek, err := j.encodeKey(key) - if err != nil { - return nil, err - } - - if !IsValidKey(ek) { - return nil, ErrInvalidKey - } - - stream, err := j.getOrLoadStream() - if err != nil { - return nil, err - } - - sub, err := j.nc.SubscribeSync(nats.NewInbox()) - if err != nil { - return nil, err - } - - c, err := stream.NewConsumer(jsm.FilterStreamBySubject(j.subjectForKey(ek)), jsm.DeliverySubject(sub.Subject), jsm.DeliverAllAvailable()) - if err != nil { - return nil, err - } - - state, err := c.State() - if err != nil { - return nil, err - } - - if state.NumPending+state.Delivered.Consumer == 0 { - return nil, ErrUnknownKey - } - - var results []Entry - for { - msg, err := sub.NextMsgWithContext(ctx) - if err != nil { - return nil, err - } - - r, err := jsEntryFromMessage(j.name, key, msg, j.decode) - if err != nil { - return nil, err - } - - results = append(results, r) - - if r.Delta() == 0 { - break - } - } - - return results, nil -} - -func (j *jetStreamStorage) Get(key string) (Entry, error) { - ek, err := j.encodeKey(key) - if err != nil { - return nil, err - } - - if !IsValidKey(ek) { - return nil, ErrInvalidKey - } - - msg, err := j.mgr.ReadLastMessageForSubject(j.streamName, j.subjectForKey(ek)) - if err != nil { - if apiErr, ok := err.(api.ApiError); ok { - if apiErr.NatsErrorCode() == 10037 { - return nil, ErrUnknownKey - } - } - - return nil, err - } - - res, err := jsEntryFromStoredMessage(j.name, key, msg, j.decode) - if err != nil { - return nil, err - } - - if res.Operation() == DeleteOperation { - return nil, ErrUnknownKey - } - - return res, nil -} - -func (j *jetStreamStorage) Bucket() string { return j.name } -func (j *jetStreamStorage) BucketSubject() string { return j.bucketSubject } - -func (j *jetStreamStorage) Watch(ctx context.Context, key string) (Watch, error) { - if key == "" { - return newJSWatch(ctx, ">", j) - } - - ek, err := j.encodeKey(key) - if err != nil { - return nil, err - } - - return newJSWatch(ctx, j.subjectForKey(ek), j) -} - -// Delete deletes all values held for a key -func (j *jetStreamStorage) Delete(key string) error { - return j.deleteWithHdr(key, nil) -} - -func (j *jetStreamStorage) deleteWithHdr(key string, hdrs [][2]string) error { - ek, err := j.encodeKey(key) - if err != nil { - return err - } - - if !IsValidKey(ek) { - return ErrInvalidKey - } - - msg := nats.NewMsg(j.subjectForKey(ek)) - msg.Header.Add(kvOperationHeader, delOperationString) - - for _, h := range hdrs { - msg.Header.Add(h[0], h[1]) - } - - res, err := j.nc.RequestMsg(msg, j.opts.timeout) - if err != nil { - return err - } - - _, err = jsm.ParsePubAck(res) - return err -} - -func (j *jetStreamStorage) Purge(key string) error { - return j.deleteWithHdr(key, [][2]string{ - {api.JSRollup, api.JSRollupSubject}, - }) -} - -func (j *jetStreamStorage) Destroy() error { - stream, err := j.getOrLoadStream() - if err != nil { - return err - } - - err = stream.Delete() - if err != nil { - return err - } - - return nil -} - -func (j *jetStreamStorage) loadBucket() error { - j.mu.Lock() - defer j.mu.Unlock() - - stream, err := j.mgr.LoadStream(j.streamName) - if err != nil { - if api.IsNatsErr(err, 10059) { - return ErrUnknownBucket - } - return err - } - - j.stream = stream - - return err -} - -// CreateBucket creates a bucket matching the supplied options if none exist, else loads the existing bucket, does not try to consolidate configuration if it already exists -func (j *jetStreamStorage) CreateBucket() error { - j.mu.Lock() - defer j.mu.Unlock() - - opts := []jsm.StreamOption{ - jsm.StreamDescription(fmt.Sprintf("KV Bucket %s", j.name)), - jsm.MaxMessagesPerSubject(int64(j.opts.history)), - jsm.LimitsRetention(), - jsm.MaxAge(j.opts.ttl), - jsm.MaxMessageSize(j.opts.maxValueSize), - jsm.MaxBytes(j.opts.maxBucketSize), - jsm.AllowRollup(), - } - - if j.opts.replicas > 1 { - opts = append(opts, jsm.Replicas(int(j.opts.replicas))) - } - - if j.opts.placementCluster != "" { - opts = append(opts, jsm.PlacementCluster(j.opts.placementCluster)) - } - - if j.opts.ttl < 2*time.Minute { - opts = append(opts, jsm.DuplicateWindow(j.opts.ttl)) - } - - // TODO: mirrors - opts = append(opts, jsm.Subjects(j.bucketSubject)) - - stream, err := j.mgr.LoadOrNewStream(j.streamName, opts...) - if err != nil { - return err - } - - j.stream = stream - - // upgrade for . in keys - if len(stream.Subjects()) == 1 && stream.Subjects()[0] != j.bucketSubject { - err = stream.UpdateConfiguration(stream.Configuration(), jsm.Subjects(j.bucketSubject)) - if err != nil { - return err - } - } - - // upgrade for next server version - if !stream.RollupAllowed() { - err = stream.UpdateConfiguration(stream.Configuration(), jsm.AllowRollup()) - if err != nil { - return err - } - } - - return nil -} - -func (j *jetStreamStorage) streamForBucket(b string) string { - return fmt.Sprintf("KV_%s", b) -} - -func (j *jetStreamStorage) subjectForBucket(b string) string { - return fmt.Sprintf("%s.%s.>", j.subjectPrefix, b) -} - -func (j *jetStreamStorage) subjectForKey(k string) string { - return fmt.Sprintf("%s.%s.%s", j.subjectPrefix, j.name, k) -} - -func (j *jetStreamStorage) getOrLoadStream() (*jsm.Stream, error) { - j.mu.Lock() - stream := j.stream - j.mu.Unlock() - - if stream != nil { - return stream, nil - } - - err := j.loadBucket() - if err != nil { - return nil, err - } - - j.mu.Lock() - stream = j.stream - j.mu.Unlock() - - if stream == nil { - return nil, fmt.Errorf("no stream found") - } - - return stream, nil -} diff --git a/kv/jetstream_backend_test.go b/kv/jetstream_backend_test.go deleted file mode 100644 index 2d555e27..00000000 --- a/kv/jetstream_backend_test.go +++ /dev/null @@ -1,920 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - "io/ioutil" - "strconv" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/nats-io/jsm.go" - "github.com/nats-io/jsm.go/api" - natsd "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" -) - -type BOrT interface { - Helper() - Fatalf(format string, args ...interface{}) -} - -type base64Codec struct{} - -func (r base64Codec) Encode(v []byte) ([]byte, error) { - bres := make([]byte, base64.StdEncoding.EncodedLen(len(v))) - base64.StdEncoding.Encode(bres, v) - return bres, nil -} - -func (r base64Codec) Decode(v []byte) ([]byte, error) { - dbuff := make([]byte, base64.StdEncoding.DecodedLen(len(v))) - n, err := base64.StdEncoding.Decode(dbuff, v) - return dbuff[:n], err -} - -func assertEntryHasStringValue(t *testing.T, res Entry, val string) { - t.Helper() - - if bytes.Equal(res.Value(), []byte(val)) { - return - } - - t.Fatalf("%s should have value %q got %q", res.Key(), val, res.Value()) -} - -func setupBasicTestBucket(t BOrT, so ...Option) (*jetStreamStorage, *natsd.Server, *nats.Conn, *jsm.Manager) { - t.Helper() - - srv, nc, mgr := startJSServer(t) - opts, _ := newOpts(so...) - opts.history = 5 - store, err := newJetStreamStorage("TEST", nc, opts) - if err != nil { - t.Fatalf("store create failed: %s", err) - } - - err = store.CreateBucket() - if err != nil { - t.Fatalf("create failed: %s", err) - } - - return store.(*jetStreamStorage), srv, nc, mgr -} - -func TestJetStreamStorage_WithStreamSubjectPrefix(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t, WithStreamSubjectPrefix("$BOB")) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - _, err := store.Put("hello", []byte("world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - val, err := store.Get("hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - assertEntryHasStringValue(t, val, "world") - - str, err := store.getOrLoadStream() - if err != nil { - t.Fatalf("stream load failed: %s", err) - } - - if !cmp.Equal(str.Subjects(), []string{"$BOB.TEST.>"}) { - t.Fatalf("invalid stream subjects: %v", str.Subjects()) - } -} - -func TestJetStreamStorage_WithStreamName(t *testing.T) { - store, srv, nc, mgr := setupBasicTestBucket(t, WithStreamName("OVERRIDE")) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - _, err := store.Put("hello", []byte("world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - val, err := store.Get("hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - assertEntryHasStringValue(t, val, "world") - - assertStream := func(t *testing.T, stream string, should bool) { - known, err := mgr.IsKnownStream(stream) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - - if should && !known { - t.Fatalf("%s did not exist", stream) - } else if !should && known { - t.Fatalf("%s did exist", stream) - } - } - - assertStream(t, "OVERRIDE", true) - assertStream(t, "KV_TEST", false) -} - -func TestJetStreamStorage_Codec(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t, WithTTL(time.Minute), WithCodec(base64Codec{})) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - seq, err := store.Put("hello.world", []byte("world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - stream, err := store.getOrLoadStream() - if err != nil { - t.Fatalf("stream load failed: %s", err) - } - - if stream.MaxAge() != time.Minute { - t.Fatalf("age was not 60s: %v", stream.MaxAge()) - } - - if stream.DuplicateWindow() != time.Minute { - t.Fatalf("duplicate window is not 60s: %v", stream.DuplicateWindow()) - } - - msg, err := stream.ReadMessage(seq) - if err != nil { - t.Fatalf("read failed: %s", err) - } - - if !bytes.Equal(msg.Data, []byte("d29ybGQ=")) { - t.Fatalf("encoded string was not stored: %q", msg.Data) - } - - if msg.Subject != "$KV.TEST.aGVsbG8=.d29ybGQ=" { - t.Fatalf("subject was not encoded: %s", msg.Subject) - } - - val, err := store.Get("hello.world") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - if val.Key() != "hello.world" { - t.Fatalf("received invald key in entry: %v", val.Key()) - } - - assertEntryHasStringValue(t, val, "world") -} - -func TestJetStreamStorage_WatchBucket(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - for m := 0; m < 10; m++ { - _, err := store.Put("key", []byte(strconv.Itoa(m))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - } - - status, _ := store.Status() - if status.Values() != 5 { - t.Fatalf("expected 5 messages got %d", status.Values()) - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - watch, err := store.Watch(ctx, "") - if err != nil { - t.Fatalf("watch failed: %s", err) - } - - cnt := 9 - kills := 0 - var latest Entry - - // similar to the Watch() test but now we make sure we get old values and new values and that - // even after a consumer outage we do not get duplicate messages over the channel - for { - select { - case r, ok := <-watch.Channel(): - if !ok { - // channel is closed: check we got the last message we sent - assertEntryHasStringValue(t, latest, strconv.Itoa(cnt)) - - if kills == 0 { - t.Fatalf("did not kill the consumer during the test") - } - - return - } - - latest = r - - // value should be that from the last pass through the watch loop or the initial from the warm up for loop - assertEntryHasStringValue(t, latest, strconv.Itoa(cnt)) - - // after 10 the test is done, close the watch, channel close handler - // will verify we got what we needed - if cnt == 20 { - watch.Close() - continue - } - - cnt++ - - _, err = store.Put("key", []byte(strconv.Itoa(cnt))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - // after a few we kill the consumer to test recover - if cnt == 15 { - kills++ - watch.(*jsWatch).cons.Delete() - } - - case <-ctx.Done(): - t.Fatalf("timeout running test") - } - } -} - -func TestJetStreamStorage_WatchEndNotify(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - watch, err := store.Watch(ctx, "x.>") - if err != nil { - t.Fatalf(": %s", err) - } - e := <-watch.Channel() - if e != nil { - t.Fatalf("expected nil value got %+v", e) - } - _, err = store.Put("x.y", []byte("hello")) - if err != nil { - t.Fatalf(": %s", err) - } - e = <-watch.Channel() - if string(e.Value()) != "hello" { - t.Fatalf("Expected hello got: %q", e.Value()) - } - if e.Key() != "x.y" { - t.Fatalf("Expected key x.y got: %v", e.Key()) - } -} - -func TestJetStreamStorage_Watch(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - for m := 0; m < 10; m++ { - _, err := store.Put("key", []byte(strconv.Itoa(m))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - watch, err := store.Watch(ctx, "key") - if err != nil { - t.Fatalf("watch failed: %s", err) - } - - status, _ := store.Status() - if status.Values() != 5 { - t.Fatalf("expected 5 messages got %d", status.Values()) - } - - cnt := 9 - kills := 0 - var latest Entry - - for { - select { - case r, ok := <-watch.Channel(): - if !ok { - // channel is closed: check we got the last message we sent - assertEntryHasStringValue(t, latest, strconv.Itoa(cnt)) - - if kills == 0 { - t.Fatalf("did not kill the consumer during the test") - } - - return - } - - latest = r - - // value should be that from the last pass through the watch loop or the initial from the warm up for loop - assertEntryHasStringValue(t, latest, strconv.Itoa(cnt)) - - // we should always only get the latest value - if r.Delta() != 0 { - t.Fatalf("received non latest message %+v", r) - } - - // after 10 the test is done, close the watch, channel close handler - // will verify we got what we needed - if cnt == 20 { - watch.Close() - continue - } - - cnt++ - - _, err = store.Put("key", []byte(strconv.Itoa(cnt))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - // after a few we kill the consumer to test recover - if cnt == 15 { - kills++ - watch.(*jsWatch).cons.Delete() - } - - case <-ctx.Done(): - t.Fatalf("timeout running test") - } - } -} - -func TestJetStreamStorage_Purge(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - for i := 0; i < 5; i++ { - _, err := store.Put("x", []byte(strconv.Itoa(i))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - _, err = store.Put("y", []byte(strconv.Itoa(i))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - } - - checkCount := func(t *testing.T, subj string, expect uint64) { - t.Helper() - - c, err := store.stream.NewConsumer(jsm.DurableName("X"), jsm.FilterStreamBySubject(subj)) - if err != nil { - t.Fatalf("consumer failed: %s", err) - } - defer c.Delete() - - state, _ := c.LatestState() - cnt := state.NumPending + state.Delivered.Consumer - if cnt != expect { - t.Fatalf("expected 5 messages got: %d", cnt) - } - } - - checkCount(t, store.subjectForKey("x"), 5) - checkCount(t, store.subjectForKey("y"), 5) - - err := store.Purge("x") - if err != nil { - t.Fatalf("purge failed: %s", err) - } - - checkCount(t, store.subjectForKey("x"), 1) - checkCount(t, store.subjectForKey("y"), 5) - checkCount(t, store.subjectForKey("z"), 0) - - _, err = store.Get("x") - if err != ErrUnknownKey { - t.Fatalf("get failed: %s", err) - } -} - -func TestJetStreamStorage_Delete(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - store.Put("x", []byte("x")) - store.Put("x", []byte("y")) - store.Put("x", []byte("z")) - store.Put("y", []byte("y")) - store.Put("z", []byte("y")) - - res, err := store.Get("x") - if err != nil { - t.Fatalf("get failed: %s", err) - } - assertEntryHasStringValue(t, res, "z") - - err = store.Delete("x") - if err != nil { - t.Fatalf("delete failed: %s", err) - } - - _, err = store.Get("x") - if err != ErrUnknownKey { - t.Fatalf("expected unknown key error: %v", err) - } - - res, err = store.Get("z") - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - assertEntryHasStringValue(t, res, "y") - - res, err = store.Get("y") - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - assertEntryHasStringValue(t, res, "y") - - err = store.Delete("x.>") - if err != ErrInvalidKey { - t.Fatalf("Expected invalid key error doing Delete() on wildcard: %v", err) - } -} - -func TestJetStreamStorage_Status(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - store.Put("x", []byte("y")) - store.Put("y", []byte("y")) - store.Put("z", []byte("y")) - - status, err := store.Status() - if err != nil { - t.Fatalf("status failed: %s", err) - } - - if status.Values() != 3 { - t.Fatalf("invalid values %d", status.Values()) - } - - if ok, failed := status.Replicas(); ok != 1 || failed != 0 { - t.Fatalf("invalid replicas ok: %d failed: %d", ok, failed) - } - - if status.BucketLocation() != "unknown" { - t.Fatalf("invalid cluster %q", status.BucketLocation()) - } - - if status.History() != 5 { - t.Fatalf("invalid history %d", status.History()) - } -} - -func TestJetStreamStorage_Put(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - for i := uint64(1); i <= 100; i++ { - seq, err := store.Put("hello", []byte("world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - if seq != i { - t.Fatalf("invalid sequence %d received", seq) - } - - res, err := store.Get("hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - if res.Key() != "hello" { - t.Fatalf("incorrect key: %s", res.Key()) - } - assertEntryHasStringValue(t, res, "world") - if res.Sequence() != seq { - t.Fatalf("incorrect seq: %d", res.Sequence()) - } - if res.Delta() != 0 { - t.Fatalf("incorrect delta: %d", res.Delta()) - } - - // within reasonable grace period - if res.Created().Before(time.Now().Add(-1 * time.Second)) { - t.Fatalf("incorrect create time: %v", res.Created()) - } - } - - // test . in keys - _, err := store.Put("x.y.hello", []byte("world.world.world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - res, err := store.Get("x.y.hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - if string(res.Value()) != "world.world.world" { - t.Fatalf("got invalid value %q", res.Value()) - } - if res.Key() != "x.y.hello" { - t.Fatalf("got invalid key %q", res.Key()) - } - - seq, err := store.Put("hello", []byte("world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - _, err = store.Put("x.>", []byte("world")) - if err == nil { - t.Fatalf("Expected error doing Put() on wildcard key") - } - - // test only if last value seq - _, err = store.Put("hello", []byte("world"), OnlyIfLastValueSequence(seq-1)) - if err != nil { - apiErr, ok := err.(api.ApiError) - if ok { - if apiErr.NatsErrorCode() != 10071 { - t.Fatalf("Expected error 10071, got %v", apiErr) - } - } else { - t.Fatalf("Expected err 10071 got, got generic error: %v", err) - } - } - - _, err = store.Put("hello", []byte("world"), OnlyIfLastValueSequence(seq)) - if err != nil { - t.Fatalf("Expected correct sequence put to succeed: %s", err) - } -} - -func TestJetStreamStorage_History(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t, WithHistory(5)) - defer srv.Shutdown() - defer nc.Close() - - publish := uint64(5) - - for i := uint64(1); i <= publish; i++ { - _, err := store.Put("k", []byte(fmt.Sprintf("val%d", i))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - } - - hist, err := store.History(context.Background(), "k") - if err != nil { - t.Fatalf("history failed: %s", err) - } - - if len(hist) != int(publish) { - t.Fatalf("expected %d history got %d", publish, len(hist)) - } - - for i, r := range hist { - assertEntryHasStringValue(t, r, fmt.Sprintf("val%d", i+1)) - } - - // invalid keys - _, err = store.History(context.Background(), ">") - if err != ErrInvalidKey { - t.Fatalf("Expected invalid key error doing History() for wildcard: %s", err) - } - - // unknown key - _, err = store.History(context.Background(), "x.y") - if err != ErrUnknownKey { - t.Fatalf("Expected unknown key error doing History() for unknown key: %s", err) - } - -} - -func TestJetStreamStorage_Get(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - for i := uint64(1); i <= 1000; i++ { - _, err := store.Put(fmt.Sprintf("k%d", i), []byte(fmt.Sprintf("val%d", i))) - if err != nil { - t.Fatalf("put failed: %s", err) - } - } - - state, err := store.stream.State() - if err != nil { - t.Fatalf("state failed: %s", err) - } - if state.Msgs != 1000 { - t.Fatalf("expected 1000 messages got %d", state.Msgs) - } - - for i := uint64(1); i <= 1000; i++ { - key := fmt.Sprintf("k%d", i) - res, err := store.Get(key) - if err != nil { - t.Fatalf("get failed: %s", err) - } - - if res.Key() != key { - t.Fatalf("invalid key: %s", res.Key()) - } - assertEntryHasStringValue(t, res, fmt.Sprintf("val%d", i)) - if res.Sequence() != i { - t.Fatalf("invalid sequence: %d", res.Sequence()) - } - } - - res, err := store.Get("UNKNOWN") - if err == nil { - t.Fatalf("expected an error") - } - if res != nil { - t.Fatalf("expected nil result") - } - - _, err = store.Get(">") - if err == nil { - t.Fatalf("expected error dong Get() on wildcard") - } -} - -func TestJetStreamStorage_Close(t *testing.T) { - store, srv, nc, _ := setupBasicTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - - if store.stream == nil { - t.Fatalf("load failed") - } - - err := store.Close() - if err != nil { - t.Fatalf("close failed: %s", err) - } - - if store.stream != nil { - t.Fatalf("close failed, stream is not nil") - } -} - -func TestJetStreamStorage_UnknownBucket(t *testing.T) { - srv, nc, _ := startJSServer(t) - defer srv.Shutdown() - defer nc.Close() - - kv, err := NewClient(nc, "MISSING") - if err != nil { - t.Fatalf("new failed: %s", err) - } - - _, err = kv.Status() - if err != ErrUnknownBucket { - t.Fatalf("Unexpected error: %s", err) - } -} - -func TestJetStreamStorage_CreateBucket(t *testing.T) { - srv, nc, _ := startJSServer(t) - defer srv.Shutdown() - defer nc.Close() - - opts, _ := newOpts(WithHistory(5), WithTTL(24*time.Hour)) - - st, err := newJetStreamStorage("TEST", nc, opts) - if err != nil { - t.Fatalf("store create failed: %s", err) - } - - store := st.(*jetStreamStorage) - err = store.CreateBucket() - if err != nil { - t.Fatalf("create failed: %s", err) - } - - if store.stream == nil { - t.Fatalf("no stream stored") - } - - if store.stream.Name() != "KV_TEST" { - t.Fatalf("invalid stream name %s", store.stream.Name()) - } - - if store.stream.MaxAge() != 24*time.Hour { - t.Fatalf("invalid stream retention: %v", store.stream.MaxAge()) - } - - if store.stream.MaxMsgsPerSubject() != 5 { - t.Fatalf("invalid stream retention: %v", store.stream.MaxMsgsPerSubject()) - } -} - -func TestJetStreamStorage_Destroy(t *testing.T) { - srv, nc, mgr := startJSServer(t) - defer srv.Shutdown() - defer nc.Close() - - opts, _ := newOpts() - store, err := newJetStreamStorage("TEST", nc, opts) - if err != nil { - t.Fatalf("store create failed: %s", err) - } - - err = store.CreateBucket() - if err != nil { - t.Fatalf("create failed: %s", err) - } - - err = store.Destroy() - if err != nil { - t.Fatalf("destroy failed: %s", err) - } - - known, err := mgr.IsKnownStream("KV_TEST") - if err != nil { - t.Fatalf("known failed: %s", err) - } - - if known { - t.Fatalf("stream existed after Destroy()") - } -} - -func BenchmarkJetStreamPut(b *testing.B) { - store, srv, nc, _ := setupBasicTestBucket(b) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - b.ResetTimer() - - for n := 0; n < b.N; n++ { - key := fmt.Sprintf("k%d", n%10) - val := strconv.Itoa(n) - _, err := store.Put(key, []byte(val)) - if err != nil { - b.Fatalf("put failed: %s", err) - } - } -} - -func BenchmarkReadCacheGet(b *testing.B) { - b.StopTimer() - store, srv, nc, _ := setupBasicTestBucket(b) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - cached, err := newReadCache(store, store.log) - if err != nil { - b.Fatalf("cache setup failed: %s", err) - } - defer cached.Close() - - seq, err := cached.Put("hello", []byte("world")) - if err != nil { - b.Fatalf("put failed: %s", err) - } - - var res Entry - - b.StartTimer() - - for n := 0; n < b.N; n++ { - res, err = cached.Get("hello") - if err != nil { - b.Fatalf("get failed: %s", err) - } - if res.Sequence() != seq { - b.Fatalf("got wrong sequence: %d", res.Sequence()) - } - } -} - -func BenchmarkJetStreamGet(b *testing.B) { - b.StopTimer() - store, srv, nc, _ := setupBasicTestBucket(b) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - seq, err := store.Put("hello", []byte("world")) - if err != nil { - b.Fatalf("put failed: %s", err) - } - b.StartTimer() - - var res Entry - for n := 0; n < b.N; n++ { - res, err = store.Get("hello") - if err != nil { - b.Fatalf("get failed: %s", err) - } - if res.Sequence() != seq { - b.Fatalf("got wrong sequence: %d", res.Sequence()) - } - } -} - -func BenchmarkJetStreamPutGet(b *testing.B) { - b.StopTimer() - store, srv, nc, _ := setupBasicTestBucket(b) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - - b.StartTimer() - - for n := 0; n < b.N; n++ { - key := fmt.Sprintf("k%d", n%10) - _, err := store.Put(key, []byte(strconv.Itoa(n))) - if err != nil { - b.Fatalf("put failed: %s", err) - } - - _, err = store.Get(key) - if err != nil { - b.Fatalf("get failed: %s", err) - } - } -} - -func startJSServer(t BOrT) (*natsd.Server, *nats.Conn, *jsm.Manager) { - t.Helper() - - d, err := ioutil.TempDir("", "jstest") - if err != nil { - t.Fatalf("temp dir could not be made: %s", err) - } - - opts := &natsd.Options{ - ServerName: "test.example.net", - JetStream: true, - StoreDir: d, - Port: -1, - Host: "localhost", - LogFile: "/tmp/server.log", - // Trace: true, - // TraceVerbose: true, - Cluster: natsd.ClusterOpts{Name: "gotest"}, - } - - s, err := natsd.NewServer(opts) - if err != nil { - t.Fatalf("server start failed: %s", err) - } - - go s.Start() - if !s.ReadyForConnections(10 * time.Second) { - t.Fatalf("nats server did not start") - } - - // s.ConfigureLogger() - - nc, err := nats.Connect(s.ClientURL(), nats.UseOldRequestStyle(), nats.MaxReconnects(100)) - if err != nil { - t.Fatalf("client start failed: %s", err) - } - - mgr, err := jsm.New(nc, jsm.WithTimeout(time.Second)) - if err != nil { - t.Fatalf("manager creation failed: %s", err) - } - - return s, nc, mgr -} diff --git a/kv/jetstream_entry.go b/kv/jetstream_entry.go deleted file mode 100644 index dcae1d52..00000000 --- a/kv/jetstream_entry.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "bufio" - "bytes" - "encoding/json" - "fmt" - "net/http" - "net/textproto" - "time" - - "github.com/nats-io/jsm.go" - "github.com/nats-io/jsm.go/api" - "github.com/nats-io/nats.go" -) - -type jsEntry struct { - bucket string - key string - val []byte - ts time.Time - seq uint64 - pending uint64 - operation Operation -} - -func (j *jsEntry) Bucket() string { return j.bucket } -func (j *jsEntry) Key() string { return j.key } -func (j *jsEntry) Value() []byte { return j.val } -func (j *jsEntry) Created() time.Time { return j.ts } -func (j *jsEntry) Sequence() uint64 { return j.seq } -func (j *jsEntry) Delta() uint64 { return j.pending } -func (j *jsEntry) Operation() Operation { return j.operation } -func (j *jsEntry) MarshalJSON() ([]byte, error) { - return json.Marshal(j.genericEntry()) -} - -func (j *jsEntry) genericEntry() *GenericEntry { - return &GenericEntry{ - Bucket: j.bucket, - Key: j.key, - Val: j.val, - Created: j.ts.UnixNano(), - Seq: j.seq, - Operation: string(j.operation), - } -} - -func jsEntryFromStoredMessage(bucket, key string, m *api.StoredMsg, dec func([]byte) ([]byte, error)) (*jsEntry, error) { - res := &jsEntry{ - bucket: bucket, - key: key, - ts: m.Time, - seq: m.Sequence, - operation: PutOperation, - pending: 0, // we dont know from StoredMsg and we only use this in get last for subject, so 0 is right - } - - var err error - res.val, err = dec(m.Data) - if err != nil { - return nil, err - } - - if m.Header != nil || len(m.Header) > 0 { - hdrs, err := decodeHeadersMsg(m.Header) - if err != nil { - return nil, err - } - - if op := hdrs.Get(kvOperationHeader); op == delOperationString { - res.operation = DeleteOperation - } - } - - return res, nil -} - -func jsEntryFromMessage(bucket, key string, m *nats.Msg, dec func([]byte) ([]byte, error)) (*jsEntry, error) { - meta, err := jsm.ParseJSMsgMetadata(m) - if err != nil { - return nil, err - } - - res := &jsEntry{ - bucket: bucket, - key: key, - ts: meta.TimeStamp(), - seq: meta.StreamSequence(), - pending: meta.Pending(), - operation: PutOperation, - } - - res.val, err = dec(m.Data) - if err != nil { - return nil, err - } - - if op := m.Header.Get(kvOperationHeader); op != "" { - if op == delOperationString { - res.operation = DeleteOperation - } - } - - return res, nil -} - -const ( - hdrLine = "NATS/1.0\r\n" - crlf = "\r\n" - hdrPreEnd = len(hdrLine) - len(crlf) -) - -func decodeHeadersMsg(data []byte) (http.Header, error) { - tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data))) - if l, err := tp.ReadLine(); err != nil || l != hdrLine[:hdrPreEnd] { - return nil, fmt.Errorf("could not decode headers") - } - - mh, err := tp.ReadMIMEHeader() - if err != nil { - return nil, fmt.Errorf("could not decode headers") - } - - return http.Header(mh), nil -} diff --git a/kv/jetstream_status.go b/kv/jetstream_status.go deleted file mode 100644 index 7daa8bde..00000000 --- a/kv/jetstream_status.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "fmt" - "time" - - "github.com/nats-io/jsm.go/api" -) - -type jsStatus struct { - name string - state api.StreamState - info api.StreamInfo -} - -func (j *jsStatus) TTL() time.Duration { return j.info.Config.MaxAge } -func (j *jsStatus) BackingStore() string { return j.info.Config.Name } -func (j *jsStatus) Keys() ([]string, error) { return nil, fmt.Errorf("unsupported") } -func (j *jsStatus) Bucket() string { return j.name } -func (j *jsStatus) Values() uint64 { return j.state.Msgs } -func (j *jsStatus) History() int64 { return j.info.Config.MaxMsgsPer } -func (j *jsStatus) MaxBucketSize() int64 { return j.info.Config.MaxBytes } -func (j *jsStatus) MaxValueSize() int32 { return j.info.Config.MaxMsgSize } -func (j *jsStatus) BucketLocation() string { - if j.info.Cluster != nil { - return j.info.Cluster.Name - } - - return "unknown" -} - -func (j *jsStatus) Replicas() (ok int, failed int) { - // the leader isnt listed as a peer, so we start with 1 - ok = 1 - - if j.info.Cluster != nil { - for _, peer := range j.info.Cluster.Replicas { - if peer.Current { - ok++ - } else { - failed++ - } - } - - } - - return ok, failed -} -func (j *jsStatus) MirrorStatus() (lag int64, active bool, err error) { - return 0, false, fmt.Errorf("unsupported") -} diff --git a/kv/jetstream_watch.go b/kv/jetstream_watch.go deleted file mode 100644 index 083712b1..00000000 --- a/kv/jetstream_watch.go +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "context" - "fmt" - "strings" - "sync" - "time" - - "github.com/nats-io/jsm.go" - "github.com/nats-io/nats.go" -) - -type jsWatch struct { - streamName string - bucket string - subj string - subjPrefix string - mgr *jsm.Manager - cons *jsm.Consumer - sub *nats.Subscription - nc *nats.Conn - lastSeen time.Time - lastSeq uint64 - outCh chan Entry - ctx context.Context - dec Decoder - cancel func() - log Logger - running bool - mu sync.Mutex -} - -func newJSWatch(ctx context.Context, subj string, j *jetStreamStorage) (*jsWatch, error) { - if subj == "" { - subj = ">" - } - - w := &jsWatch{ - streamName: j.streamName, - bucket: j.name, - subj: subj, - subjPrefix: j.subjectPrefix + "." + j.name + ".", - nc: j.nc, - mgr: j.mgr, - log: j.log, - dec: j.opts.dec, - outCh: make(chan Entry, 1000), - } - - w.ctx, w.cancel = context.WithCancel(ctx) - - err := w.start() - if err != nil { - w.cancel() - return nil, err - } - - return w, nil -} - -func (w *jsWatch) Channel() chan Entry { - w.mu.Lock() - defer w.mu.Unlock() - - return w.outCh -} - -func (w *jsWatch) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.ctx == nil { - return nil - } - - w.cancel() - - return nil -} - -func (w *jsWatch) decode(v []byte) ([]byte, error) { - if w.dec == nil { - return v, nil - } - - return w.dec.Decode(v) -} - -func (w *jsWatch) handler(m *nats.Msg) { - w.mu.Lock() - w.lastSeen = time.Now() - w.mu.Unlock() - - hdr := m.Header.Get("Status") - if hdr == "100" && m.Reply != "" { - m.Respond(nil) - return - } else if hdr == "100" { - return - } - - if !strings.HasPrefix(m.Subject, w.subjPrefix) { - return - } - - key := strings.TrimPrefix(m.Subject, w.subjPrefix) - res, err := jsEntryFromMessage(w.bucket, key, m, w.decode) - if err != nil { - return - } - - w.mu.Lock() - defer w.mu.Unlock() - - if !w.running { - return - } - - select { - case w.outCh <- res: - w.lastSeq = res.Sequence() - - m.Ack() - default: - } -} - -func (w *jsWatch) consumerHealthManager() { - // todo: use a backoff - ticker := time.NewTicker(500 * time.Millisecond) - - // give the first check some time - w.mu.Lock() - w.lastSeen = time.Now() - w.mu.Unlock() - - recreate := func() error { - w.mu.Lock() - w.sub.Unsubscribe() - w.sub = nil - w.cons = nil - w.mu.Unlock() - - return w.createConsumer() - } - - for { - select { - case <-ticker.C: - w.mu.Lock() - seen := w.lastSeen - w.mu.Unlock() - - if time.Since(seen) > 3*time.Second { - w.log.Errorf("consumer failed, no heartbeats since %s", seen) - - err := recreate() - if err != nil { - w.log.Errorf("recreating failed consumer failed: %s", err) - } - } - - case <-w.ctx.Done(): - ticker.Stop() - return - } - } -} - -func (w *jsWatch) createConsumer() error { - w.mu.Lock() - defer w.mu.Unlock() - - var err error - - if w.sub != nil || w.cons != nil { - return fmt.Errorf("already running") - } - - w.sub, err = w.nc.Subscribe(nats.NewInbox(), w.handler) - if err != nil { - return err - } - - opts := []jsm.ConsumerOption{ - jsm.FilterStreamBySubject(w.subj), - jsm.DeliverySubject(w.sub.Subject), - jsm.IdleHeartbeat(2 * time.Second), - jsm.AcknowledgeExplicit(), - jsm.ConsumerDescription(fmt.Sprintf("KV watch for subject %v", w.subj)), - } - - if w.lastSeq != 0 { - opts = append(opts, jsm.StartAtSequence(w.lastSeq+1)) - } else { - opts = append(opts, jsm.DeliverLastPerSubject()) - } - - w.cons, err = w.mgr.NewConsumer(w.streamName, opts...) - if err != nil { - return err - } - - state, err := w.cons.LatestState() - if err != nil { - return err - } - - // if no messages are pending or delivered we should tell the watcher - // we have nothing and it should assume its up to date - if state.NumPending+state.Delivered.Consumer == 0 { - w.outCh <- nil - } - - return nil -} - -func (w *jsWatch) start() error { - w.mu.Lock() - if w.running { - w.mu.Unlock() - return fmt.Errorf("already running") - } - w.running = true - w.mu.Unlock() - - go func() { - <-w.ctx.Done() - - w.mu.Lock() - defer w.mu.Unlock() - - w.running = false - if w.outCh != nil { - close(w.outCh) - } - - if w.sub != nil { - w.sub.Unsubscribe() - } - }() - - err := w.createConsumer() - if err != nil { - return err - } - - go w.consumerHealthManager() - - return nil -} diff --git a/kv/kv.go b/kv/kv.go deleted file mode 100644 index 780c0023..00000000 --- a/kv/kv.go +++ /dev/null @@ -1,269 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "context" - "errors" - "regexp" - "strings" - "time" - - "github.com/nats-io/nats.go" -) - -var ( - validKeyRe = regexp.MustCompile(ValidKeyPattern) - validBucketRe = regexp.MustCompile(ValidBucketPattern) -) - -// Operation is the type of action taken on a key -type Operation string - -const ( - // PutOperation represents a PUT of data into the kv bucket - PutOperation Operation = "PUT" - - // DeleteOperation represents a delete of a specific key in a bucket - DeleteOperation Operation = "DEL" - - // ValidKeyPattern is a regular expression key should match, post encoding, to be valid - ValidKeyPattern = `\A[-/_=\.a-zA-Z0-9]+\z` - - // ValidBucketPattern is a regular expression bucket names should match - ValidBucketPattern = `\A[a-zA-Z0-9_-]+\z` - - // ReservedKeyPrefix is a prefix for keys reserved for internal use - ReservedKeyPrefix = "_kv" - - delOperationString string = string(DeleteOperation) - putOperationString string = string(PutOperation) -) - -var ( - // ErrUnknownKey is returned when a key does not exist - ErrUnknownKey = errors.New("unknown key") - - // ErrInvalidKey is returned for keys that do not match ValidKeyPattern - ErrInvalidKey = errors.New("invalid key") - - // ErrInvalidBucketName is returned when trying to access buckets that do not match ValidBucketPattern - ErrInvalidBucketName = errors.New("invalid bucket name") - - // ErrUnknownBucket is returned when a bucket could not be found - ErrUnknownBucket = errors.New("unknown bucket") -) - -// NewBucket creates or load a bucket. If the bucket already exist the existing bucket configuration is not reconciled -// -// Deprecated: this is now deprecated, please use the KV feature in nats.go -func NewBucket(nc *nats.Conn, bucket string, opts ...Option) (KV, error) { - return newOrLoad(nc, bucket, true, opts...) -} - -// NewClient creates a new read-write client -// -// Deprecated: this is now deprecated, please use the KV feature in nats.go -func NewClient(nc *nats.Conn, bucket string, opts ...Option) (KV, error) { - return newOrLoad(nc, bucket, false, opts...) -} - -// NewRoClient creates a read only key value store. -// -// Deprecated: this is now deprecated, please use the KV feature in nats.go -func NewRoClient(nc *nats.Conn, bucket string, opts ...Option) (RoKV, error) { - return NewClient(nc, bucket, opts...) -} - -func newOrLoad(nc *nats.Conn, bucket string, create bool, opts ...Option) (KV, error) { - o, err := newOpts(opts...) - if err != nil { - return nil, err - } - - store, err := newJetStreamStorage(bucket, nc, o) - if err != nil { - return nil, err - } - - if create { - err = store.CreateBucket() - if err != nil { - return nil, err - } - } - - if o.localCache { - return newReadCache(store, o.log) - } - - return store, nil -} - -// Deprecated: this is now deprecated, please use the KV feature in nats.go -type Storage interface { - KV - - Bucket() string - BucketSubject() string - CreateBucket() error -} - -// KV is a read-write interface to a single key-value store bucket -// -// Deprecated: this is now deprecated, please use the KV feature in nats.go -type KV interface { - // Put saves a value into a key - Put(key string, val []byte, opts ...PutOption) (seq uint64, err error) - - // Delete marks the key as deleted, history is retained subject to configured history limit - Delete(key string) error - - // Purge marks the key as deleted and removes history, after this operation 1 historic value is kept - the purge - Purge(key string) error - - // Destroy removes the entire bucket and all data, KV cannot be used after - Destroy() error - - RoKV -} - -// RoKV is a read-only interface to a single key-value store bucket -// -// Deprecated: this is now deprecated, please use the KV feature in nats.go -type RoKV interface { - // Get gets a key from the store - Get(key string) (Entry, error) - - // History retrieves historic values for a key - History(ctx context.Context, key string) ([]Entry, error) - - // Watch a key for updates, the same Entry might be delivered more than once, a nil entry means end of available data was reached - Watch(ctx context.Context, key string) (Watch, error) - - // Close releases in-memory resources held by the KV, called automatically if the context used to create it is canceled - Close() error - - // Status retrieves the status of the bucket - Status() (Status, error) -} - -// Codec encodes/decodes values using Encoders and Decoders -type Codec interface { - Encoder - Decoder -} - -// Encoder encodes values before saving -type Encoder interface { - Encode(value []byte) ([]byte, error) -} - -// Decoder decodes values before saving -type Decoder interface { - Decode(value []byte) ([]byte, error) -} - -// Deprecated: this is now deprecated, please use the KV feature in nats.go -type Entry interface { - // Bucket is the bucket the data was loaded from - Bucket() string - // Key is the key that was retrieved - Key() string - // Value is the retrieved value - Value() []byte - // Created is the time the data was received in the bucket - Created() time.Time - // Sequence is a unique sequence for this value - Sequence() uint64 - // Delta is distance from the latest value. If history is enabled this is effectively the index of the historical value, 0 for latest, 1 for most recent etc. - Delta() uint64 - // Operation is the kind of operation this result represents - Operation() Operation -} - -// GenericEntry is a generic, non implementation specific, representation of a Entry -type GenericEntry struct { - Bucket string `json:"bucket"` - Key string `json:"key"` - Val []byte `json:"val"` - Created int64 `json:"created"` - Seq uint64 `json:"seq"` - Operation string `json:"operation"` -} - -// Watch observes a bucket and report any changes via NextValue or Channel -// -// Deprecated: this is now deprecated, please use the KV feature in nats.go -type Watch interface { - // Channel returns a channel to read changes from - Channel() chan Entry - - // Close must be called to dispose of resources, called if the context used to create the watch is canceled - Close() error -} - -type Status interface { - // Bucket the name of the bucket - Bucket() string - - // Values is how many messages are in the bucket, including historical values - Values() uint64 - - // History returns the configured history kept per key - History() int64 - - // TTL is how long the bucket keeps values for - TTL() time.Duration - - // BucketLocation returns the name of the cluster holding the read replica of the data - BucketLocation() string - - // Replicas returns how many times data in the bucket is replicated at storage - Replicas() (ok int, failed int) - - // Keys returns a list of all keys in the bucket - not possible now - Keys() ([]string, error) - - // BackingStore is a backend specific name for the underlying storage - eg. stream name - BackingStore() string - - // MirrorStatus is the status of a read replica, error when not accessing a replica - MirrorStatus() (lag int64, active bool, err error) - - // MaxBucketSize is the configured maximum size of the bucket in bytes - MaxBucketSize() int64 - - // MaxValueSize is the configured maximum size of a single value in bytes - MaxValueSize() int32 -} - -// IsReservedKey determines if key is a reserved key -func IsReservedKey(key string) bool { - return strings.HasPrefix(key, ReservedKeyPrefix) -} - -// IsValidKey determines if key is a valid key -func IsValidKey(key string) bool { - if strings.HasPrefix(key, ".") || strings.HasSuffix(key, ".") { - return false - } - - return validKeyRe.MatchString(key) -} - -// IsValidBucket determines if bucket is a valid bucket name -func IsValidBucket(bucket string) bool { - return validBucketRe.MatchString(bucket) -} diff --git a/kv/kv_test.go b/kv/kv_test.go deleted file mode 100644 index af50159e..00000000 --- a/kv/kv_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package kv - -import ( - "testing" -) - -func TestIsReservedKey(t *testing.T) { - if !IsReservedKey("_kv_x") { - t.Fatalf("_kv_x was not a reserved key") - } - - if IsReservedKey("bob") { - t.Fatalf("bob was a reserved key") - } -} - -func TestIsValidKey(t *testing.T) { - for _, k := range []string{" x y", "x ", "x!", "xx$", "*", ">", "x.>", "x.*", ".", ".x", ".x.", "x."} { - if IsValidKey(k) { - t.Fatalf("%q was valid", k) - } - } - - for _, k := range []string{"foo", "_foo", "-foo", "_kv_foo", "foo123", "123", "a/b/c", "a.b.c"} { - if !IsValidKey(k) { - t.Fatalf("%q was invalid", k) - } - } -} - -func TestIsValidBucket(t *testing.T) { - for _, b := range []string{" B", "!", "x/y", "x>", "x.x", "x.*", "x.>", "x*", "*", ">"} { - if IsValidBucket(b) { - t.Fatalf("%q was valid", b) - } - } - - for _, b := range []string{"B", "b", "123", "1_2_3", "1-2-3"} { - if !IsValidKey(b) { - t.Fatalf("%q was invalid", b) - } - } -} diff --git a/kv/opts.go b/kv/opts.go deleted file mode 100644 index 59ec7fef..00000000 --- a/kv/opts.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "fmt" - "strings" - "time" -) - -const ( - // DefaultTimeout is the default timeout used when waiting for the backend, override using WithTimeout() - DefaultTimeout = 2 * time.Second - - // DefaultHistory is how many historical values are kept per key - DefaultHistory uint64 = 1 - - // DefaultMaxBucketSize maximum size for the entire bucket, -1 for unlimited - DefaultMaxBucketSize int64 = -1 - - // DefaultMaxValueSize maximum size for individual values - DefaultMaxValueSize int32 = -1 -) - -type options struct { - history uint64 - replicas uint - maxBucketSize int64 - maxValueSize int32 - placementCluster string - mirrorBucket string - ttl time.Duration - localCache bool - enc Encoder - dec Decoder - log Logger - timeout time.Duration - overrideStreamName string - overrideSubjectPrefix string -} - -// Option configures the KV client -type Option func(o *options) error - -// PutOption is a option passed to put, reserved for future work like put only if last value had sequence x -type PutOption func(o *putOptions) - -type putOptions struct { - jsPreviousSeq uint64 -} - -func newOpts(opts ...Option) (*options, error) { - o := &options{ - replicas: 1, - history: DefaultHistory, - timeout: DefaultTimeout, - maxBucketSize: DefaultMaxBucketSize, - maxValueSize: DefaultMaxValueSize, - log: &stdLogger{}, - } - - for _, opt := range opts { - err := opt(o) - if err != nil { - return nil, err - } - } - - return o, nil -} - -func newPutOpts(opts ...PutOption) (*putOptions, error) { - o := &putOptions{} - - for _, opt := range opts { - opt(o) - } - - return o, nil -} - -// WithHistory sets the number of historic values to keep for a key -func WithHistory(h uint64) Option { - return func(o *options) error { - o.history = h - return nil - } -} - -// WithMaxValueSize is the biggest size value that can be placed in the bucket including some header overhead -func WithMaxValueSize(s int32) Option { - return func(o *options) error { - if s < -1 { - return fmt.Errorf("minimum value for WithMaxValueSize is -1") - } - - o.maxValueSize = s - return nil - } -} - -// WithMaxBucketSize limits the entire bucket to a specific size -func WithMaxBucketSize(s int64) Option { - return func(o *options) error { - if s < -1 { - return fmt.Errorf("minimum value for WithMaxBucketSize is -1") - } - - o.maxBucketSize = s - return nil - } -} - -// WithReplicas sets the number of replicas to keep for a bucket -func WithReplicas(r uint) Option { - return func(o *options) error { - o.replicas = r - return nil - } -} - -// WithPlacementCluster places the bucket in a specific cluster -func WithPlacementCluster(c string) Option { - return func(o *options) error { - o.placementCluster = c - return nil - } -} - -// WithMirroredBucket creates a read replica that mirrors a specified bucket -func WithMirroredBucket(b string) Option { - return func(o *options) error { - // TODO: validate - o.mirrorBucket = b - return nil - } -} - -// WithTTL sets the maximum time a value will be kept in the bucket -func WithTTL(ttl time.Duration) Option { - return func(o *options) error { - o.ttl = ttl - return nil - } -} - -// WithLocalCache creates a local in-memory cache of the entire bucket that's kept up to date in real time using a watch -func WithLocalCache() Option { - return func(o *options) error { - o.localCache = true - return nil - } -} - -// WithEncoder sets a value encoder, multiple encoders can be set and will be called in order, programs that just write values can use this to avoid the configuring decoders -func WithEncoder(e Encoder) Option { - return func(o *options) error { - o.enc = e - return nil - } -} - -// WithDecoder sets a value decoder, multiple decoders can be set and will be called in order, programs that just read values can use this to avoid the configuring encoders -func WithDecoder(d Decoder) Option { - return func(o *options) error { - o.dec = d - return nil - } -} - -// WithCodec sets a value encode/decoder, multiple codecs can be set and will be called in order, programs that read and write values can set this to do bi-directional encoding and decoding -func WithCodec(c Codec) Option { - return func(o *options) error { - o.enc = c.(Encoder) - o.dec = c.(Decoder) - return nil - } -} - -// WithLogger sets a logger to use, STDOUT logging otherwise -func WithLogger(log Logger) Option { - return func(o *options) error { - o.log = log - return nil - } -} - -// WithTimeout sets the timeout for calls to the storage layer -func WithTimeout(t time.Duration) Option { - return func(o *options) error { - o.timeout = t - return nil - } -} - -// WithStreamName overrides the usual stream name that is formed as KV_ -func WithStreamName(n string) Option { - return func(o *options) error { - if strings.Contains(n, ">") || strings.Contains(n, "*") || strings.Contains(n, ".") { - return fmt.Errorf("invalid stream name") - } - - o.overrideStreamName = n - return nil - } -} - -// WithStreamSubjectPrefix overrides the usual stream subject changing the `kv.*.*` to `.*.*` -func WithStreamSubjectPrefix(p string) Option { - return func(o *options) error { - if strings.Contains(p, ">") || strings.Contains(p, "*") { - return fmt.Errorf("invalid prefix") - } - - p = strings.TrimSuffix(p, ".") - - o.overrideSubjectPrefix = p - return nil - } -} - -// OnlyIfLastValueSequence the put will only succeed if the last set value for the key had this sequence -func OnlyIfLastValueSequence(seq uint64) PutOption { - return func(o *putOptions) { - o.jsPreviousSeq = seq - } -} diff --git a/kv/read_cache.go b/kv/read_cache.go deleted file mode 100644 index 96206a7f..00000000 --- a/kv/read_cache.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "context" - "fmt" - "sync" -) - -type readCache struct { - backend Storage - cache map[string]Entry - watch Watch - ctx context.Context - cancel func() - ready bool - log Logger - mu sync.Mutex -} - -func newReadCache(b Storage, log Logger) (*readCache, error) { - if b == nil { - return nil, fmt.Errorf("storage is required") - } - - cache := &readCache{ - backend: b, - cache: map[string]Entry{}, - log: log, - } - - var err error - cache.ctx, cache.cancel = context.WithCancel(context.Background()) - - cache.watch, err = b.Watch(cache.ctx, ">") - if err != nil { - cache.Close() - return nil, err - } - - go cache.watcher() - - return cache, nil -} - -func (c *readCache) Bucket() string { return c.backend.Bucket() } -func (c *readCache) BucketSubject() string { return c.backend.BucketSubject() } -func (c *readCache) CreateBucket() error { return c.backend.CreateBucket() } -func (c *readCache) Status() (Status, error) { return c.backend.Status() } - -func (c *readCache) Watch(ctx context.Context, key string) (Watch, error) { - return c.backend.Watch(ctx, key) -} - -func (c *readCache) Put(key string, val []byte, opts ...PutOption) (uint64, error) { - // put on the backend can fail making this invalidate not needed but - // put can also succeed and then error due to network timeouts etc, so - // safest possible thing is to always invalidate on put regardless of outcome - c.mu.Lock() - delete(c.cache, key) - c.mu.Unlock() - - return c.backend.Put(key, val, opts...) -} - -func (c *readCache) Destroy() error { - c.mu.Lock() - c.cache = map[string]Entry{} - c.mu.Unlock() - - return c.backend.Destroy() -} - -func (c *readCache) Purge(key string) error { - c.mu.Lock() - delete(c.cache, key) // possible race here if a update is enroute from the watched bucket that, probably ok - c.mu.Unlock() - - return c.backend.Purge(key) -} - -func (c *readCache) Delete(key string) error { - c.mu.Lock() - delete(c.cache, key) // possible race here if a update is enroute from the watched bucket that, probably ok - c.mu.Unlock() - - return c.backend.Delete(key) -} - -func (c *readCache) History(ctx context.Context, key string) ([]Entry, error) { - return c.backend.History(ctx, key) -} - -func (c *readCache) Get(key string) (Entry, error) { - c.mu.Lock() - entry, ok := c.cache[key] - ready := c.ready - c.mu.Unlock() - - if ready && ok { - return entry, nil - } - - res, err := c.backend.Get(key) - if err != nil { - return nil, err - } - - c.mu.Lock() - c.cache[key] = res - c.mu.Unlock() - - return res, nil -} - -func (c *readCache) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - - c.cache = nil - - // stop the watch - if c.cancel != nil { - c.cancel() - } - - if c.watch != nil { - c.watch.Close() - } - - return c.backend.Close() -} - -func (c *readCache) watcher() { - for { - select { - case result, ok := <-c.watch.Channel(): - if !ok { // channel is closed we should shut down - c.mu.Lock() - c.ready = false - c.mu.Unlock() - - return - } - - // we got a message so channel isn't closed but it's a nil, - // that's signaling that the stream is empty so we should go ready - if result == nil { - c.mu.Lock() - c.ready = true - c.mu.Unlock() - - } else { - c.mu.Lock() - - switch result.Operation() { - case DeleteOperation: - delete(c.cache, result.Key()) - case PutOperation: - c.cache[result.Key()] = result - } - - ready := c.ready - c.mu.Unlock() - - // once we have reached the end of the data stream we have all latest values - // signal ready ensuring cache reads only get latest data - if !ready && result.Delta() == 0 { - c.mu.Lock() - c.ready = true - c.mu.Unlock() - } - } - - case <-c.ctx.Done(): - return - } - } -} diff --git a/kv/read_cache_test.go b/kv/read_cache_test.go deleted file mode 100644 index 401dffc3..00000000 --- a/kv/read_cache_test.go +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "testing" - "time" - - "github.com/nats-io/jsm.go" - natsd "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nats.go" -) - -func setupBasicCachedTestBucket(t BOrT) (*readCache, *jetStreamStorage, *natsd.Server, *nats.Conn, *jsm.Manager) { - store, srv, nc, mgr := setupBasicTestBucket(t) - cache, err := newReadCache(store, store.log) - if err != nil { - t.Fatalf("cache failed: %s", err) - } - - return cache, store, srv, nc, mgr -} - -func TestReadCache_GetPut(t *testing.T) { - cache, store, srv, nc, _ := setupBasicCachedTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - defer cache.Close() - - expectCache := func(t *testing.T, expect int) { - t.Helper() - cache.mu.Lock() - defer cache.mu.Unlock() - - if len(cache.cache) > expect { - t.Fatalf("cache has %d expected %d: %+v", len(cache.cache), expect, cache.cache) - } - } - - assertCacheReady := func(t *testing.T) { - t.Helper() - cache.mu.Lock() - defer cache.mu.Unlock() - - if !cache.ready { - t.Fatalf("cache is not ready") - } - } - _, err := cache.Get("missing") - if err == nil { - t.Fatalf("expected an error got none") - } - - expectCache(t, 0) - assertCacheReady(t) - - _, err = cache.Put("hello", []byte("world")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - // let the watch get from the consumer etc - time.Sleep(20 * time.Millisecond) - - expectCache(t, 1) - assertCacheReady(t) - - _, err = cache.Get("hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - expectCache(t, 1) - - _, err = cache.Get("hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - expectCache(t, 1) - - _, err = cache.Put("hello", []byte("wrld")) - if err != nil { - t.Fatalf("put failed: %s", err) - } - - // let the watch get from the consumer etc - time.Sleep(20 * time.Millisecond) - - expectCache(t, 1) - - val, err := cache.Get("hello") - if err != nil { - t.Fatalf("get failed: %s", err) - } - - assertEntryHasStringValue(t, val, "wrld") -} - -func TestReadCache_Delete(t *testing.T) { - cache, store, srv, nc, _ := setupBasicCachedTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - defer cache.Close() - - expectCache := func(t *testing.T, expect int) { - t.Helper() - - cache.mu.Lock() - defer cache.mu.Unlock() - - if len(cache.cache) > expect { - t.Fatalf("cache has %d expected %d", len(cache.cache), expect) - } - } - - cache.Put("x", []byte("y")) - cache.Put("y", []byte("y")) - - time.Sleep(20 * time.Millisecond) - - expectCache(t, 2) - - cache.Delete("x") - expectCache(t, 1) - - _, err := cache.Get("x") - if err == nil { - t.Fatalf("get succeeded") - } - - _, err = cache.Get("y") - if err != nil { - t.Fatalf("'y' get failed: %s", err) - } - - // now do a delete from the backend directly and wait for watch to - // deliver the delete operation, the cache should then not have this value - err = cache.backend.Delete("y") - if err != nil { - t.Fatalf("delete failed: %s", err) - } - - time.Sleep(20 * time.Millisecond) - - expectCache(t, 0) -} - -func TestReadCache_Purge(t *testing.T) { - cache, store, srv, nc, _ := setupBasicCachedTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - defer cache.Close() - - expectCache := func(t *testing.T, expect int) { - t.Helper() - cache.mu.Lock() - defer cache.mu.Unlock() - - if len(cache.cache) > expect { - t.Fatalf("cache has %d expected %d", len(cache.cache), expect) - } - } - - cache.Put("x", []byte("y")) - cache.Put("y", []byte("y")) - - time.Sleep(20 * time.Millisecond) - - expectCache(t, 2) - - cache.Purge("x") - expectCache(t, 1) - - _, err := cache.Get("x") - if err == nil { - t.Fatalf("get succeeded") - } - - _, err = cache.Get("y") - if err != nil { - t.Fatalf("get failed: %s", err) - } -} - -func TestReadCache_Destroy(t *testing.T) { - cache, store, srv, nc, _ := setupBasicCachedTestBucket(t) - defer srv.Shutdown() - defer nc.Close() - defer store.Close() - defer cache.Close() - - expectCache := func(t *testing.T, expect int) { - t.Helper() - cache.mu.Lock() - defer cache.mu.Unlock() - - if len(cache.cache) > expect { - t.Fatalf("cache has %d expected %d", len(cache.cache), expect) - } - } - - cache.Put("x", []byte("y")) - cache.Put("y", []byte("y")) - - time.Sleep(20 * time.Millisecond) - - expectCache(t, 2) - - cache.Destroy() - expectCache(t, 0) - - _, err := cache.Get("x") - if err == nil { - t.Fatalf("get succeeded") - } -} diff --git a/kv/stdout_logger.go b/kv/stdout_logger.go deleted file mode 100644 index 081d92c9..00000000 --- a/kv/stdout_logger.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2021 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kv - -import ( - "log" -) - -// Logger is a custom logger -type Logger interface { - Debugf(format string, a ...interface{}) - Infof(format string, a ...interface{}) - Warnf(format string, a ...interface{}) - Errorf(format string, a ...interface{}) -} - -type stdLogger struct{} - -func (s *stdLogger) Debugf(format string, a ...interface{}) { - log.Printf(format, a...) -} -func (s *stdLogger) Infof(format string, a ...interface{}) { - log.Printf(format, a...) -} -func (s *stdLogger) Warnf(format string, a ...interface{}) { - log.Printf(format, a...) -} -func (s *stdLogger) Errorf(format string, a ...interface{}) { - log.Printf(format, a...) -}