Skip to content

Commit

Permalink
speed up stream pager by using new next msg request
Browse files Browse the repository at this point in the history
This uses the new next message structure and the 404 behaviour
to detect end of stream without relying on any timeouts

Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Oct 29, 2020
1 parent 0158bc6 commit ec2bc5f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.14

require (
github.com/dustin/go-humanize v1.0.0
github.com/nats-io/nats-server/v2 v2.1.8-0.20201028153921-3e5d484fc8ae
github.com/nats-io/nats-server/v2 v2.1.8-0.20201029035148-5adce5c01c15
github.com/nats-io/nats.go v1.10.1-0.20201028154001-fbdabc0ebcfc
google.golang.org/protobuf v1.24.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1
github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201028143001-778aa5e6bbd1 h1:zE8nYJ0CCvbwauZYZZ+Dd5WUH+wqxh7k0QDqDqTvDBc=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201028143001-778aa5e6bbd1/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201028153921-3e5d484fc8ae h1:G8bmtuBEUIqBm7S0BTCLD3FaepWw9tk1XDYPKiCvAIA=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201028153921-3e5d484fc8ae/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201029035148-5adce5c01c15 h1:StZ9VWtC5iYdC/rzsji3rkzx3XDiAXYq0rAOXanx7eU=
github.com/nats-io/nats-server/v2 v2.1.8-0.20201029035148-5adce5c01c15/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55 h1:AaYp5amXO8fkWw6ZUEjGFiWdj8FfjOGDquae7Ne5JOU=
Expand Down
18 changes: 16 additions & 2 deletions stream_pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package jsm

import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/nats-io/nats.go"

"github.com/nats-io/jsm.go/api"
)

type StreamPager struct {
Expand Down Expand Up @@ -119,7 +121,14 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er

if p.seen == p.pageSize || p.seen == -1 {
p.seen = 0
err := p.mgr.nc.PublishRequest(p.consumer.NextSubject(), p.sub.Subject, []byte(strconv.Itoa(p.pageSize)))

req := api.JSApiConsumerGetNextRequest{Batch: p.pageSize, NoWait: true}
rj, err := json.Marshal(req)
if err != nil {
return nil, false, err
}

err = p.mgr.nc.PublishRequest(p.consumer.NextSubject(), p.sub.Subject, rj)
if err != nil {
return nil, false, err
}
Expand All @@ -132,7 +141,12 @@ func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, er
case msg := <-p.q:
p.seen++

if msg.Header.Get("Status") == "404" {
return nil, true, fmt.Errorf("last message reached")
}

return msg, p.seen == p.pageSize, nil

case <-timeout.Done():
return nil, true, fmt.Errorf("timeout waiting for new messages")
}
Expand Down
2 changes: 1 addition & 1 deletion stream_pager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestPager(t *testing.T) {
}
}

pgr, err := str.PageContents(jsm.PagerSize(25), jsm.PagerTimeout(250*time.Millisecond))
pgr, err := str.PageContents(jsm.PagerSize(25))
if err != nil {
t.Fatalf("pager creation failed: %s", err)
}
Expand Down

0 comments on commit ec2bc5f

Please sign in to comment.