Skip to content

Commit

Permalink
support paging WQs that has direct access enabled
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Jun 11, 2023
1 parent ef62e6a commit b1ab825
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 24 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/google/go-cmp v0.5.9
github.com/klauspost/compress v1.16.5
github.com/nats-io/nats-server/v2 v2.9.17-0.20230602055328-c24547eb4ed6
github.com/nats-io/nats-server/v2 v2.9.18-0.20230608221425-40619659d53b
github.com/nats-io/nats.go v1.26.0
golang.org/x/net v0.10.0
golang.org/x/text v0.9.0
Expand All @@ -20,7 +20,6 @@ require (
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.17-0.20230525004237-24d4bd603926 h1:FuenfU/cdnR3VowYOJNtlivDlsOv6pHlg4so26wGQD8=
github.com/nats-io/nats-server/v2 v2.9.17-0.20230525004237-24d4bd603926/go.mod h1:eQysm3xDZmIjfkjr7DuD9DjRFpnxQc2vKVxtEg0Dp6s=
github.com/nats-io/nats-server/v2 v2.9.17-0.20230602055328-c24547eb4ed6 h1:WZ0HfwvauMlnIw4zPckRTT/vckDDQUXoZlHW2mE1pIY=
github.com/nats-io/nats-server/v2 v2.9.17-0.20230602055328-c24547eb4ed6/go.mod h1:eQysm3xDZmIjfkjr7DuD9DjRFpnxQc2vKVxtEg0Dp6s=
github.com/nats-io/nats-server/v2 v2.9.18-0.20230608221425-40619659d53b h1:Vo1qKhQ342Gscs4DwJRVB7Lzbyc+5A6S3KGP9LD6JUI=
github.com/nats-io/nats-server/v2 v2.9.18-0.20230608221425-40619659d53b/go.mod h1:Jn7aad/q8GisP+CkkN+/G3V3aZHAiytJSvmaolOhIYk=
github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE=
github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
Expand Down
92 changes: 80 additions & 12 deletions stream_pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type StreamPager struct {
consumer *Consumer

q chan *nats.Msg
stream string
stream *Stream
startSeq int
startDelta time.Duration
pageSize int
Expand All @@ -28,6 +28,9 @@ type StreamPager struct {
timeout time.Duration
seen int

useDirect bool
curSeq uint64 // for direct where to get

mu sync.Mutex
}

Expand Down Expand Up @@ -69,7 +72,7 @@ func PagerTimeout(d time.Duration) PagerOption {
}
}

func (p *StreamPager) start(stream string, mgr *Manager, opts ...PagerOption) error {
func (p *StreamPager) start(stream *Stream, mgr *Manager, opts ...PagerOption) error {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -96,6 +99,14 @@ func (p *StreamPager) start(stream string, mgr *Manager, opts ...PagerOption) er
}

var err error

if p.stream.Retention() == api.WorkQueuePolicy && !p.stream.DirectAllowed() {
return fmt.Errorf("work queue retention streams can only be paged if direct access is allowed")
}

// for now only on WQ because its slow, until there is a batch mode direct request
p.useDirect = p.stream.Retention() == api.WorkQueuePolicy && p.stream.DirectAllowed()

p.q = make(chan *nats.Msg, p.pageSize)
p.sub, err = mgr.nc.ChanSubscribe(mgr.nc.NewRespInbox(), p.q)
if err != nil {
Expand All @@ -114,6 +125,41 @@ func (p *StreamPager) start(stream string, mgr *Manager, opts ...PagerOption) er
return nil
}

func (p *StreamPager) directGetBatch() error {
for i := 1; i <= p.pageSize; i++ {
req := api.JSApiMsgGetRequest{Seq: p.curSeq, NextFor: p.filterSubject}

rj, err := json.Marshal(req)
if err != nil {
return err
}

err = p.mgr.nc.PublishRequest(p.consumer.NextSubject(), p.sub.Subject, rj)
if err != nil {
return err
}

p.curSeq++
}

return nil
}

func (p *StreamPager) fetchBatch() error {
req := api.JSApiConsumerGetNextRequest{Batch: p.pageSize, NoWait: true}
rj, err := json.Marshal(req)
if err != nil {
return err
}

err = p.mgr.nc.PublishRequest(p.consumer.NextSubject(), p.sub.Subject, rj)
if err != nil {
return err
}

return nil
}

// NextMsg retrieves the next message from the pager interrupted by ctx.
//
// last indicates if the message is the last in the current page, the next call
Expand All @@ -130,13 +176,11 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er
if p.seen == p.pageSize || p.seen == -1 {
p.seen = 0

req := api.JSApiConsumerGetNextRequest{Batch: p.pageSize, NoWait: true}
rj, err := json.Marshal(req)
if err != nil {
return nil, false, err
if p.useDirect {
err = p.directGetBatch()
} else {
err = p.fetchBatch()
}

err = p.mgr.nc.PublishRequest(p.consumer.NextSubject(), p.sub.Subject, rj)
if err != nil {
return nil, false, err
}
Expand All @@ -154,7 +198,17 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er
return nil, true, fmt.Errorf("last message reached")
}

msg.Ack()
if p.useDirect {
nfo, err := ParseJSMsgMetadata(msg)
if err != nil {
p.curSeq = nfo.StreamSequence()
}
if nfo.Pending() == 0 {
return msg, true, fmt.Errorf("last message reached")
}
} else {
msg.Ack()
}

return msg, p.seen == p.pageSize, nil

Expand All @@ -168,7 +222,7 @@ func (p *StreamPager) createConsumer() error {
ConsumerDescription("JSM Stream Pager"),
InactiveThreshold(time.Hour),
DurableName(fmt.Sprintf("stream_pager_%d%d", os.Getpid(), time.Now().UnixNano())),
ConsumerOverrideReplicas(1),
ConsumerOverrideMemoryStorage(),
}

switch {
Expand All @@ -187,9 +241,23 @@ func (p *StreamPager) createConsumer() error {
}

var err error
p.consumer, err = p.mgr.NewConsumer(p.stream, cops...)
p.consumer, err = p.mgr.NewConsumer(p.stream.Name(), cops...)
if err != nil {
return err
}

if p.useDirect {
nfo, err := p.consumer.LatestState()
if err != nil {
return err
}

return err
// record the initial seq based on consumer config like time deltas etc
// direct will start from there following subject filter
p.curSeq = nfo.Delivered.Stream
}

return nil
}

func (p *StreamPager) close() error {
Expand Down
6 changes: 3 additions & 3 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,12 @@ func Compression(alg api.Compression) StreamOption {
// PageContents creates a StreamPager used to traverse the contents of the stream,
// Close() should be called to dispose of the background consumer and resources
func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error) {
if s.Retention() == api.WorkQueuePolicy {
return nil, fmt.Errorf("work queue retention streams can not be paged")
if s.Retention() == api.WorkQueuePolicy && !s.DeleteAllowed() {
return nil, fmt.Errorf("work queue retention streams can only be paged if direct access is allowed")
}

pgr := &StreamPager{}
err := pgr.start(s.Name(), s.mgr, opts...)
err := pgr.start(s, s.mgr, opts...)
if err != nil {
return nil, err
}
Expand Down
11 changes: 10 additions & 1 deletion streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,12 +890,21 @@ func TestStreamPageContents(t *testing.T) {
checkErr(t, err, "create failed")

pager, err := s.PageContents()
if err.Error() != "work queue retention streams can not be paged" {
if err.Error() != "work queue retention streams can only be paged if direct access is allowed" {
t.Fatalf("Expected an error, got: %v", err)
}
if pager != nil {
t.Fatalf("expected a nil pager")
}

err = s.UpdateConfiguration(s.Configuration(), jsm.AllowDirect())
if err != nil {
t.Fatalf("update failed: %v", err)
}
_, err = s.PageContents()
if err != nil {
t.Fatalf("paging direct enabled WQ failed: %v", err)
}
}

func TestStreamSealed(t *testing.T) {
Expand Down

0 comments on commit b1ab825

Please sign in to comment.