Skip to content

Commit

Permalink
Consumer creation enhancements
Browse files Browse the repository at this point in the history
 * This uses the new named consumer approach to create all consumers
   rather than specific ephemeral create subject etc. Generates random
   consumer names when needed
 * Also uses the new create subject with filter subject included for
   single subject filtered consumers

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Jun 18, 2023
1 parent dfe6b4f commit 079296a
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 39 deletions.
1 change: 1 addition & 0 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const (
JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
JSApiConsumerCreateWithNameT = "$JS.API.CONSUMER.CREATE.%s.%s"
JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
JSApiConsumerNamesT = "$JS.API.CONSUMER.NAMES.%s"
Expand Down
66 changes: 49 additions & 17 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package jsm

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"log"
Expand All @@ -25,6 +26,7 @@ import (

"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
"github.com/nats-io/nuid"
)

// DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer
Expand Down Expand Up @@ -79,14 +81,7 @@ func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig,
Config: *cfg,
}

var createdInfo *api.ConsumerInfo

switch req.Config.Durable {
case "":
createdInfo, err = m.createEphemeralConsumer(req)
default:
createdInfo, err = m.createDurableConsumer(req)
}
createdInfo, err := m.createConsumer(req)
if err != nil {
return nil, err
}
Expand All @@ -101,19 +96,21 @@ func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig,
return c, nil
}

func (m *Manager) createDurableConsumer(req api.JSApiConsumerCreateRequest) (info *api.ConsumerInfo, err error) {
func (m *Manager) createConsumer(req api.JSApiConsumerCreateRequest) (info *api.ConsumerInfo, err error) {
var resp api.JSApiConsumerCreateResponse
err = m.jsonRequest(fmt.Sprintf(api.JSApiDurableCreateT, req.Stream, req.Config.Durable), req, &resp)
if err != nil {
return nil, err

if req.Config.Name == "" {
return nil, fmt.Errorf("consumer conmfiguration requires a name")
}

return resp.ConsumerInfo, nil
}
var subj string
if req.Config.FilterSubject == "" {
subj = fmt.Sprintf(api.JSApiConsumerCreateWithNameT, req.Stream, req.Config.Name)
} else {
subj = fmt.Sprintf(api.JSApiConsumerCreateExT, req.Stream, req.Config.Name, req.Config.FilterSubject)
}

func (m *Manager) createEphemeralConsumer(req api.JSApiConsumerCreateRequest) (info *api.ConsumerInfo, err error) {
var resp api.JSApiConsumerCreateResponse
err = m.jsonRequest(fmt.Sprintf(api.JSApiConsumerCreateT, req.Stream), req, &resp)
err = m.jsonRequest(subj, req, &resp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,9 +194,31 @@ func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (
}
}

if cfg.Durable != "" {
cfg.Name = cfg.Durable
}

if cfg.Name == "" {
cfg.Name = generateConsName()
}

return &cfg, nil
}

const rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
const base = 62

func generateConsName() string {
name := nuid.Next()
sha := sha256.New()
sha.Write([]byte(name))
b := sha.Sum(nil)
for i := 0; i < 8; i++ {
b[i] = rdigits[int(b[i]%base)]
}
return string(b[:8])
}

func (m *Manager) loadConfigForConsumer(consumer *Consumer) (err error) {
info, err := m.loadConsumerInfo(consumer.stream, consumer.name)
if err != nil {
Expand Down Expand Up @@ -240,6 +259,19 @@ func DeliverySubject(s string) ConsumerOption {
}
}

// ConsumerName sets a name for the consumer, when creating a durable consumer use DurableName, using ConsumerName allows
// for creating named ephemeral consumers, else a random name will be generated
func ConsumerName(s string) ConsumerOption {
return func(o *api.ConsumerConfig) error {
if !IsValidName(s) {
return fmt.Errorf("%q is not a valid consumer name", s)
}

o.Name = s
return nil
}
}

// DurableName is the name given to the consumer, when not set an ephemeral consumer is created
func DurableName(s string) ConsumerOption {
return func(o *api.ConsumerConfig) error {
Expand Down
42 changes: 42 additions & 0 deletions consumers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ func TestNewConsumer(t *testing.T) {
consumer, err := stream.NewConsumer(jsm.DurableName("NEW"), jsm.FilterStreamBySubject("ORDERS.new"))
checkErr(t, err, "create failed")

if consumer.Configuration().Name != "NEW" {
t.Fatalf("consumer name was not set")
}

consumer.Reset()
if consumer.AckPolicy() != api.AckExplicit {
t.Fatalf("expected explicit ack got %s", consumer.AckPolicy())
Expand Down Expand Up @@ -202,6 +206,44 @@ func TestNewConsumerFromDefaultEphemeral(t *testing.T) {
consumer, err := stream.NewConsumerFromDefault(jsm.SampledDefaultConsumer, jsm.DeliverySubject("out"), jsm.FilterStreamBySubject("ORDERS.new"))
checkErr(t, err, "create failed")

if consumer.Configuration().Name != consumer.Name() {
t.Fatalf("consumer name wqs not set")
}

consumers, err := mgr.ConsumerNames("ORDERS")
checkErr(t, err, "consumer list failed")
if len(consumers) != 1 {
t.Fatalf("expected 1 consumer got %v", consumers)
}

if consumer.Name() != consumers[0] {
t.Fatalf("incorrect consumer name '%s' expected '%s'", consumer.Name(), consumers[0])
}

if consumer.IsDurable() {
t.Fatalf("expected ephemeral consumer got durable")
}
}

func TestNewConsumerFromDefaultNamedEphemeral(t *testing.T) {
srv, nc, stream, mgr := setupConsumerTest(t)
defer srv.Shutdown()
defer nc.Flush()

// interest is needed
nc.Subscribe("out", func(_ *nats.Msg) {})

consumer, err := stream.NewConsumerFromDefault(jsm.SampledDefaultConsumer, jsm.ConsumerName("EPHEMERAL"), jsm.DeliverySubject("out"), jsm.FilterStreamBySubject("ORDERS.new"))
checkErr(t, err, "create failed")

if consumer.Configuration().Name != consumer.Name() {
t.Fatalf("consumer name was not set")
}

if consumer.Name() != "EPHEMERAL" {
t.Fatalf("consumer ephemeral name was not set")
}

consumers, err := mgr.ConsumerNames("ORDERS")
checkErr(t, err, "consumer list failed")
if len(consumers) != 1 {
Expand Down
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ go 1.19
require (
github.com/dustin/go-humanize v1.0.1
github.com/google/go-cmp v0.5.9
github.com/klauspost/compress v1.16.5
github.com/nats-io/nats-server/v2 v2.9.18-0.20230608221425-40619659d53b
github.com/nats-io/nats.go v1.26.0
golang.org/x/net v0.10.0
golang.org/x/text v0.9.0
github.com/klauspost/compress v1.16.6
github.com/nats-io/nats-server/v2 v2.9.19-0.20230616205649-42827596d857
github.com/nats-io/nats.go v1.27.0
github.com/nats-io/nuid v1.0.1
golang.org/x/net v0.11.0
golang.org/x/text v0.10.0
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -19,9 +20,8 @@ require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -17,23 +17,23 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.18-0.20230608221425-40619659d53b h1:Vo1qKhQ342Gscs4DwJRVB7Lzbyc+5A6S3KGP9LD6JUI=
github.com/nats-io/nats-server/v2 v2.9.18-0.20230608221425-40619659d53b/go.mod h1:Jn7aad/q8GisP+CkkN+/G3V3aZHAiytJSvmaolOhIYk=
github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE=
github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats-server/v2 v2.9.19-0.20230616205649-42827596d857 h1:o9HttQWyHP/Avlwna9Vvha1nL0TJPrvrokCUbbQ4XsQ=
github.com/nats-io/nats-server/v2 v2.9.19-0.20230616205649-42827596d857/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats.go v1.27.0 h1:3o9fsPhmoKm+yK7rekH2GtWoE+D9jFbw8N3/ayI1C00=
github.com/nats-io/nats.go v1.27.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM=
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down

0 comments on commit 079296a

Please sign in to comment.