Skip to content

Commit

Permalink
MQTT topic matcher (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
cravler authored Feb 14, 2021
1 parent 9d80ac9 commit c200903
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 7 deletions.
7 changes: 5 additions & 2 deletions v2/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewClient(options ...func(*Client)) *Client {
opts: mqtt.NewClientOptions(),
timeout: 60 * time.Second,
store: new(store),
handlers: newTrie(),
handlers: NewTrie(),
}

// Set handlers
Expand Down Expand Up @@ -301,8 +301,11 @@ func (c *Client) Subscribe(key string, channel string, optionalHandler MessageHa
c.handlers.AddHandler(channel, optionalHandler)
}

// https://github.com/eclipse/paho.mqtt.golang/blob/master/topic.go#L78
topic := strings.ReplaceAll(formatTopic(key, channel, options), "#/", "#")

// Issue subscribe
token := c.conn.Subscribe(formatTopic(key, channel, options), 0, nil)
token := c.conn.Subscribe(topic, 0, nil)
return c.do(token)
}

Expand Down
9 changes: 9 additions & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ import (
"time"
)

// WithMatcher If "mqtt", then topic matching would follow MQTT specification.
func WithMatcher(matcher string) func(*Client) {
return func(c *Client) {
if "mqtt" == matcher {
c.handlers = NewTrieMQTT()
}
}
}

// WithBrokers configures broker URIs to connect to. The format should be scheme://host:port
// Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
// and "port" is the port on which the broker is accepting connections.
Expand Down
52 changes: 48 additions & 4 deletions v2/subtrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ func (n *node) orphan() {
type trie struct {
sync.RWMutex
root *node // The root node of the tree.
lookup func(query []string, result *[]MessageHandler, node *node)
}

// newTrie creates a new matcher for the subscriptions.
// newTrie creates a new trie without a lookup function.
func newTrie() *trie {
return &trie{
root: &node{
Expand All @@ -57,6 +58,20 @@ func newTrie() *trie {
}
}

// NewTrie creates a new subscriptions matcher using standard emitter strategy.
func NewTrie() *trie {
t := newTrie()
t.lookup = t.lookupEmitter
return t
}

// NewTrieMQTT creates a new subscriptions matcher using standard MQTT strategy.
func NewTrieMQTT() *trie {
t := newTrie()
t.lookup = t.lookupMqtt
return t
}

// AddHandler adds a message handler to a topic.
func (t *trie) AddHandler(topic string, handler MessageHandler) error {
query, rt := newRoute(topic, handler)
Expand Down Expand Up @@ -120,7 +135,7 @@ func (t *trie) Lookup(topic string) []MessageHandler {
return result
}

func (t *trie) lookup(query []string, result *[]MessageHandler, node *node) {
func (t *trie) lookupEmitter(query []string, result *[]MessageHandler, node *node) {

// Add routes from the current branch
for _, route := range node.routes {
Expand All @@ -132,12 +147,41 @@ func (t *trie) lookup(query []string, result *[]MessageHandler, node *node) {

// Go through the exact match branch
if n, ok := node.children[query[0]]; ok {
t.lookup(query[1:], result, n)
t.lookupEmitter(query[1:], result, n)
}

// Go through wildcard match branch
if n, ok := node.children["+"]; ok {
t.lookup(query[1:], result, n)
t.lookupEmitter(query[1:], result, n)
}
}
}

func (t *trie) lookupMqtt(query []string, result *[]MessageHandler, node *node) {
if len(query) == 0 {
// Add routes from the current branch
for _, route := range node.routes {
*result = append(*result, route.Action)
}
}

// If we're not yet done, continue
if len(query) > 0 {
// Go through the exact match branch
if n, ok := node.children[query[0]]; ok {
t.lookupMqtt(query[1:], result, n)
}

// Go through wildcard match branch
if n, ok := node.children["+"]; ok {
t.lookupMqtt(query[1:], result, n)
}

// Go through wildcard match branch
if n, ok := node.children["#"]; ok {
for _, route := range n.routes {
*result = append(*result, route.Action)
}
}
}
}
2 changes: 1 addition & 1 deletion v2/subtrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestRoute(t *testing.T) {
}

func TestTrieMatch(t *testing.T) {
m := newTrie()
m := NewTrie()
testPopulateWithStrings(m, []string{
"a/",
"a/b/c/",
Expand Down

0 comments on commit c200903

Please sign in to comment.