Skip to content

Commit

Permalink
update for new new ack format
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Aug 26, 2021
1 parent 1c7dccd commit 71e429b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 34 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/google/go-cmp v0.5.5
github.com/klauspost/compress v1.13.4
github.com/nats-io/nats-server/v2 v2.3.5-0.20210824012648-9f3dfc0ba141
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
github.com/nats-io/nats-server/v2 v2.3.5-0.20210825221009-41a253dabb43
github.com/nats-io/nats.go v1.12.0
github.com/nats-io/nuid v1.0.1
)
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.3.5-0.20210824012648-9f3dfc0ba141 h1:RkRo/h84AkVXGgFWRcWiLiYbenEgPAAyoP3AE8qvYXQ=
github.com/nats-io/nats-server/v2 v2.3.5-0.20210824012648-9f3dfc0ba141/go.mod h1:jgHRB+EfZisUr6j50/g7Gcah7AR8qtk3as42DJmESCk=
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 h1:lffRgFiHXqxwf8lYNSXXeOZdOgAIOabGwOSwdttqCn0=
github.com/nats-io/nats-server/v2 v2.3.5-0.20210825221009-41a253dabb43 h1:Sbb4QxNsccsPERg0C7uQX7/xgOCOTMIvDH9Ytb5MXsU=
github.com/nats-io/nats-server/v2 v2.3.5-0.20210825221009-41a253dabb43/go.mod h1:jgHRB+EfZisUr6j50/g7Gcah7AR8qtk3as42DJmESCk=
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
28 changes: 12 additions & 16 deletions msginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type MsgInfo struct {
pending uint64
ts time.Time
domain string
account string
}

// Stream is the stream this message is stored in
Expand Down Expand Up @@ -71,11 +70,6 @@ func (i *MsgInfo) Domain() string {
return i.domain
}

// Account is the account where the message came from, can be empty
func (i *MsgInfo) Account() string {
return i.account
}

// Pending is the number of messages left to consume, -1 when the number is not reported
func (i *MsgInfo) Pending() uint64 {
return i.pending
Expand All @@ -90,17 +84,23 @@ func ParseJSMsgMetadataReply(reply string) (info *MsgInfo, err error) {
}

parts := strings.Split(reply, ".")
c := len(parts)

if c < 9 || (c > 9 && c < 11) {
return nil, fmt.Errorf("message metadata does not appear to be an ACK")
}

if parts[0] != "$JS" || parts[1] != "ACK" {
return nil, fmt.Errorf("message metadata does not appear to be an ACK")
}

// $JS.ACK.<account hash>.<stream>.<consumer>...
// $JS.ACK.<domain>.<account hash>.<stream>.<consumer>...
// $JS.ACK.<domain>.<account hash>.<stream>.<consumer>...<random>
// $JS.ACK.<stream>.<consumer>...

c := len(parts)
offset := c - 9
offset := 0
if c == 12 {
offset = 2
}

stream := parts[2+offset]
consumer := parts[3+offset]
Expand All @@ -113,15 +113,11 @@ func ParseJSMsgMetadataReply(reply string) (info *MsgInfo, err error) {
pending, _ = strconv.ParseUint(parts[8+offset], 10, 64)

domain := _EMPTY_
account := _EMPTY_
if c == 11 {
if c == 12 {
domain = parts[2]
account = parts[3]
} else if c == 10 {
account = parts[2]
}

return &MsgInfo{stream, consumer, streamSeq, consumerSeq, delivered, pending, ts, domain, account}, nil
return &MsgInfo{stream, consumer, streamSeq, consumerSeq, delivered, pending, ts, domain}, nil
}

// ParseJSMsgMetadata parse the reply subject metadata to determine message metadata
Expand Down
20 changes: 7 additions & 13 deletions msginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package jsm_test

import (
"fmt"
"testing"
"time"

Expand All @@ -24,19 +25,17 @@ import (

func TestParseJSMsgMetadata_New(t *testing.T) {
cases := []struct {
meta string
pending uint64
hasAccount bool
hasDomain bool
meta string
pending uint64
hasDomain bool
}{
{"$JS.ACK.ORDERS.NEW.1.2.3.1587466354254920000.10", 10, false, false},
{"$JS.ACK.ACCOUNT.ORDERS.NEW.1.2.3.1587466354254920000.10", 10, true, false},
{"$JS.ACK.DOMAIN.ACCOUNT.ORDERS.NEW.1.2.3.1587466354254920000.10", 10, true, true},
{"$JS.ACK.ORDERS.NEW.1.2.3.1587466354254920000.10", 10, false},
{"$JS.ACK.DOMAIN.ACCOUNT.ORDERS.NEW.1.2.3.1587466354254920000.10.random", 10, true},
}

for _, tc := range cases {
i, err := jsm.ParseJSMsgMetadata(&nats.Msg{Reply: tc.meta})
checkErr(t, err, "msg parse failed")
checkErr(t, err, fmt.Sprintf("msg parse failed for '%s'", tc.meta))

if i.Stream() != "ORDERS" {
t.Fatalf("expected ORDERS got %s", i.Stream())
Expand Down Expand Up @@ -70,10 +69,5 @@ func TestParseJSMsgMetadata_New(t *testing.T) {
if tc.hasDomain && i.Domain() != "DOMAIN" {
t.Fatalf("expected DOMAIN got %q", i.Domain())
}

if tc.hasAccount && i.Account() != "ACCOUNT" {
t.Fatalf("expected ACCOUNT got %q", i.Account())
}

}
}

0 comments on commit 71e429b

Please sign in to comment.