Skip to content

Commit

Permalink
Keep event history in circular buffer
Browse files Browse the repository at this point in the history
Storing event history in a circular buffer avoids memory allocation after the storage limit is reached.

- Minor fixes to storage history logic
- Order message type switch to check for most common messages first
- Minor code improvements
  • Loading branch information
gammazero committed Apr 25, 2024
1 parent dd076ea commit 0e186cf
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 85 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/gammazero/nexus/v3

go 1.20
go 1.21

require (
github.com/gammazero/deque v0.2.1
github.com/gorilla/websocket v1.5.0
github.com/stretchr/testify v1.8.4
github.com/ugorji/go/codec v1.2.11
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -18,5 +21,6 @@ golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
67 changes: 31 additions & 36 deletions router/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"time"

"github.com/gammazero/deque"
"github.com/gammazero/nexus/v3/stdlog"
"github.com/gammazero/nexus/v3/wamp"
)
Expand Down Expand Up @@ -52,10 +53,13 @@ type historyEntry struct {
}

type historyStore struct {
entries []historyEntry
matchPolicy string
limit int
isLimitReached bool
entries deque.Deque[historyEntry]
matchPolicy string
limit int
}

func (h *historyStore) atLimit() bool {
return h.entries.Len() >= h.limit
}

type broker struct {
Expand Down Expand Up @@ -149,10 +153,8 @@ func (b *broker) PreInitEventHistoryTopics(evntCfgs []*TopicEventHistoryConfig)
sub, _ := b.syncInitSubscription(topicCfg.Topic, topicCfg.MatchPolicy, nil)

b.eventHistoryStore[sub] = &historyStore{
entries: []historyEntry{},
matchPolicy: topicCfg.MatchPolicy,
limit: topicCfg.Limit,
isLimitReached: false,
matchPolicy: topicCfg.MatchPolicy,
limit: topicCfg.Limit,
}

}
Expand Down Expand Up @@ -383,12 +385,8 @@ func newSubscription(id wamp.ID, subscriber *wamp.Session, topic wamp.URI, match
}

func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, event *wamp.Event) {

if eventStore.isLimitReached {
eventStore.entries = eventStore.entries[1:]
} else if len(eventStore.entries) >= eventStore.limit {
eventStore.isLimitReached = true
eventStore.entries = eventStore.entries[1:]
if eventStore.atLimit() {
eventStore.entries.PopFront()
}

item := historyEntry{
Expand All @@ -403,7 +401,7 @@ func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, even
},
}

eventStore.entries = append(eventStore.entries, item)
eventStore.entries.PushBack(item)
}

func (b *broker) syncInitSubscription(topic wamp.URI, match string, subscriber *wamp.Session) (sub *subscription, existingSub bool) {
Expand Down Expand Up @@ -1138,7 +1136,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {

fromPubOp, ok := msg.ArgumentsKw["from_publication"]
if ok {
fromPub = fromPubOp.(wamp.ID)
fromPub, ok = fromPubOp.(wamp.ID)
if !ok || fromPub < 1 {
return &wamp.Error{
Type: msg.MessageType(),
Expand All @@ -1152,7 +1150,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {

afterPubOp, ok := msg.ArgumentsKw["after_publication"]
if ok {
afterPub = afterPubOp.(wamp.ID)
afterPub, ok = afterPubOp.(wamp.ID)
if !ok || afterPub < 1 {
return &wamp.Error{
Type: msg.MessageType(),
Expand All @@ -1165,7 +1163,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {

beforePubOp, ok := msg.ArgumentsKw["before_publication"]
if ok {
beforePub = beforePubOp.(wamp.ID)
beforePub, ok = beforePubOp.(wamp.ID)
if !ok || beforePub < 1 {
return &wamp.Error{
Type: msg.MessageType(),
Expand All @@ -1178,7 +1176,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {

untilPubOp, ok := msg.ArgumentsKw["until_publication"]
if ok {
untilPub = untilPubOp.(wamp.ID)
untilPub, ok = untilPubOp.(wamp.ID)
if !ok || untilPub < 1 {
return &wamp.Error{
Type: msg.MessageType(),
Expand All @@ -1189,19 +1187,20 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
}
}

ch := make(chan struct{})
done := make(chan struct{})
b.actionChan <- func() {
defer close(done)

var filteredEvents []storedEvent

if subscription, ok := b.subscriptions[subId]; ok {
if storeItem, ok := b.eventHistoryStore[subscription]; ok {
isLimitReached = storeItem.isLimitReached
isLimitReached = storeItem.atLimit()

fromPubReached := false
afterPubReached := false
untilPubReached := false
var untilPubReached bool

for _, entry := range storeItem.entries {
for i := 0; i < storeItem.entries.Len(); i++ {
entry := storeItem.entries.At(i)
if !fromDate.IsZero() && entry.event.timestamp.Before(fromDate) {
continue
}
Expand All @@ -1214,20 +1213,17 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
if !untilDate.IsZero() && entry.event.timestamp.After(untilDate) {
continue
}
if fromPub > 0 && !fromPubReached {
if fromPub != 0 {
if entry.event.Publication != fromPub {
continue
} else {
fromPubReached = true
}
fromPub = 0
}
if afterPub > 0 && !afterPubReached {
if entry.event.Publication != afterPub {
continue
} else {
afterPubReached = true
continue
if afterPub != 0 {
if entry.event.Publication == afterPub {
afterPub = 0
}
continue
}
if beforePub > 0 && entry.event.Publication == beforePub {
break
Expand Down Expand Up @@ -1270,9 +1266,8 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
}

events, _ = wamp.AsList(filteredEvents)
close(ch)
}
<-ch
<-done

return &wamp.Yield{
Request: msg.Request,
Expand Down
62 changes: 31 additions & 31 deletions router/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,48 +699,48 @@ func TestEventHistory(t *testing.T) {
topic := wamp.URI("nexus.test.exact.topic")
subscription := broker.topicSubscription[topic]
subEvents := broker.eventHistoryStore[subscription].entries
require.Equalf(t, 3, len(subEvents), "Store for topic %s should hold 3 records", topic)
require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25509, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25513, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25517, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 3, subEvents.Len(), "Store for topic %s should hold 3 records", topic)
require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25509, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25513, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25517, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic)

topic = wamp.URI("nexus.test")
subscription = broker.pfxTopicSubscription[topic]
subEvents = broker.eventHistoryStore[subscription].entries
require.Equalf(t, 4, len(subEvents), "Store for topic %s should hold 3 records", topic)
require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25517, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.prefix.catch", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25518, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25519, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.miss", subEvents[3].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25520, subEvents[3].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 4, subEvents.Len(), "Store for topic %s should hold 3 records", topic)
require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25517, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.prefix.catch", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25518, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25519, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.miss", subEvents.At(3).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25520, subEvents.At(3).event.Arguments[0], "Event store for topic %s holds invalid event", topic)

topic = wamp.URI("nexus.test..topic")
subscription = broker.wcTopicSubscription[topic]
subEvents = broker.eventHistoryStore[subscription].entries
require.Equalf(t, 4, len(subEvents), "Store for topic %s should hold 3 records", topic)
require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25513, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.topic", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25515, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25517, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.topic", subEvents[3].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25519, subEvents[3].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 4, subEvents.Len(), "Store for topic %s should hold 3 records", topic)
require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25513, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25515, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25517, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(3).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
require.Equalf(t, 25519, subEvents.At(3).event.Arguments[0], "Event store for topic %s holds invalid event", topic)

topic = wamp.URI("nexus")
subscription = broker.pfxTopicSubscription[topic]
subEvents = broker.eventHistoryStore[subscription].entries
require.Equalf(t, 20, len(subEvents), "Store for topic %s should hold 20 records", topic)
require.Falsef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should not be reached", topic)
require.Equalf(t, 20, subEvents.Len(), "Store for topic %s should hold 20 records", topic)
require.Falsef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should not be reached", topic)

//Now let's test Event History MetaRPCs
topic = wamp.URI("nexus.test.exact.topic")
Expand Down Expand Up @@ -885,7 +885,7 @@ func TestEventHistory(t *testing.T) {
// Let's test filtering based on publication ID
topic = wamp.URI("nexus")
subscription = broker.pfxTopicSubscription[topic]
pubId := broker.eventHistoryStore[subscription].entries[4].event.Publication
pubId := broker.eventHistoryStore[subscription].entries.At(4).event.Publication
inv = wamp.Invocation{
Request: wamp.ID(reqId),
Registration: 0,
Expand Down
17 changes: 8 additions & 9 deletions router/realm.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,21 +472,20 @@ func (r *realm) handleInboundMessages(sess *wamp.Session) (bool, bool, error) {
switch msg := msg.(type) {
case *wamp.Publish:
r.broker.publish(sess, msg)
case *wamp.Yield:
r.dealer.yield(sess, msg)
case *wamp.Call:
r.dealer.call(sess, msg)
case *wamp.Cancel:
r.dealer.cancel(sess, msg)
case *wamp.Subscribe:
r.broker.subscribe(sess, msg)
case *wamp.Unsubscribe:
r.broker.unsubscribe(sess, msg)

case *wamp.Register:
r.dealer.register(sess, msg)
case *wamp.Unsubscribe:
r.broker.unsubscribe(sess, msg)
case *wamp.Unregister:
r.dealer.unregister(sess, msg)
case *wamp.Call:
r.dealer.call(sess, msg)
case *wamp.Yield:
r.dealer.yield(sess, msg)
case *wamp.Cancel:
r.dealer.cancel(sess, msg)

case *wamp.Error:
// An INVOCATION error is the only type of ERROR message the
Expand Down
15 changes: 7 additions & 8 deletions wamp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,21 @@ func (s *Session) HasFeature(role, feature string) bool {
// by calling EndRecv.
func (s *Session) RecvDone() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()

if s.done == nil {
s.done = make(chan struct{})
}
d := s.done
s.mu.Unlock()
return d
return s.done
}

// If RecvDone is not yet closed, Goodbye returns nil.
// If RecvDone is closed, Goodbye returns the GOODBYE message that was supplied
// when RecvEnd was called.
func (s *Session) Goodbye() *Goodbye {
s.mu.Lock()
g := s.goodbye
s.mu.Unlock()
return g
defer s.mu.Unlock()
return s.goodbye
}

// EndRecv tells the session to signal messages handlers to stop receiving
Expand All @@ -96,8 +95,9 @@ func (s *Session) Goodbye() *Goodbye {
// with exiting the message handler for other reasons.
func (s *Session) EndRecv(goodbye *Goodbye) bool {
s.mu.Lock()
defer s.mu.Unlock()

if s.goodbye != nil {
s.mu.Unlock()
return false // already ended
}

Expand All @@ -112,7 +112,6 @@ func (s *Session) EndRecv(goodbye *Goodbye) bool {
}
close(s.done)

s.mu.Unlock()
return true
}

Expand Down

0 comments on commit 0e186cf

Please sign in to comment.